Skip to content

Commit e9d936e

Browse files
committed
nsqd: new metadata filename without ID
Symlink the old metadata filename to the new one. When loading, if both exist, ensure they match. This makes rollback possible without losing messages (when rolling forward again after a rollback, some manual intervention is required).
1 parent df2a1d7 commit e9d936e

File tree

3 files changed

+63
-14
lines changed

3 files changed

+63
-14
lines changed

apps/nsqd/nsqd.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,8 +218,11 @@ func (p *program) Start() error {
218218
options.Resolve(opts, flagSet, cfg)
219219
nsqd := nsqd.New(opts)
220220

221-
nsqd.LoadMetadata()
222-
err := nsqd.PersistMetadata()
221+
err := nsqd.LoadMetadata()
222+
if err != nil {
223+
log.Fatalf("ERROR: %s", err.Error())
224+
}
225+
err = nsqd.PersistMetadata()
223226
if err != nil {
224227
log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
225228
}

nsqd/nsqd.go

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package nsqd
22

33
import (
4+
"bytes"
45
"crypto/tls"
56
"crypto/x509"
67
"encoding/json"
@@ -267,24 +268,51 @@ type meta struct {
267268
} `json:"topics"`
268269
}
269270

270-
func (n *NSQD) LoadMetadata() {
271+
func readOrEmpty(fn string) ([]byte, error) {
272+
data, err := ioutil.ReadFile(fn)
273+
if err != nil {
274+
if !os.IsNotExist(err) {
275+
return nil, fmt.Errorf("failed to read metadata from %s - %s", fn, err)
276+
}
277+
}
278+
return data, nil
279+
}
280+
281+
func (n *NSQD) LoadMetadata() error {
271282
atomic.StoreInt32(&n.isLoading, 1)
272283
defer atomic.StoreInt32(&n.isLoading, 0)
273284

274-
fn := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID)
275-
data, err := ioutil.ReadFile(fn)
285+
fn := path.Join(n.getOpts().DataPath, "nsqd.dat")
286+
// old metadata filename with ID, maintained in parallel to enable roll-back
287+
fnID := path.Join(n.getOpts().DataPath, fmt.Sprintf("nsqd.%d.dat", n.getOpts().ID))
288+
289+
data, err := readOrEmpty(fn)
276290
if err != nil {
277-
if !os.IsNotExist(err) {
278-
n.logf("ERROR: failed to read channel metadata from %s - %s", fn, err)
291+
return err
292+
}
293+
dataID, errID := readOrEmpty(fnID)
294+
if errID != nil {
295+
return errID
296+
}
297+
298+
if data == nil && dataID == nil {
299+
return nil // fresh start
300+
}
301+
if data != nil && dataID != nil {
302+
if bytes.Compare(data, dataID) != 0 {
303+
return fmt.Errorf("metadata in %s and %s do not match (delete one)", fn, fnID)
279304
}
280-
return
305+
}
306+
if data == nil {
307+
// only old metadata file exists, use it
308+
fn = fnID
309+
data = dataID
281310
}
282311

283312
var m meta
284313
err = json.Unmarshal(data, &m)
285314
if err != nil {
286-
n.logf("ERROR: failed to parse metadata - %s", err)
287-
return
315+
return fmt.Errorf("failed to parse metadata in %s - %s", fn, err)
288316
}
289317

290318
for _, t := range m.Topics {
@@ -308,12 +336,15 @@ func (n *NSQD) LoadMetadata() {
308336
}
309337
}
310338
}
339+
return nil
311340
}
312341

313342
func (n *NSQD) PersistMetadata() error {
314-
// persist metadata about what topics/channels we have
315-
// so that upon restart we can get back to the same state
316-
fileName := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID)
343+
// persist metadata about what topics/channels we have, across restarts
344+
fileName := path.Join(n.getOpts().DataPath, "nsqd.dat")
345+
// old metadata filename with ID, maintained in parallel to enable roll-back
346+
fileNameID := path.Join(n.getOpts().DataPath, fmt.Sprintf("nsqd.%d.dat", n.getOpts().ID))
347+
317348
n.logf("NSQ: persisting topic/channel metadata to %s", fileName)
318349

319350
js := make(map[string]interface{})
@@ -370,6 +401,21 @@ func (n *NSQD) PersistMetadata() error {
370401
return err
371402
}
372403

404+
stat, err := os.Lstat(fileNameID)
405+
if err != nil || (stat.Mode()&os.ModeSymlink) == 0 {
406+
tmpFileNameID := fmt.Sprintf("%s.%d.tmp", fileNameID, rand.Int())
407+
408+
err = os.Symlink(fileName, tmpFileNameID)
409+
if err != nil {
410+
return err
411+
}
412+
err = os.Rename(tmpFileNameID, fileNameID)
413+
if err != nil {
414+
return err
415+
}
416+
}
417+
418+
// technically should fsync DataPath here
373419
return nil
374420
}
375421

nsqd/nsqd_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ const (
2424
)
2525

2626
func getMetadata(n *NSQD) (*meta, error) {
27-
fn := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID)
27+
fn := path.Join(n.getOpts().DataPath, "nsqd.dat")
2828
data, err := ioutil.ReadFile(fn)
2929
if err != nil {
3030
return nil, err

0 commit comments

Comments
 (0)