Skip to content

Commit 3f59358

Browse files
committed
Use proper types for protocol.
1 parent 7d13ac2 commit 3f59358

File tree

7 files changed

+289
-130
lines changed

7 files changed

+289
-130
lines changed

bake/async/bus.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ def server
55

66
Async do
77
server = Async::Bus::Server.new
8+
things = Array.new
89

910
server.accept do |connection|
10-
binding.irb
11+
connection[:things] = things
1112
end
1213
end
1314
end

lib/async/bus/protocol/connection.rb

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,20 @@ def initialize(peer, id)
4141
@finalized = ::Thread::Queue.new
4242
end
4343

44+
def flush
45+
@packer.flush
46+
end
47+
48+
def write(message)
49+
# $stderr.puts "Writing: #{message.inspect}"
50+
@packer.write(message)
51+
@packer.flush
52+
end
53+
54+
def inspect
55+
"#<#{self.class} #{@objects.size} objects>"
56+
end
57+
4458
attr :objects
4559
attr :proxies
4660

@@ -79,6 +93,10 @@ def bind(name, object)
7993
proc{@finalized << name}
8094
end
8195

96+
def []=(name, object)
97+
@objects[name] = object
98+
end
99+
82100
def [](name)
83101
unless proxy = @proxies[name]
84102
proxy = Proxy.new(self, name)
@@ -92,47 +110,47 @@ def [](name)
92110

93111
def invoke(name, arguments, options = {}, &block)
94112
id = self.next_id
113+
# $stderr.puts "-> Invoking: #{name} #{arguments.inspect} #{options.inspect}", caller
95114

96115
transaction = Transaction.new(self, id)
97116
@transactions[id] = transaction
98117

99118
transaction.invoke(name, arguments, options, &block)
100119
ensure
101120
transaction&.close
121+
# $stderr.puts "<- Invoked: #{name}"
102122
end
103123

104124
def run
105125
finalizer_task = Async do
106126
while name = @finalized.pop
107-
@packer.write([:release, name])
127+
self.write(Release.new(name))
108128
end
109129
end
110130

111131
@unpacker.each do |message|
112-
id = message.shift
132+
# $stderr.puts "Message received: #{message.inspect}"
113133

114-
if id == :release
115-
name = message.shift
116-
@objects.delete(name) if name.is_a?(String)
117-
elsif transaction = @transactions[id]
118-
transaction.received.enqueue(message)
119-
elsif message.first == :invoke
120-
message.shift
121-
122-
transaction = Transaction.new(self, id)
123-
@transactions[id] = transaction
134+
case message
135+
when Release
136+
@objects.delete(message.name)
137+
when Invoke
138+
transaction = Transaction.new(self, message.id)
139+
@transactions[message.id] = transaction
124140

125-
name = message.shift
126-
object = @objects[name]
141+
object = @objects[message.name]
127142

128143
Async do
129-
transaction.accept(object, *message)
144+
# $stderr.puts "-> Accepting: #{message.name} #{message.arguments.inspect} #{message.options.inspect}"
145+
transaction.accept(object, message.arguments, message.options, message.block_given)
130146
ensure
147+
# $stderr.puts "<- Accepted: #{message.name}"
131148
# This will also delete the transaction from @transactions:
132149
transaction.close
133150
end
134151
else
135-
raise "Out of order message: #{message}"
152+
transaction = @transactions[message.id]
153+
transaction.received.enqueue(message)
136154
end
137155
end
138156
ensure

lib/async/bus/protocol/proxy.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ def respond_to?(name, include_all = false)
4646
def respond_to_missing?(name, include_all = false)
4747
@connection.invoke(@name, [:respond_to?, name, include_all])
4848
end
49+
50+
def inspect
51+
"#<proxy #{@name}>"
52+
end
4953
end
5054
end
5155
end

lib/async/bus/protocol/transaction.rb

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,17 @@ def initialize(connection, id)
2222

2323
def read
2424
if @received.empty?
25-
@connection.packer.flush
25+
@connection.flush
2626
end
2727

2828
@received.dequeue
2929
end
3030

31-
def write(*arguments)
31+
def write(message)
32+
# $stderr.puts "Transaction Writing: #{message.inspect}"
33+
3234
if @connection
33-
@connection.packer.write([id, *arguments])
34-
@connection.packer.flush
35+
@connection.write(message)
3536
else
3637
raise RuntimeError, "Transaction is closed!"
3738
end
@@ -50,23 +51,21 @@ def close
5051
def invoke(name, arguments, options, &block)
5152
Console.debug(self) {[name, arguments, options, block]}
5253

53-
self.write(:invoke, name, arguments, options, block_given?)
54+
self.write(Invoke.new(@id, name, arguments, options, block_given?))
5455

5556
while response = self.read
56-
what, result = response
57-
58-
case what
59-
when :error
60-
raise(result)
61-
when :return
62-
return(result)
63-
when :yield
57+
case response
58+
when Return
59+
return response.result
60+
when Yield
6461
begin
65-
result = yield(*result)
66-
self.write(:next, result)
62+
result = yield(*response.result)
63+
self.write(Next.new(@id, result))
6764
rescue => error
68-
self.write(:error, error)
65+
self.write(Error.new(@id, error))
6966
end
67+
when Error
68+
raise(response.result)
7069
end
7170
end
7271

@@ -75,30 +74,31 @@ def invoke(name, arguments, options, &block)
7574
end
7675

7776
# Accept a remote procedure invokation.
78-
def accept(object, arguments, options, block)
79-
if block
77+
def accept(object, arguments, options, block_given)
78+
if block_given
8079
result = object.public_send(*arguments, **options) do |*yield_arguments|
81-
self.write(:yield, yield_arguments)
82-
what, result = self.read
80+
self.write(Yield.new(@id, yield_arguments))
81+
82+
response = self.read
8383

84-
case what
85-
when :next
86-
result
87-
when :close
88-
return
89-
when :error
90-
raise(result)
84+
case response
85+
when Next
86+
response.result
87+
when Error
88+
raise(response.result)
89+
when Close
90+
break
9191
end
9292
end
9393
else
9494
result = object.public_send(*arguments, **options)
9595
end
9696

97-
self.write(:return, result)
97+
self.write(Return.new(@id, result))
9898
rescue UncaughtThrowError => error
99-
self.write(:throw, error.tag)
99+
self.write(Throw.new(@id, error.tag))
100100
rescue => error
101-
self.write(:error, error)
101+
self.write(Error.new(@id, error))
102102
# ensure
103103
# self.write(:close)
104104
end

0 commit comments

Comments
 (0)