Skip to content
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e3107ba
add dockey
FGasper Aug 11, 2025
0ac82bc
no base64
FGasper Aug 11, 2025
828f072
refactor a bit
FGasper Aug 12, 2025
bb0ac8b
fix agg
FGasper Aug 12, 2025
b965033
bring in mongosync
FGasper Aug 13, 2025
2d6ca57
add error check
FGasper Aug 13, 2025
b137e99
precreate
FGasper Aug 13, 2025
1f25562
try older versions
FGasper Aug 13, 2025
fc6b8c6
back-compat
FGasper Aug 13, 2025
c3121d4
maybe fix 5
FGasper Aug 13, 2025
32563c9
avoid pre-v6 wonkiness
FGasper Aug 13, 2025
8b6d91b
comment
FGasper Aug 13, 2025
5460ac8
rename DB … ??
FGasper Aug 13, 2025
e23adab
move test
FGasper Aug 14, 2025
c9b78f5
more
FGasper Aug 14, 2025
34c63cf
fix test
FGasper Aug 15, 2025
9d90009
Use the simpler logic that routing actually uses.
FGasper Aug 15, 2025
a7c0de9
rollback
FGasper Aug 19, 2025
0ed6ddd
revert
FGasper Aug 19, 2025
578310c
pare back
FGasper Aug 19, 2025
f28c72f
“true” doc key
FGasper Aug 19, 2025
0300b8b
fix DB name
FGasper Aug 19, 2025
7c12d88
fix test
FGasper Aug 19, 2025
ac2b99a
compat with pre-v6
FGasper Aug 19, 2025
7bcddaa
only sharded for now
FGasper Aug 19, 2025
9f01a5f
all CI
FGasper Aug 19, 2025
54d347a
Update internal/verifier/dockey_agg_test.go
FGasper Aug 19, 2025
877a9cc
refactor & add more tests
FGasper Aug 19, 2025
c0ee5c4
Merge branch 'REP-6465-fix-dotted-shard-key' of github.com:FGasper/mi…
FGasper Aug 19, 2025
5c01288
comment
FGasper Aug 19, 2025
035fd76
check reverse
FGasper Aug 20, 2025
ed0aa0d
panic & assert in tests
FGasper Aug 20, 2025
b86061e
no error check
FGasper Aug 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions dockey/agg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Package dockey contains logic related to document key determination.
// Its tests use a cluster and thus are stored in internal/verifier.

package dockey

import (
"maps"
"slices"
"strconv"

"github.com/samber/lo"
"go.mongodb.org/mongo-driver/bson"
)

// ExtractTrueDocKeyAgg returns an aggregation expression that extracts the
// document key from the document to which the `docExpr` refers.
//
// NB: This avoids the problem documented in SERVER-109340; as a result,
// the returned key may not always match the change stream’s `documentKey`
// (because the server misreports its own sharding logic).
func ExtractTrueDocKeyAgg(fieldNames []string, docExpr string) bson.D {
	// Aggregation forbids direct creation of an object with dotted keys,
	// so we first build an object keyed by stringified indices, then
	// translate those indices back to the real (possibly dotted) names.
	numbered := make(bson.D, 0, len(fieldNames))
	indexToName := make(map[string]string, len(fieldNames))

	for i, fieldName := range fieldNames {
		idx := strconv.Itoa(i)

		numbered = append(numbered, bson.E{idx, docExpr + "." + fieldName})
		indexToName[idx] = fieldName
	}

	// Convert the numeric keys back to the real field names.
	return mapObjectKeysAgg(numbered, indexToName)
}

