diff --git a/.mockery.yaml b/.mockery.yaml index a0c9da282..9105911fc 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -1,6 +1,8 @@ quiet: False dir: "{{.InterfaceDir}}/mocks" mockname: "Mock{{.InterfaceName}}" +with-expecter: true +issue-845-fix: True filename: "{{.InterfaceName | snakecase}}.go" packages: @@ -9,3 +11,6 @@ packages: Header: Protocol: Handler: + github.com/wavesplatform/gowaves/pkg/blockchaininfo: + interfaces: + UpdatesPublisherInterface: diff --git a/Makefile b/Makefile index f779a7122..3896f535e 100644 --- a/Makefile +++ b/Makefile @@ -262,6 +262,8 @@ mock: mockgen -source pkg/p2p/peer/peer.go -destination pkg/mock/peer.go -package mock Peer mockgen -source pkg/state/api.go -destination pkg/mock/state.go -package mock State mockgen -source pkg/grpc/server/api.go -destination pkg/mock/grpc.go -package mock GrpcHandlers + mockery --dir=pkg/mock --filename=blockchaininfo_types.go --outpkg=mock # The interface name must be specified in .mockery.yaml, see examples there. + proto: @protoc --proto_path=pkg/grpc/protobuf-schemas/proto/ --go_out=./ --go_opt=module=$(MODULE) --go-vtproto_out=./ --go-vtproto_opt=features=marshal_strict+unmarshal+size --go-vtproto_opt=module=$(MODULE) pkg/grpc/protobuf-schemas/proto/waves/*.proto diff --git a/cmd/importer/importer.go b/cmd/importer/importer.go index 32bd7b9a7..331a89518 100644 --- a/cmd/importer/importer.go +++ b/cmd/importer/importer.go @@ -192,7 +192,7 @@ func runImporter(c *cfg) error { return err } - st, err := state.NewState(c.dataDirPath, false, c.params(fds), ss, false) + st, err := state.NewState(c.dataDirPath, false, c.params(fds), ss, false, nil) if err != nil { return fmt.Errorf("failed to create state: %w", err) } diff --git a/cmd/node/node.go b/cmd/node/node.go index 7f4497591..534ef4d39 100644 --- a/cmd/node/node.go +++ b/cmd/node/node.go @@ -19,11 +19,13 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus/promhttp" + "go.uber.org/zap" "go.uber.org/zap/zapcore" "golang.org/x/sync/errgroup" "github.com/wavesplatform/gowaves/pkg/api" + "github.com/wavesplatform/gowaves/pkg/blockchaininfo" "github.com/wavesplatform/gowaves/pkg/crypto" "github.com/wavesplatform/gowaves/pkg/grpc/server" "github.com/wavesplatform/gowaves/pkg/libs/microblock_cache" @@ -71,51 +73,53 @@ var defaultPeers = map[string]string{ type config struct { isParsed bool - logLevel zapcore.Level - logDevelopment bool - logNetwork bool - logNetworkData bool - logFSM bool - statePath string - blockchainType string - peerAddresses string - declAddr string - nodeName string - cfgPath string - apiAddr string - apiKey string - apiMaxConnections int - rateLimiterOptions string - grpcAddr string - grpcAPIMaxConnections int - enableMetaMaskAPI bool - enableMetaMaskAPILog bool - enableGrpcAPI bool - blackListResidenceTime time.Duration - buildExtendedAPI bool - serveExtendedAPI bool - buildStateHashes bool - bindAddress string - disableOutgoingConnections bool - minerVoteFeatures string - disableBloomFilter bool - reward int64 - obsolescencePeriod time.Duration - walletPath string - walletPassword string - limitAllConnections uint - minPeersMining int - disableMiner bool - profiler bool - prometheus string - metricsID int - metricsURL string - dropPeers bool - dbFileDescriptors uint - newConnectionsLimit int - disableNTP bool - microblockInterval time.Duration - enableLightMode bool + logLevel zapcore.Level + logDevelopment bool + logNetwork bool + logNetworkData bool + logFSM bool + statePath string + blockchainType string + 
peerAddresses string + declAddr string + nodeName string + cfgPath string + apiAddr string + apiKey string + apiMaxConnections int + rateLimiterOptions string + grpcAddr string + grpcAPIMaxConnections int + enableMetaMaskAPI bool + enableMetaMaskAPILog bool + enableGrpcAPI bool + blackListResidenceTime time.Duration + buildExtendedAPI bool + serveExtendedAPI bool + buildStateHashes bool + bindAddress string + disableOutgoingConnections bool + minerVoteFeatures string + disableBloomFilter bool + reward int64 + obsolescencePeriod time.Duration + walletPath string + walletPassword string + limitAllConnections uint + minPeersMining int + disableMiner bool + profiler bool + prometheus string + metricsID int + metricsURL string + dropPeers bool + dbFileDescriptors uint + newConnectionsLimit int + disableNTP bool + microblockInterval time.Duration + enableLightMode bool + enableBlockchainUpdatesPlugin bool + BlockchainUpdatesL2Address string } var errConfigNotParsed = stderrs.New("config is not parsed") @@ -168,6 +172,7 @@ func (c *config) logParameters() { zap.S().Debugf("disable-ntp: %t", c.disableNTP) zap.S().Debugf("microblock-interval: %s", c.microblockInterval) zap.S().Debugf("enable-light-mode: %t", c.enableLightMode) + zap.S().Debugf("enable-blockchain-updates-plugin: %t", c.enableBlockchainUpdatesPlugin) } func (c *config) parse() { @@ -265,6 +270,11 @@ func (c *config) parse() { "Interval between microblocks.") flag.BoolVar(&c.enableLightMode, "enable-light-mode", false, "Start node in light mode") + + flag.BoolVar(&c.enableBlockchainUpdatesPlugin, "enable-blockchain-info", false, + "Turn on blockchain updates plugin") + flag.StringVar(&c.BlockchainUpdatesL2Address, "l2-contract-address", "", + "Specify the smart contract address from which the updates will be pulled") flag.Parse() c.logLevel = *l } @@ -361,6 +371,20 @@ func run(nc *config) (retErr error) { return nil } +func initBlockchainUpdatesPlugin(ctx context.Context, + l2addressContract string, + enableBlockchainUpdatesPlugin bool, + updatesChannel chan<- proto.BUpdatesInfo, firstBlock *bool, +) (*proto.BlockchainUpdatesPluginInfo, error) { + l2address, cnvrtErr := proto.NewAddressFromString(l2addressContract) + if cnvrtErr != nil { + return nil, errors.Wrapf(cnvrtErr, "failed to convert L2 contract address %q", l2addressContract) + } + bUpdatesPluginInfo := proto.NewBlockchainUpdatesPluginInfo(ctx, l2address, updatesChannel, + firstBlock, enableBlockchainUpdatesPlugin) + return bUpdatesPluginInfo, nil +} + func runNode(ctx context.Context, nc *config) (_ io.Closer, retErr error) { cfg, err := blockchainSettings(nc) if err != nil { @@ -392,11 +416,39 @@ func runNode(ctx context.Context, nc *config) (_ io.Closer, retErr error) { return nil, errors.Wrap(err, "failed to create state parameters") } - st, err := state.NewState(path, true, params, cfg, nc.enableLightMode) + updatesChannel := make(chan proto.BUpdatesInfo, blockchaininfo.UpdatesBufferedChannelSize) + firstBlock := false + bUpdatesPluginInfo, initErr := initBlockchainUpdatesPlugin(ctx, nc.BlockchainUpdatesL2Address, + nc.enableBlockchainUpdatesPlugin, updatesChannel, &firstBlock) + if initErr != nil { + return nil, errors.Wrap(initErr, "failed to initialize blockchain updates plugin") + } + st, err := state.NewState(path, true, params, cfg, nc.enableLightMode, bUpdatesPluginInfo) if err != nil { return nil, errors.Wrap(err, "failed to initialize node's state") } defer func() { retErr = closeIfErrorf(st, retErr, "failed to close state") }() + makeExtensionReadyFunc := 
func() { + bUpdatesPluginInfo.MakeExtensionReady() + } + + if nc.enableBlockchainUpdatesPlugin { + bUpdatesExtension, bUErr := initializeBlockchainUpdatesExtension(ctx, cfg, nc.BlockchainUpdatesL2Address, + updatesChannel, &firstBlock, st, makeExtensionReadyFunc) + if bUErr != nil { + bUpdatesExtension.Close() + return nil, errors.Wrap(bUErr, "failed to run blockchain updates plugin") + } + go func() { + publshrErr := bUpdatesExtension.RunBlockchainUpdatesPublisher(ctx, + cfg.AddressSchemeCharacter) + if publshrErr != nil { + zap.S().Fatalf("Failed to run blockchain updates publisher: %v", publshrErr) + } + }() + zap.S().Info("The blockchain info extension started pulling info from smart contract address", + nc.BlockchainUpdatesL2Address) + } features, err := minerFeatures(st, nc.minerVoteFeatures) if err != nil { @@ -794,6 +846,32 @@ func runAPIs( return nil } +func initializeBlockchainUpdatesExtension( + ctx context.Context, + cfg *settings.BlockchainSettings, + l2ContractAddress string, + updatesChannel chan proto.BUpdatesInfo, + firstBlock *bool, + state state.State, + makeExtensionReady func(), +) (*blockchaininfo.BlockchainUpdatesExtension, error) { + bUpdatesExtensionState, err := blockchaininfo.NewBUpdatesExtensionState( + blockchaininfo.StoreBlocksLimit, + cfg.AddressSchemeCharacter, + l2ContractAddress, + state, + ) + if err != nil { + return nil, errors.Wrap(err, "failed to initialize blockchain updates extension state") + } + l2address, cnvrtErr := proto.NewAddressFromString(l2ContractAddress) + if cnvrtErr != nil { + return nil, errors.Wrapf(cnvrtErr, "failed to convert L2 contract address %q", l2ContractAddress) + } + return blockchaininfo.NewBlockchainUpdatesExtension(ctx, l2address, updatesChannel, + bUpdatesExtensionState, firstBlock, makeExtensionReady), nil +} + func FromArgs(scheme proto.Scheme, c *config) func(s *settings.NodeSettings) error { return func(s *settings.NodeSettings) error { s.DeclaredAddr = c.declAddr diff --git a/cmd/rollback/main.go b/cmd/rollback/main.go index a53d3c13d..c4c9c3e54 100644 --- a/cmd/rollback/main.go +++ b/cmd/rollback/main.go @@ -84,7 +84,7 @@ func main() { params.BuildStateHashes = *buildStateHashes params.StoreExtendedApiData = *buildExtendedAPI - s, err := state.NewState(*statePath, true, params, cfg, false) + s, err := state.NewState(*statePath, true, params, cfg, false, nil) if err != nil { zap.S().Error(err) return diff --git a/cmd/statehash/statehash.go b/cmd/statehash/statehash.go index 896f2026c..97e985b00 100644 --- a/cmd/statehash/statehash.go +++ b/cmd/statehash/statehash.go @@ -109,7 +109,7 @@ func run() error { params.BuildStateHashes = true params.ProvideExtendedApi = false - st, err := state.NewState(statePath, false, params, ss, false) + st, err := state.NewState(statePath, false, params, ss, false, nil) if err != nil { zap.S().Errorf("Failed to open state at '%s': %v", statePath, err) return err diff --git a/go.mod b/go.mod index f6e57a2a6..37471cc25 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/jinzhu/copier v0.4.0 github.com/mr-tron/base58 v1.2.0 github.com/nats-io/nats-server/v2 v2.11.3 + github.com/nats-io/nats.go v1.41.2 github.com/neilotoole/slogt v1.1.0 github.com/ory/dockertest/v3 v3.12.0 github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 diff --git a/pkg/blockchaininfo/blockchaininfo.go b/pkg/blockchaininfo/blockchaininfo.go new file mode 100644 index 000000000..c62bd14bb --- /dev/null +++ b/pkg/blockchaininfo/blockchaininfo.go @@ -0,0 +1,105 @@ +package blockchaininfo + 
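Note: state.NewState now takes a trailing *proto.BlockchainUpdatesPluginInfo argument; the node builds it from the new flags, while importer, rollback and statehash pass nil. The following is a minimal wiring sketch, not part of the patch, assuming only the signatures introduced in this diff; the amend/light-mode values are illustrative.

package main

import (
	"context"

	"github.com/wavesplatform/gowaves/pkg/blockchaininfo"
	"github.com/wavesplatform/gowaves/pkg/proto"
	"github.com/wavesplatform/gowaves/pkg/settings"
	"github.com/wavesplatform/gowaves/pkg/state"
)

// openState shows the two ways callers satisfy the new NewState parameter.
func openState(ctx context.Context, path string, params state.StateParams, cfg *settings.BlockchainSettings,
	l2ContractAddress string, enablePlugin bool) (state.State, error) {
	if !enablePlugin {
		// Tools that never publish updates (importer, rollback, statehash) simply pass nil.
		return state.NewState(path, true, params, cfg, false, nil)
	}
	// The node wires a buffered channel and a "first block" flag into the plugin info,
	// then hands the same channel to the blockchain updates extension.
	updatesChannel := make(chan proto.BUpdatesInfo, blockchaininfo.UpdatesBufferedChannelSize)
	firstBlock := false
	l2address, err := proto.NewAddressFromString(l2ContractAddress)
	if err != nil {
		return nil, err
	}
	info := proto.NewBlockchainUpdatesPluginInfo(ctx, l2address, updatesChannel, &firstBlock, enablePlugin)
	return state.NewState(path, true, params, cfg, false, info)
}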
+import ( + "bytes" + "encoding/binary" + "strconv" + "strings" + + "github.com/pkg/errors" + "github.com/wavesplatform/gowaves/pkg/proto" +) + +const EpochKeyPrefix = "epoch_" +const blockMeta0xKeyPrefix = "block_0x" + +// Helper function to read uint64 from bytes. +func readInt64(data *bytes.Reader) (int64, error) { + var num int64 + err := binary.Read(data, binary.BigEndian, &num) + if err != nil { + return 0, err + } + return num, nil +} + +// Decode base64 and extract blockHeight and height. +func extractEpochFromBlockMeta(blockMetaValue []byte) (int64, error) { + var blockMeta BlockMeta + err := blockMeta.UnmarshalBinary(blockMetaValue) + if err != nil { + return 0, errors.Errorf("failed to unmarshal blockMeta, %v", err) + } + + return blockMeta.BlockEpoch, nil +} + +func filterEpochEntry(entry proto.DataEntry, beforeHeight uint64) ([]proto.DataEntry, error) { + key := entry.GetKey() + // Extract the part after "epoch_" + epochStr := key[len(EpochKeyPrefix):] + + epochNumber, err := strconv.ParseUint(epochStr, 10, 64) + if err != nil { + return nil, err + } + + // Return this entry only if epochNumber is greater than beforeHeight + if epochNumber > beforeHeight { + return []proto.DataEntry{entry}, nil + } + return nil, nil +} + +func filterBlock0xEntry(entry proto.DataEntry, beforeHeight uint64) ([]proto.DataEntry, error) { + // Extract blockHeight and height from base64. + binaryEntry, ok := entry.(*proto.BinaryDataEntry) + if !ok { + return nil, errors.New("failed to convert block meta key to binary data entry") + } + epoch, err := extractEpochFromBlockMeta(binaryEntry.Value) + if err != nil { + return nil, errors.Errorf("failed to filter data entries, %v", err) + } + + if epoch < 0 { + return nil, errors.New("epoch is less than 0") + } + // Return this entry only if epochNumber is greater than beforeHeight + if uint64(epoch) > beforeHeight { + return []proto.DataEntry{entry}, nil + } + return nil, nil +} + +func filterDataEntries(beforeHeight uint64, dataEntries []proto.DataEntry) ([]proto.DataEntry, error) { + var filteredDataEntries []proto.DataEntry + + for _, entry := range dataEntries { + key := entry.GetKey() + + switch { + // Filter "epoch_" prefixed keys. + case strings.HasPrefix(key, EpochKeyPrefix): + entryOrNil, err := filterEpochEntry(entry, beforeHeight) + if err != nil { + return nil, err + } + filteredDataEntries = append(filteredDataEntries, entryOrNil...) + + // Filter block_0x binary entries. + case strings.HasPrefix(key, blockMeta0xKeyPrefix): + entryOrNil, err := filterBlock0xEntry(entry, beforeHeight) + if err != nil { + return nil, err + } + filteredDataEntries = append(filteredDataEntries, entryOrNil...) + + // Default case to handle non-epoch and non-base64 entries. + default: + filteredDataEntries = append(filteredDataEntries, entry) + } + } + + return filteredDataEntries, nil +} diff --git a/pkg/blockchaininfo/blockchaininfo_test.go b/pkg/blockchaininfo/blockchaininfo_test.go new file mode 100644 index 000000000..07da12246 --- /dev/null +++ b/pkg/blockchaininfo/blockchaininfo_test.go @@ -0,0 +1,490 @@ +package blockchaininfo_test + +import ( + "sort" + "strconv" + "testing" + + "github.com/nats-io/nats-server/v2/server" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/wavesplatform/gowaves/pkg/blockchaininfo" + mocks "github.com/wavesplatform/gowaves/pkg/mock" + "github.com/wavesplatform/gowaves/pkg/proto" +) + +// some random test data. 
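The filtering helpers above keep only contract data entries that are still relevant above a given height. A small sketch of the rule follows, written as if it lived inside the blockchaininfo package because filterDataEntries is unexported; the keys and heights are illustrative.

// exampleFilterDataEntries sketches the filtering rule: "epoch_<n>" keys are kept only when
// n is above beforeHeight, "block_0x…" binary entries are kept only when the epoch decoded
// from their BlockMeta payload is above beforeHeight, and every other key passes through.
func exampleFilterDataEntries() ([]proto.DataEntry, error) {
	entries := []proto.DataEntry{
		&proto.IntegerDataEntry{Key: "epoch_3199650", Value: 1},             // kept: 3199650 > 3199600
		&proto.IntegerDataEntry{Key: "epoch_3199500", Value: 1},             // dropped: epoch too old
		&proto.StringDataEntry{Key: "someConstantKey", Value: "kept as-is"}, // default branch: always kept
	}
	return filterDataEntries(3199600, entries)
}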
+func testBlockUpdates() proto.BlockUpdatesInfo { + var b proto.BlockUpdatesInfo + + var ( + height uint64 = 100 + vrf = proto.B58Bytes{} + blockID = proto.BlockID{} + blockHeader = proto.BlockHeader{} + ) + + b.Height = height + b.VRF = vrf + b.BlockID = blockID + b.BlockHeader = blockHeader + + return b +} + +func containsDataEntry(changes []proto.DataEntry, key string, dataType string) bool { + for _, entry := range changes { + // Check if the key matches + if entry.GetKey() == key { + // Use a type switch to check the type + switch entry.(type) { + case *proto.BinaryDataEntry: + if dataType == "binary" { + return true + } + case *proto.DeleteDataEntry: + if dataType == "delete" { + return true + } + default: + } + } + } + return false +} + +// This tests check whether the changes generation will show the new records and will remove the old ones. +// Previous state contains 3 records, but the current state doesn't contain them and has 3 new records. +// The change result must be - 3 new records, and 3 old records for deletion. + +func TestChangesGenerationNewEntries(t *testing.T) { + previousFirstKey := "block_0x3a85dedc42db076c91cf61d72fa17c80777aeed70ba68dbc14d6829dd6e88614" + previousSecondKey := "block_0x3a9209ce524553a75fd0e9bde5c99ff254b1fb231916fc89755be957e51e5516" + previousThirdKey := "block_0x3b0181d3f66d9f0ddd8e1e8567b836a01f652b4cb873aa7b7c46fc8bd1e4eeee" + + previousDataEntries := []proto.DataEntry{ + &proto.BinaryDataEntry{Key: previousFirstKey, + Value: []byte("base64:AAAAAAAQYVwAAAAAADDSCpJJsd11jrOMW7AS/AHIMIDQ" + + "XjqmFyhDuGt2RPNvmcCXAVTy/URmfMOj7GNweXnZpzidmxHfPBfcP5A=")}, // height 3199498. + &proto.BinaryDataEntry{Key: previousSecondKey, + Value: []byte("base64:AAAAAAAQYywAAAAAADDSQBBubtiRmKwtaNFF1TrBhsfBu" + + "61fj3qiSrtyu1/kLLAlAVQp5GtuF7Hxji8CQ9SFOEZLLUv88nvIgg8=")}, // height 3199552. + &proto.BinaryDataEntry{Key: previousThirdKey, + Value: []byte("base64:AAAAAAAQZEUAAAAAADDSZeUUyashoWjUKurzA/wVU5prm" + + "68CambvjIo1ESLoLuAaAVRaS4vOsPl9cxvg7aeRj7RFZQzdpmvV/+A=")}, // height 3199589. + } + var previousHeight uint64 = 3199552 + + currentFirstKey := "block_0x3b5ad5c176473be02cc3d19207204af87af03f6fd75c76916765745658f7e842" + currentSecondKey := "block_0x3b72ee917fea7057fb88a357f619c22f6f8ddae03b701fab7c284953ecebbc8c" + currentThirdKey := "block_0x3b973acae11f248a524b463db7d198c7ddb47fd8aeda2f14699e639a0db19911" + + currentDataEntries := []proto.DataEntry{ + &proto.BinaryDataEntry{Key: currentFirstKey, + Value: []byte("base64:AAAAAAAQZKkAAAAAADDSb6xEaq4RsFQruG" + + "NeGdooPmtLBnlERR15qzc/mcKcQ461AVQp5GtuF7Hxji8CQ9SFOEZLLUv88nvIgg8=")}, // height 3199599. + &proto.BinaryDataEntry{Key: currentSecondKey, + Value: []byte("base64:AAAAAAAQZQkAAAAAADDSe+2CGv9zgiR7s" + + "65XEBkYzIbv6jbxcR7Zi3ByUqsX0bkwAVTEkyC5glOJH8Upe49iT3+BUV5zRaDT2dM=")}, // height 3199611. + &proto.BinaryDataEntry{Key: currentThirdKey, + Value: []byte("base64:AAAAAAAQZf8AAAAAADDSolzqc5gjHWP/s" + + "CzqK7+HkAjybjGxq8SxL9ID8yEIKxrlAVRN71D/MD4dykS8vqW7cXqCh5QOclg6DEU=")}, // height 3199650. 
+ } + var currentHeight uint64 = 3199611 + + previousBlockInfo := proto.BUpdatesInfo{ + BlockUpdatesInfo: testBlockUpdates(), + ContractUpdatesInfo: proto.L2ContractDataEntries{ + AllDataEntries: previousDataEntries, + Height: previousHeight, + }, + } + + currentBlockInfo := proto.BUpdatesInfo{ + BlockUpdatesInfo: testBlockUpdates(), + ContractUpdatesInfo: proto.L2ContractDataEntries{ + AllDataEntries: currentDataEntries, + Height: currentHeight, + }, + } + + equal, changes, err := blockchaininfo.CompareBUpdatesInfo(currentBlockInfo, previousBlockInfo, + proto.TestNetScheme) + if err != nil { + return + } + require.False(t, equal) + require.True(t, containsDataEntry(changes.ContractUpdatesInfo.AllDataEntries, currentFirstKey, "binary")) + require.True(t, containsDataEntry(changes.ContractUpdatesInfo.AllDataEntries, currentSecondKey, "binary")) + require.True(t, containsDataEntry(changes.ContractUpdatesInfo.AllDataEntries, currentThirdKey, "binary")) + + require.True(t, containsDataEntry(changes.ContractUpdatesInfo.AllDataEntries, previousFirstKey, "delete")) + require.True(t, containsDataEntry(changes.ContractUpdatesInfo.AllDataEntries, previousSecondKey, "delete")) + require.True(t, containsDataEntry(changes.ContractUpdatesInfo.AllDataEntries, previousThirdKey, "delete")) +} + +// This tests check whether the changes generation will only show the new records and will not remove the old ones. +// Previous state contains 3 records, the current state contains both the previous new records and 3 new ones. +// The change result must be - 3 new records. +func TestChangesGenerationContainsPrevious(t *testing.T) { + previousFirstKey := "block_0x3a85dedc42db076c91cf61d72fa17c80777aeed70ba68dbc14d6829dd6e88614" + previousSecondKey := "block_0x3a9209ce524553a75fd0e9bde5c99ff254b1fb231916fc89755be957e51e5516" + previousThirdKey := "block_0x3b0181d3f66d9f0ddd8e1e8567b836a01f652b4cb873aa7b7c46fc8bd1e4eeee" + + previousDataEntries := []proto.DataEntry{ + &proto.BinaryDataEntry{Key: previousFirstKey, + Value: []byte("base64:AAAAAAAQYVwAAAAAADDSCpJJsd11jrOMW7AS/AHIMIDQXj" + + "qmFyhDuGt2RPNvmcCXAVTy/URmfMOj7GNweXnZpzidmxHfPBfcP5A=")}, // height 3199498. + &proto.BinaryDataEntry{Key: previousSecondKey, + Value: []byte("base64:AAAAAAAQYywAAAAAADDSQBBubtiRmKwtaNFF1TrBhsfBu61" + + "fj3qiSrtyu1/kLLAlAVQp5GtuF7Hxji8CQ9SFOEZLLUv88nvIgg8=")}, // height 3199552. + &proto.BinaryDataEntry{Key: previousThirdKey, + Value: []byte("base64:AAAAAAAQZEUAAAAAADDSZeUUyashoWjUKurzA/wVU5prm68Ca" + + "mbvjIo1ESLoLuAaAVRaS4vOsPl9cxvg7aeRj7RFZQzdpmvV/+A=")}, // height 3199589. + } + var previousHeight uint64 = 3199552 + + currentFirstKey := "block_0x3b5ad5c176473be02cc3d19207204af87af03f6fd75c76916765745658f7e842" + currentSecondKey := "block_0x3b72ee917fea7057fb88a357f619c22f6f8ddae03b701fab7c284953ecebbc8c" + currentThirdKey := "block_0x3b973acae11f248a524b463db7d198c7ddb47fd8aeda2f14699e639a0db19911" + + currentDataEntries := []proto.DataEntry{ + &proto.BinaryDataEntry{Key: previousFirstKey, + Value: []byte("base64:AAAAAAAQYVwAAAAAADDSCpJJsd11jrOMW7AS/A" + + "HIMIDQXjqmFyhDuGt2RPNvmcCXAVTy/URmfMOj7GNweXnZpzidmxHfPBfcP5A=")}, // height 3199498. + &proto.BinaryDataEntry{Key: previousSecondKey, + Value: []byte("base64:AAAAAAAQYywAAAAAADDSQBBubtiRmKwtaNFF1T" + + "rBhsfBu61fj3qiSrtyu1/kLLAlAVQp5GtuF7Hxji8CQ9SFOEZLLUv88nvIgg8=")}, // height 3199552. 
+ &proto.BinaryDataEntry{Key: previousThirdKey, + Value: []byte("base64:AAAAAAAQZEUAAAAAADDSZeUUyashoWjUKurzA/wV" + + "U5prm68CambvjIo1ESLoLuAaAVRaS4vOsPl9cxvg7aeRj7RFZQzdpmvV/+A=")}, // height 3199589. + + &proto.BinaryDataEntry{Key: currentFirstKey, + Value: []byte("base64:AAAAAAAQZKkAAAAAADDSb6xEaq4RsFQruGNeGdoo" + + "PmtLBnlERR15qzc/mcKcQ461AVQp5GtuF7Hxji8CQ9SFOEZLLUv88nvIgg8=")}, // height 3199599. + &proto.BinaryDataEntry{Key: currentSecondKey, + Value: []byte("base64:AAAAAAAQZQkAAAAAADDSe+2CGv9zgiR7s65XEBkYz" + + "Ibv6jbxcR7Zi3ByUqsX0bkwAVTEkyC5glOJH8Upe49iT3+BUV5zRaDT2dM=")}, // height 3199611. + &proto.BinaryDataEntry{Key: currentThirdKey, + Value: []byte("base64:AAAAAAAQZf8AAAAAADDSolzqc5gjHWP/sCzqK7+Hk" + + "AjybjGxq8SxL9ID8yEIKxrlAVRN71D/MD4dykS8vqW7cXqCh5QOclg6DEU=")}, // height 3199650. + } + var currentHeight uint64 = 3199611 + + previousBlockInfo := proto.BUpdatesInfo{ + BlockUpdatesInfo: testBlockUpdates(), + ContractUpdatesInfo: proto.L2ContractDataEntries{ + AllDataEntries: previousDataEntries, + Height: previousHeight, + }, + } + + currentBlockInfo := proto.BUpdatesInfo{ + BlockUpdatesInfo: testBlockUpdates(), + ContractUpdatesInfo: proto.L2ContractDataEntries{ + AllDataEntries: currentDataEntries, + Height: currentHeight, + }, + } + + equal, changes, err := blockchaininfo.CompareBUpdatesInfo(currentBlockInfo, previousBlockInfo, + proto.TestNetScheme) + if err != nil { + return + } + require.False(t, equal) + + require.True(t, containsDataEntry(changes.ContractUpdatesInfo.AllDataEntries, currentFirstKey, "binary")) + require.True(t, containsDataEntry(changes.ContractUpdatesInfo.AllDataEntries, currentSecondKey, "binary")) + require.True(t, containsDataEntry(changes.ContractUpdatesInfo.AllDataEntries, currentThirdKey, "binary")) +} + +// This tests check whether the changes generation will not show anything, because there are no changes. +// Previous state contains 3 records, the current state contains the same records. +// The change result must be - 0 records. +func TestNoChangesGeneration(t *testing.T) { + previousFirstKey := "block_0x3a85dedc42db076c91cf61d72fa17c80777aeed70ba68dbc14d6829dd6e88614" + previousSecondKey := "block_0x3a9209ce524553a75fd0e9bde5c99ff254b1fb231916fc89755be957e51e5516" + previousThirdKey := "block_0x3b0181d3f66d9f0ddd8e1e8567b836a01f652b4cb873aa7b7c46fc8bd1e4eeee" + + previousDataEntries := []proto.DataEntry{ + &proto.BinaryDataEntry{Key: previousFirstKey, + Value: []byte("base64:AAAAAAAQYVwAAAAAADDSCpJJsd11jrO" + + "MW7AS/AHIMIDQXjqmFyhDuGt2RPNvmcCXAVTy/URmfMOj7GNweXnZpzidmxHfPBfcP5A=")}, // height 3199498. + &proto.BinaryDataEntry{Key: previousSecondKey, + Value: []byte("base64:AAAAAAAQYywAAAAAADDSQBBubtiRmKwt" + + "aNFF1TrBhsfBu61fj3qiSrtyu1/kLLAlAVQp5GtuF7Hxji8CQ9SFOEZLLUv88nvIgg8=")}, // height 3199552. + &proto.BinaryDataEntry{Key: previousThirdKey, + Value: []byte("base64:AAAAAAAQZEUAAAAAADDSZeUUyashoWjU" + + "KurzA/wVU5prm68CambvjIo1ESLoLuAaAVRaS4vOsPl9cxvg7aeRj7RFZQzdpmvV/+A=")}, // height 3199589. + } + var previousHeight uint64 = 3199552 + + currentDataEntries := []proto.DataEntry{ + &proto.BinaryDataEntry{Key: previousFirstKey, + Value: []byte("base64:AAAAAAAQYVwAAAAAADDSCpJJsd11jrO" + + "MW7AS/AHIMIDQXjqmFyhDuGt2RPNvmcCXAVTy/URmfMOj7GNweXnZpzidmxHfPBfcP5A=")}, // height 3199498. + &proto.BinaryDataEntry{Key: previousSecondKey, + Value: []byte("base64:AAAAAAAQYywAAAAAADDSQBBubtiRmKwta" + + "NFF1TrBhsfBu61fj3qiSrtyu1/kLLAlAVQp5GtuF7Hxji8CQ9SFOEZLLUv88nvIgg8=")}, // height 3199552. 
+ &proto.BinaryDataEntry{Key: previousThirdKey, + Value: []byte("base64:AAAAAAAQZEUAAAAAADDSZeUUyashoWjU" + + "KurzA/wVU5prm68CambvjIo1ESLoLuAaAVRaS4vOsPl9cxvg7aeRj7RFZQzdpmvV/+A=")}, // height 3199589. + } + var currentHeight uint64 = 3199611 + + previousBlockInfo := proto.BUpdatesInfo{ + BlockUpdatesInfo: testBlockUpdates(), + ContractUpdatesInfo: proto.L2ContractDataEntries{ + AllDataEntries: previousDataEntries, + Height: previousHeight, + }, + } + + currentBlockInfo := proto.BUpdatesInfo{ + BlockUpdatesInfo: testBlockUpdates(), + ContractUpdatesInfo: proto.L2ContractDataEntries{ + AllDataEntries: currentDataEntries, + Height: currentHeight, + }, + } + + equal, changes, err := blockchaininfo.CompareBUpdatesInfo(currentBlockInfo, previousBlockInfo, + proto.TestNetScheme) + if err != nil { + return + } + require.True(t, equal) + + require.True(t, len(changes.ContractUpdatesInfo.AllDataEntries) == 0) +} + +func TestDecodeBlockMeta(t *testing.T) { + binaryDataEntryJSON := []byte(`{ + "key": "block_0x000cf2d957da5e30dcfae8b5eba2b585f0102680a5c343a1a107aa529f61c2db", + "type": "binary", + "value": "base64:AAAAAAACn6IAAAAAADKE1/L3xxY2i+uZZ0Rzd3XOD2O+12+a8D2j0d4Ymk/7v7YdAAAAAAAAAAAAAAAAAAAAFg==" + }`) + var binaryEntry proto.BinaryDataEntry + err := binaryEntry.UnmarshalJSON(binaryDataEntryJSON) + require.NoError(t, err) + var blockMeta blockchaininfo.BlockMeta + err = blockMeta.UnmarshalBinary(binaryEntry.Value) + require.NoError(t, err) + require.Equal(t, blockMeta.BlockHeight, int64(171938)) + require.Equal(t, blockMeta.BlockEpoch, int64(3310807)) +} + +func RunNatsTestServer() (*server.Server, error) { + opts := &server.Options{ + MaxPayload: 1024 * 1024, + Host: "127.0.0.1", + Port: 4756, + NoSigs: true, + } + s, err := server.NewServer(opts) + if err != nil { + return nil, errors.Wrap(err, "failed to create NATS server") + } + go s.Start() + if !s.ReadyForConnections(5 * server.AUTH_TIMEOUT) { + return nil, errors.New("NATS server is not ready for connections") + } + return s, nil +} + +const ( + blockID1 = "7wKAcTGbvDtruMSSYyndzN9YK3cQ47ZdTPeT8ej22qRg" + BlockID2 = "gzz8aN4b5rr1rkeAdmuwytuGv1jbm9LLRbXNKNb7ETX" + BlockID3 = "GrgPhEZ5rruNPSac5QxirgoYA2VwEKBJju3ppPgNyBWi" + BlockID4 = "5g9Ws6Z3SJ9dXN3JqPQxVWeCEYssmYzFdVNXX1rcyHib" + BlockID5 = "AEB4sYgpA2wMVSdzSCkVuN3R2moPnQiStDs9gPSRStny" + BlockID6 = "5bEZ4Y9BiVvM53RtBWmpT5jADeLmSt2vmC1iBB2gKuE8" + + l2ContractAddress = "3Mw2AVgk5xNmkWQkzKKhinhBH1YyBTeVku2" + + checkedBlockNumber = 3 +) + +func fillThirdCheckedBlock(t *testing.T) ([]proto.DataEntry, proto.BlockUpdatesInfo) { + var integerEntries []proto.DataEntry + blockID, err := proto.NewBlockIDFromBase58(BlockID3) + + for j := 1; j <= 3; j++ { + integerDataEntry := &proto.IntegerDataEntry{ + Key: strconv.Itoa(j), + Value: int64(-j), + } + assert.NoError(t, err) + integerEntries = append(integerEntries, integerDataEntry) + } + blockInfo := proto.BlockUpdatesInfo{ + Height: uint64(3), + BlockID: blockID, + } + return integerEntries, blockInfo +} + +func fillHistoryJournal(t *testing.T, stateCache *blockchaininfo.StateCache) *blockchaininfo.HistoryJournal { + var historyJorunal blockchaininfo.HistoryJournal + blockIDs := []string{blockID1, BlockID2, BlockID3, BlockID4, BlockID5} + for i := 1; i <= 5; i++ { + if i == checkedBlockNumber { + integerEntries, blockInfo := fillThirdCheckedBlock(t) + historyEntry := blockchaininfo.HistoryEntry{ + Height: blockInfo.Height, + BlockID: blockInfo.BlockID, + Entries: integerEntries, + } + historyJorunal.Push(historyEntry) + } + + var integerEntries 
[]proto.DataEntry + blockID, err := proto.NewBlockIDFromBase58(blockIDs[i-1]) + + for j := 1; j <= i; j++ { + integerDataEntry := &proto.IntegerDataEntry{ + Key: strconv.Itoa(j), + Value: int64(j), + } + assert.NoError(t, err) + integerEntries = append(integerEntries, integerDataEntry) + } + historyEntry := blockchaininfo.HistoryEntry{ + Height: uint64(i), + BlockID: blockID, + Entries: integerEntries, + } + historyJorunal.Push(historyEntry) + continue + } + historyJorunal.StateCache = stateCache + return &historyJorunal +} + +func fillCache(t *testing.T) *blockchaininfo.StateCache { + stateCache := blockchaininfo.NewStateCache() + blockIDs := []string{blockID1, BlockID2, BlockID3, BlockID4, BlockID5} + for i := 1; i <= 5; i++ { + if i == checkedBlockNumber { + integerEntries, blockInfo := fillThirdCheckedBlock(t) + stateCache.AddCacheRecord(blockInfo.Height, integerEntries, blockInfo) + continue + } + + var integerEntries []proto.DataEntry + blockID, err := proto.NewBlockIDFromBase58(blockIDs[i-1]) + require.NoError(t, err) + for j := 1; j <= i; j++ { + integerDataEntry := &proto.IntegerDataEntry{ + Key: strconv.Itoa(j), + Value: int64(j), + } + integerEntries = append(integerEntries, integerDataEntry) + } + blockInfo := proto.BlockUpdatesInfo{ + Height: uint64(i), + BlockID: blockID, + } + stateCache.AddCacheRecord(uint64(i), integerEntries, blockInfo) + } + return stateCache +} + +// Rollback from block 5 to block 3. +// On block 3, keys "1", "2", "3" had negative values, so the patch should generate the negative +// values only found on that block. +func TestRollback(t *testing.T) { + mockPublisherInterface := mocks.NewMockUpdatesPublisherInterface(t) + mockPublisherInterface.EXPECT().PublishUpdates(mock.Anything, mock.Anything, proto.TestNetScheme, + l2ContractAddress).Return(nil) + mockPublisherInterface.EXPECT().L2ContractAddress().Return(l2ContractAddress) + + blockID6, err := proto.NewBlockIDFromBase58(BlockID6) + assert.NoError(t, err) + currentState := proto.BUpdatesInfo{ + BlockUpdatesInfo: proto.BlockUpdatesInfo{ + Height: 6, + BlockID: blockID6, + }, + ContractUpdatesInfo: proto.L2ContractDataEntries{ + Height: 6, + AllDataEntries: []proto.DataEntry{&proto.IntegerDataEntry{ + Key: "5", + Value: 6, + }}, + }, + } + blockID5, err := proto.NewBlockIDFromBase58(BlockID5) + assert.NoError(t, err) + previousState := proto.BUpdatesInfo{ + BlockUpdatesInfo: proto.BlockUpdatesInfo{ + Height: 5, + BlockID: blockID5, + }, + ContractUpdatesInfo: proto.L2ContractDataEntries{ + Height: 5, + AllDataEntries: []proto.DataEntry{&proto.IntegerDataEntry{ + Key: "5", + Value: 5, + }}, + }, + } + blockID4, err := proto.NewBlockIDFromBase58(BlockID4) + assert.NoError(t, err) + updates := proto.BUpdatesInfo{ + BlockUpdatesInfo: proto.BlockUpdatesInfo{ + Height: 4, + BlockID: blockID4, + }, + ContractUpdatesInfo: proto.L2ContractDataEntries{ + Height: 4, + AllDataEntries: []proto.DataEntry{&proto.IntegerDataEntry{ + Key: "4", + Value: 4, + }}, + }, + } + updatesExtensionState := &blockchaininfo.BUpdatesExtensionState{ + CurrentState: ¤tState, + PreviousState: &previousState, + Limit: 100, + Scheme: proto.TestNetScheme, + L2ContractAddress: l2ContractAddress, + HistoryJournal: fillHistoryJournal(t, fillCache(t)), + St: nil, + } + + // Rollback from block 5 to 3 + patch := blockchaininfo.HandleRollback(updatesExtensionState, updates, mockPublisherInterface, + nil, proto.TestNetScheme) + + expectedPatchEntries := []proto.DataEntry{ + &proto.IntegerDataEntry{ + Key: "1", + Value: -1, + }, + 
&proto.IntegerDataEntry{ + Key: "2", + Value: -2, + }, + &proto.IntegerDataEntry{ + Key: "3", + Value: -3, + }, + &proto.DeleteDataEntry{Key: "4"}, + &proto.DeleteDataEntry{Key: "5"}, + } + expectedL2Patch := proto.L2ContractDataEntries{ + AllDataEntries: expectedPatchEntries, + Height: 3, + } + + sort.Sort(patch.ContractUpdatesInfo.AllDataEntries) + sort.Sort(expectedL2Patch.AllDataEntries) + + assert.Equal(t, patch.ContractUpdatesInfo.AllDataEntries, expectedL2Patch.AllDataEntries) + assert.Equal(t, patch.ContractUpdatesInfo.Height, expectedL2Patch.Height) +} diff --git a/pkg/blockchaininfo/bupdates.go b/pkg/blockchaininfo/bupdates.go new file mode 100644 index 000000000..0bab4a6b3 --- /dev/null +++ b/pkg/blockchaininfo/bupdates.go @@ -0,0 +1,64 @@ +package blockchaininfo + +import ( + "context" + "sync" + + "github.com/wavesplatform/gowaves/pkg/proto" +) + +type BlockchainUpdatesExtension struct { + ctx context.Context + l2ContractAddress proto.WavesAddress + bUpdatesChannel chan proto.BUpdatesInfo + firstBlock *bool + blockchainExtensionState *BUpdatesExtensionState + lock sync.Mutex + makeExtensionReadyFunc func() +} + +func NewBlockchainUpdatesExtension( + ctx context.Context, + l2ContractAddress proto.WavesAddress, + bUpdatesChannel chan proto.BUpdatesInfo, + blockchainExtensionState *BUpdatesExtensionState, + firstBlock *bool, + makeExtensionReadyFunc func(), +) *BlockchainUpdatesExtension { + return &BlockchainUpdatesExtension{ + ctx: ctx, + l2ContractAddress: l2ContractAddress, + bUpdatesChannel: bUpdatesChannel, + firstBlock: firstBlock, + blockchainExtensionState: blockchainExtensionState, + makeExtensionReadyFunc: makeExtensionReadyFunc, + } +} + +func (e *BlockchainUpdatesExtension) L2ContractAddress() proto.WavesAddress { + return e.l2ContractAddress +} + +func (e *BlockchainUpdatesExtension) MarkExtensionReady() { + e.lock.Lock() + defer e.lock.Unlock() + e.makeExtensionReadyFunc() +} + +func (e *BlockchainUpdatesExtension) IsFirstRequestedBlock() bool { + return *e.firstBlock +} + +func (e *BlockchainUpdatesExtension) EmptyPreviousState() { + e.lock.Lock() + defer e.lock.Unlock() + *e.firstBlock = true + e.blockchainExtensionState.PreviousState = nil +} + +func (e *BlockchainUpdatesExtension) Close() { + if e.bUpdatesChannel != nil { + close(e.bUpdatesChannel) + } + e.bUpdatesChannel = nil +} diff --git a/pkg/blockchaininfo/nats_publisher.go b/pkg/blockchaininfo/nats_publisher.go new file mode 100644 index 000000000..d57f72326 --- /dev/null +++ b/pkg/blockchaininfo/nats_publisher.go @@ -0,0 +1,518 @@ +package blockchaininfo + +import ( + "context" + "sync" + "time" + + "go.uber.org/zap" + + "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" + "github.com/pkg/errors" + "github.com/wavesplatform/gowaves/pkg/proto" + "github.com/wavesplatform/gowaves/pkg/state" +) + +const StoreBlocksLimit = 200 + +const UpdatesBufferedChannelSize = 256 + +const portDefault = 4222 +const hostDefault = "127.0.0.1" +const natsMaxPayloadSize int32 = 1024 * 1024 // 1 MB +const publisherWaitingTime = 100 * time.Millisecond + +func ConcatenateContractTopics(contractAddress string) string { + return ContractUpdates + contractAddress +} + +type BUpdatesExtensionState struct { + CurrentState *proto.BUpdatesInfo + PreviousState *proto.BUpdatesInfo // this information is what was just published + Limit uint64 + Scheme proto.Scheme + constantContractKeys []string + L2ContractAddress string + HistoryJournal *HistoryJournal + St state.State +} + +type UpdatesPublisher struct { + 
l2ContractAddress string +} + +func NewBUpdatesExtensionState(limit uint64, scheme proto.Scheme, l2ContractAddress string, + st state.State) (*BUpdatesExtensionState, error) { + stateCache := NewStateCache() + currentHeight, err := st.Height() + if err != nil { + return nil, err + } + l2address, cnvrtErr := proto.NewAddressFromString(l2ContractAddress) + if cnvrtErr != nil { + return nil, errors.Wrapf(cnvrtErr, "failed to convert L2 contract address %s", l2ContractAddress) + } + historyJournal := NewHistoryJournal() + for targetHeight := currentHeight - HistoryJournalLengthMax; targetHeight <= currentHeight; targetHeight++ { + blockSnapshot, retrieveErr := st.SnapshotsAtHeight(targetHeight) + if retrieveErr != nil { + return nil, retrieveErr + } + blockInfo, pullErr := st.NewestBlockInfoByHeight(targetHeight) + if pullErr != nil { + return nil, errors.Wrap(pullErr, "failed to get newest block info") + } + blockHeader, blockErr := st.NewestHeaderByHeight(targetHeight) + if blockErr != nil { + return nil, errors.Wrap(blockErr, "failed to get newest block info") + } + bUpdatesInfo := state.BuildBlockUpdatesInfoFromSnapshot(blockInfo, blockHeader, blockSnapshot, l2address) + + filteredDataEntries, filtrErr := filterDataEntries(targetHeight-limit, + bUpdatesInfo.ContractUpdatesInfo.AllDataEntries) + if filtrErr != nil { + return nil, errors.Wrap(filtrErr, "failed to initialize state cache, failed to filter data entries") + } + + stateCache.AddCacheRecord(targetHeight, filteredDataEntries, bUpdatesInfo.BlockUpdatesInfo) + + historyEntry := HistoryEntry{ + Height: targetHeight, + BlockID: bUpdatesInfo.BlockUpdatesInfo.BlockID, + Entries: filteredDataEntries, + VRF: bUpdatesInfo.BlockUpdatesInfo.VRF, + BlockHeader: bUpdatesInfo.BlockUpdatesInfo.BlockHeader, + } + historyJournal.Push(historyEntry) + } + historyJournal.StateCache = stateCache + + return &BUpdatesExtensionState{Limit: limit, Scheme: scheme, + L2ContractAddress: l2ContractAddress, HistoryJournal: historyJournal, St: st}, nil +} + +func (bu *BUpdatesExtensionState) SetPreviousState(updates proto.BUpdatesInfo) { + bu.PreviousState = &updates +} + +func (bu *BUpdatesExtensionState) HasStateChanged() (bool, proto.BUpdatesInfo, error) { + statesAreEqual, changes, err := bu.StatesEqual(bu.Scheme) + if err != nil { + return false, proto.BUpdatesInfo{}, err + } + if statesAreEqual { + return false, proto.BUpdatesInfo{}, nil + } + return true, changes, nil +} + +func (bu *BUpdatesExtensionState) StatesEqual(scheme proto.Scheme) (bool, proto.BUpdatesInfo, error) { + return CompareBUpdatesInfo(*bu.CurrentState, *bu.PreviousState, scheme) +} + +func splitIntoChunks(array []byte, maxChunkSize int) [][]byte { + if maxChunkSize <= 0 { + return nil + } + var chunkedArray [][]byte + + for i := 0; i < len(array); i += maxChunkSize { + end := i + maxChunkSize + if end > len(array) { + end = len(array) + } + chunkedArray = append(chunkedArray, array[i:end]) + } + + return chunkedArray +} + +func PublishContractUpdates(contractUpdates proto.L2ContractDataEntries, nc *nats.Conn, + l2ContractAddress string) error { + dataEntriesProtobuf, err := L2ContractDataEntriesToProto(contractUpdates).MarshalVTStrict() + if err != nil { + return err + } + + if len(dataEntriesProtobuf) <= int(natsMaxPayloadSize-1) { + var msg []byte + msg = append(msg, NoPaging) + msg = append(msg, dataEntriesProtobuf...) 
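	// Framing note (added commentary, no behaviour change): every message on the contract topic
	// starts with a one-byte marker. NoPaging means the whole protobuf payload fits into a single
	// NATS message; otherwise splitIntoChunks cuts the payload into pieces of at most
	// (natsMaxPayloadSize-1)/2 bytes, every chunk except the last is prefixed with StartPaging,
	// the final chunk with EndPaging, and non-final chunks are paced by publisherWaitingTime so
	// the subscriber can reassemble them in order.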
+ err = nc.Publish(ConcatenateContractTopics(l2ContractAddress), msg) + if err != nil { + zap.S().Errorf("failed to publish message on topic %s", ConcatenateContractTopics(l2ContractAddress)) + return err + } + return nil + } + + chunkedPayload := splitIntoChunks(dataEntriesProtobuf, int(natsMaxPayloadSize-1)/2) + + for i, chunk := range chunkedPayload { + var msg []byte + + if i == len(chunkedPayload)-1 { + msg = append(msg, EndPaging) + msg = append(msg, chunk...) + err = nc.Publish(ConcatenateContractTopics(l2ContractAddress), msg) + if err != nil { + zap.S().Errorf("failed to publish message on topic %s", ConcatenateContractTopics(l2ContractAddress)) + return err + } + break + } + msg = append(msg, StartPaging) + msg = append(msg, chunk...) + err = nc.Publish(ConcatenateContractTopics(l2ContractAddress), msg) + if err != nil { + zap.S().Errorf("failed to publish message on topic %s", ConcatenateContractTopics(l2ContractAddress)) + return err + } + time.Sleep(publisherWaitingTime) + } + + return nil +} + +func PublishBlockUpdates(updates proto.BUpdatesInfo, nc *nats.Conn, scheme proto.Scheme) error { + blockInfo, err := BlockUpdatesInfoToProto(updates.BlockUpdatesInfo, scheme) + if err != nil { + return err + } + blockInfoProtobuf, err := blockInfo.MarshalVTStrict() + if err != nil { + return err + } + err = nc.Publish(BlockUpdates, blockInfoProtobuf) + if err != nil { + zap.S().Errorf("failed to publish message on topic %s", BlockUpdates) + return err + } + return nil +} + +func (p *UpdatesPublisher) PublishUpdates(updates proto.BUpdatesInfo, + nc *nats.Conn, scheme proto.Scheme, l2ContractAddress string) error { + /* first publish block data */ + err := PublishBlockUpdates(updates, nc, scheme) + if err != nil { + zap.S().Errorf("failed to publish message on topic %s", BlockUpdates) + return err + } + /* second publish contract data entries */ + pblshErr := PublishContractUpdates(updates.ContractUpdatesInfo, nc, l2ContractAddress) + if pblshErr != nil { + zap.S().Errorf("failed to publish message on topic %s", ConcatenateContractTopics(p.L2ContractAddress())) + return pblshErr + } + return nil +} + +func (p *UpdatesPublisher) L2ContractAddress() string { + return p.l2ContractAddress +} + +func (bu *BUpdatesExtensionState) AddEntriesToHistoryJournalAndCache(updates proto.BUpdatesInfo) { + height := updates.BlockUpdatesInfo.Height + blockID := updates.BlockUpdatesInfo.BlockID + + historyEntry := HistoryEntry{ + Height: height, + BlockID: blockID, + Entries: updates.ContractUpdatesInfo.AllDataEntries, + VRF: updates.BlockUpdatesInfo.VRF, + BlockHeader: updates.BlockUpdatesInfo.BlockHeader, + } + bu.HistoryJournal.Push(historyEntry) + bu.HistoryJournal.StateCache.AddCacheRecord(height, updates.ContractUpdatesInfo.AllDataEntries, + updates.BlockUpdatesInfo) +} + +func (bu *BUpdatesExtensionState) RollbackHappened(updates proto.BUpdatesInfo, previousState proto.BUpdatesInfo) bool { + if _, blockIDFound := bu.HistoryJournal.SearchByBlockID(updates.BlockUpdatesInfo.BlockHeader.Parent); blockIDFound { + return false + } + if updates.BlockUpdatesInfo.Height < previousState.BlockUpdatesInfo.Height { + return true + } + return false +} + +func (bu *BUpdatesExtensionState) GeneratePatch(latestUpdates proto.BUpdatesInfo) (proto.BUpdatesInfo, error) { + keysForAnalysis, found := bu.HistoryJournal.FetchKeysUntilBlockID(latestUpdates.BlockUpdatesInfo.BlockID) + if !found { + previousHeight := bu.PreviousState.BlockUpdatesInfo.Height + newHeight := latestUpdates.BlockUpdatesInfo.Height + return 
proto.BUpdatesInfo{}, errors.Errorf("failed to fetch keys after rollback, the rollback is too deep."+ + "Previous height %d, new height %d", previousHeight, newHeight) + } + + // If the key is found in the state, fetch it. If it is not found, it creates a DeleteEntry. + patchDataEntries, patchBlockInfo, err := bu.BuildPatch(keysForAnalysis, + latestUpdates.BlockUpdatesInfo.Height-1) // Height. + // -1 because the current height is from the new block updates, + // and we need to return the client's state to the previous block + if err != nil { + return proto.BUpdatesInfo{}, err + } + + // Clean the journal and cache after rollback + historyJournalLatestHeight, err := bu.HistoryJournal.TopHeight() + if err != nil { + return proto.BUpdatesInfo{}, err + } + err = bu.CleanRecordsAfterRollback(historyJournalLatestHeight, latestUpdates.BlockUpdatesInfo.Height) + if err != nil { + return proto.BUpdatesInfo{}, err + } + + patch := proto.BUpdatesInfo{ + BlockUpdatesInfo: patchBlockInfo, // wrong, must be the previous block + ContractUpdatesInfo: proto.L2ContractDataEntries{ + AllDataEntries: patchDataEntries, + Height: latestUpdates.ContractUpdatesInfo.Height - 1, + BlockID: patchBlockInfo.BlockID, + }, + } + return patch, nil +} + +func (bu *BUpdatesExtensionState) IsKeyConstant(keyDataEntry string) bool { + for _, constantKey := range bu.constantContractKeys { + if constantKey == keyDataEntry { + return true + } + } + return false +} + +func (bu *BUpdatesExtensionState) BuildPatch(keysForPatch []string, targetHeight uint64) (proto.DataEntries, + proto.BlockUpdatesInfo, error) { + l2WavesAddress, cnvrtErr := proto.NewAddressFromString(bu.L2ContractAddress) + if cnvrtErr != nil { + return nil, proto.BlockUpdatesInfo{}, errors.Wrapf(cnvrtErr, + "failed to convert L2 contract address %q", bu.L2ContractAddress) + } + patch := make(map[string]proto.DataEntry) + for _, dataEntryKey := range keysForPatch { + recipient := proto.NewRecipientFromAddress(l2WavesAddress) + dataEntry, ok, err := bu.HistoryJournal.StateCache.SearchValue(dataEntryKey, targetHeight) + if err != nil { + // If the key is constant, we will go to State, if not, consider it a DeleteDataEntry + if bu.IsKeyConstant(dataEntryKey) { + dataEntry, err = bu.St.RetrieveEntry(recipient, dataEntryKey) + if err != nil { + dataEntry = &proto.DeleteDataEntry{Key: dataEntryKey} + } + } else { + dataEntry = &proto.DeleteDataEntry{Key: dataEntryKey} + } + } + if !ok { + dataEntry = &proto.DeleteDataEntry{Key: dataEntryKey} + } + patch[dataEntry.GetKey()] = dataEntry + } + var patchArray []proto.DataEntry + for _, elem := range patch { + patchArray = append(patchArray, elem) + } + blockInfo, err := bu.HistoryJournal.StateCache.SearchBlockInfo(targetHeight) + if err != nil { + return nil, proto.BlockUpdatesInfo{}, err + } + return patchArray, blockInfo, nil +} + +func (bu *BUpdatesExtensionState) CleanRecordsAfterRollback(latestHeightFromHistory uint64, + heightAfterRollback uint64) error { + err := bu.HistoryJournal.CleanAfterRollback(latestHeightFromHistory, heightAfterRollback) + if err != nil { + return err + } + + // This should never happen + if latestHeightFromHistory < heightAfterRollback { + return errors.New("the height after rollback is bigger than the last saved height") + } + for i := latestHeightFromHistory; i >= heightAfterRollback; i-- { + bu.HistoryJournal.StateCache.RemoveCacheRecord(i) + } + return nil +} + +func HandleRollback(be *BUpdatesExtensionState, updates proto.BUpdatesInfo, updatesPublisher UpdatesPublisherInterface, + nc 
*nats.Conn, scheme proto.Scheme) proto.BUpdatesInfo { + patch, err := be.GeneratePatch(updates) + if err != nil { + zap.S().Errorf("failed to generate a patch, %v", err) + } + pblshErr := updatesPublisher.PublishUpdates(patch, nc, scheme, updatesPublisher.L2ContractAddress()) + if pblshErr != nil { + zap.S().Errorf("failed to publish updates, %v", pblshErr) + return proto.BUpdatesInfo{} + } + be.AddEntriesToHistoryJournalAndCache(patch) + be.SetPreviousState(patch) + return patch +} + +func handleBlockchainUpdate(updates proto.BUpdatesInfo, be *BUpdatesExtensionState, scheme proto.Scheme, nc *nats.Conn, + updatesPublisher UpdatesPublisher, handleRollback bool) { + // update current state + be.CurrentState = &updates + if be.PreviousState == nil { + // publish initial updates + filteredDataEntries, err := filterDataEntries(updates.BlockUpdatesInfo.Height-be.Limit, + updates.ContractUpdatesInfo.AllDataEntries) + if err != nil { + return + } + updates.ContractUpdatesInfo.AllDataEntries = filteredDataEntries + pblshErr := updatesPublisher.PublishUpdates(updates, nc, scheme, updatesPublisher.L2ContractAddress()) + if pblshErr != nil { + zap.S().Errorf("failed to publish updates, %v", pblshErr) + return + } + be.PreviousState = &updates + return + } + if handleRollback { + if be.RollbackHappened(updates, *be.PreviousState) { + HandleRollback(be, updates, &updatesPublisher, nc, scheme) + } + } + // compare the current state to the previous state + stateChanged, changes, cmprErr := be.HasStateChanged() + if cmprErr != nil { + zap.S().Errorf("failed to compare current and previous states, %v", cmprErr) + return + } + // if there is any diff, send the update + if stateChanged { + pblshErr := updatesPublisher.PublishUpdates(updates, nc, scheme, updatesPublisher.L2ContractAddress()) + if pblshErr != nil { + zap.S().Errorf("failed to publish changes, %v", pblshErr) + } + be.AddEntriesToHistoryJournalAndCache(changes) + be.PreviousState = &updates + } +} + +func runPublisher(ctx context.Context, extension *BlockchainUpdatesExtension, scheme proto.Scheme, nc *nats.Conn, + updatesPublisher UpdatesPublisher) { + for { + select { + case updates, ok := <-extension.bUpdatesChannel: + if !ok { + zap.S().Errorf("the updates channel for publisher was closed") + return + } + handleBlockchainUpdate(updates, extension.blockchainExtensionState, scheme, nc, updatesPublisher, true) + case <-ctx.Done(): + return + } + } +} + +func runReceiver(nc *nats.Conn, bu *BlockchainUpdatesExtension) error { + _, subErr := nc.Subscribe(L2RequestsTopic, func(request *nats.Msg) { + signal := string(request.Data) + switch signal { + case RequestRestartSubTopic: + notNilResponse := "ok" + err := request.Respond([]byte(notNilResponse)) + if err != nil { + zap.S().Errorf("failed to respond to a restart signal, %v", err) + return + } + bu.EmptyPreviousState() + default: + zap.S().Errorf("nats receiver received an unknown signal, %s", signal) + } + }) + return subErr +} + +func (e *BlockchainUpdatesExtension) RunBlockchainUpdatesPublisher(ctx context.Context, + scheme proto.Scheme) error { + opts := &server.Options{ + MaxPayload: natsMaxPayloadSize, + Host: hostDefault, + Port: portDefault, + NoSigs: true, + } + s, err := server.NewServer(opts) + if err != nil { + return errors.Wrap(err, "failed to create NATS server") + } + go s.Start() + defer func() { + s.Shutdown() + s.WaitForShutdown() + }() + if !s.ReadyForConnections(NatsConnectionsTimeoutDefault) { + return errors.New("NATS server is not ready for connections") + } + + 
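	// Startup sequence from here on (added commentary): log that the embedded NATS server is up,
	// connect to it, block until the L2 client publishes its constant keys (requestConstantKeys
	// plus the WaitGroup), mark the extension ready via the callback passed in from node.go,
	// replay the buffered history entries for rollback support (publishHistoryBlocks), start the
	// restart-request receiver (runReceiver), and finally loop in runPublisher until the context
	// is cancelled.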
zap.S().Infof("NATS Server is running on port %d", portDefault) + + nc, err := nats.Connect(nats.DefaultURL) + if err != nil { + return errors.Wrap(err, "failed to connect to NATS server") + } + defer nc.Close() + + var wg sync.WaitGroup + wg.Add(1) + reqErr := e.requestConstantKeys(nc, &wg) + if reqErr != nil { + return errors.Wrap(reqErr, "failed to request constant keys from the client") + } + wg.Wait() + e.MarkExtensionReady() + updatesPublisher := UpdatesPublisher{l2ContractAddress: e.l2ContractAddress.String()} + // Publish the first 100 history entries for the rollback functionality. + publishHistoryBlocks(e, scheme, nc, updatesPublisher) + + receiverErr := runReceiver(nc, e) + if receiverErr != nil { + return receiverErr + } + runPublisher(ctx, e, scheme, nc, updatesPublisher) + return nil +} + +func (e *BlockchainUpdatesExtension) requestConstantKeys(nc *nats.Conn, wg *sync.WaitGroup) error { + _, subErr := nc.Subscribe(ConstantKeys, func(msg *nats.Msg) { + defer wg.Done() + constantKeys, err := DeserializeConstantKeys(msg.Data) + if err != nil { + zap.S().Errorf("failed to deserialize constant keys %v", err) + return + } + e.blockchainExtensionState.constantContractKeys = constantKeys + }) + return subErr +} + +func publishHistoryBlocks(e *BlockchainUpdatesExtension, scheme proto.Scheme, + nc *nats.Conn, updatesPublisher UpdatesPublisher) { + for _, historyEntry := range e.blockchainExtensionState.HistoryJournal.historyJournal { + updates := proto.BUpdatesInfo{ + BlockUpdatesInfo: proto.BlockUpdatesInfo{ + Height: historyEntry.Height, + BlockID: historyEntry.BlockID, + VRF: historyEntry.VRF, + BlockHeader: historyEntry.BlockHeader, + }, + ContractUpdatesInfo: proto.L2ContractDataEntries{ + Height: historyEntry.Height, + AllDataEntries: historyEntry.Entries, + BlockID: historyEntry.BlockID, + }, + } + handleBlockchainUpdate(updates, e.blockchainExtensionState, scheme, nc, updatesPublisher, false) + } +} diff --git a/pkg/blockchaininfo/types.go b/pkg/blockchaininfo/types.go new file mode 100644 index 000000000..0ee8958cd --- /dev/null +++ b/pkg/blockchaininfo/types.go @@ -0,0 +1,422 @@ +package blockchaininfo + +import ( + "bytes" + "io" + "math" + "slices" + "sync" + + "github.com/ccoveille/go-safecast" + "github.com/nats-io/nats.go" + "github.com/pkg/errors" + "github.com/wavesplatform/gowaves/pkg/crypto" + "github.com/wavesplatform/gowaves/pkg/proto" +) + +var ErrNotInCache = errors.New("the target height is not in cache") + +const ( + RootHashSize = 32 + + HistoryJournalLengthMax = 100 +) + +type UpdatesPublisherInterface interface { + PublishUpdates(updates proto.BUpdatesInfo, + nc *nats.Conn, scheme proto.Scheme, l2ContractAddress string) error + L2ContractAddress() string +} + +type StateCacheRecord struct { + dataEntries map[string]proto.DataEntry + blockInfo proto.BlockUpdatesInfo +} + +func NewStateCacheRecord(dataEntries []proto.DataEntry, blockInfo proto.BlockUpdatesInfo) StateCacheRecord { + var stateCacheRecord StateCacheRecord + stateCacheRecord.dataEntries = make(map[string]proto.DataEntry, len(dataEntries)) + + for _, dataEntry := range dataEntries { + stateCacheRecord.dataEntries[dataEntry.GetKey()] = dataEntry + } + stateCacheRecord.blockInfo = blockInfo + return stateCacheRecord +} + +type StateCache struct { + lock sync.Mutex + records map[proto.Height]StateCacheRecord + heights []uint64 +} + +func NewStateCache() *StateCache { + return &StateCache{ + records: make(map[proto.Height]StateCacheRecord), + } +} + +func (sc *StateCache) SearchValue(key string, 
height uint64) (proto.DataEntry, bool, error) { + sc.lock.Lock() + defer sc.lock.Unlock() + + record, found := sc.records[height] + if !found { + return nil, false, ErrNotInCache + } + entry, ok := record.dataEntries[key] + return entry, ok, nil +} + +func (sc *StateCache) SearchBlockInfo(height uint64) (proto.BlockUpdatesInfo, error) { + sc.lock.Lock() + defer sc.lock.Unlock() + + if _, ok := sc.records[height]; !ok { + return proto.BlockUpdatesInfo{}, ErrNotInCache + } + return sc.records[height].blockInfo, nil +} + +func (sc *StateCache) AddCacheRecord(height uint64, dataEntries []proto.DataEntry, blockInfo proto.BlockUpdatesInfo) { + sc.lock.Lock() + defer sc.lock.Unlock() + // clean the oldest record if the cache is too big + if len(sc.heights) > HistoryJournalLengthMax { + minHeight := sc.heights[0] + for _, v := range sc.heights { + minHeight = min(minHeight, v) + } + delete(sc.records, minHeight) + } + stateCacheRecord := NewStateCacheRecord(dataEntries, blockInfo) + sc.records[height] = stateCacheRecord + sc.heights = append(sc.heights, height) +} + +func (sc *StateCache) RemoveCacheRecord(targetHeight uint64) { + sc.lock.Lock() + defer sc.lock.Unlock() + + delete(sc.records, targetHeight) + + if i := slices.Index(sc.heights, targetHeight); i != -1 { + sc.heights = append(sc.heights[:i], sc.heights[i+1:]...) + } +} + +type HistoryEntry struct { + Height uint64 + BlockID proto.BlockID + VRF proto.B58Bytes + BlockHeader proto.BlockHeader + + Entries proto.DataEntries +} + +type HistoryJournal struct { + lock sync.Mutex + StateCache *StateCache + historyJournal [HistoryJournalLengthMax]HistoryEntry + top int + size int +} + +func NewHistoryJournal() *HistoryJournal { + return &HistoryJournal{ + top: 0, + size: 0, + } +} + +func (hj *HistoryJournal) SetStateCache(stateCache *StateCache) { + hj.StateCache = stateCache +} + +// FetchKeysUntilBlockID TODO write tests. +// FetchKeysUntilBlockID goes from top to bottom and fetches all keys. +// If the blockID is found, it returns the keys up to and including that element and true. +// If the blockID is not found - nil and false. +func (hj *HistoryJournal) FetchKeysUntilBlockID(blockID proto.BlockID) ([]string, bool) { + hj.lock.Lock() + defer hj.lock.Unlock() + + var keys []string + for i := 0; i < hj.size; i++ { + idx := (hj.top - 1 - i + HistoryJournalLengthMax) % HistoryJournalLengthMax + historyEntry := hj.historyJournal[idx] + + dataEntries := historyEntry.Entries + for _, dataEntry := range dataEntries { + keys = append(keys, dataEntry.GetKey()) + } + if historyEntry.BlockID == blockID { + return keys, true + } + } + + return nil, false +} + +// SearchByBlockID TODO write tests. +func (hj *HistoryJournal) SearchByBlockID(blockID proto.BlockID) (HistoryEntry, bool) { + hj.lock.Lock() + defer hj.lock.Unlock() + + // Iterate over the elements from the top (latest) to the bottom. + for i := 0; i < hj.size; i++ { + idx := (hj.top - 1 - i + HistoryJournalLengthMax) % HistoryJournalLengthMax + if hj.historyJournal[idx].BlockID == blockID { + return hj.historyJournal[idx], true + } + } + return HistoryEntry{}, false +} + +// SearchByBlockID TODO write tests. +func (hj *HistoryJournal) TopHeight() (uint64, error) { + hj.lock.Lock() + defer hj.lock.Unlock() + + if hj.size == 0 { + return 0, errors.New("failed to pull the top height, history journal is empty") + } + + // Shift "top" back. 
+ hj.top = (hj.top - 1 + HistoryJournalLengthMax) % HistoryJournalLengthMax + topHeight := hj.historyJournal[hj.top].Height + return topHeight, nil +} + +// CleanAfterRollback TODO write tests. +func (hj *HistoryJournal) CleanAfterRollback(latestHeightFromHistory uint64, heightAfterRollback uint64) error { + hj.lock.Lock() + defer hj.lock.Unlock() + + distance := latestHeightFromHistory - heightAfterRollback + if distance > math.MaxInt64 { + return errors.New("distance too large to fit in an int64") + } + dist, err := safecast.ToInt(distance) + if err != nil { + return errors.Wrapf(err, "failed to convert int64 to int") + } + if dist > hj.size { + return errors.New("distance out of range") + } + + // Remove the number of elements from the top to `distance`. + hj.top = (hj.top - dist + HistoryJournalLengthMax) % HistoryJournalLengthMax + hj.size -= int(distance) + return nil +} + +func (hj *HistoryJournal) Push(v HistoryEntry) { + hj.lock.Lock() + defer hj.lock.Unlock() + hj.historyJournal[hj.top] = v // Add to top or rewrite the oldest element. + + hj.top = (hj.top + 1) % HistoryJournalLengthMax + + if hj.size < HistoryJournalLengthMax { + hj.size++ + } +} + +func (hj *HistoryJournal) Pop() (HistoryEntry, error) { + hj.lock.Lock() + defer hj.lock.Unlock() + + if hj.size == 0 { + return HistoryEntry{}, errors.New("failed to pop from the history journal, it's empty") + } + + // Shift "top" back. + hj.top = (hj.top - 1 + HistoryJournalLengthMax) % HistoryJournalLengthMax + entry := hj.historyJournal[hj.top] + hj.size-- + return entry, nil +} + +type L2Requests struct { + Restart bool +} + +func CompareBUpdatesInfo(current, previous proto.BUpdatesInfo, + scheme proto.Scheme) (bool, proto.BUpdatesInfo, error) { + changes := proto.BUpdatesInfo{ + BlockUpdatesInfo: proto.BlockUpdatesInfo{}, + ContractUpdatesInfo: proto.L2ContractDataEntries{}, + } + + equal := true + if current.BlockUpdatesInfo.Height != previous.BlockUpdatesInfo.Height { + equal = false + changes.BlockUpdatesInfo.Height = current.BlockUpdatesInfo.Height + } + if !bytes.Equal(current.BlockUpdatesInfo.VRF, previous.BlockUpdatesInfo.VRF) { + equal = false + changes.BlockUpdatesInfo.VRF = current.BlockUpdatesInfo.VRF + } + if !bytes.Equal(current.BlockUpdatesInfo.BlockID.Bytes(), previous.BlockUpdatesInfo.BlockID.Bytes()) { + equal = false + changes.BlockUpdatesInfo.BlockID = current.BlockUpdatesInfo.BlockID + } + equalHeaders, err := compareBlockHeader(current.BlockUpdatesInfo.BlockHeader, + previous.BlockUpdatesInfo.BlockHeader, scheme) + if err != nil { + return false, proto.BUpdatesInfo{}, err + } + if !equalHeaders { + equal = false + changes.BlockUpdatesInfo.BlockHeader = current.BlockUpdatesInfo.BlockHeader + } + + equalEntries, dataEntryChanges, err := compareDataEntries(current.ContractUpdatesInfo.AllDataEntries, + previous.ContractUpdatesInfo.AllDataEntries) + if err != nil { + return false, proto.BUpdatesInfo{}, err + } + if !equalEntries { + equal = false + changes.ContractUpdatesInfo.AllDataEntries = dataEntryChanges + changes.ContractUpdatesInfo.Height = current.BlockUpdatesInfo.Height + } + + return equal, changes, nil +} + +func compareBlockHeader(a, b proto.BlockHeader, scheme proto.Scheme) (bool, error) { + blockAbytes, err := a.MarshalHeader(scheme) + if err != nil { + return false, err + } + + blockBbytes, err := b.MarshalHeader(scheme) + if err != nil { + return false, err + } + + return bytes.Equal(blockAbytes, blockBbytes), nil +} + +func compareDataEntries(current, previous proto.DataEntries) (bool, 
[]proto.DataEntry, error) { + currentMap := make(map[string][]byte) // Data entries. + previousMap := make(map[string][]byte) // Data entries. + + for _, dataEntry := range current { + value, err := dataEntry.MarshalValue() + if err != nil { + return false, nil, err + } + currentMap[dataEntry.GetKey()] = value + } + + for _, dataEntry := range previous { + value, err := dataEntry.MarshalValue() + if err != nil { + return false, nil, err + } + previousMap[dataEntry.GetKey()] = value + } + var changes []proto.DataEntry + + for key, valueCur := range currentMap { + // Existing keys, not found in the previous state. This means that these keys were added. + if valuePrev, found := previousMap[key]; !found { + entryChange, err := proto.NewDataEntryFromValueBytes(valueCur) + if err != nil { + return false, nil, err + } + entryChange.SetKey(key) + changes = append(changes, entryChange) + // Existing keys, found in the previous state, different values. This means that data changed. + } else if !bytes.Equal(valuePrev, valueCur) { + entryChange, err := proto.NewDataEntryFromValueBytes(valueCur) + if err != nil { + return false, nil, err + } + entryChange.SetKey(key) + changes = append(changes, entryChange) + } + } + + // Keys existing in the previous state, not found in the current state. This means that these keys were deleted. + for key := range previousMap { + if _, found := currentMap[key]; !found { + deleteEntry := &proto.DeleteDataEntry{} + deleteEntry.SetKey(key) + changes = append(changes, deleteEntry) + } + } + + equal := len(changes) == 0 + return equal, changes, nil +} + +type BlockMeta struct { + BlockHeight int64 `json:"blockHeight"` + BlockEpoch int64 `json:"blockEpoch"` + BlockParent []byte `json:"blockParent"` + ChainID int64 `json:"chainId"` + E2CTransfersRootHash []byte `json:"e2cTransfersRootHash"` + LastC2ETransferIndex int64 `json:"lastC2ETransferIndex"` +} + +func readBytes(reader *bytes.Reader, length int) ([]byte, error) { + buf := make([]byte, length) + _, err := io.ReadFull(reader, buf) + if err != nil { + return nil, err + } + return buf, nil +} + +func (bm *BlockMeta) UnmarshalBinary(value []byte) error { + var err error + binaryData := value + + reader := bytes.NewReader(binaryData) + // Step 1: Extract blockHeight, 8 bytes + bm.BlockHeight, err = readInt64(reader) + if err != nil { + return errors.Errorf("failed to read block height from blockMeta: %v", err) + } + // Step 2: Extract blockEpoch, 8 bytes + bm.BlockEpoch, err = readInt64(reader) + if err != nil { + return errors.Errorf("failed to read block epoch from blockMeta: %v", err) + } + // Step 3: Extract blockParent, 32 bytes + bm.BlockParent, err = readBytes(reader, crypto.DigestSize) + if err != nil { + return errors.Errorf("failed to read block parent from blockMeta: %v", err) + } + // Step 4: Extract chainId, 8 bytes + bm.ChainID, err = readInt64(reader) + if err != nil { + return errors.Errorf("failed to read chain ID from blockMeta: %v", err) + } + // How many bytes are left to read + remainingBytes := reader.Len() + // Step 5: Extract e2cTransfersRootHash + if remainingBytes >= RootHashSize { + bm.E2CTransfersRootHash, err = readBytes(reader, RootHashSize) + if err != nil { + return errors.Errorf("failed to read E2CTransfersRootHash from blockMeta: %v", err) + } + } else { + bm.E2CTransfersRootHash = nil // Represents base58'' + } + // Step 6: Extract lastC2ETransferIndex + if remainingBytes == 8 || remainingBytes > RootHashSize { + index, readErr := readInt64(reader) + if readErr != nil { + return 
errors.Errorf("failed to read lastC2ETransferIndex from blockMeta: %v", readErr) + } + bm.LastC2ETransferIndex = index + } else { + bm.LastC2ETransferIndex = -1 + } + return nil +} diff --git a/pkg/grpc/server/common_test.go b/pkg/grpc/server/common_test.go index 5952ee307..a01f861cb 100644 --- a/pkg/grpc/server/common_test.go +++ b/pkg/grpc/server/common_test.go @@ -86,7 +86,7 @@ func stateWithCustomGenesis(t *testing.T, genesisPath string) state.State { // Activate data transactions. sets.PreactivatedFeatures = []int16{5} params := defaultStateParams() - st, err := state.NewState(dataDir, true, params, sets, false) + st, err := state.NewState(dataDir, true, params, sets, false, nil) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, st.Close()) @@ -120,7 +120,7 @@ func withAutoCancel(t *testing.T, ctx context.Context) context.Context { func newTestState(t *testing.T, amend bool, params state.StateParams, settings *settings.BlockchainSettings) state.State { dataDir := t.TempDir() - st, err := state.NewState(dataDir, amend, params, settings, false) + st, err := state.NewState(dataDir, amend, params, settings, false, nil) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, st.Close()) diff --git a/pkg/mock/blockchaininfo_types.go b/pkg/mock/blockchaininfo_types.go new file mode 100644 index 000000000..e46e70906 --- /dev/null +++ b/pkg/mock/blockchaininfo_types.go @@ -0,0 +1,131 @@ +// Code generated by mockery v2.50.1. DO NOT EDIT. + +package mock + +import ( + nats "github.com/nats-io/nats.go" + mock "github.com/stretchr/testify/mock" + + proto "github.com/wavesplatform/gowaves/pkg/proto" +) + +// MockUpdatesPublisherInterface is an autogenerated mock type for the UpdatesPublisherInterface type +type MockUpdatesPublisherInterface struct { + mock.Mock +} + +type MockUpdatesPublisherInterface_Expecter struct { + mock *mock.Mock +} + +func (_m *MockUpdatesPublisherInterface) EXPECT() *MockUpdatesPublisherInterface_Expecter { + return &MockUpdatesPublisherInterface_Expecter{mock: &_m.Mock} +} + +// L2ContractAddress provides a mock function with no fields +func (_m *MockUpdatesPublisherInterface) L2ContractAddress() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for L2ContractAddress") + } + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MockUpdatesPublisherInterface_L2ContractAddress_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'L2ContractAddress' +type MockUpdatesPublisherInterface_L2ContractAddress_Call struct { + *mock.Call +} + +// L2ContractAddress is a helper method to define mock.On call +func (_e *MockUpdatesPublisherInterface_Expecter) L2ContractAddress() *MockUpdatesPublisherInterface_L2ContractAddress_Call { + return &MockUpdatesPublisherInterface_L2ContractAddress_Call{Call: _e.mock.On("L2ContractAddress")} +} + +func (_c *MockUpdatesPublisherInterface_L2ContractAddress_Call) Run(run func()) *MockUpdatesPublisherInterface_L2ContractAddress_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockUpdatesPublisherInterface_L2ContractAddress_Call) Return(_a0 string) *MockUpdatesPublisherInterface_L2ContractAddress_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockUpdatesPublisherInterface_L2ContractAddress_Call) RunAndReturn(run func() string) *MockUpdatesPublisherInterface_L2ContractAddress_Call { + _c.Call.Return(run) + return _c +} + 
+// PublishUpdates provides a mock function with given fields: updates, nc, scheme, l2ContractAddress +func (_m *MockUpdatesPublisherInterface) PublishUpdates(updates proto.BUpdatesInfo, nc *nats.Conn, scheme byte, l2ContractAddress string) error { + ret := _m.Called(updates, nc, scheme, l2ContractAddress) + + if len(ret) == 0 { + panic("no return value specified for PublishUpdates") + } + + var r0 error + if rf, ok := ret.Get(0).(func(proto.BUpdatesInfo, *nats.Conn, byte, string) error); ok { + r0 = rf(updates, nc, scheme, l2ContractAddress) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockUpdatesPublisherInterface_PublishUpdates_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PublishUpdates' +type MockUpdatesPublisherInterface_PublishUpdates_Call struct { + *mock.Call +} + +// PublishUpdates is a helper method to define mock.On call +// - updates proto.BUpdatesInfo +// - nc *nats.Conn +// - scheme byte +// - l2ContractAddress string +func (_e *MockUpdatesPublisherInterface_Expecter) PublishUpdates(updates interface{}, nc interface{}, scheme interface{}, l2ContractAddress interface{}) *MockUpdatesPublisherInterface_PublishUpdates_Call { + return &MockUpdatesPublisherInterface_PublishUpdates_Call{Call: _e.mock.On("PublishUpdates", updates, nc, scheme, l2ContractAddress)} +} + +func (_c *MockUpdatesPublisherInterface_PublishUpdates_Call) Run(run func(updates proto.BUpdatesInfo, nc *nats.Conn, scheme byte, l2ContractAddress string)) *MockUpdatesPublisherInterface_PublishUpdates_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(proto.BUpdatesInfo), args[1].(*nats.Conn), args[2].(byte), args[3].(string)) + }) + return _c +} + +func (_c *MockUpdatesPublisherInterface_PublishUpdates_Call) Return(_a0 error) *MockUpdatesPublisherInterface_PublishUpdates_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockUpdatesPublisherInterface_PublishUpdates_Call) RunAndReturn(run func(proto.BUpdatesInfo, *nats.Conn, byte, string) error) *MockUpdatesPublisherInterface_PublishUpdates_Call { + _c.Call.Return(run) + return _c +} + +// NewMockUpdatesPublisherInterface creates a new instance of MockUpdatesPublisherInterface. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockUpdatesPublisherInterface(t interface { + mock.TestingT + Cleanup(func()) +}) *MockUpdatesPublisherInterface { + mock := &MockUpdatesPublisherInterface{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/mock/state.go b/pkg/mock/state.go index ef14dc2b0..6aa4d2d99 100644 --- a/pkg/mock/state.go +++ b/pkg/mock/state.go @@ -688,6 +688,36 @@ func (mr *MockStateInfoMockRecorder) NewAddrTransactionsIterator(addr interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewAddrTransactionsIterator", reflect.TypeOf((*MockStateInfo)(nil).NewAddrTransactionsIterator), addr) } +// NewestBlockInfoByHeight mocks base method. +func (m *MockStateInfo) NewestBlockInfoByHeight(height proto.Height) (*proto.BlockInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewestBlockInfoByHeight", height) + ret0, _ := ret[0].(*proto.BlockInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NewestBlockInfoByHeight indicates an expected call of NewestBlockInfoByHeight. 
+func (mr *MockStateInfoMockRecorder) NewestBlockInfoByHeight(height interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewestBlockInfoByHeight", reflect.TypeOf((*MockStateInfo)(nil).NewestBlockInfoByHeight), height) +} + +// NewestHeaderByHeight mocks base method. +func (m *MockStateInfo) NewestHeaderByHeight(height uint64) (*proto.BlockHeader, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewestHeaderByHeight", height) + ret0, _ := ret[0].(*proto.BlockHeader) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NewestHeaderByHeight indicates an expected call of NewestHeaderByHeight. +func (mr *MockStateInfoMockRecorder) NewestHeaderByHeight(height interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewestHeaderByHeight", reflect.TypeOf((*MockStateInfo)(nil).NewestHeaderByHeight), height) +} + // NewestScriptByAccount mocks base method. func (m *MockStateInfo) NewestScriptByAccount(account proto.Recipient) (*ast.Tree, error) { m.ctrl.T.Helper() @@ -2091,6 +2121,36 @@ func (mr *MockStateMockRecorder) NewAddrTransactionsIterator(addr interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewAddrTransactionsIterator", reflect.TypeOf((*MockState)(nil).NewAddrTransactionsIterator), addr) } +// NewestBlockInfoByHeight mocks base method. +func (m *MockState) NewestBlockInfoByHeight(height proto.Height) (*proto.BlockInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewestBlockInfoByHeight", height) + ret0, _ := ret[0].(*proto.BlockInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NewestBlockInfoByHeight indicates an expected call of NewestBlockInfoByHeight. +func (mr *MockStateMockRecorder) NewestBlockInfoByHeight(height interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewestBlockInfoByHeight", reflect.TypeOf((*MockState)(nil).NewestBlockInfoByHeight), height) +} + +// NewestHeaderByHeight mocks base method. +func (m *MockState) NewestHeaderByHeight(height uint64) (*proto.BlockHeader, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewestHeaderByHeight", height) + ret0, _ := ret[0].(*proto.BlockHeader) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NewestHeaderByHeight indicates an expected call of NewestHeaderByHeight. +func (mr *MockStateMockRecorder) NewestHeaderByHeight(height interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewestHeaderByHeight", reflect.TypeOf((*MockState)(nil).NewestHeaderByHeight), height) +} + // NewestScriptByAccount mocks base method. func (m *MockState) NewestScriptByAccount(account proto.Recipient) (*ast.Tree, error) { m.ctrl.T.Helper() diff --git a/pkg/proto/blockchain_updates_types.go b/pkg/proto/blockchain_updates_types.go index 0e0339d37..c94f53237 100644 --- a/pkg/proto/blockchain_updates_types.go +++ b/pkg/proto/blockchain_updates_types.go @@ -1,5 +1,20 @@ package proto +import ( + "context" + "sync" + "time" + + "go.uber.org/zap" +) + +const ChannelWriteTimeout = 10 * time.Second + +type BUpdatesInfo struct { + BlockUpdatesInfo BlockUpdatesInfo + ContractUpdatesInfo L2ContractDataEntries +} + // L2ContractDataEntries L2 contract data entries. 
type L2ContractDataEntries struct { AllDataEntries DataEntries `json:"all_data_entries"` @@ -14,3 +29,81 @@ type BlockUpdatesInfo struct { BlockID BlockID `json:"block_id"` BlockHeader BlockHeader `json:"block_header"` } + +type BlockchainUpdatesPluginInfo struct { + enableBlockchainUpdatesPlugin bool + L2ContractAddress WavesAddress + FirstBlock *bool + Lock sync.Mutex + Ready bool + BUpdatesChannel chan<- BUpdatesInfo + ctx context.Context +} + +func NewBlockchainUpdatesPluginInfo(ctx context.Context, + l2Address WavesAddress, bUpdatesChannel chan<- BUpdatesInfo, + firstBlock *bool, + enableBlockchainUpdatesPlugin bool) *BlockchainUpdatesPluginInfo { + return &BlockchainUpdatesPluginInfo{ + L2ContractAddress: l2Address, + FirstBlock: firstBlock, + BUpdatesChannel: bUpdatesChannel, + ctx: ctx, + enableBlockchainUpdatesPlugin: enableBlockchainUpdatesPlugin, + Ready: false, + } +} + +func (e *BlockchainUpdatesPluginInfo) IsBlockchainUpdatesEnabled() bool { + return e.enableBlockchainUpdatesPlugin +} + +func (e *BlockchainUpdatesPluginInfo) IsReady() bool { + e.Lock.Lock() + defer e.Lock.Unlock() + return e.enableBlockchainUpdatesPlugin && e.Ready +} + +func (e *BlockchainUpdatesPluginInfo) MakeExtensionReady() { + e.Lock.Lock() + defer e.Lock.Unlock() + e.Ready = true +} + +func (e *BlockchainUpdatesPluginInfo) Ctx() context.Context { + return e.ctx +} + +func (e *BlockchainUpdatesPluginInfo) FirstBlockDone() { + e.Lock.Lock() + defer e.Lock.Unlock() + *e.FirstBlock = false +} + +func (e *BlockchainUpdatesPluginInfo) IsFirstBlockDone() bool { + e.Lock.Lock() + defer e.Lock.Unlock() + return *e.FirstBlock +} + +func (e *BlockchainUpdatesPluginInfo) WriteBUpdates(bUpdates BUpdatesInfo) { + if e.BUpdatesChannel == nil || !e.IsReady() { + return + } + select { + case e.BUpdatesChannel <- bUpdates: + case <-time.After(ChannelWriteTimeout): + zap.S().Errorf("failed to write into the blockchain updates channel, out of time") + return + case <-e.ctx.Done(): + e.Close() + return + } +} + +func (e *BlockchainUpdatesPluginInfo) Close() { + if e.BUpdatesChannel != nil { + close(e.BUpdatesChannel) + } + e.BUpdatesChannel = nil +} diff --git a/pkg/proto/protobuf_converters.go b/pkg/proto/protobuf_converters.go index 3abf6a48e..9ce0c3e5a 100644 --- a/pkg/proto/protobuf_converters.go +++ b/pkg/proto/protobuf_converters.go @@ -1804,36 +1804,18 @@ func (c *ProtobufConverter) BlockHeader(block *g.Block) (BlockHeader, error) { if block.Header == nil { return BlockHeader{}, errors.New("empty block header") } - features := c.features(block.Header.FeatureVotes) - consensus := c.consensus(block.Header) - v := BlockVersion(c.byte(block.Header.Version)) - header := BlockHeader{ - Version: v, - Timestamp: c.uint64(block.Header.Timestamp), - Parent: c.blockID(block.Header.Reference), - FeaturesCount: len(features), - Features: features, - RewardVote: block.Header.RewardVote, - ConsensusBlockLength: uint32(consensus.BinarySize()), - NxtConsensus: consensus, - TransactionCount: len(block.Transactions), - GeneratorPublicKey: c.publicKey(block.Header.Generator), - BlockSignature: c.signature(block.Signature), - TransactionsRoot: block.Header.TransactionsRoot, - StateHash: c.stateHash(block.Header.StateHash), - ChallengedHeader: c.challengedHeader(block.Header.ChallengedHeader), - } - if c.err != nil { - err := c.err - c.reset() + header, err := c.PartialBlockHeader(block.Header) + if err != nil { return BlockHeader{}, err } + header.TransactionCount = len(block.Transactions) // set tx count from whole block message + 
header.BlockSignature = c.signature(block.Signature) // set block signature from whole block message scheme := c.byte(block.Header.ChainId) if scheme == 0 { scheme = c.FallbackChainID } - if err := header.GenerateBlockID(scheme); err != nil { - return BlockHeader{}, err + if gbErr := header.GenerateBlockID(scheme); gbErr != nil { // generate block ID after setting all fields + return BlockHeader{}, gbErr } return header, nil } diff --git a/pkg/proto/types.go b/pkg/proto/types.go index a5ddf63e4..025e919f9 100644 --- a/pkg/proto/types.go +++ b/pkg/proto/types.go @@ -3168,6 +3168,18 @@ func NewDataEntryFromJSON(data []byte) (DataEntry, error) { // DataEntries the slice of various entries of DataTransaction type DataEntries []DataEntry +func (e DataEntries) Len() int { + return len(e) +} + +func (e DataEntries) Less(i int, j int) bool { + return e[i].GetKey() < e[j].GetKey() +} + +func (e DataEntries) Swap(i int, j int) { + e[i], e[j] = e[j], e[i] +} + // PayloadSize returns summary payload size of all entries. func (e DataEntries) PayloadSize() int { pl := 0 @@ -3208,9 +3220,9 @@ func (e *DataEntries) UnmarshalJSON(data []byte) error { entries := make([]DataEntry, len(ets)) for i, row := range ets { - et, err := guessDataEntryType(row) - if err != nil { - return wrapError(err) + et, guessErr := guessDataEntryType(row) + if guessErr != nil { + return wrapError(guessErr) } entries[i] = et } diff --git a/pkg/ride/environment.go b/pkg/ride/environment.go index 358a94d78..4f041b7ee 100644 --- a/pkg/ride/environment.go +++ b/pkg/ride/environment.go @@ -86,6 +86,10 @@ func (ws *WrappedState) NewestTransactionByID(id []byte) (proto.Transaction, err return ws.diff.state.NewestTransactionByID(id) } +func (ws *WrappedState) RetrieveEntries(account proto.Recipient) ([]proto.DataEntry, error) { + return ws.diff.state.RetrieveEntries(account) +} + func (ws *WrappedState) NewestTransactionHeightByID(id []byte) (uint64, error) { return ws.diff.state.NewestTransactionHeightByID(id) } diff --git a/pkg/ride/smart_state_moq_test.go b/pkg/ride/smart_state_moq_test.go index 4fd0eacc2..21755cd21 100644 --- a/pkg/ride/smart_state_moq_test.go +++ b/pkg/ride/smart_state_moq_test.go @@ -87,6 +87,9 @@ var _ types.SmartState = &MockSmartState{} // NewestWavesBalanceFunc: func(account proto.Recipient) (uint64, error) { // panic("mock out the NewestWavesBalance method") // }, +// RetrieveEntriesFunc: func(account proto.Recipient) ([]proto.DataEntry, error) { +// panic("mock out the RetrieveEntries method") +// }, // RetrieveNewestBinaryEntryFunc: func(account proto.Recipient, key string) (*proto.BinaryDataEntry, error) { // panic("mock out the RetrieveNewestBinaryEntry method") // }, @@ -175,6 +178,9 @@ type MockSmartState struct { // NewestWavesBalanceFunc mocks the NewestWavesBalance method. NewestWavesBalanceFunc func(account proto.Recipient) (uint64, error) + // RetrieveEntriesFunc mocks the RetrieveEntries method. + RetrieveEntriesFunc func(account proto.Recipient) ([]proto.DataEntry, error) + // RetrieveNewestBinaryEntryFunc mocks the RetrieveNewestBinaryEntry method. RetrieveNewestBinaryEntryFunc func(account proto.Recipient, key string) (*proto.BinaryDataEntry, error) @@ -302,6 +308,11 @@ type MockSmartState struct { // Account is the account argument value. Account proto.Recipient } + // RetrieveEntries holds details about calls to the RetrieveEntries method. + RetrieveEntries []struct { + // Account is the account argument value. 
+ Account proto.Recipient + } // RetrieveNewestBinaryEntry holds details about calls to the RetrieveNewestBinaryEntry method. RetrieveNewestBinaryEntry []struct { // Account is the account argument value. @@ -358,6 +369,7 @@ type MockSmartState struct { lockNewestTransactionByID sync.RWMutex lockNewestTransactionHeightByID sync.RWMutex lockNewestWavesBalance sync.RWMutex + lockRetrieveEntries sync.RWMutex lockRetrieveNewestBinaryEntry sync.RWMutex lockRetrieveNewestBooleanEntry sync.RWMutex lockRetrieveNewestIntegerEntry sync.RWMutex @@ -1067,6 +1079,38 @@ func (mock *MockSmartState) NewestWavesBalanceCalls() []struct { return calls } +// RetrieveEntries calls RetrieveEntriesFunc. +func (mock *MockSmartState) RetrieveEntries(account proto.Recipient) ([]proto.DataEntry, error) { + if mock.RetrieveEntriesFunc == nil { + panic("MockSmartState.RetrieveEntriesFunc: method is nil but SmartState.RetrieveEntries was just called") + } + callInfo := struct { + Account proto.Recipient + }{ + Account: account, + } + mock.lockRetrieveEntries.Lock() + mock.calls.RetrieveEntries = append(mock.calls.RetrieveEntries, callInfo) + mock.lockRetrieveEntries.Unlock() + return mock.RetrieveEntriesFunc(account) +} + +// RetrieveEntriesCalls gets all the calls that were made to RetrieveEntries. +// Check the length with: +// +// len(mockedSmartState.RetrieveEntriesCalls()) +func (mock *MockSmartState) RetrieveEntriesCalls() []struct { + Account proto.Recipient +} { + var calls []struct { + Account proto.Recipient + } + mock.lockRetrieveEntries.RLock() + calls = mock.calls.RetrieveEntries + mock.lockRetrieveEntries.RUnlock() + return calls +} + // RetrieveNewestBinaryEntry calls RetrieveNewestBinaryEntryFunc. func (mock *MockSmartState) RetrieveNewestBinaryEntry(account proto.Recipient, key string) (*proto.BinaryDataEntry, error) { if mock.RetrieveNewestBinaryEntryFunc == nil { diff --git a/pkg/state/accounts_data_storage.go b/pkg/state/accounts_data_storage.go index c60e2539d..e21c99b47 100644 --- a/pkg/state/accounts_data_storage.go +++ b/pkg/state/accounts_data_storage.go @@ -8,6 +8,7 @@ import ( "github.com/pkg/errors" "github.com/wavesplatform/gowaves/pkg/keyvalue" "github.com/wavesplatform/gowaves/pkg/proto" + "github.com/wavesplatform/gowaves/pkg/state/stateerr" "go.uber.org/zap" ) @@ -242,7 +243,7 @@ func (s *accountsDataStorage) entryBytes(addr proto.Address, entryKey string) ([ func (s *accountsDataStorage) retrieveEntries(addr proto.Address) ([]proto.DataEntry, error) { addrNum, err := s.addrToNum(addr) if err != nil { - return nil, err + return nil, wrapErr(stateerr.NotFoundError, err) } key := accountsDataStorKey{addrNum: addrNum} iter, err := s.hs.newTopEntryIteratorByPrefix(key.accountPrefix()) diff --git a/pkg/state/address_transactions_test.go b/pkg/state/address_transactions_test.go index c2f201b76..e24b6d5da 100644 --- a/pkg/state/address_transactions_test.go +++ b/pkg/state/address_transactions_test.go @@ -13,7 +13,7 @@ import ( func testIterImpl(t *testing.T, params StateParams) { dataDir := t.TempDir() - st, err := NewState(dataDir, true, params, settings.MustMainNetSettings(), false) + st, err := NewState(dataDir, true, params, settings.MustMainNetSettings(), false, nil) require.NoError(t, err) t.Cleanup(func() { diff --git a/pkg/state/api.go b/pkg/state/api.go index 91d7818a8..eec5fdd03 100644 --- a/pkg/state/api.go +++ b/pkg/state/api.go @@ -36,9 +36,11 @@ type StateInfo interface { TopBlock() *proto.Block Block(blockID proto.BlockID) (*proto.Block, error) BlockByHeight(height proto.Height) 
(*proto.Block, error) + NewestBlockInfoByHeight(height proto.Height) (*proto.BlockInfo, error) // Header getters. Header(blockID proto.BlockID) (*proto.BlockHeader, error) HeaderByHeight(height proto.Height) (*proto.BlockHeader, error) + NewestHeaderByHeight(height uint64) (*proto.BlockHeader, error) // Height returns current blockchain height. Height() (proto.Height, error) // Height <---> blockID converters. @@ -232,8 +234,9 @@ func NewState( params StateParams, settings *settings.BlockchainSettings, enableLightNode bool, + bUpdatesPluginInfo *proto.BlockchainUpdatesPluginInfo, ) (State, error) { - s, err := newStateManager(dataDir, amend, params, settings, enableLightNode) + s, err := newStateManager(dataDir, amend, params, settings, enableLightNode, bUpdatesPluginInfo) if err != nil { return nil, errors.Wrap(err, "failed to create new state instance") } diff --git a/pkg/state/appender.go b/pkg/state/appender.go index 118e207d0..4303a0add 100644 --- a/pkg/state/appender.go +++ b/pkg/state/appender.go @@ -56,6 +56,8 @@ type txAppender struct { // buildApiData flag indicates that additional data for API is built when // appending transactions. buildApiData bool + + bUpdatesPluginInfo *proto.BlockchainUpdatesPluginInfo } func newTxAppender( @@ -66,6 +68,7 @@ func newTxAppender( stateDB *stateDB, atx *addressTransactions, snapshotApplier *blockSnapshotsApplier, + bUpdatesPluginInfo *proto.BlockchainUpdatesPluginInfo, ) (*txAppender, error) { buildAPIData, err := stateDB.stateStoresApiData() if err != nil { @@ -99,21 +102,22 @@ func newTxAppender( ia := newInvokeApplier(state, sc, txHandler, stor, settings, blockDiffer, diffStorInvoke, diffApplier) ethKindResolver := proto.NewEthereumTransactionKindResolver(state, settings.AddressSchemeCharacter) return &txAppender{ - sc: sc, - ia: ia, - rw: rw, - blockInfoProvider: state, - atx: atx, - stor: stor, - settings: settings, - txHandler: txHandler, - blockDiffer: blockDiffer, - recentTxIds: make(map[string]struct{}), - diffStor: diffStor, - diffStorInvoke: diffStorInvoke, - diffApplier: diffApplier, - buildApiData: buildAPIData, - ethTxKindResolver: ethKindResolver, + sc: sc, + ia: ia, + rw: rw, + blockInfoProvider: state, + atx: atx, + stor: stor, + settings: settings, + txHandler: txHandler, + blockDiffer: blockDiffer, + recentTxIds: make(map[string]struct{}), + diffStor: diffStor, + diffStorInvoke: diffStorInvoke, + diffApplier: diffApplier, + buildApiData: buildAPIData, + ethTxKindResolver: ethKindResolver, + bUpdatesPluginInfo: bUpdatesPluginInfo, }, nil } @@ -838,6 +842,17 @@ func (a *txAppender) appendBlock(params *appendBlockParams) error { if err != nil { return err } + + // write updates into the updatesChannel here + if a.bUpdatesPluginInfo != nil && a.bUpdatesPluginInfo.IsBlockchainUpdatesEnabled() { + if a.bUpdatesPluginInfo.IsReady() { + errUpdt := a.updateBlockchainUpdateInfo(blockInfo, params.block, blockSnapshot) + if errUpdt != nil { + return errors.Wrapf(errUpdt, "failed to request blockchain info from L2 smart contract state") + } + } + } + // check whether the calculated snapshot state hash equals with the provided one if blockStateHash, present := params.block.GetStateHash(); present && blockStateHash != stateHash { return errors.Wrapf(errBlockSnapshotStateHashMismatch, @@ -859,6 +874,57 @@ func (a *txAppender) appendBlock(params *appendBlockParams) error { return a.blockDiffer.saveCurFeeDistr(params.block) } +func (a *txAppender) updateBlockchainUpdateInfo(blockInfo *proto.BlockInfo, blockHeader *proto.BlockHeader, + 
blockSnapshot proto.BlockSnapshot) error { + bUpdatesInfo := BuildBlockUpdatesInfoFromSnapshot(blockInfo, blockHeader, blockSnapshot, + a.bUpdatesPluginInfo.L2ContractAddress) + + if a.bUpdatesPluginInfo.IsFirstBlockDone() { + dataEntries, err := a.ia.state.RetrieveEntries(proto.NewRecipientFromAddress(a.bUpdatesPluginInfo.L2ContractAddress)) + if err != nil && !a.ia.state.IsNotFound(err) { + return err + } + bUpdatesInfo.ContractUpdatesInfo.AllDataEntries = dataEntries + a.bUpdatesPluginInfo.FirstBlockDone() + a.bUpdatesPluginInfo.WriteBUpdates(bUpdatesInfo) + return nil + } + + a.bUpdatesPluginInfo.WriteBUpdates(bUpdatesInfo) + return nil +} + +func BuildBlockUpdatesInfoFromSnapshot(blockInfo *proto.BlockInfo, blockHeader *proto.BlockHeader, + blockSnapshot proto.BlockSnapshot, l2ContractAddress proto.WavesAddress) proto.BUpdatesInfo { + blockID := blockHeader.BlockID() + bUpdatesInfo := proto.BUpdatesInfo{ + BlockUpdatesInfo: proto.BlockUpdatesInfo{ + Height: blockInfo.Height, + VRF: blockInfo.VRF, + BlockID: blockID, + BlockHeader: *blockHeader, + }, + ContractUpdatesInfo: proto.L2ContractDataEntries{ + AllDataEntries: nil, + Height: blockInfo.Height, + BlockID: blockID, + }, + } + + // Write the L2 contract updates into the structure. + for _, txSnapshots := range blockSnapshot.TxSnapshots { + for _, snapshot := range txSnapshots { + if dataEntriesSnapshot, ok := snapshot.(*proto.DataEntriesSnapshot); ok { + if dataEntriesSnapshot.Address == l2ContractAddress { + bUpdatesInfo.ContractUpdatesInfo.AllDataEntries = append(bUpdatesInfo.ContractUpdatesInfo.AllDataEntries, + dataEntriesSnapshot.DataEntries...) + } + } + } + } + return bUpdatesInfo +} + func (a *txAppender) createCheckerInfo(params *appendBlockParams) (*checkerInfo, error) { rideV5Activated, err := a.stor.features.newestIsActivated(int16(settings.RideV5)) if err != nil { diff --git a/pkg/state/history_storage.go b/pkg/state/history_storage.go index cf40deb09..8b604c6ee 100644 --- a/pkg/state/history_storage.go +++ b/pkg/state/history_storage.go @@ -593,7 +593,6 @@ func (hs *historyStorage) getHistory(key []byte, update bool) (*historyRecord, e // So we do both read and write under same lock. 
hs.writeLock.Lock() defer hs.writeLock.Unlock() - historyBytes, err := hs.db.Get(key) if err != nil { return nil, err // `keyvalue.ErrNotFound` is possible here along with other unwrapped DB errors diff --git a/pkg/state/invoke_applier_test.go b/pkg/state/invoke_applier_test.go index f1a514c6d..7882eb856 100644 --- a/pkg/state/invoke_applier_test.go +++ b/pkg/state/invoke_applier_test.go @@ -35,7 +35,8 @@ type invokeApplierTestObjects struct { } func createInvokeApplierTestObjects(t *testing.T) *invokeApplierTestObjects { - state, err := newStateManager(t.TempDir(), true, DefaultTestingStateParams(), settings.MustMainNetSettings(), false) + state, err := newStateManager(t.TempDir(), true, DefaultTestingStateParams(), + settings.MustMainNetSettings(), false, nil) assert.NoError(t, err, "newStateManager() failed") to := &invokeApplierTestObjects{state} randGenesisBlockID := genRandBlockId(t) diff --git a/pkg/state/smart_state_moq_test.go b/pkg/state/smart_state_moq_test.go index d3a2e656d..00d678954 100644 --- a/pkg/state/smart_state_moq_test.go +++ b/pkg/state/smart_state_moq_test.go @@ -87,6 +87,9 @@ var _ types.EnrichedSmartState = &AnotherMockSmartState{} // NewestWavesBalanceFunc: func(account proto.Recipient) (uint64, error) { // panic("mock out the NewestWavesBalance method") // }, +// RetrieveEntriesFunc: func(account proto.Recipient) ([]proto.DataEntry, error) { +// panic("mock out the RetrieveEntries method") +// }, // RetrieveNewestBinaryEntryFunc: func(account proto.Recipient, key string) (*proto.BinaryDataEntry, error) { // panic("mock out the RetrieveNewestBinaryEntry method") // }, @@ -175,6 +178,9 @@ type AnotherMockSmartState struct { // NewestWavesBalanceFunc mocks the NewestWavesBalance method. NewestWavesBalanceFunc func(account proto.Recipient) (uint64, error) + // RetrieveEntriesFunc mocks the RetrieveEntries method. + RetrieveEntriesFunc func(account proto.Recipient) ([]proto.DataEntry, error) + // RetrieveNewestBinaryEntryFunc mocks the RetrieveNewestBinaryEntry method. RetrieveNewestBinaryEntryFunc func(account proto.Recipient, key string) (*proto.BinaryDataEntry, error) @@ -302,6 +308,11 @@ type AnotherMockSmartState struct { // Account is the account argument value. Account proto.Recipient } + // RetrieveEntries holds details about calls to the RetrieveEntries method. + RetrieveEntries []struct { + // Account is the account argument value. + Account proto.Recipient + } // RetrieveNewestBinaryEntry holds details about calls to the RetrieveNewestBinaryEntry method. RetrieveNewestBinaryEntry []struct { // Account is the account argument value. @@ -358,6 +369,7 @@ type AnotherMockSmartState struct { lockNewestTransactionByID sync.RWMutex lockNewestTransactionHeightByID sync.RWMutex lockNewestWavesBalance sync.RWMutex + lockRetrieveEntries sync.RWMutex lockRetrieveNewestBinaryEntry sync.RWMutex lockRetrieveNewestBooleanEntry sync.RWMutex lockRetrieveNewestIntegerEntry sync.RWMutex @@ -1067,6 +1079,38 @@ func (mock *AnotherMockSmartState) NewestWavesBalanceCalls() []struct { return calls } +// RetrieveEntries calls RetrieveEntriesFunc. 
+func (mock *AnotherMockSmartState) RetrieveEntries(account proto.Recipient) ([]proto.DataEntry, error) { + if mock.RetrieveEntriesFunc == nil { + panic("AnotherMockSmartState.RetrieveEntriesFunc: method is nil but SmartState.RetrieveEntries was just called") + } + callInfo := struct { + Account proto.Recipient + }{ + Account: account, + } + mock.lockRetrieveEntries.Lock() + mock.calls.RetrieveEntries = append(mock.calls.RetrieveEntries, callInfo) + mock.lockRetrieveEntries.Unlock() + return mock.RetrieveEntriesFunc(account) +} + +// RetrieveEntriesCalls gets all the calls that were made to RetrieveEntries. +// Check the length with: +// +// len(mockedSmartState.RetrieveEntriesCalls()) +func (mock *AnotherMockSmartState) RetrieveEntriesCalls() []struct { + Account proto.Recipient +} { + var calls []struct { + Account proto.Recipient + } + mock.lockRetrieveEntries.RLock() + calls = mock.calls.RetrieveEntries + mock.lockRetrieveEntries.RUnlock() + return calls +} + // RetrieveNewestBinaryEntry calls RetrieveNewestBinaryEntryFunc. func (mock *AnotherMockSmartState) RetrieveNewestBinaryEntry(account proto.Recipient, key string) (*proto.BinaryDataEntry, error) { if mock.RetrieveNewestBinaryEntryFunc == nil { diff --git a/pkg/state/state.go b/pkg/state/state.go index 857d9a8c2..45b9a9623 100644 --- a/pkg/state/state.go +++ b/pkg/state/state.go @@ -520,6 +520,7 @@ func newStateManager( params StateParams, settings *settings.BlockchainSettings, enableLightNode bool, + bUpdatesPluginInfo *proto.BlockchainUpdatesPluginInfo, ) (_ *stateManager, retErr error) { if err := validateSettings(settings); err != nil { return nil, err @@ -604,7 +605,7 @@ func newStateManager( // Set fields which depend on state. // Consensus validator is needed to check block headers. 
snapshotApplier := newBlockSnapshotsApplier(nil, newSnapshotApplierStorages(stor, rw)) - appender, err := newTxAppender(state, rw, stor, settings, sdb, atx, &snapshotApplier) + appender, err := newTxAppender(state, rw, stor, settings, sdb, atx, &snapshotApplier, bUpdatesPluginInfo) if err != nil { return nil, wrapErr(stateerr.Other, err) } diff --git a/pkg/state/state_test.go b/pkg/state/state_test.go index d72a5f61e..ca1cebe09 100644 --- a/pkg/state/state_test.go +++ b/pkg/state/state_test.go @@ -47,7 +47,7 @@ func bigFromStr(s string) *big.Int { func newTestState(t *testing.T, amend bool, params StateParams, settings *settings.BlockchainSettings) State { dataDir := t.TempDir() - m, err := NewState(dataDir, amend, params, settings, false) + m, err := NewState(dataDir, amend, params, settings, false, nil) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, m.Close(), "manager.Close() failed") @@ -57,7 +57,7 @@ func newTestState(t *testing.T, amend bool, params StateParams, settings *settin func newTestStateManager(t *testing.T, amend bool, params StateParams, settings *settings.BlockchainSettings) *stateManager { dataDir := t.TempDir() - m, err := newStateManager(dataDir, amend, params, settings, false) + m, err := newStateManager(dataDir, amend, params, settings, false, nil) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, m.Close(), "manager.Close() failed") @@ -69,7 +69,7 @@ func TestHandleAmendFlag(t *testing.T) { dataDir := t.TempDir() bs := settings.MustMainNetSettings() // first open with false amend - manager, err := newStateManager(dataDir, false, DefaultTestingStateParams(), bs, false) + manager, err := newStateManager(dataDir, false, DefaultTestingStateParams(), bs, false, nil) assert.NoError(t, err, "newStateManager() failed") t.Cleanup(func() { assert.NoError(t, manager.Close(), "manager.Close() failed") @@ -78,18 +78,20 @@ func TestHandleAmendFlag(t *testing.T) { // open with true amend assert.NoError(t, manager.Close(), "manager.Close() failed") - manager, err = newStateManager(dataDir, true, DefaultTestingStateParams(), bs, false) + manager, err = newStateManager(dataDir, true, DefaultTestingStateParams(), bs, false, nil) assert.NoError(t, err, "newStateManager() failed") assert.True(t, manager.stor.hs.amend) // open with false amend again. Result amend should be true assert.NoError(t, manager.Close(), "manager.Close() failed") - manager, err = newStateManager(dataDir, false, DefaultTestingStateParams(), bs, false) + manager, err = newStateManager(dataDir, false, DefaultTestingStateParams(), bs, + false, nil) assert.NoError(t, err, "newStateManager() failed") assert.True(t, manager.stor.hs.amend) // first open with true amend - newManager, err := newStateManager(t.TempDir(), true, DefaultTestingStateParams(), bs, false) + newManager, err := newStateManager(t.TempDir(), true, DefaultTestingStateParams(), bs, + false, nil) assert.NoError(t, err, "newStateManager() failed") t.Cleanup(func() { assert.NoError(t, newManager.Close(), "newManager.Close() failed") @@ -394,7 +396,7 @@ func TestStateManager_TopBlock(t *testing.T) { bs := settings.MustMainNetSettings() assert.NoError(t, err) dataDir := t.TempDir() - manager, err := newStateManager(dataDir, true, DefaultTestingStateParams(), bs, false) + manager, err := newStateManager(dataDir, true, DefaultTestingStateParams(), bs, false, nil) assert.NoError(t, err, "newStateManager() failed") t.Cleanup(func() { @@ -428,7 +430,7 @@ func TestStateManager_TopBlock(t *testing.T) { // Test after closure. 
err = manager.Close() assert.NoError(t, err, "manager.Close() failed") - manager, err = newStateManager(dataDir, true, DefaultTestingStateParams(), settings.MustMainNetSettings(), false) + manager, err = newStateManager(dataDir, true, DefaultTestingStateParams(), settings.MustMainNetSettings(), false, nil) assert.NoError(t, err, "newStateManager() failed") assert.Equal(t, correct, manager.TopBlock()) } @@ -523,6 +525,7 @@ func createMockStateManager(t *testing.T, bs *settings.BlockchainSettings) (*sta state.stateDB, state.atx, &snapshotApplier, + nil, ) require.NoError(t, err, "newTxAppender() failed") state.appender = appender diff --git a/pkg/state/threadsafe_wrapper.go b/pkg/state/threadsafe_wrapper.go index 461f8f457..6ffe84b06 100644 --- a/pkg/state/threadsafe_wrapper.go +++ b/pkg/state/threadsafe_wrapper.go @@ -28,6 +28,12 @@ func (a *ThreadSafeReadWrapper) BlockVRF(blockHeader *proto.BlockHeader, blockHe return a.s.BlockVRF(blockHeader, blockHeight) } +func (a *ThreadSafeReadWrapper) NewestBlockInfoByHeight(height proto.Height) (*proto.BlockInfo, error) { + a.mu.RLock() + defer a.mu.RUnlock() + return a.s.NewestBlockInfoByHeight(height) +} + func (a *ThreadSafeReadWrapper) MapR(f func(StateInfo) (interface{}, error)) (interface{}, error) { a.mu.RLock() defer a.mu.RUnlock() @@ -62,6 +68,12 @@ func (a *ThreadSafeReadWrapper) HeaderByHeight(height proto.Height) (*proto.Bloc return a.s.HeaderByHeight(height) } +func (a *ThreadSafeReadWrapper) NewestHeaderByHeight(height uint64) (*proto.BlockHeader, error) { + a.mu.RLock() + defer a.mu.RUnlock() + return a.s.NewestHeaderByHeight(height) +} + func (a *ThreadSafeReadWrapper) Height() (proto.Height, error) { a.mu.RLock() defer a.mu.RUnlock() diff --git a/pkg/state/transaction_handler.go b/pkg/state/transaction_handler.go index a3964ba47..103af3f89 100644 --- a/pkg/state/transaction_handler.go +++ b/pkg/state/transaction_handler.go @@ -264,6 +264,7 @@ func (h *transactionHandler) performTx( if err := snapshot.Apply(h.sa, tx, validatingUTX); err != nil { return txSnapshot{}, errors.Wrap(err, "failed to apply transaction snapshot") } + return snapshot, nil } diff --git a/pkg/types/types.go b/pkg/types/types.go index 222e1268f..ec7397f28 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -125,6 +125,7 @@ type SmartState interface { RetrieveNewestBooleanEntry(account proto.Recipient, key string) (*proto.BooleanDataEntry, error) RetrieveNewestStringEntry(account proto.Recipient, key string) (*proto.StringDataEntry, error) RetrieveNewestBinaryEntry(account proto.Recipient, key string) (*proto.BinaryDataEntry, error) + RetrieveEntries(account proto.Recipient) ([]proto.DataEntry, error) NewestAssetIsSponsored(assetID crypto.Digest) (bool, error) NewestAssetConstInfo(assetID proto.AssetID) (*proto.AssetConstInfo, error) NewestAssetInfo(assetID crypto.Digest) (*proto.AssetInfo, error)