68
68
* "\"oobReportingPeriod\":\"10s\"," +
69
69
* "\"weightExpirationPeriod\":\"180s\"," +
70
70
* "\"errorUtilizationPenalty\":\"1.0\"," +
71
- * "\"weightUpdatePeriod\":\"1s\"}}]}";
71
+ * "\"weightUpdatePeriod\":\"1s\"," +
72
+ * "\"slowStartConfig\":{" +
73
+ * "\"minWeightPercent\":10.0," +
74
+ * "\"aggression\":1.0," +
75
+ * "\"slowStartWindow\":\"30s\"}}}]}";
72
76
* serviceConfig = (Map<String, ?>) JsonParser.parse(wrrConfig);
73
77
* channel = ManagedChannelBuilder.forTarget("test:///lb.test.grpc.io")
74
78
* .defaultServiceConfig(serviceConfig)
@@ -90,6 +94,7 @@ final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer {
90
94
private static final LongCounterMetricInstrument RR_FALLBACK_COUNTER ;
91
95
private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER ;
92
96
private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_STALE_COUNTER ;
97
+ private static final LongCounterMetricInstrument ENDPOINT_SLOW_START_COUNTER ;
93
98
private static final DoubleHistogramMetricInstrument ENDPOINT_WEIGHTS_HISTOGRAM ;
94
99
private static final Logger log = Logger .getLogger (
95
100
WeightedRoundRobinLoadBalancer .class .getName ());
@@ -133,6 +138,14 @@ final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer {
133
138
Lists .newArrayList ("grpc.target" ),
134
139
Lists .newArrayList ("grpc.lb.locality" , "grpc.lb.backend_service" ),
135
140
false );
141
+ ENDPOINT_SLOW_START_COUNTER = metricInstrumentRegistry .registerLongCounter (
142
+ "grpc.lb.wrr.endpoints_in_slow_start" ,
143
+ "EXPERIMENTAL. Number of endpoints from each scheduler update that "
144
+ + "are in slow start window" ,
145
+ "{endpoint}" ,
146
+ Lists .newArrayList ("grpc.target" ),
147
+ Lists .newArrayList ("grpc.lb.locality" , "grpc.lb.backend_service" ),
148
+ false );
136
149
ENDPOINT_WEIGHTS_HISTOGRAM = metricInstrumentRegistry .registerDoubleHistogram (
137
150
"grpc.lb.wrr.endpoint_weights" ,
138
151
"EXPERIMENTAL. The histogram buckets will be endpoint weight ranges." ,
@@ -243,16 +256,21 @@ private SubchannelPicker createReadyPicker(Collection<ChildLbState> activeList)
243
256
private void updateWeight (WeightedRoundRobinPicker picker ) {
244
257
Helper helper = getHelper ();
245
258
float [] newWeights = new float [picker .children .size ()];
259
+ float [] newScales = new float [picker .children .size ()];
246
260
AtomicInteger staleEndpoints = new AtomicInteger ();
247
261
AtomicInteger notYetUsableEndpoints = new AtomicInteger ();
262
+ AtomicInteger slowStartEndpoints = new AtomicInteger ();
248
263
for (int i = 0 ; i < picker .children .size (); i ++) {
249
264
double newWeight = ((WeightedChildLbState ) picker .children .get (i )).getWeight (staleEndpoints ,
250
265
notYetUsableEndpoints );
266
+ double newScale = ((WeightedChildLbState ) picker .children .get (i ))
267
+ .getScale (slowStartEndpoints );
251
268
helper .getMetricRecorder ()
252
269
.recordDoubleHistogram (ENDPOINT_WEIGHTS_HISTOGRAM , newWeight ,
253
270
ImmutableList .of (helper .getChannelTarget ()),
254
271
ImmutableList .of (locality , backendService ));
255
272
newWeights [i ] = newWeight > 0 ? (float ) newWeight : 0.0f ;
273
+ newScales [i ] = newScale > 0 ? (float ) newScale : 1.0f ;
256
274
}
257
275
258
276
if (staleEndpoints .get () > 0 ) {
@@ -267,7 +285,13 @@ private void updateWeight(WeightedRoundRobinPicker picker) {
267
285
ImmutableList .of (helper .getChannelTarget ()),
268
286
ImmutableList .of (locality , backendService ));
269
287
}
270
- boolean weightsEffective = picker .updateWeight (newWeights );
288
+ if (slowStartEndpoints .get () > 0 ) {
289
+ helper .getMetricRecorder ()
290
+ .addLongCounter (ENDPOINT_SLOW_START_COUNTER , slowStartEndpoints .get (),
291
+ ImmutableList .of (helper .getChannelTarget ()),
292
+ ImmutableList .of (locality , backendService ));
293
+ }
294
+ boolean weightsEffective = picker .updateWeight (newWeights , newScales );
271
295
if (!weightsEffective ) {
272
296
helper .getMetricRecorder ()
273
297
.addLongCounter (RR_FALLBACK_COUNTER , 1 , ImmutableList .of (helper .getChannelTarget ()),
@@ -289,6 +313,7 @@ final class WeightedChildLbState extends ChildLbState {
289
313
private final Set <WrrSubchannel > subchannels = new HashSet <>();
290
314
private volatile long lastUpdated ;
291
315
private volatile long nonEmptySince ;
316
+ private volatile long readySince ;
292
317
private volatile double weight = 0 ;
293
318
294
319
private OrcaReportListener orcaReportListener ;
@@ -320,6 +345,25 @@ private double getWeight(AtomicInteger staleEndpoints, AtomicInteger notYetUsabl
320
345
}
321
346
}
322
347
348
+ private double getScale (AtomicInteger slowStartEndpoints ) {
349
+ if (config == null || config .slowStartConfig == null ) {
350
+ return 1 ;
351
+ }
352
+ long slowStartWindowNanos = config .slowStartConfig .slowStartWindowNanos ;
353
+ if (slowStartWindowNanos <= 0 ) {
354
+ return 1 ;
355
+ }
356
+ long now = ticker .nanoTime ();
357
+ if (now - readySince >= slowStartWindowNanos ) {
358
+ return 1 ;
359
+ } else {
360
+ slowStartEndpoints .incrementAndGet ();
361
+ double timeFactor = Math .max (now - readySince , 1.0 ) / slowStartWindowNanos ;
362
+ double weightPercent = Math .pow (timeFactor , 1.0 / config .slowStartConfig .aggression );
363
+ return Math .max (config .slowStartConfig .minWeightPercent / 100.0 , weightPercent );
364
+ }
365
+ }
366
+
323
367
public void addSubchannel (WrrSubchannel wrrSubchannel ) {
324
368
subchannels .add (wrrSubchannel );
325
369
}
@@ -439,6 +483,7 @@ public void start(SubchannelStateListener listener) {
439
483
public void onSubchannelState (ConnectivityStateInfo newState ) {
440
484
if (newState .getState ().equals (ConnectivityState .READY )) {
441
485
owner .nonEmptySince = infTime ;
486
+ owner .readySince = ticker .nanoTime ();
442
487
}
443
488
listener .onSubchannelState (newState );
444
489
}
@@ -517,8 +562,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
517
562
}
518
563
519
564
/** Returns {@code true} if weights are different than round_robin. */
520
- private boolean updateWeight (float [] newWeights ) {
521
- this .scheduler = new StaticStrideScheduler (newWeights , sequence );
565
+ private boolean updateWeight (float [] newWeights , float [] newScales ) {
566
+ this .scheduler = new StaticStrideScheduler (newWeights , newScales , sequence );
522
567
return !this .scheduler .usesRoundRobin ();
523
568
}
524
569
@@ -604,7 +649,7 @@ static final class StaticStrideScheduler {
604
649
private static final double K_MAX_RATIO = 10 ;
605
650
private static final double K_MIN_RATIO = 0.1 ;
606
651
607
- StaticStrideScheduler (float [] weights , AtomicInteger sequence ) {
652
+ StaticStrideScheduler (float [] weights , float [] scales , AtomicInteger sequence ) {
608
653
checkArgument (weights .length >= 1 , "Couldn't build scheduler: requires at least one weight" );
609
654
int numChannels = weights .length ;
610
655
int numWeightedChannels = 0 ;
@@ -643,12 +688,14 @@ static final class StaticStrideScheduler {
643
688
int weightLowerBound = (int ) Math .ceil (scalingFactor * unscaledMeanWeight * K_MIN_RATIO );
644
689
short [] scaledWeights = new short [numChannels ];
645
690
for (int i = 0 ; i < numChannels ; i ++) {
691
+ double curScalingFactor = scalingFactor * scales [i ];
692
+ int weight ;
646
693
if (weights [i ] <= 0 ) {
647
- scaledWeights [ i ] = (short ) Math .round (scalingFactor * unscaledMeanWeight );
694
+ weight = (int ) Math .round (curScalingFactor * unscaledMeanWeight );
648
695
} else {
649
- int weight = (int ) Math .round (scalingFactor * Math .min (weights [i ], unscaledMaxWeight ));
650
- scaledWeights [i ] = (short ) Math .max (weight , weightLowerBound );
696
+ weight = (int ) Math .round (curScalingFactor * Math .min (weights [i ], unscaledMaxWeight ));
651
697
}
698
+ scaledWeights [i ] = (short ) Math .max (weight , weightLowerBound );
652
699
}
653
700
654
701
this .scaledWeights = scaledWeights ;
0 commit comments