Skip to content

Commit c441353

Browse files
committed
qol: separate decisional logic on post-enrollment confirmation into consensus engine, proposer, and worker manager where relevant, refactor out scoring
1 parent 75b2811 commit c441353

File tree

5 files changed

+429
-84
lines changed

5 files changed

+429
-84
lines changed

node/consensus/global/global_consensus_engine.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,7 @@ func NewGlobalConsensusEngine(
289289
logger,
290290
config,
291291
engine.ProposeWorkerJoin,
292+
engine.DecideWorkerJoins,
292293
)
293294
if !config.Engine.ArchiveMode {
294295
strategy := provers.RewardGreedy
@@ -2213,3 +2214,101 @@ func (e *GlobalConsensusEngine) ProposeWorkerJoin(
22132214

22142215
return nil
22152216
}
2217+
2218+
func (e *GlobalConsensusEngine) DecideWorkerJoins(
2219+
reject [][]byte,
2220+
confirm [][]byte,
2221+
) error {
2222+
frame := e.GetFrame()
2223+
if frame == nil {
2224+
e.logger.Debug("cannot decide, no frame")
2225+
return errors.New("not ready")
2226+
}
2227+
2228+
_, err := e.keyManager.GetSigningKey("q-prover-key")
2229+
if err != nil {
2230+
e.logger.Debug("cannot decide, no signer key")
2231+
return errors.Wrap(err, "decide worker joins")
2232+
}
2233+
2234+
bundle := &protobufs.MessageBundle{
2235+
Requests: []*protobufs.MessageRequest{},
2236+
}
2237+
2238+
if len(reject) != 0 {
2239+
for _, r := range reject {
2240+
rejectMessage, err := global.NewProverReject(
2241+
r,
2242+
frame.Header.FrameNumber,
2243+
e.keyManager,
2244+
e.hypergraph,
2245+
schema.NewRDFMultiprover(&schema.TurtleRDFParser{}, e.inclusionProver),
2246+
)
2247+
if err != nil {
2248+
e.logger.Error("could not construct reject", zap.Error(err))
2249+
return errors.Wrap(err, "decide worker joins")
2250+
}
2251+
2252+
err = rejectMessage.Prove(frame.Header.FrameNumber)
2253+
if err != nil {
2254+
e.logger.Error("could not construct reject", zap.Error(err))
2255+
return errors.Wrap(err, "decide worker joins")
2256+
}
2257+
2258+
bundle.Requests = append(bundle.Requests, &protobufs.MessageRequest{
2259+
Request: &protobufs.MessageRequest_Reject{
2260+
Reject: rejectMessage.ToProtobuf(),
2261+
},
2262+
})
2263+
}
2264+
}
2265+
2266+
if len(confirm) != 0 {
2267+
for _, r := range confirm {
2268+
confirmMessage, err := global.NewProverConfirm(
2269+
r,
2270+
frame.Header.FrameNumber,
2271+
e.keyManager,
2272+
e.hypergraph,
2273+
schema.NewRDFMultiprover(&schema.TurtleRDFParser{}, e.inclusionProver),
2274+
)
2275+
if err != nil {
2276+
e.logger.Error("could not construct confirm", zap.Error(err))
2277+
return errors.Wrap(err, "decide worker joins")
2278+
}
2279+
2280+
err = confirmMessage.Prove(frame.Header.FrameNumber)
2281+
if err != nil {
2282+
e.logger.Error("could not construct confirm", zap.Error(err))
2283+
return errors.Wrap(err, "decide worker joins")
2284+
}
2285+
2286+
bundle.Requests = append(bundle.Requests, &protobufs.MessageRequest{
2287+
Request: &protobufs.MessageRequest_Confirm{
2288+
Confirm: confirmMessage.ToProtobuf(),
2289+
},
2290+
})
2291+
}
2292+
}
2293+
2294+
bundle.Timestamp = time.Now().UnixMilli()
2295+
2296+
msg, err := bundle.ToCanonicalBytes()
2297+
if err != nil {
2298+
e.logger.Error("could not construct decision", zap.Error(err))
2299+
return errors.Wrap(err, "decide worker joins")
2300+
}
2301+
2302+
err = e.pubsub.PublishToBitmask(
2303+
GLOBAL_PROVER_BITMASK,
2304+
msg,
2305+
)
2306+
if err != nil {
2307+
e.logger.Error("could not construct join", zap.Error(err))
2308+
return errors.Wrap(err, "decide worker joins")
2309+
}
2310+
2311+
e.logger.Debug("submitted join decisions")
2312+
2313+
return nil
2314+
}

node/consensus/provers/proposer.go

Lines changed: 178 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ type Proposal struct {
5656
ShardsDenominator uint64
5757
}
5858

59+
// scored is an internal struct for ranking proposals
60+
type scored struct {
61+
idx int
62+
score *big.Int
63+
}
64+
5965
// Manager ranks shards and assigns free workers to the best ones.
6066
type Manager struct {
6167
logger *zap.Logger
@@ -147,79 +153,9 @@ func (m *Manager) PlanAndAllocate(
147153
// Pre-compute basis (independent of shard specifics).
148154
basis := reward.PomwBasis(difficulty, worldBytes.Uint64(), m.Units)
149155

150-
// Score each shard by expected reward for a single allocation.
151-
type scored struct {
152-
idx int
153-
score *big.Int
154-
}
155-
scores := make([]scored, 0, len(shards))
156-
157-
for i, s := range shards {
158-
if len(s.Filter) == 0 || s.Size == 0 {
159-
m.logger.Debug(
160-
"filtering out empty shard",
161-
zap.String("filter", hex.EncodeToString(s.Filter)),
162-
zap.Uint64("size", s.Size),
163-
)
164-
continue
165-
}
166-
167-
if s.Shards == 0 {
168-
s.Shards = 1
169-
}
170-
171-
var score *big.Int
172-
switch m.Strategy {
173-
case DataGreedy:
174-
// Pure data coverage: larger shards first.
175-
score = big.NewInt(int64(s.Size))
176-
default:
177-
// factor = (stateSize / worldBytes)
178-
factor := decimal.NewFromUint64(s.Size)
179-
factor = factor.Mul(decimal.NewFromBigInt(basis, 0))
180-
factor = factor.Div(decimal.NewFromBigInt(worldBytes, 0))
181-
182-
// ring divisor = 2^Ring
183-
divisor := int64(1)
184-
for j := uint8(0); j < s.Ring+1; j++ {
185-
divisor <<= 1
186-
}
187-
ringDiv := decimal.NewFromInt(divisor)
188-
189-
// shard factor = sqrt(Shards)
190-
shardsSqrt, err := decimal.NewFromUint64(s.Shards).PowWithPrecision(
191-
decimal.NewFromBigRat(big.NewRat(1, 2), 53),
192-
53,
193-
)
194-
if err != nil {
195-
return nil, errors.Wrap(err, "plan and allocate")
196-
}
197-
198-
if shardsSqrt.IsZero() {
199-
return nil, errors.New("plan and allocate")
200-
}
201-
202-
m.logger.Debug(
203-
"calculating score",
204-
zap.Int("index", i),
205-
zap.String("basis", basis.String()),
206-
zap.String("size", big.NewInt(int64(s.Size)).String()),
207-
zap.String("worldBytes", worldBytes.String()),
208-
zap.String("factor", factor.String()),
209-
zap.String("divisor", ringDiv.String()),
210-
zap.String("shardsSqrt", shardsSqrt.String()),
211-
)
212-
factor = factor.Div(ringDiv)
213-
factor = factor.Div(shardsSqrt)
214-
score = factor.BigInt()
215-
}
216-
217-
m.logger.Debug(
218-
"adding score proposal",
219-
zap.Int("index", i),
220-
zap.String("score", score.String()),
221-
)
222-
scores = append(scores, scored{idx: i, score: score})
156+
scores, err := m.scoreShards(shards, basis, worldBytes)
157+
if err != nil {
158+
return nil, errors.Wrap(err, "plan and allocate")
223159
}
224160

225161
if len(scores) == 0 {
@@ -324,3 +260,172 @@ func (m *Manager) PlanAndAllocate(
324260

325261
return proposals, errors.Wrap(err, "plan and allocate")
326262
}
263+
264+
func (m *Manager) scoreShards(
265+
shards []ShardDescriptor,
266+
basis *big.Int,
267+
worldBytes *big.Int,
268+
) ([]scored, error) {
269+
scores := make([]scored, 0, len(shards))
270+
for i, s := range shards {
271+
if len(s.Filter) == 0 || s.Size == 0 {
272+
m.logger.Debug(
273+
"filtering out empty shard",
274+
zap.String("filter", hex.EncodeToString(s.Filter)),
275+
zap.Uint64("size", s.Size),
276+
)
277+
continue
278+
}
279+
280+
if s.Shards == 0 {
281+
s.Shards = 1
282+
}
283+
284+
var score *big.Int
285+
switch m.Strategy {
286+
case DataGreedy:
287+
// Pure data coverage: larger shards first.
288+
score = big.NewInt(int64(s.Size))
289+
default:
290+
// factor = (stateSize / worldBytes)
291+
factor := decimal.NewFromUint64(s.Size)
292+
factor = factor.Mul(decimal.NewFromBigInt(basis, 0))
293+
factor = factor.Div(decimal.NewFromBigInt(worldBytes, 0))
294+
295+
// ring divisor = 2^Ring
296+
divisor := int64(1)
297+
for j := uint8(0); j < s.Ring+1; j++ {
298+
divisor <<= 1
299+
}
300+
ringDiv := decimal.NewFromInt(divisor)
301+
302+
// shard factor = sqrt(Shards)
303+
shardsSqrt, err := decimal.NewFromUint64(s.Shards).PowWithPrecision(
304+
decimal.NewFromBigRat(big.NewRat(1, 2), 53),
305+
53,
306+
)
307+
if err != nil {
308+
return nil, errors.Wrap(err, "score shards")
309+
}
310+
311+
if shardsSqrt.IsZero() {
312+
return nil, errors.New("score shards")
313+
}
314+
315+
m.logger.Debug(
316+
"calculating score",
317+
zap.Int("index", i),
318+
zap.String("basis", basis.String()),
319+
zap.String("size", big.NewInt(int64(s.Size)).String()),
320+
zap.String("worldBytes", worldBytes.String()),
321+
zap.String("factor", factor.String()),
322+
zap.String("divisor", ringDiv.String()),
323+
zap.String("shardsSqrt", shardsSqrt.String()),
324+
)
325+
factor = factor.Div(ringDiv)
326+
factor = factor.Div(shardsSqrt)
327+
score = factor.BigInt()
328+
}
329+
330+
m.logger.Debug(
331+
"adding score proposal",
332+
zap.Int("index", i),
333+
zap.String("score", score.String()),
334+
)
335+
scores = append(scores, scored{idx: i, score: score})
336+
}
337+
return scores, nil
338+
}
339+
340+
// DecideJoins evaluates pending shard joins using the latest shard view. It
341+
// uses the same scoring basis as initial planning. For each pending join:
342+
// - If there exists a strictly better shard in the latest view, reject the
343+
// existing one (this will result in a new join attempt).
344+
// - Otherwise (tie or better), confirm the existing one.
345+
func (m *Manager) DecideJoins(
346+
difficulty uint64,
347+
shards []ShardDescriptor,
348+
pending [][]byte,
349+
) error {
350+
if len(pending) == 0 {
351+
return nil
352+
}
353+
354+
// If no shards remain, we should warn
355+
if len(shards) == 0 {
356+
m.logger.Warn("no shards available to decide")
357+
return nil
358+
}
359+
360+
worldBytes := m.world.GetSize(nil, nil)
361+
362+
basis := reward.PomwBasis(difficulty, worldBytes.Uint64(), m.Units)
363+
364+
scores, err := m.scoreShards(shards, basis, worldBytes)
365+
if err != nil {
366+
return errors.Wrap(err, "decide joins")
367+
}
368+
369+
type srec struct {
370+
desc ShardDescriptor
371+
score *big.Int
372+
}
373+
byHex := make(map[string]srec, len(shards))
374+
var bestScore *big.Int
375+
for _, sc := range scores {
376+
s := shards[sc.idx]
377+
key := hex.EncodeToString(s.Filter)
378+
byHex[key] = srec{desc: s, score: sc.score}
379+
if bestScore == nil || sc.score.Cmp(bestScore) > 0 {
380+
bestScore = sc.score
381+
}
382+
}
383+
384+
// If nothing valid, reject everything.
385+
if bestScore == nil {
386+
reject := make([][]byte, 0, len(pending))
387+
for _, p := range pending {
388+
if len(p) == 0 {
389+
continue
390+
}
391+
pc := make([]byte, len(p))
392+
copy(pc, p)
393+
reject = append(reject, pc)
394+
}
395+
return m.workerMgr.DecideAllocations(reject, nil)
396+
}
397+
398+
reject := make([][]byte, 0, len(pending))
399+
confirm := make([][]byte, 0, len(pending))
400+
401+
for _, p := range pending {
402+
if len(p) == 0 {
403+
continue
404+
}
405+
406+
key := hex.EncodeToString(p)
407+
rec, ok := byHex[key]
408+
if !ok {
409+
// If a pending shard is missing, we should reject it. This could happen
410+
// if shard-out produces a bunch of divisions.
411+
pc := make([]byte, len(p))
412+
copy(pc, p)
413+
reject = append(reject, pc)
414+
continue
415+
}
416+
417+
// Reject only if there exists a strictly better score.
418+
if rec.score.Cmp(bestScore) < 0 {
419+
pc := make([]byte, len(p))
420+
copy(pc, p)
421+
reject = append(reject, pc)
422+
} else {
423+
// Otherwise confirm
424+
pc := make([]byte, len(p))
425+
copy(pc, p)
426+
confirm = append(confirm, pc)
427+
}
428+
}
429+
430+
return m.workerMgr.DecideAllocations(reject, confirm)
431+
}

0 commit comments

Comments (0)