@@ -15,13 +15,8 @@ import (
15
15
)
16
16
17
17
type Protocol struct {
18
- connOpts []amqp.ConnOption
19
- sessionOpts []amqp.SessionOption
20
- senderLinkOpts []amqp.LinkOption
21
- receiverLinkOpts []amqp.LinkOption
22
-
23
18
// AMQP
24
- Client * amqp.Client
19
+ Client * amqp.Conn
25
20
Session * amqp.Session
26
21
ownedClient bool
27
22
Node string
@@ -35,54 +30,60 @@ type Protocol struct {
35
30
}
36
31
37
32
// NewProtocolFromClient creates a new amqp transport.
38
- func NewProtocolFromClient (client * amqp.Client , session * amqp.Session , queue string , opts ... Option ) (* Protocol , error ) {
33
+ func NewProtocolFromClient (
34
+ ctx context.Context ,
35
+ client * amqp.Conn ,
36
+ session * amqp.Session ,
37
+ queue string ,
38
+ senderOptions * amqp.SenderOptions ,
39
+ receiverOptions * amqp.ReceiverOptions ,
40
+ ) (* Protocol , error ) {
39
41
t := & Protocol {
40
- Node : queue ,
41
- senderLinkOpts : []amqp.LinkOption (nil ),
42
- receiverLinkOpts : []amqp.LinkOption (nil ),
43
- Client : client ,
44
- Session : session ,
45
- }
46
- if err := t .applyOptions (opts ... ); err != nil {
47
- return nil , err
42
+ Node : queue ,
43
+ Client : client ,
44
+ Session : session ,
48
45
}
49
46
50
- t .senderLinkOpts = append (t .senderLinkOpts , amqp .LinkTargetAddress (queue ))
51
-
52
47
// Create a sender
53
- amqpSender , err := session .NewSender (t . senderLinkOpts ... )
48
+ amqpSender , err := session .NewSender (ctx , queue , senderOptions )
54
49
if err != nil {
55
50
_ = client .Close ()
56
51
_ = session .Close (context .Background ())
57
52
return nil , err
58
53
}
59
- t .Sender = NewSender (amqpSender ).(* sender )
54
+ t .Sender = NewSender (amqpSender , & amqp. SendOptions {} ).(* sender )
60
55
t .SenderContextDecorators = []func (context.Context ) context.Context {}
61
56
62
- t .receiverLinkOpts = append (t .receiverLinkOpts , amqp .LinkSourceAddress (t .Node ))
63
- amqpReceiver , err := t .Session .NewReceiver (t .receiverLinkOpts ... )
57
+ amqpReceiver , err := t .Session .NewReceiver (ctx , t .Node , receiverOptions )
64
58
if err != nil {
65
59
return nil , err
66
60
}
67
- t .Receiver = NewReceiver (amqpReceiver ).(* receiver )
61
+ t .Receiver = NewReceiver (amqpReceiver , & amqp. ReceiveOptions {} ).(* receiver )
68
62
return t , nil
69
63
}
70
64
71
65
// NewProtocol creates a new amqp transport.
72
- func NewProtocol (server , queue string , connOption []amqp.ConnOption , sessionOption []amqp.SessionOption , opts ... Option ) (* Protocol , error ) {
73
- client , err := amqp .Dial (server , connOption ... )
66
+ func NewProtocol (
67
+ ctx context.Context ,
68
+ server , queue string ,
69
+ connOptions * amqp.ConnOptions ,
70
+ sessionOptions * amqp.SessionOptions ,
71
+ senderOptions * amqp.SenderOptions ,
72
+ receiverOptions * amqp.ReceiverOptions ,
73
+ ) (* Protocol , error ) {
74
+ client , err := amqp .Dial (ctx , server , connOptions )
74
75
if err != nil {
75
76
return nil , err
76
77
}
77
78
78
79
// Open a session
79
- session , err := client .NewSession (sessionOption ... )
80
+ session , err := client .NewSession (ctx , sessionOptions )
80
81
if err != nil {
81
82
_ = client .Close ()
82
83
return nil , err
83
84
}
84
85
85
- p , err := NewProtocolFromClient (client , session , queue , opts ... )
86
+ p , err := NewProtocolFromClient (ctx , client , session , queue , senderOptions , receiverOptions )
86
87
if err != nil {
87
88
return nil , err
88
89
}
@@ -92,69 +93,70 @@ func NewProtocol(server, queue string, connOption []amqp.ConnOption, sessionOpti
92
93
}
93
94
94
95
// NewSenderProtocolFromClient creates a new amqp sender transport.
95
- func NewSenderProtocolFromClient (client * amqp.Client , session * amqp.Session , address string , opts ... Option ) (* Protocol , error ) {
96
+ func NewSenderProtocolFromClient (
97
+ ctx context.Context ,
98
+ client * amqp.Conn ,
99
+ session * amqp.Session ,
100
+ address string ,
101
+ senderOptions * amqp.SenderOptions ,
102
+ ) (* Protocol , error ) {
96
103
t := & Protocol {
97
- Node : address ,
98
- senderLinkOpts : []amqp.LinkOption (nil ),
99
- receiverLinkOpts : []amqp.LinkOption (nil ),
100
- Client : client ,
101
- Session : session ,
104
+ Node : address ,
105
+ Client : client ,
106
+ Session : session ,
102
107
}
103
- if err := t .applyOptions (opts ... ); err != nil {
104
- return nil , err
105
- }
106
- t .senderLinkOpts = append (t .senderLinkOpts , amqp .LinkTargetAddress (address ))
108
+
107
109
// Create a sender
108
- amqpSender , err := session .NewSender (t . senderLinkOpts ... )
110
+ amqpSender , err := session .NewSender (ctx , address , senderOptions )
109
111
if err != nil {
110
112
_ = client .Close ()
111
113
_ = session .Close (context .Background ())
112
114
return nil , err
113
115
}
114
- t .Sender = NewSender (amqpSender ).(* sender )
116
+ t .Sender = NewSender (amqpSender , & amqp. SendOptions {} ).(* sender )
115
117
t .SenderContextDecorators = []func (context.Context ) context.Context {}
116
118
117
119
return t , nil
118
120
}
119
121
120
122
// NewReceiverProtocolFromClient creates a new receiver amqp transport.
121
- func NewReceiverProtocolFromClient (client * amqp.Client , session * amqp.Session , address string , opts ... Option ) (* Protocol , error ) {
123
+ func NewReceiverProtocolFromClient (
124
+ ctx context.Context ,
125
+ client * amqp.Conn ,
126
+ session * amqp.Session ,
127
+ address string ,
128
+ receiverOptions * amqp.ReceiverOptions ,
129
+ ) (* Protocol , error ) {
122
130
t := & Protocol {
123
- Node : address ,
124
- senderLinkOpts : []amqp.LinkOption (nil ),
125
- receiverLinkOpts : []amqp.LinkOption (nil ),
126
- Client : client ,
127
- Session : session ,
128
- }
129
- if err := t .applyOptions (opts ... ); err != nil {
130
- return nil , err
131
+ Node : address ,
132
+ Client : client ,
133
+ Session : session ,
131
134
}
132
135
133
136
t .Node = address
134
- t .receiverLinkOpts = append (t .receiverLinkOpts , amqp .LinkSourceAddress (address ))
135
- amqpReceiver , err := t .Session .NewReceiver (t .receiverLinkOpts ... )
137
+ amqpReceiver , err := t .Session .NewReceiver (ctx , address , receiverOptions )
136
138
if err != nil {
137
139
return nil , err
138
140
}
139
- t .Receiver = NewReceiver (amqpReceiver ).(* receiver )
141
+ t .Receiver = NewReceiver (amqpReceiver , & amqp. ReceiveOptions {} ).(* receiver )
140
142
return t , nil
141
143
}
142
144
143
145
// NewSenderProtocol creates a new sender amqp transport.
144
- func NewSenderProtocol (server , address string , connOption [] amqp.ConnOption , sessionOption [] amqp.SessionOption , opts ... Option ) (* Protocol , error ) {
145
- client , err := amqp .Dial (server , connOption ... )
146
+ func NewSenderProtocol (ctx context. Context , server , address string , connOptions * amqp.ConnOptions , sessionOptions * amqp.SessionOptions , senderOptions * amqp. SenderOptions ) (* Protocol , error ) {
147
+ client , err := amqp .Dial (ctx , server , connOptions )
146
148
if err != nil {
147
149
return nil , err
148
150
}
149
151
150
152
// Open a session
151
- session , err := client .NewSession (sessionOption ... )
153
+ session , err := client .NewSession (ctx , sessionOptions )
152
154
if err != nil {
153
155
_ = client .Close ()
154
156
return nil , err
155
157
}
156
158
157
- p , err := NewSenderProtocolFromClient (client , session , address , opts ... )
159
+ p , err := NewSenderProtocolFromClient (ctx , client , session , address , senderOptions )
158
160
if err != nil {
159
161
return nil , err
160
162
}
@@ -164,20 +166,20 @@ func NewSenderProtocol(server, address string, connOption []amqp.ConnOption, ses
164
166
}
165
167
166
168
// NewReceiverProtocol creates a new receiver amqp transport.
167
- func NewReceiverProtocol (server , address string , connOption [] amqp.ConnOption , sessionOption [] amqp.SessionOption , opts ... Option ) (* Protocol , error ) {
168
- client , err := amqp .Dial (server , connOption ... )
169
+ func NewReceiverProtocol (ctx context. Context , server , address string , connOptions * amqp.ConnOptions , sessionOptions * amqp.SessionOptions , receiverOptions * amqp. ReceiverOptions ) (* Protocol , error ) {
170
+ client , err := amqp .Dial (ctx , server , connOptions )
169
171
if err != nil {
170
172
return nil , err
171
173
}
172
174
173
175
// Open a session
174
- session , err := client .NewSession (sessionOption ... )
176
+ session , err := client .NewSession (ctx , sessionOptions )
175
177
if err != nil {
176
178
_ = client .Close ()
177
179
return nil , err
178
180
}
179
181
180
- p , err := NewReceiverProtocolFromClient (client , session , address , opts ... )
182
+ p , err := NewReceiverProtocolFromClient (ctx , client , session , address , receiverOptions )
181
183
182
184
if err != nil {
183
185
return nil , err
@@ -187,15 +189,6 @@ func NewReceiverProtocol(server, address string, connOption []amqp.ConnOption, s
187
189
return p , nil
188
190
}
189
191
190
- func (t * Protocol ) applyOptions (opts ... Option ) error {
191
- for _ , fn := range opts {
192
- if err := fn (t ); err != nil {
193
- return err
194
- }
195
- }
196
- return nil
197
- }
198
-
199
192
func (t * Protocol ) Close (ctx context.Context ) (err error ) {
200
193
if t .ownedClient {
201
194
// Closing the client will close at cascade sender and receiver
0 commit comments