Merged
Changes from all commits
Commits (24)
41253c2
Fix typos
pmikolajczyk41 Sep 18, 2025
5f8c604
Checkpoint: add second verifier to the receiver
pmikolajczyk41 Sep 19, 2025
3ccb2a8
Checkpoint: use second verifier for start and finalization (tests pass)
pmikolajczyk41 Sep 19, 2025
c1f8951
Correctly handle how Golang uses slices
pmikolajczyk41 Sep 19, 2025
c0cd16c
Remove the old verifier from the data stream protocol
pmikolajczyk41 Sep 19, 2025
46f615a
Remove the old verifier from the legacy protocol
pmikolajczyk41 Sep 19, 2025
3a2c443
Add // lint:require-exhaustive-initialization
pmikolajczyk41 Sep 19, 2025
b81cbba
Fix compilation in tests and factory
pmikolajczyk41 Sep 19, 2025
ed214fc
Extract expiration callback
pmikolajczyk41 Sep 19, 2025
e62f168
Remove impl details metrics
pmikolajczyk41 Sep 19, 2025
bbb33d2
Refactor signing payloads
pmikolajczyk41 Sep 19, 2025
4098d5f
Renamings
pmikolajczyk41 Sep 19, 2025
354212f
Refactor sender
pmikolajczyk41 Sep 19, 2025
bba606e
Move protocol to subpackage
pmikolajczyk41 Sep 19, 2025
8d90182
Lint + remove unused verifier code
pmikolajczyk41 Sep 19, 2025
4e9c662
Make Sender generic
pmikolajczyk41 Sep 19, 2025
0e154ac
Use generics in place of json marshalling and unmarshalling
pmikolajczyk41 Sep 19, 2025
3bd97fa
Revert changes to test
pmikolajczyk41 Sep 19, 2025
69ea61a
Abstract signing and verifying payload
pmikolajczyk41 Sep 22, 2025
9cf754a
Improve API
pmikolajczyk41 Sep 22, 2025
5783b30
Protocol test
pmikolajczyk41 Sep 22, 2025
07dba80
Merge remote-tracking branch 'origin/master' into pmikolajczyk/data-s…
pmikolajczyk41 Sep 22, 2025
2747db3
Merge branch 'master' into pmikolajczyk/data-stream-improvements
pmikolajczyk41 Sep 22, 2025
da11c3b
Merge branch 'master' into pmikolajczyk/data-stream-improvements
pmikolajczyk41 Sep 23, 2025
6 changes: 3 additions & 3 deletions broadcastclient/broadcastclient.go
@@ -1,4 +1,4 @@
// Copyright 2021-2022, Offchain Labs, Inc.
// Copyright 2021-2025, Offchain Labs, Inc.
// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md

package broadcastclient
@@ -99,7 +99,7 @@ var DefaultConfig = Config{
ReconnectMaximumBackoff: time.Second * 64,
RequireChainId: false,
RequireFeedVersion: false,
Verify: signature.DefultFeedVerifierConfig,
Verify: signature.DefaultFeedVerifierConfig,
URL: []string{},
SecondaryURL: []string{},
Timeout: 20 * time.Second,
@@ -111,7 +111,7 @@ var DefaultTestConfig = Config{
ReconnectMaximumBackoff: 0,
RequireChainId: false,
RequireFeedVersion: false,
Verify: signature.DefultFeedVerifierConfig,
Verify: signature.DefaultFeedVerifierConfig,
URL: []string{""},
SecondaryURL: []string{},
Timeout: 200 * time.Millisecond,
21 changes: 11 additions & 10 deletions daprovider/das/dasRpcClient.go
@@ -17,6 +17,7 @@ import (

"github.com/offchainlabs/nitro/blsSignatures"
"github.com/offchainlabs/nitro/daprovider/das/dasutil"
"github.com/offchainlabs/nitro/daprovider/das/data_streaming"
"github.com/offchainlabs/nitro/util/pretty"
"github.com/offchainlabs/nitro/util/signature"
)
@@ -27,17 +28,14 @@ var (
rpcClientStoreFailureGauge = metrics.NewRegisteredGauge("arb/das/rpcclient/store/failure", nil)
rpcClientStoreStoredBytesGauge = metrics.NewRegisteredGauge("arb/das/rpcclient/store/bytes", nil)
rpcClientStoreDurationHistogram = metrics.NewRegisteredHistogram("arb/das/rpcclient/store/duration", nil, metrics.NewBoundedHistogramSample())

rpcClientSendChunkSuccessGauge = metrics.NewRegisteredGauge("arb/das/rpcclient/sendchunk/success", nil)
rpcClientSendChunkFailureGauge = metrics.NewRegisteredGauge("arb/das/rpcclient/sendchunk/failure", nil)
)

// lint:require-exhaustive-initialization
type DASRPCClient struct { // implements DataAvailabilityService
clnt *rpc.Client
url string
signer signature.DataSignerFunc
dataStreamer *DataStreamer
dataStreamer *data_streaming.DataStreamer[StoreResult]
}

func nilSigner(_ []byte) ([]byte, error) {
@@ -54,14 +52,17 @@ func NewDASRPCClient(target string, signer signature.DataSignerFunc, maxStoreChu
return nil, err
}

var dataStreamer *DataStreamer
var dataStreamer *data_streaming.DataStreamer[StoreResult]
if enableChunkedStore {
rpcMethods := DataStreamingRPCMethods{
startReceiving: "das_startChunkedStore",
receiveChunk: "das_sendChunk",
finalizeReceiving: "das_commitChunkedStore",
rpcMethods := data_streaming.DataStreamingRPCMethods{
StartStream: "das_startChunkedStore",
StreamChunk: "das_sendChunk",
FinalizeStream: "das_commitChunkedStore",
}
dataStreamer, err = NewDataStreamer(target, maxStoreChunkBodySize, signer, rpcMethods)
payloadSigner := data_streaming.CustomPayloadSigner(func(bytes []byte, extras ...uint64) ([]byte, error) {
return applyDasSigner(signer, bytes, extras...)
})
dataStreamer, err = data_streaming.NewDataStreamer[StoreResult](target, maxStoreChunkBodySize, payloadSigner, rpcMethods)
if err != nil {
return nil, err
}
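To make the new client-side API above easier to follow, here is a minimal, non-authoritative sketch of how a sender wires up the generic data_streaming.DataStreamer after this change. It uses only identifiers visible in this diff and in the protocol test below (NewDataStreamer, DataStreamingRPCMethods, DefaultPayloadSigner, StreamData); the example package, the Result placeholder type, and the chunk-size and timeout values are illustrative assumptions, and the real DAS client instead passes a CustomPayloadSigner wrapping applyDasSigner as shown above.

package example

import (
	"context"

	"github.com/offchainlabs/nitro/daprovider/das/data_streaming"
	"github.com/offchainlabs/nitro/util/signature"
)

// Result is a placeholder for the server's typed finalization response
// (StoreResult in the DAS client, ProtocolResult in the protocol test below).
type Result struct{}

// streamPayload sketches the sender-side flow: name the three RPC methods,
// wrap a DataSignerFunc as the payload signer, and stream the message in signed chunks.
func streamPayload(ctx context.Context, url string, signer signature.DataSignerFunc, message []byte) error {
	rpcMethods := data_streaming.DataStreamingRPCMethods{
		StartStream:    "das_startChunkedStore",
		StreamChunk:    "das_sendChunk",
		FinalizeStream: "das_commitChunkedStore",
	}
	// 1024 is an illustrative maximum chunk body size; the DAS client takes it from config.
	streamer, err := data_streaming.NewDataStreamer[Result](url, 1024, data_streaming.DefaultPayloadSigner(signer), rpcMethods)
	if err != nil {
		return err
	}
	const timeout = 10 // illustrative, mirrors the constant used in the protocol test
	result, err := streamer.StreamData(ctx, message, timeout)
	if err != nil {
		return err
	}
	_ = result // decoded finalization response of type Result
	return nil
}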
53 changes: 23 additions & 30 deletions daprovider/das/dasRpcServer.go
@@ -1,4 +1,4 @@
// Copyright 2021-2022, Offchain Labs, Inc.
// Copyright 2021-2025, Offchain Labs, Inc.
// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md

package das
@@ -19,6 +19,7 @@ import (
"github.com/offchainlabs/nitro/blsSignatures"
"github.com/offchainlabs/nitro/cmd/genericconf"
"github.com/offchainlabs/nitro/daprovider/das/dasutil"
"github.com/offchainlabs/nitro/daprovider/das/data_streaming"
"github.com/offchainlabs/nitro/util/pretty"
)

@@ -46,7 +47,7 @@ type DASRPCServer struct {

signatureVerifier *SignatureVerifier

dataStreamReceiver *DataStreamReceiver
dataStreamReceiver *data_streaming.DataStreamReceiver
}

func StartDASRPCServer(ctx context.Context, addr string, portNum uint64, rpcServerTimeouts genericconf.HTTPServerTimeoutConfig, rpcServerBodyLimit int, daReader dasutil.DASReader, daWriter dasutil.DASWriter, daHealthChecker DataAvailabilityServiceHealthChecker, signatureVerifier *SignatureVerifier) (*http.Server, error) {
@@ -69,12 +70,17 @@ func StartDASRPCServerOnListener(ctx context.Context, listener net.Listener, rpc
rpcServer.SetHTTPBodyLimit(rpcServerBodyLimit)
}

dataStreamPayloadVerifier := data_streaming.CustomPayloadVerifier(func(ctx context.Context, signature []byte, bytes []byte, extras ...uint64) error {
return signatureVerifier.verify(ctx, bytes, signature, extras...)
})
err := rpcServer.RegisterName("das", &DASRPCServer{
daReader: daReader,
daWriter: daWriter,
daHealthChecker: daHealthChecker,
signatureVerifier: signatureVerifier,
dataStreamReceiver: NewDataStreamReceiver(signatureVerifier, defaultMaxPendingMessages, defaultMessageCollectionExpiry),
daReader: daReader,
daWriter: daWriter,
daHealthChecker: daHealthChecker,
signatureVerifier: signatureVerifier,
dataStreamReceiver: data_streaming.NewDataStreamReceiver(dataStreamPayloadVerifier, defaultMaxPendingMessages, defaultMessageCollectionExpiry, func(id data_streaming.MessageId) {
rpcStoreFailureGauge.Inc(1)
}),
})
if err != nil {
return nil, err
@@ -152,35 +158,22 @@ var (
legacyDASStoreAPIOnly = false
)

// lint:require-exhaustive-initialization
type StartChunkedStoreResult struct {
MessageId hexutil.Uint64 `json:"messageId,omitempty"`
}

// lint:require-exhaustive-initialization
type SendChunkResult struct {
Ok hexutil.Uint64 `json:"sendChunkResult,omitempty"`
}

func (s *DASRPCServer) StartChunkedStore(ctx context.Context, timestamp, nChunks, chunkSize, totalSize, timeout hexutil.Uint64, sig hexutil.Bytes) (*StartChunkedStoreResult, error) {
func (s *DASRPCServer) StartChunkedStore(ctx context.Context, timestamp, nChunks, chunkSize, totalSize, timeout hexutil.Uint64, sig hexutil.Bytes) (*data_streaming.StartStreamingResult, error) {
rpcStoreRequestGauge.Inc(1)
failed := true
defer func() {
if failed {
rpcStoreFailureGauge.Inc(1)
} // success gauge will be incremented on successful commit
}
}()

id, err := s.dataStreamReceiver.StartReceiving(ctx, uint64(timestamp), uint64(nChunks), uint64(chunkSize), uint64(totalSize), uint64(timeout), sig)
result, err := s.dataStreamReceiver.StartReceiving(ctx, uint64(timestamp), uint64(nChunks), uint64(chunkSize), uint64(totalSize), uint64(timeout), sig)
if err != nil {
return nil, err
}

failed = false
return &StartChunkedStoreResult{
MessageId: hexutil.Uint64(id),
}, nil

return result, nil
}

func (s *DASRPCServer) SendChunk(ctx context.Context, messageId, chunkId hexutil.Uint64, chunk hexutil.Bytes, sig hexutil.Bytes) error {
@@ -193,7 +186,7 @@ func (s *DASRPCServer) SendChunk(ctx context.Context, messageId, chunkId hexutil
}
}()

if err := s.dataStreamReceiver.ReceiveChunk(ctx, MessageId(messageId), uint64(chunkId), chunk, sig); err != nil {
if err := s.dataStreamReceiver.ReceiveChunk(ctx, data_streaming.MessageId(messageId), uint64(chunkId), chunk, sig); err != nil {
return err
}

@@ -202,7 +195,7 @@ func (s *DASRPCServer) CommitChunkedStore(ctx context.Context, messageId hexutil
}

func (s *DASRPCServer) CommitChunkedStore(ctx context.Context, messageId hexutil.Uint64, sig hexutil.Bytes) (*StoreResult, error) {
message, timeout, startTime, err := s.dataStreamReceiver.FinalizeReceiving(ctx, MessageId(messageId), sig)
message, timeout, startTime, err := s.dataStreamReceiver.FinalizeReceiving(ctx, data_streaming.MessageId(messageId), sig)
if err != nil {
return nil, err
}
@@ -232,12 +225,12 @@ func (s *DASRPCServer) CommitChunkedStore(ctx context.Context, messageId hexutil
}, nil
}

func (serv *DASRPCServer) HealthCheck(ctx context.Context) error {
return serv.daHealthChecker.HealthCheck(ctx)
func (s *DASRPCServer) HealthCheck(ctx context.Context) error {
return s.daHealthChecker.HealthCheck(ctx)
}

func (serv *DASRPCServer) ExpirationPolicy(ctx context.Context) (string, error) {
expirationPolicy, err := serv.daReader.ExpirationPolicy(ctx)
func (s *DASRPCServer) ExpirationPolicy(ctx context.Context) (string, error) {
expirationPolicy, err := s.daReader.ExpirationPolicy(ctx)
if err != nil {
return "", err
}
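For the receiving side, the construction shown above boils down to the following minimal sketch. Assumptions: the NewDataStreamReceiver signature matches the calls in this diff and in the test below; the limit, expiry value, and callback body are illustrative (the DAS server bumps rpcStoreFailureGauge in the callback, and the protocol test passes nil).

package example

import (
	"time"

	"github.com/offchainlabs/nitro/daprovider/das/data_streaming"
	"github.com/offchainlabs/nitro/util/signature"
)

// newReceiver builds a DataStreamReceiver from a payload verifier, limits on pending
// messages and their lifetime, and an expiration callback invoked when a partially
// received stream times out before being finalized.
func newReceiver(verifier *signature.Verifier) *data_streaming.DataStreamReceiver {
	return data_streaming.NewDataStreamReceiver(
		data_streaming.DefaultPayloadVerifier(verifier),
		10,            // max pending messages (illustrative)
		2*time.Minute, // message collection expiry (illustrative)
		func(id data_streaming.MessageId) {
			// React to an expired, never-finalized stream; e.g. increment a failure metric.
		},
	)
}

The receiver is then exposed over RPC by forwarding the three handlers (StartReceiving, ReceiveChunk, FinalizeReceiving), as DASRPCServer above and TestServer in the protocol test below both do.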
139 changes: 139 additions & 0 deletions daprovider/das/data_streaming/protocol_test.go
@@ -0,0 +1,139 @@
// Copyright 2025, Offchain Labs, Inc.
// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md

package data_streaming

import (
"context"
"math/rand"
"net"
"net/http"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rpc"

"github.com/offchainlabs/nitro/cmd/genericconf"
"github.com/offchainlabs/nitro/util/signature"
"github.com/offchainlabs/nitro/util/testhelpers"
)

const (
maxPendingMessages = 10
messageCollectionExpiry = time.Duration(2 * time.Second)
maxStoreChunkBodySize = 1024
timeout = 10
serverRPCRoot = "datastreaming"
)

var rpcMethods = DataStreamingRPCMethods{
StartStream: serverRPCRoot + "_start",
StreamChunk: serverRPCRoot + "_chunk",
FinalizeStream: serverRPCRoot + "_finish",
}

func TestDataStreamingProtocol(t *testing.T) {
t.Run("Single sender, short message", func(t *testing.T) {
test(t, maxStoreChunkBodySize/2, 10, 1)
})
t.Run("Single sender, long message", func(t *testing.T) {
test(t, 2*maxStoreChunkBodySize, 50, 1)
})
t.Run("Many senders, long messages", func(t *testing.T) {
test(t, 10*maxStoreChunkBodySize, maxStoreChunkBodySize, maxPendingMessages)
})
}

func test(t *testing.T, messageSizeMean, messageSizeStdDev, concurrency int) {
ctx := context.Background()
signer, verifier := prepareCrypto(t)
serverUrl := launchServer(t, ctx, verifier)

streamer, err := NewDataStreamer[ProtocolResult]("http://"+serverUrl, maxStoreChunkBodySize, DefaultPayloadSigner(signer), rpcMethods)
testhelpers.RequireImpl(t, err)

var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()

messageSize := int(rand.NormFloat64()*float64(messageSizeStdDev) + float64(messageSizeMean))

message := testhelpers.RandomizeSlice(make([]byte, messageSize))
result, err := streamer.StreamData(ctx, message, timeout)
testhelpers.RequireImpl(t, err)
require.Equal(t, message, ([]byte)(result.Message), "protocol resulted in an incorrect message")
}()
}
wg.Wait()
}

func prepareCrypto(t *testing.T) (signature.DataSignerFunc, *signature.Verifier) {
privateKey, err := crypto.GenerateKey()
testhelpers.RequireImpl(t, err)

signatureVerifierConfig := signature.VerifierConfig{
AllowedAddresses: []string{crypto.PubkeyToAddress(privateKey.PublicKey).Hex()},
AcceptSequencer: false,
Dangerous: signature.DangerousVerifierConfig{AcceptMissing: false},
}
verifier, err := signature.NewVerifier(&signatureVerifierConfig, nil)
testhelpers.RequireImpl(t, err)

signer := signature.DataSignerFromPrivateKey(privateKey)
return signer, verifier
}

func launchServer(t *testing.T, ctx context.Context, signatureVerifier *signature.Verifier) string {
rpcServer := rpc.NewServer()
err := rpcServer.RegisterName(serverRPCRoot, &TestServer{
dataStreamReceiver: NewDataStreamReceiver(DefaultPayloadVerifier(signatureVerifier), maxPendingMessages, messageCollectionExpiry, nil),
})
testhelpers.RequireImpl(t, err)

listener, err := net.Listen("tcp", "localhost:0")
testhelpers.RequireImpl(t, err)

httpServer := &http.Server{Handler: rpcServer, ReadTimeout: genericconf.HTTPServerTimeoutConfigDefault.ReadTimeout}
go func() {
err = httpServer.Serve(listener)
testhelpers.RequireImpl(t, err)
}()
go func() {
<-ctx.Done()
_ = httpServer.Shutdown(context.Background())
}()

return listener.Addr().String()
}

// ======================================= Test server (wrapping the receiver part) ========================== //

// lint:require-exhaustive-initialization
type TestServer struct {
dataStreamReceiver *DataStreamReceiver
}

func (server *TestServer) Start(ctx context.Context, timestamp, nChunks, chunkSize, totalSize, timeout hexutil.Uint64, sig hexutil.Bytes) (*StartStreamingResult, error) {
return server.dataStreamReceiver.StartReceiving(ctx, uint64(timestamp), uint64(nChunks), uint64(chunkSize), uint64(totalSize), uint64(timeout), sig)
}

func (server *TestServer) Chunk(ctx context.Context, messageId, chunkId hexutil.Uint64, chunk hexutil.Bytes, sig hexutil.Bytes) error {
return server.dataStreamReceiver.ReceiveChunk(ctx, MessageId(messageId), uint64(chunkId), chunk, sig)
}

func (server *TestServer) Finish(ctx context.Context, messageId hexutil.Uint64, sig hexutil.Bytes) (*ProtocolResult, error) {
message, _, _, err := server.dataStreamReceiver.FinalizeReceiving(ctx, MessageId(messageId), sig)
return &ProtocolResult{Message: message}, err
}

// lint:require-exhaustive-initialization
type ProtocolResult struct {
Message hexutil.Bytes `json:"message"`
}