@@ -15,13 +15,8 @@ import (
1515)
1616
1717type Protocol struct {
18- connOpts []amqp.ConnOption
19- sessionOpts []amqp.SessionOption
20- senderLinkOpts []amqp.LinkOption
21- receiverLinkOpts []amqp.LinkOption
22-
2318 // AMQP
24- Client * amqp.Client
19+ Client * amqp.Conn
2520 Session * amqp.Session
2621 ownedClient bool
2722 Node string
@@ -35,54 +30,60 @@ type Protocol struct {
3530}
3631
3732// NewProtocolFromClient creates a new amqp transport.
38- func NewProtocolFromClient (client * amqp.Client , session * amqp.Session , queue string , opts ... Option ) (* Protocol , error ) {
33+ func NewProtocolFromClient (
34+ ctx context.Context ,
35+ client * amqp.Conn ,
36+ session * amqp.Session ,
37+ queue string ,
38+ senderOptions * amqp.SenderOptions ,
39+ receiverOptions * amqp.ReceiverOptions ,
40+ ) (* Protocol , error ) {
3941 t := & Protocol {
40- Node : queue ,
41- senderLinkOpts : []amqp.LinkOption (nil ),
42- receiverLinkOpts : []amqp.LinkOption (nil ),
43- Client : client ,
44- Session : session ,
45- }
46- if err := t .applyOptions (opts ... ); err != nil {
47- return nil , err
42+ Node : queue ,
43+ Client : client ,
44+ Session : session ,
4845 }
4946
50- t .senderLinkOpts = append (t .senderLinkOpts , amqp .LinkTargetAddress (queue ))
51-
5247 // Create a sender
53- amqpSender , err := session .NewSender (t . senderLinkOpts ... )
48+ amqpSender , err := session .NewSender (ctx , queue , senderOptions )
5449 if err != nil {
5550 _ = client .Close ()
5651 _ = session .Close (context .Background ())
5752 return nil , err
5853 }
59- t .Sender = NewSender (amqpSender ).(* sender )
54+ t .Sender = NewSender (amqpSender , & amqp. SendOptions {} ).(* sender )
6055 t .SenderContextDecorators = []func (context.Context ) context.Context {}
6156
62- t .receiverLinkOpts = append (t .receiverLinkOpts , amqp .LinkSourceAddress (t .Node ))
63- amqpReceiver , err := t .Session .NewReceiver (t .receiverLinkOpts ... )
57+ amqpReceiver , err := t .Session .NewReceiver (ctx , t .Node , receiverOptions )
6458 if err != nil {
6559 return nil , err
6660 }
67- t .Receiver = NewReceiver (amqpReceiver ).(* receiver )
61+ t .Receiver = NewReceiver (amqpReceiver , & amqp. ReceiveOptions {} ).(* receiver )
6862 return t , nil
6963}
7064
7165// NewProtocol creates a new amqp transport.
72- func NewProtocol (server , queue string , connOption []amqp.ConnOption , sessionOption []amqp.SessionOption , opts ... Option ) (* Protocol , error ) {
73- client , err := amqp .Dial (server , connOption ... )
66+ func NewProtocol (
67+ ctx context.Context ,
68+ server , queue string ,
69+ connOptions * amqp.ConnOptions ,
70+ sessionOptions * amqp.SessionOptions ,
71+ senderOptions * amqp.SenderOptions ,
72+ receiverOptions * amqp.ReceiverOptions ,
73+ ) (* Protocol , error ) {
74+ client , err := amqp .Dial (ctx , server , connOptions )
7475 if err != nil {
7576 return nil , err
7677 }
7778
7879 // Open a session
79- session , err := client .NewSession (sessionOption ... )
80+ session , err := client .NewSession (ctx , sessionOptions )
8081 if err != nil {
8182 _ = client .Close ()
8283 return nil , err
8384 }
8485
85- p , err := NewProtocolFromClient (client , session , queue , opts ... )
86+ p , err := NewProtocolFromClient (ctx , client , session , queue , senderOptions , receiverOptions )
8687 if err != nil {
8788 return nil , err
8889 }
@@ -92,69 +93,70 @@ func NewProtocol(server, queue string, connOption []amqp.ConnOption, sessionOpti
9293}
9394
9495// NewSenderProtocolFromClient creates a new amqp sender transport.
95- func NewSenderProtocolFromClient (client * amqp.Client , session * amqp.Session , address string , opts ... Option ) (* Protocol , error ) {
96+ func NewSenderProtocolFromClient (
97+ ctx context.Context ,
98+ client * amqp.Conn ,
99+ session * amqp.Session ,
100+ address string ,
101+ senderOptions * amqp.SenderOptions ,
102+ ) (* Protocol , error ) {
96103 t := & Protocol {
97- Node : address ,
98- senderLinkOpts : []amqp.LinkOption (nil ),
99- receiverLinkOpts : []amqp.LinkOption (nil ),
100- Client : client ,
101- Session : session ,
104+ Node : address ,
105+ Client : client ,
106+ Session : session ,
102107 }
103- if err := t .applyOptions (opts ... ); err != nil {
104- return nil , err
105- }
106- t .senderLinkOpts = append (t .senderLinkOpts , amqp .LinkTargetAddress (address ))
108+
107109 // Create a sender
108- amqpSender , err := session .NewSender (t . senderLinkOpts ... )
110+ amqpSender , err := session .NewSender (ctx , address , senderOptions )
109111 if err != nil {
110112 _ = client .Close ()
111113 _ = session .Close (context .Background ())
112114 return nil , err
113115 }
114- t .Sender = NewSender (amqpSender ).(* sender )
116+ t .Sender = NewSender (amqpSender , & amqp. SendOptions {} ).(* sender )
115117 t .SenderContextDecorators = []func (context.Context ) context.Context {}
116118
117119 return t , nil
118120}
119121
120122// NewReceiverProtocolFromClient creates a new receiver amqp transport.
121- func NewReceiverProtocolFromClient (client * amqp.Client , session * amqp.Session , address string , opts ... Option ) (* Protocol , error ) {
123+ func NewReceiverProtocolFromClient (
124+ ctx context.Context ,
125+ client * amqp.Conn ,
126+ session * amqp.Session ,
127+ address string ,
128+ receiverOptions * amqp.ReceiverOptions ,
129+ ) (* Protocol , error ) {
122130 t := & Protocol {
123- Node : address ,
124- senderLinkOpts : []amqp.LinkOption (nil ),
125- receiverLinkOpts : []amqp.LinkOption (nil ),
126- Client : client ,
127- Session : session ,
128- }
129- if err := t .applyOptions (opts ... ); err != nil {
130- return nil , err
131+ Node : address ,
132+ Client : client ,
133+ Session : session ,
131134 }
132135
133136 t .Node = address
134- t .receiverLinkOpts = append (t .receiverLinkOpts , amqp .LinkSourceAddress (address ))
135- amqpReceiver , err := t .Session .NewReceiver (t .receiverLinkOpts ... )
137+ amqpReceiver , err := t .Session .NewReceiver (ctx , address , receiverOptions )
136138 if err != nil {
137139 return nil , err
138140 }
139- t .Receiver = NewReceiver (amqpReceiver ).(* receiver )
141+ t .Receiver = NewReceiver (amqpReceiver , & amqp. ReceiveOptions {} ).(* receiver )
140142 return t , nil
141143}
142144
143145// NewSenderProtocol creates a new sender amqp transport.
144- func NewSenderProtocol (server , address string , connOption [] amqp.ConnOption , sessionOption [] amqp.SessionOption , opts ... Option ) (* Protocol , error ) {
145- client , err := amqp .Dial (server , connOption ... )
146+ func NewSenderProtocol (ctx context. Context , server , address string , connOptions * amqp.ConnOptions , sessionOptions * amqp.SessionOptions , senderOptions * amqp. SenderOptions ) (* Protocol , error ) {
147+ client , err := amqp .Dial (ctx , server , connOptions )
146148 if err != nil {
147149 return nil , err
148150 }
149151
150152 // Open a session
151- session , err := client .NewSession (sessionOption ... )
153+ session , err := client .NewSession (ctx , sessionOptions )
152154 if err != nil {
153155 _ = client .Close ()
154156 return nil , err
155157 }
156158
157- p , err := NewSenderProtocolFromClient (client , session , address , opts ... )
159+ p , err := NewSenderProtocolFromClient (ctx , client , session , address , senderOptions )
158160 if err != nil {
159161 return nil , err
160162 }
@@ -164,20 +166,20 @@ func NewSenderProtocol(server, address string, connOption []amqp.ConnOption, ses
164166}
165167
166168// NewReceiverProtocol creates a new receiver amqp transport.
167- func NewReceiverProtocol (server , address string , connOption [] amqp.ConnOption , sessionOption [] amqp.SessionOption , opts ... Option ) (* Protocol , error ) {
168- client , err := amqp .Dial (server , connOption ... )
169+ func NewReceiverProtocol (ctx context. Context , server , address string , connOptions * amqp.ConnOptions , sessionOptions * amqp.SessionOptions , receiverOptions * amqp. ReceiverOptions ) (* Protocol , error ) {
170+ client , err := amqp .Dial (ctx , server , connOptions )
169171 if err != nil {
170172 return nil , err
171173 }
172174
173175 // Open a session
174- session , err := client .NewSession (sessionOption ... )
176+ session , err := client .NewSession (ctx , sessionOptions )
175177 if err != nil {
176178 _ = client .Close ()
177179 return nil , err
178180 }
179181
180- p , err := NewReceiverProtocolFromClient (client , session , address , opts ... )
182+ p , err := NewReceiverProtocolFromClient (ctx , client , session , address , receiverOptions )
181183
182184 if err != nil {
183185 return nil , err
@@ -187,15 +189,6 @@ func NewReceiverProtocol(server, address string, connOption []amqp.ConnOption, s
187189 return p , nil
188190}
189191
190- func (t * Protocol ) applyOptions (opts ... Option ) error {
191- for _ , fn := range opts {
192- if err := fn (t ); err != nil {
193- return err
194- }
195- }
196- return nil
197- }
198-
199192func (t * Protocol ) Close (ctx context.Context ) (err error ) {
200193 if t .ownedClient {
201194 // Closing the client will close at cascade sender and receiver
0 commit comments