Skip to content

Commit 899fd66

Browse files
NODE-2617 Return asset distribution route (#3893)
1 parent 261b759 commit 899fd66

File tree

9 files changed

+136
-15
lines changed

9 files changed

+136
-15
lines changed

node-it/src/test/scala/com/wavesplatform/it/api/AsyncHttpApi.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import com.wavesplatform.lang.v1.FunctionHeader
2323
import com.wavesplatform.lang.v1.compiler.Terms
2424
import com.wavesplatform.lang.v1.compiler.Terms.FUNCTION_CALL
2525
import com.wavesplatform.state.DataEntry.Format
26-
import com.wavesplatform.state.{AssetDistributionPage, DataEntry, EmptyDataEntry, LeaseBalance, Portfolio}
26+
import com.wavesplatform.state.{AssetDistribution, AssetDistributionPage, DataEntry, EmptyDataEntry, LeaseBalance, Portfolio}
2727
import com.wavesplatform.transaction.Asset.{IssuedAsset, Waves}
2828
import com.wavesplatform.transaction.assets.*
2929
import com.wavesplatform.transaction.assets.exchange.{Order, ExchangeTransaction as ExchangeTx}
@@ -338,6 +338,11 @@ object AsyncHttpApi extends Assertions {
338338
get(url, amountsAsStrings).as[AssetDistributionPage](amountsAsStrings)
339339
}
340340

341+
def assetDistribution(asset: String, amountsAsStrings: Boolean = false): Future[AssetDistribution] = {
342+
val req = s"/assets/$asset/distribution"
343+
get(req, amountsAsStrings).as[AssetDistribution](amountsAsStrings)
344+
}
345+
341346
def effectiveBalance(address: String, confirmations: Option[Int] = None, amountsAsStrings: Boolean = false): Future[Balance] = {
342347
val maybeConfirmations = confirmations.fold("")(a => s"/$a")
343348
get(s"/addresses/effectiveBalance/$address$maybeConfirmations", amountsAsStrings).as[Balance](amountsAsStrings)

node-it/src/test/scala/com/wavesplatform/it/api/SyncHttpApi.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import com.wavesplatform.it.Node
1414
import com.wavesplatform.it.sync.*
1515
import com.wavesplatform.lang.script.v1.ExprScript
1616
import com.wavesplatform.lang.v1.compiler.Terms
17-
import com.wavesplatform.state.{AssetDistributionPage, DataEntry}
17+
import com.wavesplatform.state.{AssetDistribution, AssetDistributionPage, DataEntry}
1818
import com.wavesplatform.transaction.assets.exchange.Order
1919
import com.wavesplatform.transaction.lease.{LeaseCancelTransaction, LeaseTransaction}
2020
import com.wavesplatform.transaction.smart.InvokeScriptTransaction
@@ -261,6 +261,9 @@ object SyncHttpApi extends Assertions with matchers.should.Matchers {
261261
): AssetDistributionPage =
262262
sync(async(n).assetDistributionAtHeight(asset, height, limit, maybeAfter, amountsAsStrings))
263263

264+
def assetDistribution(asset: String): AssetDistribution =
265+
sync(async(n).assetDistribution(asset))
266+
264267
def broadcastIssue(
265268
source: KeyPair,
266269
name: String,

node-it/src/test/scala/com/wavesplatform/it/asset/IssueReissueBurnAssetSuite.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -250,15 +250,15 @@ class IssueReissueBurnAssetSuite extends BaseFreeSpec {
250250
val acc = createDapp(script(simpleReissuableAsset))
251251
val asset = issueValidated(acc, simpleReissuableAsset)
252252
invokeScript(acc, "transferAndBurn", assetId = asset, count = 100)
253-
val height1 = nodes.waitForHeightArise()
254-
sender.assetDistributionAtHeight(asset, height1 - 1, 10).items.map { case (a, v) => a.toString -> v } shouldBe Map(
253+
nodes.waitForHeightArise()
254+
sender.assetDistribution(asset).map { case (a, v) => a.toString -> v } shouldBe Map(
255255
miner.address -> 100L,
256256
acc.toAddress.toString -> (simpleReissuableAsset.quantity - 200)
257257
)
258258
reissue(acc, CallableMethod, asset, 400, reissuable = false)
259259
invokeScript(acc, "transferAndBurn", assetId = asset, count = 100)
260-
val height2 = nodes.waitForHeightArise()
261-
sender.assetDistributionAtHeight(asset, height2 - 1, 10).items.map { case (a, v) => a.toString -> v } shouldBe Map(
260+
nodes.waitForHeightArise()
261+
sender.assetDistribution(asset).map { case (a, v) => a.toString -> v } shouldBe Map(
262262
miner.address -> 200L,
263263
acc.toAddress.toString -> simpleReissuableAsset.quantity
264264
)

node-it/src/test/scala/com/wavesplatform/it/sync/AssetDistributionSuite.scala

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import com.wavesplatform.state.AssetDistributionPage
88
import com.wavesplatform.transaction.transfer.MassTransferTransaction
99
import org.scalatest.CancelAfterFailure
1010

11+
import scala.concurrent.duration.*
12+
1113
class AssetDistributionSuite extends BaseTransactionSuite with CancelAfterFailure {
1214

1315
lazy val node: Node = nodes.head
@@ -23,7 +25,7 @@ class AssetDistributionSuite extends BaseTransactionSuite with CancelAfterFailur
2325

2426
nodes.waitForHeightArise()
2527

26-
val issueTx = node.issue(issuer, "TestCoin", "no description", issueAmount, 8, false, issueFee, waitForTx = true).id
28+
val issueTx = node.issue(issuer, "TestCoin", "no description", issueAmount, 8, reissuable = false, issueFee, waitForTx = true).id
2729

2830
node.massTransfer(
2931
issuer,
@@ -47,6 +49,8 @@ class AssetDistributionSuite extends BaseTransactionSuite with CancelAfterFailur
4749

4850
val issuerAssetDis = assetDis.view.filterKeys(_ == issuer.toAddress).values
4951

52+
assetDis should be equals node.assetDistribution(issueTx)
53+
5054
issuerAssetDis.size shouldBe 1
5155
issuerAssetDis.head shouldBe (issueAmount - addresses.length * transferAmount)
5256

@@ -68,10 +72,34 @@ class AssetDistributionSuite extends BaseTransactionSuite with CancelAfterFailur
6872
)
6973
}
7074

75+
test("'Asset distribution' works properly") {
76+
val receivers = for (i <- 0 until 10) yield KeyPair(s"receiver#$i".getBytes("UTF-8"))
77+
78+
val issueTx = node.issue(issuer, "TestCoin#2", "no description", issueAmount, 8, reissuable = false, issueFee, waitForTx = true).id
79+
80+
node
81+
.massTransfer(
82+
issuer,
83+
receivers.map(rc => MassTransferTransaction.Transfer(rc.toAddress.toString, 10)).toList,
84+
minFee + minFee * receivers.length,
85+
assetId = Some(issueTx),
86+
waitForTx = true
87+
)
88+
89+
nodes.waitForHeightArise()
90+
91+
val distribution = node.assetDistribution(issueTx)
92+
93+
distribution.size shouldBe (receivers.size + 1)
94+
distribution(issuer.toAddress) shouldBe (issueAmount - 10 * receivers.length)
95+
96+
assert(receivers.forall(rc => distribution(rc.toAddress) == 10), "Distribution correct")
97+
}
98+
7199
test("Correct last page and entry count") {
72100
val receivers = for (i <- 0 until 50) yield KeyPair(s"receiver#$i".getBytes("UTF-8"))
73101

74-
val issueTx = node.issue(issuer, "TestCoin#2", "no description", issueAmount, 8, false, issueFee, waitForTx = true).id
102+
val issueTx = node.issue(issuer, "TestCoin#2", "no description", issueAmount, 8, reissuable = false, issueFee, waitForTx = true).id
75103

76104
node
77105
.massTransfer(
@@ -96,6 +124,24 @@ class AssetDistributionSuite extends BaseTransactionSuite with CancelAfterFailur
96124
assert(pages.map(_.items.size).sum == 51)
97125
}
98126

127+
test("Unlimited list") {
128+
val assetId = node.issue(issuer, "TestCoin#2", "no description", issueAmount, 8, reissuable = false, issueFee, waitForTx = true).id
129+
130+
val receivers = for (i <- 0 until 2000) yield KeyPair(s"receiver#$i".getBytes("UTF-8"))
131+
132+
val transfers = receivers.map { r => MassTransferTransaction.Transfer(r.toAddress.toString, 10L) }.toList
133+
134+
transfers.grouped(100).foreach { t =>
135+
node.massTransfer(issuer, t, minFee + t.length * minFee, assetId = Some(assetId))
136+
}
137+
138+
node.waitFor("empty utx")(_.utxSize, (_: Int) == 0, 1 second)
139+
nodes.waitForHeightArise()
140+
141+
val list = node.assetDistribution(assetId)
142+
list should have size 2001
143+
}
144+
99145
def distributionPages(asset: String, height: Int, limit: Int): List[AssetDistributionPage] = {
100146
def _load(acc: List[AssetDistributionPage], maybeAfter: Option[String]): List[AssetDistributionPage] = {
101147
val page = node.assetDistributionAtHeight(asset, height, limit, maybeAfter)

node/src/main/resources/swagger-ui/openapi.yaml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2440,6 +2440,38 @@ paths:
24402440
type: string
24412441
balance:
24422442
type: string
2443+
'/assets/{assetId}/distribution':
2444+
get:
2445+
tags:
2446+
- assets
2447+
summary: Asset balance distribution
2448+
description: Get asset balance distribution by addresses
2449+
operationId: getAssetDistributionOld
2450+
parameters:
2451+
- $ref: '#/components/parameters/assetId'
2452+
responses:
2453+
'200':
2454+
description: successful operation
2455+
content:
2456+
application/json:
2457+
schema:
2458+
type: object
2459+
additionalProperties:
2460+
type: integer
2461+
format: int64
2462+
description: map of assetId <-> balance
2463+
example:
2464+
2eEUvypDSivnzPiLrbYEW39SM8yMZ1aq4eJuiKfs4sEY: 15
2465+
3PPqZ623dAfbmxmnpTjwV6yD5GA5s3PJiUG: 25
2466+
application/json;large-significand-format=string:
2467+
schema:
2468+
type: object
2469+
additionalProperties:
2470+
type: string
2471+
description: map of assetId <-> balance
2472+
example:
2473+
2eEUvypDSivnzPiLrbYEW39SM8yMZ1aq4eJuiKfs4sEY: "15"
2474+
3PPqZ623dAfbmxmnpTjwV6yD5GA5s3PJiUG: "25"
24432475
'/assets/{assetId}/distribution/{height}/limit/{limit}':
24442476
get:
24452477
tags:

node/src/main/scala/com/wavesplatform/Application.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -374,9 +374,8 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
374374
else heavyRequestExecutor
375375
)
376376

377-
val routeTimeout = new RouteTimeout(
378-
FiniteDuration(settings.config.getDuration("akka.http.server.request-timeout").getSeconds, TimeUnit.SECONDS)
379-
)(heavyRequestScheduler)
377+
val serverRequestTimeout = FiniteDuration(settings.config.getDuration("akka.http.server.request-timeout").getSeconds, TimeUnit.SECONDS)
378+
val routeTimeout = new RouteTimeout(serverRequestTimeout)(heavyRequestScheduler)
380379

381380
val apiRoutes = Seq(
382381
new EthRpcRoute(blockchainUpdater, extensionContext.transactionsApi, time),
@@ -440,6 +439,7 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
440439
),
441440
AssetsApiRoute(
442441
settings.restAPISettings,
442+
serverRequestTimeout,
443443
wallet,
444444
transactionPublisher,
445445
blockchainUpdater,

node/src/main/scala/com/wavesplatform/api/http/assets/AssetsApiRoute.scala

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,13 @@ import com.wavesplatform.api.common.{CommonAccountsApi, CommonAssetsApi}
1818
import com.wavesplatform.api.http.*
1919
import com.wavesplatform.api.http.ApiError.*
2020
import com.wavesplatform.api.http.StreamSerializerUtils.*
21-
import com.wavesplatform.api.http.assets.AssetsApiRoute.{AssetDetails, AssetInfo, DistributionParams, assetDetailsSerializer}
21+
import com.wavesplatform.api.http.assets.AssetsApiRoute.{
22+
AssetDetails,
23+
AssetInfo,
24+
DistributionParams,
25+
assetDetailsSerializer,
26+
assetDistributionSerializer
27+
}
2228
import com.wavesplatform.common.state.ByteStr
2329
import com.wavesplatform.lang.ValidationError
2430
import com.wavesplatform.network.TransactionPublisher
@@ -39,9 +45,11 @@ import play.api.libs.json.*
3945

4046
import java.util.concurrent.*
4147
import scala.concurrent.Future
48+
import scala.concurrent.duration.FiniteDuration
4249

4350
case class AssetsApiRoute(
4451
settings: RestAPISettings,
52+
serverRequestTimeout: FiniteDuration,
4553
wallet: Wallet,
4654
transactionPublisher: TransactionPublisher,
4755
blockchain: Blockchain,
@@ -65,6 +73,8 @@ case class AssetsApiRoute(
6573
)
6674
)
6775

76+
private val assetDistRouteTimeout = new RouteTimeout(serverRequestTimeout)(distributionTaskScheduler)
77+
6878
override lazy val route: Route =
6979
pathPrefix("assets") {
7080
pathPrefix("balance" / AddrSegment) { address =>
@@ -97,9 +107,10 @@ case class AssetsApiRoute(
97107
(path("nft" / AddrSegment / "limit" / IntNumber) & parameter("after".as[String].?)) { (address, limit, maybeAfter) =>
98108
nft(address, limit, maybeAfter)
99109
} ~ pathPrefix(AssetId / "distribution") { assetId =>
100-
(path(IntNumber / "limit" / IntNumber) & parameter("after".?)) { (height, limit, maybeAfter) =>
101-
balanceDistributionAtHeight(assetId, height, limit, maybeAfter)
102-
}
110+
pathEndOrSingleSlash(balanceDistribution(assetId)) ~
111+
(path(IntNumber / "limit" / IntNumber) & parameter("after".?)) { (height, limit, maybeAfter) =>
112+
balanceDistributionAtHeight(assetId, height, limit, maybeAfter)
113+
}
103114
}
104115
}
105116
}
@@ -180,6 +191,16 @@ case class AssetsApiRoute(
180191
}
181192
}
182193

194+
def balanceDistribution(assetId: IssuedAsset): Route = {
195+
implicit val jsonStreamingSupport: ToResponseMarshaller[Source[(Address, Long), NotUsed]] =
196+
jacksonStreamMarshaller(prefix = "{", suffix = "}")(assetDistributionSerializer)
197+
198+
assetDistRouteTimeout.executeFromObservable(
199+
commonAssetsApi
200+
.assetDistribution(assetId, blockchain.height, None)
201+
)
202+
}
203+
183204
def balanceDistributionAtHeight(assetId: IssuedAsset, heightParam: Int, limitParam: Int, afterParam: Option[String]): Route =
184205
optionalHeaderValueByType(Accept) { accept =>
185206
val paramsEi: Either[ValidationError, DistributionParams] =
@@ -511,4 +532,16 @@ object AssetsApiRoute {
511532
gen.writeEndObject()
512533
}
513534
}
535+
536+
def assetDistributionSerializer(numbersAsString: Boolean): JsonSerializer[(Address, Long)] =
537+
(value: (Address, Long), gen: JsonGenerator, _: SerializerProvider) => {
538+
val (address, balance) = value
539+
if (numbersAsString) {
540+
gen.writeRaw(s"\"${address.toString}\":")
541+
gen.writeString(balance.toString)
542+
} else {
543+
gen.writeRaw(s"\"${address.toString}\":")
544+
gen.writeNumber(balance)
545+
}
546+
}
514547
}

node/src/test/scala/com/wavesplatform/api/http/CustomJsonMarshallerSpec.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ class CustomJsonMarshallerSpec
115115

116116
private val assetsRoute = AssetsApiRoute(
117117
restAPISettings,
118+
60.seconds,
118119
testWallet,
119120
publisher,
120121
blockchain,

node/src/test/scala/com/wavesplatform/http/AssetsRouteSpec.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ class AssetsRouteSpec
5757
seal(
5858
AssetsApiRoute(
5959
restAPISettings,
60+
60.seconds,
6061
testWallet,
6162
DummyTransactionPublisher.accepting,
6263
d.blockchain,

0 commit comments

Comments
 (0)