Skip to content

Commit c8b9d04

Browse files
committed
Fix memory efficiency problem with statistics loader
1 parent 94d4314 commit c8b9d04

File tree

4 files changed

+13
-8
lines changed

4 files changed

+13
-8
lines changed

src/StatisticsTestLoader/DestinationKafka.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public DestinationKafka(params Uri[] servers)
2020
{
2121
var options = new KafkaOptions(servers) { Log = new ConsoleLogger() };
2222
_router = new BrokerRouter(options);
23-
_producer = new Producer(_router, maximumMessageBuffer: 5000, maximumAsyncRequests: 10) { BatchSize = 1000, BatchDelayTime = TimeSpan.FromSeconds(5) };
23+
_producer = new Producer(_router, maximumMessageBuffer: 5000, maximumAsyncRequests: 10) { BatchSize = 1000, BatchDelayTime = TimeSpan.FromSeconds(1) };
2424

2525
StatisticsTracker.OnStatisticsHeartbeat += StatisticsTracker_OnStatisticsHeartbeat;
2626
}
@@ -79,7 +79,7 @@ public async Task PostBatchAsync(List<KafkaRecord> records)
7979
{
8080
Console.WriteLine("POSTING: {0}, {1}", topicBatch.Count(), _producer.BufferCount);
8181
var result = await _producer
82-
.SendMessageAsync(topicBatch.Key, topicBatch.Select(x => new Message(x.Record, x.Key)), acks: 1, codec: MessageCodec.CodecGzip)
82+
.SendMessageAsync(topicBatch.Key, topicBatch.Select(x => x.Message), acks: 0, codec: MessageCodec.CodecGzip)
8383
.ConfigureAwait(false);
8484

8585
if (result.Any(x => x.Error != (int)ErrorResponseCode.NoError))
@@ -88,9 +88,7 @@ public async Task PostBatchAsync(List<KafkaRecord> records)
8888
}
8989
else
9090
{
91-
var offset = topicBatch.Max(x => x.Offset);
9291
await SetStoredOffsetAsync(topicBatch.Key, topicBatch.Max(x => x.Offset)).ConfigureAwait(false);
93-
//Console.WriteLine("Topic:{0} Count:{1} Offset:{2}", topicBatch.Key, topicBatch.Count(), offset);
9492
}
9593
}
9694
}
Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
1-
namespace StatisticsTestLoader
1+
using KafkaNet.Protocol;
2+
3+
namespace StatisticsTestLoader
24
{
35
public class KafkaRecord
46
{
57
public string Key { get; set; }
68
public string Topic { get; set; }
79
public long Offset { get; set; }
8-
public string Record { get; set; }
10+
public Message Message { get; private set; }
11+
12+
public void AddDocument(string document)
13+
{
14+
Message = new Message(Key, document);
15+
}
916
}
1017
}

src/StatisticsTestLoader/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ private static void StartPolling(IRecordSource source, DestinationKafka kafka)
4242
try
4343
{
4444
var index = kafka.GetStoredOffset(source.Topic);
45-
foreach (var batchEnumerable in source.Poll(index).Batch(1000))
45+
foreach (var batchEnumerable in source.Poll(index).Batch(500))
4646
{
4747
var batch = batchEnumerable.ToList();
4848

src/StatisticsTestLoader/SourcePropertyChanges.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ private void PopulateData(long index, BlockingCollection<KafkaRecord> data)
7777
{
7878
if (records.ContainsKey(change.Key))
7979
{
80-
change.Record = records[change.Key].ToString();
80+
change.AddDocument(records[change.Key].ToString());
8181
data.Add(change);
8282
}
8383
}

0 commit comments

Comments
 (0)