Skip to content

Commit ebcc160

Browse files
committed
Implement scalable counter
Implementation is based on chapter 5.2.2 of Paul E. McKenney (2017), "Is Parallel Programming Hard, And, If So, What Can You Do About It?"
1 parent a18dd96 commit ebcc160

File tree

5 files changed

+105
-12
lines changed

5 files changed

+105
-12
lines changed

core/include/prometheus/counter.h

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,50 @@
11
#pragma once
22

3+
#include <array>
34
#include <atomic>
5+
#include <exception>
46

57
#include "prometheus/client_metric.h"
6-
#include "prometheus/gauge.h"
78

89
namespace prometheus {
910

1011
class Counter {
1112
public:
1213
static const MetricType metric_type = MetricType::Counter;
1314

14-
void Increment();
15-
void Increment(double);
15+
void Increment() { Increment(1.0); }
16+
void Increment(double val) { per_thread_counter_[ThreadId()].v += val; }
17+
1618
double Value() const;
1719

18-
ClientMetric Collect();
20+
ClientMetric Collect() const;
1921

2022
private:
21-
Gauge gauge_;
23+
int ThreadId() {
24+
thread_local int id_{-1};
25+
26+
if (id_ == -1) {
27+
id_ = AssignThreadId();
28+
}
29+
return id_;
30+
}
31+
32+
int AssignThreadId() {
33+
const int id_{count_.fetch_add(1)};
34+
35+
if (id_ >= per_thread_counter_.size()) {
36+
std::terminate();
37+
}
38+
39+
return id_;
40+
}
41+
42+
struct alignas(128) cacheline {
43+
double v{0.0};
44+
};
45+
46+
std::atomic<int> count_{0};
47+
std::array<cacheline, 256> per_thread_counter_{};
2248
};
2349

2450
} // namespace prometheus

core/src/counter.cc

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
1-
#include "prometheus/counter.h"
21

3-
namespace prometheus {
2+
#include "prometheus/counter.h"
43

5-
void Counter::Increment() { gauge_.Increment(); }
4+
#include <algorithm>
65

7-
void Counter::Increment(double val) { gauge_.Increment(val); }
6+
namespace prometheus {
87

9-
double Counter::Value() const { return gauge_.Value(); }
8+
double Counter::Value() const {
9+
return std::accumulate(
10+
std::begin(per_thread_counter_), std::end(per_thread_counter_), 0.0,
11+
[](const double a, const cacheline& b) { return a + b.v; });
12+
}
1013

11-
ClientMetric Counter::Collect() {
14+
ClientMetric Counter::Collect() const {
1215
ClientMetric metric;
1316
metric.counter.value = Value();
1417
return metric;

core/tests/benchmark/counter_bench.cc

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,32 @@ static void BM_Counter_Increment(benchmark::State& state) {
1010
BuildCounter().Name("benchmark_counter").Help("").Register(registry);
1111
auto& counter = counter_family.Add({});
1212

13-
while (state.KeepRunning()) counter.Increment();
13+
while (state.KeepRunning()) {
14+
counter.Increment();
15+
}
1416
}
1517
BENCHMARK(BM_Counter_Increment);
1618

19+
class BM_Counter : public benchmark::Fixture {
20+
protected:
21+
BM_Counter() { this->ThreadPerCpu(); }
22+
23+
prometheus::Registry registry;
24+
prometheus::Family<prometheus::Counter>& counter_family =
25+
prometheus::BuildCounter()
26+
.Name("benchmark_counter")
27+
.Help("")
28+
.Register(registry);
29+
prometheus::Counter& counter = counter_family.Add({});
30+
};
31+
32+
BENCHMARK_F(BM_Counter, ConcurrentIncrement)
33+
(benchmark::State& state) {
34+
for (auto _ : state) {
35+
counter.Increment();
36+
}
37+
}
38+
1739
static void BM_Counter_Collect(benchmark::State& state) {
1840
using prometheus::BuildCounter;
1941
using prometheus::Counter;

core/tests/benchmark/gauge_bench.cc

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,26 @@ static void BM_Gauge_Increment(benchmark::State& state) {
1414
}
1515
BENCHMARK(BM_Gauge_Increment);
1616

17+
class BM_Gauge : public benchmark::Fixture {
18+
protected:
19+
BM_Gauge() { this->ThreadPerCpu(); }
20+
21+
prometheus::Registry registry;
22+
prometheus::Family<prometheus::Gauge>& gauge_family{
23+
prometheus::BuildGauge()
24+
.Name("benchmark_gauge")
25+
.Help("")
26+
.Register(registry)};
27+
prometheus::Gauge& gauge{gauge_family.Add({})};
28+
};
29+
30+
BENCHMARK_F(BM_Gauge, ConcurrentIncrement)
31+
(benchmark::State& state) {
32+
for (auto _ : state) {
33+
gauge.Increment();
34+
}
35+
}
36+
1737
static void BM_Gauge_Decrement(benchmark::State& state) {
1838
using prometheus::BuildGauge;
1939
using prometheus::Gauge;

core/tests/counter_test.cc

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
#include "gmock/gmock.h"
22

3+
#include <thread>
4+
#include <vector>
5+
36
#include <prometheus/counter.h>
47

58
using namespace testing;
@@ -31,3 +34,22 @@ TEST_F(CounterTest, inc_multiple) {
3134
counter.Increment(5);
3235
EXPECT_EQ(counter.Value(), 7.0);
3336
}
37+
38+
TEST_F(CounterTest, concurrent) {
39+
Counter counter;
40+
std::vector<std::thread> threads(std::thread::hardware_concurrency());
41+
42+
for (auto& thread : threads) {
43+
thread = std::thread{[&counter]() {
44+
for (int i = {0}; i < 100000; ++i) {
45+
counter.Increment();
46+
}
47+
}};
48+
}
49+
50+
for (auto& thread : threads) {
51+
thread.join();
52+
}
53+
54+
EXPECT_EQ(100000 * threads.size(), counter.Value());
55+
}

0 commit comments

Comments
 (0)