Skip to content

Commit a7906e6

Browse files
committed
Create nodeInfo struct for separating startCmds per node and state per node
1 parent 97a8658 commit a7906e6

File tree

4 files changed

+119
-107
lines changed

4 files changed

+119
-107
lines changed

testserver/tenant.go

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) {
5858
tenantID, err := func() (int, error) {
5959
ts.mu.Lock()
6060
defer ts.mu.Unlock()
61-
if ts.nodeStates[0] != stateRunning {
61+
if ts.nodes[0].state != stateRunning {
6262
return 0, errors.New("TestServer must be running before NewTenantServer may be called")
6363
}
6464
if ts.isTenant() {
@@ -192,7 +192,7 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) {
192192
return nil, err
193193
}
194194

195-
args := [][]string{{
195+
args := []string{
196196
cockroachBinary,
197197
"mt",
198198
"start-sql",
@@ -202,21 +202,26 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) {
202202
"--kv-addrs=" + pgURL.Host,
203203
"--sql-addr=" + sqlAddr,
204204
"--http-addr=:0",
205-
}}
205+
}
206+
207+
nodes := []nodeInfo{
208+
{
209+
state: stateNew,
210+
startCmdArgs: args,
211+
// TODO(asubiotto): Specify listeningURLFile once we support dynamic
212+
// ports.
213+
listeningURLFile: "",
214+
},
215+
}
206216

207217
tenant := &testServerImpl{
208-
serverArgs: ts.serverArgs,
209-
version: ts.version,
210-
state: stateNew,
211-
nodeStates: []int{stateNew},
212-
baseDir: ts.baseDir,
213-
cmdArgs: args,
214-
cmd: make([]*exec.Cmd, ts.serverArgs.numNodes),
215-
stdout: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stdout", tenantID)),
216-
stderr: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stderr", tenantID)),
217-
// TODO(asubiotto): Specify listeningURLFile once we support dynamic
218-
// ports.
219-
listeningURLFile: []string{""},
218+
serverArgs: ts.serverArgs,
219+
version: ts.version,
220+
serverState: stateNew,
221+
baseDir: ts.baseDir,
222+
stdout: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stdout", tenantID)),
223+
stderr: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stderr", tenantID)),
224+
nodes: nodes,
220225
}
221226

222227
// Start the tenant.

testserver/testserver.go

Lines changed: 82 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,17 @@ type TestServer interface {
101101
// BaseDir returns directory StoreOnDiskOpt writes to if used.
102102
BaseDir() string
103103

104-
WaitForNode(numNode int) error
104+
// WaitForInitFinishForNode waits until a node has completed
105+
// initialization and is available to connect to and query on.
106+
WaitForInitFinishForNode(numNode int) error
107+
// StartNode runs the "cockroach start" command for the node.
105108
StartNode(i int) error
109+
// StopNode kills the node's process.
106110
StopNode(i int) error
111+
// UpgradeNode stops the node, then restarts the node with the
112+
// binary specified at "upgradeBinaryPath".
107113
UpgradeNode(i int) error
114+
// PGURLForNode returns the PG URL for the node.
108115
PGURLForNode(nodeNum int) *url.URL
109116
}
110117

@@ -117,24 +124,29 @@ type pgURLChan struct {
117124
orig url.URL
118125
}
119126

127+
// nodeInfo contains the info to start a node and the state of the node.
128+
type nodeInfo struct {
129+
startCmd *exec.Cmd
130+
startCmdArgs []string
131+
listeningURLFile string
132+
state int
133+
}
134+
120135
// testServerImpl is a TestServer implementation.
121136
type testServerImpl struct {
122-
mu sync.RWMutex
123-
version *version.Version
124-
serverArgs testServerArgs
125-
state int
126-
nodeStates []int
127-
baseDir string
128-
pgURL []pgURLChan
129-
cmd []*exec.Cmd
130-
cmdArgs [][]string
131-
initCmd *exec.Cmd
132-
initCmdArgs []string
133-
stdout string
134-
stderr string
135-
stdoutBuf logWriter
136-
stderrBuf logWriter
137-
listeningURLFile []string
137+
mu sync.RWMutex
138+
version *version.Version
139+
serverArgs testServerArgs
140+
serverState int
141+
baseDir string
142+
pgURL []pgURLChan
143+
initCmd *exec.Cmd
144+
initCmdArgs []string
145+
stdout string
146+
stderr string
147+
stdoutBuf logWriter
148+
stderrBuf logWriter
149+
nodes []nodeInfo
138150

139151
// curTenantID is used to allocate tenant IDs. Refer to NewTenantServer for
140152
// more information.
@@ -291,7 +303,7 @@ func StopDownloadInMiddleOpt() TestServerOpt {
291303
}
292304
}
293305

294-
func ThreeNode() TestServerOpt {
306+
func ThreeNodeOpt() TestServerOpt {
295307
return func(args *testServerArgs) {
296308
args.numNodes = 3
297309
}
@@ -376,11 +388,6 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) {
376388
return nil, fmt.Errorf("%s: %w", testserverMessagePrefix, err)
377389
}
378390

379-
listeningURLFile := make([]string, serverArgs.numNodes)
380-
for i := 0; i < serverArgs.numNodes; i++ {
381-
listeningURLFile[i] = filepath.Join(baseDir, fmt.Sprintf("listen-url%d", i))
382-
}
383-
384391
secureOpt := "--insecure"
385392
if serverArgs.secure {
386393
// Create certificates.
@@ -436,40 +443,44 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) {
436443
storeArg = fmt.Sprintf("--store=type=mem,size=%.2f", serverArgs.storeMemSize)
437444
}
438445

439-
args := make([][]string, serverArgs.numNodes)
446+
nodes := make([]nodeInfo, serverArgs.numNodes)
440447
var initArgs []string
441-
if serverArgs.numNodes <= 1 {
442-
args[0] = []string{
443-
serverArgs.cockroachBinary,
444-
startCmd,
445-
"--logtostderr",
446-
secureOpt,
447-
"--host=localhost",
448-
"--port=0",
449-
"--http-port=" + strconv.Itoa(serverArgs.httpPort),
450-
storeArg,
451-
"--listening-url-file=" + listeningURLFile[0],
452-
}
453-
} else {
454-
for i := 0; i < serverArgs.numNodes; i++ {
455-
args[i] = []string{
448+
for i := 0; i < serverArgs.numNodes; i++ {
449+
nodes[i].state = stateNew
450+
nodes[i].listeningURLFile = filepath.Join(baseDir, fmt.Sprintf("listen-url%d", i))
451+
if serverArgs.numNodes > 1 {
452+
nodes[i].startCmdArgs = []string{
456453
serverArgs.cockroachBinary,
457454
startCmd,
458455
secureOpt,
459456
storeArg + strconv.Itoa(i),
460457
fmt.Sprintf("--listen-addr=localhost:%d", 26257+i),
461458
fmt.Sprintf("--http-addr=localhost:%d", 8080+i),
462-
"--listening-url-file=" + listeningURLFile[i],
459+
"--listening-url-file=" + nodes[i].listeningURLFile,
463460
fmt.Sprintf("--join=localhost:%d,localhost:%d,localhost:%d", 26257, 26258, 26259),
464461
}
462+
} else {
463+
nodes[0].startCmdArgs = []string{
464+
serverArgs.cockroachBinary,
465+
startCmd,
466+
"--logtostderr",
467+
secureOpt,
468+
"--host=localhost",
469+
"--port=0",
470+
"--http-port=" + strconv.Itoa(serverArgs.httpPort),
471+
storeArg,
472+
"--listening-url-file=" + nodes[i].listeningURLFile,
473+
}
465474
}
475+
}
466476

467-
initArgs = []string{
468-
serverArgs.cockroachBinary,
469-
"init",
470-
secureOpt,
471-
"--host=localhost:26259",
472-
}
477+
// We only need initArgs if we're creating a testserver
478+
// with multiple nodes.
479+
initArgs = []string{
480+
serverArgs.cockroachBinary,
481+
"init",
482+
secureOpt,
483+
"--host=localhost:26259",
473484
}
474485

475486
states := make([]int, serverArgs.numNodes)
@@ -478,18 +489,15 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) {
478489
}
479490

480491
ts := &testServerImpl{
481-
serverArgs: *serverArgs,
482-
version: v,
483-
state: stateNew,
484-
nodeStates: states,
485-
baseDir: baseDir,
486-
cmdArgs: args,
487-
cmd: make([]*exec.Cmd, serverArgs.numNodes),
488-
initCmdArgs: initArgs,
489-
stdout: filepath.Join(logDir, "cockroach.stdout"),
490-
stderr: filepath.Join(logDir, "cockroach.stderr"),
491-
listeningURLFile: listeningURLFile,
492-
curTenantID: firstTenantID,
492+
serverArgs: *serverArgs,
493+
version: v,
494+
serverState: stateNew,
495+
baseDir: baseDir,
496+
initCmdArgs: initArgs,
497+
stdout: filepath.Join(logDir, "cockroach.stdout"),
498+
stderr: filepath.Join(logDir, "cockroach.stderr"),
499+
curTenantID: firstTenantID,
500+
nodes: nodes,
493501
}
494502
ts.pgURL = make([]pgURLChan, serverArgs.numNodes)
495503

@@ -546,7 +554,7 @@ func (ts *testServerImpl) setPGURLForNode(nodeNum int, u *url.URL) {
546554
close(ts.pgURL[nodeNum].set)
547555
}
548556

549-
func (ts *testServerImpl) WaitForNode(nodeNum int) error {
557+
func (ts *testServerImpl) WaitForInitFinishForNode(nodeNum int) error {
550558
db, err := sql.Open("postgres", ts.PGURLForNode(nodeNum).String())
551559
defer func() {
552560
_ = db.Close()
@@ -558,29 +566,29 @@ func (ts *testServerImpl) WaitForNode(nodeNum int) error {
558566
if _, err = db.Query("SHOW DATABASES"); err == nil {
559567
return err
560568
}
561-
log.Printf("%s: WaitForNode %d: Trying again after error: %v", testserverMessagePrefix, nodeNum, err)
569+
log.Printf("%s: WaitForInitFinishForNode %d: Trying again after error: %v", testserverMessagePrefix, nodeNum, err)
562570
time.Sleep(time.Millisecond * 100)
563571
}
564572
return nil
565573
}
566574

567575
// WaitForInit retries until a connection is successfully established.
568576
func (ts *testServerImpl) WaitForInit() error {
569-
return ts.WaitForNode(0)
577+
return ts.WaitForInitFinishForNode(0)
570578
}
571579

572580
func (ts *testServerImpl) pollListeningURLFile(nodeNum int) error {
573581
var data []byte
574582
for {
575583
ts.mu.RLock()
576-
state := ts.nodeStates[nodeNum]
584+
state := ts.nodes[nodeNum].state
577585
ts.mu.RUnlock()
578586
if state != stateRunning {
579587
return fmt.Errorf("server stopped or crashed before listening URL file was available")
580588
}
581589

582590
var err error
583-
data, err = ioutil.ReadFile(ts.listeningURLFile[nodeNum])
591+
data, err = ioutil.ReadFile(ts.nodes[nodeNum].listeningURLFile)
584592
if err == nil {
585593
break
586594
} else if !os.IsNotExist(err) {
@@ -624,9 +632,9 @@ func (ts *testServerImpl) pollListeningURLFile(nodeNum int) error {
624632
// to restart a testserver, but use NewTestServer().
625633
func (ts *testServerImpl) Start() error {
626634
ts.mu.Lock()
627-
if ts.state != stateNew {
635+
if ts.serverState != stateNew {
628636
ts.mu.Unlock()
629-
switch ts.state {
637+
switch ts.serverState {
630638
case stateRunning:
631639
return nil // No-op if server is already running.
632640
case stateStopped, stateFailed:
@@ -636,7 +644,7 @@ func (ts *testServerImpl) Start() error {
636644
"Please use NewTestServer()")
637645
}
638646
}
639-
ts.state = stateRunning
647+
ts.serverState = stateRunning
640648
ts.mu.Unlock()
641649

642650
for i := 0; i < ts.serverArgs.numNodes; i++ {
@@ -662,18 +670,18 @@ func (ts *testServerImpl) Stop() {
662670
ts.mu.RLock()
663671
defer ts.mu.RUnlock()
664672

665-
if ts.state == stateNew {
673+
if ts.serverState == stateNew {
666674
log.Fatalf("%s: Stop() called, but Start() was never called", testserverMessagePrefix)
667675
}
668-
if ts.state == stateFailed {
676+
if ts.serverState == stateFailed {
669677
log.Fatalf("%s: Stop() called, but process exited unexpectedly. Stdout:\n%s\nStderr:\n%s\n",
670678
testserverMessagePrefix,
671679
ts.Stdout(),
672680
ts.Stderr())
673681
return
674682
}
675683

676-
if ts.state != stateStopped {
684+
if ts.serverState != stateStopped {
677685
if p := ts.proxyProcess; p != nil {
678686
_ = p.Kill()
679687
}
@@ -686,16 +694,15 @@ func (ts *testServerImpl) Stop() {
686694
log.Printf("%s: failed to close stderr: %v", testserverMessagePrefix, closeErr)
687695
}
688696

689-
for _, cmd := range ts.cmd {
697+
ts.serverState = stateStopped
698+
for _, node := range ts.nodes {
699+
cmd := node.startCmd
690700
if cmd.Process != nil {
691701
_ = cmd.Process.Kill()
692702
}
693-
}
694703

695-
ts.state = stateStopped
696-
for _, nodeState := range ts.nodeStates {
697-
if nodeState != stateStopped {
698-
ts.state = stateFailed
704+
if node.state != stateStopped {
705+
ts.serverState = stateFailed
699706
}
700707
}
701708

0 commit comments

Comments
 (0)