Skip to content

Commit d500767

Browse files
authored
Implement handler removal (#20)
1 parent 8a83848 commit d500767

File tree

9 files changed

+1038
-185
lines changed

9 files changed

+1038
-185
lines changed

conn.go

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,14 @@ func (d *Daemon) handleConn(c net.Conn) {
8080
return
8181
}
8282

83+
case pb.Request_REMOVE_STREAM_HANDLER:
84+
res := d.doRemoveStreamHandler(&req)
85+
err := w.WriteMsg(res)
86+
if err != nil {
87+
log.Debugw("error writing response", "error", err)
88+
return
89+
}
90+
8391
case pb.Request_DHT:
8492
res, ch, cancel := d.doDHT(&req)
8593
err := w.WriteMsg(res)
@@ -271,22 +279,55 @@ func (d *Daemon) doStreamHandler(req *pb.Request) *pb.Response {
271279
}
272280
for _, sp := range req.StreamHandler.Proto {
273281
p := protocol.ID(sp)
274-
_, ok := d.handlers[p]
282+
round_robin, ok := d.handlers[p]
275283
if !ok {
276284
d.handlers[p] = utils.NewRoundRobin()
277-
d.handlers[p].Push(maddr)
285+
d.handlers[p].Append(maddr)
278286
d.host.SetStreamHandler(p, d.handleStream)
279287
} else if !req.StreamHandler.GetBalanced() {
280288
return errorResponseString(fmt.Sprintf("handler for protocol %s already set", p))
281289
} else {
282-
d.handlers[p].Push(maddr)
290+
round_robin.Append(maddr)
283291
}
284292
log.Debugw("set stream handler", "protocol", sp, "to", maddr)
285293
}
286294

287295
return okResponse()
288296
}
289297

298+
func (d *Daemon) doRemoveStreamHandler(req *pb.Request) *pb.Response {
299+
if req.RemoveStreamHandler == nil {
300+
return errorResponseString("Malformed request; missing parameters")
301+
}
302+
303+
d.mx.Lock()
304+
defer d.mx.Unlock()
305+
306+
maddr, err := ma.NewMultiaddrBytes(req.RemoveStreamHandler.Addr)
307+
if err != nil {
308+
return errorResponse(err)
309+
}
310+
for _, sp := range req.RemoveStreamHandler.Proto {
311+
p := protocol.ID(sp)
312+
round_robin, ok := d.handlers[p]
313+
if !ok {
314+
return errorResponseString(fmt.Sprintf("handler for protocol %s does not exist", p))
315+
}
316+
317+
ok = round_robin.Remove(maddr)
318+
if !ok {
319+
return errorResponseString(fmt.Sprintf("handler for protocol %s with maddr %s does not exist", p, maddr.String()))
320+
}
321+
if round_robin.Len() == 0 {
322+
d.host.RemoveStreamHandler(p)
323+
delete(d.handlers, p)
324+
}
325+
log.Debugw("removed stream handler", "protocol", sp, "to", maddr)
326+
}
327+
328+
return okResponse()
329+
}
330+
290331
func (d *Daemon) doListPeers(req *pb.Request) *pb.Response {
291332
conns := d.host.Network().Conns()
292333
peers := make([]*pb.PeerInfo, len(conns))

internal/utils/round_robin.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package utils
22

3+
import (
4+
"reflect"
5+
)
6+
37
type RoundRobin struct {
48
data []interface{}
59
next int
@@ -11,10 +15,32 @@ func (r *RoundRobin) Next() interface{} {
1115
return v
1216
}
1317

14-
func (r *RoundRobin) Push(v interface{}) {
18+
func (r *RoundRobin) Append(v interface{}) {
1519
r.data = append(r.data, v)
1620
}
1721

22+
func (r *RoundRobin) Remove(v interface{}) bool {
23+
found := -1
24+
for index, item := range r.data {
25+
if reflect.DeepEqual(item, v) {
26+
found = index
27+
break
28+
}
29+
}
30+
if found == -1 {
31+
return false
32+
}
33+
34+
r.data = append(r.data[:found], r.data[found+1:]...)
35+
if found < r.next {
36+
r.next--
37+
}
38+
if r.next == r.Len() {
39+
r.next = 0
40+
}
41+
return true
42+
}
43+
1844
func (r *RoundRobin) Len() int {
1945
return len(r.data)
2046
}

p2pclient/streams.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,3 +222,42 @@ func (c *Client) NewStreamHandler(protos []string, handler StreamHandlerFunc, ba
222222

223223
return nil
224224
}
225+
226+
func (c *Client) RemoveStreamHandler(protos []string) error {
227+
raw_control, err := c.newControlConn()
228+
if err != nil {
229+
return err
230+
}
231+
control := &byteReaderConn{raw_control}
232+
defer control.Close()
233+
234+
c.mhandlers.Lock()
235+
defer c.mhandlers.Unlock()
236+
237+
w := ggio.NewDelimitedWriter(control)
238+
req := &pb.Request{
239+
Type: pb.Request_REMOVE_STREAM_HANDLER.Enum(),
240+
RemoveStreamHandler: &pb.RemoveStreamHandlerRequest{
241+
Addr: c.listenMaddr.Bytes(),
242+
Proto: protos,
243+
},
244+
}
245+
if err := w.WriteMsg(req); err != nil {
246+
return err
247+
}
248+
249+
resp := &pb.Response{}
250+
err = readMsgSafe(control, resp)
251+
if err != nil {
252+
return err
253+
}
254+
if err := resp.GetError(); err != nil {
255+
return fmt.Errorf("error from daemon: %s", err.GetMsg())
256+
}
257+
258+
for _, proto := range protos {
259+
delete(c.handlers, proto)
260+
}
261+
262+
return nil
263+
}

p2pclient/unary_handlers.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,31 @@ func (c *Client) AddUnaryHandler(proto protocol.ID, handler UnaryHandlerFunc, ba
153153
return nil
154154
}
155155

156+
func (c *Client) RemoveUnaryHandler(proto protocol.ID) error {
157+
w := c.getPersistentWriter()
158+
159+
callID := uuid.New()
160+
161+
w.WriteMsg(
162+
&pb.PersistentConnectionRequest{
163+
CallId: callID[:],
164+
Message: &pb.PersistentConnectionRequest_RemoveUnaryHandler{
165+
RemoveUnaryHandler: &pb.RemoveUnaryHandlerRequest{
166+
Proto: (*string)(&proto),
167+
},
168+
},
169+
},
170+
)
171+
172+
if _, err := c.getResponse(callID); err != nil {
173+
return err
174+
}
175+
176+
c.unaryHandlers.Delete(proto)
177+
178+
return nil
179+
}
180+
156181
func (c *Client) CallUnaryHandler(
157182
ctx context.Context,
158183
peerID peer.ID,

0 commit comments

Comments
 (0)