Skip to content

Commit 1cdeed2

Browse files
authored
Fix missed replicate status update detectors to avoid false positives (#653)
* Fix missed replicate status update detectors to avoid false positives * Fix javadoc
1 parent 0d1c83b commit 1cdeed2

File tree

8 files changed

+218
-223
lines changed

8 files changed

+218
-223
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ All notable changes to this project will be documented in this file.
3232
- Use less MongoDB calls when updating a task to a final status. (#649)
3333
- Save contribution and result updload replicate data when `CONTRIBUTE_AND_FINALIZE_DONE`. (#651)
3434
- Fix potential `NullPointerException` during first worker replicate request. (#652)
35+
- Fix missed replicate status update detectors to avoid false positives by mixing `CONTRIBUTE-REVEAL-FINALIZE` and `CONTRIBUTE_AND_FINALIZE` workflows. (#653)
3536

3637
### Dependency Upgrades
3738

src/main/java/com/iexec/core/detector/replicate/ContributionAndFinalizationUnnotifiedDetector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2023 IEXEC BLOCKCHAIN TECH
2+
* Copyright 2023-2024 IEXEC BLOCKCHAIN TECH
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

src/main/java/com/iexec/core/detector/replicate/ContributionUnnotifiedDetector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2023 IEXEC BLOCKCHAIN TECH
2+
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

src/main/java/com/iexec/core/detector/replicate/RevealUnnotifiedDetector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2023 IEXEC BLOCKCHAIN TECH
2+
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

src/main/java/com/iexec/core/detector/replicate/UnnotifiedAbstractDetector.java

Lines changed: 39 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2023 IEXEC BLOCKCHAIN TECH
2+
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,8 +30,7 @@
3030

3131
import java.util.List;
3232

33-
import static com.iexec.common.replicate.ReplicateStatus.WORKER_LOST;
34-
import static com.iexec.common.replicate.ReplicateStatus.getMissingStatuses;
33+
import static com.iexec.common.replicate.ReplicateStatus.*;
3534

3635
@Slf4j
3736
public abstract class UnnotifiedAbstractDetector {
@@ -95,21 +94,11 @@ void detectOnchainDoneWhenOffchainOngoing() {
9594
this.onchainDone, this.offchainOngoing, this.detectorRate);
9695

9796
for (Task task : taskService.findByCurrentStatus(detectWhenOffChainTaskStatuses)) {
98-
for (Replicate replicate : replicatesService.getReplicates(task.getChainTaskId())) {
99-
final ReplicateStatus lastRelevantStatus = replicate.getLastRelevantStatus();
100-
if (lastRelevantStatus != offchainOngoing) {
101-
continue;
102-
}
103-
104-
final boolean statusTrueOnChain = detectStatusReachedOnChain(
105-
task.getChainTaskId(), replicate.getWalletAddress());
106-
107-
if (statusTrueOnChain) {
108-
log.info("Detected confirmed missing update (replicate) [is:{}, should:{}, taskId:{}]",
109-
lastRelevantStatus, onchainDone, task.getChainTaskId());
110-
updateReplicateStatuses(task, replicate);
111-
}
112-
}
97+
replicatesService.getReplicates(task.getChainTaskId()).stream()
98+
.filter(replicate -> replicate.getLastRelevantStatus() == offchainOngoing)
99+
.filter(this::checkDetectionIsValid)
100+
.filter(this::detectStatusReachedOnChain)
101+
.forEach(replicate -> updateReplicateStatuses(task, replicate));
113102
}
114103
}
115104

@@ -123,33 +112,40 @@ void detectOnchainDoneWhenOffchainOngoing() {
123112
public void detectOnchainDone() {
124113
log.debug("Detect onchain {} [retryIn:{}]", onchainDone, this.detectorRate * LESS_OFTEN_DETECTOR_FREQUENCY);
125114
for (Task task : taskService.findByCurrentStatus(detectWhenOffChainTaskStatuses)) {
126-
for (Replicate replicate : replicatesService.getReplicates(task.getChainTaskId())) {
127-
final ReplicateStatus lastRelevantStatus = replicate.getLastRelevantStatus();
128-
129-
if (lastRelevantStatus == offchainDone) {
130-
continue;
131-
}
132-
133-
final boolean statusTrueOnChain = detectStatusReachedOnChain(
134-
task.getChainTaskId(), replicate.getWalletAddress());
135-
136-
if (statusTrueOnChain) {
137-
log.info("Detected confirmed missing update (replicate) [is:{}, should:{}, taskId:{}]",
138-
lastRelevantStatus, onchainDone, task.getChainTaskId());
139-
updateReplicateStatuses(task, replicate);
140-
}
141-
}
115+
replicatesService.getReplicates(task.getChainTaskId()).stream()
116+
.filter(replicate -> replicate.getLastRelevantStatus() != offchainDone)
117+
.filter(this::checkDetectionIsValid)
118+
.filter(this::detectStatusReachedOnChain)
119+
.forEach(replicate -> updateReplicateStatuses(task, replicate));
142120
}
143121
}
144122

123+
/**
124+
* Checks replicate eligibility to a detection against an {@code offchainDone} status.
125+
* <p>
126+
* All replicates are eligible to detection against {@code REVEALED}, but not against {@code CONTRIBUTED} or
127+
* {@code CONTRIBUTE_AND_FINALIZED}.
128+
*
129+
* @param replicate The replicate to check
130+
* @return {@literal true} if the replicate is eligible, {@literal false} otherwise
131+
*/
132+
private boolean checkDetectionIsValid(Replicate replicate) {
133+
final boolean isEligibleToContributeAndFinalize = iexecHubService.getTaskDescription(replicate.getChainTaskId())
134+
.isEligibleToContributeAndFinalize();
135+
return offchainDone == REVEALED
136+
|| (!isEligibleToContributeAndFinalize && offchainDone == CONTRIBUTED)
137+
|| (isEligibleToContributeAndFinalize && offchainDone == CONTRIBUTE_AND_FINALIZE_DONE);
138+
}
139+
145140
/**
146141
* Checks if {@code onchainDone} status has been reached on blockchain network.
147142
*
148-
* @param chainTaskId ID of on-chain task
149-
* @param walletAddress Address of a worker working on the current task.
150-
* @return
143+
* @param replicate Replicate whose on-chain status will be checked
144+
* @return {@literal true} if given status has been found on-chain, {@literal false} otherwise.
151145
*/
152-
private boolean detectStatusReachedOnChain(String chainTaskId, String walletAddress) {
146+
private boolean detectStatusReachedOnChain(Replicate replicate) {
147+
final String chainTaskId = replicate.getChainTaskId();
148+
final String walletAddress = replicate.getWalletAddress();
153149
switch (onchainDone) {
154150
case CONTRIBUTED:
155151
return iexecHubService.isContributed(chainTaskId, walletAddress);
@@ -171,14 +167,13 @@ private boolean detectStatusReachedOnChain(String chainTaskId, String walletAddr
171167
private void updateReplicateStatuses(Task task, Replicate replicate) {
172168
final String chainTaskId = task.getChainTaskId();
173169
final long initBlockNumber = task.getInitializationBlockNumber();
174-
175-
final ReplicateStatus retrieveFrom = replicate.getCurrentStatus().equals(WORKER_LOST)
176-
? replicate.getLastButOneStatus()
177-
: replicate.getCurrentStatus();
178-
final List<ReplicateStatus> statusesToUpdate = getMissingStatuses(retrieveFrom, offchainDone);
179-
170+
final ReplicateStatus lastRelevantStatus = replicate.getLastRelevantStatus();
171+
final List<ReplicateStatus> statusesToUpdate = getMissingStatuses(lastRelevantStatus, offchainDone);
180172
final String wallet = replicate.getWalletAddress();
181173

174+
log.info("Detected confirmed missing update (replicate) [is:{}, should:{}, taskId:{}]",
175+
lastRelevantStatus, onchainDone, task.getChainTaskId());
176+
182177
for (ReplicateStatus statusToUpdate : statusesToUpdate) {
183178
// add details to the update if needed
184179
ReplicateStatusDetails details = null;

src/test/java/com/iexec/core/detector/replicate/ContributionAndFinalizationUnnotifiedDetectorTests.java

Lines changed: 84 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,27 @@
1+
/*
2+
* Copyright 2023-2024 IEXEC BLOCKCHAIN TECH
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package com.iexec.core.detector.replicate;
218

319
import com.iexec.common.replicate.ReplicateStatus;
420
import com.iexec.common.replicate.ReplicateStatusDetails;
521
import com.iexec.common.replicate.ReplicateStatusUpdate;
622
import com.iexec.commons.poco.chain.ChainReceipt;
23+
import com.iexec.commons.poco.task.TaskDescription;
24+
import com.iexec.commons.poco.utils.BytesUtils;
725
import com.iexec.core.chain.IexecHubService;
826
import com.iexec.core.chain.Web3jService;
927
import com.iexec.core.configuration.CronConfiguration;
@@ -14,15 +32,18 @@
1432
import com.iexec.core.task.TaskStatus;
1533
import org.junit.jupiter.api.BeforeEach;
1634
import org.junit.jupiter.api.Test;
35+
import org.junit.jupiter.params.ParameterizedTest;
36+
import org.junit.jupiter.params.provider.EnumSource;
1737
import org.mockito.*;
38+
import org.springframework.test.util.ReflectionTestUtils;
1839

19-
import java.util.Arrays;
40+
import java.math.BigInteger;
2041
import java.util.Collections;
2142

2243
import static com.iexec.common.replicate.ReplicateStatus.*;
2344
import static com.iexec.common.replicate.ReplicateStatusModifier.WORKER;
24-
import static org.mockito.ArgumentMatchers.any;
25-
import static org.mockito.ArgumentMatchers.eq;
45+
import static org.mockito.ArgumentMatchers.*;
46+
import static org.mockito.Mockito.never;
2647
import static org.mockito.Mockito.when;
2748

2849
class ContributionAndFinalizationUnnotifiedDetectorTests {
@@ -51,9 +72,23 @@ class ContributionAndFinalizationUnnotifiedDetectorTests {
5172
@BeforeEach
5273
void init() {
5374
MockitoAnnotations.openMocks(this);
75+
ReflectionTestUtils.setField(detector, "detectorRate", 1000);
76+
when(iexecHubService.getTaskDescription(anyString())).thenReturn(TaskDescription.builder()
77+
.trust(BigInteger.ONE)
78+
.isTeeTask(true)
79+
.callback(BytesUtils.EMPTY_ADDRESS)
80+
.build());
81+
}
82+
83+
private Replicate getReplicateWithStatus(ReplicateStatus replicateStatus) {
84+
Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID);
85+
ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder()
86+
.modifier(WORKER).status(replicateStatus).build();
87+
replicate.setStatusUpdateList(Collections.singletonList(statusUpdate));
88+
return replicate;
5489
}
5590

56-
// region Detector aggregator
91+
// region detectOnChainChanges
5792

5893
/**
5994
* When running {@link ContributionAndFinalizationUnnotifiedDetector#detectOnChainChanges} 10 times,
@@ -68,10 +103,7 @@ void shouldDetectBothChangesOnChain() {
68103
Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build();
69104
when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task));
70105

71-
Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID);
72-
ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(CONTRIBUTE_AND_FINALIZE_ONGOING).modifier(WORKER).build();
73-
replicate.setStatusUpdateList(Collections.singletonList(statusUpdate));
74-
106+
Replicate replicate = getReplicateWithStatus(CONTRIBUTE_AND_FINALIZE_ONGOING);
75107
when(replicatesService.getReplicates(CHAIN_TASK_ID)).thenReturn(Collections.singletonList(replicate));
76108
when(iexecHubService.isRevealed(CHAIN_TASK_ID, WALLET_ADDRESS)).thenReturn(true);
77109
when(web3jService.getLatestBlockNumber()).thenReturn(11L);
@@ -97,17 +129,14 @@ void shouldDetectBothChangesOnChain() {
97129

98130
// endregion
99131

100-
//region detectOnchainDoneWhenOffchainOngoing (ContributeAndFinalizeOngoing)
132+
// region detectOnchainDoneWhenOffchainOngoing (ContributeAndFinalizeOngoing)
101133

102134
@Test
103-
void shouldDetectUnNotifiedContributeAndFinalizeDoneAfterContributeAndFinalizeOngoing() {
135+
void shouldDetectMissedUpdateSinceOffChainOngoing() {
104136
Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build();
105-
when(taskService.findByCurrentStatus(Arrays.asList(TaskStatus.INITIALIZED, TaskStatus.RUNNING))).thenReturn(Collections.singletonList(task));
106-
107-
Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID);
108-
ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(CONTRIBUTE_AND_FINALIZE_ONGOING).build();
109-
replicate.setStatusUpdateList(Collections.singletonList(statusUpdate));
137+
when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task));
110138

139+
Replicate replicate = getReplicateWithStatus(CONTRIBUTE_AND_FINALIZE_ONGOING);
111140
when(replicatesService.getReplicates(CHAIN_TASK_ID)).thenReturn(Collections.singletonList(replicate));
112141
when(iexecHubService.isRevealed(CHAIN_TASK_ID, WALLET_ADDRESS)).thenReturn(true);
113142
when(web3jService.getLatestBlockNumber()).thenReturn(11L);
@@ -130,53 +159,45 @@ void shouldDetectUnNotifiedContributeAndFinalizeDoneAfterContributeAndFinalizeOn
130159
}
131160

132161
@Test
133-
void shouldDetectUnNotifiedContributeAndFinalizeDoneSinceBeforeContributeAndFinalizeOngoing() {
162+
void shouldNotDetectMissedUpdateSinceNotOffChainOngoing() {
134163
Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build();
135-
when(taskService.findByCurrentStatus(Arrays.asList(TaskStatus.INITIALIZED, TaskStatus.RUNNING))).thenReturn(Collections.singletonList(task));
136-
137-
Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID);
138-
ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(COMPUTED).build();
139-
replicate.setStatusUpdateList(Collections.singletonList(statusUpdate));
164+
when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task));
140165

166+
Replicate replicate = getReplicateWithStatus(COMPUTED);
141167
when(replicatesService.getReplicates(CHAIN_TASK_ID)).thenReturn(Collections.singletonList(replicate));
142168
when(iexecHubService.isRevealed(CHAIN_TASK_ID, WALLET_ADDRESS)).thenReturn(true);
143169

144170
detector.detectOnchainDoneWhenOffchainOngoing();
145171

146-
Mockito.verify(replicatesService, Mockito.times(0))
172+
Mockito.verify(replicatesService, never())
147173
.updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class));
148174
}
149175

150176
@Test
151-
void shouldNotDetectUnNotifiedContributeAndFinalizeDoneSinceNotFinalizedOnChain() {
177+
void shouldNotDetectMissedUpdateSinceNotOnChainDone() {
152178
Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build();
153-
when(taskService.findByCurrentStatus(Arrays.asList(TaskStatus.INITIALIZED, TaskStatus.RUNNING))).thenReturn(Collections.singletonList(task));
154-
155-
Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID);
156-
ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(CONTRIBUTE_AND_FINALIZE_ONGOING).build();
157-
replicate.setStatusUpdateList(Collections.singletonList(statusUpdate));
179+
when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task));
158180

181+
Replicate replicate = getReplicateWithStatus(CONTRIBUTE_AND_FINALIZE_ONGOING);
159182
when(replicatesService.getReplicates(CHAIN_TASK_ID)).thenReturn(Collections.singletonList(replicate));
160183
when(iexecHubService.isRevealed(CHAIN_TASK_ID, WALLET_ADDRESS)).thenReturn(false);
161184
detector.detectOnchainDoneWhenOffchainOngoing();
162185

163-
Mockito.verify(replicatesService, Mockito.times(0))
186+
Mockito.verify(replicatesService, never())
164187
.updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class));
165188
}
166189

167190
// endregion
168191

169-
//region detectOnchainDone (REVEALED)
192+
// region detectOnchainDone (REVEALED)
170193

171-
@Test
172-
void shouldDetectUnNotifiedContributeAndFinalizeOngoing() {
194+
@ParameterizedTest
195+
@EnumSource(value = ReplicateStatus.class, names = {"COMPUTED", "CONTRIBUTE_AND_FINALIZE_ONGOING"})
196+
void shouldDetectMissedUpdateSinceOnChainDoneNotOffChainDone(ReplicateStatus replicateStatus) {
173197
Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build();
174-
when(taskService.findByCurrentStatus(Arrays.asList(TaskStatus.INITIALIZED, TaskStatus.RUNNING))).thenReturn(Collections.singletonList(task));
175-
176-
Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID);
177-
ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(CONTRIBUTE_AND_FINALIZE_ONGOING).build();
178-
replicate.setStatusUpdateList(Collections.singletonList(statusUpdate));
198+
when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task));
179199

200+
Replicate replicate = getReplicateWithStatus(replicateStatus);
180201
when(replicatesService.getReplicates(CHAIN_TASK_ID)).thenReturn(Collections.singletonList(replicate));
181202
when(iexecHubService.isRevealed(CHAIN_TASK_ID, WALLET_ADDRESS)).thenReturn(true);
182203
when(web3jService.getLatestBlockNumber()).thenReturn(11L);
@@ -197,19 +218,38 @@ void shouldDetectUnNotifiedContributeAndFinalizeOngoing() {
197218
}
198219

199220
@Test
200-
void shouldNotDetectUnNotifiedContributedSinceContributeAndFinalizeDone() {
221+
void shouldNotDetectMissedUpdateSinceOnChainDoneAndOffChainDone() {
201222
Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build();
202-
when(taskService.findByCurrentStatus(Arrays.asList(TaskStatus.INITIALIZED, TaskStatus.RUNNING))).thenReturn(Collections.singletonList(task));
203-
204-
Replicate replicate = new Replicate(WALLET_ADDRESS, CHAIN_TASK_ID);
205-
ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder().status(CONTRIBUTE_AND_FINALIZE_DONE).build();
206-
replicate.setStatusUpdateList(Collections.singletonList(statusUpdate));
223+
when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task));
207224

225+
Replicate replicate = getReplicateWithStatus(CONTRIBUTE_AND_FINALIZE_DONE);
208226
when(replicatesService.getReplicates(CHAIN_TASK_ID)).thenReturn(Collections.singletonList(replicate));
209227
when(iexecHubService.isRevealed(CHAIN_TASK_ID, WALLET_ADDRESS)).thenReturn(true);
210228
detector.detectOnchainDone();
211229

212-
Mockito.verify(replicatesService, Mockito.times(0))
230+
Mockito.verify(replicatesService, never())
231+
.updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class));
232+
}
233+
234+
@Test
235+
void shouldNotDetectMissedUpdateSinceOnChainDoneAndNotEligibleToContributeAndFinalize() {
236+
when(iexecHubService.getTaskDescription(CHAIN_TASK_ID)).thenReturn(
237+
TaskDescription.builder().trust(BigInteger.ONE).isTeeTask(true).callback("0x2").build());
238+
Task task = Task.builder().chainTaskId(CHAIN_TASK_ID).build();
239+
when(taskService.findByCurrentStatus(TaskStatus.getWaitingContributionStatuses())).thenReturn(Collections.singletonList(task));
240+
241+
Replicate replicate = getReplicateWithStatus(CONTRIBUTING);
242+
when(replicatesService.getReplicates(any())).thenReturn(Collections.singletonList(replicate));
243+
when(iexecHubService.isContributed(any(), any())).thenReturn(true);
244+
when(web3jService.getLatestBlockNumber()).thenReturn(11L);
245+
when(iexecHubService.getContributionBlock(anyString(), anyString(), anyLong())).thenReturn(ChainReceipt.builder()
246+
.blockNumber(10L)
247+
.txHash("0xabcef")
248+
.build());
249+
250+
detector.detectOnchainDone();
251+
252+
Mockito.verify(replicatesService, never())
213253
.updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class));
214254
}
215255

0 commit comments

Comments
 (0)