Skip to content

Commit 9d1eb6f

Browse files
stephensearlesmreiferson
authored andcommitted
nsqd: refactor /stats; add gossip state
1 parent 473321b commit 9d1eb6f

File tree

1 file changed

+48
-27
lines changed

1 file changed

+48
-27
lines changed

nsqd/http.go

Lines changed: 48 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/bitly/nsq/internal/http_api"
1818
"github.com/bitly/nsq/internal/protocol"
1919
"github.com/bitly/nsq/internal/version"
20+
"github.com/hashicorp/serf/serf"
2021
)
2122

2223
type httpServer struct {
@@ -179,6 +180,10 @@ func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request) {
179180
if !s.ctx.nsqd.IsHealthy() {
180181
code = 500
181182
}
183+
if s.ctx.nsqd.serf != nil && (s.ctx.nsqd.serf.State() == serf.SerfAlive || len(s.ctx.nsqd.serf.Members()) < 2) {
184+
code = 500
185+
health = "NOK - gossip unhealthy"
186+
}
182187
w.Header().Set("Content-Length", strconv.Itoa(len(health)))
183188
w.WriteHeader(code)
184189
io.WriteString(w, health)
@@ -233,6 +238,10 @@ func (s *httpServer) getTopicFromQuery(req *http.Request) (url.Values, *Topic, e
233238
}
234239

235240
func (s *httpServer) doLookup(req *http.Request) (interface{}, error) {
241+
if s.ctx.nsqd.serf == nil || s.ctx.nsqd.serf.State() != serf.SerfAlive {
242+
return nil, http_api.Err{400, "GOSSIP_NOT_ENABLED"}
243+
}
244+
236245
reqParams, err := http_api.NewReqParams(req)
237246
if err != nil {
238247
return nil, http_api.Err{400, "INVALID_REQUEST"}
@@ -528,8 +537,13 @@ func (s *httpServer) doStats(req *http.Request) (interface{}, error) {
528537
startTime := s.ctx.nsqd.GetStartTime()
529538
uptime := time.Since(startTime)
530539

540+
var serfStats map[string]string
541+
if s.ctx.nsqd.serf != nil {
542+
serfStats = s.ctx.nsqd.serf.Stats()
543+
}
544+
531545
if !jsonFormat {
532-
return s.printStats(stats, health, startTime, uptime), nil
546+
return s.printStats(stats, health, startTime, uptime, serfStats), nil
533547
}
534548

535549
return struct {
@@ -540,56 +554,55 @@ func (s *httpServer) doStats(req *http.Request) (interface{}, error) {
540554
}{version.Binary, health, startTime.Unix(), stats}, nil
541555
}
542556

543-
func (s *httpServer) printStats(stats []TopicStats, health string, startTime time.Time, uptime time.Duration) []byte {
544-
var buf bytes.Buffer
545-
w := &buf
557+
func (s *httpServer) printStats(stats []TopicStats, health string, startTime time.Time, uptime time.Duration, gossip map[string]string) []byte {
558+
w := &bytes.Buffer{}
546559
now := time.Now()
547-
io.WriteString(w, fmt.Sprintf("%s\n", version.String("nsqd")))
548-
io.WriteString(w, fmt.Sprintf("start_time %v\n", startTime.Format(time.RFC3339)))
549-
io.WriteString(w, fmt.Sprintf("uptime %s\n", uptime))
560+
fmt.Fprintf(w, "%s\n", version.String("nsqd"))
561+
fmt.Fprintf(w, "start_time %v\n", startTime.Format(time.RFC3339))
562+
fmt.Fprintf(w, "uptime %s\n", uptime)
550563
if len(stats) == 0 {
551-
io.WriteString(w, "\nNO_TOPICS\n")
552-
return buf.Bytes()
564+
w.WriteString("\nNO_TOPICS\n")
565+
return w.Bytes()
553566
}
554-
io.WriteString(w, fmt.Sprintf("\nHealth: %s\n", health))
567+
fmt.Fprintf(w, "\nHealth: %s\n", health)
555568
for _, t := range stats {
556569
var pausedPrefix string
557570
if t.Paused {
558571
pausedPrefix = "*P "
559572
} else {
560573
pausedPrefix = " "
561574
}
562-
io.WriteString(w, fmt.Sprintf("\n%s[%-15s] depth: %-5d be-depth: %-5d msgs: %-8d e2e%%: %s\n",
575+
fmt.Fprintf(w, "\n%s[%-15s] depth: %-5d be-depth: %-5d msgs: %-8d e2e%%: %s\n",
563576
pausedPrefix,
564577
t.TopicName,
565578
t.Depth,
566579
t.BackendDepth,
567580
t.MessageCount,
568-
t.E2eProcessingLatency))
581+
t.E2eProcessingLatency)
569582
for _, c := range t.Channels {
570583
if c.Paused {
571584
pausedPrefix = " *P "
572585
} else {
573586
pausedPrefix = " "
574587
}
575-
io.WriteString(w,
576-
fmt.Sprintf("%s[%-25s] depth: %-5d be-depth: %-5d inflt: %-4d def: %-4d re-q: %-5d timeout: %-5d msgs: %-8d e2e%%: %s\n",
577-
pausedPrefix,
578-
c.ChannelName,
579-
c.Depth,
580-
c.BackendDepth,
581-
c.InFlightCount,
582-
c.DeferredCount,
583-
c.RequeueCount,
584-
c.TimeoutCount,
585-
c.MessageCount,
586-
c.E2eProcessingLatency))
588+
fmt.Fprintf(w,
589+
"%s[%-25s] depth: %-5d be-depth: %-5d inflt: %-4d def: %-4d re-q: %-5d timeout: %-5d msgs: %-8d e2e%%: %s\n",
590+
pausedPrefix,
591+
c.ChannelName,
592+
c.Depth,
593+
c.BackendDepth,
594+
c.InFlightCount,
595+
c.DeferredCount,
596+
c.RequeueCount,
597+
c.TimeoutCount,
598+
c.MessageCount,
599+
c.E2eProcessingLatency)
587600
for _, client := range c.Clients {
588601
connectTime := time.Unix(client.ConnectTime, 0)
589602
// truncate to the second
590603
duration := time.Duration(int64(now.Sub(connectTime).Seconds())) * time.Second
591604
_, port, _ := net.SplitHostPort(client.RemoteAddress)
592-
io.WriteString(w, fmt.Sprintf(" [%s %-21s] state: %d inflt: %-4d rdy: %-4d fin: %-8d re-q: %-8d msgs: %-8d connected: %s\n",
605+
fmt.Fprintf(w, " [%s %-21s] state: %d inflt: %-4d rdy: %-4d fin: %-8d re-q: %-8d msgs: %-8d connected: %s\n",
593606
client.Version,
594607
fmt.Sprintf("%s:%s", client.Name, port),
595608
client.State,
@@ -599,9 +612,17 @@ func (s *httpServer) printStats(stats []TopicStats, health string, startTime tim
599612
client.RequeueCount,
600613
client.MessageCount,
601614
duration,
602-
))
615+
)
603616
}
604617
}
605618
}
606-
return buf.Bytes()
619+
620+
if gossip != nil {
621+
fmt.Fprintf(w, "\nGossip:\n")
622+
for k, v := range gossip {
623+
fmt.Fprintf(w, " %s: %s\n", k, v)
624+
}
625+
}
626+
627+
return w.Bytes()
607628
}

0 commit comments

Comments
 (0)