From 0b61270db65a098d39fa7c4db884736c7e55a135 Mon Sep 17 00:00:00 2001
From: Sergey Nazarov
Date: Wed, 7 May 2025 13:23:32 +0300
Subject: [PATCH] Exporter speedup

---
 .../scala/com/wavesplatform/Exporter.scala    | 226 +++++++-----------
 .../scala/com/wavesplatform/Importer.scala    |  80 ++++---
 .../scala/com/wavesplatform/block/Block.scala |   2 +-
 .../block/serialization/package.scala         |   2 +-
 .../state/ParSignatureChecker.scala           |  66 +++--
 5 files changed, 164 insertions(+), 212 deletions(-)

diff --git a/node/src/main/scala/com/wavesplatform/Exporter.scala b/node/src/main/scala/com/wavesplatform/Exporter.scala
index 171e544a67d..a13558bd4fc 100644
--- a/node/src/main/scala/com/wavesplatform/Exporter.scala
+++ b/node/src/main/scala/com/wavesplatform/Exporter.scala
@@ -1,23 +1,30 @@
 package com.wavesplatform
 
 import com.google.common.collect.AbstractIterator
-import com.google.common.primitives.Ints
-import com.wavesplatform.block.Block
+import com.google.common.primitives.{Bytes, Ints}
+import com.google.protobuf.{ByteString, CodedInputStream}
+import com.wavesplatform.block.{Block, BlockHeader}
+import com.wavesplatform.block.serialization.{BlockSerializer, mkTxsCountBytes, writeTransactionData}
 import com.wavesplatform.database.protobuf.BlockMeta
-import com.wavesplatform.database.{KeyTag, RDB, createBlock, readBlockMeta, readTransaction}
+import com.wavesplatform.database.{Caches, KeyTag, RDB, createBlock, readBlockMeta, readTransaction}
 import com.wavesplatform.events.BlockchainUpdateTriggers
+import com.wavesplatform.features.{BlockchainFeature, BlockchainFeatures}
 import com.wavesplatform.history.StorageFactory
 import com.wavesplatform.metrics.Metrics
 import com.wavesplatform.protobuf.ByteStringExt
-import com.wavesplatform.protobuf.block.PBBlocks
+import com.wavesplatform.protobuf.block.PBBlocks.protobuf
+import com.wavesplatform.protobuf.block.{PBBlock, PBBlocks}
+import com.wavesplatform.protobuf.transaction.{PBTransactions, SignedTransaction}
+import com.wavesplatform.protobuf.utils.PBUtils
 import com.wavesplatform.state.Height
-import com.wavesplatform.transaction.Transaction
+import com.wavesplatform.transaction.{Transaction, TransactionParsers}
 import com.wavesplatform.utils.*
 import kamon.Kamon
 import org.rocksdb.{ColumnFamilyHandle, ReadOptions, RocksDB}
 import scopt.OParser
 
 import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}
+import java.nio.ByteBuffer
 import scala.annotation.tailrec
 import scala.concurrent.Await
 import scala.concurrent.duration.*
@@ -26,20 +33,10 @@ import scala.util.Using.Releasable
 import scala.util.{Failure, Success, Try, Using}
 
 object Exporter extends ScorexLogging {
-  private[wavesplatform] object Formats {
-    val Binary   = "BINARY"
-    val Protobuf = "PROTOBUF"
-
-    def list: Seq[String] = Seq(Binary, Protobuf)
-    def default: String   = Binary
-
-    def isSupported(f: String): Boolean = list.contains(f.toUpperCase)
-  }
-
   // noinspection ScalaStyle
   def main(args: Array[String]): Unit = {
     OParser.parse(commandParser, args, ExporterOptions()).foreach {
-      case ExporterOptions(configFile, blocksOutputFileNamePrefix, snapshotsOutputFileNamePrefix, exportHeight, format) =>
+      case ExporterOptions(configFile, blocksOutputFileNamePrefix, snapshotsOutputFileNamePrefix, exportHeight) =>
         val settings = Application.loadApplicationConfig(configFile)
 
         Using.resources(
@@ -76,22 +73,51 @@ object Exporter extends ScorexLogging {
           var exportedSnapshotsBytes = 0L
           val start                  = System.currentTimeMillis()
 
-          new BlockSnapshotIterator(rdb, height, exportSnapshots).asScala.foreach { case (h, block, txSnapshots) =>
-            val txCount = block.transactionData.length
-            if (exportSnapshots && txCount != txSnapshots.length)
-              throw new RuntimeException(
-                s"${txSnapshots.length} snapshot(s) don't match $txCount transaction(s) on height $h, data is corrupted"
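+          // NthTransactionInfoAtHeight keys are a 2-byte key tag followed by a
+          // 4-byte big-endian height, so slice(2, 6) extracts the height part.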
+          val txIterator = new DataIterator[ByteString | SignedTransaction](
+            rdb.db, rdb.txHandle.handle, KeyTag.NthTransactionInfoAtHeight.prefixBytes, _.slice(2, 6),
+            height => bytes => readTx(CodedInputStream.newInstance(bytes))
+          )
+
+          var counter = 0
+
+          val blockMetaIterator: DataIterator[BlockMeta] =
+            new DataIterator[BlockMeta](
+              rdb.db,
+              rdb.db.getDefaultColumnFamily,
+              KeyTag.BlockInfoAtHeight.prefixBytes,
+              _.takeRight(Ints.BYTES),
+              _ => readBlockMeta
+            )
+
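+          // Stream blocks straight out of storage: take each block's meta, pull the
+          // next transactionCount raw tx entries, and re-frame them into block bytes
+          // without constructing or re-validating Block objects.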
+          while (blockMetaIterator.hasNext) {
+            val meta         = blockMetaIterator.next()._2
+            val txCount      = meta.transactionCount
+            val txs          = txIterator.asScala.take(txCount).toSeq
+            val signedHeader = Caches.toSignedHeader(meta)
+            val blockBytes = if (signedHeader.header.version >= Block.ProtoBlockVersion)
+              PBUtils.encodeDeterministic(new PBBlock(
+                Some(protobuf(signedHeader.header)),
+                ByteString.copyFrom(signedHeader.signature.arr),
+                txs.map(_._2 match {
+                  case s: SignedTransaction => s
+                  case bs: ByteString       => PBTransactions.protobuf(TransactionParsers.parseBytes(bs.toByteArray).get)
+                }).toSeq
+              ))
+            else {
+              Bytes.concat(
+                BlockSerializer.mkPrefixBytes(signedHeader.header),
+                mkTxsDataBytes(signedHeader.header, txs.map(_._2.asInstanceOf[ByteString].toByteArray).toSeq),
+                BlockSerializer.mkSuffixBytes(signedHeader.header, signedHeader.signature)
               )
-            exportedBlocksBytes += IO.exportBlock(blocksStream, Some(block), format == Formats.Binary)
-            snapshotsStream.foreach { output =>
-              exportedSnapshotsBytes += IO.exportBlockTxSnapshots(output, txSnapshots)
             }
+            blocksStream.write(Ints.toByteArray(blockBytes.length))
+            blocksStream.write(blockBytes)
+            exportedBlocksBytes += blockBytes.length
 
-            if (h % (height / 10) == 0) {
-              log.info(
-                s"$h blocks exported, ${humanReadableSize(exportedBlocksBytes)} written for blocks${snapshotsLogInfo(exportSnapshots, exportedSnapshotsBytes)}"
-              )
+            if ((counter + txCount) / 1_000_000 - (counter / 1_000_000) > 0) {
+              log.info(f"Exported ${counter + txCount}%,d transactions, written $exportedBlocksBytes%,d bytes")
             }
+            counter += txCount
           }
           val duration = System.currentTimeMillis() - start
           log
@@ -107,96 +133,6 @@ object Exporter extends ScorexLogging {
     }
   }
 
-  private class BlockSnapshotIterator(rdb: RDB, targetHeight: Int, exportSnapshots: Boolean)
-      extends AbstractIterator[(Int, Block, Seq[Array[Byte]])] {
-    var nextTxEntry: Option[(Int, Transaction)]       = None
-    var nextSnapshotEntry: Option[(Int, Array[Byte])] = None
-
-    val blockMetaIterator: DataIterator[BlockMeta] =
-      new DataIterator[BlockMeta](
-        rdb.db,
-        rdb.db.getDefaultColumnFamily,
-        KeyTag.BlockInfoAtHeight.prefixBytes,
-        _.takeRight(Ints.BYTES),
-        _ => readBlockMeta
-      )
-    val txIterator: DataIterator[Transaction] = {
-      val prefixBytes = KeyTag.NthTransactionInfoAtHeight.prefixBytes
-      new DataIterator(
-        rdb.db,
-        rdb.txHandle.handle,
-        prefixBytes,
-        _.slice(prefixBytes.length, prefixBytes.length + Ints.BYTES),
-        h => readTransaction(Height(h))(_)._2
-      )
-    }
-    val snapshotIterator: DataIterator[Array[Byte]] = {
-      val prefixBytes = KeyTag.NthTransactionStateSnapshotAtHeight.prefixBytes
-      new DataIterator(
-        rdb.db,
-        rdb.txSnapshotHandle.handle,
-        prefixBytes,
-        _.slice(prefixBytes.length, prefixBytes.length + Ints.BYTES),
-        _ => identity
-      )
-    }
-
-    @tailrec
-    private def loadTxData[A](acc: Seq[A], height: Int, iterator: DataIterator[A], updateNextEntryF: (Int, A) => Unit): Seq[A] = {
-      if (iterator.hasNext) {
-        val (h, txData) = iterator.next()
-        if (h == height) {
-          loadTxData(txData +: acc, height, iterator, updateNextEntryF)
-        } else {
-          updateNextEntryF(h, txData)
-          acc.reverse
-        }
-      } else acc.reverse
-    }
-
-    @tailrec
-    override final def computeNext(): (Int, Block, Seq[Array[Byte]]) = {
-      if (blockMetaIterator.hasNext) {
-        val (h, meta) = blockMetaIterator.next()
-        if (h <= targetHeight) {
-          val txs = nextTxEntry match {
-            case Some((txHeight, tx)) if txHeight == h =>
-              nextTxEntry = None
-              loadTxData[Transaction](Seq(tx), h, txIterator, (h, tx) => nextTxEntry = Some(h -> tx))
-            case Some(_) => Seq.empty
-            case _       => loadTxData[Transaction](Seq.empty, h, txIterator, (h, tx) => nextTxEntry = Some(h -> tx))
-          }
-          val snapshots = if (exportSnapshots) {
-            nextSnapshotEntry match {
-              case Some((snapshotHeight, txSnapshot)) if snapshotHeight == h =>
-                nextSnapshotEntry = None
-                loadTxData[Array[Byte]](Seq(txSnapshot), h, snapshotIterator, (h, sn) => nextSnapshotEntry = Some(h -> sn))
-              case Some(_) => Seq.empty
-              case _       => loadTxData[Array[Byte]](Seq.empty, h, snapshotIterator, (h, sn) => nextSnapshotEntry = Some(h -> sn))
-            }
-          } else Seq.empty
-          createBlock(PBBlocks.vanilla(meta.getHeader), meta.signature.toByteStr, txs).toOption
-            .map(block => (h, block, snapshots)) match {
-            case Some(r) => r
-            case None    => computeNext()
-          }
-        } else {
-          closeResources()
-          endOfData()
-        }
-      } else {
-        closeResources()
-        endOfData()
-      }
-    }
-
-    def closeResources(): Unit = {
-      txIterator.closeResources()
-      snapshotIterator.closeResources()
-      blockMetaIterator.closeResources()
-    }
-  }
-
   private class DataIterator[A](
       db: RocksDB,
       cfHandle: ColumnFamilyHandle,
@@ -269,20 +205,13 @@ object Exporter extends ScorexLogging {
 
       fullSize + Ints.BYTES
     }
-
-    def writeString(stream: OutputStream, str: String): Int = {
-      val bytes = str.utf8Bytes
-      stream.write(bytes)
-      bytes.length
-    }
   }
 
   private final case class ExporterOptions(
       configFileName: Option[File] = None,
       blocksOutputFileNamePrefix: String = "blockchain",
      snapshotsFileNamePrefix: Option[String] = None,
-      exportHeight: Option[Int] = None,
-      format: String = Formats.Binary
+      exportHeight: Option[Int] = None
   )
 
   private lazy val commandParser = {
@@ -307,18 +236,6 @@ object Exporter extends ScorexLogging {
         .text("Export to height")
         .action((h, c) => c.copy(exportHeight = Some(h)))
         .validate(h => if (h > 0) success else failure("Export height must be > 0")),
-      opt[String]('f', "format")
-        .hidden()
-        .text("Output file format")
-        .valueName(s"<${Formats.list.mkString("|")}> (default is ${Formats.default})")
-        .action { (f, c) =>
-          log.warn("Export file format option is deprecated and will be removed eventually")
-          c.copy(format = f)
-        }
-        .validate {
-          case f if Formats.isSupported(f.toUpperCase) => success
-          case f                                       => failure(s"Unsupported format: $f")
-        },
       opt[Int]('h', "height")
         .text("Export to height")
         .action((h, c) => c.copy(exportHeight = Some(h)))
@@ -342,4 +259,37 @@ object Exporter extends ScorexLogging {
     if (exportSnapshots) {
       s", ${humanReadableSize(exportedSnapshotsBytes)} for snapshots"
     } else ""
+
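+  // Hand-rolled scan of a stored transaction entry, dispatching on the protobuf tag
+  // (field number << 3 | wire type): field 1 yields the raw transaction bytes as-is,
+  // field 2 a parsed SignedTransaction, field 5 an Ethereum transaction payload;
+  // the varint fields 3 and 4 are read and discarded. This avoids a full protobuf
+  // parse of every transaction on the export path.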
+  @tailrec
+  private final def readTx(in: CodedInputStream): ByteString | SignedTransaction =
+    in.readTag() match {
+      case 0  => throw new IllegalArgumentException("no tx found")
+      case 42 => SignedTransaction(SignedTransaction.Transaction.EthereumTransaction(in.readBytes()))
+      case 18 => scalapb.LiteParser.readMessage[SignedTransaction](in)
+      case 10 => in.readBytes()
+      case 24 =>
+        in.readEnum()
+        readTx(in)
+      case 32 =>
+        in.readInt64()
+        readTx(in)
+      case tag =>
+        throw new IllegalArgumentException(s"unexpected field: $tag")
+    }
+
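+  // Legacy (pre-proto) block body framing reproduced below: a 4-byte length of the
+  // tx data, then a version-dependent tx count (via mkTxsCountBytes), then each tx
+  // as a 4-byte length followed by its bytes.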
.text("Max size of blocks' queue") .action((maxSize, c) => c.copy(maxQueueSize = maxSize)) @@ -179,8 +171,9 @@ object Importer extends ScorexLogging { } } - @volatile private var quit = false - private val lock = new Object + @volatile private var quit = false + @volatile private var prevBlock = ByteStr(new Array[Byte](crypto.DigestLength)) + private val lock = new Object // noinspection UnstableApiUsage def startImport( @@ -266,7 +259,7 @@ object Importer extends ScorexLogging { ) } - ParSignatureChecker.checkBlockAndTxSignatures(block, blockSnapshot.isEmpty, rideV6) +// ParSignatureChecker.checkBlockAndTxSignatures(block, blockSnapshot.isEmpty, rideV6) queue.enqueue(block -> blockSnapshot) } @@ -299,14 +292,18 @@ object Importer extends ScorexLogging { } else { lock.synchronized { val (block, snapshot) = queue.dequeue() - if (blockchain.lastBlockId.contains(block.header.reference)) { + if (blockchain.lastBlockId.contains(block.header.reference) || prevBlock == block.header.reference) { Await.result(appendBlock(block, snapshot).runAsyncLogErr(appender), Duration.Inf) match { case Left(ve) => log.error(s"Error appending block: $ve") queue.clear() quit = true case _ => + prevBlock = block.id() counter = counter + 1 + if (counter % 1000 == 0) { + log.info(s"New height: $counter, block $prevBlock, queue size ${queue.size}") + } } } else if (!quit) { log.warn(s"Block $block is not a child of the last block ${blockchain.lastBlockId.get}") @@ -353,25 +350,39 @@ object Importer extends ScorexLogging { val extAppender: (Block, Option[BlockSnapshotResponse]) => Task[Either[ValidationError, BlockApplyResult]] = BlockAppender(blockchainUpdater, time, utxPool, pos, scheduler, importOptions.verify, txSignParCheck = false) + def nullAppender(b: Block, s: Option[BlockSnapshotResponse]): Task[Either[ValidationError, BlockApplyResult]] = + Task.now { + if (!crypto.verify(b.signature, b.bodyBytes(), b.header.generator, checkWeakPk = true)) Left(GenericError("Invalid header signature")) + else if (b.header.version > Block.ProtoBlockVersion && mkMerkleTree(b.transactionData) != b.header.transactionsRoot) + Left(GenericError("Invalid tx merkle root")) + else if ( + b.header.challengedHeader.exists(ch => + !crypto.verify( + ch.headerSignature, + PBBlocks.protobuf(b.originalHeader()).toByteArray, + ch.generator, + checkWeakPk = true + ) + ) + ) Left(GenericError("Invalid original header signature")) + else Right(BlockApplyResult.Ignored) + } + val extensions = initExtensions(settings, blockchainUpdater, scheduler, time, utxPool, rdb) checkGenesis(settings, blockchainUpdater, Miner.Disabled) - val blocksFileOffset = - importOptions.format match { - case Formats.Binary => - var blocksOffset = 0L - rdb.db.iterateOver(KeyTag.BlockInfoAtHeight) { e => - e.getKey match { - case Array(_, _, 0, 0, 0, 1) => // Skip genesis - case _ => - val meta = com.wavesplatform.database.readBlockMeta(e.getValue) - blocksOffset += meta.size + 4 - } - } - blocksOffset - case _ => - 0 + val blocksFileOffset = { + var blocksOffset = 0L + rdb.db.iterateOver(KeyTag.BlockInfoAtHeight) { e => + e.getKey match { + case Array(_, _, 0, 0, 0, 1) => // Skip genesis + case _ => + val meta = com.wavesplatform.database.readBlockMeta(e.getValue) + blocksOffset += meta.size + 4 + } } + blocksOffset + } val blocksInputStream = new BufferedInputStream(initFileStream(importOptions.blockchainFile, blocksFileOffset), 2 * 1024 * 1024) val snapshotsInputStream = importOptions.snapshotsFile @@ -417,6 +428,7 @@ object Importer extends ScorexLogging { 
+    def nullAppender(b: Block, s: Option[BlockSnapshotResponse]): Task[Either[ValidationError, BlockApplyResult]] =
+      Task.now {
+        if (!crypto.verify(b.signature, b.bodyBytes(), b.header.generator, checkWeakPk = true)) Left(GenericError("Invalid header signature"))
+        else if (b.header.version > Block.ProtoBlockVersion && mkMerkleTree(b.transactionData) != b.header.transactionsRoot)
+          Left(GenericError("Invalid tx merkle root"))
+        else if (
+          b.header.challengedHeader.exists(ch =>
+            !crypto.verify(
+              ch.headerSignature,
+              PBBlocks.protobuf(b.originalHeader()).toByteArray,
+              ch.generator,
+              checkWeakPk = true
+            )
+          )
+        ) Left(GenericError("Invalid original header signature"))
+        else Right(BlockApplyResult.Ignored)
+      }
+
     val extensions = initExtensions(settings, blockchainUpdater, scheduler, time, utxPool, rdb)
     checkGenesis(settings, blockchainUpdater, Miner.Disabled)
 
-    val blocksFileOffset =
-      importOptions.format match {
-        case Formats.Binary =>
-          var blocksOffset = 0L
-          rdb.db.iterateOver(KeyTag.BlockInfoAtHeight) { e =>
-            e.getKey match {
-              case Array(_, _, 0, 0, 0, 1) => // Skip genesis
-              case _ =>
-                val meta = com.wavesplatform.database.readBlockMeta(e.getValue)
-                blocksOffset += meta.size + 4
-            }
-          }
-          blocksOffset
-        case _ =>
-          0
+    val blocksFileOffset = {
+      var blocksOffset = 0L
+      rdb.db.iterateOver(KeyTag.BlockInfoAtHeight) { e =>
+        e.getKey match {
+          case Array(_, _, 0, 0, 0, 1) => // Skip genesis
+          case _ =>
+            val meta = com.wavesplatform.database.readBlockMeta(e.getValue)
+            blocksOffset += meta.size + 4
+        }
       }
+      blocksOffset
+    }
     val blocksInputStream = new BufferedInputStream(initFileStream(importOptions.blockchainFile, blocksFileOffset), 2 * 1024 * 1024)
     val snapshotsInputStream = importOptions.snapshotsFile
@@ -417,6 +428,7 @@ object Importer extends ScorexLogging {
       scheduler.awaitTermination(10 seconds)
 
       // Terminate extensions
+      import scala.concurrent.ExecutionContext.Implicits.global
       Await.ready(Future.sequence(extensions.map(_.shutdown())), settings.extensionsShutdownTimeout)
 
       utxPool.close()
@@ -432,7 +444,7 @@ object Importer extends ScorexLogging {
       blocksInputStream,
       snapshotsInputStream,
       blockchainUpdater,
-      extAppender,
+      if (importOptions.dryRun) nullAppender else extAppender,
      importOptions,
       blocksFileOffset == 0,
       scheduler
diff --git a/node/src/main/scala/com/wavesplatform/block/Block.scala b/node/src/main/scala/com/wavesplatform/block/Block.scala
index a3f704f5b00..a5fb4f14876 100644
--- a/node/src/main/scala/com/wavesplatform/block/Block.scala
+++ b/node/src/main/scala/com/wavesplatform/block/Block.scala
@@ -70,7 +70,7 @@ case class Block(
   private[block] val transactionsMerkleTree: Coeval[TransactionsMerkleTree] = Coeval.evalOnce(mkMerkleTree(transactionData))
 
-  private[block] val originalHeader: Coeval[BlockHeader] =
+  val originalHeader: Coeval[BlockHeader] =
     Coeval.evalOnce(
       header.challengedHeader
         .map { ch =>
diff --git a/node/src/main/scala/com/wavesplatform/block/serialization/package.scala b/node/src/main/scala/com/wavesplatform/block/serialization/package.scala
index 43ee72ad18f..380780867fa 100644
--- a/node/src/main/scala/com/wavesplatform/block/serialization/package.scala
+++ b/node/src/main/scala/com/wavesplatform/block/serialization/package.scala
@@ -11,7 +11,7 @@ import com.wavesplatform.serialization.ByteBufferOps
 import com.wavesplatform.transaction.{EthereumTransaction, Transaction, TransactionParsers}
 
 package object serialization {
-  private[block] def writeTransactionData(version: Byte, txs: Seq[Transaction]): Array[Byte] = {
+  def writeTransactionData(version: Byte, txs: Seq[Transaction]): Array[Byte] = {
     val txsBytes = txs.map(tx =>
       if (version == ProtoBlockVersion) PBUtils.encodeDeterministic(PBTransactions.protobuf(tx))
       else tx.bytes().ensuring(!tx.isInstanceOf[EthereumTransaction])
diff --git a/node/src/main/scala/com/wavesplatform/state/ParSignatureChecker.scala b/node/src/main/scala/com/wavesplatform/state/ParSignatureChecker.scala
index 8d9925f7290..237d7a68f46 100644
--- a/node/src/main/scala/com/wavesplatform/state/ParSignatureChecker.scala
+++ b/node/src/main/scala/com/wavesplatform/state/ParSignatureChecker.scala
@@ -1,50 +1,40 @@
 package com.wavesplatform.state
 
-import cats.syntax.parallel.*
 import com.wavesplatform.block.Block
 import com.wavesplatform.transaction.{ProvenTransaction, Transaction}
-import com.wavesplatform.utils.Schedulers
-import monix.eval.Task
-import monix.execution.schedulers.SchedulerService
+
+import java.util.concurrent.*
 
 object ParSignatureChecker {
-  implicit val sigverify: SchedulerService = Schedulers.fixedPool(4, "sigverify")
+
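+  // If all workers are busy and the bounded task queue is full, block the submitting
+  // thread in put() instead of dropping the task: backpressure rather than rejection.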
+  private val rejectedHandler: RejectedExecutionHandler = (r: Runnable, executor: ThreadPoolExecutor) =>
+    try executor.getQueue.put(r)
+    catch {
+      case ie: InterruptedException =>
+        Thread.currentThread().interrupt()
+        throw new RejectedExecutionException("Task submission interrupted", ie)
+    }
+
+  private val sigverify = new ThreadPoolExecutor(4, 8, 10, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable](100000), rejectedHandler)
 
   def checkTxSignatures(txs: Seq[Transaction], rideV6Activated: Boolean): Unit =
-    txs
-      .parUnorderedTraverse {
-        case tx: ProvenTransaction =>
-          Task {
-            if (rideV6Activated) {
-              tx.firstProofIsValidSignatureAfterV6
-            } else {
-              tx.firstProofIsValidSignatureBeforeV6
-            }
-          }.void
-        case _ => Task.unit
-      }
-      .executeOn(sigverify)
-      .runAsyncAndForget
-
-  def checkBlockAndTxSignatures(block: Block, checkTxSignatures: Boolean, rideV6Activated: Boolean): Unit = {
-    val verifiedObjects: Seq[Any] = (block +: block.transactionData)
-    verifiedObjects
-      .parTraverse {
-        case tx: ProvenTransaction if checkTxSignatures =>
-          Task {
-            if (rideV6Activated) {
-              tx.firstProofIsValidSignatureAfterV6
-            } else {
-              tx.firstProofIsValidSignatureBeforeV6
-            }
-          }.void
-        case b: Block => Task(b.signatureValid()).void
-        case _        => Task.unit
-      }
-      .executeOn(sigverify)
-      .runAsyncAndForget
+    txs.foreach {
+      case tx: ProvenTransaction =>
+        if (rideV6Activated) {
+          sigverify.execute(() => tx.firstProofIsValidSignatureAfterV6)
+        } else {
+          sigverify.execute(() => tx.firstProofIsValidSignatureBeforeV6)
+        }
+      case _ =>
+    }
+
+  def checkBlockAndTxSignatures(block: Block, checkTransactionSignatures: Boolean, rideV6Activated: Boolean): Unit = {
+    checkBlockSignature(block)
+    if (checkTransactionSignatures && block.transactionData.nonEmpty) {
+      checkTxSignatures(block.transactionData, rideV6Activated)
+    }
   }
 
   def checkBlockSignature(block: Block): Unit =
-    Task(block.signatureValid()).executeOn(sigverify).runAsyncAndForget
+    sigverify.execute(() => block.signatureValid())
 }