Exporter speedup #3993

Draft · wants to merge 1 commit into base: version-1.5.x

226 changes: 88 additions & 138 deletions node/src/main/scala/com/wavesplatform/Exporter.scala
@@ -1,23 +1,30 @@
package com.wavesplatform

import com.google.common.collect.AbstractIterator
import com.google.common.primitives.Ints
import com.wavesplatform.block.Block
import com.google.common.primitives.{Bytes, Ints}
import com.google.protobuf.{ByteString, CodedInputStream}
import com.wavesplatform.block.{Block, BlockHeader}
import com.wavesplatform.block.serialization.{BlockSerializer, mkTxsCountBytes, writeTransactionData}
import com.wavesplatform.database.protobuf.BlockMeta
import com.wavesplatform.database.{KeyTag, RDB, createBlock, readBlockMeta, readTransaction}
import com.wavesplatform.database.{Caches, KeyTag, RDB, createBlock, readBlockMeta, readTransaction}
import com.wavesplatform.events.BlockchainUpdateTriggers
import com.wavesplatform.features.{BlockchainFeature, BlockchainFeatures}
import com.wavesplatform.history.StorageFactory
import com.wavesplatform.metrics.Metrics
import com.wavesplatform.protobuf.ByteStringExt
import com.wavesplatform.protobuf.block.PBBlocks
import com.wavesplatform.protobuf.block.PBBlocks.protobuf
import com.wavesplatform.protobuf.block.{PBBlock, PBBlocks}
import com.wavesplatform.protobuf.transaction.{PBTransactions, SignedTransaction}
import com.wavesplatform.protobuf.utils.PBUtils
import com.wavesplatform.state.Height
import com.wavesplatform.transaction.Transaction
import com.wavesplatform.transaction.{Transaction, TransactionParsers}
import com.wavesplatform.utils.*
import kamon.Kamon
import org.rocksdb.{ColumnFamilyHandle, ReadOptions, RocksDB}
import scopt.OParser

import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}
import java.nio.ByteBuffer
import scala.annotation.tailrec
import scala.concurrent.Await
import scala.concurrent.duration.*
@@ -26,20 +33,10 @@ import scala.util.Using.Releasable
import scala.util.{Failure, Success, Try, Using}

object Exporter extends ScorexLogging {
private[wavesplatform] object Formats {
val Binary = "BINARY"
val Protobuf = "PROTOBUF"

def list: Seq[String] = Seq(Binary, Protobuf)
def default: String = Binary

def isSupported(f: String): Boolean = list.contains(f.toUpperCase)
}

// noinspection ScalaStyle
def main(args: Array[String]): Unit = {
OParser.parse(commandParser, args, ExporterOptions()).foreach {
case ExporterOptions(configFile, blocksOutputFileNamePrefix, snapshotsOutputFileNamePrefix, exportHeight, format) =>
case ExporterOptions(configFile, blocksOutputFileNamePrefix, snapshotsOutputFileNamePrefix, exportHeight) =>
val settings = Application.loadApplicationConfig(configFile)

Using.resources(
@@ -76,22 +73,51 @@ object Exporter extends ScorexLogging {
var exportedSnapshotsBytes = 0L
val start = System.currentTimeMillis()

new BlockSnapshotIterator(rdb, height, exportSnapshots).asScala.foreach { case (h, block, txSnapshots) =>
val txCount = block.transactionData.length
if (exportSnapshots && txCount != txSnapshots.length)
throw new RuntimeException(
s"${txSnapshots.length} snapshot(s) don't match $txCount transaction(s) on height $h, data is corrupted"
val txIterator = new DataIterator[ByteString | SignedTransaction](
rdb.db, rdb.txHandle.handle, KeyTag.NthTransactionInfoAtHeight.prefixBytes,
_.slice(2, 6), // bytes 2..5 hold the 4-byte height that follows the 2-byte key tag
height => bytes => readTx(CodedInputStream.newInstance(bytes))
)

var counter = 0

val blockMetaIterator: DataIterator[BlockMeta] =
new DataIterator[BlockMeta](
rdb.db,
rdb.db.getDefaultColumnFamily,
KeyTag.BlockInfoAtHeight.prefixBytes,
_.takeRight(Ints.BYTES),
_ => readBlockMeta
)

while (blockMetaIterator.hasNext) {
val meta = blockMetaIterator.next()._2
val txCount = meta.transactionCount
val txs = txIterator.asScala.take(txCount).toSeq
val signedHeader = Caches.toSignedHeader(meta)
val blockBytes = if (signedHeader.header.version >= Block.ProtoBlockVersion)
PBUtils.encodeDeterministic(new PBBlock(
Some(protobuf(signedHeader.header)),
ByteString.copyFrom(signedHeader.signature.arr),
txs.map(_._2 match {
case s: SignedTransaction => s
case bs: ByteString => PBTransactions.protobuf(TransactionParsers.parseBytes(bs.toByteArray).get)
}).toSeq
))
else {
Bytes.concat(
BlockSerializer.mkPrefixBytes(signedHeader.header),
mkTxsDataBytes(signedHeader.header, txs.map(_._2.asInstanceOf[ByteString].toByteArray).toSeq),
BlockSerializer.mkSuffixBytes(signedHeader.header, signedHeader.signature)
)
exportedBlocksBytes += IO.exportBlock(blocksStream, Some(block), format == Formats.Binary)
snapshotsStream.foreach { output =>
exportedSnapshotsBytes += IO.exportBlockTxSnapshots(output, txSnapshots)
}
blocksStream.write(Ints.toByteArray(blockBytes.length))
blocksStream.write(blockBytes)
exportedBlocksBytes += blockBytes.length

if (h % (height / 10) == 0) {
log.info(
s"$h blocks exported, ${humanReadableSize(exportedBlocksBytes)} written for blocks${snapshotsLogInfo(exportSnapshots, exportedSnapshotsBytes)}"
)
if ((counter + txCount) / 1_000_000 - (counter / 1_000_000) > 0) { // log each time a 1M-transaction boundary is crossed
log.info(f"Exported ${counter + txCount}%,d transactions, written $exportedBlocksBytes%,d bytes")
}
counter += txCount
}
val duration = System.currentTimeMillis() - start
log
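
For context on the access pattern: DataIterator (used above; its body is collapsed further down in this diff) wraps a plain RocksDB prefix scan. A minimal sketch of that idiom, assuming only the org.rocksdb API; the helper name and the parse callback are illustrative, not part of this PR:

    import org.rocksdb.{ColumnFamilyHandle, RocksDB, RocksIterator}

    // Seek to the first key carrying `prefix`, then step forward until the
    // prefix no longer matches. A real implementation must also close the
    // iterator when done (cf. closeResources below).
    def prefixScan[A](db: RocksDB, cf: ColumnFamilyHandle, prefix: Array[Byte])(
        parse: (Array[Byte], Array[Byte]) => A
    ): Iterator[A] = {
      val it: RocksIterator = db.newIterator(cf)
      it.seek(prefix) // position at the first key >= prefix
      Iterator
        .continually(it)
        .takeWhile(i => i.isValid && i.key().startsWith(prefix)) // stop once the key tag changes
        .map { i =>
          val parsed = parse(i.key(), i.value())
          i.next()
          parsed
        }
    }
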
@@ -107,96 +133,6 @@
}
}

private class BlockSnapshotIterator(rdb: RDB, targetHeight: Int, exportSnapshots: Boolean)
extends AbstractIterator[(Int, Block, Seq[Array[Byte]])] {
var nextTxEntry: Option[(Int, Transaction)] = None
var nextSnapshotEntry: Option[(Int, Array[Byte])] = None

val blockMetaIterator: DataIterator[BlockMeta] =
new DataIterator[BlockMeta](
rdb.db,
rdb.db.getDefaultColumnFamily,
KeyTag.BlockInfoAtHeight.prefixBytes,
_.takeRight(Ints.BYTES),
_ => readBlockMeta
)
val txIterator: DataIterator[Transaction] = {
val prefixBytes = KeyTag.NthTransactionInfoAtHeight.prefixBytes
new DataIterator(
rdb.db,
rdb.txHandle.handle,
prefixBytes,
_.slice(prefixBytes.length, prefixBytes.length + Ints.BYTES),
h => readTransaction(Height(h))(_)._2
)
}
val snapshotIterator: DataIterator[Array[Byte]] = {
val prefixBytes = KeyTag.NthTransactionStateSnapshotAtHeight.prefixBytes
new DataIterator(
rdb.db,
rdb.txSnapshotHandle.handle,
prefixBytes,
_.slice(prefixBytes.length, prefixBytes.length + Ints.BYTES),
_ => identity
)
}

@tailrec
private def loadTxData[A](acc: Seq[A], height: Int, iterator: DataIterator[A], updateNextEntryF: (Int, A) => Unit): Seq[A] = {
if (iterator.hasNext) {
val (h, txData) = iterator.next()
if (h == height) {
loadTxData(txData +: acc, height, iterator, updateNextEntryF)
} else {
updateNextEntryF(h, txData)
acc.reverse
}
} else acc.reverse
}

@tailrec
override final def computeNext(): (Int, Block, Seq[Array[Byte]]) = {
if (blockMetaIterator.hasNext) {
val (h, meta) = blockMetaIterator.next()
if (h <= targetHeight) {
val txs = nextTxEntry match {
case Some((txHeight, tx)) if txHeight == h =>
nextTxEntry = None
loadTxData[Transaction](Seq(tx), h, txIterator, (h, tx) => nextTxEntry = Some(h -> tx))
case Some(_) => Seq.empty
case _ => loadTxData[Transaction](Seq.empty, h, txIterator, (h, tx) => nextTxEntry = Some(h -> tx))
}
val snapshots = if (exportSnapshots) {
nextSnapshotEntry match {
case Some((snapshotHeight, txSnapshot)) if snapshotHeight == h =>
nextSnapshotEntry = None
loadTxData[Array[Byte]](Seq(txSnapshot), h, snapshotIterator, (h, sn) => nextSnapshotEntry = Some(h -> sn))
case Some(_) => Seq.empty
case _ => loadTxData[Array[Byte]](Seq.empty, h, snapshotIterator, (h, sn) => nextSnapshotEntry = Some(h -> sn))
}
} else Seq.empty
createBlock(PBBlocks.vanilla(meta.getHeader), meta.signature.toByteStr, txs).toOption
.map(block => (h, block, snapshots)) match {
case Some(r) => r
case None => computeNext()
}
} else {
closeResources()
endOfData()
}
} else {
closeResources()
endOfData()
}
}

def closeResources(): Unit = {
txIterator.closeResources()
snapshotIterator.closeResources()
blockMetaIterator.closeResources()
}
}
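
BlockSnapshotIterator above builds on Guava's AbstractIterator, whose contract drives the computeNext/endOfData calls in the removed code: computeNext() either returns the next element or calls endOfData() to signal exhaustion. A minimal self-contained sketch of that contract; the class is illustrative, not part of the PR:

    import com.google.common.collect.AbstractIterator
    import java.util.Scanner

    // computeNext() must either return the next element or return the result
    // of endOfData(), after which hasNext reports false.
    class LineIterator(scanner: Scanner) extends AbstractIterator[String] {
      override protected def computeNext(): String =
        if (scanner.hasNextLine) scanner.nextLine()
        else endOfData() // marks the iterator exhausted
    }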

private class DataIterator[A](
db: RocksDB,
cfHandle: ColumnFamilyHandle,
@@ -269,20 +205,13 @@

fullSize + Ints.BYTES
}

def writeString(stream: OutputStream, str: String): Int = {
val bytes = str.utf8Bytes
stream.write(bytes)
bytes.length
}
}

private final case class ExporterOptions(
configFileName: Option[File] = None,
blocksOutputFileNamePrefix: String = "blockchain",
snapshotsFileNamePrefix: Option[String] = None,
exportHeight: Option[Int] = None,
format: String = Formats.Binary
exportHeight: Option[Int] = None
)

private lazy val commandParser = {
@@ -307,18 +236,6 @@
.text("Export to height")
.action((h, c) => c.copy(exportHeight = Some(h)))
.validate(h => if (h > 0) success else failure("Export height must be > 0")),
opt[String]('f', "format")
.hidden()
.text("Output file format")
.valueName(s"<${Formats.list.mkString("|")}> (default is ${Formats.default})")
.action { (f, c) =>
log.warn("Export file format option is deprecated and will be removed eventually")
c.copy(format = f)
}
.validate {
case f if Formats.isSupported(f.toUpperCase) => success
case f => failure(s"Unsupported format: $f")
},
opt[Int]('h', "height")
.text("Export to height")
.action((h, c) => c.copy(exportHeight = Some(h)))
@@ -342,4 +259,37 @@
if (exportSnapshots) {
s", ${humanReadableSize(exportedSnapshotsBytes)} for snapshots"
} else ""

@tailrec
private final def readTx(in: CodedInputStream): ByteString | SignedTransaction =
in.readTag() match {
case 0 => throw new IllegalArgumentException("no tx found")
case 42 => SignedTransaction(SignedTransaction.Transaction.EthereumTransaction(in.readBytes()))
case 18 => scalapb.LiteParser.readMessage[SignedTransaction](in)
case 10 => in.readBytes()
case 24 =>
in.readEnum()
readTx(in)
case 32 =>
in.readInt64()
readTx(in)
case tag =>
throw new IllegalArgumentException(s"unexpected field: tag=$tag")
}
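
The numeric cases in readTx are protobuf wire-format tags: tag = (fieldNumber << 3) | wireType, with wire type 2 for length-delimited fields and 0 for varints. A quick sanity check of the constants matched above:

    // tag = (field number << 3) | wire type; 2 = length-delimited, 0 = varint
    def wireTag(fieldNumber: Int, wireType: Int): Int = (fieldNumber << 3) | wireType

    assert(wireTag(1, 2) == 10) // length-delimited field 1 -> in.readBytes()
    assert(wireTag(2, 2) == 18) // embedded message field 2 -> LiteParser.readMessage
    assert(wireTag(3, 0) == 24) // varint field 3 -> in.readEnum(), keep scanning
    assert(wireTag(4, 0) == 32) // varint field 4 -> in.readInt64(), keep scanning
    assert(wireTag(5, 2) == 42) // length-delimited field 5 -> Ethereum tx bytes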

def mkTxsDataBytes(header: BlockHeader, transactions: Seq[Array[Byte]]): Array[Byte] = {
val transactionsDataBytes = writeTransactionData(header.version, transactions)
Bytes.concat(
Ints.toByteArray(transactionsDataBytes.length),
transactionsDataBytes
)
}

def writeTransactionData(version: Byte, txsBytes: Seq[Array[Byte]]): Array[Byte] = {
val txsBytesSize = txsBytes.map(_.length + Ints.BYTES).sum
val txsBuf = ByteBuffer.allocate(txsBytesSize)
txsBytes.foreach(tx => txsBuf.putInt(tx.length).put(tx))

Bytes.concat(mkTxsCountBytes(version, txsBytes.size), txsBuf.array())
}
}
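
The export loop above frames every block as a 4-byte big-endian length (Ints.toByteArray) followed by the block bytes. A hypothetical consumer could read such a file back as sketched below; the method name and EOF handling are assumptions for illustration, not part of this PR:

    import java.io.{DataInputStream, EOFException, FileInputStream}

    // Reads length-prefixed block records until end of file.
    // DataInputStream.readInt is big-endian, matching Ints.toByteArray
    // on the writer side.
    def readExportedBlocks(path: String): Iterator[Array[Byte]] =
      new Iterator[Array[Byte]] {
        private val in = new DataInputStream(new FileInputStream(path))
        private var record = fetch()

        private def fetch(): Option[Array[Byte]] =
          try {
            val len   = in.readInt()
            val bytes = new Array[Byte](len)
            in.readFully(bytes)
            Some(bytes)
          } catch { case _: EOFException => in.close(); None }

        def hasNext: Boolean = record.isDefined
        def next(): Array[Byte] = {
          val current = record.getOrElse(throw new NoSuchElementException("end of export file"))
          record = fetch()
          current
        }
      }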