diff --git a/memmetrics/roundtrip.go b/memmetrics/roundtrip.go index 34b39691..5cbf0779 100644 --- a/memmetrics/roundtrip.go +++ b/memmetrics/roundtrip.go @@ -15,16 +15,15 @@ import ( // are a rolling window histograms with defined precision as well. // See RTOptions for more detail on parameters. type RTMetrics struct { - total *RollingCounter - netErrors *RollingCounter - statusCodes map[int]*RollingCounter - statusCodesLock sync.RWMutex - histogram *RollingHDRHistogram - histogramLock sync.RWMutex - - newCounter NewCounterFn - newHist NewRollingHistogramFn - clock timetools.TimeProvider + // lock protects all data members. + lock sync.Mutex + total *RollingCounter + netErrors *RollingCounter + statusCodes map[int]*RollingCounter + histogram *RollingHDRHistogram + newCounter NewCounterFn + newHist NewRollingHistogramFn + clock timetools.TimeProvider } type rrOptSetter func(r *RTMetrics) error @@ -65,8 +64,7 @@ func RTClock(clock timetools.TimeProvider) rrOptSetter { // NewRTMetrics returns new instance of metrics collector. func NewRTMetrics(settings ...rrOptSetter) (*RTMetrics, error) { m := &RTMetrics{ - statusCodes: make(map[int]*RollingCounter), - statusCodesLock: sync.RWMutex{}, + statusCodes: make(map[int]*RollingCounter), } for _, s := range settings { if err := s(m); err != nil { @@ -113,14 +111,10 @@ func NewRTMetrics(settings ...rrOptSetter) (*RTMetrics, error) { // Export Returns a new RTMetrics which is a copy of the current one func (m *RTMetrics) Export() *RTMetrics { - m.statusCodesLock.RLock() - defer m.statusCodesLock.RUnlock() - m.histogramLock.RLock() - defer m.histogramLock.RUnlock() + m.lock.Lock() + defer m.lock.Unlock() export := &RTMetrics{} - export.statusCodesLock = sync.RWMutex{} - export.histogramLock = sync.RWMutex{} export.total = m.total.Clone() export.netErrors = m.netErrors.Clone() exportStatusCodes := map[int]*RollingCounter{} @@ -140,12 +134,16 @@ func (m *RTMetrics) Export() *RTMetrics { // CounterWindowSize gets total windows size func (m *RTMetrics) CounterWindowSize() time.Duration { + m.lock.Lock() + defer m.lock.Unlock() return m.total.WindowSize() } // NetworkErrorRatio calculates the amont of network errors such as time outs and dropped connection // that occurred in the given time window compared to the total requests count. func (m *RTMetrics) NetworkErrorRatio() float64 { + m.lock.Lock() + defer m.lock.Unlock() if m.total.Count() == 0 { return 0 } @@ -154,10 +152,10 @@ func (m *RTMetrics) NetworkErrorRatio() float64 { // ResponseCodeRatio calculates ratio of count(startA to endA) / count(startB to endB) func (m *RTMetrics) ResponseCodeRatio(startA, endA, startB, endB int) float64 { + m.lock.Lock() + defer m.lock.Unlock() a := int64(0) b := int64(0) - m.statusCodesLock.RLock() - defer m.statusCodesLock.RUnlock() for code, v := range m.statusCodes { if code < endA && code >= startA { a += v.Count() @@ -174,6 +172,9 @@ func (m *RTMetrics) ResponseCodeRatio(startA, endA, startB, endB int) float64 { // Append append a metric func (m *RTMetrics) Append(other *RTMetrics) error { + m.lock.Lock() + defer m.lock.Unlock() + if m == other { return errors.New("RTMetrics cannot append to self") } @@ -188,10 +189,6 @@ func (m *RTMetrics) Append(other *RTMetrics) error { copied := other.Export() - m.statusCodesLock.Lock() - defer m.statusCodesLock.Unlock() - m.histogramLock.Lock() - defer m.histogramLock.Unlock() for code, c := range copied.statusCodes { o, ok := m.statusCodes[code] if ok { @@ -208,6 +205,9 @@ func (m *RTMetrics) Append(other *RTMetrics) error { // Record records a metric func (m *RTMetrics) Record(code int, duration time.Duration) { + m.lock.Lock() + defer m.lock.Unlock() + m.total.Inc(1) if code == http.StatusGatewayTimeout || code == http.StatusBadGateway { m.netErrors.Inc(1) @@ -218,19 +218,24 @@ func (m *RTMetrics) Record(code int, duration time.Duration) { // TotalCount returns total count of processed requests collected. func (m *RTMetrics) TotalCount() int64 { + m.lock.Lock() + defer m.lock.Unlock() return m.total.Count() } // NetworkErrorCount returns total count of processed requests observed func (m *RTMetrics) NetworkErrorCount() int64 { + m.lock.Lock() + defer m.lock.Unlock() return m.netErrors.Count() } // StatusCodesCounts returns map with counts of the response codes func (m *RTMetrics) StatusCodesCounts() map[int]int64 { + m.lock.Lock() + defer m.lock.Unlock() + sc := make(map[int]int64) - m.statusCodesLock.RLock() - defer m.statusCodesLock.RUnlock() for k, v := range m.statusCodes { if v.Count() != 0 { sc[k] = v.Count() @@ -241,40 +246,32 @@ func (m *RTMetrics) StatusCodesCounts() map[int]int64 { // LatencyHistogram computes and returns resulting histogram with latencies observed. func (m *RTMetrics) LatencyHistogram() (*HDRHistogram, error) { - m.histogramLock.Lock() - defer m.histogramLock.Unlock() + m.lock.Lock() + defer m.lock.Unlock() return m.histogram.Merged() } // Reset reset metrics func (m *RTMetrics) Reset() { - m.statusCodesLock.Lock() - defer m.statusCodesLock.Unlock() - m.histogramLock.Lock() - defer m.histogramLock.Unlock() + m.lock.Lock() + defer m.lock.Unlock() m.histogram.Reset() m.total.Reset() m.netErrors.Reset() m.statusCodes = make(map[int]*RollingCounter) } +// WARNING: Lock must be held before calling. func (m *RTMetrics) recordLatency(d time.Duration) error { - m.histogramLock.Lock() - defer m.histogramLock.Unlock() return m.histogram.RecordLatencies(d, 1) } +// WARNING: Lock must be held before calling. func (m *RTMetrics) recordStatusCode(statusCode int) error { - m.statusCodesLock.Lock() if c, ok := m.statusCodes[statusCode]; ok { c.Inc(1) - m.statusCodesLock.Unlock() return nil } - m.statusCodesLock.Unlock() - - m.statusCodesLock.Lock() - defer m.statusCodesLock.Unlock() // Check if another goroutine has written our counter already if c, ok := m.statusCodes[statusCode]; ok { diff --git a/memmetrics/roundtrip_test.go b/memmetrics/roundtrip_test.go index 2de6be23..022b945e 100644 --- a/memmetrics/roundtrip_test.go +++ b/memmetrics/roundtrip_test.go @@ -12,6 +12,61 @@ import ( "github.com/vulcand/oxy/testutils" ) +func BenchmarkRecord(b *testing.B) { + b.ReportAllocs() + + rr, err := NewRTMetrics(RTClock(testutils.GetClock())) + require.NoError(b, err) + + // warm up metrics. Adding a new code can do allocations, but in the steady + // state recording a code is cheap. We want to measure the steady state. + const codes = 100 + for code := 0; code < codes; code++ { + rr.Record(code, time.Second) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + rr.Record(i%codes, time.Second) + } +} + +func BenchmarkRecordConcurrently(b *testing.B) { + b.ReportAllocs() + + rr, err := NewRTMetrics(RTClock(testutils.GetClock())) + require.NoError(b, err) + + // warm up metrics. Adding a new code can do allocations, but in the steady + // state recording a code is cheap. We want to measure the steady state. + const codes = 100 + for code := 0; code < codes; code++ { + rr.Record(code, time.Second) + } + + concurrency := runtime.NumCPU() + b.Logf("NumCPU: %d, Concurrency: %d, GOMAXPROCS: %d", + runtime.NumCPU(), concurrency, runtime.GOMAXPROCS(0)) + wg := sync.WaitGroup{} + wg.Add(concurrency) + perG := b.N/concurrency + if perG == 0 { + perG = 1 + } + + b.ResetTimer() + for i := 0; i < concurrency; i++ { + go func() { + for j := 0; j < perG; j++ { + rr.Record(j%codes, time.Second) + } + wg.Done() + }() + } + + wg.Wait() +} + func TestDefaults(t *testing.T) { rr, err := NewRTMetrics(RTClock(testutils.GetClock())) require.NoError(t, err) @@ -75,8 +130,11 @@ func TestAppend(t *testing.T) { } func TestConcurrentRecords(t *testing.T) { - // This test asserts a race condition which requires parallelism + // This test asserts a race condition which requires concurrency. Set + // GOMAXPROCS high for this test, then restore after test completes. + n := runtime.GOMAXPROCS(0) runtime.GOMAXPROCS(100) + defer runtime.GOMAXPROCS(n) rr, err := NewRTMetrics(RTClock(testutils.GetClock())) require.NoError(t, err) @@ -84,7 +142,7 @@ func TestConcurrentRecords(t *testing.T) { for code := 0; code < 100; code++ { for numRecords := 0; numRecords < 10; numRecords++ { go func(statusCode int) { - _ = rr.recordStatusCode(statusCode) + rr.Record(statusCode, time.Second) }(code) } } @@ -92,11 +150,9 @@ func TestConcurrentRecords(t *testing.T) { func TestRTMetricExportReturnsNewCopy(t *testing.T) { a := RTMetrics{ - clock: &timetools.RealTime{}, - statusCodes: map[int]*RollingCounter{}, - statusCodesLock: sync.RWMutex{}, - histogram: &RollingHDRHistogram{}, - histogramLock: sync.RWMutex{}, + clock: &timetools.RealTime{}, + statusCodes: map[int]*RollingCounter{}, + histogram: &RollingHDRHistogram{}, } var err error @@ -129,23 +185,4 @@ func TestRTMetricExportReturnsNewCopy(t *testing.T) { assert.NotNil(t, b.newCounter) assert.NotNil(t, b.newHist) assert.NotNil(t, b.clock) - - // a and b should have different locks - locksSucceed := make(chan bool) - go func() { - a.statusCodesLock.Lock() - b.statusCodesLock.Lock() - a.histogramLock.Lock() - b.histogramLock.Lock() - locksSucceed <- true - }() - - for { - select { - case <-locksSucceed: - return - case <-time.After(10 * time.Second): - t.FailNow() - } - } }