Skip to content

Commit dbd6659

Browse files
authored
Support communication with AWS SNS SDK. (#132)
* Support communication with AWS SNS SDK. * Remove files we shouldn't install and make logging better.
1 parent f97d1ed commit dbd6659

24 files changed

+443
-194
lines changed

event_source.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ Gem::Specification.new do |spec|
2727
spec.files =
2828
Dir.chdir(File.expand_path('..', __FILE__)) do
2929
`git ls-files -z`.split("\x0").reject do |f|
30-
f.match(%r{^(test|spec|features)/})
30+
f.match(%r{^(bin|test|spec|features|hugo|docs_assets|log)/})
3131
end
3232
end
3333

lib/event_source.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
require 'event_source/operations/build_message_options'
4646
require 'event_source/operations/build_message'
4747
require 'event_source/boot_registry'
48+
require 'event_source/paginator'
49+
require 'event_source/worker_pool'
4850

4951
# Event source provides ability to compose, publish and subscribe to events
5052
module EventSource
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# frozen_string_literal: true
2+
3+
require "event_source/protocols/arn/contracts/sns_channel_bindings_contract"
4+
5+
module EventSource
6+
module AsyncApi
7+
module Contracts
8+
# Schema and validation rules for protocol bindings to a channel
9+
class ChannelBindingsContract < Contract
10+
params do
11+
optional(:sns).maybe(:hash)
12+
optional(:amqp).maybe(:hash)
13+
end
14+
15+
rule(:sns) do
16+
if key? && value
17+
validation_result = ::EventSource::Protocols::Arn::Contracts::SnsChannelBindingsContract.new.call(value)
18+
if validation_result&.failure?
19+
key.failure(text: 'invalid channel bindings', error: validation_result.errors.to_h)
20+
else
21+
values.data.merge(sns: validation_result.values.to_h)
22+
end
23+
end
24+
end
25+
end
26+
end
27+
end
28+
end

lib/event_source/async_api/contracts/channel_item_contract.rb

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
# frozen_string_literal: true
22

3+
require_relative "channel_bindings_contract"
4+
35
module EventSource
46
module AsyncApi
57
module Contracts
@@ -21,7 +23,7 @@ class ChannelItemContract < Contract
2123
optional(:publish).value(:hash)
2224
optional(:description).value(:string)
2325
optional(:parameters).value(Types::HashOrNil)
24-
optional(:bindings).hash { optional(:amqp).maybe(:hash) }
26+
optional(:bindings).value(:hash)
2527
end
2628

2729
rule(:subscribe) do
@@ -45,6 +47,17 @@ class ChannelItemContract < Contract
4547
end
4648
end
4749
end
50+
51+
rule(:bindings) do
52+
if key? && value
53+
validation_result = ChannelBindingsContract.new.call(value)
54+
if validation_result&.failure?
55+
key.failure(text: 'invalid channel bindings', error: validation_result.errors.to_h)
56+
else
57+
values.data.merge(bindings: validation_result.values.to_h)
58+
end
59+
end
60+
end
4861
end
4962
end
5063
end

lib/event_source/logging.rb

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,36 @@ def logger
1717
)
1818

1919
# # only show "info" or higher messages on STDOUT using the Basic layout
20-
::Logging.appenders.stdout(level: :info, layout: layout)
20+
stdout_logger = ::Logging.appenders.stdout(level: :info, layout: layout)
2121

