Skip to content

Commit 13a1b2d

Browse files
authored
Merge pull request #131 from FGasper/felipe_simpler_doc_key
2 parents f398a89 + 351801a commit 13a1b2d

File tree

5 files changed

+133
-114
lines changed

5 files changed

+133
-114
lines changed

dockey/agg.go

Lines changed: 7 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,7 @@
44
package dockey
55

66
import (
7-
"maps"
8-
"slices"
9-
"strconv"
10-
7+
"github.com/10gen/migration-verifier/mslices"
118
"github.com/samber/lo"
129
"go.mongodb.org/mongo-driver/bson"
1310
)
@@ -21,69 +18,12 @@ import (
2118
func ExtractTrueDocKeyAgg(fieldNames []string, docExpr string) bson.D {
2219
assertFieldNameUniqueness(fieldNames)
2320

24-
var docKeyNumKeys bson.D
25-
numToKeyLookup := map[string]string{}
26-
27-
for n, name := range fieldNames {
28-
var valExpr = docExpr + "." + name
29-
30-
// Aggregation forbids direct creation of an object with dotted keys.
31-
// So here we create an object with numeric keys, then below we’ll
32-
// map the numeric keys back to the real ones.
33-
34-
nStr := strconv.Itoa(n)
35-
docKeyNumKeys = append(docKeyNumKeys, bson.E{nStr, valExpr})
36-
numToKeyLookup[nStr] = name
37-
}
38-
39-
// Now convert the numeric keys back to the real ones.
40-
return mapObjectKeysAgg(docKeyNumKeys, numToKeyLookup)
41-
}
42-
43-
// Potentially reusable:
44-
func mapObjectKeysAgg(expr any, mapping map[string]string) bson.D {
45-
// We would ideally pass mapping into the aggregation and $getField
46-
// to get the mapped key, but pre-v8 server versions required $getField’s
47-
// field parameter to be a constant. (And pre-v5 didn’t have $getField
48-
// at all.) So we use a $switch instead.
49-
mapAgg := bson.D{
50-
{"$switch", bson.D{
51-
{"branches", lo.Map(
52-
slices.Collect(maps.Keys(mapping)),
53-
func(key string, _ int) bson.D {
54-
return bson.D{
55-
{"case", bson.D{
56-
{"$eq", bson.A{
57-
key,
58-
"$$numericKey",
59-
}},
60-
}},
61-
{"then", mapping[key]},
62-
}
63-
},
64-
)},
65-
}},
66-
}
67-
6821
return bson.D{
69-
{"$arrayToObject", bson.D{
70-
{"$map", bson.D{
71-
{"input", bson.D{
72-
{"$objectToArray", expr},
73-
}},
74-
{"in", bson.D{
75-
{"$let", bson.D{
76-
{"vars", bson.D{
77-
{"numericKey", "$$this.k"},
78-
{"value", "$$this.v"},
79-
}},
80-
{"in", bson.D{
81-
{"k", mapAgg},
82-
{"v", "$$value"},
83-
}},
84-
}},
85-
}},
86-
}},
87-
}},
22+
{"$arrayToObject", mslices.Of(lo.Map(
23+
fieldNames,
24+
func(fieldName string, _ int) [2]string {
25+
return [...]string{fieldName, docExpr + "." + fieldName}
26+
},
27+
))},
8828
}
8929
}

dockey/raw.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ func ExtractTrueDocKeyFromDoc(
2323

2424
var dk bson.D
2525
for _, field := range fieldNames {
26+
var val bson.RawValue
2627

2728
// This is how sharding routes documents: it always
2829
// splits on the dot and looks deeply into the document.
@@ -31,13 +32,13 @@ func ExtractTrueDocKeyFromDoc(
3132

3233
if errors.Is(err, bsoncore.ErrElementNotFound) || errors.As(err, &bsoncore.InvalidDepthTraversalError{}) {
3334
// If the document lacks a value for this field
34-
// then don’t add it to the document key.
35-
continue
36-
} else if err == nil {
37-
dk = append(dk, bson.E{field, val})
38-
} else {
35+
// then make it null in the document key.
36+
val = bson.RawValue{Type: bson.TypeNull}
37+
} else if err != nil {
3938
return nil, errors.Wrapf(err, "extracting doc key field %#q from doc %+v", field, doc)
4039
}
40+
41+
dk = append(dk, bson.E{field, val})
4142
}
4243

4344
docKey, err := bson.Marshal(dk)

dockey/test/cases.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ var TestCases = []TestCase{
5252
},
5353
DocKey: bson.D{
5454
{"_id", "ccc"},
55+
{"foo.bar.baz", nil},
5556
},
5657
},
5758
{
@@ -73,6 +74,7 @@ var TestCases = []TestCase{
7374
},
7475
DocKey: bson.D{
7576
{"_id", "eee"},
77+
{"foo.bar.baz", nil},
7678
},
7779
},
7880
}

internal/util/sharding.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"github.com/pkg/errors"
88
"go.mongodb.org/mongo-driver/bson"
99
"go.mongodb.org/mongo-driver/mongo"
10+
"go.mongodb.org/mongo-driver/mongo/options"
11+
"go.mongodb.org/mongo-driver/mongo/writeconcern"
1012
)
1113

1214
const (
@@ -52,3 +54,31 @@ func GetShardKey(
5254

5355
return option.Some(key), nil
5456
}
57+
58+
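// DisableBalancing sets noBalance=true on the config.collections document
// whose _id is FullName(coll), writing with majority write concern, and
// wraps any error returned by that update. Used in tests.
// NOTE(review): the UpdateOne result is discarded, so a namespace with no
// matching config.collections document is presumably not reported; confirm.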
59+
func DisableBalancing(ctx context.Context, coll *mongo.Collection) error {
60+
client := coll.Database().Client()
61+
configDB := client.Database("config")
62+
63+
ns := FullName(coll)
64+
65+
_, err := configDB.
66+
Collection(
67+
"collections",
68+
options.Collection().SetWriteConcern(writeconcern.Majority()),
69+
).
70+
UpdateOne(
71+
ctx,
72+
bson.D{{"_id", ns}},
73+
bson.D{
74+
{"$set", bson.D{
75+
{"noBalance", true},
76+
}},
77+
},
78+
)
79+
80+
if err != nil {
81+
return errors.Wrapf(err, "disabling %#q’s shard balancing", ns)
82+
}
83+
return nil
84+
}

internal/verifier/migration_verifier_test.go

Lines changed: 88 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -162,70 +162,115 @@ func (suite *IntegrationTestSuite) TestVerifier_Dotted_Shard_Key() {
162162
dbName := suite.DBNameForTest()
163163
collName := "coll"
164164

165-
verifier := suite.BuildVerifier()
166-
srcColl := verifier.srcClient.Database(dbName).Collection(collName)
167-
dstColl := verifier.dstClient.Database(dbName).Collection(collName)
168-
169165
docs := []bson.D{
170-
// NB: It’s not possible to test with /foo.bar.baz or /foo/bar.baz
171-
// because the server (as of v8) will route those to the same shard.
172-
// Only /foo/bar/baz actually gets routed as per the shard key, so
173-
// only that value can create a duplicate ID.
174-
{{"_id", 33}, {"foo", bson.D{{"bar", bson.D{{"baz", 100}}}}}},
175-
{{"_id", 33}, {"foo", bson.D{{"bar", bson.D{{"baz", 200}}}}}},
166+
{{"_id", 33}, {"foo", bson.D{{"bar", 100}}}},
167+
{{"_id", 33}, {"foo", bson.D{{"bar", 200}}}},
176168
}
177169

178170
shardKey := bson.D{
179-
{"foo.bar.baz", 1},
171+
{"foo.bar", 1},
180172
}
181173

182-
for _, coll := range mslices.Of(srcColl, dstColl) {
183-
db := coll.Database()
184-
client := db.Client()
174+
keyField := shardKey[0].Key
175+
splitKey := bson.D{{keyField, 150}}
185176

186-
// For sharded, pre-v8 clusters we need to create the collection first.
187-
require.NoError(db.CreateCollection(ctx, coll.Name()))
188-
require.NoError(client.Database("admin").RunCommand(
189-
ctx,
190-
bson.D{{"enableSharding", db.Name()}},
191-
).Err())
177+
clientsWithLabels := []struct {
178+
label string
179+
client *mongo.Client
180+
}{
181+
{"source", suite.srcMongoClient},
182+
{"destination", suite.dstMongoClient},
183+
}
192184

193-
require.NoError(client.Database("admin").RunCommand(
194-
ctx,
195-
bson.D{
196-
{"shardCollection", db.Name() + "." + coll.Name()},
197-
{"key", shardKey},
198-
},
199-
).Err(),
200-
)
185+
for _, clientWithLabel := range clientsWithLabels {
186+
client := clientWithLabel.client
187+
clientLabel := clientWithLabel.label
188+
189+
db := client.Database(dbName)
190+
coll := db.Collection(collName)
191+
192+
// The DB shouldn’t exist yet, but drop it just in case.
193+
require.NoError(db.Drop(ctx), "should drop database")
201194

202195
shardIds := getShardIds(suite.T(), client)
203196

204197
admin := client.Database("admin")
205-
keyField := "foo.bar.baz"
206-
splitKey := bson.D{{keyField, 150}}
198+
199+
require.NoError(admin.RunCommand(
200+
ctx,
201+
bson.D{
202+
{"enableSharding", db.Name()},
203+
},
204+
).Err())
207205

208206
require.NoError(
209-
admin.RunCommand(ctx, bson.D{
210-
{"split", db.Name() + "." + coll.Name()},
211-
{"middle", splitKey},
212-
}).Err(),
207+
admin.RunCommand(
208+
ctx,
209+
bson.D{
210+
{"shardCollection", FullName(coll)},
211+
{"key", shardKey},
212+
},
213+
).Err(),
214+
"should shard collection on %s",
215+
clientLabel,
213216
)
214217

215218
require.NoError(
216-
admin.RunCommand(ctx, bson.D{
217-
{"moveChunk", db.Name() + "." + coll.Name()},
218-
{"to", shardIds[0]},
219-
{"find", bson.D{{keyField, 149}}},
220-
}).Err(),
219+
util.DisableBalancing(ctx, coll),
220+
"should disable %#q’s balancing on %s",
221+
FullName(coll),
222+
clientLabel,
221223
)
222224

223225
require.NoError(
224226
admin.RunCommand(ctx, bson.D{
225-
{"moveChunk", db.Name() + "." + coll.Name()},
226-
{"to", shardIds[1]},
227-
{"find", bson.D{{keyField, 151}}},
227+
{"split", FullName(coll)},
228+
{"middle", splitKey},
228229
}).Err(),
230+
"should split on %s",
231+
clientLabel,
232+
)
233+
234+
require.Eventually(
235+
func() bool {
236+
err := admin.RunCommand(ctx, bson.D{
237+
{"moveChunk", FullName(coll)},
238+
{"find", bson.D{{keyField, 149}}},
239+
{"to", shardIds[0]},
240+
{"_waitForDelete", true},
241+
}).Err()
242+
243+
if err != nil {
244+
suite.T().Logf("Failed to move %s’s lower chunk to shard %#q: %v", clientLabel, shardIds[0], err)
245+
return false
246+
}
247+
248+
return true
249+
},
250+
5*time.Minute,
251+
time.Second,
252+
"Should move lower chunk to the 1st shard",
253+
)
254+
255+
require.Eventually(
256+
func() bool {
257+
err := admin.RunCommand(ctx, bson.D{
258+
{"moveChunk", FullName(coll)},
259+
{"find", bson.D{{keyField, 151}}},
260+
{"to", shardIds[1]},
261+
{"_waitForDelete", true},
262+
}).Err()
263+
264+
if err != nil {
265+
suite.T().Logf("Failed to move %s’s upper chunk to shard %#q: %v", clientLabel, shardIds[1], err)
266+
return false
267+
}
268+
269+
return true
270+
},
271+
5*time.Minute,
272+
time.Second,
273+
"Should move upper chunk to the 2nd shard",
229274
)
230275

231276
_, err := coll.InsertMany(ctx, lo.ToAnySlice(lo.Shuffle(docs)))
@@ -252,6 +297,7 @@ func (suite *IntegrationTestSuite) TestVerifier_Dotted_Shard_Key() {
252297
},
253298
}
254299

300+
verifier := suite.BuildVerifier()
255301
results, docCount, _, err := verifier.FetchAndCompareDocuments(ctx, 0, task)
256302
require.NoError(err, "should fetch & compare")
257303
assert.EqualValues(suite.T(), len(docs), docCount, "expected # of docs")

0 commit comments

Comments
 (0)