Skip to content

Commit d10faf0

Browse files
authored
Merge pull request #602 from iExecBlockchainComputing/hotfix/worker-service-race-condition
Release version 8.1.2
2 parents 2b9bfe4 + 2647a7f commit d10faf0

File tree

4 files changed

+235
-75
lines changed

4 files changed

+235
-75
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,13 @@
22

33
All notable changes to this project will be documented in this file.
44

5+
## [[8.1.2]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v8.1.2) 2023-06-29
6+
7+
## Bug fixes
8+
- Prevent race conditions in `WorkerService`. (#602)
9+
### Dependency Upgrades
10+
- Upgrade to `iexec-commons-poco` 3.0.5. (#602)
11+
512
## [[8.1.1]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v8.1.1) 2023-06-23
613

714
### Dependency Upgrades

gradle.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
version=8.1.1
1+
version=8.1.2
22
iexecCommonVersion=8.2.1
3-
iexecCommonsPocoVersion=3.0.4
3+
iexecCommonsPocoVersion=3.0.5
44
iexecBlockchainAdapterVersion=8.1.1
55
iexecResultVersion=8.1.1
66
iexecSmsVersion=8.1.1

src/main/java/com/iexec/core/worker/WorkerService.java

Lines changed: 129 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.iexec.core.worker;
1818

19+
import com.iexec.common.utils.ContextualLockRunner;
1920
import com.iexec.core.configuration.WorkerConfiguration;
2021
import lombok.extern.slf4j.Slf4j;
2122
import org.springframework.stereotype.Service;
@@ -27,39 +28,33 @@
2728

2829
import static com.iexec.common.utils.DateTimeUtils.addMinutesToDate;
2930

31+
/**
32+
* Manage {@link Worker} objects.
33+
* <p>
34+
* /!\ Private read-and-write methods are not thread-safe.
35+
* They can sometime lead to race conditions.
36+
* Please use the public, thread-safe, versions of these methods instead.
37+
*/
3038
@Slf4j
3139
@Service
3240
public class WorkerService {
3341

3442
private final WorkerRepository workerRepository;
3543
private final WorkerConfiguration workerConfiguration;
44+
private final ContextualLockRunner<String> contextualLockRunner;
3645

3746
public WorkerService(WorkerRepository workerRepository,
3847
WorkerConfiguration workerConfiguration) {
3948
this.workerRepository = workerRepository;
4049
this.workerConfiguration = workerConfiguration;
50+
this.contextualLockRunner = new ContextualLockRunner<>();
4151
}
4252

53+
// region Read methods
4354
public Optional<Worker> getWorker(String walletAddress) {
4455
return workerRepository.findByWalletAddress(walletAddress);
4556
}
4657

47-
public Worker addWorker(Worker worker) {
48-
Optional<Worker> oWorker = workerRepository.findByWalletAddress(worker.getWalletAddress());
49-
50-
if (oWorker.isPresent()) {
51-
Worker existingWorker = oWorker.get();
52-
log.info("The worker is already registered [workerId:{}]", existingWorker.getId());
53-
worker.setId(existingWorker.getId());
54-
worker.setParticipatingChainTaskIds(existingWorker.getParticipatingChainTaskIds());
55-
worker.setComputingChainTaskIds(existingWorker.getComputingChainTaskIds());
56-
} else {
57-
log.info("Registering new worker");
58-
}
59-
60-
return workerRepository.save(worker);
61-
}
62-
6358
public boolean isAllowedToJoin(String workerAddress){
6459
List<String> whitelist = workerConfiguration.getWhitelist();
6560
// if the whitelist is empty, there is no restriction on the workers
@@ -69,18 +64,6 @@ public boolean isAllowedToJoin(String workerAddress){
6964
return whitelist.contains(workerAddress);
7065
}
7166

72-
public Optional<Worker> updateLastAlive(String walletAddress) {
73-
Optional<Worker> optional = workerRepository.findByWalletAddress(walletAddress);
74-
if (optional.isPresent()) {
75-
Worker worker = optional.get();
76-
worker.setLastAliveDate(new Date());
77-
workerRepository.save(worker);
78-
return Optional.of(worker);
79-
}
80-
81-
return Optional.empty();
82-
}
83-
8467
public boolean isWorkerAllowedToAskReplicate(String walletAddress) {
8568
Optional<Date> oDate = getLastReplicateDemand(walletAddress);
8669
if (oDate.isEmpty()) {
@@ -105,29 +88,6 @@ public Optional<Date> getLastReplicateDemand(String walletAddress) {
10588
return Optional.ofNullable(worker.getLastReplicateDemandDate());
10689
}
10790

108-
public Optional<Worker> updateLastReplicateDemandDate(String walletAddress) {
109-
Optional<Worker> optional = workerRepository.findByWalletAddress(walletAddress);
110-
if (optional.isPresent()) {
111-
Worker worker = optional.get();
112-
worker.setLastReplicateDemandDate(new Date());
113-
workerRepository.save(worker);
114-
return Optional.of(worker);
115-
}
116-
117-
return Optional.empty();
118-
}
119-
120-
public Optional<Worker> addChainTaskIdToWorker(String chainTaskId, String walletAddress) {
121-
Optional<Worker> optional = workerRepository.findByWalletAddress(walletAddress);
122-
if (optional.isPresent()) {
123-
Worker worker = optional.get();
124-
worker.addChainTaskId(chainTaskId);
125-
log.info("Added chainTaskId to worker [chainTaskId:{}, workerName:{}]", chainTaskId, walletAddress);
126-
return Optional.of(workerRepository.save(worker));
127-
}
128-
return Optional.empty();
129-
}
130-
13191
public List<String> getChainTaskIds(String walletAddress) {
13292
Optional<Worker> optional = workerRepository.findByWalletAddress(walletAddress);
13393
if (optional.isPresent()) {
@@ -146,28 +106,6 @@ public List<String> getComputingTaskIds(String walletAddress) {
146106
return Collections.emptyList();
147107
}
148108

149-
public Optional<Worker> removeChainTaskIdFromWorker(String chainTaskId, String walletAddress) {
150-
Optional<Worker> optional = workerRepository.findByWalletAddress(walletAddress);
151-
if (optional.isPresent()) {
152-
Worker worker = optional.get();
153-
worker.removeChainTaskId(chainTaskId);
154-
log.info("Removed chainTaskId from worker [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress);
155-
return Optional.of(workerRepository.save(worker));
156-
}
157-
return Optional.empty();
158-
}
159-
160-
public Optional<Worker> removeComputedChainTaskIdFromWorker(String chainTaskId, String walletAddress) {
161-
Optional<Worker> optional = workerRepository.findByWalletAddress(walletAddress);
162-
if (optional.isPresent()) {
163-
Worker worker = optional.get();
164-
worker.removeComputedChainTaskId(chainTaskId);
165-
log.info("Removed computed chainTaskId from worker [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress);
166-
return Optional.of(workerRepository.save(worker));
167-
}
168-
return Optional.empty();
169-
}
170-
171109

172110
// worker is considered lost if it didn't ping for 1 minute
173111
public List<Worker> getLostWorkers() {
@@ -249,4 +187,122 @@ public int getAliveAvailableGpu () {
249187
}
250188
return availableGpus;
251189
}
190+
// endregion
191+
192+
// region Read-and-write methods
193+
public Worker addWorker(Worker worker) {
194+
return contextualLockRunner.applyWithLock(
195+
worker.getWalletAddress(),
196+
address -> addWorkerWithoutThreadSafety(worker)
197+
);
198+
}
199+
200+
private Worker addWorkerWithoutThreadSafety(Worker worker) {
201+
Optional<Worker> oWorker = workerRepository.findByWalletAddress(worker.getWalletAddress());
202+
203+
if (oWorker.isPresent()) {
204+
Worker existingWorker = oWorker.get();
205+
log.info("The worker is already registered [workerId:{}]", existingWorker.getId());
206+
worker.setId(existingWorker.getId());
207+
worker.setParticipatingChainTaskIds(existingWorker.getParticipatingChainTaskIds());
208+
worker.setComputingChainTaskIds(existingWorker.getComputingChainTaskIds());
209+
} else {
210+
log.info("Registering new worker");
211+
}
212+
213+
return workerRepository.save(worker);
214+
}
215+
216+
public Optional<Worker> updateLastAlive(String walletAddress) {
217+
return contextualLockRunner.applyWithLock(
218+
walletAddress,
219+
this::updateLastAliveWithoutThreadSafety
220+
);
221+
}
222+
223+
private Optional<Worker> updateLastAliveWithoutThreadSafety(String walletAddress) {
224+
Optional<Worker> optional = workerRepository.findByWalletAddress(walletAddress);
225+
if (optional.isPresent()) {
226+
Worker worker = optional.get();
227+
worker.setLastAliveDate(new Date());
228+
workerRepository.save(worker);
229+
return Optional.of(worker);
230+
}
231+
232+
return Optional.empty();
233+
}
234+
235+
public Optional<Worker> updateLastReplicateDemandDate(String walletAddress) {
236+
return contextualLockRunner.applyWithLock(
237+
walletAddress,
238+
this::updateLastReplicateDemandDateWithoutThreadSafety
239+
);
240+
}
241+
242+
private Optional<Worker> updateLastReplicateDemandDateWithoutThreadSafety(String walletAddress) {
243+
Optional<Worker> optional = workerRepository.findByWalletAddress(walletAddress);
244+
if (optional.isPresent()) {
245+
Worker worker = optional.get();
246+
worker.setLastReplicateDemandDate(new Date());
247+
workerRepository.save(worker);
248+
return Optional.of(worker);
249+
}
250+
251+
return Optional.empty();
252+
}
253+
254+
public Optional<Worker> addChainTaskIdToWorker(String chainTaskId, String walletAddress) {
255+
return contextualLockRunner.applyWithLock(
256+
walletAddress,
257+
address -> addChainTaskIdToWorkerWithoutThreadSafety(chainTaskId, address)
258+
);
259+
}
260+
261+
private Optional<Worker> addChainTaskIdToWorkerWithoutThreadSafety(String chainTaskId, String walletAddress) {
262+
Optional<Worker> optional = workerRepository.findByWalletAddress(walletAddress);
263+
if (optional.isPresent()) {
264+
Worker worker = optional.get();
265+
worker.addChainTaskId(chainTaskId);
266+
log.info("Added chainTaskId to worker [chainTaskId:{}, workerName:{}]", chainTaskId, walletAddress);
267+
return Optional.of(workerRepository.save(worker));
268+
}
269+
return Optional.empty();
270+
}
271+
272+
public Optional<Worker> removeChainTaskIdFromWorker(String chainTaskId, String walletAddress) {
273+
return contextualLockRunner.applyWithLock(
274+
walletAddress,
275+
address -> removeChainTaskIdFromWorkerWithoutThreadSafety(chainTaskId, address)
276+
);
277+
}
278+
279+
private Optional<Worker> removeChainTaskIdFromWorkerWithoutThreadSafety(String chainTaskId, String walletAddress) {
280+
Optional<Worker> optional = workerRepository.findByWalletAddress(walletAddress);
281+
if (optional.isPresent()) {
282+
Worker worker = optional.get();
283+
worker.removeChainTaskId(chainTaskId);
284+
log.info("Removed chainTaskId from worker [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress);
285+
return Optional.of(workerRepository.save(worker));
286+
}
287+
return Optional.empty();
288+
}
289+
290+
public Optional<Worker> removeComputedChainTaskIdFromWorker(String chainTaskId, String walletAddress) {
291+
return contextualLockRunner.applyWithLock(
292+
walletAddress,
293+
address -> removeComputedChainTaskIdFromWorkerWithoutThreadSafety(chainTaskId, address)
294+
);
295+
}
296+
297+
private Optional<Worker> removeComputedChainTaskIdFromWorkerWithoutThreadSafety(String chainTaskId, String walletAddress) {
298+
Optional<Worker> optional = workerRepository.findByWalletAddress(walletAddress);
299+
if (optional.isPresent()) {
300+
Worker worker = optional.get();
301+
worker.removeComputedChainTaskId(chainTaskId);
302+
log.info("Removed computed chainTaskId from worker [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress);
303+
return Optional.of(workerRepository.save(worker));
304+
}
305+
return Optional.empty();
306+
}
307+
// endregion
252308
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright 2023-2023 IEXEC BLOCKCHAIN TECH
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.iexec.core.worker;
18+
19+
import com.iexec.core.configuration.WorkerConfiguration;
20+
import lombok.extern.slf4j.Slf4j;
21+
import org.assertj.core.api.Assertions;
22+
import org.awaitility.Awaitility;
23+
import org.junit.jupiter.api.BeforeEach;
24+
import org.junit.jupiter.api.Test;
25+
import org.mockito.Mock;
26+
import org.mockito.MockitoAnnotations;
27+
import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest;
28+
import org.springframework.boot.test.mock.mockito.SpyBean;
29+
import org.springframework.test.context.DynamicPropertyRegistry;
30+
import org.springframework.test.context.DynamicPropertySource;
31+
import org.testcontainers.containers.MongoDBContainer;
32+
import org.testcontainers.junit.jupiter.Container;
33+
import org.testcontainers.junit.jupiter.Testcontainers;
34+
import org.testcontainers.utility.DockerImageName;
35+
36+
import java.time.Duration;
37+
import java.util.Date;
38+
import java.util.List;
39+
import java.util.Optional;
40+
import java.util.concurrent.*;
41+
import java.util.stream.Collectors;
42+
import java.util.stream.IntStream;
43+
44+
import static com.iexec.commons.poco.utils.TestUtils.WALLET_WORKER_1;
45+
46+
@Slf4j
47+
@DataMongoTest
48+
@Testcontainers
49+
class WorkerServiceRealRepositoryTests {
50+
@Container
51+
private static final MongoDBContainer mongoDBContainer = new MongoDBContainer(DockerImageName.parse("mongo:4.2"));
52+
53+
@DynamicPropertySource
54+
static void registerProperties(DynamicPropertyRegistry registry) {
55+
registry.add("spring.data.mongodb.host", mongoDBContainer::getContainerIpAddress);
56+
registry.add("spring.data.mongodb.port", () -> mongoDBContainer.getMappedPort(27017));
57+
}
58+
59+
@SpyBean
60+
private WorkerRepository workerRepository;
61+
@Mock
62+
private WorkerConfiguration workerConfiguration;
63+
private WorkerService workerService;
64+
65+
@BeforeEach
66+
void init() {
67+
MockitoAnnotations.openMocks(this);
68+
workerService = new WorkerService(workerRepository, workerConfiguration);
69+
}
70+
71+
/**
72+
* Try and add N tasks to a single worker at the same time.
73+
* If everything goes right, the Worker should finally have been assigned N tasks.
74+
*/
75+
@Test
76+
void addMultipleTaskIds() {
77+
workerService.addWorker(
78+
Worker.builder()
79+
.walletAddress(WALLET_WORKER_1)
80+
.build()
81+
);
82+
83+
final int nThreads = 10;
84+
final ExecutorService executor = Executors.newFixedThreadPool(nThreads);
85+
86+
final List<Future<Optional<Worker>>> futures = IntStream.range(0, nThreads)
87+
.mapToObj(i -> executor.submit(() -> workerService.addChainTaskIdToWorker(new Date().getTime() + "", WALLET_WORKER_1)))
88+
.collect(Collectors.toList());
89+
90+
Awaitility.await()
91+
.atMost(Duration.ofMinutes(1))
92+
.until(() -> futures.stream().map(Future::isDone).reduce(Boolean::logicalAnd).orElse(false));
93+
94+
Assertions.assertThat(workerService.getWorker(WALLET_WORKER_1).get().getComputingChainTaskIds())
95+
.hasSize(nThreads);
96+
}
97+
}

0 commit comments

Comments
 (0)