Skip to content

Commit 1093c4a

Browse files
authored
Merge pull request #644 from iExecBlockchainComputing/hotfix/8.2.3
Check if Worker can still accept more work right before giving it a new replicate
2 parents e9ab24a + 5ddcf26 commit 1093c4a

File tree

9 files changed

+257
-171
lines changed

9 files changed

+257
-171
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22

33
All notable changes to this project will be documented in this file.
44

5+
## [[8.2.3]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v8.2.3) 2023-12-14
6+
7+
### Bug Fixes
8+
9+
- Check if Worker can still accept more work right before giving it a new replicate. (#644)
10+
511
## [[8.2.2]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v8.2.2) 2023-12-13
612

713
### Bug Fixes

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
version=8.2.2
1+
version=8.2.3
22
iexecCommonVersion=8.3.0
33
iexecCommonsPocoVersion=3.1.0
44
iexecBlockchainAdapterVersion=8.2.0

src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java

Lines changed: 53 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -86,14 +86,9 @@ public ReplicateSupplyService(ReplicatesService replicatesService,
8686
*/
8787
@Retryable(value = {OptimisticLockingFailureException.class}, maxAttempts = 5)
8888
Optional<ReplicateTaskSummary> getAvailableReplicateTaskSummary(long workerLastBlock, String walletAddress) {
89-
// return empty if max computing task is reached or if the worker is not found
90-
if (!workerService.canAcceptMoreWorks(walletAddress)) {
91-
return Optional.empty();
92-
}
93-
9489
// return empty if the worker is not sync
9590
//TODO Check if worker node is sync
96-
boolean isWorkerLastBlockAvailable = workerLastBlock > 0;
91+
final boolean isWorkerLastBlockAvailable = workerLastBlock > 0;
9792
if (!isWorkerLastBlockAvailable) {
9893
return Optional.empty();
9994
}
@@ -102,13 +97,16 @@ Optional<ReplicateTaskSummary> getAvailableReplicateTaskSummary(long workerLastB
10297
return Optional.empty();
10398
}
10499

105-
// TODO : Remove this, the optional can never be empty
106-
// This is covered in workerService.canAcceptMoreWorks
107-
Optional<Worker> optional = workerService.getWorker(walletAddress);
100+
final Optional<Worker> optional = workerService.getWorker(walletAddress);
108101
if (optional.isEmpty()) {
109102
return Optional.empty();
110103
}
111-
Worker worker = optional.get();
104+
final Worker worker = optional.get();
105+
106+
// return empty if max computing task is reached or if the worker is not found
107+
if (!workerService.canAcceptMoreWorks(worker)) {
108+
return Optional.empty();
109+
}
112110

113111
return getReplicateTaskSummaryForAnyAvailableTask(
114112
walletAddress,
@@ -161,8 +159,8 @@ private Optional<ReplicateTaskSummary> getReplicateTaskSummary(Task task, String
161159
chainTaskId,
162160
task.getEnclaveChallenge());
163161
ReplicateTaskSummaryBuilder replicateTaskSummary = ReplicateTaskSummary.builder()
164-
.workerpoolAuthorization(authorization);
165-
if(task.isTeeTask()){
162+
.workerpoolAuthorization(authorization);
163+
if (task.isTeeTask()) {
166164
replicateTaskSummary.smsUrl(task.getSmsUrl());
167165
}
168166
return Optional.of(replicateTaskSummary.build());
@@ -173,7 +171,7 @@ private Optional<ReplicateTaskSummary> getReplicateTaskSummary(Task task, String
173171
* tries to accept the task - i.e. create a new {@link Replicate}
174172
* for that task on that worker.
175173
*
176-
* @param task {@link Task} needing at least one new {@link Replicate}.
174+
* @param task {@link Task} needing at least one new {@link Replicate}.
177175
* @param walletAddress Wallet address of a worker looking for new {@link Task}.
178176
* @return {@literal true} if the task has been accepted,
179177
* {@literal false} otherwise.
@@ -184,22 +182,6 @@ private boolean acceptOrRejectTask(Task task, String walletAddress) {
184182
}
185183

186184
final String chainTaskId = task.getChainTaskId();
187-
final Optional<ReplicatesList> oReplicatesList = replicatesService.getReplicatesList(chainTaskId);
188-
// Check is only here to prevent
189-
// "`Optional.get()` without `isPresent()` warning".
190-
// This case should not happen.
191-
if (oReplicatesList.isEmpty()) {
192-
return false;
193-
}
194-
195-
final ReplicatesList replicatesList = oReplicatesList.get();
196-
197-
final boolean hasWorkerAlreadyParticipated =
198-
replicatesList.hasWorkerAlreadyParticipated(walletAddress);
199-
if (hasWorkerAlreadyParticipated) {
200-
return false;
201-
}
202-
203185
final Lock lock = taskAccessForNewReplicateLocks
204186
.computeIfAbsent(chainTaskId, k -> new ReentrantLock());
205187
if (!lock.tryLock()) {
@@ -209,33 +191,56 @@ private boolean acceptOrRejectTask(Task task, String walletAddress) {
209191
}
210192

211193
try {
212-
final boolean taskNeedsMoreContributions = ConsensusHelper.doesTaskNeedMoreContributionsForConsensus(
213-
chainTaskId,
214-
replicatesList.getReplicates(),
215-
task.getTrust(),
216-
task.getMaxExecutionTime());
217-
218-
if (!taskNeedsMoreContributions
219-
|| taskService.isConsensusReached(replicatesList)) {
220-
return false;
221-
}
222-
223-
replicatesService.addNewReplicate(chainTaskId, walletAddress);
224-
workerService.addChainTaskIdToWorker(chainTaskId, walletAddress);
194+
return replicatesService.getReplicatesList(chainTaskId)
195+
.map(replicatesList -> acceptOrRejectTask(task, walletAddress, replicatesList))
196+
.orElse(false);
225197
} finally {
226198
// We should always unlock the task
227199
// so that it could be taken by another replicate
228200
// if there's any issue.
229201
lock.unlock();
230202
}
203+
}
231204

232-
return true;
205+
/**
206+
* Given a {@link Task}, a {@code walletAddress} of a worker and a {@link ReplicatesList},
207+
* tries to accept the task - i.e. create a new {@link Replicate}
208+
* for that task on that worker.
209+
*
210+
* @param task {@link Task} needing at least one new {@link Replicate}.
211+
* @param walletAddress Wallet address of a worker looking for new {@link Task}.
212+
* @param replicatesList Replicates of given {@link Task}.
213+
* @return {@literal true} if the task has been accepted,
214+
* {@literal false} otherwise.
215+
*/
216+
boolean acceptOrRejectTask(Task task, String walletAddress, ReplicatesList replicatesList) {
217+
final boolean hasWorkerAlreadyParticipated =
218+
replicatesList.hasWorkerAlreadyParticipated(walletAddress);
219+
if (hasWorkerAlreadyParticipated) {
220+
return false;
221+
}
222+
223+
final String chainTaskId = replicatesList.getChainTaskId();
224+
final boolean taskNeedsMoreContributions = ConsensusHelper.doesTaskNeedMoreContributionsForConsensus(
225+
chainTaskId,
226+
replicatesList.getReplicates(),
227+
task.getTrust(),
228+
task.getMaxExecutionTime());
229+
230+
if (!taskNeedsMoreContributions
231+
|| taskService.isConsensusReached(replicatesList)) {
232+
return false;
233+
}
234+
235+
return workerService.addChainTaskIdToWorker(chainTaskId, walletAddress)
236+
.map(worker -> replicatesService.addNewReplicate(replicatesList, walletAddress))
237+
.orElse(false);
233238
}
234239

235240
/**
236241
* Get notifications missed by the worker during the time it was absent.
237-
*
238-
* @param blockNumber last seen blocknumber by the worker
242+
*
243+
* @param blockNumber last seen blocknumber by the worker
239244
* @param walletAddress of the worker
240245
* @return list of missed notifications. Can be empty if no notification is found
241246
*/
@@ -264,7 +269,7 @@ public List<TaskNotification> getMissedTaskNotifications(long blockNumber, Strin
264269
continue;
265270
}
266271
TaskNotificationExtra taskNotificationExtra =
267-
getTaskNotificationExtra(task, taskNotificationType.get(), walletAddress, enclaveChallenge);
272+
getTaskNotificationExtra(task, taskNotificationType.get(), walletAddress, enclaveChallenge);
268273

269274
TaskNotification taskNotification = TaskNotification.builder()
270275
.chainTaskId(chainTaskId)
@@ -286,7 +291,7 @@ public List<TaskNotification> getMissedTaskNotifications(long blockNumber, Strin
286291
private TaskNotificationExtra getTaskNotificationExtra(Task task, TaskNotificationType taskNotificationType, String walletAddress, String enclaveChallenge) {
287292
TaskNotificationExtra taskNotificationExtra = TaskNotificationExtra.builder().build();
288293

289-
switch (taskNotificationType){
294+
switch (taskNotificationType) {
290295
case PLEASE_CONTRIBUTE:
291296
WorkerpoolAuthorization authorization = signatureService.createAuthorization(
292297
walletAddress, task.getChainTaskId(), enclaveChallenge);
@@ -312,7 +317,7 @@ public Optional<TaskNotificationType> getTaskNotificationType(Task task, Replica
312317
// CONTRIBUTION_TIMEOUT or CONSENSUS_REACHED without contribution
313318
if (task.getCurrentStatus().equals(TaskStatus.CONTRIBUTION_TIMEOUT)
314319
|| (task.getCurrentStatus().equals(TaskStatus.CONSENSUS_REACHED)
315-
&& !replicate.containsContributedStatus())) {
320+
&& !replicate.containsContributedStatus())) {
316321
return Optional.of(TaskNotificationType.PLEASE_ABORT);
317322
}
318323

src/main/java/com/iexec/core/replicate/ReplicatesService.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -71,22 +71,21 @@ public ReplicatesService(ReplicatesRepository replicatesRepository,
7171
this.taskLogsService = taskLogsService;
7272
}
7373

74-
public void addNewReplicate(String chainTaskId, String walletAddress) {
75-
if (getReplicate(chainTaskId, walletAddress).isEmpty()) {
76-
Optional<ReplicatesList> optional = getReplicatesList(chainTaskId);
77-
if (optional.isPresent()) {
78-
ReplicatesList replicatesList = optional.get();
79-
Replicate replicate = new Replicate(walletAddress, chainTaskId);
80-
replicate.setWorkerWeight(iexecHubService.getWorkerWeight(walletAddress));// workerWeight value for pendingWeight estimate
81-
replicatesList.getReplicates().add(replicate);
82-
83-
replicatesRepository.save(replicatesList);
84-
log.info("New replicate saved [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress);
85-
}
74+
public boolean addNewReplicate(ReplicatesList replicatesList, String walletAddress) {
75+
final String chainTaskId = replicatesList.getChainTaskId();
76+
if (replicatesList.getReplicateOfWorker(walletAddress).isEmpty()) {
77+
Replicate replicate = new Replicate(walletAddress, chainTaskId);
78+
replicate.setWorkerWeight(iexecHubService.getWorkerWeight(walletAddress));// workerWeight value for pendingWeight estimate
79+
replicatesList.getReplicates().add(replicate);
80+
81+
replicatesRepository.save(replicatesList);
82+
log.info("New replicate saved [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress);
8683
} else {
8784
log.error("Replicate already saved [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress);
85+
return false;
8886
}
8987

88+
return true;
9089
}
9190

9291
public synchronized void createEmptyReplicateList(String chainTaskId) {
@@ -635,4 +634,4 @@ public void setRevealTimeoutStatusIfNeeded(String chainTaskId, Replicate replica
635634
updateReplicateStatus(chainTaskId, replicate.getWalletAddress(), statusUpdate);
636635
}
637636
}
638-
}
637+
}

src/main/java/com/iexec/core/worker/WorkerService.java

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,10 @@ public Optional<Worker> getWorker(String walletAddress) {
6969
return workerRepository.findByWalletAddress(walletAddress);
7070
}
7171

72-
public boolean isAllowedToJoin(String workerAddress){
72+
public boolean isAllowedToJoin(String workerAddress) {
7373
List<String> whitelist = workerConfiguration.getWhitelist();
7474
// if the whitelist is empty, there is no restriction on the workers
75-
if (whitelist.isEmpty()){
75+
if (whitelist.isEmpty()) {
7676
return true;
7777
}
7878
return whitelist.contains(workerAddress);
@@ -133,19 +133,13 @@ public List<Worker> getAliveWorkers() {
133133
return workerRepository.findByWalletAddressIn(aliveWorkers);
134134
}
135135

136-
public boolean canAcceptMoreWorks(String walletAddress) {
137-
Optional<Worker> optionalWorker = getWorker(walletAddress);
138-
if (optionalWorker.isEmpty()){
139-
return false;
140-
}
141-
142-
Worker worker = optionalWorker.get();
136+
public boolean canAcceptMoreWorks(Worker worker) {
143137
int workerMaxNbTasks = worker.getMaxNbTasks();
144138
int runningReplicateNb = worker.getComputingChainTaskIds().size();
145139

146140
if (runningReplicateNb >= workerMaxNbTasks) {
147141
log.debug("Worker asking for too many replicates [walletAddress:{}, runningReplicateNb:{}, workerMaxNbTasks:{}]",
148-
walletAddress, runningReplicateNb, workerMaxNbTasks);
142+
worker.getWalletAddress(), runningReplicateNb, workerMaxNbTasks);
149143
return false;
150144
}
151145

@@ -154,44 +148,44 @@ public boolean canAcceptMoreWorks(String walletAddress) {
154148

155149
public int getAliveAvailableCpu() {
156150
int availableCpus = 0;
157-
for (Worker worker: getAliveWorkers()) {
151+
for (Worker worker : getAliveWorkers()) {
158152
if (worker.isGpuEnabled()) {
159153
continue;
160154
}
161155

162156
int workerCpuNb = worker.getCpuNb();
163157
int computingReplicateNb = worker.getComputingChainTaskIds().size();
164158
int availableCpu = workerCpuNb - computingReplicateNb;
165-
availableCpus+= availableCpu;
159+
availableCpus += availableCpu;
166160
}
167161
return availableCpus;
168162
}
169163

170164
public int getAliveTotalCpu() {
171165
int totalCpus = 0;
172-
for (Worker worker: getAliveWorkers()){
173-
if(worker.isGpuEnabled()) {
166+
for (Worker worker : getAliveWorkers()) {
167+
if (worker.isGpuEnabled()) {
174168
continue;
175169
}
176-
totalCpus+= worker.getCpuNb();
170+
totalCpus += worker.getCpuNb();
177171
}
178172
return totalCpus;
179173
}
180174

181175
// We suppose for now that 1 Gpu enabled worker has only one GPU
182176
public int getAliveTotalGpu() {
183177
int totalGpus = 0;
184-
for(Worker worker: getAliveWorkers()) {
178+
for (Worker worker : getAliveWorkers()) {
185179
if (worker.isGpuEnabled()) {
186180
totalGpus++;
187181
}
188182
}
189183
return totalGpus;
190184
}
191185

192-
public int getAliveAvailableGpu () {
186+
public int getAliveAvailableGpu() {
193187
int availableGpus = getAliveTotalGpu();
194-
for (Worker worker: getAliveWorkers()) {
188+
for (Worker worker : getAliveWorkers()) {
195189
if (worker.isGpuEnabled()) {
196190
boolean isWorking = !worker.getComputingChainTaskIds().isEmpty();
197191
if (isWorking) {
@@ -246,13 +240,20 @@ public Optional<Worker> addChainTaskIdToWorker(String chainTaskId, String wallet
246240
}
247241

248242
private Optional<Worker> addChainTaskIdToWorkerWithoutThreadSafety(String chainTaskId, String walletAddress) {
249-
Optional<Worker> optional = workerRepository.findByWalletAddress(walletAddress);
243+
final Optional<Worker> optional = workerRepository.findByWalletAddress(walletAddress);
250244
if (optional.isPresent()) {
251-
Worker worker = optional.get();
245+
final Worker worker = optional.get();
246+
if (!canAcceptMoreWorks(worker)) {
247+
log.warn("Can't add chainTaskId to worker when already full [chainTaskId:{}, workerName:{}]",
248+
chainTaskId, walletAddress);
249+
return Optional.empty();
250+
}
252251
worker.addChainTaskId(chainTaskId);
253252
log.info("Added chainTaskId to worker [chainTaskId:{}, workerName:{}]", chainTaskId, walletAddress);
254253
return Optional.of(workerRepository.save(worker));
255254
}
255+
log.warn("Can't add chainTaskId to worker when unknown worker [chainTaskId:{}, workerName:{}]",
256+
chainTaskId, walletAddress);
256257
return Optional.empty();
257258
}
258259

src/test/java/com/iexec/core/replicate/ReplicateServiceTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ void shouldCreateNewReplicate() {
8383
ReplicatesList replicatesList = new ReplicatesList(CHAIN_TASK_ID, list);
8484
when(replicatesRepository.findByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList));
8585
when(replicatesRepository.save(any())).thenReturn(replicatesList);
86-
replicatesService.addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_3);
86+
replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_3);
8787
Mockito.verify(replicatesRepository, Mockito.times(1))
8888
.save(any());
8989
}
@@ -103,11 +103,11 @@ void shouldNotCreateNewReplicate() {
103103
when(replicatesRepository.findByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList));
104104
when(replicatesRepository.save(any())).thenReturn(replicatesList);
105105

106-
replicatesService.addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_1);
106+
replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_1);
107107
Mockito.verify(replicatesRepository, Mockito.times(0))
108108
.save(any());
109109

110-
replicatesService.addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_2);
110+
replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_2);
111111
Mockito.verify(replicatesRepository, Mockito.times(0))
112112
.save(any());
113113
}
@@ -1505,4 +1505,4 @@ void computeUpdateReplicateStatusArgsResultUploadFailed() {
15051505
.build());
15061506
}
15071507

1508-
}
1508+
}

0 commit comments

Comments
 (0)