2222
Dir.mkdir('log') unless File.directory?('log')
2323
# # send all log events to the development log (including debug) as JSON
24-
::Logging.appenders.rolling_file(
24+
rf = ::Logging.appenders.rolling_file(
2525
'log/event_source.log',
2626
age: 'daily',
2727
level: EventSource.config.log_level,
2828
keep: 7,
2929
layout: ::Logging.layouts.json
3030
)
3131

32-
logger_instance.add_appenders 'stdout', 'log/event_source.log'
32+
logger_instance.add_appenders stdout_logger, rf
3333
end
3434
logger_instance
3535
end
3636
end
37+
38+
# Logger that pulls the base settings from the event source log.
39+
class InheritedLogger
40+
extend EventSource::Logging
41+
42+
def self.logger_for(klass)
43+
es_base_logger = logger
44+
klass_logger = ::Logging.logger[klass]
45+
if klass_logger.appenders.empty?
46+
klass_logger.additive = false
47+
klass_logger.appenders = es_base_logger.appenders
48+
end
49+
klass_logger
50+
end
51+
end
3752
end

lib/event_source/paginator.rb

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# frozen_string_literal: true
2+
3+
module EventSource
4+
# Abstract paginator for protocols which support it.
5+
class Paginator
6+
attr_reader :state, :continuation_data
7+
8+
def initialize(more, state, continue_data)
9+
@more = more
10+
@state = state
11+
@continuation_data = continue_data
12+
end
13+
14+
def more?
15+
@more
16+
end
17+
18+
def self.continue(state, continue_data)
19+
self.new(true, state, continue_data)
20+
end
21+
22+
def self.finished
23+
self.new(false, nil, nil)
24+
end
25+
end
26+
end

lib/event_source/protocols/arn/arn_channel_proxy.rb

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,24 @@ def initialize(channel_name, channel_item, client, connection_arn)
2121
# settings and configuration
2222
# @return [Object] a protocol-specific proxy for publishing
2323
def add_publish_operation(async_api_channel_item)
24-
raise NotImplementedError, "Service not yet implemented for ARN: #{@connection_arn.service}" unless @connection_arn.sns_service?
24+
if async_api_channel_item.bindings&.key?("sns")
25+
sns_bindings = async_api_channel_item.bindings["sns"]
26+
return SnsSdkPublisherProxy.new(client, async_api_channel_item) if sns_bindings.key?("x-aws-sdk")
27+
end
28+
raise NotImplementedError, "Publish Service not yet implemented for ARN: #{@connection_arn.service}" unless @connection_arn.sns_service?
2529

2630
SnsPublisherProxy.new(client, async_api_channel_item)
2731
end
2832

33+
# Add a subscriber operation.
34+
def add_subscribe_operation(async_api_channel_item)
35+
if async_api_channel_item.bindings&.key?("sns")
36+
sns_bindings = async_api_channel_item.bindings["sns"]
37+
return SnsSdkSubscriberProxy.new(client, async_api_channel_item) if sns_bindings.key?("x-aws-sdk")
38+
end
39+
raise NotImplementedError, "Subscriber Service not yet implemented for ARN: #{@connection_arn.service}"
40+
end
41+
2942
# Called after a publish proxy created. Normally does nothing.
3043
# It's used for example by AMQP to create any exchange to exchange
3144
# bindings.
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# frozen_string_literal: true
2+
3+
module EventSource
4+
module Protocols
5+
module Arn
6+
module Contracts
7+
# Validate parameters for AsyncAPI SNS channel bindings.
8+
class SnsChannelBindingsContract < Dry::Validation::Contract
9+
params do
10+
# At the channel level, we currently only support sdk bindings.
11+
optional(:"x-aws-sdk").maybe(:hash) do
12+
required(:sdk_action).maybe(:string, :included_in? => ["ListPhoneNumbersOptedOut"])
13+
optional(:paginate).maybe(:bool)
14+
end
15+
end
16+
end
17+
end
18+
end
19+
end
20+
end

lib/event_source/protocols/arn/sns_publisher_proxy.rb

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,28 @@ def publish(event)
2727
raise Errors::MissingSmsPhoneError, @publish_info.operationId unless phone
2828
message = symbolized_payload[:message]
2929
raise Errors::MissingSmsMessageError.new(@publish_info.operationId, phone) unless message
30-
@client.publish(
31-
phone_number: phone,
32-
message: message
33-
)
30+
remaining_keys = symbolized_payload.keys - [:message, phone_key.to_sym]
31+
extra_attributes = {}
32+
remaining_keys.each do |k|
33+
v = symbolized_payload[k]
34+
next unless v.is_a?(String)
35+
extra_attributes[k.to_s] = {
36+
data_type: "String",
37+
string_value: v
38+
}
39+
end
40+
if extra_attributes.any?
41+
@client.publish(
42+
phone_number: phone,
43+
message: message,
44+
message_attributes: extra_attributes
45+
)
46+
else
47+
@client.publish(
48+
phone_number: phone,
49+
message: message
50+
)
51+
end
3452
end
3553
end
3654
end
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# frozen_string_literal: true
2+
3+
module EventSource
4+
module Protocols
5+
module Arn
6+
# Create and manage a SNS SDK Proxy
7+
class SnsSdkPublisherProxy
8+
def initialize(client, async_api_channel_item)
9+
@client = client
10+
@channel_item = async_api_channel_item
11+
@sdk_bindings = async_api_channel_item.bindings["sns"]["x-aws-sdk"]
12+
@sdk_m = @sdk_bindings["sdk_action"].underscore
13+
@paginate = [true, "true"].include?(@sdk_bindings["paginate"])
14+
end
15+
16+
def publish(_event)
17+
sub_klass, executable = EventSource::Protocols::Arn.sns_sdk_worker_registry.resolve_worker_for_event(@channel_item.publish.operationId)
18+
worker_pool = EventSource::WorkerPool.instance
19+
# rubocop:disable Lint/RescueException
20+
worker_pool.post do
21+
if @paginate
22+
result = executable.call(nil, @client.send(@sdk_m))
23+
# rubocop:disable Style/WhileUntilModifier
24+
while result.more?
25+
result = executable.call(result.state, @client.send(@sdk_m, result.continuation_data))
26+
end
27+
# rubocop:enable Style/WhileUntilModifier
28+
else
29+
executable.call(@client.send(@sdk_m))
30+
end
31+
rescue Exception => e
32+
worker_logger = ::EventSource::InheritedLogger.logger_for(sub_klass)
33+
worker_logger.error e
34+
end
35+
# rubocop:enable Lint/RescueException
36+
end
37+
end
38+
end
39+
end
40+
end

0 commit comments

Comments
 (0)