Skip to content

Commit 6a287dc

Browse files
committed
Support communication with AWS SNS SDK.
1 parent f97d1ed commit 6a287dc

20 files changed

+418
-14
lines changed

lib/event_source.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
require 'event_source/operations/build_message_options'
4646
require 'event_source/operations/build_message'
4747
require 'event_source/boot_registry'
48+
require 'event_source/paginator'
49+
require 'event_source/worker_pool'
4850

4951
# Event source provides ability to compose, publish and subscribe to events
5052
module EventSource
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# frozen_string_literal: true
2+
3+
require "event_source/protocols/arn/contracts/sns_channel_bindings_contract"
4+
5+
module EventSource
6+
module AsyncApi
7+
module Contracts
8+
# Schema and validation rules for protocol bindings to a channel
9+
class ChannelBindingsContract < Contract
10+
params do
11+
optional(:sns).maybe(:hash)
12+
optional(:amqp).maybe(:hash)
13+
end
14+
15+
rule(:sns) do
16+
if key? && value
17+
validation_result = ::EventSource::Protocols::Arn::Contracts::SnsChannelBindingsContract.new.call(value)
18+
if validation_result&.failure?
19+
key.failure(text: 'invalid channel bindings', error: validation_result.errors.to_h)
20+
else
21+
values.data.merge(sns: validation_result.values.to_h)
22+
end
23+
end
24+
end
25+
end
26+
end
27+
end
28+
end

lib/event_source/async_api/contracts/channel_item_contract.rb

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
# frozen_string_literal: true
22

3+
require_relative "channel_bindings_contract"
4+
35
module EventSource
46
module AsyncApi
57
module Contracts
@@ -21,7 +23,7 @@ class ChannelItemContract < Contract
2123
optional(:publish).value(:hash)
2224
optional(:description).value(:string)
2325
optional(:parameters).value(Types::HashOrNil)
24-
optional(:bindings).hash { optional(:amqp).maybe(:hash) }
26+
optional(:bindings).value(:hash)
2527
end
2628

2729
rule(:subscribe) do
@@ -45,6 +47,17 @@ class ChannelItemContract < Contract
4547
end
4648
end
4749
end
50+
51+
rule(:bindings) do
52+
if key? && value
53+
validation_result = ChannelBindingsContract.new.call(value)
54+
if validation_result&.failure?
55+
key.failure(text: 'invalid channel bindings', error: validation_result.errors.to_h)
56+
else
57+
values.data.merge(bindings: validation_result.values.to_h)
58+
end
59+
end
60+
end
4861
end
4962
end
5063
end

lib/event_source/connection_manager.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ def find_connection(params)
9090
end
9191
else
9292
connections.detect do |connection|
93+
if params[:protocol] == :arn
94+
logger.info params[:subscribe_operation_name].inspect
95+
logger.info connection.subscribe_operations.keys.inspect
96+
end
9397
connection.subscribe_operation_exists?(
9498
params[:subscribe_operation_name]
9599
)

lib/event_source/paginator.rb

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# frozen_string_literal: true
2+
3+
module EventSource
4+
# Abstract paginator for protocols which support it.
5+
class Paginator
6+
attr_reader :state, :continuation_data
7+
8+
def initialize(more, state, continue_data)
9+
@more = more
10+
@state = state
11+
@continuation_data = continue_data
12+
end
13+
14+
def more?
15+
@more
16+
end
17+
18+
def self.continue(state, continue_data)
19+
self.new(true, state, continue_data)
20+
end
21+
22+
def self.finished
23+
self.new(false, nil, nil)
24+
end
25+
end
26+
end

lib/event_source/protocols/arn/arn_channel_proxy.rb

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,24 @@ def initialize(channel_name, channel_item, client, connection_arn)
2121
# settings and configuration
2222
# @return [Object] a protocol-specific proxy for publishing
2323
def add_publish_operation(async_api_channel_item)
24-
raise NotImplementedError, "Service not yet implemented for ARN: #{@connection_arn.service}" unless @connection_arn.sns_service?
24+
if async_api_channel_item.bindings&.key?("sns")
25+
sns_bindings = async_api_channel_item.bindings["sns"]
26+
return SnsSdkPublisherProxy.new(client, async_api_channel_item) if sns_bindings.key?("x-aws-sdk")
27+
end
28+
raise NotImplementedError, "Publish Service not yet implemented for ARN: #{@connection_arn.service}" unless @connection_arn.sns_service?
2529

2630
SnsPublisherProxy.new(client, async_api_channel_item)
2731
end
2832

