Skip to content
Draft
207 changes: 170 additions & 37 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transac
hash: tx.Hash(),
vhashes: tx.BlobHashes(),
id: id,
storageSize: storageSize,
size: size,
storageSize: storageSize, // size of tx including cells
size: size, // size of tx including blobs
nonce: tx.Nonce(),
costCap: uint256.MustFromBig(tx.Cost()),
execTipCap: uint256.MustFromBig(tx.GasTipCap()),
Expand All @@ -136,6 +136,71 @@ func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transac
return meta
}

// PooledBlobTx is the storage representation of a blob transaction in the
// pool: the transaction itself plus a cell-based sidecar kept alongside it
// (rather than attached to the transaction).
type PooledBlobTx struct {
	Transaction *types.Transaction       // wrapped blob transaction
	Sidecar     *types.BlobTxCellSidecar // custodied cells, commitments and proofs for the transaction's blobs
	Size        uint64                   // original transaction size
}

// NewPooledBlobTx wraps a transaction, its cell sidecar and the original
// encoded transaction size into a PooledBlobTx.
func NewPooledBlobTx(tx *types.Transaction, sidecar *types.BlobTxCellSidecar, size uint64) *PooledBlobTx {
	ptx := new(PooledBlobTx)
	ptx.Transaction = tx
	ptx.Sidecar = sidecar
	ptx.Size = size
	return ptx
}

// Hash returns the hash of the wrapped transaction.
func (ptx *PooledBlobTx) Hash() common.Hash {
	return ptx.Transaction.Hash()
}

// Convert reconstructs a transaction carrying a regular blob sidecar from the
// pooled form: the blobs are recovered from the custodied cells and attached
// to the wrapped transaction.
func (ptx *PooledBlobTx) Convert() (*types.Transaction, error) {
	sc := ptx.Sidecar
	if sc == nil {
		return nil, errors.New("cell sidecar missing")
	}
	blobs, err := kzg4844.RecoverBlobs(sc.Cells, sc.Custody.Indices())
	if err != nil {
		return nil, err
	}
	full := types.NewBlobTxSidecar(sc.Version, blobs, sc.Commitments, sc.Proofs)
	return ptx.Transaction.WithBlobTxSidecar(full), nil
}
// RemoveParity strips the parity (extension) cells from the sidecar, keeping
// only the data cells of each blob. It is only valid for transactions whose
// custody mask covers every data cell; otherwise an error is returned and the
// sidecar is left untouched.
func (ptx *PooledBlobTx) RemoveParity() error {
	sc := ptx.Sidecar
	if sc == nil {
		return errors.New("nil sidecar")
	}
	// Only a full payload can shed its parity cells: every data-cell index
	// must be custodied.
	for bit := range kzg4844.DataPerBlob {
		if !sc.Custody.IsSet(uint(bit)) {
			return errors.New("cannot remove parity for non-full payload transaction")
		}
	}
	blobCount := len(sc.Cells) / kzg4844.CellsPerBlob
	if blobCount == 0 || len(sc.Cells)%kzg4844.CellsPerBlob != 0 {
		return errors.New("inconsistent cell count")
	}
	// Keep only the leading data cells of each blob, dropping the trailing
	// parity cells.
	cellsWithoutParity := make([]kzg4844.Cell, 0, blobCount*kzg4844.DataPerBlob)
	for blob := range blobCount {
		offset := blob * kzg4844.CellsPerBlob
		cellsWithoutParity = append(
			cellsWithoutParity,
			sc.Cells[offset:offset+kzg4844.DataPerBlob]...,
		)
	}
	// Clear the parity-cell bits from the custody mask. This is independent
	// of the blob index, so it is done once rather than once per blob (the
	// previous code repeated it inside the loop), and the data-cell count is
	// named instead of hard-coding 64.
	for bit := kzg4844.DataPerBlob; bit < kzg4844.CellsPerBlob; bit++ {
		sc.Custody.Clear(uint(bit))
	}
	sc.Cells = cellsWithoutParity
	return nil
}

