Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ var (
)

type Config struct {
Com CommonConfig `toml:"common"`
Reg RegistryConfig `toml:"registry"`
Usg UsageConfig `toml:"usage"`
LinkStats LinkStasConfig `toml:"linkstats"`
IDC []IDCConfig `toml:"idc"`
Nsq NsqConfig `toml:"nsq"`
Log LogConfig `toml:"log"`
Com CommonConfig `toml:"common"`
Reg RegistryConfig `toml:"registry"`
Usg UsageConfig `toml:"usage"`
LinkStats LinkStasConfig `toml:"linkstats"`
IDC []IDCConfig `toml:"idc"`
Nsq NsqConfig `toml:"nsq"`
TSDB Influxdbv2Config `toml:"tsdb"`
Log LogConfig `toml:"log"`
}

type CommonConfig struct {
Expand Down Expand Up @@ -76,6 +77,12 @@ type NsqConfig struct {
TopicPrefix string `toml:"topicPrefix"`
}

type Influxdbv2Config struct {
Port int `toml:"port"`
Org string `toml:"org"`
Token string `toml:"token"`
}

func (this NsqConfig) GetNsqConfig() *nsq.Config {
nsqConfig := nsq.NewConfig()
nsqConfig.MaxAttempts = this.MaxAttempts
Expand Down
5 changes: 5 additions & 0 deletions etc/router.sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
chan = "router"
topicPrefix = "collect"

[tsdb]
port = 9999
org = "monitor"
token = "xxxxxxxxxxxxxxx"

[registry]
link = "http://registry.test.com"
expireDur = 300
Expand Down
28 changes: 20 additions & 8 deletions influx/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/url"
"regexp"
"strings"
"sync"

"github.com/lodastack/router/config"
"github.com/lodastack/router/loda"
Expand Down Expand Up @@ -138,34 +139,45 @@ func WritePoints(influxDbs []string, pointsObj models.Points) error {
if len(influxDbs) > 1 {
for _, indexDB := range influxDbs[1:] {
limit.Take()
go writePoints(indexDB, db, precision, data, pointsCnt)
go writePoints(indexDB, db, precision, data, pointsCnt, pointsObj)
}
}
limit.Take()
return writePoints(influxDb, db, precision, data, pointsCnt)
return writePoints(influxDb, db, precision, data, pointsCnt, pointsObj)
}

func writePoints(influxDb string, db string, precision string, data []byte, pointsCnt int) error {
var v2Cache = make(map[string]bool)
var mu sync.RWMutex

func writePoints(influxDb string, db string, precision string, data []byte, pointsCnt int, pointsObj models.Points) error {
defer limit.Release()
fullUrl := fmt.Sprintf("%s?%s", GetWriteUrl(influxDb), ParseParams(map[string]string{

mu.RLock()
_, ok := v2Cache[influxDb]
mu.RUnlock()
if ok {
return WritePointsv2([]string{influxDb}, pointsObj, false)
}
fullURL := fmt.Sprintf("%s?%s", GetWriteUrl(influxDb), ParseParams(map[string]string{
"db": db,
"precision": precision,
}))

var err error
var resp *requests.Resp
if resp, err = requests.PostBytes(fullUrl, data); err != nil {
if resp, err = requests.PostBytes(fullURL, data); err != nil {
// clean cache, maybe config changed
loda.PurgeChan <- db
return err
return WritePointsv2([]string{influxDb}, pointsObj, true)
//return err
} else if resp.Status == 500 {
return fmt.Errorf("Influxdb returned invalid status code: %v", resp.Status)
} else if resp.Status == 204 {
log.Debug(string(data))
log.Infof("%d return by %s ,handle points %d", resp.Status, influxDb, pointsCnt)
return nil
} else if (resp.Status == 200 || resp.Status == 404) && strings.Contains(string(resp.Body), "database not found") {
err := createDbAndRP([]string{influxDb}, db)
err := createDBAndRP([]string{influxDb}, db)
if err != nil {
return err
}
Expand All @@ -182,7 +194,7 @@ var rpMap = map[string]string{
".mail.it.loda": "500d",
}

func createDbAndRP(influxDbs []string, db string) (err error) {
func createDBAndRP(influxDbs []string, db string) (err error) {
_, err = Query(influxDbs, map[string]string{
"q": fmt.Sprintf("create database \"%s\"", db),
}, "")
Expand Down
225 changes: 225 additions & 0 deletions influx/influxv2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
package influx

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"

"github.com/lodastack/router/config"
"github.com/lodastack/router/loda"
"github.com/lodastack/router/models"

"github.com/lodastack/log"
)

func getURLv2(host string, module string) string {
return fmt.Sprintf("http://%s:%d/api/v2/%s", IntranetIP(host), config.GetConfig().TSDB.Port, module)
}

// WritePointsv2 writes points into influxdb v2.
func WritePointsv2(influxDbs []string, pointsObj models.Points, try bool) error {
db := pointsObj.Database
precision := "ns"

pointsCnt := len(pointsObj.Points)
points := convLinePointv2(pointsObj.Points)
data := []byte(strings.Join(points, "\n"))

var influxDb string
if len(influxDbs) > 0 {
influxDb = influxDbs[0]
} else {
return fmt.Errorf("no db config")
}
// write data to mutile DBs
if len(influxDbs) > 1 {
for _, indexDB := range influxDbs[1:] {
//limit.Take()
go writePointsv2(indexDB, db, precision, data, pointsCnt, try)
}
}
//limit.Take()
return writePointsv2(influxDb, db, precision, data, pointsCnt, try)
}

func writePointsv2(influxDb string, db string, precision string, data []byte, pointsCnt int, try bool) error {
//defer limit.Release()
fullURL := fmt.Sprintf("%s?%s", getURLv2(influxDb, "write"), ParseParams(map[string]string{
"bucket": db,
"precision": "ns",
"org": config.GetConfig().TSDB.Org,
}))

// Set client timeout
client := &http.Client{Timeout: time.Second * 5}
req, err := newRequest(fullURL, data, config.GetConfig().TSDB.Token, "POST")
if err != nil {
return err
}

// Send request
resp, err := client.Do(req)
if err != nil {
// clean cache, maybe config changed
loda.PurgeChan <- db
// clean cache
mu.Lock()
delete(v2Cache, db)
mu.Unlock()
return err
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
// clean cache, maybe config changed
loda.PurgeChan <- db
return err
} else if resp.StatusCode == 500 {
return fmt.Errorf("Influxdb returned invalid status code: %v", resp.Status)
} else if resp.StatusCode == 204 {

// cache add
if try {
mu.Lock()
v2Cache[db] = true
mu.Unlock()
}

log.Debug(string(data))
log.Infof("%d return by %s ,handle points %d", resp.StatusCode, influxDb, pointsCnt)
return nil
} else if (resp.StatusCode == 200 || resp.StatusCode == 404) && strings.Contains(string(body), "not found") {
err := createDBAndRPv2(influxDb, db)
if err != nil {
return err
}
return fmt.Errorf("just create db, need retry the points")
} else {
log.Warningf("abandon points, unknow return from influxdb %s, status: %d, body: %s", influxDb, resp.StatusCode, resp.Body)
return nil
}
}

var rpMapv2 = map[string]int{
".api.loda": 43200000,
".switch.loda": 43200000,
".mail.it.loda": 43200000,
}

func createDBAndRPv2(influxDB string, db string) (err error) {
u := getURLv2(influxDB, "buckets")
// {
// "name": "string",
// "orgID": "string",
// "retentionRules": [
// {
// "everySeconds": 7776000,
// "type": "expire"
// }
// ],
// "rp": "loda"
// }
type rr struct {
EverySeconds int `json:"everySeconds"`
Type string `json:"type"`
}
type bucket struct {
Name string `json:"name"`
OrgID string `json:"orgID"`
RetentionRules []rr `json:"retentionRules"`
RP string `json:"rp"`
}
// we keep 90 days by default, unit second.
duration := 7776000
for k, v := range rpMapv2 {
if strings.HasSuffix(db, k) {
duration = v
}
}

b := bucket{
Name: db,
OrgID: config.GetConfig().TSDB.Org,
RetentionRules: []rr{{
EverySeconds: duration,
Type: "expire",
}},
RP: "loda",
}

data, _ := json.Marshal(b)
// Set client timeout
client := &http.Client{Timeout: time.Second * 5}
req, err := newRequest(u, data, config.GetConfig().TSDB.Token, "POST")
if err != nil {
return err
}

// Send request
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode != 200 {
return fmt.Errorf("Influxdb returned invalid status code: %v", resp.Status)
}
return nil
}

func convLinePointv2(points []*models.Point) []string {
var linePoints []string
for _, point := range points {
line, err := convLinev2(point)
if err != nil {
log.Warningf("point %v conv to line failed %s", point, err)
continue
}
linePoints = append(linePoints, line)
}
return linePoints
}

func convLinev2(p *models.Point) (string, error) {
key := p.Measurement

if len(p.Tags) > 0 {
var tags []string
for k, v := range p.Tags {
if "" == v {
return "", fmt.Errorf("invalid tag value for %s", k)
}
tags = append(tags, fmt.Sprintf("%s=%s", k, v))
}
key = fmt.Sprintf("%s,%s", key, strings.Join(tags, ","))
}

var values []string
for k, v := range p.Fields {
if v == nil {
return "", fmt.Errorf("invalid field value nil")
}
values = append(values, fmt.Sprintf("%s=%v", k, v))
}

value := strings.Join(values, ",")

return fmt.Sprintf("%s %s %d", key, value, p.Timestamp*1e9), nil
}

func newRequest(url string, data []byte, token string, method string) (*http.Request, error) {
req, err := http.NewRequest(method, url, bytes.NewBuffer(data))
if err != nil {
return nil, err
}
// Set headers
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Token "+token)
return req, nil
}