33+
# Add a subscriber operation.
34+
def add_subscribe_operation(async_api_channel_item)
35+
if async_api_channel_item.bindings&.key?("sns")
36+
sns_bindings = async_api_channel_item.bindings["sns"]
37+
return SnsSdkSubscriberProxy.new(client, async_api_channel_item) if sns_bindings.key?("x-aws-sdk")
38+
end
39+
raise NotImplementedError, "Subscriber Service not yet implemented for ARN: #{@connection_arn.service}"
40+
end
41+
2942
# Called after a publish proxy created. Normally does nothing.
3043
# It's used for example by AMQP to create any exchange to exchange
3144
# bindings.
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# frozen_string_literal: true
2+
3+
module EventSource
4+
module Protocols
5+
module Arn
6+
module Contracts
7+
# Validate parameters for AsyncAPI SNS channel bindings.
8+
class SnsChannelBindingsContract < Dry::Validation::Contract
9+
params do
10+
# At the channel level, we currently only support sdk bindings.
11+
optional(:"x-aws-sdk").maybe(:hash) do
12+
required(:sdk_action).maybe(:string, :included_in? => ["ListPhoneNumbersOptedOut"])
13+
optional(:paginate).maybe(:bool)
14+
end
15+
end
16+
end
17+
end
18+
end
19+
end
20+
end

lib/event_source/protocols/arn/sns_publisher_proxy.rb

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,28 @@ def publish(event)
2727
raise Errors::MissingSmsPhoneError, @publish_info.operationId unless phone
2828
message = symbolized_payload[:message]
2929
raise Errors::MissingSmsMessageError.new(@publish_info.operationId, phone) unless message
30-
@client.publish(
31-
phone_number: phone,
32-
message: message
33-
)
30+
remaining_keys = symbolized_payload.keys - [:message, phone_key.to_sym]
31+
extra_attributes = {}
32+
remaining_keys.each do |k|
33+
v = symbolized_payload[k]
34+
next unless v.is_a?(String)
35+
extra_attributes[k.to_s] = {
36+
data_type: "String",
37+
string_value: v
38+
}
39+
end
40+
if extra_attributes.any?
41+
@client.publish(
42+
phone_number: phone,
43+
message: message,
44+
message_attributes: extra_attributes
45+
)
46+
else
47+
@client.publish(
48+
phone_number: phone,
49+
message: message
50+
)
51+
end
3452
end
3553
end
3654
end
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# frozen_string_literal: true
2+
3+
module EventSource
4+
module Protocols
5+
module Arn
6+
# Create and manage a SNS SDK Proxy
7+
class SnsSdkPublisherProxy
8+
def initialize(client, async_api_channel_item)
9+
@client = client
10+
@channel_item = async_api_channel_item
11+
@sdk_bindings = async_api_channel_item.bindings["sns"]["x-aws-sdk"]
12+
@sdk_m = @sdk_bindings["sdk_action"].underscore
13+
@paginate = [true, "true"].include?(@sdk_bindings["paginate"])
14+
end
15+
16+
def publish(_event)
17+
executable = EventSource::Protocols::Arn.sns_sdk_worker_registry.resolve_worker_for_event(@channel_item.publish.operationId)
18+
worker_pool = EventSource::WorkerPool.instance
19+
worker_pool.post do
20+
if @paginate
21+
result = executable.call(nil, @client.send(@sdk_m))
22+
# rubocop:disable Style/WhileUntilModifier
23+
while result.more?
24+
result = executable.call(result.state, @client.send(@sdk_m, result.continuation_data))
25+
end
26+
# rubocop:enable Style/WhileUntilModifier
27+
else
28+
executable.call(@client.send(@sdk_m))
29+
end
30+
end
31+
end
32+
end
33+
end
34+
end
35+
end
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# frozen_string_literal: true
2+
3+
module EventSource
4+
module Protocols
5+
module Arn
6+
# Create and manage an SNS SDK Proxy response handler
7+
class SnsSdkSubscriberProxy
8+
def initialize(client, async_api_channel_item)
9+
@client = client
10+
@channel_item = async_api_channel_item
11+
@sdk_bindings = async_api_channel_item.bindings["sns"]["x-aws-sdk"]
12+
end
13+
14+
def subscribe(subscriber_klass, _bindings)
15+
EventSource::Protocols::Arn.sns_sdk_worker_registry.register_subscriber(@channel_item.subscribe.operationId, subscriber_klass)
16+
end
17+
end
18+
end
19+
end
20+
end

0 commit comments

Comments
 (0)