// BlobPool is the transaction pool dedicated to EIP-4844 blob transactions.
//
// Blob transactions are special snowflakes that are designed for a very specific
Expand Down Expand Up @@ -392,6 +457,7 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
fails = append(fails, id)
}
}
// TODO(healthykim): edit slotter logic
slotter := newSlotter(eip4844.LatestMaxBlobsPerBlock(p.chain.Config()))
store, err := billy.Open(billy.Options{Path: queuedir, Repair: true}, slotter, index)
if err != nil {
Expand Down Expand Up @@ -473,19 +539,28 @@ func (p *BlobPool) Close() error {
// each transaction on disk to create the in-memory metadata index.
func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
tx := new(types.Transaction)
if err := rlp.DecodeBytes(blob, tx); err != nil {

pooledTx := new(PooledBlobTx)
if err := rlp.DecodeBytes(blob, pooledTx); err != nil {
// This path is impossible unless the disk data representation changes
// across restarts. For that ever improbable case, recover gracefully
// by ignoring this data entry.
log.Error("Failed to decode blob pool entry", "id", id, "err", err)
return err
}
if tx.BlobTxSidecar() == nil {
log.Error("Missing sidecar in blob pool entry", "id", id, "hash", tx.Hash())
return errors.New("missing blob sidecar")
if err := rlp.DecodeBytes(blob, tx); err != nil {
log.Error("Failed to decode blob pool entry", "id", id, "err", err)
return errors.New("unknown tx type")
}
if tx.BlobTxSidecar() == nil {
log.Error("Missing sidecar in blob pool entry", "id", id, "hash", tx.Hash())
return errors.New("missing sidecar")
}
} else {
tx, err = pooledTx.Convert()
if err != nil {
return err
}
}

meta := newBlobTxMeta(id, tx.Size(), size, tx)

if p.lookup.exists(meta.hash) {
// This path is only possible after a crash, where deleted items are not
// removed via the normal shutdown-startup procedure and thus may get
Expand Down Expand Up @@ -775,17 +850,17 @@ func (p *BlobPool) offload(addr common.Address, nonce uint64, id uint64, inclusi
log.Error("Blobs missing for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
return
}
var tx types.Transaction
if err = rlp.DecodeBytes(data, &tx); err != nil {
var pooledTx PooledBlobTx
if err = rlp.DecodeBytes(data, &pooledTx); err != nil {
log.Error("Blobs corrupted for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
return
}
block, ok := inclusions[tx.Hash()]
block, ok := inclusions[pooledTx.Transaction.Hash()]
if !ok {
log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", nonce, "id", id)
return
}
if err := p.limbo.push(&tx, block); err != nil {
if err := p.limbo.push(&pooledTx, block); err != nil {
log.Warn("Failed to offload blob tx into limbo", "err", err)
return
}
Expand Down Expand Up @@ -997,20 +1072,20 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
// Serialize the transaction back into the primary datastore.
blob, err := rlp.EncodeToBytes(tx)
if err != nil {
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
log.Error("Failed to encode transaction for storage", "hash", tx.Transaction.Hash(), "err", err)
return err
}
id, err := p.store.Put(blob)
if err != nil {
log.Error("Failed to write transaction into storage", "hash", tx.Hash(), "err", err)
log.Error("Failed to write transaction into storage", "hash", tx.Transaction.Hash(), "err", err)
return err
}

// Update the indices and metrics
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx)
meta := newBlobTxMeta(id, tx.Size, p.store.Size(id), tx.Transaction)
if _, ok := p.index[addr]; !ok {
if err := p.reserver.Hold(addr); err != nil {
log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err)
log.Warn("Failed to reserve account for blob pool", "tx", tx.Transaction.Hash(), "from", addr, "err", err)
return err
}
p.index[addr] = []*blobTxMeta{meta}
Expand Down Expand Up @@ -1140,7 +1215,17 @@ func (p *BlobPool) checkDelegationLimit(tx *types.Transaction) error {

// validateTx checks whether a transaction is valid according to the consensus
// rules and adheres to some heuristic limits of the local node (price and size).
func (p *BlobPool) validateTx(tx *types.Transaction) error {
func (p *BlobPool) validateTx(tx *types.Transaction, cellSidecar *types.BlobTxCellSidecar) error {
if tx.BlobTxSidecar() != nil || cellSidecar == nil {
return errors.New("malformed transaction and cellSidecar")
}
if err := txpool.ValidateBlobSidecar(tx, cellSidecar, p.head, &txpool.ValidationOptions{
Config: p.chain.Config(),
MaxBlobCount: maxBlobsPerTx,
}); err != nil {
return err
}

if err := p.ValidateTxBasics(tx); err != nil {
return err
}
Expand Down Expand Up @@ -1250,10 +1335,30 @@ func (p *BlobPool) getRLP(hash common.Hash) []byte {
}
data, err := p.store.Get(id)
if err != nil {
log.Error("Tracked blob transaction missing from store", "hash", hash, "id", id, "err", err)
log.Error("Failed to get transaction in blobpool", "hash", hash, "id", id, "err", err)
return nil
}
tx := new(types.Transaction)
pooledTx := new(PooledBlobTx)
if err := rlp.DecodeBytes(data, pooledTx); err != nil {
if err := rlp.DecodeBytes(data, tx); err != nil {
log.Error("Failed to decode transaction in blobpool", "hash", hash, "id", id, "err", err)
return nil
}
return data
}
tx, err = pooledTx.Convert()
if err != nil {
log.Error("Failed to convert transaction in blobpool", "hash", hash, "id", id, "err", err)
return nil
}
encoded, err := rlp.EncodeToBytes(tx)
if err != nil {
log.Error("Failed to encode transaction in blobpool", "hash", hash, "id", id, "err", err)
return nil
}
return data

return encoded
}

// Get returns a transaction if it is contained in the pool, or nil otherwise.
Expand All @@ -1262,15 +1367,20 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
if len(data) == 0 {
return nil
}
item := new(types.Transaction)
if err := rlp.DecodeBytes(data, item); err != nil {
id, _ := p.lookup.storeidOfTx(hash)
tx := new(types.Transaction)
pooledTx := new(PooledBlobTx)
if err := rlp.DecodeBytes(data, pooledTx); err != nil {
if err := rlp.DecodeBytes(data, tx); err != nil {
id, _ := p.lookup.storeidOfTx(hash)

log.Error("Blobs corrupted for traced transaction",
"hash", hash, "id", id, "err", err)
return nil
log.Error("Blobs corrupted for traced transaction",
"hash", hash, "id", id, "err", err)
return nil
}
return tx
}
return item
tx, _ = pooledTx.Convert()
return tx
}

// GetRLP returns a RLP-encoded transaction if it is contained in the pool.
Expand Down Expand Up @@ -1340,11 +1450,20 @@ func (p *BlobPool) GetBlobs(vhashes []common.Hash, version byte) ([]*kzg4844.Blo

// Decode the blob transaction
tx := new(types.Transaction)
if err := rlp.DecodeBytes(data, tx); err != nil {
log.Error("Blobs corrupted for traced transaction", "id", txID, "err", err)
continue
var sidecar *types.BlobTxSidecar
pooledTx := new(PooledBlobTx)
if err := rlp.DecodeBytes(data, pooledTx); err != nil {
if err := rlp.DecodeBytes(data, tx); err != nil {
log.Error("Blobs corrupted for traced transaction", "id", txID, "err", err)
continue
}
} else {
tx, err = pooledTx.Convert()
if err != nil {
return nil, nil, nil, err
}
}
sidecar := tx.BlobTxSidecar()
sidecar = tx.BlobTxSidecar()
if sidecar == nil {
log.Error("Blob tx without sidecar", "hash", tx.Hash(), "id", txID)
continue
Expand Down Expand Up @@ -1419,7 +1538,15 @@ func (p *BlobPool) Add(txs []*types.Transaction, sync bool) []error {
adds = make([]*types.Transaction, 0, len(txs))
)
for i, tx := range txs {
errs[i] = p.add(tx)
if errs[i] != nil {
continue
}
cellSidecar, err := tx.BlobTxSidecar().ToBlobTxCellSidecar()
if err != nil {
errs[i] = err
continue
}
errs[i] = p.add(tx.WithoutBlobTxSidecar(), cellSidecar, tx.Size())
if errs[i] == nil {
adds = append(adds, tx.WithoutBlobTxSidecar())
}
Expand All @@ -1433,7 +1560,7 @@ func (p *BlobPool) Add(txs []*types.Transaction, sync bool) []error {

// add inserts a new blob transaction into the pool if it passes validation (both
// consensus validity and pool restrictions).
func (p *BlobPool) add(tx *types.Transaction) (err error) {
func (p *BlobPool) add(tx *types.Transaction, cellSidecar *types.BlobTxCellSidecar, size uint64) (err error) {
// The blob pool blocks on adding a transaction. This is because blob txs are
// only even pulled from the network, so this method will act as the overload
// protection for fetches.
Expand All @@ -1447,7 +1574,7 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
}(time.Now())

// Ensure the transaction is valid from all perspectives
if err := p.validateTx(tx); err != nil {
if err := p.validateTx(tx, cellSidecar); err != nil {
log.Trace("Transaction validation failed", "hash", tx.Hash(), "err", err)
switch {
case errors.Is(err, txpool.ErrUnderpriced):
Expand Down Expand Up @@ -1489,9 +1616,15 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
}
}()
}

// Transaction permitted into the pool from a nonce and cost perspective,
// insert it into the database and update the indices
blob, err := rlp.EncodeToBytes(tx)
pooledTx := NewPooledBlobTx(tx, cellSidecar, size)
if pooledTx.RemoveParity() != nil {
return err
}

blob, err := rlp.EncodeToBytes(pooledTx)
if err != nil {
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
return err
Expand All @@ -1500,7 +1633,7 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
if err != nil {
return err
}
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx)
meta := newBlobTxMeta(id, pooledTx.Size, p.store.Size(id), tx)

var (
next = p.state.GetNonce(from)
Expand Down
17 changes: 12 additions & 5 deletions core/txpool/blobpool/blobpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,8 +452,12 @@ func verifyBlobRetrievals(t *testing.T, pool *BlobPool) {
}
// Item retrieved, make sure it matches the expectation
index := testBlobIndices[hash]
if *blobs1[i] != *testBlobs[index] || proofs1[i][0] != testBlobProofs[index] {
t.Errorf("retrieved blob or proof mismatch: item %d, hash %x", i, hash)
if *blobs1[i] != *testBlobs[index] {
t.Errorf("retrieved blob mismatch: item %d, hash %x", i, hash)
continue
}
if proofs1[i][0] != testBlobProofs[index] {
t.Errorf("retrieved proof mismatch: item %d, hash %x", i, hash)
continue
}
if *blobs2[i] != *testBlobs[index] || !slices.Equal(proofs2[i], testBlobCellProofs[index]) {
Expand Down Expand Up @@ -1214,7 +1218,7 @@ func TestBlobCountLimit(t *testing.T) {

// Check that first succeeds second fails.
if errs[0] != nil {
t.Fatalf("expected tx with 7 blobs to succeed")
t.Fatalf("expected tx with 7 blobs to succeed, got: %v", errs[0])
}
if !errors.Is(errs[1], txpool.ErrTxBlobLimitExceeded) {
t.Fatalf("expected tx with 8 blobs to fail, got: %v", errs[1])
Expand Down Expand Up @@ -1643,7 +1647,9 @@ func TestAdd(t *testing.T) {
// Add each transaction one by one, verifying the pool internals in between
for j, add := range tt.adds {
signed, _ := types.SignNewTx(keys[add.from], types.LatestSigner(params.MainnetChainConfig), add.tx)
if err := pool.add(signed); !errors.Is(err, add.err) {
sidecar, _ := signed.BlobTxSidecar().ToBlobTxCellSidecar()

if err := pool.add(signed.WithoutBlobTxSidecar(), sidecar, signed.Size()); !errors.Is(err, add.err) {
t.Errorf("test %d, tx %d: adding transaction error mismatch: have %v, want %v", i, j, err, add.err)
}
if add.err == nil {
Expand Down Expand Up @@ -2015,7 +2021,8 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) {
b.Fatal(err)
}
statedb.AddBalance(addr, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
pool.add(tx)
sidecar, _ := tx.BlobTxSidecar().ToBlobTxCellSidecar()
pool.add(tx.WithoutBlobTxSidecar(), sidecar, tx.Size())
}
statedb.Commit(0, true, false)
defer pool.Close()
Expand Down
Loading
Loading