Skip to content

Commit 572c381

Browse files
add support for bi directional streaming
1 parent ae6a56c commit 572c381

File tree

6 files changed

+128
-35
lines changed

6 files changed

+128
-35
lines changed

dgraph/cmd/dgraphimport/import_client.go

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"errors"
1111
"fmt"
1212
"io"
13+
"log"
1314
"math"
1415
"os"
1516
"path/filepath"
@@ -109,6 +110,7 @@ func streamSnapshot(ctx context.Context, dc apiv2.DgraphClient, baseDir string,
109110
Finish: true,
110111
DropData: false,
111112
}
113+
112114
if _, err := dc.UpdateExtSnapshotStreamingState(ctx, req); err != nil {
113115
glog.Errorf("[import] failed to disable drain mode: %v", err)
114116
return fmt.Errorf("failed to disable drain mode: %v", err)
@@ -127,7 +129,6 @@ func streamSnapshotForGroup(ctx context.Context, dc apiv2.DgraphClient, pdir str
127129
if err != nil {
128130
return fmt.Errorf("failed to start external snapshot stream for group %d: %w", groupId, err)
129131
}
130-
defer out.CloseSend()
131132

132133
// Open the BadgerDB instance at the specified directory
133134
opt := badger.DefaultOptions(pdir)
@@ -153,11 +154,44 @@ func streamSnapshotForGroup(ctx context.Context, dc apiv2.DgraphClient, pdir str
153154

154155
// Configure and start the BadgerDB stream
155156
glog.Infof("[import] Starting BadgerDB stream for group [%v]", groupId)
157+
waitc := make(chan struct{})
158+
count := 0
159+
go func() {
160+
for {
161+
res, err := out.Recv()
162+
fmt.Println("res is ---------------in go routine-------->", res)
163+
count++
164+
fmt.Println("count is ---------------in go routine-------->", count)
165+
if err == io.EOF {
166+
log.Println("Stream closed by server")
167+
close(waitc)
168+
169+
return
170+
}
171+
if err != nil {
172+
log.Fatalf("error receiving: %v", err)
173+
}
174+
175+
}
176+
}()
156177

157178
if err := streamBadger(ctx, ps, out, groupId); err != nil {
158179
return fmt.Errorf("badger streaming failed for group [%v]: %v", groupId, err)
159180
}
160181

182+
// resp, err := out.Recv()
183+
// if err != nil {
184+
// glog.Errorf("failed to close the stream for group [%v]: %v", groupId, err)
185+
// return fmt.Errorf("failed to close the stream for group [%v]: %v", groupId, err)
186+
// }
187+
188+
// glog.Info("resp is ---------------after done------------------------", resp)
189+
190+
// glog.Infof("[import] Group [%v]: Received ACK ", groupId)
191+
192+
// out.CloseSend()
193+
<-waitc
194+
161195
return nil
162196
}
163197

@@ -173,7 +207,12 @@ func streamBadger(ctx context.Context, ps *badger.DB, out apiv2.Dgraph_StreamExt
173207
if err := out.Send(&apiv2.StreamExtSnapshotRequest{Pkt: p}); err != nil && !errors.Is(err, io.EOF) {
174208
return fmt.Errorf("failed to send data chunk: %w", err)
175209
}
176-
// TODO: receive a response from the server to confirm the receive
210+
// resp, err := out.Recv()
211+
// if err != nil {
212+
// return fmt.Errorf("failed to recive from stream: %v", err)
213+
// }
214+
// glog.Errorf("recived a package ----------------------", resp)
215+
177216
return nil
178217
}
179218

@@ -190,7 +229,27 @@ func streamBadger(ctx context.Context, ps *badger.DB, out apiv2.Dgraph_StreamExt
190229
return fmt.Errorf("failed to send 'done' signal for group [%d]: %w", groupId, err)
191230
}
192231

232+
out.CloseSend()
233+
234+
// resp, err := out.Recv()
235+
// if err != nil {
236+
// return fmt.Errorf("failed to recive from stream: %v", err)
237+
// }
238+
// glog.Errorf("recived a package ----------------------", resp)
239+
240+
// resp, err := out.Recv()
241+
// if err != nil {
242+
// return fmt.Errorf("failed to recive from stream: %v", err)
243+
// }
244+
245+
// glog.Infof("resp is ---------after done------------------------>", resp.Finish)
246+
// if resp.Finish == false {
247+
// glog.Errorf("Something went wrong with streaming: %v", err)
248+
// return fmt.Errorf("Something went wrong with streaming: %v", err)
249+
// }
193250
// TODO: receive a response from the server to confirm the completion
194251

252+
// out.CloseSend()
253+
195254
return nil
196255
}

dgraph/cmd/dgraphimport/import_test.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -107,15 +107,15 @@ func TestDrainModeAfterStartSnapshotStream(t *testing.T) {
107107

108108
func TestImportApis(t *testing.T) {
109109
tests := []testcase{
110-
{
111-
name: "SingleGroupShutTwoAlphasPerGroup",
112-
numGroups: 1,
113-
targetAlphas: 3,
114-
replicasFactor: 3,
115-
downAlphas: 2,
116-
description: "Single group with 3 alphas, shutdown 2 alphas",
117-
err: "failed to initiate external snapshot stream",
118-
},
110+
// {
111+
// name: "SingleGroupShutTwoAlphasPerGroup",
112+
// numGroups: 1,
113+
// targetAlphas: 3,
114+
// replicasFactor: 3,
115+
// downAlphas: 2,
116+
// description: "Single group with 3 alphas, shutdown 2 alphas",
117+
// err: "failed to initiate external snapshot stream",
118+
// },
119119
// {
120120
// name: "TwoGroupShutTwoAlphasPerGroup",
121121
// numGroups: 2,
@@ -170,15 +170,15 @@ func TestImportApis(t *testing.T) {
170170
// description: "Three groups with 3 alphas each, shutdown 1 alpha per group",
171171
// err: "",
172172
// },
173-
// {
174-
// name: "SingleGroupAllAlphasOnline",
175-
// numGroups: 1,
176-
// targetAlphas: 3,
177-
// replicasFactor: 3,
178-
// downAlphas: 0,
179-
// description: "Single group with multiple alphas, all alphas are online",
180-
// err: "",
181-
// },
173+
{
174+
name: "SingleGroupAllAlphasOnline",
175+
numGroups: 1,
176+
targetAlphas: 3,
177+
replicasFactor: 3,
178+
downAlphas: 0,
179+
description: "Single group with multiple alphas, all alphas are online",
180+
err: "",
181+
},
182182
// {
183183
// name: "TwoGroupAllAlphasOnline",
184184
// numGroups: 2,

protos/pb/pb.pb.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

protos/pb/pb_grpc.pb.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

worker/draft.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,10 @@ func (n *node) applyCommitted(proposal *pb.Proposal, key uint64) error {
723723
}
724724
return nil
725725
case proposal.ExtSnapshotState.Finish && x.IsExtSnapshotStreamingStateTrue():
726+
glog.Infof("================================>")
727+
glog.Infof("================================>", proposal.ExtSnapshotState.Finish)
728+
glog.Infof("============drop====================>", proposal.ExtSnapshotState.DropData)
729+
glog.Infof("============start====================>", proposal.ExtSnapshotState.Start)
726730
lastApplied := n.Applied.LastIndex()
727731
pl := groups().Leader(n.gid)
728732
if pl == nil {

worker/import.go

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,15 @@ func (ps *pubSub) close() {
5757
}
5858

5959
func (ps *pubSub) handlePublisher(ctx context.Context, stream apiv2.Dgraph_StreamExtSnapshotServer) error {
60+
count := 0
6061
for {
6162
select {
6263
case <-ctx.Done():
6364
glog.Info("[import] Context cancelled, stopping receive goroutine.")
6465
return ctx.Err()
6566
default:
6667
msg, err := stream.Recv()
68+
6769
if err != nil {
6870
if !errors.Is(err, io.EOF) {
6971
glog.Errorf("[import] Error receiving from in stream: %v", err)
@@ -72,6 +74,11 @@ func (ps *pubSub) handlePublisher(ctx context.Context, stream apiv2.Dgraph_Strea
7274
return nil
7375
}
7476
ps.publish(msg)
77+
glog.Info("send a package ----------------------", count)
78+
count++
79+
if err := stream.Send(&apiv2.StreamExtSnapshotResponse{Finish: false}); err != nil {
80+
return err
81+
}
7582
}
7683
}
7784

@@ -114,7 +121,7 @@ Loop:
114121
return nil
115122
}
116123

117-
func (ps *pubSub) runLocalSubscriber(ctx context.Context) error {
124+
func (ps *pubSub) runLocalSubscriber(ctx context.Context, stream pb.Worker_StreamExtSnapshotServer) error {
118125
buffer := ps.subscribe()
119126
size := 0
120127
glog.Infof("[import:flush] flushing external snapshot in badger db")
@@ -150,8 +157,16 @@ Loop:
150157
if err := sw.Flush(); err != nil {
151158
return err
152159
}
160+
153161
glog.Infof("[import:flush] successfully flushed data in badger db")
154-
return postStreamProcessing(ctx)
162+
if err := postStreamProcessing(ctx); err != nil {
163+
return err
164+
}
165+
glog.Info("stream is closed")
166+
if err := stream.Send(&apiv2.StreamExtSnapshotResponse{Finish: true}); err != nil {
167+
glog.Errorf("[import] failed to send close on in: %v", err)
168+
}
169+
return nil
155170
}
156171

157172
func ProposeDrain(ctx context.Context, drainMode *apiv2.UpdateExtSnapshotStreamingStateRequest) ([]uint32, error) {
@@ -237,12 +252,19 @@ func pipeTwoStream(in apiv2.Dgraph_StreamExtSnapshotServer, out pb.Worker_Stream
237252
glog.Infof("[import] [forward from group-%v to group-%v] forwarding stream", currentGroup, groupId)
238253

239254
defer func() {
240-
if err := in.Send(&apiv2.StreamExtSnapshotResponse{}); err != nil {
255+
if err := in.Send(&apiv2.StreamExtSnapshotResponse{Finish: true}); err != nil {
241256
glog.Errorf("[import] [forward from group %v to group %v] failed to send close on in"+
242257
" stream for group [%v]: %v", currentGroup, groupId, groupId, err)
243258
}
244259
}()
245-
defer out.CloseSend()
260+
261+
defer func() {
262+
// Wait for ACK from the out stream
263+
if err := out.CloseSend(); err != nil {
264+
glog.Errorf("[import] [forward from group %v to group %v] failed to receive ACK from group [%v]: %v",
265+
currentGroup, groupId, groupId, err)
266+
}
267+
}()
246268

247269
ps := &pubSub{}
248270
eg, egCtx := errgroup.WithContext(in.Context())
@@ -279,7 +301,7 @@ func (w *grpcWorker) UpdateExtSnapshotStreamingState(ctx context.Context,
279301
return nil, errors.New("UpdateExtSnapshotStreamingStateRequest cannot have both Start and Finish set to true")
280302
}
281303

282-
glog.Infof("[import] Applying import mode proposal: %v", req)
304+
glog.Infof("[import] Applying import mode proposal: %v", req.Finish, req.DropData, req.Start)
283305
err := groups().Node.proposeAndWait(ctx, &pb.Proposal{ExtSnapshotState: req})
284306

285307
return &pb.Status{}, err
@@ -363,9 +385,10 @@ func streamInGroup(stream apiv2.Dgraph_StreamExtSnapshotServer, forward bool) er
363385
successfulNodes := make(map[string]bool)
364386

365387
for _, member := range groups().state.Groups[node.gid].Members {
388+
glog.Info("memers are======================>", groups().state.Groups[node.gid].Members)
366389
if member.Addr == node.MyAddr {
367390
eg.Go(func() error {
368-
if err := ps.runLocalSubscriber(errGCtx); err != nil {
391+
if err := ps.runLocalSubscriber(errGCtx, stream); err != nil {
369392
glog.Errorf("[import:flush] failed to run local subscriber: %v", err)
370393
updateNodeStatus(&ps.RWMutex, successfulNodes, member.Addr, false)
371394
return err
@@ -379,6 +402,7 @@ func streamInGroup(stream apiv2.Dgraph_StreamExtSnapshotServer, forward bool) er
379402
// We are not going to return any error from here because we care about the majority of nodes.
380403
// If the majority of nodes are able to receive the data, the remaining ones can catch up later.
381404
if forward {
405+
glog.Infof("[import] Streaming external snapshot to [%v] from [%v] forward [%v]", member.Addr, node.MyAddr)
382406
eg.Go(func() error {
383407
glog.Infof(`[import:forward] streaming external snapshot to [%v] from [%v]`, member.Addr, node.MyAddr)
384408
if member.AmDead {
@@ -398,7 +422,11 @@ func streamInGroup(stream apiv2.Dgraph_StreamExtSnapshotServer, forward bool) er
398422
glog.Errorf("failed to establish stream with peer %v: %v", member.Addr, err)
399423
return nil
400424
}
401-
defer peerStream.CloseSend()
425+
defer func() {
426+
if err := peerStream.CloseSend(); err != nil {
427+
glog.Errorf("[import:forward] failed to receive ACK from [%v]: %v", member.Addr, err)
428+
}
429+
}()
402430

403431
forwardReq := &apiv2.StreamExtSnapshotRequest{Forward: false}
404432
if err := peerStream.Send(forwardReq); err != nil {
@@ -422,18 +450,20 @@ func streamInGroup(stream apiv2.Dgraph_StreamExtSnapshotServer, forward bool) er
422450

423451
eg.Go(func() error {
424452
defer ps.close()
425-
defer func() {
426-
if err := stream.Send(&apiv2.StreamExtSnapshotResponse{}); err != nil {
427-
glog.Errorf("[import] failed to send close on in: %v", err)
428-
}
429-
}()
453+
// defer func() {
454+
455+
// }()
456+
glog.Info("stream is started")
430457
if err := ps.handlePublisher(errGCtx, stream); err != nil {
431458
return err
432459
}
433460

461+
glog.Info("sent false ---------------------------->")
434462
return nil
435463
})
436464

465+
glog.Info("here---------------------")
466+
437467
if err := eg.Wait(); err != nil {
438468
return err
439469
}
@@ -444,7 +474,7 @@ func streamInGroup(stream apiv2.Dgraph_StreamExtSnapshotServer, forward bool) er
444474
glog.Error("[import] Majority of nodes failed to receive data.")
445475
return errors.New("failed to send data to majority of the nodes")
446476
}
447-
477+
glog.Info("done---------------------")
448478
return nil
449479
}
450480

0 commit comments

Comments
 (0)