|
1 | 1 | import fs from 'fs'
|
2 | 2 |
|
3 | 3 | const INFILE = process.env['INFILE'],
|
4 |
| - OUTFILE = process.env['OUTFILE'] |
| 4 | + OUTFILE = process.env['OUTFILE'], |
| 5 | + REMOTE_SB = process.env['REMOTE_SB'] |
5 | 6 |
|
6 | 7 | export default function scuttlebuttServer(server) {
|
7 | 8 | const primusServer = new (require('primus'))(server, {}),
|
@@ -55,38 +56,70 @@ export default function scuttlebuttServer(server) {
|
55 | 56 |
|
56 | 57 | }, 10000) // max 6/minute
|
57 | 58 |
|
| 59 | + // connect dispatcher to redux |
58 | 60 | connectRedux(gossip)
|
59 | 61 |
|
60 |
| - if (INFILE || OUTFILE) { |
61 |
| - if (INFILE) { |
62 |
| - const gossipWriteSteam = gossip.createWriteStream() |
63 |
| - fs.createReadStream(INFILE).pipe(gossipWriteSteam) |
| 62 | + // read actions from file |
| 63 | + if (INFILE) { |
| 64 | + const gossipWriteSteam = gossip.createWriteStream() |
| 65 | + fs.createReadStream(INFILE).pipe(gossipWriteSteam) |
64 | 66 |
|
65 |
| - console.log('📼 Reading from ' + INFILE) |
66 |
| - } |
| 67 | + console.log('📼 Reading from ' + INFILE) |
| 68 | + } |
67 | 69 |
|
68 |
| - if (OUTFILE) { |
69 |
| - const gossipReadSteam = gossip.createReadStream() |
| 70 | + // stream actions to file -- this will include all actions in INFILE |
| 71 | + if (OUTFILE) { |
| 72 | + const gossipReadSteam = gossip.createReadStream() |
70 | 73 |
|
71 |
| - // For some reason, we're not getting any 'sync' events from Dispatcher, |
72 |
| - // so we'll listen for it in the datastream and write to disk after it |
73 |
| - // <https://github.com/dominictarr/scuttlebutt#persistence> |
| 74 | + // For some reason, we're not getting any 'sync' events from Dispatcher, |
| 75 | + // so we'll listen for it in the datastream and write to disk after it |
| 76 | + // <https://github.com/dominictarr/scuttlebutt#persistence> |
74 | 77 |
|
75 |
| - gossipReadSteam.on('data', (data) => { |
76 |
| - if (data === '"SYNC"\n') { |
77 |
| - console.log('📼 Writing to ' + OUTFILE) |
78 |
| - gossipReadSteam.pipe(fs.createWriteStream(OUTFILE)) |
79 |
| - } |
80 |
| - }) |
81 |
| - |
82 |
| - // this doesn't fire. |
83 |
| - gossip.on('sync', function () { |
84 |
| - console.log('📼 [NATURAL SYNC] Writing to ' + OUTFILE) |
| 78 | + gossipReadSteam.on('data', (data) => { |
| 79 | + if (data === '"SYNC"\n') { |
| 80 | + console.log('📼 Writing to ' + OUTFILE) |
85 | 81 | gossipReadSteam.pipe(fs.createWriteStream(OUTFILE))
|
86 |
| - }) |
| 82 | + } |
| 83 | + }) |
| 84 | + |
| 85 | + // this doesn't fire. |
| 86 | + gossip.on('sync', function () { |
| 87 | + console.log('📼 [NATURAL SYNC] Writing to ' + OUTFILE) |
| 88 | + gossipReadSteam.pipe(fs.createWriteStream(OUTFILE)) |
| 89 | + }) |
| 90 | + |
| 91 | + console.log('📼 Ready to write to ' + OUTFILE) |
| 92 | + } |
| 93 | + |
| 94 | + // connect to remote redux-scuttlebutt instance |
| 95 | + if (REMOTE_SB) { |
| 96 | + var remoteStream = gossip.createStream(), |
| 97 | + remoteClient = new primusServer.Socket(REMOTE_SB) |
87 | 98 |
|
88 |
| - console.log('📼 Ready to write to ' + OUTFILE) |
| 99 | + console.log('💡 connecting to remote '+ REMOTE_SB) |
| 100 | + |
| 101 | + remoteClient.pipe(remoteStream).pipe(remoteClient) |
| 102 | + |
| 103 | + statistics['REMOTE_SB'] = { |
| 104 | + recv: 0, sent: 0, s: 'remote' |
89 | 105 | }
|
| 106 | + |
| 107 | + remoteClient.on('data', function recv(data) { |
| 108 | + // console.log('[io]', 'REMOTE_SB', '<-', data); |
| 109 | + statistics['REMOTE_SB'].recv++ |
| 110 | + statisticsDirty = true |
| 111 | + }); |
| 112 | + |
| 113 | + remoteStream.on('data', (data) => { |
| 114 | + // console.log('[io]', 'REMOTE_SB' || 'origin', '->', data); |
| 115 | + statistics['REMOTE_SB'].sent++ |
| 116 | + statisticsDirty = true |
| 117 | + }) |
| 118 | + |
| 119 | + remoteStream.on('error', (error) => { |
| 120 | + console.log('[io]', 'REMOTE_SB', 'ERROR:', error); |
| 121 | + remoteClient.end('Disconnecting due to error', { reconnect: true }) |
| 122 | + }) |
90 | 123 | }
|
91 | 124 |
|
92 | 125 | primusServer.on('connection', (spark) => {
|
|
0 commit comments