From b6b993d665dcf77701b0adf1b1155fb29243be6f Mon Sep 17 00:00:00 2001 From: yanghengfei Date: Thu, 28 Apr 2022 19:08:06 +0800 Subject: [PATCH 01/23] feat: add trunk compress for storedFields --- load.go | 28 ++++++++++ merge.go | 144 ++++++++++++++++++++++++++++++++++-------------- new.go | 41 +++++++++++--- read.go | 34 +++++++++--- segment.go | 13 +++-- zinc_chunker.go | 75 +++++++++++++++++++++++++ 6 files changed, 272 insertions(+), 63 deletions(-) create mode 100644 zinc_chunker.go diff --git a/load.go b/load.go index 97b041f..c20a8cb 100644 --- a/load.go +++ b/load.go @@ -55,6 +55,11 @@ func load(data *segment.Data) (*Segment, error) { return nil, err } + err = rv.loadStoredFieldTrunk() + if err != nil { + return nil, err + } + err = rv.loadDvReaders() if err != nil { return nil, err @@ -128,3 +133,26 @@ func (s *Segment) loadFields() error { } return nil } + +func (s *Segment) loadStoredFieldTrunk() error { + // read trunk num + trunkOffsetPos := int(s.footer.storedIndexOffset - 4) // uint32 + trunkData, err := s.data.Read(trunkOffsetPos, trunkOffsetPos+4) + if err != nil { + return err + } + trunkNum := binary.BigEndian.Uint32(trunkData) + // read trunk offsets + trunkOffsetPos -= 8 * int(trunkNum) + trunkData, err = s.data.Read(trunkOffsetPos, trunkOffsetPos+int(8*trunkNum)) + if err != nil { + return err + } + s.storedFieldTrunkOffset = make(map[int]uint64, trunkNum) + for i := 0; i < int(trunkNum); i++ { + offset := binary.BigEndian.Uint64(trunkData[i*8 : i*8+8]) + s.storedFieldTrunkOffset[i] = offset + } + + return nil +} diff --git a/merge.go b/merge.go index a65264c..261af59 100644 --- a/merge.go +++ b/merge.go @@ -20,6 +20,7 @@ import ( "encoding/binary" "fmt" "io" + "log" "math" "sort" @@ -639,6 +640,9 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, vdc := visitDocumentCtxPool.Get().(*visitDocumentCtx) defer visitDocumentCtxPool.Put(vdc) + // zinc trunk + trunkWriter := NewZincTrunker(w) + // for each segment for segI, seg := range segments { // check for the closure in meantime @@ -654,7 +658,7 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, // segments and there are no deletions, via byte-copying // of stored docs bytes directly to the writer if fieldsSame && (dropsI == nil || dropsI.GetCardinality() == 0) { - err := seg.copyStoredDocs(newDocNum, docNumOffsets, w) + err := seg.copyStoredDocs(newDocNum, docNumOffsets, trunkWriter) if err != nil { return 0, nil, err } @@ -670,7 +674,7 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, var err2 error newDocNum, err2 = mergeStoredAndRemapSegment(seg, dropsI, segNewDocNums, newDocNum, &metaBuf, data, - fieldsInv, vals, vdc, fieldsMap, metaEncode, compressed, docNumOffsets, w) + fieldsInv, vals, vdc, fieldsMap, metaEncode, compressed, docNumOffsets, trunkWriter) if err2 != nil { return 0, nil, err2 } @@ -678,6 +682,21 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, newDocNums = append(newDocNums, segNewDocNums) } + // zinc trunk + trunkWriter.Flush() + // write chunk offsets + for _, offset := range trunkWriter.Offsets() { + err = binary.Write(w, binary.BigEndian, offset) + if err != nil { + return 0, nil, err + } + } + // write chunk num + err = binary.Write(w, binary.BigEndian, uint32(trunkWriter.Len())) + if err != nil { + return 0, nil, err + } + // return value is the start of the stored index storedIndexOffset = uint64(w.Count()) @@ -695,7 +714,7 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, 
func mergeStoredAndRemapSegment(seg *Segment, dropsI *roaring.Bitmap, segNewDocNums []uint64, newDocNum uint64, metaBuf *bytes.Buffer, data []byte, fieldsInv []string, vals [][][]byte, vdc *visitDocumentCtx, fieldsMap map[string]uint16, metaEncode func(val uint64) (int, error), compressed []byte, docNumOffsets []uint64, - w *countHashWriter) (uint64, error) { + trunkWriter *zincTrunker) (uint64, error) { // for each doc num for docNum := uint64(0); docNum < seg.footer.numDocs; docNum++ { // TODO: roaring's API limits docNums to 32-bits? @@ -737,29 +756,36 @@ func mergeStoredAndRemapSegment(seg *Segment, dropsI *roaring.Bitmap, segNewDocN metaBytes := metaBuf.Bytes() - compressed = snappy.Encode(compressed[:cap(compressed)], data) + // compressed = snappy.Encode(compressed[:cap(compressed)], data) // record where we're about to start writing - docNumOffsets[newDocNum] = uint64(w.Count()) + docNumOffsets[newDocNum] = uint64(trunkWriter.BufferSize()) // write out the meta len and compressed data len - err = writeUvarints(w, + err = writeUvarints(trunkWriter, uint64(len(metaBytes)), uint64(len(compressed))) if err != nil { return 0, err } // now write the meta - _, err = w.Write(metaBytes) + // _, err = w.Write(metaBytes) + _, err = trunkWriter.Write(metaBytes) if err != nil { return 0, err } // now write the compressed data - _, err = w.Write(compressed) + // _, err = w.Write(compressed) + _, err = trunkWriter.Write(data) if err != nil { return 0, err } + // trunk line + if err := trunkWriter.NewLine(); err != nil { + return 0, err + } + newDocNum++ } return newDocNum, nil @@ -768,47 +794,81 @@ func mergeStoredAndRemapSegment(seg *Segment, dropsI *roaring.Bitmap, segNewDocN // copyStoredDocs writes out a segment's stored doc info, optimized by // using a single Write() call for the entire set of bytes. The // newDocNumOffsets is filled with the new offsets for each doc. 
-func (s *Segment) copyStoredDocs(newDocNum uint64, newDocNumOffsets []uint64, - w *countHashWriter) error { +func (s *Segment) copyStoredDocs(newDocNum uint64, newDocNumOffsets []uint64, myTrunk *zincTrunker) error { if s.footer.numDocs <= 0 { return nil } - indexOffset0, storedOffset0, err := s.getDocStoredOffsetsOnly(0) // the segment's first doc - if err != nil { - return err - } - - indexOffsetN, storedOffsetN, readN, metaLenN, dataLenN, err := - s.getDocStoredOffsets(s.footer.numDocs - 1) // the segment's last doc - if err != nil { - return err - } - - storedOffset0New := uint64(w.Count()) - - storedBytesData, err := s.data.Read(int(storedOffset0), int(storedOffsetN+readN+metaLenN+dataLenN)) - if err != nil { - return err - } - storedBytes := storedBytesData - _, err = w.Write(storedBytes) - if err != nil { - return err - } - - // remap the storedOffset's for the docs into new offsets relative - // to storedOffset0New, filling the given docNumOffsetsOut array - for indexOffset := indexOffset0; indexOffset <= indexOffsetN; indexOffset += fileAddrWidth { - storedOffsetData, err := s.data.Read(int(indexOffset), int(indexOffset+fileAddrWidth)) + // visit documents and rewrite to trunk + uncompressed := make([]byte, 0) + for i := 0; i < len(s.storedFieldTrunkOffset)-1; i++ { + trunkOffstart := s.storedFieldTrunkOffset[i] + trunkOffend := s.storedFieldTrunkOffset[i+1] + if trunkOffstart == trunkOffend { + continue + } + compressed, err := s.data.Read(int(trunkOffstart), int(trunkOffend)) if err != nil { return err } - storedOffset := binary.BigEndian.Uint64(storedOffsetData) - storedOffsetNew := storedOffset - storedOffset0 + storedOffset0New - newDocNumOffsets[newDocNum] = storedOffsetNew // PANIC - newDocNum++ - } + uncompressed, err = snappy.Decode(uncompressed[:cap(uncompressed)], compressed) + if err != nil { + log.Panic(err) + return err + } + storedOffset := 0 + n := 0 + for storedOffset < len(uncompressed) { + n = 0 + metaLenData := uncompressed[storedOffset : storedOffset+int(binary.MaxVarintLen64)] + metaLen, read := binary.Uvarint(metaLenData) + n += read + dataLenData := uncompressed[storedOffset+n : storedOffset+n+int(binary.MaxVarintLen64)] + dataLen, read := binary.Uvarint(dataLenData) + n += read + newDocNumOffsets[newDocNum] = uint64(myTrunk.BufferSize()) + myTrunk.Write(uncompressed[storedOffset : storedOffset+n+int(metaLen)+int(dataLen)]) + myTrunk.NewLine() + storedOffset += n + int(metaLen+dataLen) + newDocNum++ + } + } + + // indexOffset0, storedOffset0, err := s.getDocStoredOffsetsOnly(0) // the segment's first doc + // if err != nil { + // return err + // } + + // indexOffsetN, storedOffsetN, readN, metaLenN, dataLenN, err := + // s.getDocStoredOffsets(s.footer.numDocs - 1) // the segment's last doc + // if err != nil { + // return err + // } + + // storedOffset0New := uint64(w.Count()) + + // storedBytesData, err := s.data.Read(int(storedOffset0), int(storedOffsetN+readN+metaLenN+dataLenN)) + // if err != nil { + // return err + // } + // storedBytes := storedBytesData + // _, err = w.Write(storedBytes) + // if err != nil { + // return err + // } + + // // remap the storedOffset's for the docs into new offsets relative + // // to storedOffset0New, filling the given docNumOffsetsOut array + // for indexOffset := indexOffset0; indexOffset <= indexOffsetN; indexOffset += fileAddrWidth { + // storedOffsetData, err := s.data.Read(int(indexOffset), int(indexOffset+fileAddrWidth)) + // if err != nil { + // return err + // } + // storedOffset := 
binary.BigEndian.Uint64(storedOffsetData) + // storedOffsetNew := storedOffset - storedOffset0 + storedOffset0New + // newDocNumOffsets[newDocNum] = storedOffsetNew // PANIC + // newDocNum++ + // } return nil } diff --git a/new.go b/new.go index 5af2b24..e19bf30 100644 --- a/new.go +++ b/new.go @@ -24,7 +24,6 @@ import ( "github.com/RoaringBitmap/roaring" "github.com/blevesearch/vellum" segment "github.com/blugelabs/bluge_segment_api" - "github.com/golang/snappy" ) var newSegmentBufferNumResultsBump int = 100 @@ -569,6 +568,9 @@ func (s *interim) writeStoredFields() ( // keyed by fieldID, for the current doc in the loop docStoredFields := map[uint16]interimStoredField{} + // zinc trunk + trunkWriter := NewZincTrunker(s.w) + for docNum, result := range s.results { for fieldID := range docStoredFields { // reset for next doc delete(docStoredFields, fieldID) @@ -608,26 +610,49 @@ func (s *interim) writeStoredFields() ( metaBytes := s.metaBuf.Bytes() - compressed = snappy.Encode(compressed[:cap(compressed)], data) - - docStoredOffsets[docNum] = uint64(s.w.Count()) + // compressed = snappy.Encode(compressed[:cap(compressed)], data) + // docStoredOffsets[docNum] = uint64(s.w.Count()) + docStoredOffsets[docNum] = uint64(trunkWriter.BufferSize()) - err = writeUvarints(s.w, + err = writeUvarints(trunkWriter, uint64(len(metaBytes)), - uint64(len(compressed))) + uint64(len(data))) if err != nil { return 0, err } - _, err = s.w.Write(metaBytes) + // _, err = s.w.Write(metaBytes) + _, err = trunkWriter.Write(metaBytes) if err != nil { return 0, err } - _, err = s.w.Write(compressed) + // _, err = s.w.Write(compressed) + _, err = trunkWriter.Write(data) if err != nil { return 0, err } + + // trunk line + if err := trunkWriter.NewLine(); err != nil { + return 0, err + } + + } + + // zinc trunk + trunkWriter.Flush() + // write chunk offsets + for _, offset := range trunkWriter.Offsets() { + err = binary.Write(s.w, binary.BigEndian, offset) + if err != nil { + return 0, err + } + } + // write chunk num + err = binary.Write(s.w, binary.BigEndian, uint32(trunkWriter.Len())) + if err != nil { + return 0, err } storedIndexOffset = uint64(s.w.Count()) diff --git a/read.go b/read.go index 0887433..c52d258 100644 --- a/read.go +++ b/read.go @@ -14,7 +14,11 @@ package ice -import "encoding/binary" +import ( + "encoding/binary" + + "github.com/golang/snappy" +) func (s *Segment) getDocStoredMetaAndCompressed(docNum uint64) (meta, data []byte, err error) { _, storedOffset, n, metaLen, dataLen, err := s.getDocStoredOffsets(docNum) @@ -22,15 +26,22 @@ func (s *Segment) getDocStoredMetaAndCompressed(docNum uint64) (meta, data []byt return nil, nil, err } - meta, err = s.data.Read(int(storedOffset+n), int(storedOffset+n+metaLen)) + // zinc trunk + trunI := docNum / uint64(zincTrunkerSize) + trunkOffstart := s.storedFieldTrunkOffset[int(trunI)] + trunkOffend := s.storedFieldTrunkOffset[int(trunI)+1] + compressed, err := s.data.Read(int(trunkOffstart), int(trunkOffend)) if err != nil { return nil, nil, err } - data, err = s.data.Read(int(storedOffset+n+metaLen), int(storedOffset+n+metaLen+dataLen)) + s.storedFieldTrunkUncompressed, err = snappy.Decode(s.storedFieldTrunkUncompressed[:cap(s.storedFieldTrunkUncompressed)], compressed) if err != nil { return nil, nil, err } + meta = s.storedFieldTrunkUncompressed[int(storedOffset+n):int(storedOffset+n+metaLen)] + data = s.storedFieldTrunkUncompressed[int(storedOffset+n+metaLen):int(storedOffset+n+metaLen+dataLen)] + return meta, data, nil } @@ -44,18 +55,25 @@ func (s 
*Segment) getDocStoredOffsets(docNum uint64) ( } storedOffset = binary.BigEndian.Uint64(storedOffsetData) - metaLenData, err := s.data.Read(int(storedOffset), int(storedOffset+binary.MaxVarintLen64)) + // zinc trunk + trunI := docNum / uint64(zincTrunkerSize) + trunkOffsetStart := s.storedFieldTrunkOffset[int(trunI)] + trunkOffsetEnd := s.storedFieldTrunkOffset[int(trunI)+1] + compressed, err := s.data.Read(int(trunkOffsetStart), int(trunkOffsetEnd)) + if err != nil { + return 0, 0, 0, 0, 0, err + } + s.storedFieldTrunkUncompressed, err = snappy.Decode(s.storedFieldTrunkUncompressed[:cap(s.storedFieldTrunkUncompressed)], compressed) if err != nil { return 0, 0, 0, 0, 0, err } + + metaLenData := s.storedFieldTrunkUncompressed[int(storedOffset):int(storedOffset+binary.MaxVarintLen64)] var read int metaLen, read = binary.Uvarint(metaLenData) n += uint64(read) - dataLenData, err := s.data.Read(int(storedOffset+n), int(storedOffset+n+binary.MaxVarintLen64)) - if err != nil { - return 0, 0, 0, 0, 0, err - } + dataLenData := s.storedFieldTrunkUncompressed[int(storedOffset+n):int(storedOffset+n+binary.MaxVarintLen64)] dataLen, read = binary.Uvarint(dataLenData) n += uint64(read) diff --git a/segment.go b/segment.go index 5b77791..d874800 100644 --- a/segment.go +++ b/segment.go @@ -25,7 +25,6 @@ import ( "github.com/RoaringBitmap/roaring" "github.com/blevesearch/vellum" segment "github.com/blugelabs/bluge_segment_api" - "github.com/golang/snappy" ) const Version uint32 = 1 @@ -41,6 +40,9 @@ type Segment struct { fieldDocs map[uint16]uint64 // fieldID -> # docs with value in field fieldFreqs map[uint16]uint64 // fieldID -> # total tokens in field + storedFieldTrunkOffset map[int]uint64 // stored field trunk offset + storedFieldTrunkUncompressed []byte // for cache + dictLocs []uint64 fieldDvReaders map[uint16]*docValueReader // naive chunk cache per field fieldDvNames []string // field names cached in fieldDvReaders @@ -199,10 +201,11 @@ func (s *Segment) visitDocument(vdc *visitDocumentCtx, num uint64, vdc.reader.Reset(meta) - uncompressed, err := snappy.Decode(vdc.buf[:cap(vdc.buf)], compressed) - if err != nil { - return err - } + // uncompressed, err := snappy.Decode(vdc.buf[:cap(vdc.buf)], compressed) + // if err != nil { + // return err + // } + uncompressed := compressed var keepGoing = true for keepGoing { diff --git a/zinc_chunker.go b/zinc_chunker.go new file mode 100644 index 0000000..c532da5 --- /dev/null +++ b/zinc_chunker.go @@ -0,0 +1,75 @@ +package ice + +import ( + "bytes" + "io" + + "github.com/golang/snappy" +) + +const zincTrunkerSize = 128 + +type zincTrunker struct { + w io.Writer + buf *bytes.Buffer + n int + bytes int + compressed []byte + offsets []uint64 +} + +func NewZincTrunker(w io.Writer) *zincTrunker { + t := &zincTrunker{ + w: w, + } + t.buf = bytes.NewBuffer(nil) + t.offsets = append(t.offsets, 0) + return t +} +func (t *zincTrunker) Write(data []byte) (int, error) { + return t.buf.Write(data) +} + +func (t *zincTrunker) NewLine() error { + t.n++ + if t.n%zincTrunkerSize != 0 { + return nil + } + return t.Flush() +} + +func (t *zincTrunker) Flush() error { + if t.buf.Len() > 0 { + t.compressed = snappy.Encode(t.compressed[:cap(t.compressed)], t.buf.Bytes()) + n, err := t.w.Write(t.compressed) + if err != nil { + return err + } + t.buf.Reset() + t.bytes += n + } + t.offsets = append(t.offsets, uint64(t.bytes)) + return nil +} + +func (t *zincTrunker) Reset() { + t.compressed = t.compressed[:0] + t.offsets = t.offsets[:0] + t.n = 0 + t.bytes = 0 + t.buf.Reset() +} + 
+func (t *zincTrunker) Offsets() []uint64 { + return t.offsets +} + +// Len returns trunk nums +func (t *zincTrunker) Len() int { + return len(t.offsets) +} + +// BufferSize returns buffer len +func (t *zincTrunker) BufferSize() int { + return t.buf.Len() +} From 7f1692a54fdb299e7532fc2f12833857c026ba93 Mon Sep 17 00:00:00 2001 From: yanghengfei Date: Thu, 28 Apr 2022 21:07:29 +0800 Subject: [PATCH 02/23] feat: use zstd replace snappy --- contentcoder.go | 9 +++--- docvalues.go | 7 ++--- go.mod | 2 +- go.sum | 4 +-- merge.go | 5 ++-- new.go | 2 +- read.go | 6 ++-- segment.go | 2 +- zinc_chunker.go | 8 +++-- zstd.go | 78 +++++++++++++++++++++++++++++++++++++++++++++++++ 10 files changed, 100 insertions(+), 23 deletions(-) create mode 100644 zstd.go diff --git a/contentcoder.go b/contentcoder.go index 2dc7169..8ddeaf9 100644 --- a/contentcoder.go +++ b/contentcoder.go @@ -18,8 +18,6 @@ import ( "bytes" "encoding/binary" "io" - - "github.com/golang/snappy" ) var termSeparator byte = 0xff @@ -39,7 +37,7 @@ type chunkedContentCoder struct { chunkMeta []metaData - compressed []byte // temp buf for snappy compression + compressed []byte // temp buf for compression } // metaData represents the data information inside a @@ -118,7 +116,10 @@ func (c *chunkedContentCoder) flushContents() error { metaData := c.chunkMetaBuf.Bytes() c.final = append(c.final, c.chunkMetaBuf.Bytes()...) // write the compressed data to the final data - c.compressed = snappy.Encode(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes()) + c.compressed, err = ZSTDCompress(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes(), 3) + if err != nil { + return err + } c.final = append(c.final, c.compressed...) c.chunkLens[c.currChunk] = uint64(len(c.compressed) + len(metaData)) diff --git a/docvalues.go b/docvalues.go index 382f30d..7ba4767 100644 --- a/docvalues.go +++ b/docvalues.go @@ -22,7 +22,6 @@ import ( "sort" segment "github.com/blugelabs/bluge_segment_api" - "github.com/golang/snappy" ) type docNumTermsVisitor func(docNum uint64, terms []byte) error @@ -39,7 +38,7 @@ type docValueReader struct { dvDataLoc uint64 curChunkHeader []metaData curChunkData []byte // compressed data cache - uncompressed []byte // temp buf for snappy decompression + uncompressed []byte // temp buf for decompression } func (di *docValueReader) size() int { @@ -203,7 +202,7 @@ func (di *docValueReader) iterateAllDocValues(s *Segment, visitor docNumTermsVis } // uncompress the already loaded data - uncompressed, err := snappy.Decode(di.uncompressed[:cap(di.uncompressed)], di.curChunkData) + uncompressed, err := ZSTDDecompress(di.uncompressed[:cap(di.uncompressed)], di.curChunkData) if err != nil { return err } @@ -238,7 +237,7 @@ func (di *docValueReader) visitDocValues(docNum uint64, uncompressed = di.uncompressed } else { // uncompress the already loaded data - uncompressed, err = snappy.Decode(di.uncompressed[:cap(di.uncompressed)], di.curChunkData) + uncompressed, err = ZSTDDecompress(di.uncompressed[:cap(di.uncompressed)], di.curChunkData) if err != nil { return err } diff --git a/go.mod b/go.mod index bdea3b2..70b0295 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,6 @@ require ( github.com/blevesearch/mmap-go v1.0.2 github.com/blevesearch/vellum v1.0.5 github.com/blugelabs/bluge_segment_api v0.2.0 - github.com/golang/snappy v0.0.1 + github.com/klauspost/compress v1.15.2 github.com/spf13/cobra v0.0.5 ) diff --git a/go.sum b/go.sum index caffd6f..d2c3d35 100644 --- a/go.sum +++ b/go.sum @@ -20,11 +20,11 @@ github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= -github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/klauspost/compress v1.15.2 h1:3WH+AG7s2+T8o3nrM/8u2rdqUEcQhmga7smjrT41nAw= +github.com/klauspost/compress v1.15.2/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= diff --git a/merge.go b/merge.go index 261af59..b2d8902 100644 --- a/merge.go +++ b/merge.go @@ -27,7 +27,6 @@ import ( "github.com/RoaringBitmap/roaring" "github.com/blevesearch/vellum" segment "github.com/blugelabs/bluge_segment_api" - "github.com/golang/snappy" ) const docDropped = math.MaxInt64 // sentinel docNum to represent a deleted doc @@ -756,7 +755,7 @@ func mergeStoredAndRemapSegment(seg *Segment, dropsI *roaring.Bitmap, segNewDocN metaBytes := metaBuf.Bytes() - // compressed = snappy.Encode(compressed[:cap(compressed)], data) + // compressed = ZSTDCompress(compressed[:cap(compressed)], data, 3) // record where we're about to start writing docNumOffsets[newDocNum] = uint64(trunkWriter.BufferSize()) @@ -811,7 +810,7 @@ func (s *Segment) copyStoredDocs(newDocNum uint64, newDocNumOffsets []uint64, my if err != nil { return err } - uncompressed, err = snappy.Decode(uncompressed[:cap(uncompressed)], compressed) + uncompressed, err = ZSTDDecompress(uncompressed[:cap(uncompressed)], compressed) if err != nil { log.Panic(err) return err diff --git a/new.go b/new.go index e19bf30..06e10c9 100644 --- a/new.go +++ b/new.go @@ -610,7 +610,7 @@ func (s *interim) writeStoredFields() ( metaBytes := s.metaBuf.Bytes() - // compressed = snappy.Encode(compressed[:cap(compressed)], data) + // compressed = ZSTDCompress(compressed[:cap(compressed)], data, 3) // docStoredOffsets[docNum] = uint64(s.w.Count()) docStoredOffsets[docNum] = uint64(trunkWriter.BufferSize()) diff --git a/read.go b/read.go index c52d258..62e5239 100644 --- a/read.go +++ b/read.go @@ -16,8 +16,6 @@ package ice import ( "encoding/binary" - - "github.com/golang/snappy" ) func (s *Segment) getDocStoredMetaAndCompressed(docNum uint64) (meta, data []byte, err error) { @@ -34,7 +32,7 @@ func (s *Segment) getDocStoredMetaAndCompressed(docNum uint64) (meta, data []byt if err != nil { return nil, nil, err } - s.storedFieldTrunkUncompressed, err = snappy.Decode(s.storedFieldTrunkUncompressed[:cap(s.storedFieldTrunkUncompressed)], compressed) + s.storedFieldTrunkUncompressed, err = ZSTDDecompress(s.storedFieldTrunkUncompressed[:cap(s.storedFieldTrunkUncompressed)], compressed) if err != nil { return nil, nil, err } @@ -63,7 +61,7 @@ func (s *Segment) getDocStoredOffsets(docNum uint64) ( if err != nil { return 0, 0, 0, 0, 0, err } - 
s.storedFieldTrunkUncompressed, err = snappy.Decode(s.storedFieldTrunkUncompressed[:cap(s.storedFieldTrunkUncompressed)], compressed) + s.storedFieldTrunkUncompressed, err = ZSTDDecompress(s.storedFieldTrunkUncompressed[:cap(s.storedFieldTrunkUncompressed)], compressed) if err != nil { return 0, 0, 0, 0, 0, err } diff --git a/segment.go b/segment.go index d874800..b6d4af5 100644 --- a/segment.go +++ b/segment.go @@ -201,7 +201,7 @@ func (s *Segment) visitDocument(vdc *visitDocumentCtx, num uint64, vdc.reader.Reset(meta) - // uncompressed, err := snappy.Decode(vdc.buf[:cap(vdc.buf)], compressed) + // uncompressed, err := ZSTDDecompress(vdc.buf[:cap(vdc.buf)], compressed) // if err != nil { // return err // } diff --git a/zinc_chunker.go b/zinc_chunker.go index c532da5..bb79265 100644 --- a/zinc_chunker.go +++ b/zinc_chunker.go @@ -3,8 +3,6 @@ package ice import ( "bytes" "io" - - "github.com/golang/snappy" ) const zincTrunkerSize = 128 @@ -40,7 +38,11 @@ func (t *zincTrunker) NewLine() error { func (t *zincTrunker) Flush() error { if t.buf.Len() > 0 { - t.compressed = snappy.Encode(t.compressed[:cap(t.compressed)], t.buf.Bytes()) + var err error + t.compressed, err = ZSTDCompress(t.compressed[:cap(t.compressed)], t.buf.Bytes(), 3) + if err != nil { + return err + } n, err := t.w.Write(t.compressed) if err != nil { return err diff --git a/zstd.go b/zstd.go new file mode 100644 index 0000000..2949eb9 --- /dev/null +++ b/zstd.go @@ -0,0 +1,78 @@ +/* + * Copyright 2019 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ice + +import ( + "fmt" + "log" + "sync" + + "github.com/klauspost/compress/zstd" +) + +var ( + decoder *zstd.Decoder + encoder *zstd.Encoder + + encOnce, decOnce sync.Once +) + +// ZSTDDecompress decompresses a block using ZSTD algorithm. +func ZSTDDecompress(dst, src []byte) ([]byte, error) { + decOnce.Do(func() { + var err error + decoder, err = zstd.NewReader(nil) + Check(err) + }) + return decoder.DecodeAll(src, dst[:0]) +} + +// ZSTDCompress compresses a block using ZSTD algorithm. +func ZSTDCompress(dst, src []byte, compressionLevel int) ([]byte, error) { + encOnce.Do(func() { + var err error + level := zstd.EncoderLevelFromZstd(compressionLevel) + encoder, err = zstd.NewWriter(nil, zstd.WithEncoderLevel(level)) + Check(err) + }) + return encoder.EncodeAll(src, dst[:0]), nil +} + +// ZSTDCompressBound returns the worst case size needed for a destination buffer. +// Klauspost ZSTD library does not provide any API for Compression Bound. This +// calculation is based on the DataDog ZSTD library. +// See https://pkg.go.dev/github.com/DataDog/zstd#CompressBound +func ZSTDCompressBound(srcSize int) int { + lowLimit := 128 << 10 // 128 kB + var margin int + if srcSize < lowLimit { + margin = (lowLimit - srcSize) >> 11 + } + return srcSize + (srcSize >> 8) + margin +} + +// Check logs fatal if err != nil. 
+func Check(err error) { + if err != nil { + log.Fatalf("%+v", Wrap(err, "")) + } +} + +// Wrap wraps errors from external lib. +func Wrap(err error, msg string) error { + return fmt.Errorf("%s err: %+v", msg, err) +} From a5cb202ed5ef5cdc40cad0d2724fba961c14a21e Mon Sep 17 00:00:00 2001 From: yanghengfei Date: Sun, 1 May 2022 13:35:26 +0800 Subject: [PATCH 03/23] feat: compress docValue --- contentcoder.go | 12 ++++++--- docvalues.go | 71 +++++++++++++++++++++++-------------------------- 2 files changed, 42 insertions(+), 41 deletions(-) diff --git a/contentcoder.go b/contentcoder.go index 8ddeaf9..4e65217 100644 --- a/contentcoder.go +++ b/contentcoder.go @@ -112,17 +112,21 @@ func (c *chunkedContentCoder) flushContents() error { } } + // merge chunkBuf + c.chunkBuf.WriteTo(&c.chunkMetaBuf) + // write the metadata to final data - metaData := c.chunkMetaBuf.Bytes() - c.final = append(c.final, c.chunkMetaBuf.Bytes()...) + // metaData := c.chunkMetaBuf.Bytes() + // c.final = append(c.final, c.chunkMetaBuf.Bytes()...) // write the compressed data to the final data - c.compressed, err = ZSTDCompress(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes(), 3) + c.compressed, err = ZSTDCompress(c.compressed[:cap(c.compressed)], c.chunkMetaBuf.Bytes(), 3) if err != nil { return err } c.final = append(c.final, c.compressed...) - c.chunkLens[c.currChunk] = uint64(len(c.compressed) + len(metaData)) + // c.chunkLens[c.currChunk] = uint64(len(c.compressed) + len(metaData)) + c.chunkLens[c.currChunk] = uint64(len(c.compressed)) if c.progressiveWrite { _, err := c.w.Write(c.final) diff --git a/docvalues.go b/docvalues.go index 7ba4767..ff2a70b 100644 --- a/docvalues.go +++ b/docvalues.go @@ -145,11 +145,21 @@ func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *Segment) error { destChunkDataLoc += start curChunkEnd += end - // read the number of docs reside in the chunk - numDocsData, err := s.data.Read(int(destChunkDataLoc), int(destChunkDataLoc+binary.MaxVarintLen64)) + // load compressed data + curChunkData, err := s.data.Read(int(destChunkDataLoc), int(curChunkEnd)) + if err != nil { + return err + } + // uncompress data + di.uncompressed, err = ZSTDDecompress(di.uncompressed[:cap(di.uncompressed)], curChunkData) if err != nil { return err } + // reset data start offset + destChunkDataLoc = 0 + + // read the number of docs reside in the chunk + numDocsData := di.uncompressed[int(destChunkDataLoc):int(destChunkDataLoc+binary.MaxVarintLen64)] numDocs, read := binary.Uvarint(numDocsData) if read <= 0 { return fmt.Errorf("failed to read the chunk") @@ -163,29 +173,16 @@ func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *Segment) error { di.curChunkHeader = di.curChunkHeader[:int(numDocs)] } for i := 0; i < int(numDocs); i++ { - var docNumData []byte - docNumData, err = s.data.Read(int(chunkMetaLoc+offset), int(chunkMetaLoc+offset+binary.MaxVarintLen64)) - if err != nil { - return err - } + docNumData := di.uncompressed[int(chunkMetaLoc+offset):int(chunkMetaLoc+offset+binary.MaxVarintLen64)] di.curChunkHeader[i].DocNum, read = binary.Uvarint(docNumData) offset += uint64(read) - var docDvOffsetData []byte - docDvOffsetData, err = s.data.Read(int(chunkMetaLoc+offset), int(chunkMetaLoc+offset+binary.MaxVarintLen64)) - if err != nil { - return err - } + docDvOffsetData := di.uncompressed[int(chunkMetaLoc+offset):int(chunkMetaLoc+offset+binary.MaxVarintLen64)] di.curChunkHeader[i].DocDvOffset, read = binary.Uvarint(docDvOffsetData) offset += uint64(read) } compressedDataLoc := chunkMetaLoc + 
offset - dataLength := curChunkEnd - compressedDataLoc - curChunkData, err := s.data.Read(int(compressedDataLoc), int(compressedDataLoc+dataLength)) - if err != nil { - return err - } - di.curChunkData = curChunkData + di.curChunkData = di.uncompressed[int(compressedDataLoc):] di.curChunkNum = chunkNumber di.uncompressed = di.uncompressed[:0] return nil @@ -202,15 +199,15 @@ func (di *docValueReader) iterateAllDocValues(s *Segment, visitor docNumTermsVis } // uncompress the already loaded data - uncompressed, err := ZSTDDecompress(di.uncompressed[:cap(di.uncompressed)], di.curChunkData) - if err != nil { - return err - } - di.uncompressed = uncompressed + // uncompressed, err := ZSTDDecompress(di.uncompressed[:cap(di.uncompressed)], di.curChunkData) + // if err != nil { + // return err + // } + // di.uncompressed = uncompressed start := uint64(0) for _, entry := range di.curChunkHeader { - err = visitor(entry.DocNum, uncompressed[start:entry.DocDvOffset]) + err = visitor(entry.DocNum, di.curChunkData[start:entry.DocDvOffset]) if err != nil { return err } @@ -230,22 +227,22 @@ func (di *docValueReader) visitDocValues(docNum uint64, return nil } - var uncompressed []byte - var err error + // var uncompressed []byte + // var err error // use the uncompressed copy if available - if len(di.uncompressed) > 0 { - uncompressed = di.uncompressed - } else { - // uncompress the already loaded data - uncompressed, err = ZSTDDecompress(di.uncompressed[:cap(di.uncompressed)], di.curChunkData) - if err != nil { - return err - } - di.uncompressed = uncompressed - } + // if len(di.uncompressed) > 0 { + // uncompressed = di.uncompressed + // } else { + // uncompress the already loaded data + // uncompressed, err = ZSTDDecompress(di.uncompressed[:cap(di.uncompressed)], di.curChunkData) + // if err != nil { + // return err + // } + // di.uncompressed = uncompressed + // } // pick the terms for the given docNum - uncompressed = uncompressed[start:end] + uncompressed := di.curChunkData[start:end] for { i := bytes.Index(uncompressed, termSeparatorSplitSlice) if i < 0 { From c3633edb173f385a576e39e330eb14af6fd7e698 Mon Sep 17 00:00:00 2001 From: yanghengfei Date: Sun, 1 May 2022 13:41:15 +0800 Subject: [PATCH 04/23] feat: reback the docValue compress --- contentcoder.go | 12 +++------ docvalues.go | 71 ++++++++++++++++++++++++++----------------------- 2 files changed, 41 insertions(+), 42 deletions(-) diff --git a/contentcoder.go b/contentcoder.go index 4e65217..8ddeaf9 100644 --- a/contentcoder.go +++ b/contentcoder.go @@ -112,21 +112,17 @@ func (c *chunkedContentCoder) flushContents() error { } } - // merge chunkBuf - c.chunkBuf.WriteTo(&c.chunkMetaBuf) - // write the metadata to final data - // metaData := c.chunkMetaBuf.Bytes() - // c.final = append(c.final, c.chunkMetaBuf.Bytes()...) + metaData := c.chunkMetaBuf.Bytes() + c.final = append(c.final, c.chunkMetaBuf.Bytes()...) // write the compressed data to the final data - c.compressed, err = ZSTDCompress(c.compressed[:cap(c.compressed)], c.chunkMetaBuf.Bytes(), 3) + c.compressed, err = ZSTDCompress(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes(), 3) if err != nil { return err } c.final = append(c.final, c.compressed...) 
- // c.chunkLens[c.currChunk] = uint64(len(c.compressed) + len(metaData)) - c.chunkLens[c.currChunk] = uint64(len(c.compressed)) + c.chunkLens[c.currChunk] = uint64(len(c.compressed) + len(metaData)) if c.progressiveWrite { _, err := c.w.Write(c.final) diff --git a/docvalues.go b/docvalues.go index ff2a70b..7ba4767 100644 --- a/docvalues.go +++ b/docvalues.go @@ -145,21 +145,11 @@ func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *Segment) error { destChunkDataLoc += start curChunkEnd += end - // load compressed data - curChunkData, err := s.data.Read(int(destChunkDataLoc), int(curChunkEnd)) - if err != nil { - return err - } - // uncompress data - di.uncompressed, err = ZSTDDecompress(di.uncompressed[:cap(di.uncompressed)], curChunkData) + // read the number of docs reside in the chunk + numDocsData, err := s.data.Read(int(destChunkDataLoc), int(destChunkDataLoc+binary.MaxVarintLen64)) if err != nil { return err } - // reset data start offset - destChunkDataLoc = 0 - - // read the number of docs reside in the chunk - numDocsData := di.uncompressed[int(destChunkDataLoc):int(destChunkDataLoc+binary.MaxVarintLen64)] numDocs, read := binary.Uvarint(numDocsData) if read <= 0 { return fmt.Errorf("failed to read the chunk") @@ -173,16 +163,29 @@ func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *Segment) error { di.curChunkHeader = di.curChunkHeader[:int(numDocs)] } for i := 0; i < int(numDocs); i++ { - docNumData := di.uncompressed[int(chunkMetaLoc+offset):int(chunkMetaLoc+offset+binary.MaxVarintLen64)] + var docNumData []byte + docNumData, err = s.data.Read(int(chunkMetaLoc+offset), int(chunkMetaLoc+offset+binary.MaxVarintLen64)) + if err != nil { + return err + } di.curChunkHeader[i].DocNum, read = binary.Uvarint(docNumData) offset += uint64(read) - docDvOffsetData := di.uncompressed[int(chunkMetaLoc+offset):int(chunkMetaLoc+offset+binary.MaxVarintLen64)] + var docDvOffsetData []byte + docDvOffsetData, err = s.data.Read(int(chunkMetaLoc+offset), int(chunkMetaLoc+offset+binary.MaxVarintLen64)) + if err != nil { + return err + } di.curChunkHeader[i].DocDvOffset, read = binary.Uvarint(docDvOffsetData) offset += uint64(read) } compressedDataLoc := chunkMetaLoc + offset - di.curChunkData = di.uncompressed[int(compressedDataLoc):] + dataLength := curChunkEnd - compressedDataLoc + curChunkData, err := s.data.Read(int(compressedDataLoc), int(compressedDataLoc+dataLength)) + if err != nil { + return err + } + di.curChunkData = curChunkData di.curChunkNum = chunkNumber di.uncompressed = di.uncompressed[:0] return nil @@ -199,15 +202,15 @@ func (di *docValueReader) iterateAllDocValues(s *Segment, visitor docNumTermsVis } // uncompress the already loaded data - // uncompressed, err := ZSTDDecompress(di.uncompressed[:cap(di.uncompressed)], di.curChunkData) - // if err != nil { - // return err - // } - // di.uncompressed = uncompressed + uncompressed, err := ZSTDDecompress(di.uncompressed[:cap(di.uncompressed)], di.curChunkData) + if err != nil { + return err + } + di.uncompressed = uncompressed start := uint64(0) for _, entry := range di.curChunkHeader { - err = visitor(entry.DocNum, di.curChunkData[start:entry.DocDvOffset]) + err = visitor(entry.DocNum, uncompressed[start:entry.DocDvOffset]) if err != nil { return err } @@ -227,22 +230,22 @@ func (di *docValueReader) visitDocValues(docNum uint64, return nil } - // var uncompressed []byte - // var err error + var uncompressed []byte + var err error // use the uncompressed copy if available - // if len(di.uncompressed) > 0 { - // 
uncompressed = di.uncompressed - // } else { - // uncompress the already loaded data - // uncompressed, err = ZSTDDecompress(di.uncompressed[:cap(di.uncompressed)], di.curChunkData) - // if err != nil { - // return err - // } - // di.uncompressed = uncompressed - // } + if len(di.uncompressed) > 0 { + uncompressed = di.uncompressed + } else { + // uncompress the already loaded data + uncompressed, err = ZSTDDecompress(di.uncompressed[:cap(di.uncompressed)], di.curChunkData) + if err != nil { + return err + } + di.uncompressed = uncompressed + } // pick the terms for the given docNum - uncompressed := di.curChunkData[start:end] + uncompressed = uncompressed[start:end] for { i := bytes.Index(uncompressed, termSeparatorSplitSlice) if i < 0 { From 4662783ee2c89c13fcff22a2592394f68fcaa034 Mon Sep 17 00:00:00 2001 From: yanghengfei Date: Sun, 1 May 2022 22:17:56 +0800 Subject: [PATCH 05/23] feat: packed docNum and Offset for docValue --- contentcoder.go | 6 +++++- docvalues.go | 7 +++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/contentcoder.go b/contentcoder.go index 8ddeaf9..be94369 100644 --- a/contentcoder.go +++ b/contentcoder.go @@ -105,11 +105,15 @@ func (c *chunkedContentCoder) flushContents() error { } // write out the metaData slice + diffDocNum := uint64(0) + diffDvOffset := uint64(0) for _, meta := range c.chunkMeta { - err := writeUvarints(&c.chunkMetaBuf, meta.DocNum, meta.DocDvOffset) + err := writeUvarints(&c.chunkMetaBuf, meta.DocNum-diffDocNum, meta.DocDvOffset-diffDvOffset) if err != nil { return err } + diffDocNum = meta.DocNum + diffDvOffset = meta.DocDvOffset } // write the metadata to final data diff --git a/docvalues.go b/docvalues.go index 7ba4767..b8923c6 100644 --- a/docvalues.go +++ b/docvalues.go @@ -162,6 +162,9 @@ func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *Segment) error { } else { di.curChunkHeader = di.curChunkHeader[:int(numDocs)] } + + diffDocNum := uint64(0) + diffDvOffset := uint64(0) for i := 0; i < int(numDocs); i++ { var docNumData []byte docNumData, err = s.data.Read(int(chunkMetaLoc+offset), int(chunkMetaLoc+offset+binary.MaxVarintLen64)) @@ -169,6 +172,8 @@ func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *Segment) error { return err } di.curChunkHeader[i].DocNum, read = binary.Uvarint(docNumData) + di.curChunkHeader[i].DocNum += diffDocNum + diffDocNum = di.curChunkHeader[i].DocNum offset += uint64(read) var docDvOffsetData []byte docDvOffsetData, err = s.data.Read(int(chunkMetaLoc+offset), int(chunkMetaLoc+offset+binary.MaxVarintLen64)) @@ -176,6 +181,8 @@ func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *Segment) error { return err } di.curChunkHeader[i].DocDvOffset, read = binary.Uvarint(docDvOffsetData) + di.curChunkHeader[i].DocDvOffset += diffDvOffset + diffDvOffset = di.curChunkHeader[i].DocDvOffset offset += uint64(read) } From 439e31ada67de80ef0699f61b0a8d4bb98e3d202 Mon Sep 17 00:00:00 2001 From: yanghengfei Date: Sun, 1 May 2022 23:56:41 +0800 Subject: [PATCH 06/23] doc: update go.mod --- go.mod | 6 +++--- go.sum | 12 ++++++------ new.go | 3 --- zstd.go | 8 +------- 4 files changed, 10 insertions(+), 19 deletions(-) diff --git a/go.mod b/go.mod index 70b0295..a3a240f 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,9 @@ module github.com/blugelabs/ice go 1.12 require ( - github.com/RoaringBitmap/roaring v0.9.1 - github.com/blevesearch/mmap-go v1.0.2 - github.com/blevesearch/vellum v1.0.5 + github.com/RoaringBitmap/roaring v0.9.4 + github.com/blevesearch/mmap-go v1.0.3 + 
github.com/blevesearch/vellum v1.0.7 github.com/blugelabs/bluge_segment_api v0.2.0 github.com/klauspost/compress v1.15.2 github.com/spf13/cobra v0.0.5 diff --git a/go.sum b/go.sum index d2c3d35..3f1c5f6 100644 --- a/go.sum +++ b/go.sum @@ -1,15 +1,16 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/RoaringBitmap/gocroaring v0.4.0/go.mod h1:NieMwz7ZqwU2DD73/vvYwv7r4eWBKuPVSXZIpsaMwCI= github.com/RoaringBitmap/real-roaring-datasets v0.0.0-20190726190000-eb7c87156f76/go.mod h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE= -github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bihdYyCEM= github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc= +github.com/RoaringBitmap/roaring v0.9.4 h1:ckvZSX5gwCRaJYBNe7syNawCU5oruY9gQmjXlp4riwo= +github.com/RoaringBitmap/roaring v0.9.4/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/bits-and-blooms/bitset v1.2.0 h1:Kn4yilvwNtMACtf1eYDlG8H77R07mZSPbMjLyS07ChA= github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= -github.com/blevesearch/mmap-go v1.0.2 h1:JtMHb+FgQCTTYIhtMvimw15dJwu1Y5lrZDMOFXVWPk0= -github.com/blevesearch/mmap-go v1.0.2/go.mod h1:ol2qBqYaOUsGdm7aRMRrYGgPvnwLe6Y+7LMvAB5IbSA= -github.com/blevesearch/vellum v1.0.5 h1:L5dJ7hKauRVbuH7I8uqLeSK92CPPY6FfrbAmLhAug8A= -github.com/blevesearch/vellum v1.0.5/go.mod h1:atE0EH3fvk43zzS7t1YNdNC7DbmcC3uz+eMD5xZ2OyQ= +github.com/blevesearch/mmap-go v1.0.3 h1:7QkALgFNooSq3a46AE+pWeKASAZc9SiNFJhDGF1NDx4= +github.com/blevesearch/mmap-go v1.0.3/go.mod h1:pYvKl/grLQrBxuaRYgoTssa4rVujYYeenDp++2E+yvs= +github.com/blevesearch/vellum v1.0.7 h1:+vn8rfyCRHxKVRgDLeR0FAXej2+6mEb5Q15aQE/XESQ= +github.com/blevesearch/vellum v1.0.7/go.mod h1:doBZpmRhwTsASB4QdUZANlJvqVAUdUyX0ZK7QJCTeBE= github.com/blugelabs/bluge_segment_api v0.2.0 h1:cCX1Y2y8v0LZ7+EEJ6gH7dW6TtVTW4RhG0vp3R+N2Lo= github.com/blugelabs/bluge_segment_api v0.2.0/go.mod h1:95XA+ZXfRj/IXADm7gZ+iTcWOJPg5jQTY1EReIzl3LA= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= @@ -50,7 +51,6 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljT github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a h1:aYOabOQFp6Vj6W1F80affTUvO9UxmJRx8K0gsfABByQ= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/new.go b/new.go index 06e10c9..1811803 100644 --- a/new.go +++ b/new.go @@ -609,9 +609,6 @@ func (s *interim) writeStoredFields() ( } metaBytes := s.metaBuf.Bytes() - - // compressed = ZSTDCompress(compressed[:cap(compressed)], data, 3) - // docStoredOffsets[docNum] = uint64(s.w.Count()) docStoredOffsets[docNum] = uint64(trunkWriter.BufferSize()) err = writeUvarints(trunkWriter, diff --git a/zstd.go b/zstd.go index 2949eb9..1381085 
100644 --- a/zstd.go +++ b/zstd.go @@ -17,7 +17,6 @@ package ice import ( - "fmt" "log" "sync" @@ -68,11 +67,6 @@ func ZSTDCompressBound(srcSize int) int { // Check logs fatal if err != nil. func Check(err error) { if err != nil { - log.Fatalf("%+v", Wrap(err, "")) + log.Fatalf("%+v", err) } } - -// Wrap wraps errors from external lib. -func Wrap(err error, msg string) error { - return fmt.Errorf("%s err: %+v", msg, err) -} From 0753dc92efcb817b0860675f11c0c89b5028d6d4 Mon Sep 17 00:00:00 2001 From: yanghengfei Date: Mon, 2 May 2022 00:03:54 +0800 Subject: [PATCH 07/23] feat: packed numeric of posting list --- posting.go | 3 +++ write.go | 6 +++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/posting.go b/posting.go index c0dc2d4..404cfbb 100644 --- a/posting.go +++ b/posting.go @@ -258,6 +258,9 @@ func (p *PostingsList) read(postingsOffset uint64, d *Dictionary) error { return err } p.locOffset, read = binary.Uvarint(locOffsetData) + if p.locOffset > 0 && p.freqOffset > 0 { + p.locOffset += p.freqOffset + } n += uint64(read) postingsLenData, err := d.sb.data.Read(int(postingsOffset+n), int(postingsOffset+n+binary.MaxVarintLen64)) diff --git a/write.go b/write.go index b42f214..cad88c2 100644 --- a/write.go +++ b/write.go @@ -90,7 +90,11 @@ func writePostings(postings *roaring.Bitmap, tfEncoder, locEncoder *chunkedIntCo return 0, err } - n = binary.PutUvarint(bufMaxVarintLen64, locOffset) + if locOffset > 0 && tfOffset > 0 { + n = binary.PutUvarint(bufMaxVarintLen64, locOffset-tfOffset) + } else { + n = binary.PutUvarint(bufMaxVarintLen64, locOffset) + } _, err = w.Write(bufMaxVarintLen64[:n]) if err != nil { return 0, err From 3061032427f2873c3b9a248b3261f694d22cc424 Mon Sep 17 00:00:00 2001 From: yanghengfei Date: Mon, 2 May 2022 00:05:41 +0800 Subject: [PATCH 08/23] feat: compress numeric of posting list --- posting.go | 3 --- write.go | 6 +----- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/posting.go b/posting.go index 404cfbb..c0dc2d4 100644 --- a/posting.go +++ b/posting.go @@ -258,9 +258,6 @@ func (p *PostingsList) read(postingsOffset uint64, d *Dictionary) error { return err } p.locOffset, read = binary.Uvarint(locOffsetData) - if p.locOffset > 0 && p.freqOffset > 0 { - p.locOffset += p.freqOffset - } n += uint64(read) postingsLenData, err := d.sb.data.Read(int(postingsOffset+n), int(postingsOffset+n+binary.MaxVarintLen64)) diff --git a/write.go b/write.go index cad88c2..b42f214 100644 --- a/write.go +++ b/write.go @@ -90,11 +90,7 @@ func writePostings(postings *roaring.Bitmap, tfEncoder, locEncoder *chunkedIntCo return 0, err } - if locOffset > 0 && tfOffset > 0 { - n = binary.PutUvarint(bufMaxVarintLen64, locOffset-tfOffset) - } else { - n = binary.PutUvarint(bufMaxVarintLen64, locOffset) - } + n = binary.PutUvarint(bufMaxVarintLen64, locOffset) _, err = w.Write(bufMaxVarintLen64[:n]) if err != nil { return 0, err From b4acb45dc3cfef69bf028148eac29937519875d5 Mon Sep 17 00:00:00 2001 From: yanghengfei Date: Mon, 2 May 2022 10:03:14 +0800 Subject: [PATCH 09/23] feat: packed numeric of posting list --- posting.go | 3 +++ write.go | 6 +++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/posting.go b/posting.go index c0dc2d4..404cfbb 100644 --- a/posting.go +++ b/posting.go @@ -258,6 +258,9 @@ func (p *PostingsList) read(postingsOffset uint64, d *Dictionary) error { return err } p.locOffset, read = binary.Uvarint(locOffsetData) + if p.locOffset > 0 && p.freqOffset > 0 { + p.locOffset += p.freqOffset + } n += uint64(read) postingsLenData, 
err := d.sb.data.Read(int(postingsOffset+n), int(postingsOffset+n+binary.MaxVarintLen64)) diff --git a/write.go b/write.go index b42f214..cad88c2 100644 --- a/write.go +++ b/write.go @@ -90,7 +90,11 @@ func writePostings(postings *roaring.Bitmap, tfEncoder, locEncoder *chunkedIntCo return 0, err } - n = binary.PutUvarint(bufMaxVarintLen64, locOffset) + if locOffset > 0 && tfOffset > 0 { + n = binary.PutUvarint(bufMaxVarintLen64, locOffset-tfOffset) + } else { + n = binary.PutUvarint(bufMaxVarintLen64, locOffset) + } _, err = w.Write(bufMaxVarintLen64[:n]) if err != nil { return 0, err From 3442f5a7fd0575cbff21a7c4d196afe9c0c85dcc Mon Sep 17 00:00:00 2001 From: yanghengfei Date: Mon, 2 May 2022 11:42:07 +0800 Subject: [PATCH 10/23] feat: compress intcoder --- intcoder.go | 16 +++++++++++----- intdecoder.go | 8 +++++++- merge.go | 2 -- zinc_chunker.go | 2 +- 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/intcoder.go b/intcoder.go index 28749b1..5696312 100644 --- a/intcoder.go +++ b/intcoder.go @@ -33,7 +33,8 @@ type chunkedIntCoder struct { chunkLens []uint64 currChunk uint64 - buf []byte + buf []byte + compressed []byte } // newChunkedIntCoder returns a new chunk int coder which packs data into @@ -101,11 +102,16 @@ func (c *chunkedIntCoder) Add(docNum uint64, vals ...uint64) error { // Close indicates you are done calling Add() this allows the final chunk // to be encoded. -func (c *chunkedIntCoder) Close() { - encodingBytes := c.chunkBuf.Bytes() - c.chunkLens[c.currChunk] = uint64(len(encodingBytes)) - c.final = append(c.final, encodingBytes...) +func (c *chunkedIntCoder) Close() error { + var err error + c.compressed, err = ZSTDCompress(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes(), 3) + if err != nil { + return err + } + c.chunkLens[c.currChunk] = uint64(len(c.compressed)) + c.final = append(c.final, c.compressed...) c.currChunk = uint64(cap(c.chunkLens)) // sentinel to detect double close + return nil } // Write commits all the encoded chunked integers to the provided writer. diff --git a/intdecoder.go b/intdecoder.go index b9a0575..690a8b3 100644 --- a/intdecoder.go +++ b/intdecoder.go @@ -26,6 +26,7 @@ type chunkedIntDecoder struct { dataStartOffset uint64 chunkOffsets []uint64 curChunkBytes []byte + uncompressed []byte // temp buf for decompression data *segment.Data r *memUvarintReader } @@ -86,7 +87,11 @@ func (d *chunkedIntDecoder) loadChunk(chunk int) error { if err != nil { return err } - d.curChunkBytes = curChunkBytesData + d.uncompressed, err = ZSTDDecompress(d.uncompressed[:cap(d.uncompressed)], curChunkBytesData) + if err != nil { + return err + } + d.curChunkBytes = d.uncompressed if d.r == nil { d.r = newMemUvarintReader(d.curChunkBytes) } else { @@ -101,6 +106,7 @@ func (d *chunkedIntDecoder) reset() { d.dataStartOffset = 0 d.chunkOffsets = d.chunkOffsets[:0] d.curChunkBytes = d.curChunkBytes[:0] + d.uncompressed = d.uncompressed[:0] // FIXME what? 
// d.data = d.data[:0] diff --git a/merge.go b/merge.go index b2d8902..727b280 100644 --- a/merge.go +++ b/merge.go @@ -755,8 +755,6 @@ func mergeStoredAndRemapSegment(seg *Segment, dropsI *roaring.Bitmap, segNewDocN metaBytes := metaBuf.Bytes() - // compressed = ZSTDCompress(compressed[:cap(compressed)], data, 3) - // record where we're about to start writing docNumOffsets[newDocNum] = uint64(trunkWriter.BufferSize()) diff --git a/zinc_chunker.go b/zinc_chunker.go index bb79265..5885347 100644 --- a/zinc_chunker.go +++ b/zinc_chunker.go @@ -47,8 +47,8 @@ func (t *zincTrunker) Flush() error { if err != nil { return err } - t.buf.Reset() t.bytes += n + t.buf.Reset() } t.offsets = append(t.offsets, uint64(t.bytes)) return nil From 152a0114c76548934996174cab52de39c50bd7ba Mon Sep 17 00:00:00 2001 From: yanghengfei Date: Tue, 3 May 2022 13:14:31 +0800 Subject: [PATCH 11/23] feat: run optimize on bitmap --- write.go | 1 + 1 file changed, 1 insertion(+) diff --git a/write.go b/write.go index cad88c2..d177380 100644 --- a/write.go +++ b/write.go @@ -131,6 +131,7 @@ func numUvarintBytes(x uint64) (n int) { // then writes out the roaring bitmap itself func writeRoaringWithLen(r *roaring.Bitmap, w io.Writer, reuseBufVarint []byte) (int, error) { + r.RunOptimize() buf, err := r.ToBytes() if err != nil { return 0, err From c8cc4b7ca22472f7ea86c2082c752ef512e1dd0a Mon Sep 17 00:00:00 2001 From: yanghengfei Date: Tue, 3 May 2022 16:31:05 +0800 Subject: [PATCH 12/23] feat: optimize document values chunk --- documentcoder.go | 107 ++++++++++++++++++++++++++++++++++++++++++ documentcoder_test.go | 1 + load.go | 40 +++++++++++----- merge.go | 97 +++++++++++--------------------------- new.go | 34 +++++--------- read.go | 46 ++++++------------ segment.go | 4 +- zinc_chunker.go | 77 ------------------------------ zstd.go | 15 +++--- 9 files changed, 196 insertions(+), 225 deletions(-) create mode 100644 documentcoder.go create mode 100644 documentcoder_test.go delete mode 100644 zinc_chunker.go diff --git a/documentcoder.go b/documentcoder.go new file mode 100644 index 0000000..25e1ec7 --- /dev/null +++ b/documentcoder.go @@ -0,0 +1,107 @@ +package ice + +import ( + "bytes" + "encoding/binary" + "io" +) + +const defaultDocumentChunkSize uint32 = 128 + +type chunkedDocumentCoder struct { + chunkSize uint64 + w io.Writer + buf *bytes.Buffer + metaBuf []byte + n uint64 + bytes uint64 + compressed []byte + offsets []uint64 +} + +func NewChunkedDocumentCoder(chunkSize uint64, w io.Writer) *chunkedDocumentCoder { + t := &chunkedDocumentCoder{ + chunkSize: chunkSize, + w: w, + } + t.buf = bytes.NewBuffer(nil) + t.metaBuf = make([]byte, binary.MaxVarintLen64) + t.offsets = append(t.offsets, 0) + return t +} + +func (t *chunkedDocumentCoder) Write(data []byte) (int, error) { + return t.buf.Write(data) +} + +func (t *chunkedDocumentCoder) NewLine() error { + t.n++ + if t.n%t.chunkSize != 0 { + return nil + } + return t.Flush() +} + +func (t *chunkedDocumentCoder) Flush() error { + if t.buf.Len() > 0 { + var err error + t.compressed, err = ZSTDCompress(t.compressed[:cap(t.compressed)], t.buf.Bytes(), 3) + if err != nil { + return err + } + n, err := t.w.Write(t.compressed) + if err != nil { + return err + } + t.bytes += uint64(n) + t.buf.Reset() + } + t.offsets = append(t.offsets, t.bytes) + return nil +} + +func (t *chunkedDocumentCoder) WriteMetaData() error { + var err error + var wn, n int + // write chunk offsets + for _, offset := range t.offsets { + n = binary.PutUvarint(t.metaBuf, offset) + if _, err = 
t.w.Write(t.metaBuf[:n]); err != nil { + return err + } + wn += n + } + // write chunk offset length + err = binary.Write(t.w, binary.BigEndian, uint32(wn)) + if err != nil { + return err + } + // write chunk num + err = binary.Write(t.w, binary.BigEndian, uint32(t.Len())) + if err != nil { + return err + } + return nil +} + +func (t *chunkedDocumentCoder) Reset() { + t.compressed = t.compressed[:0] + t.offsets = t.offsets[:0] + t.n = 0 + t.bytes = 0 + t.buf.Reset() +} + +func (t *chunkedDocumentCoder) Offsets() []uint64 { + return t.offsets +} + +// Len returns chunk nums +func (t *chunkedDocumentCoder) Len() int { + return len(t.offsets) +} + +// BufferSize returns buffer len +func (t *chunkedDocumentCoder) BufferSize() int { + return t.buf.Len() +} diff --git a/documentcoder_test.go b/documentcoder_test.go new file mode 100644 index 0000000..f1c4f73 --- /dev/null +++ b/documentcoder_test.go @@ -0,0 +1 @@ +package ice diff --git a/load.go b/load.go index c20a8cb..7d5d53d 100644 --- a/load.go +++ b/load.go @@ -55,7 +55,7 @@ func load(data *segment.Data) (*Segment, error) { return nil, err } - err = rv.loadStoredFieldTrunk() + err = rv.loadStoredFieldChunk() if err != nil { return nil, err } @@ -134,24 +134,38 @@ func (s *Segment) loadFields() error { return nil } -func (s *Segment) loadStoredFieldTrunk() error { - // read trunk num - trunkOffsetPos := int(s.footer.storedIndexOffset - 4) // uint32 - trunkData, err := s.data.Read(trunkOffsetPos, trunkOffsetPos+4) +// loadStoredFieldChunk load storedField chunk offsets +func (s *Segment) loadStoredFieldChunk() error { + // read chunk num + chunkOffsetPos := int(s.footer.storedIndexOffset - 4) // uint32 + chunkData, err := s.data.Read(chunkOffsetPos, chunkOffsetPos+4) if err != nil { return err } - trunkNum := binary.BigEndian.Uint32(trunkData) - // read trunk offsets - trunkOffsetPos -= 8 * int(trunkNum) - trunkData, err = s.data.Read(trunkOffsetPos, trunkOffsetPos+int(8*trunkNum)) + chunkNum := binary.BigEndian.Uint32(chunkData) + chunkOffsetPos -= 4 + // read chunk offsets length + chunkData, err = s.data.Read(chunkOffsetPos, chunkOffsetPos+4) if err != nil { return err } - s.storedFieldTrunkOffset = make(map[int]uint64, trunkNum) - for i := 0; i < int(trunkNum); i++ { - offset := binary.BigEndian.Uint64(trunkData[i*8 : i*8+8]) - s.storedFieldTrunkOffset[i] = offset + chunkOffsetsLen := binary.BigEndian.Uint32(chunkData) + // read chunk offsets + chunkOffsetPos -= int(chunkOffsetsLen) + chunkData, err = s.data.Read(chunkOffsetPos, chunkOffsetPos+int(chunkOffsetsLen)) + if err != nil { + return err + } + var offset, read int + var offsetata []byte + s.storedFieldChunkOffset = make(map[int]uint64, chunkNum) + for i := 0; i < int(chunkNum); i++ { + offsetata, err = s.data.Read(chunkOffsetPos+offset, chunkOffsetPos+offset+binary.MaxVarintLen64) + if err != nil { + return err + } + s.storedFieldChunkOffset[i], read = binary.Uvarint(offsetata) + offset += read } return nil diff --git a/merge.go b/merge.go index 727b280..ab6b0b4 100644 --- a/merge.go +++ b/merge.go @@ -639,8 +639,8 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, vdc := visitDocumentCtxPool.Get().(*visitDocumentCtx) defer visitDocumentCtxPool.Put(vdc) - // zinc trunk - trunkWriter := NewZincTrunker(w) + // document chunk coder + docChunkCoder := NewChunkedDocumentCoder(uint64(defaultDocumentChunkSize), w) // for each segment for segI, seg := range segments { @@ -657,7 +657,7 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, // 
segments and there are no deletions, via byte-copying // of stored docs bytes directly to the writer if fieldsSame && (dropsI == nil || dropsI.GetCardinality() == 0) { - err := seg.copyStoredDocs(newDocNum, docNumOffsets, trunkWriter) + err := seg.copyStoredDocs(newDocNum, docNumOffsets, docChunkCoder) if err != nil { return 0, nil, err } @@ -673,7 +673,7 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, var err2 error newDocNum, err2 = mergeStoredAndRemapSegment(seg, dropsI, segNewDocNums, newDocNum, &metaBuf, data, - fieldsInv, vals, vdc, fieldsMap, metaEncode, compressed, docNumOffsets, trunkWriter) + fieldsInv, vals, vdc, fieldsMap, metaEncode, compressed, docNumOffsets, docChunkCoder) if err2 != nil { return 0, nil, err2 } @@ -681,18 +681,11 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, newDocNums = append(newDocNums, segNewDocNums) } - // zinc trunk - trunkWriter.Flush() - // write chunk offsets - for _, offset := range trunkWriter.Offsets() { - err = binary.Write(w, binary.BigEndian, offset) - if err != nil { - return 0, nil, err - } + // document chunk coder + if err := docChunkCoder.Flush(); err != nil { + return 0, nil, err } - // write chunk num - err = binary.Write(w, binary.BigEndian, uint32(trunkWriter.Len())) - if err != nil { + if err := docChunkCoder.WriteMetaData(); err != nil { return 0, nil, err } @@ -713,7 +706,7 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, func mergeStoredAndRemapSegment(seg *Segment, dropsI *roaring.Bitmap, segNewDocNums []uint64, newDocNum uint64, metaBuf *bytes.Buffer, data []byte, fieldsInv []string, vals [][][]byte, vdc *visitDocumentCtx, fieldsMap map[string]uint16, metaEncode func(val uint64) (int, error), compressed []byte, docNumOffsets []uint64, - trunkWriter *zincTrunker) (uint64, error) { + docChunkCoder *chunkedDocumentCoder) (uint64, error) { // for each doc num for docNum := uint64(0); docNum < seg.footer.numDocs; docNum++ { // TODO: roaring's API limits docNums to 32-bits? @@ -756,30 +749,28 @@ func mergeStoredAndRemapSegment(seg *Segment, dropsI *roaring.Bitmap, segNewDocN metaBytes := metaBuf.Bytes() // record where we're about to start writing - docNumOffsets[newDocNum] = uint64(trunkWriter.BufferSize()) + docNumOffsets[newDocNum] = uint64(docChunkCoder.BufferSize()) // write out the meta len and compressed data len - err = writeUvarints(trunkWriter, + err = writeUvarints(docChunkCoder, uint64(len(metaBytes)), uint64(len(compressed))) if err != nil { return 0, err } // now write the meta - // _, err = w.Write(metaBytes) - _, err = trunkWriter.Write(metaBytes) + _, err = docChunkCoder.Write(metaBytes) if err != nil { return 0, err } // now write the compressed data - // _, err = w.Write(compressed) - _, err = trunkWriter.Write(data) + _, err = docChunkCoder.Write(data) if err != nil { return 0, err } - // trunk line - if err := trunkWriter.NewLine(); err != nil { + // document chunk line + if err := docChunkCoder.NewLine(); err != nil { return 0, err } @@ -791,20 +782,20 @@ func mergeStoredAndRemapSegment(seg *Segment, dropsI *roaring.Bitmap, segNewDocN // copyStoredDocs writes out a segment's stored doc info, optimized by // using a single Write() call for the entire set of bytes. The // newDocNumOffsets is filled with the new offsets for each doc. 
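For orientation while reading the stored-field hunks above and below: across these patches, per-document records are appended to an in-memory buffer, the buffer is ZSTD-compressed and flushed once every defaultDocumentChunkSize (128) documents, and the running end offset of each compressed chunk is recorded so a reader can locate a chunk by docNum / chunkSize; the offsets, their byte length, and the chunk count are appended after the data. The snippet below is a minimal standalone sketch of that write path, not the ice code itself; the names and the exact record layout are illustrative, and the only assumption is the github.com/klauspost/compress/zstd package already imported in zstd.go.

```go
// Simplified sketch of the chunked stored-field write path (NOT the ice
// implementation): buffer records, ZSTD-compress once per chunk of N docs,
// and record the cumulative end offset of every compressed chunk.
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"

	"github.com/klauspost/compress/zstd"
)

func main() {
	const chunkSize = 2 // docs per chunk; the patches default to 128
	docs := [][]byte{[]byte("doc-0"), []byte("doc-1"), []byte("doc-2")}

	enc, err := zstd.NewWriter(nil)
	if err != nil {
		panic(err)
	}

	var out bytes.Buffer      // stored-field data section being built
	var chunkBuf bytes.Buffer // uncompressed records of the current chunk
	offsets := []uint64{0}    // cumulative end offset of each compressed chunk

	flush := func() {
		if chunkBuf.Len() > 0 {
			out.Write(enc.EncodeAll(chunkBuf.Bytes(), nil))
			chunkBuf.Reset()
		}
		offsets = append(offsets, uint64(out.Len()))
	}

	var lenBuf [binary.MaxVarintLen64]byte
	for i, d := range docs {
		// one record per doc: uvarint(dataLen) + data
		// (ice also writes a meta length and the field metadata here)
		n := binary.PutUvarint(lenBuf[:], uint64(len(d)))
		chunkBuf.Write(lenBuf[:n])
		chunkBuf.Write(d)
		if (i+1)%chunkSize == 0 {
			flush()
		}
	}
	flush() // compress the final partial chunk

	// after the data, the real format appends the offsets (as uvarints),
	// the byte length of that offset block, and the chunk count
	fmt.Println("compressed bytes:", out.Len(), "chunk end offsets:", offsets)
}
```

The design trade-off is that fetching one document now requires decompressing its whole chunk, in exchange for a better compression ratio than compressing each stored document individually.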
-func (s *Segment) copyStoredDocs(newDocNum uint64, newDocNumOffsets []uint64, myTrunk *zincTrunker) error { +func (s *Segment) copyStoredDocs(newDocNum uint64, newDocNumOffsets []uint64, docChunkCoder *chunkedDocumentCoder) error { if s.footer.numDocs <= 0 { return nil } - // visit documents and rewrite to trunk + // visit documents and rewrite to chunk uncompressed := make([]byte, 0) - for i := 0; i < len(s.storedFieldTrunkOffset)-1; i++ { - trunkOffstart := s.storedFieldTrunkOffset[i] - trunkOffend := s.storedFieldTrunkOffset[i+1] - if trunkOffstart == trunkOffend { + for i := 0; i < len(s.storedFieldChunkOffset)-1; i++ { + chunkOffstart := s.storedFieldChunkOffset[i] + chunkOffend := s.storedFieldChunkOffset[i+1] + if chunkOffstart == chunkOffend { continue } - compressed, err := s.data.Read(int(trunkOffstart), int(trunkOffend)) + compressed, err := s.data.Read(int(chunkOffstart), int(chunkOffend)) if err != nil { return err } @@ -823,50 +814,16 @@ func (s *Segment) copyStoredDocs(newDocNum uint64, newDocNumOffsets []uint64, my dataLenData := uncompressed[storedOffset+n : storedOffset+n+int(binary.MaxVarintLen64)] dataLen, read := binary.Uvarint(dataLenData) n += read - newDocNumOffsets[newDocNum] = uint64(myTrunk.BufferSize()) - myTrunk.Write(uncompressed[storedOffset : storedOffset+n+int(metaLen)+int(dataLen)]) - myTrunk.NewLine() + newDocNumOffsets[newDocNum] = uint64(docChunkCoder.BufferSize()) + docChunkCoder.Write(uncompressed[storedOffset : storedOffset+n+int(metaLen)+int(dataLen)]) + if err := docChunkCoder.NewLine(); err != nil { + return err + } storedOffset += n + int(metaLen+dataLen) newDocNum++ } } - // indexOffset0, storedOffset0, err := s.getDocStoredOffsetsOnly(0) // the segment's first doc - // if err != nil { - // return err - // } - - // indexOffsetN, storedOffsetN, readN, metaLenN, dataLenN, err := - // s.getDocStoredOffsets(s.footer.numDocs - 1) // the segment's last doc - // if err != nil { - // return err - // } - - // storedOffset0New := uint64(w.Count()) - - // storedBytesData, err := s.data.Read(int(storedOffset0), int(storedOffsetN+readN+metaLenN+dataLenN)) - // if err != nil { - // return err - // } - // storedBytes := storedBytesData - // _, err = w.Write(storedBytes) - // if err != nil { - // return err - // } - - // // remap the storedOffset's for the docs into new offsets relative - // // to storedOffset0New, filling the given docNumOffsetsOut array - // for indexOffset := indexOffset0; indexOffset <= indexOffsetN; indexOffset += fileAddrWidth { - // storedOffsetData, err := s.data.Read(int(indexOffset), int(indexOffset+fileAddrWidth)) - // if err != nil { - // return err - // } - // storedOffset := binary.BigEndian.Uint64(storedOffsetData) - // storedOffsetNew := storedOffset - storedOffset0 + storedOffset0New - // newDocNumOffsets[newDocNum] = storedOffsetNew // PANIC - // newDocNum++ - // } - return nil } diff --git a/new.go b/new.go index 1811803..d0036f7 100644 --- a/new.go +++ b/new.go @@ -568,8 +568,8 @@ func (s *interim) writeStoredFields() ( // keyed by fieldID, for the current doc in the loop docStoredFields := map[uint16]interimStoredField{} - // zinc trunk - trunkWriter := NewZincTrunker(s.w) + // document chunk coder + docChunkCoder := NewChunkedDocumentCoder(uint64(defaultDocumentChunkSize), s.w) for docNum, result := range s.results { for fieldID := range docStoredFields { // reset for next doc @@ -609,46 +609,36 @@ func (s *interim) writeStoredFields() ( } metaBytes := s.metaBuf.Bytes() - docStoredOffsets[docNum] = 
uint64(trunkWriter.BufferSize()) + docStoredOffsets[docNum] = uint64(docChunkCoder.BufferSize()) - err = writeUvarints(trunkWriter, + err = writeUvarints(docChunkCoder, uint64(len(metaBytes)), uint64(len(data))) if err != nil { return 0, err } - // _, err = s.w.Write(metaBytes) - _, err = trunkWriter.Write(metaBytes) + _, err = docChunkCoder.Write(metaBytes) if err != nil { return 0, err } - // _, err = s.w.Write(compressed) - _, err = trunkWriter.Write(data) + _, err = docChunkCoder.Write(data) if err != nil { return 0, err } - // trunk line - if err := trunkWriter.NewLine(); err != nil { + // document chunk line + if err := docChunkCoder.NewLine(); err != nil { return 0, err } - } - // zinc trunk - trunkWriter.Flush() - // write chunk offsets - for _, offset := range trunkWriter.Offsets() { - err = binary.Write(s.w, binary.BigEndian, offset) - if err != nil { - return 0, err - } + // document chunk coder + if err := docChunkCoder.Flush(); err != nil { + return 0, err } - // write chunk num - err = binary.Write(s.w, binary.BigEndian, uint32(trunkWriter.Len())) - if err != nil { + if err := docChunkCoder.WriteMetaData(); err != nil { return 0, err } diff --git a/read.go b/read.go index 62e5239..0d1916c 100644 --- a/read.go +++ b/read.go @@ -24,63 +24,45 @@ func (s *Segment) getDocStoredMetaAndCompressed(docNum uint64) (meta, data []byt return nil, nil, err } - // zinc trunk - trunI := docNum / uint64(zincTrunkerSize) - trunkOffstart := s.storedFieldTrunkOffset[int(trunI)] - trunkOffend := s.storedFieldTrunkOffset[int(trunI)+1] - compressed, err := s.data.Read(int(trunkOffstart), int(trunkOffend)) - if err != nil { - return nil, nil, err - } - s.storedFieldTrunkUncompressed, err = ZSTDDecompress(s.storedFieldTrunkUncompressed[:cap(s.storedFieldTrunkUncompressed)], compressed) - if err != nil { - return nil, nil, err - } - - meta = s.storedFieldTrunkUncompressed[int(storedOffset+n):int(storedOffset+n+metaLen)] - data = s.storedFieldTrunkUncompressed[int(storedOffset+n+metaLen):int(storedOffset+n+metaLen+dataLen)] - + meta = s.storedFieldChunkUncompressed[int(storedOffset+n):int(storedOffset+n+metaLen)] + data = s.storedFieldChunkUncompressed[int(storedOffset+n+metaLen):int(storedOffset+n+metaLen+dataLen)] return meta, data, nil } -func (s *Segment) getDocStoredOffsets(docNum uint64) ( - indexOffset, storedOffset, n, metaLen, dataLen uint64, err error) { - indexOffset = s.footer.storedIndexOffset + (fileAddrWidth * docNum) - - storedOffsetData, err := s.data.Read(int(indexOffset), int(indexOffset+fileAddrWidth)) +func (s *Segment) getDocStoredOffsets(docNum uint64) (indexOffset, storedOffset, n, metaLen, dataLen uint64, err error) { + indexOffset, storedOffset, err = s.getDocStoredOffsetsOnly(docNum) if err != nil { return 0, 0, 0, 0, 0, err } - storedOffset = binary.BigEndian.Uint64(storedOffsetData) - // zinc trunk - trunI := docNum / uint64(zincTrunkerSize) - trunkOffsetStart := s.storedFieldTrunkOffset[int(trunI)] - trunkOffsetEnd := s.storedFieldTrunkOffset[int(trunI)+1] - compressed, err := s.data.Read(int(trunkOffsetStart), int(trunkOffsetEnd)) + // document chunk coder + trunI := docNum / uint64(defaultDocumentChunkSize) + chunkOffsetStart := s.storedFieldChunkOffset[int(trunI)] + chunkOffsetEnd := s.storedFieldChunkOffset[int(trunI)+1] + compressed, err := s.data.Read(int(chunkOffsetStart), int(chunkOffsetEnd)) if err != nil { return 0, 0, 0, 0, 0, err } - s.storedFieldTrunkUncompressed, err = ZSTDDecompress(s.storedFieldTrunkUncompressed[:cap(s.storedFieldTrunkUncompressed)], 
compressed) + s.storedFieldChunkUncompressed = s.storedFieldChunkUncompressed[:0] + s.storedFieldChunkUncompressed, err = ZSTDDecompress(s.storedFieldChunkUncompressed[:cap(s.storedFieldChunkUncompressed)], compressed) if err != nil { return 0, 0, 0, 0, 0, err } - metaLenData := s.storedFieldTrunkUncompressed[int(storedOffset):int(storedOffset+binary.MaxVarintLen64)] + metaLenData := s.storedFieldChunkUncompressed[int(storedOffset):int(storedOffset+binary.MaxVarintLen64)] var read int metaLen, read = binary.Uvarint(metaLenData) n += uint64(read) - dataLenData := s.storedFieldTrunkUncompressed[int(storedOffset+n):int(storedOffset+n+binary.MaxVarintLen64)] + dataLenData := s.storedFieldChunkUncompressed[int(storedOffset+n):int(storedOffset+n+binary.MaxVarintLen64)] dataLen, read = binary.Uvarint(dataLenData) n += uint64(read) return indexOffset, storedOffset, n, metaLen, dataLen, nil } -func (s *Segment) getDocStoredOffsetsOnly(docNum int) (indexOffset, storedOffset uint64, err error) { +func (s *Segment) getDocStoredOffsetsOnly(docNum uint64) (indexOffset, storedOffset uint64, err error) { indexOffset = s.footer.storedIndexOffset + (fileAddrWidth * uint64(docNum)) - storedOffsetData, err := s.data.Read(int(indexOffset), int(indexOffset+fileAddrWidth)) if err != nil { return 0, 0, err diff --git a/segment.go b/segment.go index b6d4af5..adc6bc4 100644 --- a/segment.go +++ b/segment.go @@ -40,8 +40,8 @@ type Segment struct { fieldDocs map[uint16]uint64 // fieldID -> # docs with value in field fieldFreqs map[uint16]uint64 // fieldID -> # total tokens in field - storedFieldTrunkOffset map[int]uint64 // stored field trunk offset - storedFieldTrunkUncompressed []byte // for cache + storedFieldChunkOffset map[int]uint64 // stored field chunk offset + storedFieldChunkUncompressed []byte // for uncompress cache dictLocs []uint64 fieldDvReaders map[uint16]*docValueReader // naive chunk cache per field diff --git a/zinc_chunker.go b/zinc_chunker.go deleted file mode 100644 index 5885347..0000000 --- a/zinc_chunker.go +++ /dev/null @@ -1,77 +0,0 @@ -package ice - -import ( - "bytes" - "io" -) - -const zincTrunkerSize = 128 - -type zincTrunker struct { - w io.Writer - buf *bytes.Buffer - n int - bytes int - compressed []byte - offsets []uint64 -} - -func NewZincTrunker(w io.Writer) *zincTrunker { - t := &zincTrunker{ - w: w, - } - t.buf = bytes.NewBuffer(nil) - t.offsets = append(t.offsets, 0) - return t -} -func (t *zincTrunker) Write(data []byte) (int, error) { - return t.buf.Write(data) -} - -func (t *zincTrunker) NewLine() error { - t.n++ - if t.n%zincTrunkerSize != 0 { - return nil - } - return t.Flush() -} - -func (t *zincTrunker) Flush() error { - if t.buf.Len() > 0 { - var err error - t.compressed, err = ZSTDCompress(t.compressed[:cap(t.compressed)], t.buf.Bytes(), 3) - if err != nil { - return err - } - n, err := t.w.Write(t.compressed) - if err != nil { - return err - } - t.bytes += n - t.buf.Reset() - } - t.offsets = append(t.offsets, uint64(t.bytes)) - return nil -} - -func (t *zincTrunker) Reset() { - t.compressed = t.compressed[:0] - t.offsets = t.offsets[:0] - t.n = 0 - t.bytes = 0 - t.buf.Reset() -} - -func (t *zincTrunker) Offsets() []uint64 { - return t.offsets -} - -// Len returns trunk nums -func (t *zincTrunker) Len() int { - return len(t.offsets) -} - -// BufferSize returns buffer len -func (t *zincTrunker) BufferSize() int { - return t.buf.Len() -} diff --git a/zstd.go b/zstd.go index 1381085..8e08636 100644 --- a/zstd.go +++ b/zstd.go @@ -35,7 +35,9 @@ func ZSTDDecompress(dst, src 
[]byte) ([]byte, error) { decOnce.Do(func() { var err error decoder, err = zstd.NewReader(nil) - Check(err) + if err != nil { + log.Fatalf("%+v", err) + } }) return decoder.DecodeAll(src, dst[:0]) } @@ -46,7 +48,9 @@ func ZSTDCompress(dst, src []byte, compressionLevel int) ([]byte, error) { var err error level := zstd.EncoderLevelFromZstd(compressionLevel) encoder, err = zstd.NewWriter(nil, zstd.WithEncoderLevel(level)) - Check(err) + if err != nil { + log.Fatalf("%+v", err) + } }) return encoder.EncodeAll(src, dst[:0]), nil } @@ -63,10 +67,3 @@ func ZSTDCompressBound(srcSize int) int { } return srcSize + (srcSize >> 8) + margin } - -// Check logs fatal if err != nil. -func Check(err error) { - if err != nil { - log.Fatalf("%+v", err) - } -} From 79196b64e1223ed65200050d40053dace40da1c1 Mon Sep 17 00:00:00 2001 From: yanghengfei Date: Tue, 3 May 2022 19:45:16 +0800 Subject: [PATCH 13/23] doc: add author --- AUTHORS | 1 + 1 file changed, 1 insertion(+) diff --git a/AUTHORS b/AUTHORS index 4ee2adc..57b6b27 100644 --- a/AUTHORS +++ b/AUTHORS @@ -8,3 +8,4 @@ # Please keep the list sorted. Marty Schoch +Hengfei Yang From 9bed194792c942e1850cf01e50ec1fe3f11388cc Mon Sep 17 00:00:00 2001 From: yanghengfei Date: Tue, 3 May 2022 21:09:33 +0800 Subject: [PATCH 14/23] style: change implement --- documentcoder.go | 54 +++++++++++++++++++++++++++++++++--------------- merge.go | 35 +++++++------------------------ new.go | 27 +++--------------------- 3 files changed, 47 insertions(+), 69 deletions(-) diff --git a/documentcoder.go b/documentcoder.go index 25e1ec7..8223bb2 100644 --- a/documentcoder.go +++ b/documentcoder.go @@ -30,19 +30,44 @@ func NewChunkedDocumentCoder(chunkSize uint64, w io.Writer) *chunkedDocumentCode return t } -func (t *chunkedDocumentCoder) Write(data []byte) (int, error) { +func (t *chunkedDocumentCoder) Add(docNum uint64, meta, data []byte) (int, error) { + var wn, n int + var err error + n = binary.PutUvarint(t.metaBuf, uint64(len(meta))) + if n, err = t.writeToBuf(t.metaBuf[:n]); err != nil { + return 0, err + } + wn += n + n = binary.PutUvarint(t.metaBuf, uint64(len(data))) + if n, err = t.writeToBuf(t.metaBuf[:n]); err != nil { + return 0, err + } + wn += n + if n, err = t.writeToBuf(meta); err != nil { + return 0, err + } + wn += n + if n, err = t.writeToBuf(data); err != nil { + return 0, err + } + wn += n + + return wn, t.newLine() +} + +func (t *chunkedDocumentCoder) writeToBuf(data []byte) (int, error) { return t.buf.Write(data) } -func (t *chunkedDocumentCoder) NewLine() error { +func (t *chunkedDocumentCoder) newLine() error { t.n++ if t.n%t.chunkSize != 0 { return nil } - return t.Flush() + return t.flush() } -func (t *chunkedDocumentCoder) Flush() error { +func (t *chunkedDocumentCoder) flush() error { if t.buf.Len() > 0 { var err error t.compressed, err = ZSTDCompress(t.compressed[:cap(t.compressed)], t.buf.Bytes(), 3) @@ -60,9 +85,13 @@ func (t *chunkedDocumentCoder) Flush() error { return nil } -func (t *chunkedDocumentCoder) WriteMetaData() error { +func (t *chunkedDocumentCoder) Write() error { var err error var wn, n int + // flush + if err = t.flush(); err != nil { + return err + } // write chunk offsets for _, offset := range t.offsets { n = binary.PutUvarint(t.metaBuf, offset) @@ -77,7 +106,7 @@ func (t *chunkedDocumentCoder) WriteMetaData() error { return err } // write chunk num - err = binary.Write(t.w, binary.BigEndian, uint32(t.Len())) + err = binary.Write(t.w, binary.BigEndian, uint32(len(t.offsets))) if err != nil { return err } @@ -92,16 +121,7 @@ 
func (t *chunkedDocumentCoder) Reset() { t.buf.Reset() } -func (t *chunkedDocumentCoder) Offsets() []uint64 { - return t.offsets -} - -// Len returns chunk nums -func (t *chunkedDocumentCoder) Len() int { - return len(t.offsets) -} - // BufferSize returns buffer len -func (t *chunkedDocumentCoder) BufferSize() int { - return t.buf.Len() +func (t *chunkedDocumentCoder) BufferSize() uint64 { + return uint64(t.buf.Len()) } diff --git a/merge.go b/merge.go index ab6b0b4..ef70754 100644 --- a/merge.go +++ b/merge.go @@ -682,10 +682,7 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, } // document chunk coder - if err := docChunkCoder.Flush(); err != nil { - return 0, nil, err - } - if err := docChunkCoder.WriteMetaData(); err != nil { + if err := docChunkCoder.Write(); err != nil { return 0, nil, err } @@ -749,28 +746,9 @@ func mergeStoredAndRemapSegment(seg *Segment, dropsI *roaring.Bitmap, segNewDocN metaBytes := metaBuf.Bytes() // record where we're about to start writing - docNumOffsets[newDocNum] = uint64(docChunkCoder.BufferSize()) - - // write out the meta len and compressed data len - err = writeUvarints(docChunkCoder, - uint64(len(metaBytes)), - uint64(len(compressed))) - if err != nil { - return 0, err - } - // now write the meta - _, err = docChunkCoder.Write(metaBytes) - if err != nil { - return 0, err - } - // now write the compressed data - _, err = docChunkCoder.Write(data) - if err != nil { - return 0, err - } - + docNumOffsets[newDocNum] = docChunkCoder.BufferSize() // document chunk line - if err := docChunkCoder.NewLine(); err != nil { + if _, err := docChunkCoder.Add(newDocNum, metaBytes, data); err != nil { return 0, err } @@ -814,9 +792,10 @@ func (s *Segment) copyStoredDocs(newDocNum uint64, newDocNumOffsets []uint64, do dataLenData := uncompressed[storedOffset+n : storedOffset+n+int(binary.MaxVarintLen64)] dataLen, read := binary.Uvarint(dataLenData) n += read - newDocNumOffsets[newDocNum] = uint64(docChunkCoder.BufferSize()) - docChunkCoder.Write(uncompressed[storedOffset : storedOffset+n+int(metaLen)+int(dataLen)]) - if err := docChunkCoder.NewLine(); err != nil { + newDocNumOffsets[newDocNum] = docChunkCoder.BufferSize() + metaBytes := uncompressed[storedOffset+n : storedOffset+n+int(metaLen)] + data := uncompressed[storedOffset+n+int(metaLen) : storedOffset+n+int(metaLen)+int(dataLen)] + if _, err := docChunkCoder.Add(newDocNum, metaBytes, data); err != nil { return err } storedOffset += n + int(metaLen+dataLen) diff --git a/new.go b/new.go index d0036f7..7b730ca 100644 --- a/new.go +++ b/new.go @@ -609,36 +609,15 @@ func (s *interim) writeStoredFields() ( } metaBytes := s.metaBuf.Bytes() - docStoredOffsets[docNum] = uint64(docChunkCoder.BufferSize()) - - err = writeUvarints(docChunkCoder, - uint64(len(metaBytes)), - uint64(len(data))) - if err != nil { - return 0, err - } - - _, err = docChunkCoder.Write(metaBytes) - if err != nil { - return 0, err - } - - _, err = docChunkCoder.Write(data) + docStoredOffsets[docNum] = docChunkCoder.BufferSize() + _, err = docChunkCoder.Add(uint64(docNum), metaBytes, data) if err != nil { return 0, err } - - // document chunk line - if err := docChunkCoder.NewLine(); err != nil { - return 0, err - } } // document chunk coder - if err := docChunkCoder.Flush(); err != nil { - return 0, err - } - if err := docChunkCoder.WriteMetaData(); err != nil { + if err := docChunkCoder.Write(); err != nil { return 0, err } From 4f5e6c0e7a023f48d9bc6bf8e092f9e8000eaa79 Mon Sep 17 00:00:00 2001 From: yanghengfei Date: Tue, 3 
May 2022 21:20:44 +0800 Subject: [PATCH 15/23] fix: tests --- contentcoder_test.go | 22 +++++++++++++--------- intcoder_test.go | 15 +++++++++++---- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/contentcoder_test.go b/contentcoder_test.go index 1b7fd31..c1db8c9 100644 --- a/contentcoder_test.go +++ b/contentcoder_test.go @@ -33,10 +33,12 @@ func TestChunkedContentCoder(t *testing.T) { docNums: []uint64{0}, vals: [][]byte{[]byte("bluge")}, // 1 chunk, chunk-0 length 11(b), value - expected: []byte{0x1, 0x0, 0x5, 0x5, 0x10, 'b', 'l', 'u', 'g', 'e', - 0xa, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1}, + expected: []byte{ + 0x1, 0x0, 0x5, 0x28, 0xb5, 0x2f, 0xfd, 0x4, 0x0, 0x29, 0x0, 0x0, + 'b', 'l', 'u', 'g', 'e', + 0x7e, 0xde, 0xed, 0x4a, 0x15, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, + }, }, { maxDocNum: 1, @@ -47,11 +49,13 @@ func TestChunkedContentCoder(t *testing.T) { []byte("scorch"), }, - expected: []byte{0x1, 0x0, 0x6, 0x6, 0x14, 0x75, 0x70, 0x73, 0x69, 0x64, - 0x65, 0x1, 0x1, 0x6, 0x6, 0x14, 0x73, 0x63, 0x6f, 0x72, 0x63, 0x68, - 0xb, 0x16, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2}, + expected: []byte{ + 0x1, 0x0, 0x6, 0x28, 0xb5, 0x2f, 0xfd, 0x4, 0x0, 0x31, 0x0, 0x0, + 0x75, 0x70, 0x73, 0x69, 0x64, 0x65, 0x35, 0x89, 0x5a, 0xd, + 0x1, 0x1, 0x6, 0x28, 0xb5, 0x2f, 0xfd, 0x4, 0x0, 0x31, 0x0, 0x0, + 0x73, 0x63, 0x6f, 0x72, 0x63, 0x68, 0xc4, 0x46, 0x89, 0x39, 0x16, 0x2c, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, + }, }, } diff --git a/intcoder_test.go b/intcoder_test.go index 044194c..7f91e1b 100644 --- a/intcoder_test.go +++ b/intcoder_test.go @@ -35,8 +35,11 @@ func TestChunkIntCoder(t *testing.T) { vals: [][]uint64{ {3}, }, - // 1 chunk, chunk-0 length 1, value 3 - expected: []byte{0x1, 0x1, 0x3}, + // 1 chunk, chunk-0 length 1, value 3, but compressed + expected: []byte{ + 0x1, 0xe, 0x28, 0xb5, 0x2f, 0xfd, 0x4, 0x0, 0x9, + 0x0, 0x0, 0x3, 0xb6, 0x4b, 0x1f, 0xbc, + }, }, { maxDocNum: 1, @@ -46,8 +49,12 @@ func TestChunkIntCoder(t *testing.T) { {3}, {7}, }, - // 2 chunks, chunk-0 offset 1, chunk-1 offset 2, value 3, value 7 - expected: []byte{0x2, 0x1, 0x2, 0x3, 0x7}, + // 2 chunks, chunk-0 offset 1, chunk-1 offset 2, value 3, value 7, but compressed + expected: []byte{ + 0x2, 0xe, 0x1c, 0x28, 0xb5, 0x2f, 0xfd, 0x4, 0x0, 0x9, + 0x0, 0x0, 0x3, 0xb6, 0x4b, 0x1f, 0xbc, 0x28, 0xb5, 0x2f, 0xfd, 0x4, 0x0, 0x9, + 0x0, 0x0, 0x7, 0xb7, 0xbb, 0x58, 0xe8, + }, }, } From 929ccaff223876a8796bb8030658d30f91b33c6a Mon Sep 17 00:00:00 2001 From: yanghengfei Date: Tue, 3 May 2022 21:49:33 +0800 Subject: [PATCH 16/23] test: add test for document coder --- contentcoder_test.go | 6 +- documentcoder.go | 17 ++++-- documentcoder_test.go | 126 ++++++++++++++++++++++++++++++++++++++++++ merge.go | 5 +- new.go | 5 +- 5 files changed, 149 insertions(+), 10 deletions(-) diff --git a/contentcoder_test.go b/contentcoder_test.go index c1db8c9..8a4ac79 100644 --- a/contentcoder_test.go +++ b/contentcoder_test.go @@ -65,7 +65,7 @@ func TestChunkedContentCoder(t *testing.T) { for i, docNum := range test.docNums { err := cic.Add(docNum, test.vals[i]) if err != nil { - t.Fatalf("error adding to intcoder: %v", err) + t.Fatalf("error adding to contentcoder: %v", err) } } _ = cic.Close() @@ -102,11 +102,11 @@ func TestChunkedContentCoders(t *testing.T) { for i, docNum := range docNums { err := cic1.Add(docNum, vals[i]) if err != nil { - 
t.Fatalf("error adding to intcoder: %v", err) + t.Fatalf("error adding to contentcoder: %v", err) } err = cic2.Add(docNum, vals[i]) if err != nil { - t.Fatalf("error adding to intcoder: %v", err) + t.Fatalf("error adding to contentcoder: %v", err) } } _ = cic1.Close() diff --git a/documentcoder.go b/documentcoder.go index 8223bb2..48b7956 100644 --- a/documentcoder.go +++ b/documentcoder.go @@ -19,7 +19,7 @@ type chunkedDocumentCoder struct { offsets []uint64 } -func NewChunkedDocumentCoder(chunkSize uint64, w io.Writer) *chunkedDocumentCoder { +func newChunkedDocumentCoder(chunkSize uint64, w io.Writer) *chunkedDocumentCoder { t := &chunkedDocumentCoder{ chunkSize: chunkSize, w: w, @@ -88,10 +88,6 @@ func (t *chunkedDocumentCoder) flush() error { func (t *chunkedDocumentCoder) Write() error { var err error var wn, n int - // flush - if err = t.flush(); err != nil { - return err - } // write chunk offsets for _, offset := range t.offsets { n = binary.PutUvarint(t.metaBuf, offset) @@ -113,6 +109,12 @@ func (t *chunkedDocumentCoder) Write() error { return nil } +// Close indicates you are done calling Add() this allows +// the final chunk to be encoded. +func (c *chunkedDocumentCoder) Close() error { + return c.flush() +} + func (t *chunkedDocumentCoder) Reset() { t.compressed = t.compressed[:0] t.offsets = t.offsets[:0] @@ -125,3 +127,8 @@ func (t *chunkedDocumentCoder) Reset() { func (t *chunkedDocumentCoder) BufferSize() uint64 { return uint64(t.buf.Len()) } + +// Len returns trunks num +func (t *chunkedDocumentCoder) Len() int { + return len(t.offsets) +} diff --git a/documentcoder_test.go b/documentcoder_test.go index f1c4f73..e8630ca 100644 --- a/documentcoder_test.go +++ b/documentcoder_test.go @@ -1 +1,127 @@ package ice + +import ( + "bytes" + "testing" +) + +func TestChunkedDocumentCoder(t *testing.T) { + tests := []struct { + chunkSize uint64 + docNums []uint64 + metas [][]byte + datas [][]byte + expected []byte + expectedTrunkNum int + }{ + { + chunkSize: 1, + docNums: []uint64{0}, + metas: [][]byte{{0}}, + datas: [][]byte{[]byte("bluge")}, + expected: []byte{ + 0x28, 0xb5, 0x2f, 0xfd, 0x4, 0x0, 0x41, + 0x0, 0x0, 0x1, 0x5, 0x0, 0x62, 0x6c, 0x75, 0x67, 0x65, 0x2b, 0x30, 0x97, 0x33, 0x0, 0x15, 0x15, + 0x0, 0x0, 0x0, 0x3, 0x0, 0x0, 0x0, 0x3, + }, + expectedTrunkNum: 3, // left, trunk, right + }, + { + chunkSize: 1, + docNums: []uint64{0, 1}, + metas: [][]byte{{0}, {1}}, + datas: [][]byte{[]byte("upside"), []byte("scorch")}, + expected: []byte{ + 0x28, 0xb5, 0x2f, 0xfd, 0x4, 0x0, 0x49, + 0x0, 0x0, 0x1, 0x6, 0x0, 0x75, 0x70, 0x73, 0x69, 0x64, 0x65, + 0x36, 0x6e, 0x7e, 0x39, 0x28, 0xb5, 0x2f, 0xfd, 0x4, 0x0, 0x49, + 0x0, 0x0, 0x1, 0x6, 0x1, 0x73, 0x63, 0x6f, 0x72, 0x63, 0x68, + 0x8f, 0x83, 0xa3, 0x37, 0x0, 0x16, 0x2c, 0x2c, + 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x4, + }, + expectedTrunkNum: 4, // left, trunk, trunk, right + }, + } + + for _, test := range tests { + var actual bytes.Buffer + cic := newChunkedDocumentCoder(test.chunkSize, &actual) + for i, docNum := range test.docNums { + _, err := cic.Add(docNum, test.metas[i], test.datas[i]) + if err != nil { + t.Fatalf("error adding to documentcoder: %v", err) + } + } + _ = cic.Close() + err := cic.Write() + if err != nil { + t.Fatalf("error writing: %v", err) + } + if !bytes.Equal(test.expected, actual.Bytes()) { + t.Errorf("got:%s, expected:%s", actual.String(), string(test.expected)) + } + if test.expectedTrunkNum != cic.Len() { + t.Errorf("got:%d, expected:%d", cic.Len(), test.expectedTrunkNum) + } + } +} + +func 
TestChunkedDocumentCoders(t *testing.T) { + chunkSize := uint64(2) + docNums := []uint64{0, 1, 2, 3, 4, 5} + metas := [][]byte{ + {0}, + {1}, + {2}, + {3}, + {4}, + {5}, + } + datas := [][]byte{ + []byte("scorch"), + []byte("does"), + []byte("better"), + []byte("than"), + []byte("upside"), + []byte("down"), + } + trunkNum := 5 // left, trunk, trunk, trunk, right + + var actual1, actual2 bytes.Buffer + // chunkedDocumentCoder that writes out at the end + cic1 := newChunkedDocumentCoder(chunkSize, &actual1) + // chunkedContentCoder that writes out in chunks + cic2 := newChunkedDocumentCoder(chunkSize, &actual2) + + for i, docNum := range docNums { + _, err := cic1.Add(docNum, metas[i], datas[i]) + if err != nil { + t.Fatalf("error adding to documentcoder: %v", err) + } + _, err = cic2.Add(docNum, metas[i], datas[i]) + if err != nil { + t.Fatalf("error adding to documentcoder: %v", err) + } + } + _ = cic1.Close() + _ = cic2.Close() + + err := cic1.Write() + if err != nil { + t.Fatalf("error writing: %v", err) + } + err = cic2.Write() + if err != nil { + t.Fatalf("error writing: %v", err) + } + + if !bytes.Equal(actual1.Bytes(), actual2.Bytes()) { + t.Errorf("%s != %s", actual1.String(), actual2.String()) + } + if trunkNum != cic1.Len() { + t.Errorf("got:%d, expected:%d", cic1.Len(), trunkNum) + } + if trunkNum != cic2.Len() { + t.Errorf("got:%d, expected:%d", cic2.Len(), trunkNum) + } +} diff --git a/merge.go b/merge.go index ef70754..2463888 100644 --- a/merge.go +++ b/merge.go @@ -640,7 +640,7 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, defer visitDocumentCtxPool.Put(vdc) // document chunk coder - docChunkCoder := NewChunkedDocumentCoder(uint64(defaultDocumentChunkSize), w) + docChunkCoder := newChunkedDocumentCoder(uint64(defaultDocumentChunkSize), w) // for each segment for segI, seg := range segments { @@ -682,6 +682,9 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, } // document chunk coder + if err := docChunkCoder.Close(); err != nil { + return 0, nil, err + } if err := docChunkCoder.Write(); err != nil { return 0, nil, err } diff --git a/new.go b/new.go index 7b730ca..50ace00 100644 --- a/new.go +++ b/new.go @@ -569,7 +569,7 @@ func (s *interim) writeStoredFields() ( docStoredFields := map[uint16]interimStoredField{} // document chunk coder - docChunkCoder := NewChunkedDocumentCoder(uint64(defaultDocumentChunkSize), s.w) + docChunkCoder := newChunkedDocumentCoder(uint64(defaultDocumentChunkSize), s.w) for docNum, result := range s.results { for fieldID := range docStoredFields { // reset for next doc @@ -617,6 +617,9 @@ func (s *interim) writeStoredFields() ( } // document chunk coder + if err := docChunkCoder.Close(); err != nil { + return 0, err + } if err := docChunkCoder.Write(); err != nil { return 0, err } From ace50a7ef71f526c289e59d5cb464c561075f857 Mon Sep 17 00:00:00 2001 From: yanghengfei Date: Wed, 4 May 2022 12:15:39 +0800 Subject: [PATCH 17/23] style: format code --- contentcoder.go | 4 +-- documentcoder.go | 84 ++++++++++++++++++++++++------------------------ intcoder.go | 2 +- load.go | 12 +++---- merge.go | 6 ++-- new.go | 6 ++-- read.go | 4 +-- segment.go | 8 +---- sizes.go | 3 ++ zstd.go | 2 ++ 10 files changed, 64 insertions(+), 67 deletions(-) diff --git a/contentcoder.go b/contentcoder.go index be94369..6996198 100644 --- a/contentcoder.go +++ b/contentcoder.go @@ -108,7 +108,7 @@ func (c *chunkedContentCoder) flushContents() error { diffDocNum := uint64(0) diffDvOffset := uint64(0) for _, meta := range 
c.chunkMeta { - err := writeUvarints(&c.chunkMetaBuf, meta.DocNum-diffDocNum, meta.DocDvOffset-diffDvOffset) + err = writeUvarints(&c.chunkMetaBuf, meta.DocNum-diffDocNum, meta.DocDvOffset-diffDvOffset) if err != nil { return err } @@ -120,7 +120,7 @@ func (c *chunkedContentCoder) flushContents() error { metaData := c.chunkMetaBuf.Bytes() c.final = append(c.final, c.chunkMetaBuf.Bytes()...) // write the compressed data to the final data - c.compressed, err = ZSTDCompress(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes(), 3) + c.compressed, err = ZSTDCompress(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes(), ZSTDCompressionLevel) if err != nil { return err } diff --git a/documentcoder.go b/documentcoder.go index 48b7956..5afa779 100644 --- a/documentcoder.go +++ b/documentcoder.go @@ -20,89 +20,89 @@ type chunkedDocumentCoder struct { } func newChunkedDocumentCoder(chunkSize uint64, w io.Writer) *chunkedDocumentCoder { - t := &chunkedDocumentCoder{ + c := &chunkedDocumentCoder{ chunkSize: chunkSize, w: w, } - t.buf = bytes.NewBuffer(nil) - t.metaBuf = make([]byte, binary.MaxVarintLen64) - t.offsets = append(t.offsets, 0) - return t + c.buf = bytes.NewBuffer(nil) + c.metaBuf = make([]byte, binary.MaxVarintLen64) + c.offsets = append(c.offsets, 0) + return c } -func (t *chunkedDocumentCoder) Add(docNum uint64, meta, data []byte) (int, error) { +func (c *chunkedDocumentCoder) Add(docNum uint64, meta, data []byte) (int, error) { var wn, n int var err error - n = binary.PutUvarint(t.metaBuf, uint64(len(meta))) - if n, err = t.writeToBuf(t.metaBuf[:n]); err != nil { + n = binary.PutUvarint(c.metaBuf, uint64(len(meta))) + if n, err = c.writeToBuf(c.metaBuf[:n]); err != nil { return 0, err } wn += n - n = binary.PutUvarint(t.metaBuf, uint64(len(data))) - if n, err = t.writeToBuf(t.metaBuf[:n]); err != nil { + n = binary.PutUvarint(c.metaBuf, uint64(len(data))) + if n, err = c.writeToBuf(c.metaBuf[:n]); err != nil { return 0, err } wn += n - if n, err = t.writeToBuf(meta); err != nil { + if n, err = c.writeToBuf(meta); err != nil { return 0, err } wn += n - if n, err = t.writeToBuf(data); err != nil { + if n, err = c.writeToBuf(data); err != nil { return 0, err } wn += n - return wn, t.newLine() + return wn, c.newLine() } -func (t *chunkedDocumentCoder) writeToBuf(data []byte) (int, error) { - return t.buf.Write(data) +func (c *chunkedDocumentCoder) writeToBuf(data []byte) (int, error) { + return c.buf.Write(data) } -func (t *chunkedDocumentCoder) newLine() error { - t.n++ - if t.n%t.chunkSize != 0 { +func (c *chunkedDocumentCoder) newLine() error { + c.n++ + if c.n%c.chunkSize != 0 { return nil } - return t.flush() + return c.flush() } -func (t *chunkedDocumentCoder) flush() error { - if t.buf.Len() > 0 { +func (c *chunkedDocumentCoder) flush() error { + if c.buf.Len() > 0 { var err error - t.compressed, err = ZSTDCompress(t.compressed[:cap(t.compressed)], t.buf.Bytes(), 3) + c.compressed, err = ZSTDCompress(c.compressed[:cap(c.compressed)], c.buf.Bytes(), ZSTDCompressionLevel) if err != nil { return err } - n, err := t.w.Write(t.compressed) + n, err := c.w.Write(c.compressed) if err != nil { return err } - t.bytes += uint64(n) - t.buf.Reset() + c.bytes += uint64(n) + c.buf.Reset() } - t.offsets = append(t.offsets, t.bytes) + c.offsets = append(c.offsets, c.bytes) return nil } -func (t *chunkedDocumentCoder) Write() error { +func (c *chunkedDocumentCoder) Write() error { var err error var wn, n int // write chunk offsets - for _, offset := range t.offsets { - n = binary.PutUvarint(t.metaBuf, 
offset) - if _, err = t.w.Write(t.metaBuf[:n]); err != nil { + for _, offset := range c.offsets { + n = binary.PutUvarint(c.metaBuf, offset) + if _, err = c.w.Write(c.metaBuf[:n]); err != nil { return err } wn += n } // write chunk offset length - err = binary.Write(t.w, binary.BigEndian, uint32(wn)) + err = binary.Write(c.w, binary.BigEndian, uint32(wn)) if err != nil { return err } // write chunk num - err = binary.Write(t.w, binary.BigEndian, uint32(len(t.offsets))) + err = binary.Write(c.w, binary.BigEndian, uint32(len(c.offsets))) if err != nil { return err } @@ -115,20 +115,20 @@ func (c *chunkedDocumentCoder) Close() error { return c.flush() } -func (t *chunkedDocumentCoder) Reset() { - t.compressed = t.compressed[:0] - t.offsets = t.offsets[:0] - t.n = 0 - t.bytes = 0 - t.buf.Reset() +func (c *chunkedDocumentCoder) Reset() { + c.compressed = c.compressed[:0] + c.offsets = c.offsets[:0] + c.n = 0 + c.bytes = 0 + c.buf.Reset() } // BufferSize returns buffer len -func (t *chunkedDocumentCoder) BufferSize() uint64 { - return uint64(t.buf.Len()) +func (c *chunkedDocumentCoder) BufferSize() uint64 { + return uint64(c.buf.Len()) } // Len returns trunks num -func (t *chunkedDocumentCoder) Len() int { - return len(t.offsets) +func (c *chunkedDocumentCoder) Len() int { + return len(c.offsets) } diff --git a/intcoder.go b/intcoder.go index 5696312..9dfc104 100644 --- a/intcoder.go +++ b/intcoder.go @@ -104,7 +104,7 @@ func (c *chunkedIntCoder) Add(docNum uint64, vals ...uint64) error { // to be encoded. func (c *chunkedIntCoder) Close() error { var err error - c.compressed, err = ZSTDCompress(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes(), 3) + c.compressed, err = ZSTDCompress(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes(), ZSTDCompressionLevel) if err != nil { return err } diff --git a/load.go b/load.go index 7d5d53d..ae53349 100644 --- a/load.go +++ b/load.go @@ -137,25 +137,21 @@ func (s *Segment) loadFields() error { // loadStoredFieldChunk load storedField chunk offsets func (s *Segment) loadStoredFieldChunk() error { // read chunk num - chunkOffsetPos := int(s.footer.storedIndexOffset - 4) // uint32 - chunkData, err := s.data.Read(chunkOffsetPos, chunkOffsetPos+4) + chunkOffsetPos := int(s.footer.storedIndexOffset - uint64(sizeOfUint32)) + chunkData, err := s.data.Read(chunkOffsetPos, chunkOffsetPos+sizeOfUint32) if err != nil { return err } chunkNum := binary.BigEndian.Uint32(chunkData) - chunkOffsetPos -= 4 + chunkOffsetPos -= sizeOfUint32 // read chunk offsets length - chunkData, err = s.data.Read(chunkOffsetPos, chunkOffsetPos+4) + chunkData, err = s.data.Read(chunkOffsetPos, chunkOffsetPos+sizeOfUint32) if err != nil { return err } chunkOffsetsLen := binary.BigEndian.Uint32(chunkData) // read chunk offsets chunkOffsetPos -= int(chunkOffsetsLen) - chunkData, err = s.data.Read(chunkOffsetPos, chunkOffsetPos+int(chunkOffsetsLen)) - if err != nil { - return err - } var offset, read int var offsetata []byte s.storedFieldChunkOffset = make(map[int]uint64, chunkNum) diff --git a/merge.go b/merge.go index 2463888..b838e7e 100644 --- a/merge.go +++ b/merge.go @@ -624,7 +624,7 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, w *countHashWriter, closeCh chan struct{}) (storedIndexOffset uint64, newDocNums [][]uint64, err error) { var newDocNum uint64 - var data, compressed []byte + var data []byte var metaBuf bytes.Buffer varBuf := make([]byte, binary.MaxVarintLen64) metaEncode := func(val uint64) (int, error) { @@ -673,7 +673,7 @@ func 
mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, var err2 error newDocNum, err2 = mergeStoredAndRemapSegment(seg, dropsI, segNewDocNums, newDocNum, &metaBuf, data, - fieldsInv, vals, vdc, fieldsMap, metaEncode, compressed, docNumOffsets, docChunkCoder) + fieldsInv, vals, vdc, fieldsMap, metaEncode, docNumOffsets, docChunkCoder) if err2 != nil { return 0, nil, err2 } @@ -705,7 +705,7 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, func mergeStoredAndRemapSegment(seg *Segment, dropsI *roaring.Bitmap, segNewDocNums []uint64, newDocNum uint64, metaBuf *bytes.Buffer, data []byte, fieldsInv []string, vals [][][]byte, vdc *visitDocumentCtx, - fieldsMap map[string]uint16, metaEncode func(val uint64) (int, error), compressed []byte, docNumOffsets []uint64, + fieldsMap map[string]uint16, metaEncode func(val uint64) (int, error), docNumOffsets []uint64, docChunkCoder *chunkedDocumentCoder) (uint64, error) { // for each doc num for docNum := uint64(0); docNum < seg.footer.numDocs; docNum++ { diff --git a/new.go b/new.go index 50ace00..3b7c14d 100644 --- a/new.go +++ b/new.go @@ -617,10 +617,12 @@ func (s *interim) writeStoredFields() ( } // document chunk coder - if err := docChunkCoder.Close(); err != nil { + err = docChunkCoder.Close() + if err != nil { return 0, err } - if err := docChunkCoder.Write(); err != nil { + err = docChunkCoder.Write() + if err != nil { return 0, err } diff --git a/read.go b/read.go index 0d1916c..f1faa6a 100644 --- a/read.go +++ b/read.go @@ -18,7 +18,7 @@ import ( "encoding/binary" ) -func (s *Segment) getDocStoredMetaAndCompressed(docNum uint64) (meta, data []byte, err error) { +func (s *Segment) getDocStoredMetaAndUnCompressed(docNum uint64) (meta, data []byte, err error) { _, storedOffset, n, metaLen, dataLen, err := s.getDocStoredOffsets(docNum) if err != nil { return nil, nil, err @@ -62,7 +62,7 @@ func (s *Segment) getDocStoredOffsets(docNum uint64) (indexOffset, storedOffset, } func (s *Segment) getDocStoredOffsetsOnly(docNum uint64) (indexOffset, storedOffset uint64, err error) { - indexOffset = s.footer.storedIndexOffset + (fileAddrWidth * uint64(docNum)) + indexOffset = s.footer.storedIndexOffset + (fileAddrWidth * docNum) storedOffsetData, err := s.data.Read(int(indexOffset), int(indexOffset+fileAddrWidth)) if err != nil { return 0, 0, err diff --git a/segment.go b/segment.go index adc6bc4..5d045b7 100644 --- a/segment.go +++ b/segment.go @@ -194,19 +194,13 @@ func (s *Segment) visitDocument(vdc *visitDocumentCtx, num uint64, visitor segment.StoredFieldVisitor) error { // first make sure this is a valid number in this segment if num < s.footer.numDocs { - meta, compressed, err := s.getDocStoredMetaAndCompressed(num) + meta, uncompressed, err := s.getDocStoredMetaAndUnCompressed(num) if err != nil { return err } vdc.reader.Reset(meta) - // uncompressed, err := ZSTDDecompress(vdc.buf[:cap(vdc.buf)], compressed) - // if err != nil { - // return err - // } - uncompressed := compressed - var keepGoing = true for keepGoing { field, err := binary.ReadUvarint(&vdc.reader) diff --git a/sizes.go b/sizes.go index e851daa..b8ade57 100644 --- a/sizes.go +++ b/sizes.go @@ -25,6 +25,8 @@ func init() { sizeOfString = int(reflect.TypeOf(str).Size()) var u16 uint16 sizeOfUint16 = int(reflect.TypeOf(u16).Size()) + var u32 uint32 + sizeOfUint32 = int(reflect.TypeOf(u32).Size()) var u64 uint64 sizeOfUint64 = int(reflect.TypeOf(u64).Size()) reflectStaticSizeSegment = int(reflect.TypeOf(Segment{}).Size()) @@ -45,6 +47,7 @@ func init() { var 
sizeOfPtr int var sizeOfString int var sizeOfUint16 int +var sizeOfUint32 int var sizeOfUint64 int var reflectStaticSizeSegment int var reflectStaticSizeMetaData int diff --git a/zstd.go b/zstd.go index 8e08636..6436caf 100644 --- a/zstd.go +++ b/zstd.go @@ -23,6 +23,8 @@ import ( "github.com/klauspost/compress/zstd" ) +const ZSTDCompressionLevel = 3 // 1, 3, 9 + var ( decoder *zstd.Decoder encoder *zstd.Encoder From dd60c366a0199e22d807a8ca71b1fd138799c8b3 Mon Sep 17 00:00:00 2001 From: hengfeiyang Date: Sun, 15 May 2022 10:29:47 +0800 Subject: [PATCH 18/23] update trunk to chunk --- documentcoder.go | 2 +- documentcoder_test.go | 20 ++++++++++---------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/documentcoder.go b/documentcoder.go index 5afa779..f66c828 100644 --- a/documentcoder.go +++ b/documentcoder.go @@ -128,7 +128,7 @@ func (c *chunkedDocumentCoder) BufferSize() uint64 { return uint64(c.buf.Len()) } -// Len returns trunks num +// Len returns chunks num func (c *chunkedDocumentCoder) Len() int { return len(c.offsets) } diff --git a/documentcoder_test.go b/documentcoder_test.go index e8630ca..c0f8b17 100644 --- a/documentcoder_test.go +++ b/documentcoder_test.go @@ -12,7 +12,7 @@ func TestChunkedDocumentCoder(t *testing.T) { metas [][]byte datas [][]byte expected []byte - expectedTrunkNum int + expectedChunkNum int }{ { chunkSize: 1, @@ -24,7 +24,7 @@ func TestChunkedDocumentCoder(t *testing.T) { 0x0, 0x0, 0x1, 0x5, 0x0, 0x62, 0x6c, 0x75, 0x67, 0x65, 0x2b, 0x30, 0x97, 0x33, 0x0, 0x15, 0x15, 0x0, 0x0, 0x0, 0x3, 0x0, 0x0, 0x0, 0x3, }, - expectedTrunkNum: 3, // left, trunk, right + expectedChunkNum: 3, // left, chunk, right }, { chunkSize: 1, @@ -39,7 +39,7 @@ func TestChunkedDocumentCoder(t *testing.T) { 0x8f, 0x83, 0xa3, 0x37, 0x0, 0x16, 0x2c, 0x2c, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x4, }, - expectedTrunkNum: 4, // left, trunk, trunk, right + expectedChunkNum: 4, // left, chunk, chunk, right }, } @@ -60,8 +60,8 @@ func TestChunkedDocumentCoder(t *testing.T) { if !bytes.Equal(test.expected, actual.Bytes()) { t.Errorf("got:%s, expected:%s", actual.String(), string(test.expected)) } - if test.expectedTrunkNum != cic.Len() { - t.Errorf("got:%d, expected:%d", cic.Len(), test.expectedTrunkNum) + if test.expectedChunkNum != cic.Len() { + t.Errorf("got:%d, expected:%d", cic.Len(), test.expectedChunkNum) } } } @@ -85,7 +85,7 @@ func TestChunkedDocumentCoders(t *testing.T) { []byte("upside"), []byte("down"), } - trunkNum := 5 // left, trunk, trunk, trunk, right + chunkNum := 5 // left, chunk, chunk, chunk, right var actual1, actual2 bytes.Buffer // chunkedDocumentCoder that writes out at the end @@ -118,10 +118,10 @@ func TestChunkedDocumentCoders(t *testing.T) { if !bytes.Equal(actual1.Bytes(), actual2.Bytes()) { t.Errorf("%s != %s", actual1.String(), actual2.String()) } - if trunkNum != cic1.Len() { - t.Errorf("got:%d, expected:%d", cic1.Len(), trunkNum) + if chunkNum != cic1.Len() { + t.Errorf("got:%d, expected:%d", cic1.Len(), chunkNum) } - if trunkNum != cic2.Len() { - t.Errorf("got:%d, expected:%d", cic2.Len(), trunkNum) + if chunkNum != cic2.Len() { + t.Errorf("got:%d, expected:%d", cic2.Len(), chunkNum) } } From ab3514f7f2187f6b5b1a3540bafb2d01827c3f37 Mon Sep 17 00:00:00 2001 From: hengfeiyang Date: Sun, 15 May 2022 10:29:58 +0800 Subject: [PATCH 19/23] update sort of Authors --- AUTHORS | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/AUTHORS b/AUTHORS index 57b6b27..94eddb1 100644 --- a/AUTHORS +++ b/AUTHORS @@ -7,5 +7,5 @@ # # Please keep the 
list sorted. -Marty Schoch Hengfei Yang +Marty Schoch From 2f4d2fdf0e8892e01a944ad01625646f55fe0e0c Mon Sep 17 00:00:00 2001 From: hengfeiyang Date: Sun, 15 May 2022 10:43:56 +0800 Subject: [PATCH 20/23] rename BufferSize to Size and remove Close method --- documentcoder.go | 14 ++++++-------- documentcoder_test.go | 3 --- merge.go | 7 ++----- new.go | 6 +----- 4 files changed, 9 insertions(+), 21 deletions(-) diff --git a/documentcoder.go b/documentcoder.go index f66c828..1903816 100644 --- a/documentcoder.go +++ b/documentcoder.go @@ -86,6 +86,10 @@ func (c *chunkedDocumentCoder) flush() error { } func (c *chunkedDocumentCoder) Write() error { + // flush first + if err := c.flush(); err != nil { + return err + } var err error var wn, n int // write chunk offsets @@ -109,12 +113,6 @@ func (c *chunkedDocumentCoder) Write() error { return nil } -// Close indicates you are done calling Add() this allows -// the final chunk to be encoded. -func (c *chunkedDocumentCoder) Close() error { - return c.flush() -} - func (c *chunkedDocumentCoder) Reset() { c.compressed = c.compressed[:0] c.offsets = c.offsets[:0] @@ -123,8 +121,8 @@ func (c *chunkedDocumentCoder) Reset() { c.buf.Reset() } -// BufferSize returns buffer len -func (c *chunkedDocumentCoder) BufferSize() uint64 { +// Size returns buffer size of current chunk +func (c *chunkedDocumentCoder) Size() uint64 { return uint64(c.buf.Len()) } diff --git a/documentcoder_test.go b/documentcoder_test.go index c0f8b17..3067a29 100644 --- a/documentcoder_test.go +++ b/documentcoder_test.go @@ -52,7 +52,6 @@ func TestChunkedDocumentCoder(t *testing.T) { t.Fatalf("error adding to documentcoder: %v", err) } } - _ = cic.Close() err := cic.Write() if err != nil { t.Fatalf("error writing: %v", err) @@ -103,8 +102,6 @@ func TestChunkedDocumentCoders(t *testing.T) { t.Fatalf("error adding to documentcoder: %v", err) } } - _ = cic1.Close() - _ = cic2.Close() err := cic1.Write() if err != nil { diff --git a/merge.go b/merge.go index b838e7e..d4ec6eb 100644 --- a/merge.go +++ b/merge.go @@ -682,9 +682,6 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, } // document chunk coder - if err := docChunkCoder.Close(); err != nil { - return 0, nil, err - } if err := docChunkCoder.Write(); err != nil { return 0, nil, err } @@ -749,7 +746,7 @@ func mergeStoredAndRemapSegment(seg *Segment, dropsI *roaring.Bitmap, segNewDocN metaBytes := metaBuf.Bytes() // record where we're about to start writing - docNumOffsets[newDocNum] = docChunkCoder.BufferSize() + docNumOffsets[newDocNum] = docChunkCoder.Size() // document chunk line if _, err := docChunkCoder.Add(newDocNum, metaBytes, data); err != nil { return 0, err @@ -795,7 +792,7 @@ func (s *Segment) copyStoredDocs(newDocNum uint64, newDocNumOffsets []uint64, do dataLenData := uncompressed[storedOffset+n : storedOffset+n+int(binary.MaxVarintLen64)] dataLen, read := binary.Uvarint(dataLenData) n += read - newDocNumOffsets[newDocNum] = docChunkCoder.BufferSize() + newDocNumOffsets[newDocNum] = docChunkCoder.Size() metaBytes := uncompressed[storedOffset+n : storedOffset+n+int(metaLen)] data := uncompressed[storedOffset+n+int(metaLen) : storedOffset+n+int(metaLen)+int(dataLen)] if _, err := docChunkCoder.Add(newDocNum, metaBytes, data); err != nil { diff --git a/new.go b/new.go index 3b7c14d..ed4918e 100644 --- a/new.go +++ b/new.go @@ -609,7 +609,7 @@ func (s *interim) writeStoredFields() ( } metaBytes := s.metaBuf.Bytes() - docStoredOffsets[docNum] = docChunkCoder.BufferSize() + docStoredOffsets[docNum] = 
docChunkCoder.Size() _, err = docChunkCoder.Add(uint64(docNum), metaBytes, data) if err != nil { return 0, err @@ -617,10 +617,6 @@ func (s *interim) writeStoredFields() ( } // document chunk coder - err = docChunkCoder.Close() - if err != nil { - return 0, err - } err = docChunkCoder.Write() if err != nil { return 0, err From 5e1f80d8e605615e3a3258625d4a02342e473071 Mon Sep 17 00:00:00 2001 From: hengfeiyang Date: Sun, 15 May 2022 11:40:15 +0800 Subject: [PATCH 21/23] update version --- segment.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/segment.go b/segment.go index 5d045b7..bee7ca5 100644 --- a/segment.go +++ b/segment.go @@ -27,7 +27,7 @@ import ( segment "github.com/blugelabs/bluge_segment_api" ) -const Version uint32 = 1 +const Version uint32 = 2 const Type string = "ice" From d802dbc098cd6ab8fe28939ca3734716f567c1f2 Mon Sep 17 00:00:00 2001 From: hengfeiyang Date: Tue, 24 May 2022 13:28:09 +0800 Subject: [PATCH 22/23] fix panic when search memory --- documentcoder.go | 7 +++++++ load.go | 4 ++-- merge.go | 10 ++++------ new.go | 50 +++++++++++++++++++++++++----------------------- read.go | 6 +++--- segment.go | 4 ++-- zstd.go | 4 ++-- 7 files changed, 46 insertions(+), 39 deletions(-) diff --git a/documentcoder.go b/documentcoder.go index 1903816..b55daec 100644 --- a/documentcoder.go +++ b/documentcoder.go @@ -130,3 +130,10 @@ func (c *chunkedDocumentCoder) Size() uint64 { func (c *chunkedDocumentCoder) Len() int { return len(c.offsets) } + +// Len returns chunks num +func (c *chunkedDocumentCoder) Offsets() []uint64 { + m := make([]uint64, 0, len(c.offsets)) + m = append(m, c.offsets...) + return m +} diff --git a/load.go b/load.go index ae53349..b242efb 100644 --- a/load.go +++ b/load.go @@ -154,13 +154,13 @@ func (s *Segment) loadStoredFieldChunk() error { chunkOffsetPos -= int(chunkOffsetsLen) var offset, read int var offsetata []byte - s.storedFieldChunkOffset = make(map[int]uint64, chunkNum) + s.storedFieldChunkOffsets = make([]uint64, chunkNum) for i := 0; i < int(chunkNum); i++ { offsetata, err = s.data.Read(chunkOffsetPos+offset, chunkOffsetPos+offset+binary.MaxVarintLen64) if err != nil { return err } - s.storedFieldChunkOffset[i], read = binary.Uvarint(offsetata) + s.storedFieldChunkOffsets[i], read = binary.Uvarint(offsetata) offset += read } diff --git a/merge.go b/merge.go index d4ec6eb..1397439 100644 --- a/merge.go +++ b/merge.go @@ -20,7 +20,6 @@ import ( "encoding/binary" "fmt" "io" - "log" "math" "sort" @@ -767,9 +766,9 @@ func (s *Segment) copyStoredDocs(newDocNum uint64, newDocNumOffsets []uint64, do // visit documents and rewrite to chunk uncompressed := make([]byte, 0) - for i := 0; i < len(s.storedFieldChunkOffset)-1; i++ { - chunkOffstart := s.storedFieldChunkOffset[i] - chunkOffend := s.storedFieldChunkOffset[i+1] + for i := 0; i < len(s.storedFieldChunkOffsets)-1; i++ { + chunkOffstart := s.storedFieldChunkOffsets[i] + chunkOffend := s.storedFieldChunkOffsets[i+1] if chunkOffstart == chunkOffend { continue } @@ -779,7 +778,6 @@ func (s *Segment) copyStoredDocs(newDocNum uint64, newDocNumOffsets []uint64, do } uncompressed, err = ZSTDDecompress(uncompressed[:cap(uncompressed)], compressed) if err != nil { - log.Panic(err) return err } storedOffset := 0 @@ -794,7 +792,7 @@ func (s *Segment) copyStoredDocs(newDocNum uint64, newDocNumOffsets []uint64, do n += read newDocNumOffsets[newDocNum] = docChunkCoder.Size() metaBytes := uncompressed[storedOffset+n : storedOffset+n+int(metaLen)] - data := uncompressed[storedOffset+n+int(metaLen) 
: storedOffset+n+int(metaLen)+int(dataLen)] + data := uncompressed[storedOffset+n+int(metaLen) : storedOffset+n+int(metaLen+dataLen)] if _, err := docChunkCoder.Add(newDocNum, metaBytes, data); err != nil { return err } diff --git a/new.go b/new.go index ed4918e..05df566 100644 --- a/new.go +++ b/new.go @@ -61,7 +61,7 @@ func newWithChunkMode(results []segment.Document, normCalc func(string, int) flo s.w = newCountHashWriter(&br) var footer *footer - footer, dictOffsets, err := s.convert() + footer, dictOffsets, storedFieldChunkOffsets, err := s.convert() if err != nil { return nil, uint64(0), err } @@ -72,7 +72,7 @@ func newWithChunkMode(results []segment.Document, normCalc func(string, int) flo sb, err := initSegmentBase(br.Bytes(), footer, s.FieldsMap, s.FieldsInv, s.FieldDocs, s.FieldFreqs, - dictOffsets) + dictOffsets, storedFieldChunkOffsets) if err == nil && s.reset() == nil { s.lastNumDocs = len(results) @@ -86,17 +86,18 @@ func newWithChunkMode(results []segment.Document, normCalc func(string, int) flo func initSegmentBase(mem []byte, footer *footer, fieldsMap map[string]uint16, fieldsInv []string, fieldsDocs, fieldsFreqs map[uint16]uint64, - dictLocs []uint64) (*Segment, error) { + dictLocs []uint64, storedFieldChunkOffsets []uint64) (*Segment, error) { sb := &Segment{ - data: segment.NewDataBytes(mem), - footer: footer, - fieldsMap: fieldsMap, - fieldsInv: fieldsInv, - fieldDocs: fieldsDocs, - fieldFreqs: fieldsFreqs, - dictLocs: dictLocs, - fieldDvReaders: make(map[uint16]*docValueReader), - fieldFSTs: make(map[uint16]*vellum.FST), + data: segment.NewDataBytes(mem), + footer: footer, + fieldsMap: fieldsMap, + fieldsInv: fieldsInv, + fieldDocs: fieldsDocs, + fieldFreqs: fieldsFreqs, + dictLocs: dictLocs, + fieldDvReaders: make(map[uint16]*docValueReader), + fieldFSTs: make(map[uint16]*vellum.FST), + storedFieldChunkOffsets: storedFieldChunkOffsets, } sb.updateSize() @@ -248,7 +249,7 @@ type interimLoc struct { end uint64 } -func (s *interim) convert() (*footer, []uint64, error) { +func (s *interim) convert() (*footer, []uint64, []uint64, error) { s.FieldsMap = map[string]uint16{} s.FieldDocs = map[uint16]uint64{} s.FieldFreqs = map[uint16]uint64{} @@ -283,9 +284,9 @@ func (s *interim) convert() (*footer, []uint64, error) { s.processDocuments() - storedIndexOffset, err := s.writeStoredFields() + storedIndexOffset, storedFieldChunkOffsets, err := s.writeStoredFields() if err != nil { - return nil, nil, err + return nil, nil, nil, err } var fdvIndexOffset uint64 @@ -294,7 +295,7 @@ func (s *interim) convert() (*footer, []uint64, error) { if len(s.results) > 0 { fdvIndexOffset, dictOffsets, err = s.writeDicts() if err != nil { - return nil, nil, err + return nil, nil, nil, err } } else { dictOffsets = make([]uint64, len(s.FieldsInv)) @@ -302,7 +303,7 @@ func (s *interim) convert() (*footer, []uint64, error) { fieldsIndexOffset, err := persistFields(s.FieldsInv, s.FieldDocs, s.FieldFreqs, s.w, dictOffsets) if err != nil { - return nil, nil, err + return nil, nil, nil, err } return &footer{ @@ -310,7 +311,7 @@ func (s *interim) convert() (*footer, []uint64, error) { fieldsIndexOffset: fieldsIndexOffset, docValueOffset: fdvIndexOffset, version: Version, - }, dictOffsets, nil + }, dictOffsets, storedFieldChunkOffsets, nil } func (s *interim) getOrDefineField(fieldName string) int { @@ -552,7 +553,7 @@ func (s *interim) processDocument(docNum uint64, } func (s *interim) writeStoredFields() ( - storedIndexOffset uint64, err error) { + storedIndexOffset uint64, storedFieldChunkOffsets 
[]uint64, err error) { varBuf := make([]byte, binary.MaxVarintLen64) metaEncode := func(val uint64) (int, error) { wb := binary.PutUvarint(varBuf, val) @@ -603,7 +604,7 @@ func (s *interim) writeStoredFields() ( fieldID, isf.vals, curr, metaEncode, data) if err != nil { - return 0, err + return 0, nil, err } } } @@ -612,26 +613,27 @@ func (s *interim) writeStoredFields() ( docStoredOffsets[docNum] = docChunkCoder.Size() _, err = docChunkCoder.Add(uint64(docNum), metaBytes, data) if err != nil { - return 0, err + return 0, nil, err } } // document chunk coder err = docChunkCoder.Write() if err != nil { - return 0, err + return 0, nil, err } + storedFieldChunkOffsets = docChunkCoder.Offsets() storedIndexOffset = uint64(s.w.Count()) for _, docStoredOffset := range docStoredOffsets { err = binary.Write(s.w, binary.BigEndian, docStoredOffset) if err != nil { - return 0, err + return 0, nil, err } } - return storedIndexOffset, nil + return storedIndexOffset, storedFieldChunkOffsets, nil } func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err error) { diff --git a/read.go b/read.go index f1faa6a..bc8a60c 100644 --- a/read.go +++ b/read.go @@ -36,9 +36,9 @@ func (s *Segment) getDocStoredOffsets(docNum uint64) (indexOffset, storedOffset, } // document chunk coder - trunI := docNum / uint64(defaultDocumentChunkSize) - chunkOffsetStart := s.storedFieldChunkOffset[int(trunI)] - chunkOffsetEnd := s.storedFieldChunkOffset[int(trunI)+1] + thunkI := docNum / uint64(defaultDocumentChunkSize) + chunkOffsetStart := s.storedFieldChunkOffsets[int(thunkI)] + chunkOffsetEnd := s.storedFieldChunkOffsets[int(thunkI)+1] compressed, err := s.data.Read(int(chunkOffsetStart), int(chunkOffsetEnd)) if err != nil { return 0, 0, 0, 0, 0, err diff --git a/segment.go b/segment.go index bee7ca5..19b768c 100644 --- a/segment.go +++ b/segment.go @@ -40,8 +40,8 @@ type Segment struct { fieldDocs map[uint16]uint64 // fieldID -> # docs with value in field fieldFreqs map[uint16]uint64 // fieldID -> # total tokens in field - storedFieldChunkOffset map[int]uint64 // stored field chunk offset - storedFieldChunkUncompressed []byte // for uncompress cache + storedFieldChunkOffsets []uint64 // stored field chunk offset + storedFieldChunkUncompressed []byte // for uncompress cache dictLocs []uint64 fieldDvReaders map[uint16]*docValueReader // naive chunk cache per field diff --git a/zstd.go b/zstd.go index 6436caf..7e053da 100644 --- a/zstd.go +++ b/zstd.go @@ -38,7 +38,7 @@ func ZSTDDecompress(dst, src []byte) ([]byte, error) { var err error decoder, err = zstd.NewReader(nil) if err != nil { - log.Fatalf("%+v", err) + log.Panicf("ZSTDDecompress: %+v", err) } }) return decoder.DecodeAll(src, dst[:0]) @@ -51,7 +51,7 @@ func ZSTDCompress(dst, src []byte, compressionLevel int) ([]byte, error) { level := zstd.EncoderLevelFromZstd(compressionLevel) encoder, err = zstd.NewWriter(nil, zstd.WithEncoderLevel(level)) if err != nil { - log.Fatalf("%+v", err) + log.Panicf("ZSTDCompress: %+v", err) } }) return encoder.EncodeAll(src, dst[:0]), nil From 1e2159ef1324548ee2a1930b0da2e9ed586eb582 Mon Sep 17 00:00:00 2001 From: hengfeiyang Date: Wed, 25 May 2022 10:53:36 +0800 Subject: [PATCH 23/23] rename variables --- read.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/read.go b/read.go index bc8a60c..809555a 100644 --- a/read.go +++ b/read.go @@ -36,9 +36,9 @@ func (s *Segment) getDocStoredOffsets(docNum uint64) (indexOffset, storedOffset, } // document chunk coder - thunkI := docNum / 
uint64(defaultDocumentChunkSize) - chunkOffsetStart := s.storedFieldChunkOffsets[int(thunkI)] - chunkOffsetEnd := s.storedFieldChunkOffsets[int(thunkI)+1] + chunkI := docNum / uint64(defaultDocumentChunkSize) + chunkOffsetStart := s.storedFieldChunkOffsets[int(chunkI)] + chunkOffsetEnd := s.storedFieldChunkOffsets[int(chunkI)+1] compressed, err := s.data.Read(int(chunkOffsetStart), int(chunkOffsetEnd)) if err != nil { return 0, 0, 0, 0, 0, err
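To make the read path in getDocStoredOffsets concrete: the reader picks the chunk with docNum / defaultDocumentChunkSize, decompresses only the bytes between that chunk's start and end offsets, and then reads the uvarint meta and data lengths at the document's stored offset inside the uncompressed chunk. Below is a hedged reader-side counterpart to the earlier writer sketch; it keeps the same toy length-prefixed layout and simply walks to record docNum % chunkSize instead of using a stored per-document offset, so it illustrates the lookup, not the actual ice format.

```go
// Reader-side counterpart to the writer sketch above (illustrative only).
// Pick the chunk via docNum / chunkSize, ZSTD-decompress just that chunk,
// then walk the length-prefixed records to entry docNum % chunkSize.
package main

import (
	"encoding/binary"
	"fmt"

	"github.com/klauspost/compress/zstd"
)

func readDoc(section []byte, offsets []uint64, chunkSize, docNum uint64) ([]byte, error) {
	dec, err := zstd.NewReader(nil)
	if err != nil {
		return nil, err
	}
	defer dec.Close()

	chunkI := docNum / chunkSize
	start, end := offsets[chunkI], offsets[chunkI+1]
	uncompressed, err := dec.DecodeAll(section[start:end], nil)
	if err != nil {
		return nil, err
	}

	pos := 0
	for i := uint64(0); ; i++ {
		dataLen, n := binary.Uvarint(uncompressed[pos:])
		pos += n
		if i == docNum%chunkSize {
			return uncompressed[pos : pos+int(dataLen)], nil
		}
		pos += int(dataLen)
	}
}

func main() {
	// build one chunk holding two length-prefixed records, then read doc 1
	enc, _ := zstd.NewWriter(nil)
	var raw []byte
	for _, d := range [][]byte{[]byte("hello"), []byte("world")} {
		var lenBuf [binary.MaxVarintLen64]byte
		n := binary.PutUvarint(lenBuf[:], uint64(len(d)))
		raw = append(raw, lenBuf[:n]...)
		raw = append(raw, d...)
	}
	section := enc.EncodeAll(raw, nil)
	offsets := []uint64{0, uint64(len(section))}

	doc, err := readDoc(section, offsets, 2, 1) // chunkSize=2, docNum=1
	fmt.Println(string(doc), err)               // -> world <nil>
}
```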