// mapObjectKeysAgg returns an aggregation expression that rewrites the keys
// of the object to which `expr` evaluates, replacing each key with its value
// in `mapping`. A key absent from `mapping` makes the generated $switch fail
// at runtime (there is deliberately no "default" branch), which surfaces a
// programmer error rather than hiding it.
//
// Potentially reusable.
func mapObjectKeysAgg(expr any, mapping map[string]string) bson.D {
	// We would ideally pass mapping into the aggregation and $getField
	// to get the mapped key, but pre-v8 server versions required $getField’s
	// field parameter to be a constant. (And pre-v5 didn’t have $getField
	// at all.) So we use a $switch instead.
	//
	// Sort the keys so the generated pipeline is deterministic (Go map
	// iteration order is random); stable query shapes aid debugging and
	// server-side plan reuse.
	mapAgg := bson.D{
		{"$switch", bson.D{
			{"branches", lo.Map(
				slices.Sorted(maps.Keys(mapping)),
				func(key string, _ int) bson.D {
					return bson.D{
						{"case", bson.D{
							{"$eq", bson.A{
								key,
								"$$numericKey",
							}},
						}},
						{"then", mapping[key]},
					}
				},
			)},
		}},
	}

	return bson.D{
		{"$arrayToObject", bson.D{
			{"$map", bson.D{
				{"input", bson.D{
					{"$objectToArray", expr},
				}},
				{"in", bson.D{
					{"$let", bson.D{
						{"vars", bson.D{
							{"numericKey", "$$this.k"},
							{"value", "$$this.v"},
						}},
						{"in", bson.D{
							{"k", mapAgg},
							{"v", "$$value"},
						}},
					}},
				}},
			}},
		}},
	}
}
153 changes: 121 additions & 32 deletions internal/verifier/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,24 @@ import (
"bytes"
"context"
"fmt"
"strings"
"time"

"github.com/10gen/migration-verifier/chanutil"
"github.com/10gen/migration-verifier/contextplus"
"github.com/10gen/migration-verifier/dockey"
"github.com/10gen/migration-verifier/internal/reportutils"
"github.com/10gen/migration-verifier/internal/retry"
"github.com/10gen/migration-verifier/internal/types"
"github.com/10gen/migration-verifier/internal/util"
"github.com/10gen/migration-verifier/option"
"github.com/pkg/errors"
"github.com/samber/lo"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
"golang.org/x/exp/slices"
)

