Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 22 additions & 18 deletions pkg/fileservice/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,17 @@ import (
type Bytes struct {
bytes []byte
deallocator malloc.Deallocator
deallocated uint32
_refs atomic.Int32
refs *atomic.Int32
refs atomic.Int32
}

func (b *Bytes) Size() int64 {
return int64(len(b.bytes))
}

func (b *Bytes) Bytes() []byte {
if b.refs.Load() <= 0 {
panic("Bytes.Bytes: memory was already deallocated.")
}
return b.bytes
}

Expand All @@ -44,25 +45,29 @@ func (b *Bytes) Slice(length int) fscache.Data {
}

func (b *Bytes) Retain() {
if b.refs != nil {
b.refs.Add(1)
}
b.refs.Add(1)
}

func (b *Bytes) Release() {
if b.refs != nil {
if n := b.refs.Add(-1); n == 0 {
if b.deallocator != nil &&
atomic.CompareAndSwapUint32(&b.deallocated, 0, 1) {
b.deallocator.Deallocate(malloc.NoHints)
}
}
} else {
if b.deallocator != nil &&
atomic.CompareAndSwapUint32(&b.deallocated, 0, 1) {
n := b.refs.Add(-1)
if n == 0 {
// set bytes to nil
b.bytes = nil
if b.deallocator != nil {
b.deallocator.Deallocate(malloc.NoHints)
b.deallocator = nil
}
} else if n < 0 {
panic("Bytes.Release: double free")
}
}

func NewBytes(data []byte) *Bytes {
bytes := &Bytes{
bytes: data,
}
bytes.refs.Store(1)
return bytes
}

type bytesAllocator struct {
Expand All @@ -80,8 +85,7 @@ func (b *bytesAllocator) allocateCacheData(size int, hints malloc.Hints) fscache
bytes: slice,
deallocator: dec,
}
bytes._refs.Store(1)
bytes.refs = &bytes._refs
bytes.refs.Store(1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor, you can do a NewBytes here, so that we can restrict all the Store(1) to one single place.

return bytes
}

Expand Down
73 changes: 73 additions & 0 deletions pkg/fileservice/bytes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package fileservice

import (
"sync"
"testing"
"time"

"github.com/matrixorigin/matrixone/pkg/common/malloc"
"github.com/stretchr/testify/assert"
Expand All @@ -29,6 +31,77 @@ func TestBytes(t *testing.T) {
bytes: bytes,
deallocator: deallocator,
}
bs.refs.Store(1)
bs.Release()
})
}

func TestBytesError(t *testing.T) {
t.Run("Bytes get invalid memory", func(t *testing.T) {
bytes, deallocator, err := ioAllocator().Allocate(42, malloc.NoHints)
assert.Nil(t, err)
bs := &Bytes{
bytes: bytes,
deallocator: deallocator,
}
bs.refs.Store(1)

// deallocate memory
bs.Release()

// nil pointer
assert.Panics(t, func() { bs.Bytes() }, "get invalid memory")
})

t.Run("Bytes double free", func(t *testing.T) {
bytes, deallocator, err := ioAllocator().Allocate(42, malloc.NoHints)
assert.Nil(t, err)
bs := &Bytes{
bytes: bytes,
deallocator: deallocator,
}
bs.refs.Store(1)

// deallocate memory
bs.Release()

// double free
assert.Panics(t, func() { bs.Release() }, "double free")
})

t.Run("Bytes nil deallocator", func(t *testing.T) {
data := []byte("123")
bs := NewBytes(data)

// deallocate memory
bs.Release()

assert.Panics(t, func() { bs.Release() }, "double free")
})
}

func TestBytesConcurrent(t *testing.T) {
data := []byte("123")
bs := NewBytes(data)
nthread := 5
var wg sync.WaitGroup
for i := 0; i < nthread; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()

bs.Retain()

time.Sleep(1 * time.Millisecond)

bs.Release()
}(i)
}

wg.Wait()

bs.Release()

// double free
assert.Panics(t, func() { bs.Release() }, "double free")
}
2 changes: 1 addition & 1 deletion pkg/fileservice/remote_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (r *RemoteCache) Read(ctx context.Context, vector *IOVector) error {
idx := int(cacheData.Index)
if cacheData.Hit {
vector.Entries[idx].done = true
vector.Entries[idx].CachedData = &Bytes{bytes: cacheData.Data}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@reusee can you please check this line? This is significant. I will be surprised if the old code works, and the new code also works without a leak. Are we fixing a real bug here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

旧的代码不带引用计数,所以就只是对 []byte 的包装。新的代码改成了引用计数,只要使用方的计数正确,就不会有问题。如果真的有问题,可以另外增加一个 GoBytes 类型,实现 fscache.Data 接口,但不实现引用计数机制。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Cannot have another type; the ref counting MUST be correct. So great, let's go with this and see if we will hit panic.

Thank you.

vector.Entries[idx].CachedData = NewBytes(cacheData.Data)
vector.Entries[idx].fromCache = r
numHit++
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/fileservice/remote_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestRemoteCache(t *testing.T) {
err = sf2.rc.Read(ctx, ioVec2)
assert.NoError(t, err)
assert.Equal(t, 1, len(ioVec2.Entries))
assert.Equal(t, &Bytes{bytes: []byte{1, 2}}, ioVec2.Entries[0].CachedData)
assert.Equal(t, NewBytes([]byte{1, 2}), ioVec2.Entries[0].CachedData)
assert.Equal(t, true, ioVec2.Entries[0].done)
assert.NotNil(t, ioVec2.Entries[0].fromCache)

Expand Down
Loading