Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ endif
APPS = nsqd nsqlookupd nsqadmin nsq_pubsub nsq_to_nsq nsq_to_file nsq_to_http nsq_tail nsq_stat to_nsq
all: $(APPS)

$(BLDDIR)/nsqd: $(wildcard apps/nsqd/*.go nsqd/*.go nsq/*.go internal/*/*.go)
$(BLDDIR)/nsqd: $(wildcard apps/nsqd/*.go nsqd/*.go nsq/*.go internal/*/*.go nsqd/mod/*.go nsqd/mod/*/*.go)
$(BLDDIR)/nsqlookupd: $(wildcard apps/nsqlookupd/*.go nsqlookupd/*.go nsq/*.go internal/*/*.go)
$(BLDDIR)/nsqadmin: $(wildcard apps/nsqadmin/*.go nsqadmin/*.go nsqadmin/templates/*.go internal/*/*.go)
$(BLDDIR)/nsq_pubsub: $(wildcard apps/nsq_pubsub/*.go nsq/*.go internal/*/*.go)
Expand Down
12 changes: 12 additions & 0 deletions apps/nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ import (
"github.com/judwhite/go-svc/svc"
"github.com/mreiferson/go-options"
"github.com/nsqio/nsq/internal/app"
"github.com/nsqio/nsq/internal/lg"
"github.com/nsqio/nsq/internal/version"
"github.com/nsqio/nsq/nsqd"
"github.com/nsqio/nsq/nsqd/mod"
)

type tlsRequiredOption int
Expand Down Expand Up @@ -144,6 +146,9 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
flagSet.Int("max-deflate-level", opts.MaxDeflateLevel, "max deflate compression level a client can negotiate (> values == > nsqd CPU usage)")
flagSet.Bool("snappy", opts.SnappyEnabled, "enable snappy feature negotiation (client compression)")

optModulesOptions := app.StringArray{}
flagSet.Var(&optModulesOptions, "mod-opt", "optional module options, of form: --mod-opt={{moduleName}}={{moduleOpt}}={{moduleOptValue}}")

return flagSet
}

Expand Down Expand Up @@ -232,6 +237,13 @@ func (p *program) Start() error {
}
nsqd.Main()

// hook optional modules
err = mod.Init(opts.ModOpt, nsqd)
if err != nil {
nsqd.Logf(lg.ERROR, "Failed initializing an optional module: %s", err)
// abort/exit? (Main() already started goroutines)
}

p.nsqd = nsqd
return nil
}
Expand Down
6 changes: 6 additions & 0 deletions nsqd/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,9 @@ func (n *NSQD) logf(level lg.LogLevel, f string, args ...interface{}) {
opts := n.getOpts()
lg.Logf(opts.Logger, opts.logLevel, level, f, args...)
}

// would like to expose logf to the contrib modules so that contrib can share the
// configuration end user specifies on CLI
func (n *NSQD) Logf(level lg.LogLevel, f string, args ...interface{}) {
n.logf(level, f, args...)
}
28 changes: 28 additions & 0 deletions nsqd/mod/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
## Optional/Contrib Modules

Contrib modules are a way to add functionality to nsqd, in a decoupled way.


The modules currently available are:

- Datadog


### Architecture

Contrib modules are initialized by passing in `--mod-opt=` to nsqd. This may
be provided multiple times. An array of `mod-opt`s are then passed to the
contrib module initializer (during nsqd initialization). Each module is then
passed its options to see if valid options were provided, after which it is
initialized and added to the nsqd waitGroup.


### Datadog

Datadog contrib module, reports nsqd statistics to a datadog daemon. The options
it exposes are:

- `--mod-opt=-dogstatsd-address=<IP:PORT>`
- `--mod-opt=-dogstatsd-interval=<INT_SECONDS>`
- `--mod-opt=-dogstatsd-prefix=<STRING>`

83 changes: 83 additions & 0 deletions nsqd/mod/dogstatsd/datadog_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package dogstatsd

import (
"errors"
"fmt"
"net"
"strings"
"time"
)

type DataDogClient struct {
conn net.Conn
addr string
prefix string
}

type DataDogTag struct {
k string
v string
}

type DataDogTags struct {
tags []*DataDogTag
}

// returns dogstatd compatible string
// "#tag1:value1,tag2:value2
func (ddt *DataDogTags) String() string {
ts := []string{}
for _, tag := range ddt.tags {
ts = append(ts, fmt.Sprintf("%s:%s", tag.k, tag.v))
}
return "#" + strings.Join(ts, ",")
}

func NewDataDogClient(addr string, prefix string) *DataDogClient {
return &DataDogClient{
addr: addr,
prefix: prefix,
}
}

func (c *DataDogClient) String() string {
return c.addr
}

func (c *DataDogClient) CreateSocket() error {
conn, err := net.DialTimeout("udp", c.addr, time.Second)
if err != nil {
return err
}
c.conn = conn
return nil
}

func (c *DataDogClient) Close() error {
return c.conn.Close()
}

func (c *DataDogClient) Incr(stat string, count int64, tags *DataDogTags) error {
return c.send(stat, "%d|c", count, tags)
}

func (c *DataDogClient) Decr(stat string, count int64, tags *DataDogTags) error {
return c.send(stat, "%d|c", -count, tags)
}

func (c *DataDogClient) Timing(stat string, delta int64, tags *DataDogTags) error {
return c.send(stat, "%d|ms", delta, tags)
}

func (c *DataDogClient) Gauge(stat string, value int64, tags *DataDogTags) error {
return c.send(stat, "%d|g", value, tags)
}

func (c *DataDogClient) send(stat string, format string, value int64, tags *DataDogTags) error {
if c.conn == nil {
return errors.New("not connected")
}
format = fmt.Sprintf("%s%s:%s|%s", c.prefix, stat, format, tags.String())
_, err := fmt.Fprintf(c.conn, format, value)
return err
}
59 changes: 59 additions & 0 deletions nsqd/mod/dogstatsd/datadog_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package dogstatsd

import (
"net"
"testing"

"github.com/nsqio/nsq/internal/test"
)

func TestDDTagsStringNoTags(t *testing.T) {
test.Equal(
t,
(&DataDogTags{}).String(),
"#",
)
}

func TestDDTagsStringSingleString(t *testing.T) {
test.Equal(
t,
(&DataDogTags{
tags: []*DataDogTag{
{k: "topic_name", v: "test_topic"},
},
}).String(),
"#topic_name:test_topic",
)
}

func TestDDTagsStringMultipleStrings(t *testing.T) {
test.Equal(
t,
(&DataDogTags{
tags: []*DataDogTag{
{k: "topic_name", v: "test_topic"},
{k: "channel_name", v: "test_channel"},
},
}).String(),
"#topic_name:test_topic,channel_name:test_channel",
)
}

func TestDDCSend(t *testing.T) {
r, w := net.Pipe()
b := make([]byte, len("nsq.topic.depth:100|t|#"))

go func() {
ddc := &DataDogClient{
conn: w,
addr: "test",
prefix: "nsq.",
}
testValue := int64(100)
ddc.send("topic.depth", "%d|t", testValue, &DataDogTags{})
}()

r.Read(b)
test.Equal(t, string(b), "nsq.topic.depth:100|t|#")
}
Loading