Skip to content

Commit 0799996

Browse files
authored
Back out "Revert D69698406: gloo: async lazy connect"
Differential Revision: D72425729 Pull Request resolved: #426
1 parent ae9b62a commit 0799996

27 files changed

+263
-77
lines changed

gloo/test/allgather_test.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,11 @@ TEST_F(AllgatherNewTest, TestTimeout) {
167167
AllgatherOptions opts(context);
168168
opts.setInput(input.getPointer(), 1);
169169
opts.setOutput(output.getPointer(), context->size);
170+
171+
// Run one operation first so we're measuring the operation timeout not
172+
// connection timeout.
173+
allgather(opts);
174+
170175
opts.setTimeout(std::chrono::milliseconds(10));
171176
if (context->rank == 0) {
172177
try {

gloo/test/allgatherv_test.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ TEST_F(AllgathervTest, TestTimeout) {
9595
std::vector<size_t> counts({1, 1});
9696
AllgathervOptions opts(context);
9797
opts.setOutput(output.getPointer(), counts);
98+
99+
// Run one operation first so we're measuring the operation timeout not
100+
// connection timeout.
101+
allgatherv(opts);
102+
98103
opts.setTimeout(std::chrono::milliseconds(10));
99104
if (context->rank == 0) {
100105
try {

gloo/test/allreduce_test.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,11 @@ TEST_F(AllreduceNewTest, TestTimeout) {
389389
AllreduceOptions opts(context);
390390
opts.setOutputs(outputs.getPointers(), 1);
391391
opts.setReduceFunction(getFunction<uint64_t>());
392+
393+
// Run one operation first so we're measuring the operation timeout not
394+
// connection timeout.
395+
allreduce(opts);
396+
392397
opts.setTimeout(std::chrono::milliseconds(10));
393398
if (context->rank == 0) {
394399
try {

gloo/test/barrier_test.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,12 +127,18 @@ INSTANTIATE_TEST_CASE_P(
127127
TEST_F(BarrierNewTest, TestTimeout) {
128128
spawn(Transport::TCP, 2, [&](std::shared_ptr<Context> context) {
129129
BarrierOptions opts(context);
130+
131+
// Run barrier first so we're measuring the barrier timeout not connection
132+
// timeout.
133+
barrier(opts);
134+
130135
opts.setTimeout(std::chrono::milliseconds(10));
131136
if (context->rank == 0) {
132137
try {
133138
barrier(opts);
134139
FAIL() << "Expected exception to be thrown";
135140
} catch (::gloo::IoException& e) {
141+
std::cerr << e.what() << std::endl;
136142
ASSERT_NE(std::string(e.what()).find("Timed out"), std::string::npos);
137143
}
138144
}

gloo/test/base_test.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ const char* kDefaultDevice = "localhost";
1717
// Transports that instantiated algorithms can be tested against.
1818
const std::vector<Transport> kTransportsForClassAlgorithms = {
1919
Transport::TCP,
20+
Transport::TCP_LAZY,
2021
#if GLOO_HAVE_TRANSPORT_TCP_TLS
2122
Transport::TCP_TLS,
2223
#endif
@@ -27,6 +28,7 @@ const std::vector<Transport> kTransportsForClassAlgorithms = {
2728
// preferred over the instantiated style.
2829
const std::vector<Transport> kTransportsForFunctionAlgorithms = {
2930
Transport::TCP,
31+
Transport::TCP_LAZY,
3032
#if GLOO_HAVE_TRANSPORT_TCP_TLS
3133
Transport::TCP_TLS,
3234
#endif
@@ -37,6 +39,8 @@ std::shared_ptr<::gloo::transport::Device> createDevice(Transport transport) {
3739
#if GLOO_HAVE_TRANSPORT_TCP
3840
if (transport == Transport::TCP) {
3941
return ::gloo::transport::tcp::CreateDevice(kDefaultDevice);
42+
} else if (transport == Transport::TCP_LAZY) {
43+
return ::gloo::transport::tcp::CreateLazyDevice(kDefaultDevice);
4044
}
4145
#endif
4246
#if GLOO_HAVE_TRANSPORT_TCP_TLS

gloo/test/base_test.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ class Barrier {
5959

6060
enum Transport {
6161
TCP,
62+
TCP_LAZY,
6263
#if GLOO_HAVE_TRANSPORT_TCP_TLS
6364
TCP_TLS,
6465
#endif

gloo/test/broadcast_test.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,11 @@ TEST_F(BroadcastTest, TestTimeout) {
182182
BroadcastOptions opts(context);
183183
opts.setOutput(output.getPointer(), 1);
184184
opts.setRoot(0);
185+
186+
// Run one operation first so we're measuring the operation timeout not
187+
// connection timeout.
188+
broadcast(opts);
189+
185190
opts.setTimeout(std::chrono::milliseconds(10));
186191
if (context->rank == 0) {
187192
try {

gloo/test/gather_test.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,11 @@ TEST_F(GatherTest, TestTimeout) {
7676
opts.setInput(input.getPointer(), 1);
7777
opts.setOutput(output.getPointer(), context->size);
7878
opts.setRoot(0);
79+
80+
// Run one operation first so we're measuring the operation timeout not
81+
// connection timeout.
82+
gather(opts);
83+
7984
opts.setTimeout(std::chrono::milliseconds(10));
8085
if (context->rank == 0) {
8186
try {

gloo/test/gatherv_test.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@ TEST_F(GathervTest, TestTimeout) {
106106
opts.setRoot(0);
107107
opts.setInput(input.getPointer(), 1);
108108
opts.setOutput(output.getPointer(), counts);
109+
110+
// Run one operation first so we're measuring the operation timeout not
111+
// connection timeout.
112+
gatherv(opts);
113+
109114
opts.setTimeout(std::chrono::milliseconds(10));
110115
if (context->rank == 0) {
111116
try {

gloo/test/multiproc_test.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,31 @@ class MultiProcWorker {
102102
auto device = createDevice(transport);
103103
context->setTimeout(std::chrono::milliseconds(kMultiProcTimeout));
104104
context->connectFullMesh(store_, device);
105+
106+
// Wait for all workers to be ready
107+
ringBarrier(context);
108+
105109
device.reset();
106110
sem_post(semaphore_);
107111
fn(std::move(context));
108112
}
109113

114+
void ringBarrier(std::shared_ptr<::gloo::rendezvous::Context>& context) {
115+
int sendScratch = 0;
116+
int recvScratch = 0;
117+
auto sendBuf =
118+
context->createUnboundBuffer(&sendScratch, sizeof(sendScratch));
119+
auto recvBuf =
120+
context->createUnboundBuffer(&recvScratch, sizeof(recvScratch));
121+
const auto leftRank = (context->size + context->rank - 1) % context->size;
122+
const auto rightRank = (context->rank + 1) % context->size;
123+
124+
sendBuf->send(leftRank, 0);
125+
recvBuf->recv(rightRank, 0);
126+
sendBuf->waitSend();
127+
recvBuf->waitRecv();
128+
}
129+
110130
protected:
111131
std::shared_ptr<::gloo::rendezvous::Store> store_;
112132
sem_t* semaphore_;

gloo/test/reduce_test.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,11 @@ TEST_F(ReduceTest, TestTimeout) {
101101
opts.setOutput(outputs.getPointer(), 1);
102102
opts.setRoot(0);
103103
opts.setReduceFunction(getFunction<uint64_t>());
104+
105+
// Run one operation first so we're measuring the operation timeout not
106+
// connection timeout.
107+
reduce(opts);
108+
104109
opts.setTimeout(std::chrono::milliseconds(10));
105110
if (context->rank == 0) {
106111
try {

gloo/test/scatter_test.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ TEST_F(ScatterTest, TestTimeout) {
7272
opts.setInputs(input.getPointers(), 1);
7373
opts.setOutput(output.getPointer(), 1);
7474
opts.setRoot(0);
75+
76+
// Run one operation first so we're measuring the operation timeout not
77+
// connection timeout.
78+
scatter(opts);
79+
7580
opts.setTimeout(std::chrono::milliseconds(10));
7681
if (context->rank == 0) {
7782
try {

gloo/test/tcp_test.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ namespace transport {
88
namespace tcp {
99

1010
TEST(TcpTest, ConnectTimeout) {
11-
auto loop = std::make_shared<Loop>();
11+
Loop loop;
1212

1313
std::mutex m;
1414
std::condition_variable cv;
@@ -25,7 +25,7 @@ TEST(TcpTest, ConnectTimeout) {
2525
EXPECT_TRUE(e);
2626
EXPECT_TRUE(dynamic_cast<const TimeoutError*>(&e));
2727
};
28-
connectLoop(*loop, remote, 0, 5, timeout, std::move(fn));
28+
connectLoop(loop, remote, 0, 5, timeout, std::move(fn));
2929

3030
std::unique_lock<std::mutex> lock(m);
3131
cv.wait(lock, [&] { return done; });

0 commit comments

Comments
 (0)