Skip to content

Commit 2b46fc2

Browse files
stephensearlesmreiferson
authored andcommitted
nsqd: refactor /stats; add gossip state
1 parent 65da65c commit 2b46fc2

File tree

1 file changed

+48
-27
lines changed

1 file changed

+48
-27
lines changed

nsqd/http.go

Lines changed: 48 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/bitly/nsq/internal/protocol"
2121
"github.com/bitly/nsq/internal/version"
2222
"github.com/julienschmidt/httprouter"
23+
"github.com/hashicorp/serf/serf"
2324
)
2425

2526
type httpServer struct {
@@ -113,6 +114,10 @@ func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps ht
113114
if !s.ctx.nsqd.IsHealthy() {
114115
code = 500
115116
}
117+
if s.ctx.nsqd.serf != nil && (s.ctx.nsqd.serf.State() == serf.SerfAlive || len(s.ctx.nsqd.serf.Members()) < 2) {
118+
code = 500
119+
health = "NOK - gossip unhealthy"
120+
}
116121
w.Header().Set("Content-Length", strconv.Itoa(len(health)))
117122
w.WriteHeader(code)
118123
io.WriteString(w, health)
@@ -167,6 +172,10 @@ func (s *httpServer) getTopicFromQuery(req *http.Request) (url.Values, *Topic, e
167172
}
168173

169174
func (s *httpServer) doLookup(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
175+
if s.ctx.nsqd.serf == nil || s.ctx.nsqd.serf.State() != serf.SerfAlive {
176+
return nil, http_api.Err{400, "GOSSIP_NOT_ENABLED"}
177+
}
178+
170179
reqParams, err := http_api.NewReqParams(req)
171180
if err != nil {
172181
return nil, http_api.Err{400, "INVALID_REQUEST"}
@@ -484,8 +493,13 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro
484493
startTime := s.ctx.nsqd.GetStartTime()
485494
uptime := time.Since(startTime)
486495

496+
var serfStats map[string]string
497+
if s.ctx.nsqd.serf != nil {
498+
serfStats = s.ctx.nsqd.serf.Stats()
499+
}
500+
487501
if !jsonFormat {
488-
return s.printStats(stats, health, startTime, uptime), nil
502+
return s.printStats(stats, health, startTime, uptime, serfStats), nil
489503
}
490504

491505
return struct {
@@ -496,56 +510,55 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro
496510
}{version.Binary, health, startTime.Unix(), stats}, nil
497511
}
498512

499-
func (s *httpServer) printStats(stats []TopicStats, health string, startTime time.Time, uptime time.Duration) []byte {
500-
var buf bytes.Buffer
501-
w := &buf
513+
func (s *httpServer) printStats(stats []TopicStats, health string, startTime time.Time, uptime time.Duration, gossip map[string]string) []byte {
514+
w := &bytes.Buffer{}
502515
now := time.Now()
503-
io.WriteString(w, fmt.Sprintf("%s\n", version.String("nsqd")))
504-
io.WriteString(w, fmt.Sprintf("start_time %v\n", startTime.Format(time.RFC3339)))
505-
io.WriteString(w, fmt.Sprintf("uptime %s\n", uptime))
516+
fmt.Fprintf(w, "%s\n", version.String("nsqd"))
517+
fmt.Fprintf(w, "start_time %v\n", startTime.Format(time.RFC3339))
518+
fmt.Fprintf(w, "uptime %s\n", uptime)
506519
if len(stats) == 0 {
507-
io.WriteString(w, "\nNO_TOPICS\n")
508-
return buf.Bytes()
520+
w.WriteString("\nNO_TOPICS\n")
521+
return w.Bytes()
509522
}
510-
io.WriteString(w, fmt.Sprintf("\nHealth: %s\n", health))
523+
fmt.Fprintf(w, "\nHealth: %s\n", health)
511524
for _, t := range stats {
512525
var pausedPrefix string
513526
if t.Paused {
514527
pausedPrefix = "*P "
515528
} else {
516529
pausedPrefix = " "
517530
}
518-
io.WriteString(w, fmt.Sprintf("\n%s[%-15s] depth: %-5d be-depth: %-5d msgs: %-8d e2e%%: %s\n",
531+
fmt.Fprintf(w, "\n%s[%-15s] depth: %-5d be-depth: %-5d msgs: %-8d e2e%%: %s\n",
519532
pausedPrefix,
520533
t.TopicName,
521534
t.Depth,
522535
t.BackendDepth,
523536
t.MessageCount,
524-
t.E2eProcessingLatency))
537+
t.E2eProcessingLatency)
525538
for _, c := range t.Channels {
526539
if c.Paused {
527540
pausedPrefix = " *P "
528541
} else {
529542
pausedPrefix = " "
530543
}
531-
io.WriteString(w,
532-
fmt.Sprintf("%s[%-25s] depth: %-5d be-depth: %-5d inflt: %-4d def: %-4d re-q: %-5d timeout: %-5d msgs: %-8d e2e%%: %s\n",
533-
pausedPrefix,
534-
c.ChannelName,
535-
c.Depth,
536-
c.BackendDepth,
537-
c.InFlightCount,
538-
c.DeferredCount,
539-
c.RequeueCount,
540-
c.TimeoutCount,
541-
c.MessageCount,
542-
c.E2eProcessingLatency))
544+
fmt.Fprintf(w,
545+
"%s[%-25s] depth: %-5d be-depth: %-5d inflt: %-4d def: %-4d re-q: %-5d timeout: %-5d msgs: %-8d e2e%%: %s\n",
546+
pausedPrefix,
547+
c.ChannelName,
548+
c.Depth,
549+
c.BackendDepth,
550+
c.InFlightCount,
551+
c.DeferredCount,
552+
c.RequeueCount,
553+
c.TimeoutCount,
554+
c.MessageCount,
555+
c.E2eProcessingLatency)
543556
for _, client := range c.Clients {
544557
connectTime := time.Unix(client.ConnectTime, 0)
545558
// truncate to the second
546559
duration := time.Duration(int64(now.Sub(connectTime).Seconds())) * time.Second
547560
_, port, _ := net.SplitHostPort(client.RemoteAddress)
548-
io.WriteString(w, fmt.Sprintf(" [%s %-21s] state: %d inflt: %-4d rdy: %-4d fin: %-8d re-q: %-8d msgs: %-8d connected: %s\n",
561+
fmt.Fprintf(w, " [%s %-21s] state: %d inflt: %-4d rdy: %-4d fin: %-8d re-q: %-8d msgs: %-8d connected: %s\n",
549562
client.Version,
550563
fmt.Sprintf("%s:%s", client.Name, port),
551564
client.State,
@@ -555,11 +568,19 @@ func (s *httpServer) printStats(stats []TopicStats, health string, startTime tim
555568
client.RequeueCount,
556569
client.MessageCount,
557570
duration,
558-
))
571+
)
559572
}
560573
}
561574
}
562-
return buf.Bytes()
575+
576+
if gossip != nil {
577+
fmt.Fprintf(w, "\nGossip:\n")
578+
for k, v := range gossip {
579+
fmt.Fprintf(w, " %s: %s\n", k, v)
580+
}
581+
}
582+
583+
return w.Bytes()
563584
}
564585

565586
func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {

0 commit comments

Comments
 (0)