Expand Down Expand Up @@ -126,16 +128,15 @@ func (verifier *Verifier) compareDocsFromChannels(
// b) compares the new doc against its previously-received, cached
// counterpart and records any mismatch.
handleNewDoc := func(curDocWithTs docWithTs, isSrc bool) error {
docKeyValues := lo.Map(
docKeyValues, err := getDocKeyValues(
verifier.docCompareMethod,
curDocWithTs.doc,
mapKeyFieldNames,
func(fieldName string, _ int) bson.RawValue {
return getDocKeyFieldFromComparison(
verifier.docCompareMethod,
curDocWithTs.doc,
fieldName,
)
},
)
if err != nil {
return errors.Wrapf(err, "extracting doc key (fields: %v) values from doc %+v", mapKeyFieldNames, curDocWithTs.doc)
}

mapKey := getMapKey(docKeyValues)

var ourMap, theirMap map[string]docWithTs
Expand Down Expand Up @@ -321,10 +322,9 @@ func (verifier *Verifier) compareDocsFromChannels(
results = append(
results,
VerificationResult{
ID: getDocKeyFieldFromComparison(
ID: getDocIdFromComparison(
verifier.docCompareMethod,
docWithTs.doc,
"_id",
),
Details: Missing,
Cluster: ClusterTarget,
Expand All @@ -339,10 +339,9 @@ func (verifier *Verifier) compareDocsFromChannels(
results = append(
results,
VerificationResult{
ID: getDocKeyFieldFromComparison(
ID: getDocIdFromComparison(
verifier.docCompareMethod,
docWithTs.doc,
"_id",
),
Details: Missing,
Cluster: ClusterSource,
Expand All @@ -356,21 +355,110 @@ func (verifier *Verifier) compareDocsFromChannels(
return results, srcDocCount, srcByteCount, nil
}

func getDocKeyFieldFromComparison(
func getDocIdFromComparison(
docCompareMethod DocCompareMethod,
doc bson.Raw,
fieldName string,
) bson.RawValue {
switch docCompareMethod {
case DocCompareBinary, DocCompareIgnoreOrder:
return doc.Lookup(fieldName)
return doc.Lookup("_id")
case DocCompareToHashedIndexKey:
return doc.Lookup(docKeyInHashedCompare, fieldName)
return doc.Lookup(docKeyInHashedCompare, "_id")
default:
panic("bad doc compare method: " + docCompareMethod)
}
}

// getDocKeyValues extracts the document-key field values from doc, in
// fieldNames order (omitting any fields the document lacks), according to
// how the document was fetched for the given comparison method.
//
// Returns the raw BSON values, or an error if the document key cannot be
// found or parsed.
func getDocKeyValues(
	docCompareMethod DocCompareMethod,
	doc bson.Raw,
	fieldNames []string,
) ([]bson.RawValue, error) {
	var docKey bson.Raw

	switch docCompareMethod {
	case DocCompareBinary, DocCompareIgnoreOrder:
		// We have the full document, so create the document key manually.
		var err error
		docKey, err = extractTrueDocKeyFromDoc(fieldNames, doc)
		if err != nil {
			return nil, err
		}
	case DocCompareToHashedIndexKey:
		// With hashed comparison the aggregation already extracted the
		// document key for us.
		docKeyVal, err := doc.LookupErr(docKeyInHashedCompare)
		if err != nil {
			return nil, errors.Wrapf(err, "fetching %#q from doc %v", docKeyInHashedCompare, doc)
		}

		var isDoc bool
		docKey, isDoc = docKeyVal.DocumentOK()
		if !isDoc {
			return nil, fmt.Errorf(
				"%#q in doc %v is type %s but should be %s",
				docKeyInHashedCompare,
				doc,
				docKeyVal.Type,
				bson.TypeEmbeddedDocument,
			)
		}
	default:
		// Match the sibling helpers (e.g., getDocIdFromComparison): an
		// unknown comparison method is a programmer error. Previously this
		// fell through with a nil docKey and surfaced a confusing
		// parse error below.
		panic("bad doc compare method: " + docCompareMethod)
	}

	els, err := docKey.Elements()
	if err != nil {
		return nil, errors.Wrapf(err, "parsing doc key (%+v) of doc %+v", docKey, doc)
	}

	values := make([]bson.RawValue, 0, len(els))
	for _, el := range els {
		val, err := el.ValueErr()
		if err != nil {
			return nil, errors.Wrapf(err, "parsing doc key element (%+v) of doc %+v", el, doc)
		}

		values = append(values, val)
	}

	return values, nil
}

// This extracts the document key from a document gets its field names.
//
// NB: This avoids the problem documented in SERVER-109340; as a result,
// the returned key may not always match the change stream’s `documentKey`
// (because the server misreports its own sharding logic).
func extractTrueDocKeyFromDoc(
fieldNames []string,
doc bson.Raw,
) (bson.Raw, error) {
var dk bson.D
for _, field := range fieldNames {

// This is how sharding routes documents: it always
// splits on the dot and looks deeply into the document.
parts := strings.Split(field, ".")
val, err := doc.LookupErr(parts...)

if errors.Is(err, bsoncore.ErrElementNotFound) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably also want to handle bsoncore.ErrInvalidDepthTraversalError here, which is the kind of error you get if you have a shard key like a.b and the a field in the target document is an array.

// If the document lacks a value for this field
// then don’t add it to the document key.
continue
} else if err == nil {
dk = append(dk, bson.E{field, val})
} else {
return nil, errors.Wrapf(err, "extracting doc key field %#q from doc %+v", field, doc)
}
}

docKey, err := bson.Marshal(dk)
if err != nil {
return nil, errors.Wrapf(err, "marshaling doc key %v from doc %v", dk, docKey)
}

return docKey, nil
}

func simpleTimerReset(t *time.Timer, dur time.Duration) {
if !t.Stop() {
<-t.C
Expand Down Expand Up @@ -607,15 +695,18 @@ func (verifier *Verifier) getDocumentsCursor(ctx mongo.SessionContext, collectio
// Suppress this log for recheck tasks because the list of IDs can be
// quite long.
if !task.IsRecheck() {
extJSON, _ := bson.MarshalExtJSON(cmd, true, false)

verifier.logger.Debug().
Any("task", task.PrimaryKey).
Str("cmd", lo.Ternary(
extJSON == nil,
fmt.Sprintf("%s", cmd),
string(extJSON),
)).

evt := verifier.logger.Debug().
Any("task", task.PrimaryKey)

extJSON, err := bson.MarshalExtJSON(cmd, true, false)
if err != nil {
evt = evt.Str("cmd", fmt.Sprintf("%s", cmd))
} else {
evt = evt.RawJSON("cmd", extJSON)
}

evt.
Str("options", fmt.Sprintf("%v", *runCommandOptions)).
Msg("getDocuments command.")
}
Expand All @@ -631,12 +722,10 @@ func transformPipelineForToHashedIndexKey(
slices.Clone(in),
bson.D{{"$replaceWith", bson.D{
// Single-letter field names minimize the document size.
{docKeyInHashedCompare, bson.D(lo.Map(
{docKeyInHashedCompare, dockey.ExtractTrueDocKeyAgg(
task.QueryFilter.GetDocKeyFields(),
func(f string, _ int) bson.E {
return bson.E{f, "$$ROOT." + f}
},
))},
"$$ROOT",
)},
{"h", bson.D{
{"$toHashedIndexKey", bson.D{
{"$_internalKeyStringValue", bson.D{
Expand All @@ -658,7 +747,7 @@ func (verifier *Verifier) compareOneDocument(srcClientDoc, dstClientDoc bson.Raw
if verifier.docCompareMethod == DocCompareToHashedIndexKey {
// With hash comparison, mismatches are opaque.
return []VerificationResult{{
ID: getDocKeyFieldFromComparison(verifier.docCompareMethod, srcClientDoc, "_id"),
ID: getDocIdFromComparison(verifier.docCompareMethod, srcClientDoc),
Details: Mismatch,
Cluster: ClusterTarget,
NameSpace: namespace,
Expand Down
Loading