diff --git a/broadcastclient/broadcastclient.go b/broadcastclient/broadcastclient.go index 4cd4e817bc..9baec85082 100644 --- a/broadcastclient/broadcastclient.go +++ b/broadcastclient/broadcastclient.go @@ -1,4 +1,4 @@ -// Copyright 2021-2022, Offchain Labs, Inc. +// Copyright 2021-2025, Offchain Labs, Inc. // For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md package broadcastclient @@ -99,7 +99,7 @@ var DefaultConfig = Config{ ReconnectMaximumBackoff: time.Second * 64, RequireChainId: false, RequireFeedVersion: false, - Verify: signature.DefultFeedVerifierConfig, + Verify: signature.DefaultFeedVerifierConfig, URL: []string{}, SecondaryURL: []string{}, Timeout: 20 * time.Second, @@ -111,7 +111,7 @@ var DefaultTestConfig = Config{ ReconnectMaximumBackoff: 0, RequireChainId: false, RequireFeedVersion: false, - Verify: signature.DefultFeedVerifierConfig, + Verify: signature.DefaultFeedVerifierConfig, URL: []string{""}, SecondaryURL: []string{}, Timeout: 200 * time.Millisecond, diff --git a/daprovider/das/dasRpcClient.go b/daprovider/das/dasRpcClient.go index 5dfb1ad76d..ff93699d47 100644 --- a/daprovider/das/dasRpcClient.go +++ b/daprovider/das/dasRpcClient.go @@ -17,6 +17,7 @@ import ( "github.com/offchainlabs/nitro/blsSignatures" "github.com/offchainlabs/nitro/daprovider/das/dasutil" + "github.com/offchainlabs/nitro/daprovider/das/data_streaming" "github.com/offchainlabs/nitro/util/pretty" "github.com/offchainlabs/nitro/util/signature" ) @@ -27,9 +28,6 @@ var ( rpcClientStoreFailureGauge = metrics.NewRegisteredGauge("arb/das/rpcclient/store/failure", nil) rpcClientStoreStoredBytesGauge = metrics.NewRegisteredGauge("arb/das/rpcclient/store/bytes", nil) rpcClientStoreDurationHistogram = metrics.NewRegisteredHistogram("arb/das/rpcclient/store/duration", nil, metrics.NewBoundedHistogramSample()) - - rpcClientSendChunkSuccessGauge = metrics.NewRegisteredGauge("arb/das/rpcclient/sendchunk/success", nil) - rpcClientSendChunkFailureGauge = metrics.NewRegisteredGauge("arb/das/rpcclient/sendchunk/failure", nil) ) // lint:require-exhaustive-initialization @@ -37,7 +35,7 @@ type DASRPCClient struct { // implements DataAvailabilityService clnt *rpc.Client url string signer signature.DataSignerFunc - dataStreamer *DataStreamer + dataStreamer *data_streaming.DataStreamer[StoreResult] } func nilSigner(_ []byte) ([]byte, error) { @@ -54,14 +52,17 @@ func NewDASRPCClient(target string, signer signature.DataSignerFunc, maxStoreChu return nil, err } - var dataStreamer *DataStreamer + var dataStreamer *data_streaming.DataStreamer[StoreResult] if enableChunkedStore { - rpcMethods := DataStreamingRPCMethods{ - startReceiving: "das_startChunkedStore", - receiveChunk: "das_sendChunk", - finalizeReceiving: "das_commitChunkedStore", + rpcMethods := data_streaming.DataStreamingRPCMethods{ + StartStream: "das_startChunkedStore", + StreamChunk: "das_sendChunk", + FinalizeStream: "das_commitChunkedStore", } - dataStreamer, err = NewDataStreamer(target, maxStoreChunkBodySize, signer, rpcMethods) + payloadSigner := data_streaming.CustomPayloadSigner(func(bytes []byte, extras ...uint64) ([]byte, error) { + return applyDasSigner(signer, bytes, extras...) 
+ }) + dataStreamer, err = data_streaming.NewDataStreamer[StoreResult](target, maxStoreChunkBodySize, payloadSigner, rpcMethods) if err != nil { return nil, err } diff --git a/daprovider/das/dasRpcServer.go b/daprovider/das/dasRpcServer.go index ef2f4bbdf4..3fbc7c2916 100644 --- a/daprovider/das/dasRpcServer.go +++ b/daprovider/das/dasRpcServer.go @@ -1,4 +1,4 @@ -// Copyright 2021-2022, Offchain Labs, Inc. +// Copyright 2021-2025, Offchain Labs, Inc. // For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md package das @@ -19,6 +19,7 @@ import ( "github.com/offchainlabs/nitro/blsSignatures" "github.com/offchainlabs/nitro/cmd/genericconf" "github.com/offchainlabs/nitro/daprovider/das/dasutil" + "github.com/offchainlabs/nitro/daprovider/das/data_streaming" "github.com/offchainlabs/nitro/util/pretty" ) @@ -46,7 +47,7 @@ type DASRPCServer struct { signatureVerifier *SignatureVerifier - dataStreamReceiver *DataStreamReceiver + dataStreamReceiver *data_streaming.DataStreamReceiver } func StartDASRPCServer(ctx context.Context, addr string, portNum uint64, rpcServerTimeouts genericconf.HTTPServerTimeoutConfig, rpcServerBodyLimit int, daReader dasutil.DASReader, daWriter dasutil.DASWriter, daHealthChecker DataAvailabilityServiceHealthChecker, signatureVerifier *SignatureVerifier) (*http.Server, error) { @@ -69,12 +70,17 @@ func StartDASRPCServerOnListener(ctx context.Context, listener net.Listener, rpc rpcServer.SetHTTPBodyLimit(rpcServerBodyLimit) } + dataStreamPayloadVerifier := data_streaming.CustomPayloadVerifier(func(ctx context.Context, signature []byte, bytes []byte, extras ...uint64) error { + return signatureVerifier.verify(ctx, bytes, signature, extras...) + }) err := rpcServer.RegisterName("das", &DASRPCServer{ - daReader: daReader, - daWriter: daWriter, - daHealthChecker: daHealthChecker, - signatureVerifier: signatureVerifier, - dataStreamReceiver: NewDataStreamReceiver(signatureVerifier, defaultMaxPendingMessages, defaultMessageCollectionExpiry), + daReader: daReader, + daWriter: daWriter, + daHealthChecker: daHealthChecker, + signatureVerifier: signatureVerifier, + dataStreamReceiver: data_streaming.NewDataStreamReceiver(dataStreamPayloadVerifier, defaultMaxPendingMessages, defaultMessageCollectionExpiry, func(id data_streaming.MessageId) { + rpcStoreFailureGauge.Inc(1) + }), }) if err != nil { return nil, err @@ -152,35 +158,22 @@ var ( legacyDASStoreAPIOnly = false ) -// lint:require-exhaustive-initialization -type StartChunkedStoreResult struct { - MessageId hexutil.Uint64 `json:"messageId,omitempty"` -} - -// lint:require-exhaustive-initialization -type SendChunkResult struct { - Ok hexutil.Uint64 `json:"sendChunkResult,omitempty"` -} - -func (s *DASRPCServer) StartChunkedStore(ctx context.Context, timestamp, nChunks, chunkSize, totalSize, timeout hexutil.Uint64, sig hexutil.Bytes) (*StartChunkedStoreResult, error) { +func (s *DASRPCServer) StartChunkedStore(ctx context.Context, timestamp, nChunks, chunkSize, totalSize, timeout hexutil.Uint64, sig hexutil.Bytes) (*data_streaming.StartStreamingResult, error) { rpcStoreRequestGauge.Inc(1) failed := true defer func() { if failed { rpcStoreFailureGauge.Inc(1) - } // success gauge will be incremented on successful commit + } }() - id, err := s.dataStreamReceiver.StartReceiving(ctx, uint64(timestamp), uint64(nChunks), uint64(chunkSize), uint64(totalSize), uint64(timeout), sig) + result, err := s.dataStreamReceiver.StartReceiving(ctx, uint64(timestamp), uint64(nChunks), uint64(chunkSize), 
uint64(totalSize), uint64(timeout), sig) if err != nil { return nil, err } failed = false - return &StartChunkedStoreResult{ - MessageId: hexutil.Uint64(id), - }, nil - + return result, nil } func (s *DASRPCServer) SendChunk(ctx context.Context, messageId, chunkId hexutil.Uint64, chunk hexutil.Bytes, sig hexutil.Bytes) error { @@ -193,7 +186,7 @@ func (s *DASRPCServer) SendChunk(ctx context.Context, messageId, chunkId hexutil } }() - if err := s.dataStreamReceiver.ReceiveChunk(ctx, MessageId(messageId), uint64(chunkId), chunk, sig); err != nil { + if err := s.dataStreamReceiver.ReceiveChunk(ctx, data_streaming.MessageId(messageId), uint64(chunkId), chunk, sig); err != nil { return err } @@ -202,7 +195,7 @@ func (s *DASRPCServer) SendChunk(ctx context.Context, messageId, chunkId hexutil } func (s *DASRPCServer) CommitChunkedStore(ctx context.Context, messageId hexutil.Uint64, sig hexutil.Bytes) (*StoreResult, error) { - message, timeout, startTime, err := s.dataStreamReceiver.FinalizeReceiving(ctx, MessageId(messageId), sig) + message, timeout, startTime, err := s.dataStreamReceiver.FinalizeReceiving(ctx, data_streaming.MessageId(messageId), sig) if err != nil { return nil, err } @@ -232,12 +225,12 @@ func (s *DASRPCServer) CommitChunkedStore(ctx context.Context, messageId hexutil }, nil } -func (serv *DASRPCServer) HealthCheck(ctx context.Context) error { - return serv.daHealthChecker.HealthCheck(ctx) +func (s *DASRPCServer) HealthCheck(ctx context.Context) error { + return s.daHealthChecker.HealthCheck(ctx) } -func (serv *DASRPCServer) ExpirationPolicy(ctx context.Context) (string, error) { - expirationPolicy, err := serv.daReader.ExpirationPolicy(ctx) +func (s *DASRPCServer) ExpirationPolicy(ctx context.Context) (string, error) { + expirationPolicy, err := s.daReader.ExpirationPolicy(ctx) if err != nil { return "", err } diff --git a/daprovider/das/data_streaming/protocol_test.go b/daprovider/das/data_streaming/protocol_test.go new file mode 100644 index 0000000000..6b035eb080 --- /dev/null +++ b/daprovider/das/data_streaming/protocol_test.go @@ -0,0 +1,139 @@ +// Copyright 2025, Offchain Labs, Inc. 
+// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md + +package data_streaming + +import ( + "context" + "math/rand" + "net" + "net/http" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/rpc" + + "github.com/offchainlabs/nitro/cmd/genericconf" + "github.com/offchainlabs/nitro/util/signature" + "github.com/offchainlabs/nitro/util/testhelpers" +) + +const ( + maxPendingMessages = 10 + messageCollectionExpiry = time.Duration(2 * time.Second) + maxStoreChunkBodySize = 1024 + timeout = 10 + serverRPCRoot = "datastreaming" +) + +var rpcMethods = DataStreamingRPCMethods{ + StartStream: serverRPCRoot + "_start", + StreamChunk: serverRPCRoot + "_chunk", + FinalizeStream: serverRPCRoot + "_finish", +} + +func TestDataStreamingProtocol(t *testing.T) { + t.Run("Single sender, short message", func(t *testing.T) { + test(t, maxStoreChunkBodySize/2, 10, 1) + }) + t.Run("Single sender, long message", func(t *testing.T) { + test(t, 2*maxStoreChunkBodySize, 50, 1) + }) + t.Run("Many senders, long messages", func(t *testing.T) { + test(t, 10*maxStoreChunkBodySize, maxStoreChunkBodySize, maxPendingMessages) + }) +} + +func test(t *testing.T, messageSizeMean, messageSizeStdDev, concurrency int) { + ctx := context.Background() + signer, verifier := prepareCrypto(t) + serverUrl := launchServer(t, ctx, verifier) + + streamer, err := NewDataStreamer[ProtocolResult]("http://"+serverUrl, maxStoreChunkBodySize, DefaultPayloadSigner(signer), rpcMethods) + testhelpers.RequireImpl(t, err) + + var wg sync.WaitGroup + for i := 0; i < concurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + messageSize := int(rand.NormFloat64()*float64(messageSizeStdDev) + float64(messageSizeMean)) + + message := testhelpers.RandomizeSlice(make([]byte, messageSize)) + result, err := streamer.StreamData(ctx, message, timeout) + testhelpers.RequireImpl(t, err) + require.Equal(t, message, ([]byte)(result.Message), "protocol resulted in an incorrect message") + }() + } + wg.Wait() +} + +func prepareCrypto(t *testing.T) (signature.DataSignerFunc, *signature.Verifier) { + privateKey, err := crypto.GenerateKey() + testhelpers.RequireImpl(t, err) + + signatureVerifierConfig := signature.VerifierConfig{ + AllowedAddresses: []string{crypto.PubkeyToAddress(privateKey.PublicKey).Hex()}, + AcceptSequencer: false, + Dangerous: signature.DangerousVerifierConfig{AcceptMissing: false}, + } + verifier, err := signature.NewVerifier(&signatureVerifierConfig, nil) + testhelpers.RequireImpl(t, err) + + signer := signature.DataSignerFromPrivateKey(privateKey) + return signer, verifier +} + +func launchServer(t *testing.T, ctx context.Context, signatureVerifier *signature.Verifier) string { + rpcServer := rpc.NewServer() + err := rpcServer.RegisterName(serverRPCRoot, &TestServer{ + dataStreamReceiver: NewDataStreamReceiver(DefaultPayloadVerifier(signatureVerifier), maxPendingMessages, messageCollectionExpiry, nil), + }) + testhelpers.RequireImpl(t, err) + + listener, err := net.Listen("tcp", "localhost:0") + testhelpers.RequireImpl(t, err) + + httpServer := &http.Server{Handler: rpcServer, ReadTimeout: genericconf.HTTPServerTimeoutConfigDefault.ReadTimeout} + go func() { + err = httpServer.Serve(listener) + testhelpers.RequireImpl(t, err) + }() + go func() { + <-ctx.Done() + _ = httpServer.Shutdown(context.Background()) + }() + + return listener.Addr().String() 
+} + +// ======================================= Test server (wrapping the receiver part) ========================== // + +// lint:require-exhaustive-initialization +type TestServer struct { + dataStreamReceiver *DataStreamReceiver +} + +func (server *TestServer) Start(ctx context.Context, timestamp, nChunks, chunkSize, totalSize, timeout hexutil.Uint64, sig hexutil.Bytes) (*StartStreamingResult, error) { + return server.dataStreamReceiver.StartReceiving(ctx, uint64(timestamp), uint64(nChunks), uint64(chunkSize), uint64(totalSize), uint64(timeout), sig) +} + +func (server *TestServer) Chunk(ctx context.Context, messageId, chunkId hexutil.Uint64, chunk hexutil.Bytes, sig hexutil.Bytes) error { + return server.dataStreamReceiver.ReceiveChunk(ctx, MessageId(messageId), uint64(chunkId), chunk, sig) +} + +func (server *TestServer) Finish(ctx context.Context, messageId hexutil.Uint64, sig hexutil.Bytes) (*ProtocolResult, error) { + message, _, _, err := server.dataStreamReceiver.FinalizeReceiving(ctx, MessageId(messageId), sig) + return &ProtocolResult{Message: message}, err +} + +// lint:require-exhaustive-initialization +type ProtocolResult struct { + Message hexutil.Bytes `json:"message"` +} diff --git a/daprovider/das/data_streaming_receiver.go b/daprovider/das/data_streaming/receiver.go similarity index 74% rename from daprovider/das/data_streaming_receiver.go rename to daprovider/das/data_streaming/receiver.go index dad6549d93..4a77651983 100644 --- a/daprovider/das/data_streaming_receiver.go +++ b/daprovider/das/data_streaming/receiver.go @@ -1,7 +1,7 @@ // Copyright 2025, Offchain Labs, Inc. // For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md -package das +package data_streaming import ( "context" @@ -24,44 +24,51 @@ import ( // the interrupted streams. // lint:require-exhaustive-initialization type DataStreamReceiver struct { - signatureVerifier *SignatureVerifier - messageStore *messageStore + payloadVerifier *PayloadVerifier + messageStore *messageStore } -// NewDataStreamReceiver sets up a new stream receiver. `signatureVerifier` must be compatible with message signing on +// NewDataStreamReceiver sets up a new stream receiver. `payloadVerifier` must be compatible with message signing on // the `DataStreamer` sender side. `maxPendingMessages` limits how many parallel protocol instances are supported. // `messageCollectionExpiry` is the window in which a single message streaming must end - otherwise the protocol will // be closed and all related data will be removed. 
-func NewDataStreamReceiver(signatureVerifier *SignatureVerifier, maxPendingMessages int, messageCollectionExpiry time.Duration) *DataStreamReceiver { +func NewDataStreamReceiver(payloadVerifier *PayloadVerifier, maxPendingMessages int, messageCollectionExpiry time.Duration, expirationCallback func(id MessageId)) *DataStreamReceiver { return &DataStreamReceiver{ - signatureVerifier: signatureVerifier, - messageStore: newMessageStore(maxPendingMessages, messageCollectionExpiry), + payloadVerifier: payloadVerifier, + messageStore: newMessageStore(maxPendingMessages, messageCollectionExpiry, expirationCallback), } } -func (dsr *DataStreamReceiver) StartReceiving(ctx context.Context, timestamp, nChunks, chunkSize, totalSize, timeout uint64, sig []byte) (MessageId, error) { - if err := dsr.signatureVerifier.verify(ctx, []byte{}, sig, timestamp, nChunks, chunkSize, totalSize, timeout); err != nil { - return 0, err +// StartStreamingResult is expected by DataStreamer to be returned by the endpoint responsible for the StartReceiving method. +// lint:require-exhaustive-initialization +type StartStreamingResult struct { + MessageId hexutil.Uint64 `json:"MessageId,omitempty"` +} + +func (dsr *DataStreamReceiver) StartReceiving(ctx context.Context, timestamp, nChunks, chunkSize, totalSize, timeout uint64, signature []byte) (*StartStreamingResult, error) { + if err := dsr.payloadVerifier.verifyPayload(ctx, signature, []byte{}, timestamp, nChunks, chunkSize, totalSize, timeout); err != nil { + return &StartStreamingResult{0}, err } // Prevent replay of old messages // #nosec G115 if time.Since(time.Unix(int64(timestamp), 0)).Abs() > time.Minute { - return 0, errors.New("too much time has elapsed since request was signed") + return &StartStreamingResult{0}, errors.New("too much time has elapsed since request was signed") } - return dsr.messageStore.registerNewMessage(nChunks, timeout, chunkSize, totalSize) + messageId, err := dsr.messageStore.registerNewMessage(nChunks, timeout, chunkSize, totalSize) + return &StartStreamingResult{hexutil.Uint64(messageId)}, err } -func (dsr *DataStreamReceiver) ReceiveChunk(ctx context.Context, messageId MessageId, chunkId uint64, chunk, sig []byte) error { - if err := dsr.signatureVerifier.verify(ctx, chunk, sig, uint64(messageId), chunkId); err != nil { +func (dsr *DataStreamReceiver) ReceiveChunk(ctx context.Context, messageId MessageId, chunkId uint64, chunkData, signature []byte) error { + if err := dsr.payloadVerifier.verifyPayload(ctx, signature, chunkData, uint64(messageId), chunkId); err != nil { return err } - return dsr.messageStore.addNewChunk(messageId, chunkId, chunk) + return dsr.messageStore.addNewChunk(messageId, chunkId, chunkData) } -func (dsr *DataStreamReceiver) FinalizeReceiving(ctx context.Context, messageId MessageId, sig hexutil.Bytes) ([]byte, uint64, time.Time, error) { - if err := dsr.signatureVerifier.verify(ctx, []byte{}, sig, uint64(messageId)); err != nil { +func (dsr *DataStreamReceiver) FinalizeReceiving(ctx context.Context, messageId MessageId, signature hexutil.Bytes) ([]byte, uint64, time.Time, error) { + if err := dsr.payloadVerifier.verifyPayload(ctx, signature, []byte{}, uint64(messageId)); err != nil { return nil, 0, time.Time{}, err } return dsr.messageStore.finalizeMessage(messageId) @@ -89,14 +96,16 @@ type messageStore struct { messages map[MessageId]*partialMessage maxPendingMessages int messageCollectionExpiry time.Duration + expirationCallback func(MessageId) } -func newMessageStore(maxPendingMessages int, 
messageCollectionExpiry time.Duration) *messageStore { +func newMessageStore(maxPendingMessages int, messageCollectionExpiry time.Duration, expirationCallback func(id MessageId)) *messageStore { return &messageStore{ mutex: sync.Mutex{}, messages: make(map[MessageId]*partialMessage), maxPendingMessages: maxPendingMessages, messageCollectionExpiry: messageCollectionExpiry, + expirationCallback: expirationCallback, } } @@ -134,7 +143,9 @@ func (ms *messageStore) registerNewMessage(nChunks, timeout, chunkSize, totalSiz // Message will only exist if expiry was reached without it being complete. if _, stillExists := ms.messages[id]; stillExists { - rpcStoreFailureGauge.Inc(1) + if ms.expirationCallback != nil { + ms.expirationCallback(id) + } delete(ms.messages, id) } }(id) diff --git a/daprovider/das/data_streaming_sender.go b/daprovider/das/data_streaming/sender.go similarity index 55% rename from daprovider/das/data_streaming_sender.go rename to daprovider/das/data_streaming/sender.go index 00cd3d5be6..cf31b16770 100644 --- a/daprovider/das/data_streaming_sender.go +++ b/daprovider/das/data_streaming/sender.go @@ -1,7 +1,7 @@ // Copyright 2025, Offchain Labs, Inc. // For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md -package das +package data_streaming import ( "context" @@ -13,25 +13,25 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/rpc" - - "github.com/offchainlabs/nitro/util/signature" ) // DataStreamer allows sending arbitrarily big payloads with JSON RPC. It follows a simple chunk-based protocol. -type DataStreamer struct { +// lint:require-exhaustive-initialization +type DataStreamer[Result any] struct { // rpcClient is the underlying client for making RPC calls to the receiver. rpcClient *rpc.Client // chunkSize is the preconfigured size limit on a single data chunk to be sent. chunkSize uint64 // dataSigner is used for sender authentication during the protocol. - dataSigner signature.DataSignerFunc + dataSigner *PayloadSigner // rpcMethods define the actual server API rpcMethods DataStreamingRPCMethods } // DataStreamingRPCMethods configuration specifies names of the protocol's RPC methods on the server side. +// lint:require-exhaustive-initialization type DataStreamingRPCMethods struct { - startReceiving, receiveChunk, finalizeReceiving string + StartStream, StreamChunk, FinalizeStream string } // NewDataStreamer creates a new DataStreamer instance. @@ -42,7 +42,7 @@ type DataStreamingRPCMethods struct { // - `dataSigner` must not be nil; // // otherwise an `error` is returned. 
-func NewDataStreamer(url string, maxStoreChunkBodySize int, dataSigner signature.DataSignerFunc, rpcMethods DataStreamingRPCMethods) (*DataStreamer, error) { +func NewDataStreamer[T any](url string, maxStoreChunkBodySize int, dataSigner *PayloadSigner, rpcMethods DataStreamingRPCMethods) (*DataStreamer[T], error) { rpcClient, err := rpc.Dial(url) if err != nil { return nil, err @@ -57,7 +57,7 @@ func NewDataStreamer(url string, maxStoreChunkBodySize int, dataSigner signature return nil, errors.New("dataSigner must not be nil") } - return &DataStreamer{ + return &DataStreamer[T]{ rpcClient: rpcClient, chunkSize: chunkSize, dataSigner: dataSigner, @@ -66,7 +66,7 @@ func NewDataStreamer(url string, maxStoreChunkBodySize int, dataSigner signature } func calculateEffectiveChunkSize(maxStoreChunkBodySize int, rpcMethods DataStreamingRPCMethods) (uint64, error) { - jsonOverhead := len("{\"jsonrpc\":\"2.0\",\"id\":4294967295,\"method\":\"\",\"params\":[\"\"]}") + len(rpcMethods.receiveChunk) + jsonOverhead := len("{\"jsonrpc\":\"2.0\",\"id\":4294967295,\"method\":\"\",\"params\":[\"\"]}") + len(rpcMethods.StreamChunk) chunkSize := (maxStoreChunkBodySize - jsonOverhead - 512 /* headers */) / 2 if chunkSize <= 0 { return 0, fmt.Errorf("max-store-chunk-body-size %d doesn't leave enough room for chunk payload", maxStoreChunkBodySize) @@ -75,47 +75,42 @@ func calculateEffectiveChunkSize(maxStoreChunkBodySize int, rpcMethods DataStrea } // StreamData sends arbitrarily long byte sequence to the receiver using a simple chunking-based protocol. -func (ds *DataStreamer) StreamData(ctx context.Context, data []byte, timeout uint64) (storeResult *StoreResult, err error) { +func (ds *DataStreamer[Result]) StreamData(ctx context.Context, data []byte, timeout uint64) (*Result, error) { params := newStreamParams(uint64(len(data)), ds.chunkSize, timeout) - startReqSig, err := ds.generateStartReqSignature(params) + messageId, err := ds.startStream(ctx, params) if err != nil { return nil, err } - batchId, err := ds.startStream(ctx, startReqSig, params) - if err != nil { + if err := ds.doStream(ctx, data, messageId, params); err != nil { return nil, err } - if err := ds.doStream(ctx, data, batchId, params); err != nil { - return nil, err - } + return ds.finalizeStream(ctx, messageId) +} - finalReqSig, err := ds.generateFinalReqSignature(batchId) +func (ds *DataStreamer[Result]) startStream(ctx context.Context, params streamParams) (MessageId, error) { + payloadSignature, err := ds.sign(nil, params.timestamp, params.nChunks, ds.chunkSize, params.dataLen, params.timeout) if err != nil { - return nil, err + return 0, err } - return ds.finalizeStream(ctx, finalReqSig, batchId) -} - -func (ds *DataStreamer) startStream(ctx context.Context, startReqSig []byte, params streamParams) (hexutil.Uint64, error) { - var startChunkedStoreResult StartChunkedStoreResult - err := ds.rpcClient.CallContext( + var result StartStreamingResult + err = ds.rpcClient.CallContext( ctx, - &startChunkedStoreResult, - ds.rpcMethods.startReceiving, + &result, + ds.rpcMethods.StartStream, hexutil.Uint64(params.timestamp), hexutil.Uint64(params.nChunks), hexutil.Uint64(ds.chunkSize), hexutil.Uint64(params.dataLen), hexutil.Uint64(params.timeout), - hexutil.Bytes(startReqSig)) - return startChunkedStoreResult.MessageId, err + hexutil.Bytes(payloadSignature)) + return MessageId(result.MessageId), err } -func (ds *DataStreamer) doStream(ctx context.Context, data []byte, batchId hexutil.Uint64, params streamParams) error { +func (ds 
*DataStreamer[Result]) doStream(ctx context.Context, data []byte, messageId MessageId, params streamParams) error { chunkRoutines := new(errgroup.Group) for i := uint64(0); i < params.nChunks; i++ { startIndex := i * ds.chunkSize @@ -126,45 +121,34 @@ func (ds *DataStreamer) doStream(ctx context.Context, data []byte, batchId hexut chunkData := data[startIndex:endIndex] chunkRoutines.Go(func() error { - return ds.sendChunk(ctx, batchId, i, chunkData) + return ds.sendChunk(ctx, messageId, i, chunkData) }) } return chunkRoutines.Wait() } -func (ds *DataStreamer) sendChunk(ctx context.Context, batchId hexutil.Uint64, chunkId uint64, chunkData []byte) error { - chunkReqSig, err := ds.generateChunkReqSignature(chunkData, uint64(batchId), chunkId) +func (ds *DataStreamer[Result]) sendChunk(ctx context.Context, messageId MessageId, chunkId uint64, chunkData []byte) error { + payloadSignature, err := ds.sign(chunkData, uint64(messageId), chunkId) if err != nil { return err } + return ds.rpcClient.CallContext(ctx, nil, ds.rpcMethods.StreamChunk, hexutil.Uint64(messageId), hexutil.Uint64(chunkId), hexutil.Bytes(chunkData), hexutil.Bytes(payloadSignature)) +} - err = ds.rpcClient.CallContext(ctx, nil, ds.rpcMethods.receiveChunk, batchId, hexutil.Uint64(chunkId), hexutil.Bytes(chunkData), hexutil.Bytes(chunkReqSig)) +func (ds *DataStreamer[Result]) finalizeStream(ctx context.Context, messageId MessageId) (result *Result, err error) { + payloadSignature, err := ds.sign(nil, uint64(messageId)) if err != nil { - rpcClientSendChunkFailureGauge.Inc(1) - return err + return nil, err } - - rpcClientSendChunkSuccessGauge.Inc(1) - return nil -} - -func (ds *DataStreamer) finalizeStream(ctx context.Context, finalReqSig []byte, batchId hexutil.Uint64) (storeResult *StoreResult, err error) { - err = ds.rpcClient.CallContext(ctx, &storeResult, ds.rpcMethods.finalizeReceiving, batchId, hexutil.Bytes(finalReqSig)) + err = ds.rpcClient.CallContext(ctx, &result, ds.rpcMethods.FinalizeStream, hexutil.Uint64(messageId), hexutil.Bytes(payloadSignature)) return } -func (ds *DataStreamer) generateStartReqSignature(params streamParams) ([]byte, error) { - return applyDasSigner(ds.dataSigner, []byte{}, params.timestamp, params.nChunks, ds.chunkSize, params.dataLen, params.timeout) -} - -func (ds *DataStreamer) generateChunkReqSignature(chunkData []byte, batchId, chunkId uint64) ([]byte, error) { - return applyDasSigner(ds.dataSigner, chunkData, batchId, chunkId) -} - -func (ds *DataStreamer) generateFinalReqSignature(batchId hexutil.Uint64) ([]byte, error) { - return applyDasSigner(ds.dataSigner, []byte{}, uint64(batchId)) +func (ds *DataStreamer[Result]) sign(bytes []byte, extras ...uint64) ([]byte, error) { + return ds.dataSigner.signPayload(bytes, extras...) 
} +// lint:require-exhaustive-initialization type streamParams struct { timestamp, nChunks, lastChunkSize, dataLen, timeout uint64 } diff --git a/daprovider/das/data_streaming/signing.go b/daprovider/das/data_streaming/signing.go new file mode 100644 index 0000000000..2b64f2b8fe --- /dev/null +++ b/daprovider/das/data_streaming/signing.go @@ -0,0 +1,54 @@ +package data_streaming + +import ( + "context" + "encoding/binary" + + "github.com/ethereum/go-ethereum/crypto" + + "github.com/offchainlabs/nitro/util/arbmath" + "github.com/offchainlabs/nitro/util/signature" +) + +// lint:require-exhaustive-initialization +type PayloadSigner struct { + signPayload func(bytes []byte, extras ...uint64) ([]byte, error) +} + +func DefaultPayloadSigner(signer signature.DataSignerFunc) *PayloadSigner { + return CustomPayloadSigner(func(bytes []byte, extras ...uint64) ([]byte, error) { + return signer(crypto.Keccak256(flattenDataForSigning(bytes, extras...))) + }) +} + +func CustomPayloadSigner(signingFunc func([]byte, ...uint64) ([]byte, error)) *PayloadSigner { + return &PayloadSigner{ + signPayload: signingFunc, + } +} + +// lint:require-exhaustive-initialization +type PayloadVerifier struct { + verifyPayload func(ctx context.Context, signature []byte, bytes []byte, extras ...uint64) error +} + +func DefaultPayloadVerifier(verifier *signature.Verifier) *PayloadVerifier { + return CustomPayloadVerifier(func(ctx context.Context, signature []byte, bytes []byte, extras ...uint64) error { + expectedPayload := flattenDataForSigning(bytes, extras...) + return verifier.VerifyData(ctx, signature, expectedPayload) + }) +} + +func CustomPayloadVerifier(verifyingFunc func(ctx context.Context, signature []byte, bytes []byte, extras ...uint64) error) *PayloadVerifier { + return &PayloadVerifier{ + verifyPayload: verifyingFunc, + } +} + +func flattenDataForSigning(bytes []byte, extras ...uint64) []byte { + var bufferForExtras []byte + for _, field := range extras { + bufferForExtras = binary.BigEndian.AppendUint64(bufferForExtras, field) + } + return arbmath.ConcatByteSlices(bytes, bufferForExtras) +} diff --git a/daprovider/das/rpc_test.go b/daprovider/das/rpc_test.go index da02806137..bf1028363e 100644 --- a/daprovider/das/rpc_test.go +++ b/daprovider/das/rpc_test.go @@ -1,4 +1,4 @@ -// Copyright 2021-2022, Offchain Labs, Inc. +// Copyright 2021-2025, Offchain Labs, Inc. 
// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md package das diff --git a/daprovider/das/signature_verifier.go b/daprovider/das/signature_verifier.go index 9f6429cfc5..2787218771 100644 --- a/daprovider/das/signature_verifier.go +++ b/daprovider/das/signature_verifier.go @@ -26,30 +26,6 @@ type SignatureVerifier struct { extraBpVerifier func(message []byte, sig []byte, extraFields ...uint64) bool } -func NewSignatureVerifier(ctx context.Context, config DataAvailabilityConfig) (*SignatureVerifier, error) { - if config.ParentChainNodeURL == "none" { - return NewSignatureVerifierWithSeqInboxCaller(nil, config.ExtraSignatureCheckingPublicKey) - } - l1client, err := GetL1Client(ctx, config.ParentChainConnectionAttempts, config.ParentChainNodeURL) - if err != nil { - return nil, err - } - seqInboxAddress, err := OptionalAddressFromString(config.SequencerInboxAddress) - if err != nil { - return nil, err - } - if seqInboxAddress == nil { - return NewSignatureVerifierWithSeqInboxCaller(nil, config.ExtraSignatureCheckingPublicKey) - } - - seqInboxCaller, err := bridgegen.NewSequencerInboxCaller(*seqInboxAddress, l1client) - if err != nil { - return nil, err - } - return NewSignatureVerifierWithSeqInboxCaller(seqInboxCaller, config.ExtraSignatureCheckingPublicKey) - -} - func NewSignatureVerifierWithSeqInboxCaller( seqInboxCaller *bridgegen.SequencerInboxCaller, extraSignatureCheckingPublicKey string, diff --git a/util/signature/sign_verify.go b/util/signature/sign_verify.go index 738aad4f7f..81eaf6a57d 100644 --- a/util/signature/sign_verify.go +++ b/util/signature/sign_verify.go @@ -33,7 +33,7 @@ func SignVerifyConfigAddOptions(prefix string, f *pflag.FlagSet) { } var DefaultSignVerifyConfig = SignVerifyConfig{ - ECDSA: DefultFeedVerifierConfig, + ECDSA: DefaultFeedVerifierConfig, SymmetricFallback: false, SymmetricSign: false, Symmetric: EmptySimpleHmacConfig, diff --git a/util/signature/verifier.go b/util/signature/verifier.go index e37b8385dc..9e8a7c96eb 100644 --- a/util/signature/verifier.go +++ b/util/signature/verifier.go @@ -37,16 +37,16 @@ var ErrMissingSignature = fmt.Errorf("%w: signature not found", ErrSignatureNotV var ErrSignerNotApproved = fmt.Errorf("%w: signer not approved", ErrSignatureNotVerified) func FeedVerifierConfigAddOptions(prefix string, f *pflag.FlagSet) { - f.StringSlice(prefix+".allowed-addresses", DefultFeedVerifierConfig.AllowedAddresses, "a list of allowed addresses") - f.Bool(prefix+".accept-sequencer", DefultFeedVerifierConfig.AcceptSequencer, "accept verified message from sequencer") + f.StringSlice(prefix+".allowed-addresses", DefaultFeedVerifierConfig.AllowedAddresses, "a list of allowed addresses") + f.Bool(prefix+".accept-sequencer", DefaultFeedVerifierConfig.AcceptSequencer, "accept verified message from sequencer") DangerousFeedVerifierConfigAddOptions(prefix+".dangerous", f) } func DangerousFeedVerifierConfigAddOptions(prefix string, f *pflag.FlagSet) { - f.Bool(prefix+".accept-missing", DefultFeedVerifierConfig.Dangerous.AcceptMissing, "accept empty as valid signature") + f.Bool(prefix+".accept-missing", DefaultFeedVerifierConfig.Dangerous.AcceptMissing, "accept empty as valid signature") } -var DefultFeedVerifierConfig = VerifierConfig{ +var DefaultFeedVerifierConfig = VerifierConfig{ AllowedAddresses: []string{}, AcceptSequencer: true, Dangerous: DangerousVerifierConfig{
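
A minimal, self-contained sketch (not part of the patch) of the byte layout the new data_streaming signer and verifier must agree on: flattenDataForSigning appends each extra uint64 big-endian after the raw payload, DefaultPayloadSigner signs keccak256 of that buffer, and DefaultPayloadVerifier recomputes the same preimage for VerifyData. The concrete values below are illustrative only.

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/ethereum/go-ethereum/crypto"
)

// flatten mirrors data_streaming.flattenDataForSigning: raw payload first,
// then every extra field appended as 8 big-endian bytes.
func flatten(payload []byte, extras ...uint64) []byte {
	out := append([]byte{}, payload...)
	for _, e := range extras {
		out = binary.BigEndian.AppendUint64(out, e)
	}
	return out
}

func main() {
	// StartReceiving signs an empty payload plus
	// (timestamp, nChunks, chunkSize, totalSize, timeout) as extras — example values only.
	preimage := flatten(nil, 1710000000, 4, 1024, 4096, 10)
	fmt.Printf("preimage: %d bytes, keccak256: %x\n", len(preimage), crypto.Keccak256(preimage))
}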
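A hedged end-to-end sketch of how another component could reuse the extracted package. Only the data_streaming calls come from this patch; the "example_*" method names, ExampleResult type, URL, and logging are hypothetical.

package example // hypothetical caller, not part of the patch

import (
	"context"
	"time"

	"github.com/ethereum/go-ethereum/log"

	"github.com/offchainlabs/nitro/daprovider/das/data_streaming"
	"github.com/offchainlabs/nitro/util/signature"
)

// ExampleResult stands in for whatever struct the server's finalize endpoint
// returns (compare ProtocolResult in protocol_test.go); illustrative only.
type ExampleResult struct {
	Ok bool `json:"ok"`
}

func streamSomething(ctx context.Context, signer signature.DataSignerFunc, verifier *signature.Verifier, payload []byte) (*ExampleResult, error) {
	// Receiver side: constructed with a payload verifier compatible with the sender's
	// signer, plus an optional expiration callback (the RPC server wiring is omitted;
	// see TestServer in protocol_test.go).
	_ = data_streaming.NewDataStreamReceiver(
		data_streaming.DefaultPayloadVerifier(verifier),
		10,            // maxPendingMessages
		2*time.Second, // messageCollectionExpiry
		func(id data_streaming.MessageId) { log.Warn("stream expired", "id", id) },
	)

	// Sender side: the method names must match whatever the server registered.
	streamer, err := data_streaming.NewDataStreamer[ExampleResult](
		"http://localhost:9876", // assumed URL
		1024,                    // max chunk body size, as in the tests
		data_streaming.DefaultPayloadSigner(signer),
		data_streaming.DataStreamingRPCMethods{
			StartStream:    "example_startStream",
			StreamChunk:    "example_streamChunk",
			FinalizeStream: "example_finalizeStream",
		},
	)
	if err != nil {
		return nil, err
	}
	// The timeout value is forwarded to the server; its semantics are defined by the receiving service.
	return streamer.StreamData(ctx, payload, 10)
}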
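The arithmetic behind calculateEffectiveChunkSize, restated with the test's 1024-byte limit. The halving is presumably to leave room for hex-encoding each chunk byte in the JSON params; that rationale is an inference, not stated in the patch.

// Sketch only; mirrors calculateEffectiveChunkSize with illustrative inputs.
jsonOverhead := len(`{"jsonrpc":"2.0","id":4294967295,"method":"","params":[""]}`) + len("das_sendChunk")
chunkSize := (1024 - jsonOverhead - 512) / 2 // 512 bytes reserved for headers; works out to 220 bytes per chunk here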