This repository was archived by the owner on Sep 29, 2018. It is now read-only.

Commit 8f12f5f

some other stuff like wait for cluster state

1 parent cd8bfad, commit 8f12f5f

File tree: 3 files changed, +65 -51 lines changed

README.md: 17 additions & 14 deletions

````diff
@@ -10,33 +10,36 @@
 
 ```
 Application Options:
-  -s, --source=   Source elasticsearch instance
-  -d, --dest=     Destination elasticsearch instance
-  -c, --count=    Number of documents at a time: ie "size" in the scroll
-                  request (100)
-  -t, --time=     Scroll time (1m)
-      --settings  Copy sharding settings from source (true)
-  -f, --force     Delete destination index before copying (false)
-  -i, --indexes=  List of indexes to copy, comma separated (_all)
-  -a, --all       Copy indexes starting with . (false)
-  -w, --workers=  Concurrency (1)
+  -s, --source=    source elasticsearch instance
+  -d, --dest=      destination elasticsearch instance
+  -c, --count=     number of documents at a time: ie "size" in the scroll request (100)
+  -t, --time=      scroll time (1m)
+  -f, --force      delete destination index before copying (false)
+      --shards=    set a number of shards on newly created indexes
+      --docs-only  load documents only, do not try to recreate indexes (false)
+      --index-only only create indexes, do not load documents (false)
+      --replicate  enable replication while indexing into the new indexes (false)
+  -i, --indexes=   list of indexes to copy, comma separated (_all)
+  -a, --all        copy indexes starting with . and _ (false)
+  -w, --workers=   concurrency (1)
+      --settings   copy sharding settings from source (true)
+      --green      wait for both hosts cluster status to be green before dump. otherwise yellow is okay (false)
 ```
 
 
 ## NOTES:
 
-1. Has been tested getting data from 0.9 onto a 1.4 box. For other scenaries YMMV.
+1. Has been tested getting data from 0.9 onto a 1.4 box. For other scenarios YMMV. (look out for this bug: https://github.com/elasticsearch/elasticsearch/issues/5165)
 1. Copies using the [_source](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/mapping-source-field.html) field in elasticsearch. If you have made modifications to it (excluding fields, etc) they will not be indexed on the destination host.
 1. ```--force``` will delete indexes on the destination host. Otherwise an error will be returned if the index exists
 1. ```--time``` is the [scroll time](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-scroll.html#scroll-search-context) passed to the source host, default is 1m. This is a string in es's format.
 1. ```--count``` is the [number of documents](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-scroll.html#scroll-scan) that will be requested and bulk indexed at a time. Note that this depends on the number of shards (ie: size of 10 on 5 shards is 50 documents)
 1. ```--indexes``` is a comma separated list of indexes to copy
 1. ```--all``` indexes starting with . and _ are ignored by default, --all overrides this behavior
 1. ```--workers``` concurrency when we post to the bulk api. Only one post happens at a time, but higher concurrency should give you more throughput when using larger scroll sizes.
-1. Ports are required, otherwise 80 is the assumed port
+1. Ports are required, otherwise 80 is the assumed port (what)
 
 ## BUGS:
 
-1. It will not do anything special when copying the _id (copies _id from source host). If _id is remapped this probably won't do what you want.
-1. Should check if the bulk index requests starts getting large (in bytes), and force a flush if that is the case. Right now we show an error if elasticsearch refuses a large request.
+1. It will not do anything special when copying the _id (copies _id from source host). If _id is remapped it may not do what you want.
 1. Should assume a default port of 9200
````

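The ```--count``` and ```--time``` notes above map directly onto elasticsearch's scan/scroll API. As a rough sketch (not code from this commit; the `search_type=scan` URL shape is the 0.9/1.x API the README targets, and all values are illustrative):

```go
// Sketch only: how --time and --count parameterize a 0.9/1.x scan/scroll
// request. In scan mode "size" applies per shard, so size 10 across 5
// shards returns up to 50 documents per page, as the NOTES say.
package main

import "fmt"

func main() {
	source := "http://localhost:9200" // --source (port required, per the NOTES)
	indexes := "_all"                 // --indexes
	scrollTime := "1m"                // --time
	size := 100                       // --count

	url := fmt.Sprintf("%s/%s/_search?search_type=scan&scroll=%s&size=%d",
		source, indexes, scrollTime, size)
	fmt.Println(url)
}
```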

main.go: 46 additions & 37 deletions

```diff
@@ -40,24 +40,25 @@ type ClusterHealth struct {
 
 type Config struct {
 	FlushLock sync.Mutex
-	DocChan   chan Document
+	DocChan   chan map[string]interface{}
 	ErrChan   chan error
 	Uid       string // es scroll uid
 
 	// config options
 	SrcEs             string `short:"s" long:"source" description:"source elasticsearch instance" required:"true"`
 	DstEs             string `short:"d" long:"dest" description:"destination elasticsearch instance" required:"true"`
-	DocBufferCount    int    `short:"c" long:"count" description:"number of documents at a time: ie \"size\" in the scroll request. If 0 size will not be sent to es" default:"100"`
+	DocBufferCount    int    `short:"c" long:"count" description:"number of documents at a time: ie \"size\" in the scroll request" default:"100"`
 	ScrollTime        string `short:"t" long:"time" description:"scroll time" default:"1m"`
 	Destructive       bool   `short:"f" long:"force" description:"delete destination index before copying" default:"false"`
 	ShardsCount       int    `long:"shards" description:"set a number of shards on newly created indexes"`
 	DocsOnly          bool   `long:"docs-only" description:"load documents only, do not try to recreate indexes" default:"false"`
 	CreateIndexesOnly bool   `long:"index-only" description:"only create indexes, do not load documents" default:"false"`
 	EnableReplication bool   `long:"replicate" description:"enable replication while indexing into the new indexes" default:"false"`
 	IndexNames        string `short:"i" long:"indexes" description:"list of indexes to copy, comma separated" default:"_all"`
-	CopyAllIndexes    bool   `short:"a" long:"all" description:"copy all indexes, if false indexes starting with . and _ are not copied" default:"false"`
+	CopyAllIndexes    bool   `short:"a" long:"all" description:"copy indexes starting with . and _" default:"false"`
 	Workers           int    `short:"w" long:"workers" description:"concurrency" default:"1"`
 	CopySettings      bool   `long:"settings" description:"copy sharding settings from source" default:"true"`
+	WaitForGreen      bool   `long:"green" description:"wait for both hosts cluster status to be green before dump. otherwise yellow is okay" default:"false"`
 }
 
 func main() {
```

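The `short:`/`long:`/`description:`/`default:` struct tags above follow the conventions of a go-flags style parser, likely github.com/jessevdk/go-flags, though the import sits outside this diff. A minimal sketch of how such tags become CLI options:

```go
// Minimal sketch, assuming jessevdk/go-flags is the parser behind the
// Config struct tags (the import is not shown in this diff).
package main

import (
	"fmt"
	"os"

	flags "github.com/jessevdk/go-flags"
)

type Options struct {
	SrcEs        string `short:"s" long:"source" description:"source elasticsearch instance" required:"true"`
	Workers      int    `short:"w" long:"workers" description:"concurrency" default:"1"`
	WaitForGreen bool   `long:"green" description:"wait for green before dump"`
}

func main() {
	var opts Options
	if _, err := flags.Parse(&opts); err != nil {
		os.Exit(1)
	}
	fmt.Printf("source=%s workers=%d green=%v\n", opts.SrcEs, opts.Workers, opts.WaitForGreen)
}
```
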
```diff
@@ -77,7 +78,7 @@ func main() {
 	}
 
 	// enough of a buffer to hold all the search results across all workers
-	c.DocChan = make(chan Document, c.DocBufferCount*c.Workers)
+	c.DocChan = make(chan map[string]interface{}, c.DocBufferCount*c.Workers)
 
 	// get all indexes from source
 	idxs := Indexes{}
```

```diff
@@ -125,30 +126,22 @@
 		return
 	}
 
-	// wait for cluster state to be okay in case we are copying many indexes or
-	// shards
+	// wait for cluster state to be okay before dumping
 	timer := time.NewTimer(time.Second * 3)
-	var srcReady, dstReady bool
 	for {
-		if health := ClusterStatus(c.SrcEs); health.Status == "red" {
-			fmt.Printf("%s is %s %s, delaying start\n", health.Name, c.SrcEs, health.Status)
-			srcReady = false
-		} else {
-			srcReady = true
-		}
-
-		if health := ClusterStatus(c.DstEs); health.Status == "red" {
-			fmt.Printf("%s on %s is %s, delaying start\n", health.Name, c.DstEs, health.Status)
-			dstReady = false
-		} else {
-			dstReady = true
+		if status, ready := c.ClusterReady(c.SrcEs); !ready {
+			fmt.Printf("%s at %s is %s, delaying dump\n", status.Name, c.SrcEs, status.Status)
+			<-timer.C
+			continue
 		}
-
-		if !srcReady || !dstReady {
+		if status, ready := c.ClusterReady(c.DstEs); !ready {
+			fmt.Printf("%s at %s is %s, delaying dump\n", status.Name, c.DstEs, status.Status)
 			<-timer.C
-		} else {
-			break
+			continue
 		}
+
+		timer.Stop()
+		break
 	}
 	fmt.Println("starting dump..")
```

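One thing to watch in the new loop: a `time.Timer` fires once, so if either host stays unready past the first pass, the second `<-timer.C` has nothing to wake it. A standalone sketch that re-arms the timer each pass (an assumption about the intent of the 3-second delay, not the committed code):

```go
// Sketch: the poll loop with timer.Reset so every pass waits a fresh
// 3 seconds. ready() is a stand-in for c.ClusterReady.
package main

import (
	"fmt"
	"time"
)

var polls int

// ready pretends the cluster goes green on the third poll.
func ready() bool {
	polls++
	return polls >= 3
}

func main() {
	timer := time.NewTimer(3 * time.Second)
	for {
		if !ready() {
			fmt.Println("cluster not ready, delaying dump")
			<-timer.C
			timer.Reset(3 * time.Second) // re-arm for the next pass
			continue
		}
		timer.Stop()
		break
	}
	fmt.Println("starting dump..")
}
```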

```diff
@@ -195,7 +188,13 @@ func (c *Config) NewWorker(docCount *int, bar *pb.ProgressBar, wg *sync.WaitGrou
 
 	for {
 		var err error
-		doc, open := <-c.DocChan
+		docI, open := <-c.DocChan
+		doc := Document{
+			Index:  docI["_index"].(string),
+			Type:   docI["_type"].(string),
+			source: docI["_source"].(map[string]interface{}),
+			Id:     docI["_id"].(string),
+		}
 
 		// if channel is closed flush and gtfo
 		if !open {
```

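Note the ordering this hunk introduces: the four type assertions on `docI` run before the `if !open` check below them, and a receive from a closed channel yields a nil map, on which `.(string)` panics. A self-contained sketch of the safer order (an illustration, not the committed code):

```go
// Sketch: check open before asserting; on a closed channel docI is a nil
// map, its entries are nil interfaces, and .(string) on them panics.
package main

import "fmt"

// Document mirrors the fields the hunk above fills in.
type Document struct {
	Index, Type, Id string
	source          map[string]interface{}
}

func main() {
	ch := make(chan map[string]interface{}, 1)
	close(ch) // simulate the producer finishing

	docI, open := <-ch
	if !open {
		fmt.Println("channel closed, flush and exit")
		return
	}
	doc := Document{
		Index:  docI["_index"].(string),
		Type:   docI["_type"].(string),
		source: docI["_source"].(map[string]interface{}),
		Id:     docI["_id"].(string),
	}
	fmt.Println(doc.Index, doc.Type, doc.Id)
}
```
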
```diff
@@ -219,8 +218,8 @@ func (c *Config) NewWorker(docCount *int, bar *pb.ProgressBar, wg *sync.WaitGrou
 			c.ErrChan <- err
 		}
 
-		// if we approach the 100mb (95mb) limit, flush to es and reset mainBuf
-		if mainBuf.Len()+docBuf.Len() > 95000000 {
+		// if we approach the 100mb es limit, flush to es and reset mainBuf
+		if mainBuf.Len()+docBuf.Len() > 100000000 {
 			c.BulkPost(&mainBuf)
 		}
```

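For scale: elasticsearch's default `http.max_content_length` is 100mb, i.e. 104,857,600 bytes, so the new 100,000,000-byte threshold still leaves roughly 4.8 MB of headroom, about half of what the old 95,000,000-byte cutoff allowed.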

```diff
@@ -384,7 +383,7 @@ func (c *Config) CopyShardingSettings(idxs *Indexes) (err error) {
 		// try the new style syntax first, which has an index object
 		shards = settings.(map[string]interface{})["settings"].(map[string]interface{})["index"].(map[string]interface{})["number_of_shards"].(string)
 	} else {
-		// if not, could be running from an old es intace, try the old style index.number_of_shards
+		// if not, could be running from old es, try the old style index.number_of_shards
 		shards = settings.(map[string]interface{})["settings"].(map[string]interface{})["index.number_of_shards"].(string)
 	}
 	index.(map[string]interface{})["settings"].(map[string]interface{})["index"] = map[string]interface{}{
```

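The assertion chain handles the two payload shapes the comments name: 1.x nests `number_of_shards` under a `settings.index` object, while older es returns a flat dotted key. A small self-contained sketch of both shapes (the payloads are illustrative, not captured from a live cluster):

```go
// Sketch of the two _settings shapes: new style nests an "index" object,
// old style uses a flat "index.number_of_shards" key.
package main

import (
	"encoding/json"
	"fmt"
)

func shardsFrom(idx map[string]interface{}) string {
	settings := idx["settings"].(map[string]interface{})
	if index, ok := settings["index"].(map[string]interface{}); ok {
		return index["number_of_shards"].(string) // new style
	}
	return settings["index.number_of_shards"].(string) // old style
}

func main() {
	newStyle := `{"settings":{"index":{"number_of_shards":"5"}}}`
	oldStyle := `{"settings":{"index.number_of_shards":"5"}}`

	for _, payload := range []string{newStyle, oldStyle} {
		var idx map[string]interface{}
		if err := json.Unmarshal([]byte(payload), &idx); err != nil {
			panic(err)
		}
		fmt.Println(shardsFrom(idx))
	}
}
```
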
```diff
@@ -441,8 +440,6 @@ func (c *Config) NewScroll() (scroll *Scroll, err error) {
 	scroll = &Scroll{}
 	err = dec.Decode(scroll)
 
-	fmt.Println(scroll.ScrollId)
-
 	return
 }
```

```diff
@@ -466,7 +463,7 @@ func (s *Scroll) Next(c *Config) (done bool) {
 	// XXX this might be bad, but assume we are done
 	if resp.StatusCode != 200 {
 		b, _ := ioutil.ReadAll(resp.Body)
-		c.ErrChan <- fmt.Errorf("bad scroll response: %s", string(b))
+		c.ErrChan <- fmt.Errorf("scroll response: %s", string(b))
 		// flush and quit
 		return true
 	}
```

```diff
@@ -498,13 +495,7 @@
 
 	// write all the docs into a channel
 	for _, docI := range docs {
-		doc := docI.(map[string]interface{})
-		c.DocChan <- Document{
-			Index:  doc["_index"].(string),
-			Type:   doc["_type"].(string),
-			source: doc["_source"].(map[string]interface{}),
-			Id:     doc["_id"].(string),
-		}
+		c.DocChan <- docI.(map[string]interface{})
 	}
 
 	return
```

```diff
@@ -532,6 +523,24 @@ func (c *Config) BulkPost(data *bytes.Buffer) {
 	}
 }
 
+func (c *Config) ClusterReady(host string) (*ClusterHealth, bool) {
+
+	health := ClusterStatus(host)
+	if health.Status == "red" {
+		return health, false
+	}
+
+	if c.WaitForGreen == false && health.Status == "yellow" {
+		return health, true
+	}
+
+	if health.Status == "green" {
+		return health, true
+	}
+
+	return health, false
+}
+
 func ClusterStatus(host string) *ClusterHealth {
 
 	resp, err := http.Get(fmt.Sprintf("%s/_cluster/health", host))
```

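Most of `ClusterStatus` sits outside this hunk; a self-contained sketch of what the `ClusterReady`/`ClusterStatus` pair needs from `_cluster/health` follows. The JSON field names are elasticsearch's; the error-returning signature is this sketch's, not the repo's.

```go
// Sketch: fetch _cluster/health and decode the two fields ClusterReady
// consults. The repo's ClusterStatus returns only *ClusterHealth; the
// error return here is this sketch's addition.
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

type ClusterHealth struct {
	Name   string `json:"cluster_name"`
	Status string `json:"status"` // "green", "yellow", or "red"
}

func clusterStatus(host string) (*ClusterHealth, error) {
	resp, err := http.Get(fmt.Sprintf("%s/_cluster/health", host))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	health := &ClusterHealth{}
	if err := json.NewDecoder(resp.Body).Decode(health); err != nil {
		return nil, err
	}
	return health, nil
}

func main() {
	health, err := clusterStatus("http://localhost:9200")
	if err != nil {
		fmt.Println("health check failed:", err)
		return
	}
	fmt.Printf("%s is %s\n", health.Name, health.Status)
}
```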

release.sh: 2 additions & 0 deletions

```diff
@@ -74,3 +74,5 @@ unset -f go-alias
 
 # crosscompile
 go-build-all
+# dont arm wtf
+rm -f elasticsearch-dump*arm*
```
