Skip to content

Commit b5588b1

Browse files
authored
Merge pull request #654 from iExecBlockchainComputing/feature/expose-current-statuses-count
Expose current task statuses count on `/metrics` endpoint
2 parents 1cdeed2 + 45a6899 commit b5588b1

File tree

8 files changed

+185
-42
lines changed

8 files changed

+185
-42
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ All notable changes to this project will be documented in this file.
1212
- Add prometheus endpoint with custom metrics. (#632)
1313
- Expose version through prometheus endpoint. (#637, #639)
1414
- Stop fetching completed tasks count from DB. (#638)
15-
- Expose current task statuses count to Prometheus. (#640)
15+
- Expose current task statuses count to Prometheus and `/metrics` endpoint. (#640, #654)
1616
- Add `tasks` endpoints to `iexec-core-library`. (#645)
1717

1818
### Quality

iexec-core-library/src/main/java/com/iexec/core/metric/PlatformMetric.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818

1919
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
2020
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
21+
import com.iexec.core.task.TaskStatus;
2122
import lombok.Builder;
2223
import lombok.Value;
2324

2425
import java.math.BigInteger;
26+
import java.util.LinkedHashMap;
2527

2628
@Value
2729
@Builder
@@ -32,7 +34,7 @@ public class PlatformMetric {
3234
int aliveAvailableCpu;
3335
int aliveTotalGpu;
3436
int aliveAvailableGpu;
35-
long completedTasks;
37+
LinkedHashMap<TaskStatus, Long> currentTaskStatusesCount;
3638
long dealEventsCount;
3739
long dealsCount;
3840
long replayDealsCount;

iexec-core-library/src/test/java/com/iexec/core/metric/PlatformMetricTests.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818

1919
import com.fasterxml.jackson.core.JsonProcessingException;
2020
import com.fasterxml.jackson.databind.ObjectMapper;
21+
import com.iexec.core.task.TaskStatus;
2122
import org.junit.jupiter.api.Test;
2223

2324
import java.math.BigInteger;
25+
import java.util.LinkedHashMap;
2426

2527
import static org.junit.jupiter.api.Assertions.assertEquals;
2628

@@ -35,11 +37,38 @@ void shouldSerializeAndDeserialize() throws JsonProcessingException {
3537
.aliveAvailableCpu(7)
3638
.aliveTotalGpu(0)
3739
.aliveAvailableGpu(0)
38-
.completedTasks(1000)
40+
.currentTaskStatusesCount(createCurrentTaskStatusesCount())
3941
.dealEventsCount(3000)
4042
.dealsCount(1100)
4143
.latestBlockNumberWithDeal(BigInteger.valueOf(1_000_000L))
4244
.build();
4345
assertEquals(platformMetric, mapper.readValue(mapper.writeValueAsString(platformMetric), PlatformMetric.class));
4446
}
47+
48+
private LinkedHashMap<TaskStatus, Long> createCurrentTaskStatusesCount() {
49+
final LinkedHashMap<TaskStatus, Long> expectedCurrentTaskStatusesCount = new LinkedHashMap<>(TaskStatus.values().length);
50+
expectedCurrentTaskStatusesCount.put(TaskStatus.RECEIVED, 1L);
51+
expectedCurrentTaskStatusesCount.put(TaskStatus.INITIALIZING, 2L);
52+
expectedCurrentTaskStatusesCount.put(TaskStatus.INITIALIZED, 3L);
53+
expectedCurrentTaskStatusesCount.put(TaskStatus.INITIALIZE_FAILED, 4L);
54+
expectedCurrentTaskStatusesCount.put(TaskStatus.RUNNING, 5L);
55+
expectedCurrentTaskStatusesCount.put(TaskStatus.RUNNING_FAILED, 6L);
56+
expectedCurrentTaskStatusesCount.put(TaskStatus.CONTRIBUTION_TIMEOUT, 7L);
57+
expectedCurrentTaskStatusesCount.put(TaskStatus.CONSENSUS_REACHED, 8L);
58+
expectedCurrentTaskStatusesCount.put(TaskStatus.REOPENING, 9L);
59+
expectedCurrentTaskStatusesCount.put(TaskStatus.REOPENED, 10L);
60+
expectedCurrentTaskStatusesCount.put(TaskStatus.REOPEN_FAILED, 11L);
61+
expectedCurrentTaskStatusesCount.put(TaskStatus.AT_LEAST_ONE_REVEALED, 12L);
62+
expectedCurrentTaskStatusesCount.put(TaskStatus.RESULT_UPLOADING, 13L);
63+
expectedCurrentTaskStatusesCount.put(TaskStatus.RESULT_UPLOADED, 14L);
64+
expectedCurrentTaskStatusesCount.put(TaskStatus.RESULT_UPLOAD_TIMEOUT, 15L);
65+
expectedCurrentTaskStatusesCount.put(TaskStatus.FINALIZING, 16L);
66+
expectedCurrentTaskStatusesCount.put(TaskStatus.FINALIZED, 17L);
67+
expectedCurrentTaskStatusesCount.put(TaskStatus.FINALIZE_FAILED, 18L);
68+
expectedCurrentTaskStatusesCount.put(TaskStatus.FINAL_DEADLINE_REACHED, 19L);
69+
expectedCurrentTaskStatusesCount.put(TaskStatus.COMPLETED, 20L);
70+
expectedCurrentTaskStatusesCount.put(TaskStatus.FAILED, 21L);
71+
72+
return expectedCurrentTaskStatusesCount;
73+
}
4574
}

src/main/java/com/iexec/core/metric/MetricService.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,26 @@
1717
package com.iexec.core.metric;
1818

1919
import com.iexec.core.chain.DealWatcherService;
20-
import com.iexec.core.task.TaskService;
20+
import com.iexec.core.task.TaskStatus;
21+
import com.iexec.core.task.event.TaskStatusesCountUpdatedEvent;
2122
import com.iexec.core.worker.WorkerService;
23+
import org.springframework.context.event.EventListener;
2224
import org.springframework.stereotype.Service;
2325

26+
import java.util.LinkedHashMap;
27+
2428
@Service
2529
public class MetricService {
2630
private final DealWatcherService dealWatcherService;
2731
private final WorkerService workerService;
28-
private final TaskService taskService;
32+
private LinkedHashMap<TaskStatus, Long> currentTaskStatusesCount;
2933

3034
public MetricService(DealWatcherService dealWatcherService,
31-
WorkerService workerService,
32-
TaskService taskService) {
35+
WorkerService workerService) {
3336
this.dealWatcherService = dealWatcherService;
3437
this.workerService = workerService;
35-
this.taskService = taskService;
38+
39+
this.currentTaskStatusesCount = new LinkedHashMap<>();
3640
}
3741

3842
public PlatformMetric getPlatformMetrics() {
@@ -42,11 +46,16 @@ public PlatformMetric getPlatformMetrics() {
4246
.aliveAvailableCpu(workerService.getAliveAvailableCpu())
4347
.aliveTotalGpu(workerService.getAliveTotalGpu())
4448
.aliveAvailableGpu(workerService.getAliveAvailableGpu())
45-
.completedTasks(taskService.getCompletedTasksCount())
49+
.currentTaskStatusesCount(currentTaskStatusesCount)
4650
.dealEventsCount(dealWatcherService.getDealEventsCount())
4751
.dealsCount(dealWatcherService.getDealsCount())
4852
.replayDealsCount(dealWatcherService.getReplayDealsCount())
4953
.latestBlockNumberWithDeal(dealWatcherService.getLatestBlockNumberWithDeal())
5054
.build();
5155
}
56+
57+
@EventListener
58+
void onTaskStatusesCountUpdateEvent(TaskStatusesCountUpdatedEvent event) {
59+
this.currentTaskStatusesCount = event.getCurrentTaskStatusesCount();
60+
}
5261
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2024-2024 IEXEC BLOCKCHAIN TECH
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.iexec.core.task.event;
18+
19+
import com.iexec.core.task.TaskStatus;
20+
import lombok.Value;
21+
22+
import java.util.LinkedHashMap;
23+
24+
@Value
25+
public class TaskStatusesCountUpdatedEvent {
26+
LinkedHashMap<TaskStatus, Long> currentTaskStatusesCount;
27+
}

src/main/java/com/iexec/core/task/update/TaskUpdateManager.java

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,7 @@ class TaskUpdateManager {
6565
private final BlockchainAdapterService blockchainAdapterService;
6666
private final SmsService smsService;
6767

68-
private final Map<TaskStatus, AtomicLong> currentTaskStatusesCount;
69-
private final ExecutorService taskStatusesCountExecutor;
68+
private final LinkedHashMap<TaskStatus, AtomicLong> currentTaskStatusesCount;
7069

7170
public TaskUpdateManager(TaskService taskService,
7271
IexecHubService iexecHubService,
@@ -97,23 +96,31 @@ public TaskUpdateManager(TaskService taskService,
9796
"status", status.name()
9897
).register(Metrics.globalRegistry);
9998
}
100-
101-
this.taskStatusesCountExecutor = Executors.newSingleThreadExecutor();
10299
}
103100

104101
@PostConstruct
105102
Future<Void> init() {
106-
return taskStatusesCountExecutor.submit(
107-
// The following could take a bit of time, depending on how many tasks are in DB.
108-
// It is expected to take ~1.7s for 1,000,000 tasks and to be linear (so, ~17s for 10,000,000 tasks).
109-
// As we use AtomicLongs, the final count should be accurate - no race conditions to expect,
110-
// even though new deals are detected during the count.
111-
() -> currentTaskStatusesCount
112-
.entrySet()
113-
.parallelStream()
114-
.forEach(entry -> entry.getValue().addAndGet(taskService.countByCurrentStatus(entry.getKey()))),
103+
final ExecutorService taskStatusesCountExecutor = Executors.newSingleThreadExecutor();
104+
final Future<Void> future = taskStatusesCountExecutor.submit(
105+
this::initializeCurrentTaskStatusesCount,
115106
null // Trick to get a `Future<Void>` instead of a `Future<?>`
116107
);
108+
taskStatusesCountExecutor.shutdown();
109+
return future;
110+
}
111+
112+
/**
113+
* The following could take a bit of time, depending on how many tasks are in DB.
114+
* It is expected to take ~1.7s for 1,000,000 tasks and to be linear (so, ~17s for 10,000,000 tasks).
115+
* As we use AtomicLongs, the final count should be accurate - no race conditions to expect,
116+
* even though new deals are detected during the count.
117+
*/
118+
private void initializeCurrentTaskStatusesCount() {
119+
currentTaskStatusesCount
120+
.entrySet()
121+
.parallelStream()
122+
.forEach(entry -> entry.getValue().addAndGet(taskService.countByCurrentStatus(entry.getKey())));
123+
publishTaskStatusesCountUpdate();
117124
}
118125

119126
void updateTask(String chainTaskId) {
@@ -710,10 +717,28 @@ void toFailed(Task task, TaskStatus reason) {
710717
void updateMetricsAfterStatusUpdate(TaskStatus previousStatus, TaskStatus newStatus) {
711718
currentTaskStatusesCount.get(previousStatus).decrementAndGet();
712719
currentTaskStatusesCount.get(newStatus).incrementAndGet();
720+
publishTaskStatusesCountUpdate();
713721
}
714722

715723
@EventListener(TaskCreatedEvent.class)
716724
void onTaskCreatedEvent() {
717725
currentTaskStatusesCount.get(RECEIVED).incrementAndGet();
726+
publishTaskStatusesCountUpdate();
727+
}
728+
729+
private void publishTaskStatusesCountUpdate() {
730+
// Copying the map here ensures the original values can't be updated from outside this class.
731+
// As this data should be read only, no need for any atomic class.
732+
final LinkedHashMap<TaskStatus, Long> currentTaskStatusesCountToPublish = currentTaskStatusesCount
733+
.entrySet()
734+
.stream()
735+
.collect(Collectors.toMap(
736+
Map.Entry::getKey,
737+
entrySet -> entrySet.getValue().get(),
738+
(a, b) -> b,
739+
LinkedHashMap::new
740+
));
741+
final TaskStatusesCountUpdatedEvent event = new TaskStatusesCountUpdatedEvent(currentTaskStatusesCountToPublish);
742+
applicationEventPublisher.publishEvent(event);
718743
}
719744
}

src/test/java/com/iexec/core/metric/MetricServiceTests.java

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,19 @@
1717
package com.iexec.core.metric;
1818

1919
import com.iexec.core.chain.DealWatcherService;
20-
import com.iexec.core.task.TaskService;
20+
import com.iexec.core.task.TaskStatus;
21+
import com.iexec.core.task.event.TaskStatusesCountUpdatedEvent;
2122
import com.iexec.core.worker.Worker;
2223
import com.iexec.core.worker.WorkerService;
24+
import org.junit.jupiter.api.Assertions;
2325
import org.junit.jupiter.api.BeforeEach;
2426
import org.junit.jupiter.api.Test;
2527
import org.mockito.InjectMocks;
2628
import org.mockito.Mock;
2729
import org.mockito.MockitoAnnotations;
2830

2931
import java.math.BigInteger;
32+
import java.util.LinkedHashMap;
3033
import java.util.List;
3134

3235
import static org.assertj.core.api.Assertions.assertThat;
@@ -38,8 +41,6 @@ class MetricServiceTests {
3841
private DealWatcherService dealWatcherService;
3942
@Mock
4043
private WorkerService workerService;
41-
@Mock
42-
private TaskService taskService;
4344

4445
@InjectMocks
4546
private MetricService metricService;
@@ -51,30 +52,61 @@ void init() {
5152

5253
@Test
5354
void shouldGetPlatformMetrics() {
55+
final LinkedHashMap<TaskStatus, Long> expectedCurrentTaskStatusesCount = createExpectedCurrentTaskStatusesCount();
56+
5457
List<Worker> aliveWorkers = List.of(new Worker());
5558
when(workerService.getAliveWorkers()).thenReturn(aliveWorkers);
5659
when(workerService.getAliveTotalCpu()).thenReturn(1);
5760
when(workerService.getAliveAvailableCpu()).thenReturn(1);
5861
when(workerService.getAliveTotalGpu()).thenReturn(1);
5962
when(workerService.getAliveAvailableGpu()).thenReturn(1);
60-
when(taskService.getCompletedTasksCount())
61-
.thenReturn(10L);
6263
when(dealWatcherService.getDealEventsCount()).thenReturn(10L);
6364
when(dealWatcherService.getDealsCount()).thenReturn(8L);
6465
when(dealWatcherService.getReplayDealsCount()).thenReturn(2L);
6566
when(dealWatcherService.getLatestBlockNumberWithDeal()).thenReturn(BigInteger.valueOf(255L));
6667

6768
PlatformMetric metric = metricService.getPlatformMetrics();
68-
assertThat(metric.getAliveWorkers()).isEqualTo(aliveWorkers.size());
69-
assertThat(metric.getAliveTotalCpu()).isEqualTo(1);
70-
assertThat(metric.getAliveAvailableCpu()).isEqualTo(1);
71-
assertThat(metric.getAliveTotalGpu()).isEqualTo(1);
72-
assertThat(metric.getAliveAvailableGpu()).isEqualTo(1);
73-
assertThat(metric.getCompletedTasks()).isEqualTo(10L);
74-
assertThat(metric.getDealEventsCount()).isEqualTo(10);
75-
assertThat(metric.getDealsCount()).isEqualTo(8);
76-
assertThat(metric.getReplayDealsCount()).isEqualTo(2);
77-
assertThat(metric.getLatestBlockNumberWithDeal()).isEqualTo(255);
69+
Assertions.assertAll(
70+
() -> assertThat(metric.getAliveWorkers()).isEqualTo(aliveWorkers.size()),
71+
() -> assertThat(metric.getAliveTotalCpu()).isEqualTo(1),
72+
() -> assertThat(metric.getAliveAvailableCpu()).isEqualTo(1),
73+
() -> assertThat(metric.getAliveTotalGpu()).isEqualTo(1),
74+
() -> assertThat(metric.getAliveAvailableGpu()).isEqualTo(1),
75+
() -> assertThat(metric.getCurrentTaskStatusesCount()).isEqualTo(expectedCurrentTaskStatusesCount),
76+
() -> assertThat(metric.getDealEventsCount()).isEqualTo(10),
77+
() -> assertThat(metric.getDealsCount()).isEqualTo(8),
78+
() -> assertThat(metric.getReplayDealsCount()).isEqualTo(2),
79+
() -> assertThat(metric.getLatestBlockNumberWithDeal()).isEqualTo(255)
80+
);
81+
}
82+
83+
private LinkedHashMap<TaskStatus, Long> createExpectedCurrentTaskStatusesCount() {
84+
final LinkedHashMap<TaskStatus, Long> expectedCurrentTaskStatusesCount = new LinkedHashMap<>(TaskStatus.values().length);
85+
expectedCurrentTaskStatusesCount.put(TaskStatus.RECEIVED, 1L);
86+
expectedCurrentTaskStatusesCount.put(TaskStatus.INITIALIZING, 2L);
87+
expectedCurrentTaskStatusesCount.put(TaskStatus.INITIALIZED, 3L);
88+
expectedCurrentTaskStatusesCount.put(TaskStatus.INITIALIZE_FAILED, 4L);
89+
expectedCurrentTaskStatusesCount.put(TaskStatus.RUNNING, 5L);
90+
expectedCurrentTaskStatusesCount.put(TaskStatus.RUNNING_FAILED, 6L);
91+
expectedCurrentTaskStatusesCount.put(TaskStatus.CONTRIBUTION_TIMEOUT, 7L);
92+
expectedCurrentTaskStatusesCount.put(TaskStatus.CONSENSUS_REACHED, 8L);
93+
expectedCurrentTaskStatusesCount.put(TaskStatus.REOPENING, 9L);
94+
expectedCurrentTaskStatusesCount.put(TaskStatus.REOPENED, 10L);
95+
expectedCurrentTaskStatusesCount.put(TaskStatus.REOPEN_FAILED, 11L);
96+
expectedCurrentTaskStatusesCount.put(TaskStatus.AT_LEAST_ONE_REVEALED, 12L);
97+
expectedCurrentTaskStatusesCount.put(TaskStatus.RESULT_UPLOADING, 13L);
98+
expectedCurrentTaskStatusesCount.put(TaskStatus.RESULT_UPLOADED, 14L);
99+
expectedCurrentTaskStatusesCount.put(TaskStatus.RESULT_UPLOAD_TIMEOUT, 15L);
100+
expectedCurrentTaskStatusesCount.put(TaskStatus.FINALIZING, 16L);
101+
expectedCurrentTaskStatusesCount.put(TaskStatus.FINALIZED, 17L);
102+
expectedCurrentTaskStatusesCount.put(TaskStatus.FINALIZE_FAILED, 18L);
103+
expectedCurrentTaskStatusesCount.put(TaskStatus.FINAL_DEADLINE_REACHED, 19L);
104+
expectedCurrentTaskStatusesCount.put(TaskStatus.COMPLETED, 20L);
105+
expectedCurrentTaskStatusesCount.put(TaskStatus.FAILED, 21L);
106+
107+
metricService.onTaskStatusesCountUpdateEvent(new TaskStatusesCountUpdatedEvent(expectedCurrentTaskStatusesCount));
108+
109+
return expectedCurrentTaskStatusesCount;
78110
}
79111

80112
}

0 commit comments

Comments
 (0)