Skip to content

Commit 21edcea

Browse files
committed
nsqd: save both old and new metadata filenames
and when loading, if both exist, ensure they match; this makes rolling back possible without losing messages
1 parent 64a5e7c commit 21edcea

File tree

2 files changed

+48
-17
lines changed

2 files changed

+48
-17
lines changed

apps/nsqd/nsqd.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,10 @@ func (p *program) Start() error {
218218
options.Resolve(opts, flagSet, cfg)
219219
nsqd := nsqd.New(opts)
220220

221-
nsqd.LoadMetadata()
221+
err := nsqd.LoadMetadata()
222+
if err != nil {
223+
log.Fatalf("ERROR: %s", err.Error())
224+
}
222225
err := nsqd.PersistMetadata()
223226
if err != nil {
224227
log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())

nsqd/nsqd.go

Lines changed: 44 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package nsqd
22

33
import (
4+
"bytes"
45
"crypto/tls"
56
"crypto/x509"
67
"encoding/json"
@@ -267,32 +268,45 @@ type meta struct {
267268
} `json:"topics"`
268269
}
269270

270-
func (n *NSQD) LoadMetadata() {
271+
func (n *NSQD) LoadMetadata() error {
271272
atomic.StoreInt32(&n.isLoading, 1)
272273
defer atomic.StoreInt32(&n.isLoading, 0)
273274

274275
fn := path.Join(n.getOpts().DataPath, "nsqd.dat")
275-
oldFn := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID)
276+
// old metadata filename with ID, maintained in parallel to enable roll-back
277+
fnID := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID)
276278

277279
data, err := ioutil.ReadFile(fn)
278280
if err != nil {
279281
if !os.IsNotExist(err) {
280-
n.logf("ERROR: failed to read channel metadata from %s - %s", fn, err)
282+
return fmt.Errorf("failed to read channel metadata from %s - %s", fn, err)
281283
}
282-
data, err = ioutil.ReadFile(oldFn)
283-
if err != nil {
284-
if !os.IsNotExist(err) {
285-
n.logf("ERROR: failed to read channel metadata from %s - %s", fn, err)
286-
}
287-
return
284+
}
285+
286+
dataID, errID := ioutil.ReadFile(fnID)
287+
if errID != nil {
288+
if !os.IsNotExist(errID) {
289+
return fmt.Errorf("failed to read channel metadata from %s - %s", fnID, errID)
288290
}
289291
}
290292

293+
if err != nil && errID != nil {
294+
return nil
295+
}
296+
if err == nil && errID == nil {
297+
if bytes.Compare(data, dataID) != 0 {
298+
return fmt.Errorf("metadata in %s and %s do not match (delete one)", fn, fnID)
299+
}
300+
}
301+
if err != nil {
302+
fn = fnID
303+
data = dataID
304+
}
305+
291306
var m meta
292307
err = json.Unmarshal(data, &m)
293308
if err != nil {
294-
n.logf("ERROR: failed to parse metadata - %s", err)
295-
return
309+
return fmt.Errorf("failed to parse metadata in %s - %s", fn, err)
296310
}
297311

298312
for _, t := range m.Topics {
@@ -316,13 +330,15 @@ func (n *NSQD) LoadMetadata() {
316330
}
317331
}
318332
}
333+
return nil
319334
}
320335

321336
func (n *NSQD) PersistMetadata() error {
322337
// persist metadata about what topics/channels we have
323338
// so that upon restart we can get back to the same state
324339
fileName := path.Join(n.getOpts().DataPath, "nsqd.dat")
325-
oldFileName := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID)
340+
// old metadata filename with ID, maintained in parallel to enable roll-back
341+
fileNameID := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID)
326342
n.logf("NSQ: persisting topic/channel metadata to %s", fileName)
327343

328344
js := make(map[string]interface{})
@@ -365,7 +381,19 @@ func (n *NSQD) PersistMetadata() error {
365381
if err != nil {
366382
return err
367383
}
384+
_, err = f.Write(data)
385+
if err != nil {
386+
f.Close()
387+
return err
388+
}
389+
f.Sync()
390+
f.Close()
368391

392+
tmpFileNameID := fmt.Sprintf("%s.%d.tmp", fileNameID, rand.Int())
393+
f, err = os.OpenFile(tmpFileNameID, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
394+
if err != nil {
395+
return err
396+
}
369397
_, err = f.Write(data)
370398
if err != nil {
371399
f.Close()
@@ -378,11 +406,11 @@ func (n *NSQD) PersistMetadata() error {
378406
if err != nil {
379407
return err
380408
}
381-
382-
err = os.Remove(oldFileName)
383-
if err != nil && !os.IsNotExist(err) {
384-
n.logf("NSQ: WARNING: failed to delete old metadata file %s: %s", oldFileName, err)
409+
err = atomicRename(tmpFileNameID, fileNameID)
410+
if err != nil {
411+
return err
385412
}
413+
// technically should fsync DataPath here
386414

387415
return nil
388416
}

0 commit comments

Comments
 (0)