diff --git a/internal/pqueue/pqueue.go b/internal/pqueue/pqueue.go index 1e28ed822..21edaf5f6 100644 --- a/internal/pqueue/pqueue.go +++ b/internal/pqueue/pqueue.go @@ -2,11 +2,12 @@ package pqueue import ( "container/heap" + "time" ) type Item struct { Value interface{} - Priority int64 + Priority time.Time Index int } @@ -23,7 +24,7 @@ func (pq PriorityQueue) Len() int { } func (pq PriorityQueue) Less(i, j int) bool { - return pq[i].Priority < pq[j].Priority + return pq[i].Priority.Before(pq[j].Priority) } func (pq PriorityQueue) Swap(i, j int) { @@ -60,14 +61,14 @@ func (pq *PriorityQueue) Pop() interface{} { return item } -func (pq *PriorityQueue) PeekAndShift(max int64) (*Item, int64) { +func (pq *PriorityQueue) PeekAndShift(max time.Time) (*Item, time.Duration) { if pq.Len() == 0 { return nil, 0 } item := (*pq)[0] - if item.Priority > max { - return nil, item.Priority - max + if item.Priority.After(max) { + return nil, item.Priority.Sub(max) } heap.Remove(pq, 0) diff --git a/internal/pqueue/pqueue_test.go b/internal/pqueue/pqueue_test.go index 1eca107ba..946b3261a 100644 --- a/internal/pqueue/pqueue_test.go +++ b/internal/pqueue/pqueue_test.go @@ -8,6 +8,7 @@ import ( "runtime" "sort" "testing" + "time" ) func equal(t *testing.T, act, exp interface{}) { @@ -24,7 +25,7 @@ func TestPriorityQueue(t *testing.T) { pq := New(c) for i := 0; i < c+1; i++ { - heap.Push(&pq, &Item{Value: i, Priority: int64(i)}) + heap.Push(&pq, &Item{Value: i, Priority: time.Unix(int64(i), 0)}) } equal(t, pq.Len(), c+1) equal(t, cap(pq), c*2) @@ -44,7 +45,7 @@ func TestUnsortedInsert(t *testing.T) { for i := 0; i < c; i++ { v := rand.Int() ints = append(ints, v) - heap.Push(&pq, &Item{Value: i, Priority: int64(v)}) + heap.Push(&pq, &Item{Value: i, Priority: time.Unix(int64(v), 0)}) } equal(t, pq.Len(), c) equal(t, cap(pq), c) @@ -52,8 +53,8 @@ func TestUnsortedInsert(t *testing.T) { sort.Ints(ints) for i := 0; i < c; i++ { - item, _ := pq.PeekAndShift(int64(ints[len(ints)-1])) - equal(t, item.Priority, int64(ints[i])) + item, _ := pq.PeekAndShift(time.Unix(int64(ints[len(ints)-1]), 0)) + equal(t, item.Priority, time.Unix(int64(ints[i]), 0)) } } @@ -63,7 +64,7 @@ func TestRemove(t *testing.T) { for i := 0; i < c; i++ { v := rand.Int() - heap.Push(&pq, &Item{Value: "test", Priority: int64(v)}) + heap.Push(&pq, &Item{Value: "test", Priority: time.Unix(int64(v), 0)}) } for i := 0; i < 10; i++ { @@ -73,7 +74,7 @@ func TestRemove(t *testing.T) { lastPriority := heap.Pop(&pq).(*Item).Priority for i := 0; i < (c - 10 - 1); i++ { item := heap.Pop(&pq) - equal(t, lastPriority < item.(*Item).Priority, true) + equal(t, lastPriority.Before(item.(*Item).Priority), true) lastPriority = item.(*Item).Priority } } diff --git a/internal/quantile/quantile.go b/internal/quantile/quantile.go index 4aa01be31..8363c893a 100644 --- a/internal/quantile/quantile.go +++ b/internal/quantile/quantile.go @@ -63,7 +63,7 @@ func (q *Quantile) Result() *Result { return &result } -func (q *Quantile) Insert(msgStartTime int64) { +func (q *Quantile) Insert(msgStartTime time.Time) { q.Lock() now := time.Now() @@ -71,7 +71,7 @@ func (q *Quantile) Insert(msgStartTime int64) { q.moveWindow() } - q.currentStream.Insert(float64(now.UnixNano() - msgStartTime)) + q.currentStream.Insert(float64(now.Sub(msgStartTime).Nanoseconds())) q.Unlock() } diff --git a/nsqd/channel.go b/nsqd/channel.go index 606387267..8a3a1a951 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -340,7 +340,7 @@ func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout ti newTimeout = msg.deliveryTS.Add(c.ctx.nsqd.getOpts().MaxMsgTimeout) } - msg.pri = newTimeout.UnixNano() + msg.pri = newTimeout err = c.pushInFlightMessage(msg) if err != nil { return err @@ -431,7 +431,7 @@ func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout tim now := time.Now() msg.clientID = clientID msg.deliveryTS = now - msg.pri = now.Add(timeout).UnixNano() + msg.pri = now.Add(timeout) err := c.pushInFlightMessage(msg) if err != nil { return err @@ -441,7 +441,7 @@ func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout tim } func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error { - absTs := time.Now().Add(timeout).UnixNano() + absTs := time.Now().Add(timeout) item := &pqueue.Item{Value: msg, Priority: absTs} err := c.pushDeferredMessage(item) if err != nil { @@ -531,7 +531,7 @@ func (c *Channel) addToDeferredPQ(item *pqueue.Item) { c.deferredMutex.Unlock() } -func (c *Channel) processDeferredQueue(t int64) bool { +func (c *Channel) processDeferredQueue(t time.Time) bool { c.exitMutex.RLock() defer c.exitMutex.RUnlock() @@ -562,7 +562,7 @@ exit: return dirty } -func (c *Channel) processInFlightQueue(t int64) bool { +func (c *Channel) processInFlightQueue(t time.Time) bool { c.exitMutex.RLock() defer c.exitMutex.RUnlock() diff --git a/nsqd/guid.go b/nsqd/guid.go index 89ba83e9e..9aaae3e06 100644 --- a/nsqd/guid.go +++ b/nsqd/guid.go @@ -12,6 +12,7 @@ package nsqd import ( "encoding/hex" "errors" + "math/rand" "sync" "time" ) @@ -36,55 +37,22 @@ type guid int64 type guidFactory struct { sync.Mutex - nodeID int64 - sequence int64 - lastTimestamp int64 - lastID guid + nodeID int64 + randng *rand.Rand } func NewGUIDFactory(nodeID int64) *guidFactory { return &guidFactory{ nodeID: nodeID, + randng: rand.New(rand.NewSource(time.Now().UnixNano() ^ nodeID)), } } func (f *guidFactory) NewGUID() (guid, error) { f.Lock() - - // divide by 1048576, giving pseudo-milliseconds - ts := time.Now().UnixNano() >> 20 - - if ts < f.lastTimestamp { - f.Unlock() - return 0, ErrTimeBackwards - } - - if f.lastTimestamp == ts { - f.sequence = (f.sequence + 1) & sequenceMask - if f.sequence == 0 { - f.Unlock() - return 0, ErrSequenceExpired - } - } else { - f.sequence = 0 - } - - f.lastTimestamp = ts - - id := guid(((ts - twepoch) << timestampShift) | - (f.nodeID << nodeIDShift) | - f.sequence) - - if id <= f.lastID { - f.Unlock() - return 0, ErrIDBackwards - } - - f.lastID = id - + id := f.randng.Int63() f.Unlock() - - return id, nil + return guid(id), nil } func (g guid) Hex() MessageID { diff --git a/nsqd/in_flight_pqueue.go b/nsqd/in_flight_pqueue.go index 4ad1b491b..52de1e15b 100644 --- a/nsqd/in_flight_pqueue.go +++ b/nsqd/in_flight_pqueue.go @@ -1,5 +1,7 @@ package nsqd +import "time" + type inFlightPqueue []*Message func newInFlightPqueue(capacity int) inFlightPqueue { @@ -55,14 +57,14 @@ func (pq *inFlightPqueue) Remove(i int) *Message { return x } -func (pq *inFlightPqueue) PeekAndShift(max int64) (*Message, int64) { +func (pq *inFlightPqueue) PeekAndShift(max time.Time) (*Message, time.Duration) { if len(*pq) == 0 { return nil, 0 } x := (*pq)[0] - if x.pri > max { - return nil, x.pri - max + if x.pri.After(max) { + return nil, x.pri.Sub(max) } pq.Pop() @@ -72,7 +74,7 @@ func (pq *inFlightPqueue) PeekAndShift(max int64) (*Message, int64) { func (pq *inFlightPqueue) up(j int) { for { i := (j - 1) / 2 // parent - if i == j || (*pq)[j].pri >= (*pq)[i].pri { + if i == j || (*pq)[j].pri.After((*pq)[i].pri) { break } pq.Swap(i, j) @@ -87,10 +89,10 @@ func (pq *inFlightPqueue) down(i, n int) { break } j := j1 // left child - if j2 := j1 + 1; j2 < n && (*pq)[j1].pri >= (*pq)[j2].pri { + if j2 := j1 + 1; j2 < n && (*pq)[j1].pri.After((*pq)[j2].pri) { j = j2 // = 2*i + 2 // right child } - if (*pq)[j].pri >= (*pq)[i].pri { + if (*pq)[j].pri.After((*pq)[i].pri) { break } pq.Swap(i, j) diff --git a/nsqd/in_flight_pqueue_test.go b/nsqd/in_flight_pqueue_test.go index 1f6572c6a..4dd0c530e 100644 --- a/nsqd/in_flight_pqueue_test.go +++ b/nsqd/in_flight_pqueue_test.go @@ -5,6 +5,7 @@ import ( "math/rand" "sort" "testing" + "time" "github.com/nsqio/nsq/internal/test" ) @@ -14,7 +15,7 @@ func TestPriorityQueue(t *testing.T) { pq := newInFlightPqueue(c) for i := 0; i < c+1; i++ { - pq.Push(&Message{clientID: int64(i), pri: int64(i)}) + pq.Push(&Message{clientID: int64(i), pri: time.Unix(int64(i), 0)}) } test.Equal(t, c+1, len(pq)) test.Equal(t, c*2, cap(pq)) @@ -34,7 +35,7 @@ func TestUnsortedInsert(t *testing.T) { for i := 0; i < c; i++ { v := rand.Int() ints = append(ints, v) - pq.Push(&Message{pri: int64(v)}) + pq.Push(&Message{pri: time.Unix(int64(v), 0)}) } test.Equal(t, c, len(pq)) test.Equal(t, c, cap(pq)) @@ -42,8 +43,8 @@ func TestUnsortedInsert(t *testing.T) { sort.Ints(ints) for i := 0; i < c; i++ { - msg, _ := pq.PeekAndShift(int64(ints[len(ints)-1])) - test.Equal(t, int64(ints[i]), msg.pri) + msg, _ := pq.PeekAndShift(time.Unix(int64(ints[len(ints)-1]), 0)) + test.Equal(t, int64(ints[i]), msg.pri.Unix()) } } @@ -53,8 +54,8 @@ func TestRemove(t *testing.T) { msgs := make(map[MessageID]*Message) for i := 0; i < c; i++ { - m := &Message{pri: int64(rand.Intn(100000000))} - copy(m.ID[:], fmt.Sprintf("%016d", m.pri)) + m := &Message{pri: time.Unix(int64(rand.Intn(100000000)), 0)} + copy(m.ID[:], fmt.Sprintf("%016d", m.pri.Unix())) msgs[m.ID] = m pq.Push(m) } @@ -75,7 +76,7 @@ func TestRemove(t *testing.T) { lastPriority := pq.Pop().pri for i := 0; i < (c - 10 - 1); i++ { msg := pq.Pop() - test.Equal(t, true, lastPriority <= msg.pri) + test.Equal(t, true, lastPriority.Before(msg.pri)) lastPriority = msg.pri } } diff --git a/nsqd/message.go b/nsqd/message.go index 77ee4c79d..f1d4c2082 100644 --- a/nsqd/message.go +++ b/nsqd/message.go @@ -18,13 +18,13 @@ type MessageID [MsgIDLength]byte type Message struct { ID MessageID Body []byte - Timestamp int64 + Timestamp time.Time Attempts uint16 // for in-flight handling deliveryTS time.Time clientID int64 - pri int64 + pri time.Time index int deferred time.Duration } @@ -33,7 +33,7 @@ func NewMessage(id MessageID, body []byte) *Message { return &Message{ ID: id, Body: body, - Timestamp: time.Now().UnixNano(), + Timestamp: time.Now(), } } @@ -41,7 +41,7 @@ func (m *Message) WriteTo(w io.Writer) (int64, error) { var buf [10]byte var total int64 - binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp)) + binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp.UnixNano())) binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts)) n, err := w.Write(buf[:]) @@ -82,7 +82,7 @@ func decodeMessage(b []byte) (*Message, error) { return nil, fmt.Errorf("invalid message buffer size (%d)", len(b)) } - msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8])) + msg.Timestamp = time.Unix(0, int64(binary.BigEndian.Uint64(b[:8]))) msg.Attempts = binary.BigEndian.Uint16(b[8:10]) copy(msg.ID[:], b[10:10+MsgIDLength]) msg.Body = b[10+MsgIDLength:] diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 2a6b4b0b4..769e80976 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -623,7 +623,7 @@ func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, close for { select { case c := <-workCh: - now := time.Now().UnixNano() + now := time.Now() dirty := false if c.processInFlightQueue(now) { dirty = true diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index 54cd526a9..770244384 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -180,9 +180,11 @@ func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) { case bytes.Equal(params[0], []byte("REQ")): return p.REQ(client, params) case bytes.Equal(params[0], []byte("PUB")): - return p.PUB(client, params) + a, b := p.PUB(client, params) + return a, b case bytes.Equal(params[0], []byte("MPUB")): - return p.MPUB(client, params) + a, b := p.MPUB(client, params) + return a, b case bytes.Equal(params[0], []byte("DPUB")): return p.DPUB(client, params) case bytes.Equal(params[0], []byte("NOP")): @@ -793,7 +795,6 @@ func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) { if err != nil { return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body") } - if err := p.CheckAuth(client, "PUB", topicName, ""); err != nil { return nil, err } @@ -822,7 +823,6 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) { return nil, protocol.NewFatalClientErr(nil, "E_BAD_TOPIC", fmt.Sprintf("E_BAD_TOPIC MPUB topic name %q is not valid", topicName)) } - if err := p.CheckAuth(client, "MPUB", topicName, ""); err != nil { return nil, err } @@ -846,6 +846,7 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) { messages, err := readMPUB(client.Reader, client.lenSlice, topic, p.ctx.nsqd.getOpts().MaxMsgSize, p.ctx.nsqd.getOpts().MaxBodySize) + if err != nil { return nil, err } @@ -859,7 +860,6 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) { } client.PublishedMessage(topicName, uint64(len(messages))) - return okBytes, nil } @@ -909,7 +909,6 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) { if err != nil { return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "DPUB failed to read message body") } - if err := p.CheckAuth(client, "DPUB", topicName, ""); err != nil { return nil, err } @@ -990,7 +989,6 @@ func readMPUB(r io.Reader, tmp []byte, topic *Topic, maxMessageSize int64, maxBo if err != nil { return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "MPUB failed to read message body") } - messages = append(messages, NewMessage(topic.GenerateID(), msgBody)) } diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index e8f7d1ff6..a1de56e73 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -1468,7 +1468,7 @@ func TestReqTimeoutRange(t *testing.T) { channel.deferredMutex.Unlock() test.NotNil(t, pqItem) - test.Equal(t, true, pqItem.Priority >= minTs) + test.Equal(t, true, pqItem.Priority.UnixNano() >= minTs) } func TestClientAuth(t *testing.T) {