-
Notifications
You must be signed in to change notification settings - Fork 567
STREAMS API in Garnet #1131
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
STREAMS API in Garnet #1131
Conversation
@ramananeesh please read the following Contributor License Agreement(CLA). If you agree with the CLA, please reply with the following information.
Contributor License AgreementContribution License AgreementThis Contribution License Agreement (“Agreement”) is agreed to by the party signing below (“You”),
|
@@ -12,7 +12,7 @@ | |||
<AllowUnsafeBlocks>true</AllowUnsafeBlocks> | |||
<EnforceCodeStyleInBuild>true</EnforceCodeStyleInBuild> | |||
<GenerateDocumentationFile>true</GenerateDocumentationFile> | |||
<TreatWarningsAsErrors>true</TreatWarningsAsErrors> | |||
<TreatWarningsAsErrors>false</TreatWarningsAsErrors> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
revert this change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First thanks for contributing!
I see the PR is marked as draft, but there's misunderstanding about the nature of Unsafe.AsPointer
I want to help with as early as possible: It's use is always a mistake and unnecessary. Embrace Span<T>
and normal struct
usage as much as possible. I left few preliminary comments.
libs/server/BTreeIndex/BTree.cs
Outdated
byte[] keyBytes = new byte[BTreeNode.KEY_SIZE]; | ||
Buffer.MemoryCopy(leaf->GetKey(0), Unsafe.AsPointer(ref keyBytes[0]), BTreeNode.KEY_SIZE, BTreeNode.KEY_SIZE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unsafe.AsPointer
is the single most dangerous method in the entire .NET. Using it here and elsewhere is certainly a mistake and unnecessary.
update: This is great example of GC hole. GC is free to move keyBytes
to entirely different location after Unsafe.AsPointer
call (yes, GC can occur between Unsafe.AsPointer
call and Buffer.MemoryCopy
). We could trigger this pretty consistently with GC stress.
Embrace (ReadOnly)Span<T>
here and elsewhere! Here this heap copy would look like:
byte[] keyBytes = new byte[BTreeNode.KEY_SIZE]; | |
Buffer.MemoryCopy(leaf->GetKey(0), Unsafe.AsPointer(ref keyBytes[0]), BTreeNode.KEY_SIZE, BTreeNode.KEY_SIZE); | |
byte[] keyBytes = new ReadOnlySpan<byte>(leaf->GetKey(0), BTreeNode.KEY_SIZE).ToArray(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the suggestion! I have fixed this in the latest commit.
libs/server/BTreeIndex/BTree.cs
Outdated
{ | ||
bufferPool = new SectorAlignedBufferPool(1, BTreeNode.PAGE_SIZE); | ||
// var memoryBlock = bufferPool.Get(BTreeNode.PAGE_SIZE); | ||
var memoryBlock = (IntPtr*)Marshal.AllocHGlobal(BTreeNode.PAGE_SIZE).ToPointer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Marshal.AllocHGlobal
is considered legacy API nowadays. We have thinner "malloc" wrappers available as NativeMemory.Alloc
today.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we need aligned native allocations for the B-Tree page, we can use NativeMemory.AlignedAlloc
. Just be very careful to free whatever native memory is allocated, on dispose.
libs/server/BTreeIndex/BTree.cs
Outdated
return default; | ||
} | ||
byte[] keyBytes = new byte[BTreeNode.KEY_SIZE]; | ||
Buffer.MemoryCopy(leaf->GetKey(leaf->info->count - 1), Unsafe.AsPointer(ref keyBytes[0]), BTreeNode.KEY_SIZE, BTreeNode.KEY_SIZE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto, GC hole.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in latest commit.
libs/server/Stream/StreamID.cs
Outdated
throw new ArgumentException("idBytes must be 16 bytes"); | ||
} | ||
|
||
Buffer.MemoryCopy((byte*)Unsafe.AsPointer(ref inputBytes[0]), (byte*)Unsafe.AsPointer(ref idBytes[0]), 16, 16); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GC Hole with Unsafe.AsPointer
usage here. I won't add comments for all of the Unsafe.AsPointer uses, almost certainly, every single one of them is unnecessary. All of them need to go.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in latest commit.
private ByteArrayComparer() { } | ||
public ByteArrayComparer() { } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Revert. Use the .Instance
above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reverted.
Two nits:
|
Run |
@@ -186,6 +186,23 @@ public static bool TryWriteSimpleString(ReadOnlySpan<byte> simpleString, ref byt | |||
return true; | |||
} | |||
|
|||
/// <summary> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove commented code.
@@ -99,6 +99,10 @@ public class ServerOptions | |||
/// </summary> | |||
public bool SkipRDBRestoreChecksumValidation = false; | |||
|
|||
public string StreamPageSize = "4m"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move these to GarnetServerOptions.cs and add comments.
|
||
namespace Garnet.server | ||
{ | ||
internal struct SessionStreamCache |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make this a class, I don't think perf will be impacted that much? Please try it.
@@ -249,6 +254,10 @@ public RespServerSession( | |||
if (this.networkSender.GetMaxSizeSettings?.MaxOutputSize < sizeof(int)) | |||
this.networkSender.GetMaxSizeSettings.MaxOutputSize = sizeof(int); | |||
} | |||
|
|||
// grab stream manager from storeWrapper | |||
this.streamManager = storeWrapper.streamManager; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need a flag to enable streams --streams
and leave it disabled by default. Similar to --lua
which enables Lua. If not enabled, we should not incur the overhead of creating stream manager or session stream cache at all. Or any other objects or statics related to streams feature.
Adds support for STREAMS in Garnet.
Index Structure
Uses an in-memory B+tree (B-tree) index that has the following features:
Supported Operations in API
The following operations are currently supported:
STREAMID
Stream ID is a 128-bit ID for an entry in the Stream that is of a format
ts-seq
wherets
is generally the timestamp andseq
is the sequence number.STREAM
The Stream Object that consists of an instance to its
B-tree
index and aTsavorite
log instance for persistence. Every entry added to a Stream is first inserted into theTsavorite
log that returns the added address. This address is added as thevalue
to the index using theSTREAMID
askey
.StreamManager
A container/wrapper that holds all Streams in the server in a dictionary.
SessionStreamCache
A local cache of Streams added by the client for faster access. Currently capped at
capacity
and uses a simpleFIFO
policy for the initial version. Can be extended to support other eviction strategies (preferablyLRU
).