diff --git a/graphql/admin/admin.go b/graphql/admin/admin.go index 165b1d91be9..e166214645d 100644 --- a/graphql/admin/admin.go +++ b/graphql/admin/admin.go @@ -654,6 +654,10 @@ func newAdminResolver( return } + if len(pl.Postings) == 2 && string(pl.Postings[0].Value) == "_STAR_ALL" { + pl.Postings = pl.Postings[1:] + } + // There should be only one posting. if len(pl.Postings) != 1 { glog.Errorf("Only one posting is expected in the graphql schema posting list but got %d", diff --git a/posting/index.go b/posting/index.go index f17d5e6a57a..933d7047bbc 100644 --- a/posting/index.go +++ b/posting/index.go @@ -15,14 +15,17 @@ import ( "math" "os" "strings" + "sync" "sync/atomic" "time" "unsafe" + "github.com/dgryski/go-farm" "github.com/golang/glog" "github.com/pkg/errors" ostats "go.opencensus.io/stats" "go.opentelemetry.io/otel/trace" + "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" "github.com/dgraph-io/badger/v4" @@ -63,23 +66,1097 @@ func indexTokens(ctx context.Context, info *indexMutationInfo) ([]string, error) } sv, err := types.Convert(info.val, schemaType) if err != nil { - return nil, err + return nil, errors.Wrap(err, "Cannot convert value to scalar type") } var tokens []string for _, it := range info.tokenizers { toks, err := tok.BuildTokens(sv.Value, tok.GetTokenizerForLang(it, lang)) if err != nil { - return tokens, err + return tokens, errors.Wrapf(err, "Cannot build tokens for attribute %s", attr) } tokens = append(tokens, toks...) } return tokens, nil } -// addIndexMutations adds mutation(s) for a single term, to maintain the index, -// but only for the given tokenizers. -// TODO - See if we need to pass op as argument as t should already have Op. +type MutationPipeline struct { + txn *Txn +} + +func NewMutationPipeline(txn *Txn) *MutationPipeline { + return &MutationPipeline{txn: txn} +} + +type PredicatePipeline struct { + attr string + edges chan *pb.DirectedEdge + wg *sync.WaitGroup + errCh chan error +} + +func (pp *PredicatePipeline) close() { + pp.wg.Done() +} + +func (mp *MutationPipeline) ProcessVectorIndex(ctx context.Context, pipeline *PredicatePipeline, info predicateInfo) error { + var wg errgroup.Group + numThreads := 10 + + for i := 0; i < numThreads; i++ { + wg.Go(func() error { + for edge := range pipeline.edges { + uid := edge.Entity + + key := x.DataKey(pipeline.attr, uid) + pl, err := mp.txn.Get(key) + if err != nil { + return err + } + if err := pl.AddMutationWithIndex(ctx, edge, mp.txn); err != nil { + return err + } + } + return nil + }) + } + + if err := wg.Wait(); err != nil { + return err + } + + return nil +} + +func (mp *MutationPipeline) InsertTokenizerIndexes(ctx context.Context, pipeline *PredicatePipeline, postings *map[uint64]*pb.PostingList, info predicateInfo) error { + startTime := time.Now() + defer func() { + fmt.Println("Inserting tokenizer indexes for predicate", pipeline.attr, "took", time.Since(startTime)) + }() + + tokenizers := schema.State().Tokenizer(ctx, pipeline.attr) + if len(tokenizers) == 0 { + return nil + } + + indexesGenInMutation := types.NewLockedShardedMap[string, *MutableLayer]() + wg := &sync.WaitGroup{} + + syncMap := sync.Map{} + + chanFn := func(uids chan uint64, estimatedSize int) { + defer wg.Done() + indexGenInThread := make(map[string]*pb.PostingList, estimatedSize) + tokenizers := schema.State().Tokenizer(ctx, pipeline.attr) + + factorySpecs, err := schema.State().FactoryCreateSpec(ctx, pipeline.attr) + if err != nil { + pipeline.errCh <- err + return + } + + indexEdge := &pb.DirectedEdge{ + Attr: pipeline.attr, + } + + for uid := range uids { + postingList := (*postings)[uid] + newList := &pb.PostingList{} + if info.isSingleEdge && len(postingList.Postings) == 2 { + newList.Postings = append(newList.Postings, postingList.Postings[1]) + newList.Postings = append(newList.Postings, postingList.Postings[0]) + } else { + newList = postingList + } + for _, posting := range newList.Postings { + info := &indexMutationInfo{ + tokenizers: tokenizers, + factorySpecs: factorySpecs, + op: pb.DirectedEdge_SET, + val: types.Val{ + Tid: types.TypeID(posting.ValType), + Value: posting.Value, + }, + } + + info.edge = &pb.DirectedEdge{ + Attr: pipeline.attr, + Op: pb.DirectedEdge_SET, + Lang: string(posting.LangTag), + Value: posting.Value, + } + + key := fmt.Sprintf("%s,%s", posting.LangTag, posting.Value) + tokens, loaded := syncMap.Load(key) + + if !loaded { + tokens, err = indexTokens(ctx, info) + if err != nil { + fmt.Println("ERRORRRING", err) + x.Panic(err) + } + syncMap.Store(key, tokens) + } + + indexEdge.Op = GetPostingOp(posting.Op) + indexEdge.ValueId = uid + mpost := makePostingFromEdge(mp.txn.StartTs, indexEdge) + + for _, token := range tokens.([]string) { + key := x.IndexKey(pipeline.attr, token) + pk, _ := x.Parse([]byte(key)) + fmt.Println("TOKENS", key, pk) + val, ok := indexGenInThread[string(key)] + if !ok { + val = &pb.PostingList{} + } + val.Postings = append(val.Postings, mpost) + indexGenInThread[string(key)] = val + } + } + } + + for key, value := range indexGenInThread { + pk, _ := x.Parse([]byte(key)) + fmt.Println("LOCAL MAP", pk, value) + indexesGenInMutation.Update(key, func(val *MutableLayer, ok bool) *MutableLayer { + if !ok { + val = newMutableLayer() + val.currentEntries = &pb.PostingList{} + } + for _, posting := range value.Postings { + val.insertPosting(posting, false) + } + return val + }) + } + } + + numGo := 10 + wg.Add(numGo) + chMap := make(map[int]chan uint64) + + for i := 0; i < numGo; i++ { + uidCh := make(chan uint64, numGo) + chMap[i] = uidCh + go chanFn(uidCh, len(*postings)/numGo) + } + + for uid := range *postings { + chMap[int(uid)%numGo] <- uid + } + + for i := 0; i < numGo; i++ { + close(chMap[i]) + } + + wg.Wait() + + mp.txn.cache.Lock() + defer mp.txn.cache.Unlock() + + indexGenInTxn := mp.txn.cache.deltas.GetIndexMapForPredicate(pipeline.attr) + if indexGenInTxn == nil { + indexGenInTxn = types.NewLockedShardedMap[string, *pb.PostingList]() + mp.txn.cache.deltas.indexMap[pipeline.attr] = indexGenInTxn + } + + fmt.Println("INSERTING INDEX", pipeline.attr, *postings) + updateFn := func(key string, value *MutableLayer) { + pk, _ := x.Parse([]byte(key)) + fmt.Println("UPDATE INDEX", pk, value) + indexGenInTxn.Update(key, func(val *pb.PostingList, ok bool) *pb.PostingList { + if !ok { + val = &pb.PostingList{} + } + val.Postings = append(val.Postings, value.currentEntries.Postings...) + return val + }) + } + + if info.hasUpsert { + err := indexesGenInMutation.Iterate(func(key string, value *MutableLayer) error { + updateFn(key, value) + mp.txn.addConflictKey(farm.Fingerprint64([]byte(key))) + return nil + }) + if err != nil { + return err + } + } else { + err := indexesGenInMutation.Iterate(func(key string, value *MutableLayer) error { + updateFn(key, value) + mp.txn.addConflictKeyWithUid([]byte(key), value.currentEntries, info.hasUpsert, info.noConflict) + return nil + }) + if err != nil { + return err + } + } + + return nil +} + +// func (mp *MutationPipeline) InsertTokenizerIndexes(ctx context.Context, pipeline *PredicatePipeline, postings *map[uint64]*pb.PostingList, info predicateInfo) error { +// startTime := time.Now() +// defer func() { +// fmt.Println("Inserting tokenizer indexes for predicate", pipeline.attr, "took", time.Since(startTime)) +// }() + +// tokenizers := schema.State().Tokenizer(ctx, pipeline.attr) +// if len(tokenizers) == 0 { +// return nil +// } + +// values := make(map[string]*pb.PostingList, len(tokenizers)*len(*postings)) +// valPost := make(map[string]*pb.Posting) + +// indexEdge1 := &pb.DirectedEdge{ +// Attr: pipeline.attr, +// } + +// for uid, postingList := range *postings { +// fmt.Println("POSTING", uid, postingList) +// for _, posting := range postingList.Postings { +// key := fmt.Sprintf("%s,%s", posting.LangTag, posting.Value) +// valPl, ok := values[key] +// if !ok { +// valPl = &pb.PostingList{} +// } + +// indexEdge1.Op = GetPostingOp(posting.Op) +// indexEdge1.ValueId = uid + +// mpost := makePostingFromEdge(mp.txn.StartTs, indexEdge1) +// valPl.Postings = append(valPl.Postings, mpost) +// values[key] = valPl + +// newPosting := new(pb.Posting) +// newPosting.ValType = posting.ValType +// newPosting.Value = posting.Value +// newPosting.LangTag = posting.LangTag +// valPost[key] = newPosting +// } +// } + +// keysCreated := make([]string, 0, len(values)) +// for i := range values { +// keysCreated = append(keysCreated, i) +// } + +// //fmt.Println("START") + +// f := func(numGo int) *types.LockedShardedMap[string, *MutableLayer] { +// wg := &sync.WaitGroup{} + +// globalMap := types.NewLockedShardedMap[string, *MutableLayer]() +// process := func(start int) { +// factorySpecs, err := schema.State().FactoryCreateSpec(ctx, pipeline.attr) +// if err != nil { +// pipeline.errCh <- err +// return +// } + +// defer wg.Done() +// localMap := make(map[string]*pb.PostingList, len(values)/numGo) +// for i := start; i < len(values); i += numGo { +// key := keysCreated[i] +// valPl := values[key] +// if len(valPl.Postings) == 0 { +// continue +// } + +// posting := valPost[key] +// // Build info per iteration without indexEdge. +// info := &indexMutationInfo{ +// tokenizers: tokenizers, +// factorySpecs: factorySpecs, +// op: pb.DirectedEdge_SET, +// val: types.Val{ +// Tid: types.TypeID(posting.ValType), +// Value: posting.Value, +// }, +// } + +// info.edge = &pb.DirectedEdge{ +// Attr: pipeline.attr, +// Op: pb.DirectedEdge_SET, +// Lang: string(posting.LangTag), +// Value: posting.Value, +// } + +// tokens, erri := indexTokens(ctx, info) +// if erri != nil { +// fmt.Println("ERRORRRING", erri) +// x.Panic(erri) +// } + +// for _, token := range tokens { +// key := x.IndexKey(pipeline.attr, token) +// pk, _ := x.Parse([]byte(key)) +// fmt.Println("TOKENS", key, i, numGo, pk) +// val, ok := localMap[string(key)] +// if !ok { +// val = &pb.PostingList{} +// } +// val.Postings = append(val.Postings, valPl.Postings...) +// localMap[string(key)] = val +// } +// } + +// for key, value := range localMap { +// pk, _ := x.Parse([]byte(key)) +// fmt.Println("LOCAL MAP", pk, numGo, value) +// globalMap.Update(key, func(val *MutableLayer, ok bool) *MutableLayer { +// if !ok { +// val = newMutableLayer() +// val.currentEntries = &pb.PostingList{} +// } +// for _, posting := range value.Postings { +// val.insertPosting(posting, false) +// } +// return val +// }) +// } +// } + +// for i := range numGo { +// wg.Add(1) +// go process(i) +// } + +// wg.Wait() + +// return globalMap +// } + +// globalMapI := f(1) + +// mp.txn.cache.Lock() +// defer mp.txn.cache.Unlock() + +// globalMap := mp.txn.cache.deltas.GetIndexMapForPredicate(pipeline.attr) +// if globalMap == nil { +// globalMap = types.NewLockedShardedMap[string, *pb.PostingList]() +// mp.txn.cache.deltas.indexMap[pipeline.attr] = globalMap +// } + +// updateFn := func(key string, value *MutableLayer) { +// globalMap.Update(key, func(val *pb.PostingList, ok bool) *pb.PostingList { +// if !ok { +// val = &pb.PostingList{} +// } +// val.Postings = append(val.Postings, value.currentEntries.Postings...) +// return val +// }) +// } + +// if info.hasUpsert { +// err := globalMapI.Iterate(func(key string, value *MutableLayer) error { +// updateFn(key, value) +// mp.txn.addConflictKey(farm.Fingerprint64([]byte(key))) +// return nil +// }) +// if err != nil { +// return err +// } +// } else { +// err := globalMapI.Iterate(func(key string, value *MutableLayer) error { +// updateFn(key, value) +// mp.txn.addConflictKeyWithUid([]byte(key), value.currentEntries, info.hasUpsert, info.noConflict) +// return nil +// }) +// if err != nil { +// return err +// } +// } + +// return nil +// } + +type predicateInfo struct { + isList bool + index bool + reverse bool + count bool + noConflict bool + hasUpsert bool + isUid bool + + isSingleEdge bool +} + +func (mp *MutationPipeline) ProcessList(ctx context.Context, pipeline *PredicatePipeline, info predicateInfo) error { + su, schemaExists := schema.State().Get(ctx, pipeline.attr) + + mutations := make(map[uint64]*MutableLayer, 1000) + + for edge := range pipeline.edges { + if edge.Op != pb.DirectedEdge_DEL && !schemaExists { + return errors.Errorf("runMutation: Unable to find schema for %s", edge.Attr) + } + + if err := ValidateAndConvert(edge, &su); err != nil { + return err + } + + uid := edge.Entity + pl, exists := mutations[uid] + if !exists { + pl = newMutableLayer() + pl.currentEntries = &pb.PostingList{} + } + + mpost := NewPosting(edge) + mpost.StartTs = mp.txn.StartTs + if mpost.PostingType != pb.Posting_REF { + edge.ValueId = FingerprintEdge(edge) + mpost.Uid = edge.ValueId + } + + pl.insertPosting(mpost, false) + mutations[uid] = pl + } + + postings := make(map[uint64]*pb.PostingList, 1000) + for uid, pl := range mutations { + postings[uid] = pl.currentEntries + } + + if info.reverse { + if err := mp.ProcessReverse(ctx, pipeline, &postings, info); err != nil { + return err + } + } + + if info.index { + if err := mp.InsertTokenizerIndexes(ctx, pipeline, &postings, info); err != nil { + return err + } + } + + if info.count { + return mp.ProcessCount(ctx, pipeline, &postings, info, true, false) + } + + dataKey := x.DataKey(pipeline.attr, 0) + baseKey := string(dataKey[:len(dataKey)-8]) // Avoid repeated conversion + + for uid, pl := range postings { + if len(pl.Postings) == 0 { + continue + } + + binary.BigEndian.PutUint64(dataKey[len(dataKey)-8:], uid) + if newPl, err := mp.txn.AddDelta(baseKey+string(dataKey[len(dataKey)-8:]), pl, info.isUid, true); err != nil { + return err + } else { + if !info.noConflict { + mp.txn.addConflictKeyWithUid(dataKey, newPl, info.hasUpsert, info.noConflict) + } + } + } + + return nil +} + +func findSingleValueInPostingList(pb *pb.PostingList) *pb.Posting { + if pb == nil { + return nil + } + for _, p := range pb.Postings { + if p.Op == Set { + return p + } + } + return nil +} + +func (mp *MutationPipeline) ProcessReverse(ctx context.Context, pipeline *PredicatePipeline, postings *map[uint64]*pb.PostingList, info predicateInfo) error { + key := x.ReverseKey(pipeline.attr, 0) + edge := &pb.DirectedEdge{ + Attr: pipeline.attr, + } + reverseredMap := make(map[uint64]*pb.PostingList, 1000) + for uid, postingList := range *postings { + for _, posting := range postingList.Postings { + postingList, ok := reverseredMap[posting.Uid] + if !ok { + postingList = &pb.PostingList{} + } + edge.Entity = posting.Uid + edge.ValueId = uid + edge.ValueType = posting.ValType + edge.Op = GetPostingOp(posting.Op) + edge.Facets = posting.Facets + + postingList.Postings = append(postingList.Postings, makePostingFromEdge(mp.txn.StartTs, edge)) + reverseredMap[posting.Uid] = postingList + } + } + + if info.count { + newInfo := predicateInfo{ + isList: true, + index: info.index, + reverse: info.reverse, + count: info.count, + noConflict: info.noConflict, + hasUpsert: info.hasUpsert, + } + return mp.ProcessCount(ctx, pipeline, &reverseredMap, newInfo, true, true) + } + + for uid, pl := range reverseredMap { + if len(pl.Postings) == 0 { + continue + } + binary.BigEndian.PutUint64(key[len(key)-8:], uid) + if newPl, err := mp.txn.AddDelta(string(key), pl, true, true); err != nil { + return err + } else { + mp.txn.addConflictKeyWithUid(key, newPl, info.hasUpsert, info.noConflict) + } + } + + return nil +} + +func makePostingFromEdge(startTs uint64, edge *pb.DirectedEdge) *pb.Posting { + mpost := NewPosting(edge) + mpost.StartTs = startTs + if mpost.PostingType != pb.Posting_REF { + edge.ValueId = FingerprintEdge(edge) + mpost.Uid = edge.ValueId + } + return mpost +} + +func (mp *MutationPipeline) handleOldDeleteForSingle(pipeline *PredicatePipeline, postings map[uint64]*pb.PostingList) error { + edge := &pb.DirectedEdge{ + Attr: pipeline.attr, + } + + dataKey := x.DataKey(pipeline.attr, 0) + + for uid, postingList := range postings { + currValue := findSingleValueInPostingList(postingList) + if currValue == nil { + continue + } + + binary.BigEndian.PutUint64(dataKey[len(dataKey)-8:], uid) + list, err := mp.txn.GetScalarList(dataKey) + if err != nil { + return err + } + + oldValList, err := list.StaticValue(mp.txn.StartTs) + if err != nil { + return err + } + + oldVal := findSingleValueInPostingList(oldValList) + + if oldVal == nil { + continue + } + + if string(oldVal.Value) == string(currValue.Value) { + postings[uid] = &pb.PostingList{} + continue + } + + edge.Op = pb.DirectedEdge_DEL + edge.Value = oldVal.Value + edge.ValueType = oldVal.ValType + edge.ValueId = oldVal.Uid + + mpost := makePostingFromEdge(mp.txn.StartTs, edge) + postingList.Postings = append(postingList.Postings, mpost) + postings[uid] = postingList + } + + return nil +} + +func (txn *Txn) addConflictKeyWithUid(key []byte, pl *pb.PostingList, hasUpsert bool, hasNoConflict bool) { + if hasNoConflict { + return + } + txn.Lock() + defer txn.Unlock() + if txn.conflicts == nil { + txn.conflicts = make(map[uint64]struct{}) + } + keyHash := farm.Fingerprint64(key) + if hasUpsert { + txn.conflicts[keyHash] = struct{}{} + return + } + for _, post := range pl.Postings { + txn.conflicts[keyHash^post.Uid] = struct{}{} + } +} + +func (mp *MutationPipeline) ProcessCount(ctx context.Context, pipeline *PredicatePipeline, postings *map[uint64]*pb.PostingList, info predicateInfo, isListEdge bool, isReverseEdge bool) error { + dataKey := x.DataKey(pipeline.attr, 0) + if isReverseEdge { + dataKey = x.ReverseKey(pipeline.attr, 0) + } + edge := pb.DirectedEdge{ + Attr: pipeline.attr, + } + + countMap := make(map[int]*pb.PostingList, 2*len(*postings)) + + insertEdgeCount := func(count int) { + c, ok := countMap[count] + if !ok { + c = &pb.PostingList{} + countMap[count] = c + } + c.Postings = append(c.Postings, makePostingFromEdge(mp.txn.StartTs, &edge)) + countMap[count] = c + } + + for uid, postingList := range *postings { + binary.BigEndian.PutUint64(dataKey[len(dataKey)-8:], uid) + list, err := mp.txn.Get(dataKey) + if err != nil { + return err + } + + list.Lock() + prevCount := list.GetLength(mp.txn.StartTs) + + for _, post := range postingList.Postings { + found, _, _ := list.findPosting(post.StartTs, post.Uid) + if found { + if post.Op == Set && isListEdge { + post.Op = Ovr + } + } else { + if post.Op == Del { + continue + } + } + + if err := list.updateMutationLayer(post, !isListEdge, true); err != nil { + return err + } + } + + newCount := list.GetLength(mp.txn.StartTs) + updated := list.mutationMap.currentEntries != nil + list.Unlock() + + if updated { + if !isListEdge { + if !info.noConflict { + mp.txn.addConflictKey(farm.Fingerprint64(dataKey)) + } + } else { + mp.txn.addConflictKeyWithUid(dataKey, postingList, info.hasUpsert, info.noConflict) + } + } + + if newCount == prevCount { + continue + } + + //fmt.Println("COUNT STATS", uid, prevCount, newCount, postingList, list.Print()) + + edge.ValueId = uid + edge.Op = pb.DirectedEdge_DEL + if prevCount > 0 { + insertEdgeCount(prevCount) + } + edge.Op = pb.DirectedEdge_SET + if newCount > 0 { + insertEdgeCount(newCount) + } + } + + for c, pl := range countMap { + //fmt.Println("COUNT", c, pl) + ck := x.CountKey(pipeline.attr, uint32(c), isReverseEdge) + if newPl, err := mp.txn.AddDelta(string(ck), pl, true, true); err != nil { + return err + } else { + mp.txn.addConflictKeyWithUid(ck, newPl, info.hasUpsert, info.noConflict) + } + } + + return nil +} + +func (mp *MutationPipeline) ProcessSingle(ctx context.Context, pipeline *PredicatePipeline, info predicateInfo) error { + su, schemaExists := schema.State().Get(ctx, pipeline.attr) + + postings := make(map[uint64]*pb.PostingList, 1000) + + dataKey := x.DataKey(pipeline.attr, 0) + insertDeleteAllEdge := !(info.index || info.reverse || info.count) // nolint + + var oldVal *pb.Posting + for edge := range pipeline.edges { + // fmt.Println("SINGLE EDGE", edge) + if edge.Op != pb.DirectedEdge_DEL && !schemaExists { + return errors.Errorf("runMutation: Unable to find schema for %s", edge.Attr) + } + + if err := ValidateAndConvert(edge, &su); err != nil { + return err + } + + uid := edge.Entity + pl, exists := postings[uid] + + setPosting := func() { + mpost := makePostingFromEdge(mp.txn.StartTs, edge) + if len(pl.Postings) == 0 { + if insertDeleteAllEdge { + pl = &pb.PostingList{ + Postings: []*pb.Posting{createDeleteAllPosting(), mpost}, + } + } else { + pl = &pb.PostingList{ + Postings: []*pb.Posting{mpost}, + } + } + } else { + if pl.Postings[len(pl.Postings)-1].Op == Set { + pl.Postings[len(pl.Postings)-1] = mpost + } else { + pl.Postings = append(pl.Postings, mpost) + } + } + postings[uid] = pl + } + + if exists { + if edge.Op == pb.DirectedEdge_DEL { + oldVal = findSingleValueInPostingList(pl) + if string(edge.Value) == string(oldVal.Value) { + setPosting() + } + } else { + setPosting() + } + continue + } + + pl = &pb.PostingList{} + postings[uid] = pl + + if edge.Op == pb.DirectedEdge_DEL { + binary.BigEndian.PutUint64(dataKey[len(dataKey)-8:], uid) + list, err := mp.txn.GetScalarList(dataKey) + if err != nil { + return err + } + if list != nil { + l, err := list.StaticValue(mp.txn.StartTs) + if err != nil { + return err + } + oldVal = findSingleValueInPostingList(l) + } + if oldVal != nil { + if string(oldVal.Value) == string(edge.Value) { + setPosting() + } + } + } else { + setPosting() + } + } + + if info.index || info.reverse || info.count { + if err := mp.handleOldDeleteForSingle(pipeline, postings); err != nil { + return err + } + } + + if info.index { + if err := mp.InsertTokenizerIndexes(ctx, pipeline, &postings, info); err != nil { + return err + } + } + + if info.reverse { + if err := mp.ProcessReverse(ctx, pipeline, &postings, info); err != nil { + return err + } + } + + if info.count { + // Count should take care of updating the posting list + return mp.ProcessCount(ctx, pipeline, &postings, info, false, false) + } + + baseKey := string(dataKey[:len(dataKey)-8]) // Avoid repeated conversion + + for uid, pl := range postings { + //fmt.Println("ADDING DELTA", uid, pipeline.attr, pl) + binary.BigEndian.PutUint64(dataKey[len(dataKey)-8:], uid) + key := baseKey + string(dataKey[len(dataKey)-8:]) + + if !info.noConflict { + mp.txn.addConflictKey(farm.Fingerprint64([]byte(key))) + } + + if _, err := mp.txn.AddDelta(key, pl, false, false); err != nil { + return err + } + } + + return nil +} + +func runMutation(ctx context.Context, edge *pb.DirectedEdge, txn *Txn) error { + ctx = schema.GetWriteContext(ctx) + + // We shouldn't check whether this Alpha serves this predicate or not. Membership information + // isn't consistent across the entire cluster. We should just apply whatever is given to us. + su, ok := schema.State().Get(ctx, edge.Attr) + if edge.Op != pb.DirectedEdge_DEL { + if !ok { + return errors.Errorf("runMutation: Unable to find schema for %s", edge.Attr) + } + } + + key := x.DataKey(edge.Attr, edge.Entity) + // The following is a performance optimization which allows us to not read a posting list from + // disk. We calculate this based on how AddMutationWithIndex works. The general idea is that if + // we're not using the read posting list, we don't need to retrieve it. We need the posting list + // if we're doing count index or delete operation. For scalar predicates, we just get the last item merged. + // In other cases, we can just create a posting list facade in memory and use it to store the delta in Badger. + // Later, the rollup operation would consolidate all these deltas into a posting list. + isList := su.GetList() + var getFn func(key []byte) (*List, error) + switch { + case len(edge.Lang) == 0 && !isList: + // Scalar Predicates, without lang + getFn = txn.GetScalarList + case len(edge.Lang) > 0 || su.GetCount(): + // Language or Count Index + getFn = txn.Get + case edge.Op == pb.DirectedEdge_DEL: + // Covers various delete cases to keep things simple. + getFn = txn.Get + default: + // Only count index needs to be read. For other indexes on list, we don't need to read any data. + // For indexes on scalar prediactes, only the last element needs to be left. + // Delete cases covered above. + getFn = txn.GetFromDelta + } + + plist, err := getFn(key) + if err != nil { + return err + } + return plist.AddMutationWithIndex(ctx, edge, txn) +} + +func (mp *MutationPipeline) ProcessPredicate(ctx context.Context, pipeline *PredicatePipeline) error { + defer pipeline.close() + ctx = schema.GetWriteContext(ctx) + + // We shouldn't check whether this Alpha serves this predicate or not. Membership information + // isn't consistent across the entire cluster. We should just apply whatever is given to us. + su, ok := schema.State().Get(ctx, pipeline.attr) + info := predicateInfo{} + runForVectorIndex := false + + if ok { + info.index = schema.State().IsIndexed(ctx, pipeline.attr) + info.count = schema.State().HasCount(ctx, pipeline.attr) + info.reverse = schema.State().IsReversed(ctx, pipeline.attr) + info.noConflict = schema.State().HasNoConflict(pipeline.attr) + info.hasUpsert = schema.State().HasUpsert(pipeline.attr) + info.isList = schema.State().IsList(pipeline.attr) + info.isUid = su.ValueType == pb.Posting_UID + factorySpecs, err := schema.State().FactoryCreateSpec(ctx, pipeline.attr) + if err != nil { + return err + } + if len(factorySpecs) > 0 { + runForVectorIndex = true + } + } + + if runForVectorIndex { + return mp.ProcessVectorIndex(ctx, pipeline, info) + } + + runListFn := false + + if ok { + if info.isList || su.Lang { + runListFn = true + } + } + + info.isSingleEdge = !runListFn + + if runListFn { + if err := mp.ProcessList(ctx, pipeline, info); err != nil { + return err + } + } + + if ok && !runListFn { + if err := mp.ProcessSingle(ctx, pipeline, info); err != nil { + return err + } + } + + for edge := range pipeline.edges { + if err := runMutation(ctx, edge, mp.txn); err != nil { + return err + } + } + + return nil +} + +func isStarAll(v []byte) bool { + return bytes.Equal(v, []byte(x.Star)) +} + +func ValidateAndConvert(edge *pb.DirectedEdge, su *pb.SchemaUpdate) error { + if types.TypeID(edge.ValueType) == types.DefaultID && isStarAll(edge.Value) { + return nil + } + + storageType := TypeID(edge) + schemaType := types.TypeID(su.ValueType) + + // type checks + switch { + case edge.Lang != "" && !su.GetLang(): + return errors.Errorf("Attr: [%v] should have @lang directive in schema to mutate edge: [%v]", + x.ParseAttr(edge.Attr), edge) + + case !schemaType.IsScalar() && !storageType.IsScalar(): + return nil + + case !schemaType.IsScalar() && storageType.IsScalar(): + return errors.Errorf("Input for predicate %q of type uid is scalar. Edge: %v", + x.ParseAttr(edge.Attr), edge) + + case schemaType.IsScalar() && !storageType.IsScalar(): + return errors.Errorf("Input for predicate %q of type scalar is uid. Edge: %v", + x.ParseAttr(edge.Attr), edge) + + case schemaType == types.TypeID(pb.Posting_VFLOAT): + if !(storageType == types.TypeID(pb.Posting_VFLOAT) || storageType == types.TypeID(pb.Posting_STRING) || //nolint + storageType == types.TypeID(pb.Posting_DEFAULT)) { + return errors.Errorf("Input for predicate %q of type vector is not vector."+ + " Did you forget to add quotes before []?. Edge: %v", x.ParseAttr(edge.Attr), edge) + } + + // The suggested storage type matches the schema, OK! (Nothing to do ...) + case storageType == schemaType && schemaType != types.DefaultID: + return nil + + // We accept the storage type iff we don't have a schema type and a storage type is specified. + case schemaType == types.DefaultID: + schemaType = storageType + } + + var ( + dst types.Val + err error + ) + + src := types.Val{Tid: types.TypeID(edge.ValueType), Value: edge.Value} + // check compatibility of schema type and storage type + // The goal is to convert value on edge to value type defined by schema. + if dst, err = types.Convert(src, schemaType); err != nil { + return err + } + + // convert to schema type + b := types.ValueForType(types.BinaryID) + if err = types.Marshal(dst, &b); err != nil { + return err + } + + if x.WorkerConfig.AclEnabled && x.ParseAttr(edge.GetAttr()) == "dgraph.rule.permission" { + perm, ok := dst.Value.(int64) + if !ok { + return errors.Errorf("Value for predicate should be of type int") + } + if perm < 0 || perm > 7 { + return errors.Errorf("Can't set to %d, Value for this"+ + " predicate should be between 0 and 7", perm) + } + } + + // TODO: Figure out why this is Enum. It really seems like an odd choice -- rather than + // specifying it as the same type as presented in su. + edge.ValueType = schemaType.Enum() + var ok bool + edge.Value, ok = b.Value.([]byte) + if !ok { + return errors.Errorf("failure to convert edge type: '%+v' to schema type: '%+v'", + storageType, schemaType) + } + + return nil +} + +func (mp *MutationPipeline) Process(ctx context.Context, edges []*pb.DirectedEdge) error { + predicates := map[string]*PredicatePipeline{} + var wg sync.WaitGroup + numWg := 0 + eg, egCtx := errgroup.WithContext(ctx) + for _, edge := range edges { + //fmt.Println("PROCESSING EDGE", edge) + if edge.Op == pb.DirectedEdge_DEL && string(edge.Value) == x.Star { + l, err := mp.txn.Get(x.DataKey(edge.Attr, edge.Entity)) + if err != nil { + return err + } + if err = l.handleDeleteAll(ctx, edge, mp.txn); err != nil { + return err + } + continue + } + pred, ok := predicates[edge.Attr] + if !ok { + pred = &PredicatePipeline{ + attr: edge.Attr, + edges: make(chan *pb.DirectedEdge, 1000), + wg: &wg, + } + predicates[edge.Attr] = pred + wg.Add(1) + numWg += 1 + // Capture pred by passing it as a parameter to the closure + eg.Go(func(p *PredicatePipeline) func() error { + return func() error { + return mp.ProcessPredicate(egCtx, p) + } + }(pred)) + } + pred.edges <- edge + } + for _, pred := range predicates { + close(pred.edges) + } + if numWg == 0 { + return nil + } + // Wait for all predicate processors; returns first error (and cancels others via context). + if err := eg.Wait(); err != nil { + return err + } + return nil +} func (txn *Txn) addIndexMutations(ctx context.Context, info *indexMutationInfo) ([]*pb.DirectedEdge, error) { if info.tokenizers == nil { @@ -951,7 +2028,7 @@ func (r *rebuilder) Run(ctx context.Context) error { // txn.cache.Lock() is not required because we are the only one making changes to txn. kvs := make([]*bpb.KV, 0) - for key, data := range streamTxn.cache.deltas { + if err := streamTxn.cache.deltas.IterateBytes(func(key string, data []byte) error { version := atomic.AddUint64(&counter, 1) kv := bpb.KV{ Key: []byte(key), @@ -960,7 +2037,11 @@ func (r *rebuilder) Run(ctx context.Context) error { Version: version, } kvs = append(kvs, &kv) + return nil + }); err != nil { + return nil, err } + txns[threadId] = NewTxn(r.startTs) return &bpb.KVList{Kv: kvs}, nil } @@ -1009,7 +2090,7 @@ func (r *rebuilder) Run(ctx context.Context) error { // Convert data into deltas. streamTxn.Update() // txn.cache.Lock() is not required because we are the only one making changes to txn. - for key, data := range streamTxn.cache.deltas { + if err := streamTxn.cache.deltas.IterateBytes(func(key string, data []byte) error { version := atomic.AddUint64(&counter, 1) kv := bpb.KV{ Key: []byte(key), @@ -1018,6 +2099,9 @@ func (r *rebuilder) Run(ctx context.Context) error { Version: version, } kvs = append(kvs, &kv) + return nil + }); err != nil { + return nil, err } txns[threadId] = NewTxn(r.startTs) diff --git a/posting/index_test.go b/posting/index_test.go index b8bb7887b1a..59a0c5b723b 100644 --- a/posting/index_test.go +++ b/posting/index_test.go @@ -139,19 +139,28 @@ func addMutation(t *testing.T, l *List, edge *pb.DirectedEdge, op uint32, } txn := Oracle().RegisterStartTs(startTs) txn.cache.SetIfAbsent(string(l.key), l) - if index { - require.NoError(t, l.AddMutationWithIndex(context.Background(), edge, txn)) - } else { - require.NoError(t, l.addMutation(context.Background(), txn, edge)) - } + mp := NewMutationPipeline(txn) + err := mp.Process(context.Background(), []*pb.DirectedEdge{edge}) + require.NoError(t, err) txn.Update() txn.UpdateCachedKeys(commitTs) writer := NewTxnWriter(pstore) require.NoError(t, txn.CommitToDisk(writer, commitTs)) require.NoError(t, writer.Flush()) + newTxn := Oracle().RegisterStartTs(commitTs + 1) + l1, err := newTxn.Get(l.key) + require.NoError(t, err) + *l = *l1 //nolint } +const schemaPreVal = ` + name: string . + name2: string . + dob: dateTime . + friend: [uid] . +` + const schemaVal = ` name: string @index(term) . name2: string @index(term) . @@ -263,6 +272,9 @@ func addEdgeToUID(t *testing.T, attr string, src uint64, func TestCountReverseIndexWithData(t *testing.T) { require.NoError(t, pstore.DropAll()) MemLayerInstance.clear() + preIndex := "testcount: [uid] ." + require.NoError(t, schema.ParseBytes([]byte(preIndex), 1)) + indexNameCountVal := "testcount: [uid] @count @reverse ." attr := x.AttrInRootNamespace("testcount") @@ -298,6 +310,9 @@ func TestCountReverseIndexWithData(t *testing.T) { func TestCountReverseIndexEmptyPosting(t *testing.T) { require.NoError(t, pstore.DropAll()) MemLayerInstance.clear() + preIndex := "testcount: [uid] ." + require.NoError(t, schema.ParseBytes([]byte(preIndex), 1)) + indexNameCountVal := "testcount: [uid] @count @reverse ." attr := x.AttrInRootNamespace("testcount") @@ -330,6 +345,8 @@ func TestCountReverseIndexEmptyPosting(t *testing.T) { } func TestRebuildTokIndex(t *testing.T) { + require.NoError(t, schema.ParseBytes([]byte(schemaPreVal), 1)) + addEdgeToValue(t, x.AttrInRootNamespace("name2"), 91, "Michonne", uint64(1), uint64(2)) addEdgeToValue(t, x.AttrInRootNamespace("name2"), 92, "David", uint64(3), uint64(4)) diff --git a/posting/list.go b/posting/list.go index 2a3638cca01..f3f16ba574f 100644 --- a/posting/list.go +++ b/posting/list.go @@ -335,6 +335,7 @@ func (mm *MutableLayer) insertCommittedPostings(pl *pb.PostingList) { // We insert old postings in reverse order. So we only need to read the first update to an UID. if _, ok := mm.committedUids[mpost.Uid]; !ok { mm.committedUids[mpost.Uid] = mpost + fmt.Println("Buidlding committed uids", mm.committedUids, mpost) } } } @@ -404,13 +405,18 @@ func (mm *MutableLayer) print() string { if mm == nil { return "" } - return fmt.Sprintf("Committed List: %+v Proposed list: %+v Delete all marker: %d \n", + return fmt.Sprintf("Committed List: %+v Proposed list: %+v Delete all marker: %d. Count: %d \n", mm.committedEntries, mm.currentEntries, - mm.deleteAllMarker) + mm.deleteAllMarker, + mm.length) } func (l *List) Print() string { + if l.plist.Pack != nil { + uids := codec.Decode(l.plist.Pack, 0) + return fmt.Sprintf("minTs: %d, committed uids: %+v, mutationMap: %s", l.minTs, uids, l.mutationMap.print()) + } return fmt.Sprintf("minTs: %d, plist: %+v, mutationMap: %s", l.minTs, l.plist, l.mutationMap.print()) } @@ -712,6 +718,53 @@ type ListOptions struct { First int } +func NewPostingExisting(p *pb.Posting, t *pb.DirectedEdge) { + var op uint32 + switch t.Op { + case pb.DirectedEdge_SET: + op = Set + case pb.DirectedEdge_OVR: + op = Ovr + case pb.DirectedEdge_DEL: + op = Del + default: + x.Fatalf("Unhandled operation: %+v", t) + } + + var postingType pb.Posting_PostingType + switch { + case len(t.Lang) > 0: + postingType = pb.Posting_VALUE_LANG + case t.ValueId == 0: + postingType = pb.Posting_VALUE + default: + postingType = pb.Posting_REF + } + + p.Uid = t.ValueId + p.Value = t.Value + p.ValType = t.ValueType + p.PostingType = postingType + p.LangTag = []byte(t.Lang) + p.Op = op + p.Facets = t.Facets +} + +func GetPostingOp(top uint32) pb.DirectedEdge_Op { + var op pb.DirectedEdge_Op + switch top { + case Set: + op = pb.DirectedEdge_SET + case Del: + op = pb.DirectedEdge_DEL + case Ovr: + op = pb.DirectedEdge_OVR + default: + x.Fatalf("Unhandled operation: %+v", top) + } + return op +} + // NewPosting takes the given edge and returns its equivalent representation as a posting. func NewPosting(t *pb.DirectedEdge) *pb.Posting { var op uint32 @@ -789,12 +842,12 @@ func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate, hasCountI // The current value should be deleted in favor of this value. This needs to // be done because the fingerprint for the value is not math.MaxUint64 as is // the case with the rest of the scalar predicates. - newPlist := &pb.PostingList{} + newPlist := &pb.PostingList{ + Postings: []*pb.Posting{createDeleteAllPosting()}, + } if mpost.Op != Del { - // If we are setting a new value then we can just delete all the older values. - newPlist.Postings = append(newPlist.Postings, createDeleteAllPosting()) + newPlist.Postings = append(newPlist.Postings, mpost) } - newPlist.Postings = append(newPlist.Postings, mpost) l.mutationMap.setCurrentEntries(mpost.StartTs, newPlist) return nil } @@ -833,6 +886,10 @@ func fingerprintEdge(t *pb.DirectedEdge) uint64 { return id } +func FingerprintEdge(t *pb.DirectedEdge) uint64 { + return fingerprintEdge(t) +} + func (l *List) addMutation(ctx context.Context, txn *Txn, t *pb.DirectedEdge) error { l.Lock() defer l.Unlock() @@ -1043,6 +1100,12 @@ func (l *List) setMutationAfterCommit(startTs, commitTs uint64, pl *pb.PostingLi func (l *List) setMutation(startTs uint64, data []byte) { pl := new(pb.PostingList) x.Check(proto.Unmarshal(data, pl)) + l.setMutationWithPosting(startTs, pl) +} + +func (l *List) setMutationWithPosting(startTs uint64, pl *pb.PostingList) { + // pk, _ := x.Parse(l.key) + // fmt.Println("Setting mutation for ", l.key, pk, pl) l.Lock() if l.mutationMap == nil { @@ -1110,6 +1173,13 @@ func (l *List) pickPostings(readTs uint64) (uint64, []*pb.Posting) { } return pi.Uid < pj.Uid }) + + if len(posts) > 0 { + if hasDeleteAll(posts[0]) { + posts = posts[1:] + } + } + return deleteAllMarker, posts } @@ -1258,6 +1328,11 @@ func (l *List) GetLength(readTs uint64) int { length += immutLen } + // pureLength := l.length(readTs, 0) + // if pureLength != length { + // panic(fmt.Sprintf("pure length != length %d %d %s", pureLength, length, l.Print())) + // } + return length } @@ -1451,11 +1526,17 @@ func (l *List) Rollup(alloc *z.Allocator, readTs uint64) ([]*bpb.KV, error) { return bytes.Compare(kvs[i].Key, kvs[j].Key) <= 0 }) - x.PrintRollup(out.plist, out.parts, l.key, kv.Version) + PrintRollup(out.plist, out.parts, l.key, kv.Version) x.VerifyPostingSplits(kvs, out.plist, out.parts, l.key) return kvs, nil } +func PrintRollup(plist *pb.PostingList, parts map[uint64]*pb.PostingList, baseKey []byte, ts uint64) { + k, _ := x.Parse(baseKey) + uids := codec.Decode(plist.Pack, 0) + fmt.Printf("[TXNLOG] DOING ROLLUP for key: %+v at timestamp: %v, uids: %+v\n", k, ts, uids) +} + // ToBackupPostingList uses rollup to generate a single list with no splits. // It's used during backup so that each backed up posting list is stored in a single key. func (l *List) ToBackupPostingList( @@ -2007,6 +2088,18 @@ func (l *List) findStaticValue(readTs uint64) *pb.PostingList { if l.plist != nil && len(l.plist.Postings) > 0 { return l.plist } + if l.plist != nil && l.plist.Pack != nil { + uids := codec.Decode(l.plist.Pack, 0) + return &pb.PostingList{ + Postings: []*pb.Posting{ + { + Uid: uids[0], + ValType: pb.Posting_UID, + Op: Set, + }, + }, + } + } return nil } diff --git a/posting/list_test.go b/posting/list_test.go index f64fcbc765c..8002b226b38 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -124,7 +124,6 @@ func TestGetSinglePosting(t *testing.T) { res, err := l.StaticValue(1) require.NoError(t, err) - fmt.Println(res, res == nil) require.Equal(t, res == nil, true) l.plist = create_pl(1, 1) diff --git a/posting/lists.go b/posting/lists.go index fa2fc1c6605..583ca08ef79 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -20,6 +20,7 @@ import ( "github.com/dgraph-io/dgo/v250/protos/api" "github.com/dgraph-io/ristretto/v2/z" "github.com/hypermodeinc/dgraph/v25/protos/pb" + "github.com/hypermodeinc/dgraph/v25/types" "github.com/hypermodeinc/dgraph/v25/x" ) @@ -69,10 +70,7 @@ type LocalCache struct { startTs uint64 commitTs uint64 - // The keys for these maps is a string representation of the Badger key for the posting list. - // deltas keep track of the updates made by txn. These must be kept around until written to disk - // during commit. - deltas map[string][]byte + deltas *Deltas // max committed timestamp of the read posting lists. maxVersions map[string]uint64 @@ -81,6 +79,143 @@ type LocalCache struct { plists map[string]*List } +// The keys for these maps is a string representation of the Badger key for the posting list. +// deltas keep track of the updates made by txn. These must be kept around until written to disk +// during commit. +type Deltas struct { + deltas *types.LockedShardedMap[string, []byte] + + // We genereate indexes for the posting lists all at once. Moving them from this map to deltas + // map is uneccessary. More data can be stored per predicate later on. + indexMap map[string]*types.LockedShardedMap[string, *pb.PostingList] +} + +func NewDeltas() *Deltas { + return &Deltas{ + deltas: types.NewLockedShardedMap[string, []byte](), + indexMap: map[string]*types.LockedShardedMap[string, *pb.PostingList]{}, + } +} + +// Call this function after taking a lock on the cache. +func (d *Deltas) GetIndexMapForPredicate(pred string) *types.LockedShardedMap[string, *pb.PostingList] { + val, ok := d.indexMap[pred] + if !ok { + d.indexMap[pred] = types.NewLockedShardedMap[string, *pb.PostingList]() + return d.indexMap[pred] + } + return val +} + +func (d *Deltas) Get(key string) (*pb.PostingList, bool) { + if d == nil { + return nil, false + } + pk, err := x.Parse([]byte(key)) + if err != nil { + return nil, false + } + + res := &pb.PostingList{} + + val, ok := d.deltas.Get(key) + if ok { + if err := proto.Unmarshal(val, res); err != nil { + return nil, false + } + } + + if indexMap, ok := d.indexMap[pk.Attr]; ok { + if value, ok1 := indexMap.Get(key); ok1 { + res.Postings = append(res.Postings, value.Postings...) + } + } + + // fmt.Println("GETTING KEY FROM DELTAS", pk, "res", res, "val", val, "d.deltas", d.deltas, "d.indexMap[pk.Attr]", d.indexMap[pk.Attr], "ok", ok) + + return res, len(res.Postings) > 0 +} + +func (d *Deltas) GetBytes(key string) ([]byte, bool) { + if len(d.indexMap) == 0 { + return d.deltas.Get(key) + } + + pk, err := x.Parse([]byte(key)) + if err != nil { + return nil, false + } + + delta, deltaFound := d.deltas.Get(key) + + if indexMap, ok := d.indexMap[pk.Attr]; ok { + if value, ok1 := indexMap.Get(key); ok1 && deltaFound && len(value.Postings) > 0 { + res := &pb.PostingList{} + if err := proto.Unmarshal(delta, res); err != nil { + return nil, false + } + res.Postings = append(res.Postings, value.Postings...) + data, err := proto.Marshal(res) + if err != nil { + return nil, false + } + return data, true + } else if ok1 && len(value.Postings) > 0 { + data, err := proto.Marshal(value) + if err != nil { + return nil, false + } + return data, true + } + } + + return delta, deltaFound +} + +func (d *Deltas) AddToDeltas(key string, delta []byte) { + d.deltas.Set(key, delta) +} + +func (d *Deltas) IterateKeys(fn func(key string) error) error { + for _, v := range d.indexMap { + if err := v.Iterate(func(key string, value *pb.PostingList) error { + return fn(key) + }); err != nil { + return err + } + } + if err := d.deltas.Iterate(func(key string, value []byte) error { + return fn(key) + }); err != nil { + return err + } + return nil +} + +func (d *Deltas) IteratePostings(fn func(key string, value *pb.PostingList) error) error { + return d.IterateKeys(func(key string) error { + val, ok := d.Get(key) + if !ok { + return nil + } + return fn(key, val) + }) +} + +func (d *Deltas) IterateBytes(fn func(key string, value []byte) error) error { + return d.IterateKeys(func(key string) error { + val, ok := d.Get(key) + if !ok { + return nil + } + data, err := proto.Marshal(val) + if err != nil { + return err + } + return fn(key, data) + }) +} + // struct to implement LocalCache interface from vector-indexer // acts as wrapper for dgraph *LocalCache type viLocalCache struct { @@ -135,7 +270,7 @@ func NewViLocalCache(delegate *LocalCache) *viLocalCache { func NewLocalCache(startTs uint64) *LocalCache { return &LocalCache{ startTs: startTs, - deltas: make(map[string][]byte), + deltas: NewDeltas(), plists: make(map[string]*List), maxVersions: make(map[string]uint64), } @@ -264,9 +399,20 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk, readUids bool) (*Lis lc.RLock() defer lc.RUnlock() if lc.plists == nil { - return getNew(key, pstore, lc.startTs, readUids) + l, err := getNew(key, pstore, lc.startTs, readUids) + if err != nil { + return nil, err + } + pk, _ := x.Parse(key) + fmt.Println("READING NEW PLIST", pk, l.Print()) + return l, nil } if l, ok := lc.plists[skey]; ok { + if delta, ok := lc.deltas.Get(skey); ok && delta != nil { + l.setMutationWithPosting(lc.startTs, delta) + } + pk, _ := x.Parse(key) + fmt.Println("READING PLIST", pk, l.Print()) return l, nil } return nil, nil @@ -294,10 +440,13 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk, readUids bool) (*Lis // If we just brought this posting list into memory and we already have a delta for it, let's // apply it before returning the list. lc.RLock() - if delta, ok := lc.deltas[skey]; ok && len(delta) > 0 { - pl.setMutation(lc.startTs, delta) + if delta, ok := lc.deltas.Get(skey); ok && delta != nil { + pl.setMutationWithPosting(lc.startTs, delta) } lc.RUnlock() + + pk, _ := x.Parse(key) + fmt.Println("READING ", pk, pl.Print()) return lc.SetIfAbsent(skey, pl), nil } @@ -334,21 +483,26 @@ func (lc *LocalCache) readPostingListAt(key []byte) (*pb.PostingList, error) { // given key. This is used for retrieving the value of a scalar predicats. func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) { // This would return an error if there is some data in the local cache, but we couldn't read it. + + pk, _ := x.Parse(key) + fmt.Println("READING SINGLE ", pk) + getListFromLocalCache := func() (*pb.PostingList, error) { lc.RLock() - pl := &pb.PostingList{} - if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 { - err := proto.Unmarshal(delta, pl) + if delta, ok := lc.deltas.Get(string(key)); ok && delta != nil { lc.RUnlock() - return pl, err + fmt.Println("READING SINGLE FROM DELTA", pk, delta) + return delta, nil } l := lc.plists[string(key)] lc.RUnlock() if l != nil { - return l.StaticValue(lc.startTs) + res, err := l.StaticValue(lc.startTs) + fmt.Println("READING SINGLE FROM PLISTS", pk, res, err, l.Print()) + return res, err } return nil, nil @@ -366,25 +520,28 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) { } pl, err := getPostings() + + fmt.Println("READING SINGLE ", pk, "pl:", pl) + if err == badger.ErrKeyNotFound { + fmt.Println("READING ", pk, nil) return nil, nil } if err != nil { + fmt.Println("READING ", pk, err) return nil, err } // Filter and remove STAR_ALL and OP_DELETE Postings idx := 0 for _, postings := range pl.Postings { - if hasDeleteAll(postings) { - return nil, nil - } if postings.Op != Del { pl.Postings[idx] = postings idx++ } } pl.Postings = pl.Postings[:idx] + // fmt.Println("READING ", pk, pl) return pl, nil } @@ -415,7 +572,7 @@ func (lc *LocalCache) UpdateDeltasAndDiscardLists() { for key, pl := range lc.plists { data := pl.getMutation(lc.startTs) if len(data) > 0 { - lc.deltas[key] = data + lc.deltas.AddToDeltas(key, data) } lc.maxVersions[key] = pl.maxVersion() // We can't run pl.release() here because LocalCache is still being used by other callers @@ -428,16 +585,19 @@ func (lc *LocalCache) UpdateDeltasAndDiscardLists() { func (lc *LocalCache) fillPreds(ctx *api.TxnContext, gid uint32) { lc.RLock() defer lc.RUnlock() - for key := range lc.deltas { + if err := lc.deltas.IterateKeys(func(key string) error { pk, err := x.Parse([]byte(key)) x.Check(err) if len(pk.Attr) == 0 { - continue + return nil } // Also send the group id that the predicate was being served by. This is useful when // checking if Zero should allow a commit during a predicate move. predKey := fmt.Sprintf("%d-%s", gid, pk.Attr) ctx.Preds = append(ctx.Preds, predKey) + return nil + }); err != nil { + x.Check(err) } ctx.Preds = x.Unique(ctx.Preds) } diff --git a/posting/mvcc.go b/posting/mvcc.go index 06b70fdce57..8f83949018e 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -9,6 +9,7 @@ import ( "bytes" "context" "encoding/hex" + "fmt" "math" "strconv" "sync" @@ -276,8 +277,11 @@ func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error { defer cache.Unlock() var keys []string - for key := range cache.deltas { + if err := cache.deltas.IterateKeys(func(key string) error { keys = append(keys, key) + return nil + }); err != nil { + return err } defer func() { @@ -296,10 +300,19 @@ func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error { err := writer.update(commitTs, func(btxn *badger.Txn) error { for ; idx < len(keys); idx++ { key := keys[idx] - data := cache.deltas[key] - if len(data) == 0 { + data, ok := cache.deltas.GetBytes(key) + if !ok || data == nil { + continue + } + pl := &pb.PostingList{} + if err := proto.Unmarshal(data, pl); err != nil { + return err + } + if len(pl.Postings) == 0 { continue } + pk, _ := x.Parse([]byte(key)) + fmt.Println("COMMITTING", pk, pl) if ts := cache.maxVersions[key]; ts >= commitTs { // Skip write because we already have a write at a higher ts. // Logging here can cause a lot of output when doing Raft log replay. So, let's @@ -482,7 +495,8 @@ func (ml *MemoryLayer) IterateDisk(ctx context.Context, f IterateDiskArgs) error if err != nil { return err } - empty, err := l.IsEmpty(f.ReadTs, 0) + empty, err := false, nil + //empty, err := l.IsEmpty(f.ReadTs, 0) switch { case err != nil: return err @@ -567,7 +581,7 @@ func (ml *MemoryLayer) wait() { ml.cache.wait() } -func (ml *MemoryLayer) updateItemInCache(key string, delta []byte, startTs, commitTs uint64) { +func (ml *MemoryLayer) updateItemInCache(key string, delta *pb.PostingList, startTs, commitTs uint64) { if commitTs == 0 { return } @@ -579,24 +593,19 @@ func (ml *MemoryLayer) updateItemInCache(key string, delta []byte, startTs, comm } val, ok := ml.cache.get([]byte(key)) - if !ok { - return - } - val.lastUpdate = commitTs + if ok && val.list != nil && val.list.minTs <= commitTs { + val.lastUpdate = commitTs - if val.list != nil { - p := new(pb.PostingList) - x.Check(proto.Unmarshal(delta, p)) - - if p.Pack == nil { - val.list.setMutationAfterCommit(startTs, commitTs, p, true) - checkForRollup([]byte(key), val.list) - } else { - // Data was rolled up. TODO figure out how is UpdateCachedKeys getting delta which is pack) - ml.del([]byte(key)) + if val.list != nil { + if delta.Pack == nil { + val.list.setMutationAfterCommit(startTs, commitTs, delta, true) + checkForRollup([]byte(key), val.list) + } else { + // Data was rolled up. TODO figure out how is UpdateCachedKeys getting delta which is pack) + ml.del([]byte(key)) + } } - } } @@ -607,8 +616,11 @@ func (txn *Txn) UpdateCachedKeys(commitTs uint64) { } MemLayerInstance.wait() - for key, delta := range txn.cache.deltas { - MemLayerInstance.updateItemInCache(key, delta, txn.StartTs, commitTs) + if err := txn.cache.deltas.IteratePostings(func(key string, value *pb.PostingList) error { + MemLayerInstance.updateItemInCache(key, value, txn.StartTs, commitTs) + return nil + }); err != nil { + glog.Errorf("UpdateCachedKeys: error while iterating deltas: %v", err) } } diff --git a/posting/mvcc_test.go b/posting/mvcc_test.go index 3fe6382c116..ffdbbee7417 100644 --- a/posting/mvcc_test.go +++ b/posting/mvcc_test.go @@ -72,7 +72,7 @@ func TestCacheAfterDeltaUpdateRecieved(t *testing.T) { // Write delta to disk and call update txn := Oracle().RegisterStartTs(5) - txn.cache.deltas[string(key)] = delta + txn.cache.deltas.AddToDeltas(string(key), delta) writer := NewTxnWriter(pstore) require.NoError(t, txn.CommitToDisk(writer, 15)) @@ -144,6 +144,8 @@ func BenchmarkTestCache(b *testing.B) { } func TestRollupTimestamp(t *testing.T) { + require.NoError(t, schema.ParseBytes([]byte("rollup: [uid] ."), 1)) + attr := x.AttrInRootNamespace("rollup") key := x.DataKey(attr, 1) // 3 Delta commits. @@ -183,6 +185,8 @@ func TestRollupTimestamp(t *testing.T) { } func TestPostingListRead(t *testing.T) { + require.NoError(t, schema.ParseBytes([]byte("emptypl: [uid] ."), 1)) + attr := x.AttrInRootNamespace("emptypl") key := x.DataKey(attr, 1) diff --git a/posting/oracle.go b/posting/oracle.go index 5b6c4510c22..79acdd3d952 100644 --- a/posting/oracle.go +++ b/posting/oracle.go @@ -9,6 +9,7 @@ import ( "context" "encoding/hex" "math" + "sort" "sync" "sync/atomic" "time" @@ -16,6 +17,7 @@ import ( "github.com/dgraph-io/badger/v4" "github.com/golang/glog" ostats "go.opencensus.io/stats" + "google.golang.org/protobuf/proto" "github.com/hypermodeinc/dgraph/v25/protos/pb" "github.com/hypermodeinc/dgraph/v25/tok/index" @@ -54,6 +56,84 @@ type Txn struct { lastUpdate time.Time cache *LocalCache // This pointer does not get modified. + + pointers [](*[]byte) +} + +func (txn *Txn) AddPointer(p *[]byte) { + if txn.pointers == nil { + txn.pointers = make([](*[]byte), 1) + txn.pointers[0] = p + } + txn.pointers = append(txn.pointers, p) +} + +func (txn *Txn) GetPointers() [](*[]byte) { + return txn.pointers +} + +func SortAndDedupPostings(postings []*pb.Posting) []*pb.Posting { + // Sort postings by UID + sort.Slice(postings, func(i, j int) bool { + return postings[i].Uid < postings[j].Uid + }) + + //In-place filtering: keep only the last occurrence for each UID + n := 0 // write index + for i := 0; i < len(postings); { + j := i + 1 + // Skip all postings with same UID + for j < len(postings) && postings[j].Uid == postings[i].Uid { + j++ + } + // Keep only the last posting for this UID + postings[n] = postings[j-1] + n++ + i = j + } + return postings[:n] +} + +func (txn *Txn) AddDelta(key string, input *pb.PostingList, doSortAndDedup bool, addToList bool) (*pb.PostingList, error) { + txn.cache.Lock() + defer txn.cache.Unlock() + + pl := new(pb.PostingList) + + if addToList { + prevDelta, ok := txn.cache.deltas.Get(key) + if ok { + pl.Postings = append(pl.Postings, prevDelta.Postings...) + } + } + + pl.Postings = append(pl.Postings, input.Postings...) + + if doSortAndDedup { + pl.Postings = SortAndDedupPostings(pl.Postings) + } + + newPl, err := proto.Marshal(pl) + if err != nil { + glog.Errorf("Error marshalling posting list: %v", err) + return nil, err + } + + txn.cache.deltas.AddToDeltas(key, newPl) + + list, listOk := txn.cache.plists[key] + if listOk { + list.setMutation(txn.StartTs, newPl) + } + return pl, nil +} + +func (txn *Txn) LockCache() { + txn.cache.Lock() +} + +func (txn *Txn) UnlockCache() { + txn.cache.Unlock() } // struct to implement Txn interface from vector-indexer @@ -324,8 +404,11 @@ func (o *oracle) ProcessDelta(delta *pb.OracleDelta) { for _, status := range delta.Txns { txn := o.pendingTxns[status.StartTs] if txn != nil && status.CommitTs > 0 { - for k := range txn.cache.deltas { - IncrRollup.addKeyToBatch([]byte(k), 0) + if err := txn.cache.deltas.IterateBytes(func(key string, value []byte) error { + IncrRollup.addKeyToBatch([]byte(key), 0) + return nil + }); err != nil { + glog.Errorf("ProcessDelta: error while iterating deltas for txn %d: %v", status.StartTs, err) } } delete(o.pendingTxns, status.StartTs) @@ -379,17 +462,6 @@ func (o *oracle) GetTxn(startTs uint64) *Txn { return o.pendingTxns[startTs] } -func (txn *Txn) matchesDelta(ok func(key []byte) bool) bool { - txn.Lock() - defer txn.Unlock() - for key := range txn.cache.deltas { - if ok([]byte(key)) { - return true - } - } - return false -} - // IterateTxns returns a list of start timestamps for currently pending transactions, which match // the provided function. func (o *oracle) IterateTxns(ok func(key []byte) bool) []uint64 { @@ -397,8 +469,13 @@ func (o *oracle) IterateTxns(ok func(key []byte) bool) []uint64 { defer o.RUnlock() var timestamps []uint64 for startTs, txn := range o.pendingTxns { - if txn.matchesDelta(ok) { - timestamps = append(timestamps, startTs) + if err := txn.cache.deltas.IterateBytes(func(key string, value []byte) error { + if ok([]byte(key)) { + timestamps = append(timestamps, startTs) + } + return nil + }); err != nil { + glog.Errorf("IterateTxns: error while iterating deltas for txn %d: %v", startTs, err) } } return timestamps diff --git a/query/upgrade_test.go b/query/upgrade_test.go index cbdfbf2b72e..18674c8dae8 100644 --- a/query/upgrade_test.go +++ b/query/upgrade_test.go @@ -50,7 +50,7 @@ func TestMain(m *testing.M) { WithReplicas(1).WithACL(time.Hour).WithVersion(uc.Before) c, err := dgraphtest.NewLocalCluster(conf) x.Panic(err) - defer func() { c.Cleanup(code != 0) }() + defer func() { c.Cleanup(true) }() x.Panic(c.Start()) hc, err := c.HTTPClient() diff --git a/schema/schema.go b/schema/schema.go index c6a268d4d00..e6e191e7e6e 100644 --- a/schema/schema.go +++ b/schema/schema.go @@ -1,6 +1,17 @@ /* - * SPDX-FileCopyrightText: © Hypermode Inc. - * SPDX-License-Identifier: Apache-2.0 + * Copyright 2016-2025 Hypermode Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package schema @@ -666,8 +677,7 @@ func initialTypesInternal(namespace uint64, all bool) []*pb.TypeUpdate { ValueType: pb.Posting_STRING, }, }, - }, - &pb.TypeUpdate{ + }, &pb.TypeUpdate{ TypeName: "dgraph.graphql.persisted_query", Fields: []*pb.SchemaUpdate{ { @@ -697,24 +707,23 @@ func initialTypesInternal(namespace uint64, all bool) []*pb.TypeUpdate { if all || x.WorkerConfig.AclEnabled { // These type definitions are required for deleteUser and deleteGroup GraphQL API to work // properly. - initialTypes = append(initialTypes, - &pb.TypeUpdate{ - TypeName: "dgraph.type.User", - Fields: []*pb.SchemaUpdate{ - { - Predicate: "dgraph.xid", - ValueType: pb.Posting_STRING, - }, - { - Predicate: "dgraph.password", - ValueType: pb.Posting_PASSWORD, - }, - { - Predicate: "dgraph.user.group", - ValueType: pb.Posting_UID, - }, + initialTypes = append(initialTypes, &pb.TypeUpdate{ + TypeName: "dgraph.type.User", + Fields: []*pb.SchemaUpdate{ + { + Predicate: "dgraph.xid", + ValueType: pb.Posting_STRING, + }, + { + Predicate: "dgraph.password", + ValueType: pb.Posting_PASSWORD, + }, + { + Predicate: "dgraph.user.group", + ValueType: pb.Posting_UID, }, }, + }, &pb.TypeUpdate{ TypeName: "dgraph.type.Group", Fields: []*pb.SchemaUpdate{ @@ -771,36 +780,31 @@ func CompleteInitialSchema(namespace uint64) []*pb.SchemaUpdate { func initialSchemaInternal(namespace uint64, all bool) []*pb.SchemaUpdate { var initialSchema []*pb.SchemaUpdate - initialSchema = append(initialSchema, []*pb.SchemaUpdate{ - { + initialSchema = append(initialSchema, + &pb.SchemaUpdate{ Predicate: "dgraph.type", ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX, Tokenizer: []string{"exact"}, List: true, - }, - { + }, &pb.SchemaUpdate{ Predicate: "dgraph.drop.op", ValueType: pb.Posting_STRING, - }, - { + }, &pb.SchemaUpdate{ Predicate: "dgraph.graphql.schema", ValueType: pb.Posting_STRING, - }, - { + }, &pb.SchemaUpdate{ Predicate: "dgraph.graphql.xid", ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX, Tokenizer: []string{"exact"}, Upsert: true, - }, - { + }, &pb.SchemaUpdate{ Predicate: "dgraph.graphql.p_query", ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX, Tokenizer: []string{"sha256"}, - }, - }...) + }) if namespace == x.RootNamespace { initialSchema = append(initialSchema, []*pb.SchemaUpdate{ diff --git a/types/locked_sharded_map.go b/types/locked_sharded_map.go new file mode 100644 index 00000000000..4208d7b18ba --- /dev/null +++ b/types/locked_sharded_map.go @@ -0,0 +1,185 @@ +// SPDX-FileCopyrightText: © Hypermode Inc. +// SPDX-License-Identifier: Apache-2.0 + +// Package types contains some very common utilities used by Dgraph. These utilities +// are of "miscellaneous" nature, e.g., error checking. +package types + +import ( + "sync" + + "github.com/dgryski/go-farm" +) + +// LockedShardedMap is a thread-safe, sharded map with generic key-value types. +type LockedShardedMap[K comparable, V any] struct { + shards []map[K]V + locks []sync.RWMutex +} + +// NewLockedShardedMap creates a new LockedShardedMap. +func NewLockedShardedMap[K comparable, V any]() *LockedShardedMap[K, V] { + shards := make([]map[K]V, NumShards) + locks := make([]sync.RWMutex, NumShards) + for i := range shards { + shards[i] = make(map[K]V) + } + return &LockedShardedMap[K, V]{shards: shards, locks: locks} +} + +func (s *LockedShardedMap[K, V]) getShardIndex(key K) int { + // Only works for integer-like keys (uint64 etc). For generic types, + // a better hash function is needed. We'll assume uint64 for now. + switch k := any(key).(type) { + case uint64: + return int(k % uint64(NumShards)) + case string: + return int(farm.Fingerprint64([]byte(k)) % uint64(NumShards)) + default: + panic("LockedShardedMap only supports uint64 and string keys for now") + } +} + +func (s *LockedShardedMap[K, V]) Set(key K, value V) { + if s == nil { + return + } + index := s.getShardIndex(key) + s.locks[index].Lock() + defer s.locks[index].Unlock() + s.shards[index][key] = value +} + +func (s *LockedShardedMap[K, V]) Get(key K) (V, bool) { + var zero V + if s == nil { + return zero, false + } + index := s.getShardIndex(key) + s.locks[index].RLock() + defer s.locks[index].RUnlock() + val, ok := s.shards[index][key] + return val, ok +} + +func (s *LockedShardedMap[K, V]) Update(key K, update func(V, bool) V) { + if s == nil { + return + } + index := s.getShardIndex(key) + s.locks[index].Lock() + defer s.locks[index].Unlock() + val, ok := s.shards[index][key] + s.shards[index][key] = update(val, ok) +} + +func (s *LockedShardedMap[K, V]) Merge(other *LockedShardedMap[K, V], ag func(a, b V) V) { + var wg sync.WaitGroup + for i := range s.shards { + wg.Add(1) + go func(i int) { + defer wg.Done() + otherShard := other.shards[i] + for k, v := range otherShard { + s.locks[i].Lock() + if existing, ok := s.shards[i][k]; ok { + s.shards[i][k] = ag(existing, v) + } else { + s.shards[i][k] = v + } + s.locks[i].Unlock() + } + }(i) + } + wg.Wait() +} + +func (s *LockedShardedMap[K, V]) Len() int { + if s == nil { + return 0 + } + var count int + for i := range s.shards { + s.locks[i].RLock() + count += len(s.shards[i]) + s.locks[i].RUnlock() + } + return count +} + +func (s *LockedShardedMap[K, V]) ParallelIterate(f func(K, V) error) error { + if s == nil { + return nil + } + + var ( + wg sync.WaitGroup + errCh = make(chan error, 1) + once sync.Once + ) + + for i := range s.shards { + wg.Add(1) + go func(i int) { + defer wg.Done() + + s.locks[i].RLock() + defer s.locks[i].RUnlock() + + for k, v := range s.shards[i] { + if err := f(k, v); err != nil { + once.Do(func() { + errCh <- err + }) + return + } + } + }(i) + } + + // Wait in a separate goroutine so we can still select on errCh. + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case err := <-errCh: + return err + case <-done: + return nil + } +} + +func (s *LockedShardedMap[K, V]) Iterate(f func(K, V) error) error { + if s == nil { + return nil + } + for i := range s.shards { + s.locks[i].RLock() + for k, v := range s.shards[i] { + if err := f(k, v); err != nil { + s.locks[i].RUnlock() + return err + } + } + s.locks[i].RUnlock() + } + return nil +} + +func (s *LockedShardedMap[K, V]) IsEmpty() bool { + if s == nil { + return true + } + for i := range s.shards { + s.locks[i].RLock() + if len(s.shards[i]) > 0 { + s.locks[i].RUnlock() + return false + } + s.locks[i].RUnlock() + } + return true +} diff --git a/worker/draft.go b/worker/draft.go index 97a7ea9d43e..01fdeafddb6 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -516,6 +516,22 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr m := proposal.Mutations + txn := posting.Oracle().RegisterStartTs(m.StartTs) + if txn.ShouldAbort() { + span.AddEvent("Txn should abort.", trace.WithAttributes( + attribute.Int64("start_ts", int64(m.StartTs)), + )) + return x.ErrConflict + } + // Discard the posting lists from cache to release memory at the end. + defer txn.Update() + + featureFlag := true + if featureFlag { + mp := posting.NewMutationPipeline(txn) + return mp.Process(ctx, m.Edges) + } + // It is possible that the user gives us multiple versions of the same edge, one with no facets // and another with facets. In that case, use stable sort to maintain the ordering given to us // by the user. @@ -528,16 +544,6 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr return ei.GetEntity() < ej.GetEntity() }) - txn := posting.Oracle().RegisterStartTs(m.StartTs) - if txn.ShouldAbort() { - span.AddEvent("Txn should abort.", trace.WithAttributes( - attribute.Int64("start_ts", int64(m.StartTs)), - )) - return x.ErrConflict - } - // Discard the posting lists from cache to release memory at the end. - defer txn.Update() - process := func(edges []*pb.DirectedEdge) error { var retries int for _, edge := range edges { diff --git a/worker/draft_test.go b/worker/draft_test.go index e0c3762c483..a1a32b95bf9 100644 --- a/worker/draft_test.go +++ b/worker/draft_test.go @@ -38,6 +38,107 @@ func getEntryForCommit(index, startTs, commitTs uint64) raftpb.Entry { return raftpb.Entry{Index: index, Term: 1, Type: raftpb.EntryNormal, Data: data} } +// func BenchmarkProcessListIndex(b *testing.B) { +// dir, err := os.MkdirTemp("", "storetest_") +// x.Check(err) +// defer os.RemoveAll(dir) + +// opt := badger.DefaultOptions(dir) +// ps, err := badger.OpenManaged(opt) +// x.Check(err) +// pstore = ps +// // Not using posting list cache +// posting.Init(ps, 0, false) +// Init(ps) +// err = schema.ParseBytes([]byte("testAttr: [string] @index(exact) ."), 1) +// require.NoError(b, err) + +// ctx := context.Background() +// pipeline := &PredicatePipeline{ +// attr: "0-testAttr", +// edges: make(chan *pb.DirectedEdge, 1000), +// wg: &sync.WaitGroup{}, +// errCh: make(chan error, 1), +// } + +// txn := posting.Oracle().RegisterStartTs(5) +// mp := &MutationPipeline{txn: txn} + +// // Generate 1000 edges +// populatePipeline := func() { +// pipeline = &PredicatePipeline{ +// attr: "0-testAttr", +// edges: make(chan *pb.DirectedEdge, 1000), +// wg: &sync.WaitGroup{}, +// errCh: make(chan error, 1), +// } + +// txn = posting.Oracle().RegisterStartTs(5) +// mp = &MutationPipeline{txn: txn} + +// for i := 0; i < 1000; i++ { +// edge := &pb.DirectedEdge{ +// Entity: uint64(i + 1), +// Attr: "0-testAttr", +// Value: []byte(fmt.Sprintf("value%d", rand.Intn(1000))), +// ValueType: pb.Posting_STRING, +// Op: pb.DirectedEdge_SET, +// } +// pipeline.edges <- edge +// } +// } + +// b.ResetTimer() + +// b.Run("Baseline", func(b *testing.B) { +// for i := 0; i < b.N; i++ { +// populatePipeline() +// } +// }) + +// b.Run("DefaultPipeline", func(b *testing.B) { +// for i := 0; i < b.N; i++ { +// populatePipeline() +// var wg sync.WaitGroup +// wg.Add(1) +// go func() { +// mp.DefaultPipeline(ctx, pipeline) +// wg.Done() +// }() +// close(pipeline.edges) +// wg.Wait() +// } +// }) + +// b.Run("ProcessListWithoutIndex", func(b *testing.B) { +// for i := 0; i < b.N; i++ { +// populatePipeline() +// var wg sync.WaitGroup +// wg.Add(1) +// go func() { +// mp.ProcessListWithoutIndex(ctx, pipeline) +// wg.Done() +// }() +// close(pipeline.edges) +// wg.Wait() +// } +// }) + +// b.Run("ProcessListIndex", func(b *testing.B) { +// for i := 0; i < b.N; i++ { +// populatePipeline() +// var wg sync.WaitGroup +// wg.Add(1) +// go func() { +// mp.ProcessListIndex(ctx, pipeline) +// wg.Done() +// }() +// close(pipeline.edges) +// wg.Wait() +// } +// }) +// } + func TestCalculateSnapshot(t *testing.T) { dir := t.TempDir() ds := raftwal.Init(dir) diff --git a/worker/mutation.go b/worker/mutation.go index 47ef32b5c9e..5a63f80daf1 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -55,6 +55,16 @@ func isDeletePredicateEdge(edge *pb.DirectedEdge) bool { return edge.Entity == 0 && isStarAll(edge.Value) } +func newRunMutations(ctx context.Context, edges []*pb.DirectedEdge, txn *posting.Txn) error { + mp := posting.NewMutationPipeline(txn) + return mp.Process(ctx, edges) +} + +func newRunMutation(ctx context.Context, edge *pb.DirectedEdge, txn *posting.Txn) error { + mp := posting.NewMutationPipeline(txn) + return mp.Process(ctx, []*pb.DirectedEdge{edge}) +} + // runMutation goes through all the edges and applies them. func runMutation(ctx context.Context, edge *pb.DirectedEdge, txn *posting.Txn) error { ctx = schema.GetWriteContext(ctx) diff --git a/worker/mutation_unit_test.go b/worker/mutation_unit_test.go index d734290857b..73b89163cf9 100644 --- a/worker/mutation_unit_test.go +++ b/worker/mutation_unit_test.go @@ -47,8 +47,8 @@ func TestReverseEdge(t *testing.T) { Op: pb.DirectedEdge_DEL, } - x.Check(runMutation(ctx, edge, txn)) - x.Check(runMutation(ctx, edge, txn)) + x.Check(newRunMutation(ctx, edge, txn)) + x.Check(newRunMutation(ctx, edge, txn)) pl, err := txn.Get(x.DataKey(attr, 1)) require.NoError(t, err) @@ -90,7 +90,7 @@ func TestReverseEdgeSetDel(t *testing.T) { Entity: 1, Op: pb.DirectedEdge_SET, } - + edgeSet2 := &pb.DirectedEdge{ ValueId: 2, Attr: attr, @@ -98,7 +98,6 @@ func TestReverseEdgeSetDel(t *testing.T) { Op: pb.DirectedEdge_SET, } - edgeSet3 := &pb.DirectedEdge{ ValueId: 2, Attr: attr, @@ -106,11 +105,10 @@ func TestReverseEdgeSetDel(t *testing.T) { Op: pb.DirectedEdge_SET, } - - x.Check(runMutation(ctx, edgeSet1, txn)) - x.Check(runMutation(ctx, edgeSet2, txn)) - x.Check(runMutation(ctx, edgeSet3, txn)) - x.Check(runMutation(ctx, edgeDel, txn)) + x.Check(newRunMutation(ctx, edgeSet1, txn)) + x.Check(newRunMutation(ctx, edgeSet2, txn)) + x.Check(newRunMutation(ctx, edgeSet3, txn)) + x.Check(newRunMutation(ctx, edgeDel, txn)) pl, err := txn.Get(x.ReverseKey(attr, 2)) require.NoError(t, err) diff --git a/worker/sort_test.go b/worker/sort_test.go index 2a1e50dd612..6d6ca99ba24 100644 --- a/worker/sort_test.go +++ b/worker/sort_test.go @@ -8,8 +8,10 @@ package worker import ( "context" "fmt" + "math" "math/rand" "os" + "sync" "testing" "github.com/dgraph-io/badger/v4" @@ -17,7 +19,10 @@ import ( "github.com/hypermodeinc/dgraph/v25/posting" "github.com/hypermodeinc/dgraph/v25/protos/pb" "github.com/hypermodeinc/dgraph/v25/schema" + "github.com/hypermodeinc/dgraph/v25/tok" + "github.com/hypermodeinc/dgraph/v25/types" "github.com/hypermodeinc/dgraph/v25/x" + "github.com/pkg/errors" "github.com/stretchr/testify/require" ) @@ -90,6 +95,319 @@ func TestEmptyTypeSchema(t *testing.T) { x.ParseNamespaceAttr(types[0].TypeName) } +func TestDatetime(t *testing.T) { + // Setup temporary directory for Badger DB + dir, err := os.MkdirTemp("", "storetest_") + require.NoError(t, err) + defer os.RemoveAll(dir) + + opt := badger.DefaultOptions(dir) + ps, err := badger.OpenManaged(opt) + require.NoError(t, err) + posting.Init(ps, 0, false) + Init(ps) + + // Set schema + schemaTxt := ` + t: datetime @index(year) . + ` + err = schema.ParseBytes([]byte(schemaTxt), 1) + require.NoError(t, err) + + ctx := context.Background() + newRunMutation := func(startTs, commitTs uint64, edges []*pb.DirectedEdge) { + txn := posting.Oracle().RegisterStartTs(startTs) + for _, edge := range edges { + require.NoError(t, newRunMutation(ctx, edge, txn)) + } + txn.Update() + writer := posting.NewTxnWriter(ps) + require.NoError(t, txn.CommitToDisk(writer, commitTs)) + require.NoError(t, writer.Flush()) + txn.UpdateCachedKeys(commitTs) + } + + newRunMutation(1, 3, []*pb.DirectedEdge{ + { + Entity: 1, + Attr: x.AttrInRootNamespace("t"), + Value: []byte("2020-01-01T00:00:00Z"), + ValueType: pb.Posting_DEFAULT, + Op: pb.DirectedEdge_SET, + }, + }) + +} + +type indexMutationInfo struct { + tokenizers []tok.Tokenizer + factorySpecs []*tok.FactoryCreateSpec + edge *pb.DirectedEdge // Represents the original uid -> value edge. + val types.Val + op pb.DirectedEdge_Op +} + +func indexTokens(ctx context.Context, info *indexMutationInfo) ([]string, error) { + attr := info.edge.Attr + lang := info.edge.GetLang() + + schemaType, err := schema.State().TypeOf(attr) + if err != nil || !schemaType.IsScalar() { + return nil, errors.Errorf("Cannot index attribute %s of type object.", attr) + } + + if !schema.State().IsIndexed(ctx, attr) { + return nil, errors.Errorf("Attribute %s is not indexed.", attr) + } + sv, err := types.Convert(info.val, schemaType) + if err != nil { + return nil, errors.Wrap(err, "Cannot convert value to scalar type") + } + + var tokens []string + for _, it := range info.tokenizers { + toks, err := tok.BuildTokens(sv.Value, tok.GetTokenizerForLang(it, lang)) + if err != nil { + return tokens, errors.Wrapf(err, "Cannot build tokens for attribute %s", attr) + } + tokens = append(tokens, toks...) + } + return tokens, nil +} + +func TestStringIndexWithLang(t *testing.T) { + // Setup temporary directory for Badger DB + dir, err := os.MkdirTemp("", "storetest_") + require.NoError(t, err) + defer os.RemoveAll(dir) + + opt := badger.DefaultOptions(dir) + ps, err := badger.OpenManaged(opt) + require.NoError(t, err) + posting.Init(ps, 0, false) + Init(ps) + + // Set schema + schemaTxt := ` + name: string @index(fulltext, trigram, term, exact) @lang . + ` + + err = schema.ParseBytes([]byte(schemaTxt), 1) + require.NoError(t, err) + + ctx := context.Background() + newRunMutation := func(startTs, commitTs uint64, edges []*pb.DirectedEdge) { + txn := posting.Oracle().RegisterStartTs(startTs) + require.NoError(t, newRunMutations(ctx, edges, txn)) + txn.Update() + writer := posting.NewTxnWriter(ps) + require.NoError(t, txn.CommitToDisk(writer, commitTs)) + require.NoError(t, writer.Flush()) + txn.UpdateCachedKeys(commitTs) + } + + attr := x.AttrInRootNamespace("name") + + // Prepare 400 mutations across 4 threads, 100 per thread (kept modest for stability). + const ( + threads = 10 + perThread = 20000 + total = threads * perThread + baseStartTs = uint64(10) + ) + + // uid -> value map + values := make(map[uint64]string, total) + for i := 0; i < total; i++ { + uid := uint64(i + 1) + // Simple deterministic values with shared tokens and unique numbers. + values[uid] = fmt.Sprintf("title %d", i+1) + } + + // Build expected token -> set of uids + tokenizers := schema.State().Tokenizer(ctx, attr) + expected := make(map[string]map[uint64]struct{}, total) + for uid, val := range values { + info := &indexMutationInfo{ + tokenizers: tokenizers, + op: pb.DirectedEdge_SET, + val: types.Val{Tid: types.StringID, Value: []byte(val)}, + edge: &pb.DirectedEdge{ + Attr: attr, + Value: []byte(val), + Lang: "en", + Op: pb.DirectedEdge_SET, + }, + } + toks, err := indexTokens(ctx, info) + require.NoError(t, err) + for _, tk := range toks { + if expected[tk] == nil { + expected[tk] = make(map[uint64]struct{}) + } + expected[tk][uid] = struct{}{} + } + } + + // Run 4 threads; each thread writes 100 edges with distinct ts + var wg sync.WaitGroup + wg.Add(threads) + for th := 0; th < threads; th++ { + th := th + go func() { + defer wg.Done() + start := th*perThread + 1 + end := start + perThread + edges := make([]*pb.DirectedEdge, 0, perThread) + for i := start; i < end; i++ { + uid := uint64(i) + edges = append(edges, &pb.DirectedEdge{ + Entity: uid, + Attr: attr, + Value: []byte(values[uid]), + ValueType: pb.Posting_DEFAULT, + Lang: "en", + Op: pb.DirectedEdge_SET, + }) + } + sTs := baseStartTs + uint64(th*10) + cTs := sTs + 2 + newRunMutation(sTs, cTs, edges) + }() + } + wg.Wait() + + // Verify all tokens have the expected UIDs. + readTs := baseStartTs + uint64(threads*10) + 10 + for tk, uidset := range expected { + key := x.IndexKey(attr, tk) + txn := posting.Oracle().RegisterStartTs(readTs) + pl, err := txn.Get(key) + require.NoError(t, err) + lst, err := pl.Uids(posting.ListOptions{ReadTs: readTs}) + require.NoError(t, err) + got := make(map[uint64]struct{}, len(lst.Uids)) + for _, u := range lst.Uids { + got[u] = struct{}{} + } + // Compare sets + require.Equal(t, len(uidset), len(got), "mismatch uid count for token %q", tk) + for u := range uidset { + if _, ok := got[u]; !ok { + t.Fatalf("missing uid %d for token %q", u, tk) + } + } + } +} + +func TestCount(t *testing.T) { + t.Skip() + // Setup temporary directory for Badger DB + dir, err := os.MkdirTemp("", "storetest_") + require.NoError(t, err) + defer os.RemoveAll(dir) + + opt := badger.DefaultOptions(dir) + ps, err := badger.OpenManaged(opt) + require.NoError(t, err) + posting.Init(ps, 0, false) + Init(ps) + + // Set schema + schemaTxt := ` + friends: [uid] @count . + ` + + err = schema.ParseBytes([]byte(schemaTxt), 1) + require.NoError(t, err) + ctx := context.Background() + newRunMutation := func(startTs, commitTs uint64, edges []*pb.DirectedEdge) { + txn := posting.Oracle().RegisterStartTs(startTs) + require.NoError(t, newRunMutations(ctx, edges, txn)) + txn.Update() + writer := posting.NewTxnWriter(ps) + require.NoError(t, txn.CommitToDisk(writer, commitTs)) + require.NoError(t, writer.Flush()) + txn.UpdateCachedKeys(commitTs) + } + + pred := x.AttrInRootNamespace("friends") + + // Prepare mutations such that each subject gets multiple uid edges, and + // each edge is added from a different thread. We also send multiple + // batches per thread. + const ( + subjects = 10 // total number of subjects/entities + edgesPer = 5 // number of edges per subject + threads = 2 // one thread per edge ordinal, touching all subjects + baseStartTs = uint64(10) + total = subjects * edgesPer + ) + + // 1) Pre-generate all mutations into one big slice + edgesAll := make([]*pb.DirectedEdge, 0, total) + for subj := 1; subj <= subjects; subj++ { + uid := uint64(subj) + for e := 0; e < edgesPer; e++ { + // Unique object per (subject, edge-ordinal) pair to avoid duplicates. + // Ensures exactly 'edgesPer' distinct UIDs per subject. + obj := uint64(1_000_000 + subj*100 + e) + edgesAll = append(edgesAll, &pb.DirectedEdge{ + Entity: uid, + Attr: pred, + ValueId: obj, + ValueType: pb.Posting_UID, + Op: pb.DirectedEdge_SET, + }) + } + } + + // Shuffle the edges to simulate randomness (determinism depends on rand.Seed in package scope) + for i := range edgesAll { + j := rand.Intn(i + 1) + edgesAll[i], edgesAll[j] = edgesAll[j], edgesAll[i] + } + + // 2) Dispatch pre-generated mutations into threads, in multiple batches per thread + var wg sync.WaitGroup + wg.Add(threads) + for th := 0; th < threads; th++ { + th := th + go func() { + defer wg.Done() + // Split each thread's disjoint chunk into multiple batches/transactions + const batches = 5 + chunk := total / threads + chunkStart := th * chunk + chunkEnd := chunkStart + chunk + perBatch := chunk / batches + for b := 0; b < batches; b++ { + batchStart := chunkStart + b*perBatch + batchEnd := batchStart + perBatch + if b == batches-1 { + batchEnd = chunkEnd + } + batch := edgesAll[batchStart:batchEnd] + // Space out start/commit timestamps per thread and per batch to avoid collisions + sTs := baseStartTs + uint64(th*100) + uint64(b*2) + cTs := sTs + 1 + newRunMutation(sTs, cTs, batch) + } + }() + } + wg.Wait() + + // Verify the @count index for the exact number of edges per subject. + countKey := x.CountKey(pred, edgesPer, false) + txn := posting.Oracle().RegisterStartTs(math.MaxUint64) + pl, err := txn.Get(countKey) + require.NoError(t, err) + uids, err := pl.Uids(posting.ListOptions{ReadTs: math.MaxUint64}) + require.NoError(t, err) + fmt.Println(uids.Uids) + require.Equal(t, subjects, len(uids.Uids)) +} + func TestDeleteSetWithVarEdgeCorruptsData(t *testing.T) { // Setup temporary directory for Badger DB dir, err := os.MkdirTemp("", "storetest_") @@ -119,10 +437,10 @@ func TestDeleteSetWithVarEdgeCorruptsData(t *testing.T) { uidRoom := uint64(1) uidJohn := uint64(2) - runMutation := func(startTs, commitTs uint64, edges []*pb.DirectedEdge) { + newRunMutation := func(startTs, commitTs uint64, edges []*pb.DirectedEdge) { txn := posting.Oracle().RegisterStartTs(startTs) for _, edge := range edges { - require.NoError(t, runMutation(ctx, edge, txn)) + require.NoError(t, newRunMutation(ctx, edge, txn)) } txn.Update() writer := posting.NewTxnWriter(ps) @@ -132,7 +450,7 @@ func TestDeleteSetWithVarEdgeCorruptsData(t *testing.T) { } // Initial mutation: Set John → Leopard - runMutation(1, 3, []*pb.DirectedEdge{ + newRunMutation(1, 3, []*pb.DirectedEdge{ { Entity: uidJohn, Attr: attrPerson, @@ -162,7 +480,7 @@ func TestDeleteSetWithVarEdgeCorruptsData(t *testing.T) { // Second mutation: Remove John from Leopard, assign Amanda uidAmanda := uint64(3) - runMutation(6, 8, []*pb.DirectedEdge{ + newRunMutation(6, 8, []*pb.DirectedEdge{ { Entity: uidJohn, Attr: attrOffice, @@ -225,7 +543,7 @@ func TestGetScalarList(t *testing.T) { runM := func(startTs, commitTs uint64, edges []*pb.DirectedEdge) { txn := posting.Oracle().RegisterStartTs(startTs) for _, edge := range edges { - x.Check(runMutation(context.Background(), edge, txn)) + x.Check(newRunMutation(context.Background(), edge, txn)) } txn.Update() writer := posting.NewTxnWriter(pstore) @@ -283,7 +601,7 @@ func TestMultipleTxnListCount(t *testing.T) { runM := func(startTs, commitTs uint64, edges []*pb.DirectedEdge) { txn := posting.Oracle().RegisterStartTs(startTs) for _, edge := range edges { - x.Check(runMutation(ctx, edge, txn)) + x.Check(newRunMutation(ctx, edge, txn)) } txn.Update() writer := posting.NewTxnWriter(pstore) @@ -342,7 +660,7 @@ func TestScalarPredicateRevCount(t *testing.T) { runM := func(startTs, commitTs uint64, edges []*pb.DirectedEdge) { txn := posting.Oracle().RegisterStartTs(startTs) for _, edge := range edges { - x.Check(runMutation(ctx, edge, txn)) + x.Check(newRunMutation(ctx, edge, txn)) } txn.Update() writer := posting.NewTxnWriter(pstore) @@ -423,7 +741,7 @@ func TestScalarPredicateIntCount(t *testing.T) { runM := func(startTs, commitTs uint64, edge *pb.DirectedEdge) { txn := posting.Oracle().RegisterStartTs(startTs) - x.Check(runMutation(ctx, edge, txn)) + x.Check(newRunMutation(ctx, edge, txn)) txn.Update() writer := posting.NewTxnWriter(pstore) require.NoError(t, txn.CommitToDisk(writer, commitTs)) @@ -477,7 +795,7 @@ func TestScalarPredicateCount(t *testing.T) { runM := func(startTs, commitTs uint64, edge *pb.DirectedEdge) { txn := posting.Oracle().RegisterStartTs(startTs) - x.Check(runMutation(ctx, edge, txn)) + x.Check(newRunMutation(ctx, edge, txn)) txn.Update() writer := posting.NewTxnWriter(pstore) require.NoError(t, txn.CommitToDisk(writer, commitTs)) @@ -531,7 +849,7 @@ func TestSingleUidReplacement(t *testing.T) { attr := x.AttrInRootNamespace("singleUidReplaceTest") // Txn 1. Set 1 -> 2 - x.Check(runMutation(ctx, &pb.DirectedEdge{ + x.Check(newRunMutation(ctx, &pb.DirectedEdge{ ValueId: 2, Attr: attr, Entity: 1, @@ -547,7 +865,7 @@ func TestSingleUidReplacement(t *testing.T) { // Txn 2. Set 1 -> 3 txn = posting.Oracle().RegisterStartTs(9) - x.Check(runMutation(ctx, &pb.DirectedEdge{ + x.Check(newRunMutation(ctx, &pb.DirectedEdge{ ValueId: 3, Attr: attr, Entity: 1, @@ -591,14 +909,14 @@ func TestSingleString(t *testing.T) { attr := x.AttrInRootNamespace("singleUidTest") // Txn 1. Set 1 -> david 2 -> blush - x.Check(runMutation(ctx, &pb.DirectedEdge{ + x.Check(newRunMutation(ctx, &pb.DirectedEdge{ Value: []byte("david"), Attr: attr, Entity: 1, Op: pb.DirectedEdge_SET, }, txn)) - x.Check(runMutation(ctx, &pb.DirectedEdge{ + x.Check(newRunMutation(ctx, &pb.DirectedEdge{ Value: []byte("blush"), Attr: attr, Entity: 2, @@ -614,14 +932,14 @@ func TestSingleString(t *testing.T) { // Txn 2. Set 2 -> david 1 -> blush txn = posting.Oracle().RegisterStartTs(9) - x.Check(runMutation(ctx, &pb.DirectedEdge{ + x.Check(newRunMutation(ctx, &pb.DirectedEdge{ Value: []byte("david"), Attr: attr, Entity: 2, Op: pb.DirectedEdge_SET, }, txn)) - x.Check(runMutation(ctx, &pb.DirectedEdge{ + x.Check(newRunMutation(ctx, &pb.DirectedEdge{ Value: []byte("blush"), Attr: attr, Entity: 1, @@ -693,7 +1011,7 @@ func TestLangExact(t *testing.T) { Lang: "en", } - x.Check(runMutation(ctx, edge, txn)) + x.Check(newRunMutation(ctx, edge, txn)) edge = &pb.DirectedEdge{ Value: []byte("hindi"), @@ -703,7 +1021,7 @@ func TestLangExact(t *testing.T) { Lang: "hi", } - x.Check(runMutation(ctx, edge, txn)) + x.Check(newRunMutation(ctx, edge, txn)) txn.Update() writer := posting.NewTxnWriter(pstore) @@ -745,7 +1063,6 @@ func BenchmarkAddMutationWithIndex(b *testing.B) { posting.Init(ps, 0, false) Init(ps) err = schema.ParseBytes([]byte("benchmarkadd: string @index(term) ."), 1) - fmt.Println(err) if err != nil { panic(err) } @@ -767,7 +1084,7 @@ func BenchmarkAddMutationWithIndex(b *testing.B) { Op: pb.DirectedEdge_SET, } - x.Check(runMutation(ctx, edge, txn)) + x.Check(newRunMutation(ctx, edge, txn)) } } diff --git a/worker/task.go b/worker/task.go index ba7e859572f..aaaeae15f13 100644 --- a/worker/task.go +++ b/worker/task.go @@ -462,6 +462,8 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er Value: p.Value, } } + pk, _ := x.Parse(key) + fmt.Println("READING SINGLE", pk, vals, pl) } else { pl, err := qs.cache.Get(key) if err != nil { @@ -481,6 +483,8 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er } vals, fcs, err = retrieveValuesAndFacets(args, pl, facetsTree, listType) + pk, _ := x.Parse(key) + fmt.Println("READING", pk, vals, fcs, pl.Print()) switch { case err == posting.ErrNoValue || (err == nil && len(vals) == 0):