diff --git a/loadgen/ebbandflow/fairness_tracker.go b/loadgen/ebbandflow/fairness_tracker.go index c352f3e0..3966d5ab 100644 --- a/loadgen/ebbandflow/fairness_tracker.go +++ b/loadgen/ebbandflow/fairness_tracker.go @@ -1,7 +1,6 @@ package ebbandflow import ( - "container/heap" "fmt" "math" "sort" @@ -14,53 +13,34 @@ import ( // 2.0 means P95 can be up to 2x the P50 before violation. const significantDiffThreshold = 2.0 -// latencyHeap is a min-heap of float64 values (milliseconds). - -type latencyHeap []float64 - -func (h latencyHeap) Len() int { return len(h) } -func (h latencyHeap) Less(i, j int) bool { return h[i] < h[j] } -func (h latencyHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } - -func (h *latencyHeap) Push(x interface{}) { - *h = append(*h, x.(float64)) -} - -func (h *latencyHeap) Pop() interface{} { - old := *h - n := len(old) - x := old[n-1] - *h = old[0 : n-1] - return x -} +// Maximum number of latency samples to keep per key to prevent unbounded memory growth. +const maxSamplesPerKey = 1000 type FairnessTracker struct { mu sync.RWMutex - latencies map[string]*latencyHeap // fairness key -> heap of latencies in milliseconds - weights map[string]float32 // fairness key -> single weight + latencies map[string][]float64 // fairness key -> slice of latencies in milliseconds + weights map[string]float32 // fairness key -> single weight } -type ViolatorReport struct { +type OutlierReport struct { FairnessKey string `json:"fairnessKey"` P95 float64 `json:"p95"` Weight float64 `json:"weight"` WeightAdjustedP95 float64 `json:"weightAdjustedP95"` - ViolationSeverity float64 `json:"violationSeverity"` + OutlierSeverity float64 `json:"outlierSeverity"` } type FairnessReport struct { - KeyCount int `json:"keyCount"` - WeightAdjustedFairness float64 `json:"weightAdjustedFairness"` - JainsFairnessIndex float64 `json:"jainsFairnessIndex"` - CoefficientOfVariation float64 `json:"coefficientOfVariation"` - AtkinsonIndex float64 `json:"atkinsonIndex"` - Violations map[string]string `json:"violations"` // key is fairness indicator - TopViolators []ViolatorReport `json:"topViolators"` + KeyCount int `json:"keyCount"` + FairnessOutlierCount int `json:"fairnessOutlierCount"` + JainsFairnessIndex float64 `json:"jainsFairnessIndex"` + Violations map[string]string `json:"violations"` // key is fairness indicator + TopOutliers []OutlierReport `json:"topOutliers"` } func NewFairnessTracker() *FairnessTracker { return &FairnessTracker{ - latencies: make(map[string]*latencyHeap), + latencies: make(map[string][]float64), weights: make(map[string]float32), } } @@ -76,13 +56,19 @@ func (ft *FairnessTracker) Track( ft.mu.Lock() defer ft.mu.Unlock() - h, exists := ft.latencies[fairnessKey] - if !exists { - h = &latencyHeap{} - heap.Init(h) - ft.latencies[fairnessKey] = h + latencySlice := ft.latencies[fairnessKey] + + // Add the new sample + latencySlice = append(latencySlice, float64(scheduleToStartLatency.Milliseconds())) + + // Enforce bounded memory by keeping only the most recent samples + if len(latencySlice) > maxSamplesPerKey { + // Keep the most recent maxSamplesPerKey samples + copy(latencySlice, latencySlice[len(latencySlice)-maxSamplesPerKey:]) + latencySlice = latencySlice[:maxSamplesPerKey] } - heap.Push(h, float64(scheduleToStartLatency.Milliseconds())) + + ft.latencies[fairnessKey] = latencySlice ft.weights[fairnessKey] = fairnessWeight } @@ -94,15 +80,15 @@ func (ft *FairnessTracker) GetReport() (*FairnessReport, error) { return nil, fmt.Errorf("need at least 2 fairness keys, got %d", len(ft.latencies)) } - heapData := make(map[string][]float64, len(ft.latencies)) + latencyData := make(map[string][]float64, len(ft.latencies)) weights := make(map[string]float64, len(ft.weights)) - for key, h := range ft.latencies { - if h.Len() == 0 { + for key, slice := range ft.latencies { + if len(slice) == 0 { continue } - values := make([]float64, h.Len()) - copy(values, *h) - heapData[key] = values + values := make([]float64, len(slice)) + copy(values, slice) + latencyData[key] = values // Get the weight for this key if weight, exists := ft.weights[key]; exists { @@ -113,9 +99,9 @@ func (ft *FairnessTracker) GetReport() (*FairnessReport, error) { } ft.mu.RUnlock() - p95Values := make(map[string]float64, len(heapData)) - sampleCounts := make(map[string]int, len(heapData)) - for key, values := range heapData { + p95Values := make(map[string]float64, len(latencyData)) + sampleCounts := make(map[string]int, len(latencyData)) + for key, values := range latencyData { sort.Float64s(values) p95 := calculatePercentile(values, 0.95) p95Values[key] = p95 @@ -130,85 +116,70 @@ func (ft *FairnessTracker) GetReport() (*FairnessReport, error) { p95Slice := maps.Values(p95Values) sort.Float64s(p95Slice) - // Calculate weight-adjusted P95 values for all metrics. + // Calculate volume adjustments based on task count relative to average. + // This represents the "heaviness" - how much load each key is generating. + var totalSamples int + for _, count := range sampleCounts { + totalSamples += count + } + avgSamplesPerKey := float64(totalSamples) / float64(len(sampleCounts)) + + volumeMultipliers := make(map[string]float64, len(sampleCounts)) + for key, count := range sampleCounts { + // Volume multiplier: how much more/less volume this key has vs average + // Higher volume keys should naturally have higher latencies + volumeMultipliers[key] = float64(count) / avgSamplesPerKey + } + + // Calculate weight-adjusted P95 values accounting for both FairnessWeight and volume + // Formula: p95 / (fairnessWeight * volumeMultiplier) + // - fairnessWeight: intentional priority (higher = should get better treatment) + // - volumeMultiplier: natural volume effect (higher = naturally expect worse latency) weightAdjustedP95s := make(map[string]float64, len(p95Values)) for key, p95 := range p95Values { - if weight := weights[key]; weight > 0 { - weightAdjustedP95s[key] = p95 / weight + fairnessWeight := weights[key] + volumeMultiplier := volumeMultipliers[key] + + // Ensure all keys are included with sensible defaults to avoid silently dropping keys + if fairnessWeight <= 0 { + fairnessWeight = 1.0 } + if volumeMultiplier <= 0 { + volumeMultiplier = 1.0 + } + + // Divide by fairnessWeight: higher priority should have lower normalized latency + // Divide by volumeMultiplier: higher volume naturally has higher latency, so normalize for that + weightAdjustedP95s[key] = p95 / (fairnessWeight * volumeMultiplier) } - // Approach 1: Weight-Adjusted Proportional Fairness - // Normalizes each key's P95 by weight (P95/weight), then checks if the distribution - // of normalized values is flat. Higher weight keys should get lower latencies, so - // after normalization all keys should have similar values. We compare P95-of-normalized - // vs P50-of-normalized: if P95 >> P50, some keys are getting much worse treatment - // than the median even after accounting for their weight entitlement. - weightAdjustedFairness, weightAdjustedViolation := calculateWeightAdjustedFairness(weightAdjustedP95s, significantDiffThreshold) - - // Approach 2: Jain's Fairness Index (Weight-Adjusted) - // This is a standard networking fairness metric that measures how evenly distributed - // the weight-adjusted P95 latencies are across all fairness keys. It returns a value between 0 and 1: - // - 1.0 = perfect fairness (all keys have identical weight-adjusted latencies) - // - 0.8-1.0 = good fairness (small variations acceptable) - // - < 0.8 = significant unfairness (some keys much worse than others) - // Now accounts for task weights by using weight-adjusted P95 values. + // Count how many keys exceed the fairness threshold after weight adjustment + offenderCount, weightAdjustedViolation := calculateWeightAdjustedDispersionOffenders(weightAdjustedP95s, significantDiffThreshold) + + // Calculate Jain's Fairness Index (0-1 scale, where 1.0 = perfect fairness) jainsFairnessIndex := calculateJainsFairnessIndex(weightAdjustedP95s) jainsViolation := jainsFairnessIndex < 0.8 - // Approach 3: Coefficient of Variation (Weight-Adjusted) - // Measures relative spread (stddev/mean) of weight-adjusted P95 latencies. Higher CV indicates - // more variability in latencies across keys, suggesting unfair treatment. - // CV > 0.5 typically indicates significant unfairness. - coefficientOfVariation := calculateCoefficientOfVariation(weightAdjustedP95s) - cvViolation := coefficientOfVariation > 0.5 - - // Approach 4: Atkinson Index (Weight-Adjusted) - // Economics inequality measure focusing on worst-off keys. Uses parameter ε=1 - // to give equal weight to proportional gaps. Higher values indicate more inequality. - // Values > 0.3 suggest significant unfairness with some keys being severely disadvantaged. - // Now uses weight-adjusted P95 values to account for task priorities. - atkinsonIndex := calculateAtkinsonIndex(weightAdjustedP95s, 1.0) - atkinsonViolation := atkinsonIndex > 0.3 - - // Approach 5: Latency Envelope Analysis - // Checks if any key has P99/P95 ratio > 3.0, indicating "fat tail" distributions - // that suggest intermittent starvation or inconsistent scheduler behavior. - latencyEnvelopeViolation, latencyEnvelopeDesc := checkLatencyEnvelopeWithDesc(heapData, 3.0) - - // Identify top 5 violators - topViolators := ft.identifyTopViolators(p95Values, weights, p95Slice, significantDiffThreshold) + // Identify top 5 outliers + topOutliers := identifyTopOutliers(p95Values, weights, volumeMultipliers, p95Slice, significantDiffThreshold) // Create violations map violationMap := make(map[string]string) if weightAdjustedViolation { - desc := fmt.Sprintf("Weight-adjusted fairness: %.2f > %.2f threshold", weightAdjustedFairness, significantDiffThreshold) - violationMap["weight_adjusted_fairness"] = desc + desc := fmt.Sprintf("Weight-adjusted dispersion: %d offenders exceed %.2fx threshold", offenderCount, significantDiffThreshold) + violationMap["weight_adjusted_dispersion"] = desc } if jainsViolation { desc := fmt.Sprintf("Jain's fairness index: %.3f < 0.800 threshold", jainsFairnessIndex) violationMap["jains_fairness_index"] = desc } - if cvViolation { - desc := fmt.Sprintf("Coefficient of variation: %.3f > 0.500 threshold", coefficientOfVariation) - violationMap["coefficient_of_variation"] = desc - } - if atkinsonViolation { - desc := fmt.Sprintf("Atkinson index: %.3f > 0.300 threshold", atkinsonIndex) - violationMap["atkinson_index"] = desc - } - if latencyEnvelopeViolation { - violationMap["latency_envelope"] = latencyEnvelopeDesc - } return &FairnessReport{ - KeyCount: len(p95Values), - WeightAdjustedFairness: weightAdjustedFairness, - JainsFairnessIndex: jainsFairnessIndex, - CoefficientOfVariation: coefficientOfVariation, - AtkinsonIndex: atkinsonIndex, - Violations: violationMap, - TopViolators: topViolators, + KeyCount: len(p95Values), + FairnessOutlierCount: offenderCount, + JainsFairnessIndex: jainsFairnessIndex, + Violations: violationMap, + TopOutliers: topOutliers, }, nil } @@ -261,14 +232,14 @@ func calculateJainsFairnessIndex(values map[string]float64) float64 { return (sum * sum) / (n * sumSquares) } -// identifyTopViolators finds the top 5 keys with worst fairness violations -// Must be called with ft.mu held -func (ft *FairnessTracker) identifyTopViolators( +// identifyTopOutliers finds the top 5 keys with worst fairness outliers +func identifyTopOutliers( p95Values map[string]float64, weights map[string]float64, + volumeMultipliers map[string]float64, p95Slice []float64, threshold float64, -) []ViolatorReport { +) []OutlierReport { if len(p95Values) < 2 { return nil } @@ -279,46 +250,54 @@ func (ft *FairnessTracker) identifyTopViolators( return nil } - var violators []ViolatorReport + var outliers []OutlierReport for key, p95 := range p95Values { - weight := weights[key] - weightAdjustedP95 := p95 / weight + fairnessWeight := weights[key] + volumeMultiplier := volumeMultipliers[key] + + // Guard against division by zero + if fairnessWeight <= 0 { + fairnessWeight = 1.0 + } + if volumeMultiplier <= 0 { + volumeMultiplier = 1.0 + } - // Calculate violation severity based on how much this key exceeds the median - // For weighted keys, we expect LOWER latencies, so adjust the severity calculation - // Higher weight should result in lower relative severity for the same absolute latency - violationSeverity := (p95 / medianP95) * (1.0 / weight) + weightAdjustedP95 := p95 / (fairnessWeight * volumeMultiplier) + + // Calculate outlier severity based on how much this key exceeds the median + // Accounts for both fairness weight (intentional priority) and volume multiplier (natural load effect) + outlierSeverity := (p95 / medianP95) * (1.0 / (fairnessWeight * volumeMultiplier)) // Only include keys that significantly exceed the median - if violationSeverity > threshold { - violators = append(violators, ViolatorReport{ + if outlierSeverity > threshold { + outliers = append(outliers, OutlierReport{ FairnessKey: key, P95: p95, - Weight: weight, + Weight: fairnessWeight, WeightAdjustedP95: weightAdjustedP95, - ViolationSeverity: violationSeverity, + OutlierSeverity: outlierSeverity, }) } } - // Sort by violation severity (worst first) - sort.Slice(violators, func(i, j int) bool { - return violators[i].ViolationSeverity > violators[j].ViolationSeverity + // Sort by outlier severity (worst first) + sort.Slice(outliers, func(i, j int) bool { + return outliers[i].OutlierSeverity > outliers[j].OutlierSeverity }) // Return top 5 - if len(violators) > 5 { - violators = violators[:5] + if len(outliers) > 5 { + outliers = outliers[:5] } - - return violators + return outliers } -// calculateWeightAdjustedFairness computes weight-adjusted fairness metric and violation status -func calculateWeightAdjustedFairness(weightAdjustedP95s map[string]float64, threshold float64) (float64, bool) { +// calculateWeightAdjustedDispersionOffenders counts how many keys exceed the fairness threshold +func calculateWeightAdjustedDispersionOffenders(weightAdjustedP95s map[string]float64, threshold float64) (int, bool) { if len(weightAdjustedP95s) < 2 { - return 1.0, false + return 0, false } i := 0 @@ -329,123 +308,24 @@ func calculateWeightAdjustedFairness(weightAdjustedP95s map[string]float64, thre } if len(weightAdjustedSlice) < 2 { - return 1.0, false + return 0, false } sort.Float64s(weightAdjustedSlice) - p50OfWeightAdjusted := calculatePercentile(weightAdjustedSlice, 0.50) - p95OfWeightAdjusted := calculatePercentile(weightAdjustedSlice, 0.95) - - var fairnessRatio float64 - if p50OfWeightAdjusted > 0 { - fairnessRatio = p95OfWeightAdjusted / p50OfWeightAdjusted - } else { - fairnessRatio = 1.0 - } - - violation := fairnessRatio > threshold - return fairnessRatio, violation -} - -// calculateCoefficientOfVariation calculates CV = stddev/mean of P95 values -func calculateCoefficientOfVariation(p95Values map[string]float64) float64 { - if len(p95Values) < 2 { - return 0 - } - - // Calculate mean - var sum float64 - for _, p95 := range p95Values { - sum += p95 - } - mean := sum / float64(len(p95Values)) - - if mean == 0 { - return 0 - } - - // Calculate variance - var sumSquaredDiffs float64 - for _, p95 := range p95Values { - diff := p95 - mean - sumSquaredDiffs += diff * diff - } - variance := sumSquaredDiffs / float64(len(p95Values)) - stddev := math.Sqrt(variance) - - return stddev / mean -} - -// calculateAtkinsonIndex calculates Atkinson inequality index with parameter epsilon -func calculateAtkinsonIndex(p95Values map[string]float64, epsilon float64) float64 { - if len(p95Values) < 2 { - return 0 - } + medianWeightAdjusted := calculatePercentile(weightAdjustedSlice, 0.50) - var sum float64 - var logSum float64 - n := float64(len(p95Values)) - - for _, p95 := range p95Values { - if p95 <= 0 { - return 0 // Cannot compute with zero or negative values - } - sum += p95 - logSum += math.Log(p95) - } - - if sum == 0 { - return 0 - } - - arithmeticMean := sum / n - geometricMean := math.Exp(logSum / n) - - if arithmeticMean == 0 { - return 0 + if medianWeightAdjusted <= 0 { + return 0, false } - return 1 - (geometricMean / arithmeticMean) -} - -// checkLatencyEnvelope checks if any key has P99/P95 ratio exceeding threshold -func (ft *FairnessTracker) checkLatencyEnvelope(p95Values map[string]float64, threshold float64) bool { - for key := range p95Values { - h, exists := ft.latencies[key] - if !exists || h.Len() < 10 { // Need enough samples for P99 - continue - } - - values := make([]float64, h.Len()) - copy(values, *h) - sort.Float64s(values) - - p95 := calculatePercentile(values, 0.95) - p99 := calculatePercentile(values, 0.99) - - if p95 > 0 && (p99/p95) > threshold { - return true // Fat tail detected + // Count how many keys exceed threshold * median + offenderCount := 0 + for _, weightAdjustedP95 := range weightAdjustedP95s { + if weightAdjustedP95 > medianWeightAdjusted*threshold { + offenderCount++ } } - return false -} - -// checkLatencyEnvelopeWithDesc checks if any key has P99/P95 ratio exceeding threshold and returns descriptive text -func checkLatencyEnvelopeWithDesc(heapData map[string][]float64, threshold float64) (bool, string) { - for key, values := range heapData { - if len(values) < 10 { // Need enough samples for P99 - continue - } - // Values are already sorted from GetReport - p95 := calculatePercentile(values, 0.95) - p99 := calculatePercentile(values, 0.99) - - if p95 > 0 && (p99/p95) > threshold { - ratio := p99 / p95 - return true, fmt.Sprintf("Latency envelope violation: key '%s' P99/P95 ratio %.2f > %.1f threshold (P95=%.1fms, P99=%.1fms)", - key, ratio, threshold, p95, p99) - } - } - return false, "No latency envelope violations detected" + violation := offenderCount > 0 + return offenderCount, violation } diff --git a/loadgen/ebbandflow/fairness_tracker_test.go b/loadgen/ebbandflow/fairness_tracker_test.go index 4b8139e4..98541132 100644 --- a/loadgen/ebbandflow/fairness_tracker_test.go +++ b/loadgen/ebbandflow/fairness_tracker_test.go @@ -1,348 +1,138 @@ package ebbandflow import ( - "math" "testing" "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) -func TestCalculateWeightAdjustedFairness(t *testing.T) { +func TestFairnessTrackerHandlesBothVolumeAndPriorityScenarios(t *testing.T) { tests := []struct { - name string - p95Values map[string]float64 - weights map[string]float64 - threshold float64 - expectedRatio float64 - expectedViolation bool + name string + setupFunc func(*FairnessTracker) + violationIndicators []string + expectedViolators []string // expected fairness keys in TopViolators (order matters) }{ { - name: "fair distribution - all equal after weight adjustment", - p95Values: map[string]float64{ - "keyA": 100, - "keyB": 200, - "keyC": 50, - }, - weights: map[string]float64{ - "keyA": 1.0, - "keyB": 2.0, - "keyC": 0.5, - }, - threshold: 2.0, - expectedRatio: 1.0, - expectedViolation: false, - }, - { - name: "unfair distribution - high weight key not getting priority", - p95Values: map[string]float64{ - "keyA": 100, // normalized = 100/1.0 = 100 - "keyB": 300, // normalized = 300/3.0 = 100 - "keyC": 400, // normalized = 400/1.0 = 400 (much worse!) - }, - weights: map[string]float64{ - "keyA": 1.0, - "keyB": 3.0, - "keyC": 1.0, - }, - threshold: 2.0, - expectedRatio: 4.0, // P95=400, P50=100, ratio=4.0 - expectedViolation: true, - }, - { - name: "insufficient data", - p95Values: map[string]float64{ - "keyA": 100, - }, - weights: map[string]float64{ - "keyA": 1.0, - }, - threshold: 2.0, - expectedRatio: 1.0, - expectedViolation: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Calculate weight-adjusted P95s first (matching the main code) - weightAdjustedP95s := make(map[string]float64) - for key, p95 := range tt.p95Values { - if weight := tt.weights[key]; weight > 0 { - weightAdjustedP95s[key] = p95 / weight + name: "keys have equal weights and equal volumes", + setupFunc: func(ft *FairnessTracker) { + // Equal weights, equal volumes, similar latencies -> should be fair + for i := 0; i < 5; i++ { + ft.Track("keyA", 1.0, 100*time.Millisecond) + ft.Track("keyB", 1.0, 110*time.Millisecond) } - } - - ratio, violation := calculateWeightAdjustedFairness(weightAdjustedP95s, tt.threshold) - - if math.Abs(ratio-tt.expectedRatio) > 0.5 { - t.Errorf("expected ratio %.2f, got %.2f", tt.expectedRatio, ratio) - } - - if violation != tt.expectedViolation { - t.Errorf("expected violation %v, got %v", tt.expectedViolation, violation) - } - }) - } -} - -func TestCalculateJainsFairnessIndex(t *testing.T) { - tests := []struct { - name string - p95Values map[string]float64 - expectedIndex float64 - }{ - { - name: "perfect fairness - all equal", - p95Values: map[string]float64{ - "keyA": 100, - "keyB": 100, - "keyC": 100, }, - expectedIndex: 1.0, + expectedViolators: []string{}, // no violators expected }, { - name: "moderate unfairness", - p95Values: map[string]float64{ - "keyA": 100, - "keyB": 200, - "keyC": 100, + name: "volume differences are properly adjusted", + setupFunc: func(ft *FairnessTracker) { + // Same weights, different volumes - should be fair with volume adjustment + ft.Track("keyA", 1.0, 50*time.Millisecond) // Low volume, low latency + for i := 0; i < 10; i++ { + ft.Track("keyB", 1.0, 200*time.Millisecond) // High volume, high latency + } }, - expectedIndex: 0.857, // (400)² / (3 × 60000) ≈ 0.889 + expectedViolators: []string{"keyA"}, // keyA exceeds individual threshold but overall fairness metrics pass }, { - name: "severe unfairness", - p95Values: map[string]float64{ - "keyA": 100, - "keyB": 500, - "keyC": 100, + name: "different weights receive proportional treatment", + setupFunc: func(ft *FairnessTracker) { + // Different weights with appropriately proportional latencies - should be fair + // keyB has 4x higher weight, so should get ~4x better latency + for i := 0; i < 5; i++ { + ft.Track("keyA", 1.0, 200*time.Millisecond) // Normal priority, normal latency + ft.Track("keyB", 4.0, 50*time.Millisecond) // 4x priority, 4x better latency + } }, - expectedIndex: 0.571, // (700)² / (3 × 290000) ≈ 0.563 + expectedViolators: []string{}, // proportional treatment should be fair }, { - name: "single key", - p95Values: map[string]float64{ - "keyA": 100, + name: "keys have different priorities", + setupFunc: func(ft *FairnessTracker) { + // Different weights, proportional latencies - will show some violations but that's expected + for i := 0; i < 5; i++ { + ft.Track("keyA", 1.0, 200*time.Millisecond) // Normal priority + ft.Track("keyB", 3.0, 70*time.Millisecond) // High priority, proportionally lower latency + } }, - expectedIndex: 1.0, + violationIndicators: []string{"jains_fairness_index"}, + expectedViolators: []string{}, // no individual violators, just overall fairness impact }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - index := calculateJainsFairnessIndex(tt.p95Values) - - if math.Abs(index-tt.expectedIndex) > 0.3 { - t.Errorf("expected index %.3f, got %.3f", tt.expectedIndex, index) - } - }) - } -} - -func TestCalculateCoefficientOfVariation(t *testing.T) { - tests := []struct { - name string - p95Values map[string]float64 - expectedCV float64 - }{ { - name: "no variation", - p95Values: map[string]float64{ - "keyA": 100, - "keyB": 100, - "keyC": 100, + name: "latency differences are extreme", + setupFunc: func(ft *FairnessTracker) { + // Same weights, same volumes, but extreme latency difference -> unfair + for i := 0; i < 5; i++ { + ft.Track("keyA", 1.0, 50*time.Millisecond) // Very fast + ft.Track("keyB", 1.0, 1000*time.Millisecond) // Very slow + } }, - expectedCV: 0.0, + violationIndicators: []string{"jains_fairness_index"}, + expectedViolators: []string{}, // 20x latency difference may not exceed 2.0 threshold after median normalization }, { - name: "moderate variation", - p95Values: map[string]float64{ - "keyA": 100, - "keyB": 150, - "keyC": 50, + name: "high priority keys receive worse treatment", + setupFunc: func(ft *FairnessTracker) { + // High priority key getting much worse treatment -> should cause violations + for i := 0; i < 5; i++ { + ft.Track("keyA", 1.0, 50*time.Millisecond) // Low priority, very good latency + ft.Track("keyB", 10.0, 2000*time.Millisecond) // High priority, terrible latency + } }, - expectedCV: 0.408, // stddev ≈ 40.8, mean = 100 + violationIndicators: []string{"jains_fairness_index"}, + expectedViolators: []string{}, // keyB has terrible treatment but may not exceed threshold due to high weight }, { - name: "high variation", - p95Values: map[string]float64{ - "keyA": 50, - "keyB": 200, - "keyC": 100, + name: "normalized latency distribution has extreme spread", + setupFunc: func(ft *FairnessTracker) { + // Creates extreme spread in weight-adjusted P95 distribution to trigger weight_adjusted_dispersion violation + // The weight_adjusted_dispersion metric compares P95 vs P50 of normalized latencies - if P95/P50 > 2.0, it violates + // This setup creates 3 keys with good performance and 1 with terrible performance + for i := 0; i < 10; i++ { + ft.Track("keyA", 1.0, 100*time.Millisecond) // Good: normalized = 100/(1.0*1.0) = 100 + ft.Track("keyB", 1.0, 100*time.Millisecond) // Good: normalized = 100/(1.0*1.0) = 100 + ft.Track("keyC", 1.0, 100*time.Millisecond) // Good: normalized = 100/(1.0*1.0) = 100 + ft.Track("keyD", 1.0, 5000*time.Millisecond) // Bad: normalized = 5000/(1.0*1.0) = 5000 + } + // Result: normalized P50≈100, P95≈5000, ratio=50 >> 2.0 threshold }, - expectedCV: 0.611, // stddev ≈ 61.2, mean ≈ 116.7 + violationIndicators: []string{"weight_adjusted_dispersion", "jains_fairness_index"}, + expectedViolators: []string{"keyD"}, // keyD has 50x worse latency than median, will exceed 2.0 threshold }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cv := calculateCoefficientOfVariation(tt.p95Values) + t.Parallel() - if math.Abs(cv-tt.expectedCV) > 0.1 { - t.Errorf("expected CV %.3f, got %.3f", tt.expectedCV, cv) - } - }) - } -} + ft := NewFairnessTracker() + tt.setupFunc(ft) -func TestCalculateAtkinsonIndex(t *testing.T) { - tests := []struct { - name string - p95Values map[string]float64 - epsilon float64 - expectedIndex float64 - }{ - { - name: "perfect equality", - p95Values: map[string]float64{ - "keyA": 100, - "keyB": 100, - "keyC": 100, - }, - epsilon: 1.0, - expectedIndex: 0.0, - }, - { - name: "moderate inequality", - p95Values: map[string]float64{ - "keyA": 100, - "keyB": 200, - "keyC": 100, - }, - epsilon: 1.0, - expectedIndex: 0.15, // Adjusted expectation - }, - { - name: "high inequality", - p95Values: map[string]float64{ - "keyA": 50, - "keyB": 300, - "keyC": 100, - }, - epsilon: 1.0, - expectedIndex: 0.35, // Adjusted expectation - }, - } + report, err := ft.GetReport() + require.NoError(t, err, "GetReport should succeed") - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - index := calculateAtkinsonIndex(tt.p95Values, tt.epsilon) + assert.GreaterOrEqual(t, report.KeyCount, 2) + assert.GreaterOrEqual(t, report.FairnessOutlierCount, 0) - if math.Abs(index-tt.expectedIndex) > 0.3 { - t.Errorf("expected index %.3f, got %.3f", tt.expectedIndex, index) + if len(tt.violationIndicators) > 0 { + assert.NotEmpty(t, tt.violationIndicators) + for _, indicator := range tt.violationIndicators { + assert.Contains(t, report.Violations, indicator) + } + } else { + assert.Empty(t, tt.violationIndicators) } - }) - } -} - -func TestCheckLatencyEnvelope(t *testing.T) { - // Create a fairness tracker with test data - ft := NewFairnessTracker() - - // Add latencies with fat tail for keyA - keyALatencies := []time.Duration{ - 50 * time.Millisecond, - 60 * time.Millisecond, - 70 * time.Millisecond, - 80 * time.Millisecond, - 90 * time.Millisecond, - 95 * time.Millisecond, - 100 * time.Millisecond, - 110 * time.Millisecond, - 120 * time.Millisecond, - 130 * time.Millisecond, // P95 ≈ 120ms - 140 * time.Millisecond, - 150 * time.Millisecond, - 160 * time.Millisecond, - 170 * time.Millisecond, - 180 * time.Millisecond, - 190 * time.Millisecond, - 200 * time.Millisecond, - 210 * time.Millisecond, - 3000 * time.Millisecond, // P99 outlier - creates fat tail - } - - for _, latency := range keyALatencies { - ft.Track("keyA", 1.0, latency) - } - - // Add normal distribution for keyB - keyBLatencies := []time.Duration{ - 90 * time.Millisecond, - 95 * time.Millisecond, - 100 * time.Millisecond, - 105 * time.Millisecond, - 110 * time.Millisecond, - 115 * time.Millisecond, - 120 * time.Millisecond, - 125 * time.Millisecond, - 130 * time.Millisecond, - 135 * time.Millisecond, - } - - for _, latency := range keyBLatencies { - ft.Track("keyB", 1.0, latency) - } - - p95Values := map[string]float64{ - "keyA": 120, // Will have P99 ≈ 500, ratio = 4.2 > 3.0 - "keyB": 130, // Will have P99 ≈ 135, ratio = 1.04 < 3.0 - } - tests := []struct { - name string - threshold float64 - expectedViolation bool - }{ - { - name: "fat tail detected with threshold 3.0", - threshold: 3.0, - expectedViolation: true, // keyA has P99/P95 > 3.0 - }, - { - name: "no violation with high threshold", - threshold: 6.0, - expectedViolation: false, // keyA's ratio ≈ 5.1 < 6.0 - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - violation := ft.checkLatencyEnvelope(p95Values, tt.threshold) - - if violation != tt.expectedViolation { - t.Errorf("expected violation %v, got %v", tt.expectedViolation, violation) + assert.Len(t, report.TopOutliers, len(tt.expectedViolators), "TopOutliers count should match expected") + if len(tt.expectedViolators) > 0 { + for i, expectedKey := range tt.expectedViolators { + assert.Equal(t, expectedKey, report.TopOutliers[i].FairnessKey, + "TopOutliers[%d] should be %s", i, expectedKey) + } } }) } } - -func TestGetReportIntegration(t *testing.T) { - ft := NewFairnessTracker() - - // Add some test data - ft.Track("keyA", 1.0, 100*time.Millisecond) - ft.Track("keyA", 1.0, 110*time.Millisecond) - ft.Track("keyB", 2.0, 200*time.Millisecond) // Higher weight, should get priority - ft.Track("keyB", 2.0, 220*time.Millisecond) - - report, err := ft.GetReport() - if err != nil { - t.Fatalf("GetReport failed: %v", err) - } - - // Verify report structure - if report.KeyCount != 2 { - t.Errorf("expected KeyCount 2, got %d", report.KeyCount) - } - - if report.JainsFairnessIndex < 0 || report.JainsFairnessIndex > 1 { - t.Errorf("Jain's index should be 0-1, got %.3f", report.JainsFairnessIndex) - } - - if report.CoefficientOfVariation < 0 { - t.Errorf("CV should be non-negative, got %.3f", report.CoefficientOfVariation) - } - - if report.AtkinsonIndex < 0 || report.AtkinsonIndex > 1 { - t.Errorf("Atkinson index should be 0-1, got %.3f", report.AtkinsonIndex) - } -} diff --git a/workers/go/ebbandflow/activities.go b/workers/go/ebbandflow/activities.go index a9057682..828ea744 100644 --- a/workers/go/ebbandflow/activities.go +++ b/workers/go/ebbandflow/activities.go @@ -42,9 +42,7 @@ func (a Activities) ProcessFairnessReport(ctx context.Context, report ebbandflow commonFields := []any{ "keyCount", report.KeyCount, "jainsFairnessIndex", fmt.Sprintf("%.3f", report.JainsFairnessIndex), - "coefficientOfVariation", fmt.Sprintf("%.3f", report.CoefficientOfVariation), - "atkinsonIndex", fmt.Sprintf("%.3f", report.AtkinsonIndex), - "weightAdjustedFairness", fmt.Sprintf("%.3f", report.WeightAdjustedFairness), + "fairnessOutlierCount", report.FairnessOutlierCount, } if len(report.Violations) == 0 { @@ -58,23 +56,21 @@ func (a Activities) ProcessFairnessReport(ctx context.Context, report ebbandflow violationSummary := fmt.Sprintf("%d violations: [%s]", len(violations), strings.Join(violations, "; ")) errorFields := append(commonFields, "violationSummary", violationSummary) - for i, offender := range report.TopViolators { - violatorSummary := fmt.Sprintf("key=%s p95=%.2fms weight=%.1f weightAdjustedP95=%.2fms severity=%.2f", - offender.FairnessKey, - offender.P95, - offender.Weight, - offender.WeightAdjustedP95, - offender.ViolationSeverity) - errorFields = append(errorFields, fmt.Sprintf("topViolator%d", i+1), violatorSummary) + for i, outlier := range report.TopOutliers { + outlierSummary := fmt.Sprintf("key=%s p95=%.2fms weight=%.1f weightAdjustedP95=%.2fms severity=%.2f", + outlier.FairnessKey, + outlier.P95, + outlier.Weight, + outlier.WeightAdjustedP95, + outlier.OutlierSeverity) + errorFields = append(errorFields, fmt.Sprintf("topOutlier%d", i+1), outlierSummary) } logger.Error("Fairness status: violated", errorFields...) } // Emit metrics. metricsHandler.Gauge("ebbandflow_fairness_jains_index").Update(report.JainsFairnessIndex) - metricsHandler.Gauge("ebbandflow_fairness_coefficient_variation").Update(report.CoefficientOfVariation) - metricsHandler.Gauge("ebbandflow_fairness_atkinson_index").Update(report.AtkinsonIndex) - metricsHandler.Gauge("ebbandflow_fairness_weight_adjusted").Update(report.WeightAdjustedFairness) + metricsHandler.Gauge("ebbandflow_fairness_outlier_count").Update(float64(report.FairnessOutlierCount)) metricsHandler.Gauge("ebbandflow_fairness_key_count").Update(float64(report.KeyCount)) // Emit one violation metric per validation index with labels