Skip to content

Commit 389875f

Browse files
committed
truncate lines on resend
1 parent 0d2527e commit 389875f

File tree

1 file changed

+38
-39
lines changed

1 file changed

+38
-39
lines changed

analytics/src/main/java/com/segment/analytics/internal/FallbackAppender.java

Lines changed: 38 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@
1111
import java.io.OutputStream;
1212
import java.nio.channels.Channels;
1313
import java.nio.channels.FileChannel;
14+
import java.nio.channels.FileLock;
1415
import java.nio.charset.StandardCharsets;
1516
import java.nio.file.StandardOpenOption;
1617
import java.util.ArrayList;
17-
import java.util.Arrays;
1818
import java.util.Collections;
1919
import java.util.Date;
2020
import java.util.List;
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.locks.Lock;
2626
import java.util.concurrent.locks.ReentrantLock;
2727
import java.util.stream.Collectors;
28+
import org.apache.commons.io.FileSystem;
2829

2930
public class FallbackAppender {
3031

@@ -86,25 +87,18 @@ public void run() {
8687
}
8788

8889
List<Message> msgs;
89-
lock.lock();
9090
try {
91-
msgs = read();
91+
msgs = truncate(20); // XXX messageSize
9292
if (msgs.isEmpty()) {
9393
continue;
9494
}
95-
// FIXME now its reading all the msgs and waits until all is processed
96-
// it will be better to work with batch and truncate the file
97-
file.delete();
9895
} catch (IOException e) {
9996
// TODO Auto-generated catch block
10097
e.printStackTrace();
10198
lastMessage = System.currentTimeMillis();
10299
continue;
103-
} finally {
104-
lock.unlock();
105100
}
106101

107-
// FIXME batch
108102
while (!msgs.isEmpty()) {
109103
boolean canEnqueue = true;
110104
for (int i = msgs.size() - 1; canEnqueue && i >= 0; i--) {
@@ -113,16 +107,16 @@ public void run() {
113107
if (canEnqueue) {
114108
msgs.remove(i);
115109
System.err.println("reenqueued " + msg.messageId());
110+
} else {
111+
// slow down next iteration when http queue overflow
112+
try {
113+
Thread.sleep(1_000);
114+
} catch (InterruptedException e) {
115+
Thread.currentThread().interrupt();
116+
}
116117
}
117118
}
118-
try {
119-
Thread.sleep(1_000);
120-
} catch (InterruptedException e) {
121-
Thread.currentThread().interrupt();
122-
}
123119
}
124-
125-
lastMessage = System.currentTimeMillis();
126120
}
127121

128122
try {
@@ -161,38 +155,43 @@ public void run() {
161155
}
162156
}
163157

164-
List<Message> read() throws IOException {
165-
if (file.exists()) {
166-
try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ)) {
167-
fileChannel.lock(0, Long.MAX_VALUE, true);
168-
169-
final String[] lines = new String(
170-
Channels.newInputStream(fileChannel).readAllBytes(), StandardCharsets.UTF_8)
171-
.split(System.lineSeparator());
172-
return Arrays.stream(lines)
173-
.map(m -> fromJson(m))
174-
.filter(Objects::nonNull)
175-
.collect(Collectors.toList());
176-
}
177-
} else {
158+
List<Message> truncate(int numMessages) throws IOException {
159+
lock.lock();
160+
161+
if (!file.exists()) {
162+
lock.unlock();
178163
return Collections.emptyList();
179164
}
165+
166+
try (ReversedLinesFileReader reader = ReversedLinesFileReader.builder()
167+
.setPath(file.toPath())
168+
.setBufferSize(FileSystem.getCurrent().getBlockSize())
169+
.setCharset(StandardCharsets.UTF_8)
170+
.get()) {
171+
172+
return reader.readLines(numMessages).stream()
173+
.map(this::fromJson)
174+
.filter(Objects::nonNull)
175+
.collect(Collectors.toList());
176+
} finally {
177+
lock.unlock();
178+
}
180179
}
181180

181+
private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
182+
182183
private void write(List<Message> batch) {
183184
lock.lock();
184185
try (FileChannel fileChannel = FileChannel.open(
185-
file.toPath(), StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE)) {
186-
fileChannel.lock();
186+
file.toPath(), StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
187+
OutputStream os = Channels.newOutputStream(fileChannel);
188+
FileLock fileLock = fileChannel.lock(); ) {
187189

188-
final String lines = batch.stream()
189-
.map(this::toJson)
190-
.filter(Objects::nonNull)
191-
.collect(Collectors.joining(System.lineSeparator()));
190+
for (Message msg : batch) {
191+
os.write(toJson(msg).getBytes(StandardCharsets.UTF_8));
192+
os.write(NEW_LINE);
193+
}
192194

193-
OutputStream os = Channels.newOutputStream(fileChannel);
194-
os.write(lines.getBytes(StandardCharsets.UTF_8));
195-
os.write(System.lineSeparator().getBytes(StandardCharsets.UTF_8));
196195
fileChannel.force(true);
197196

198197
batch.clear();

0 commit comments

Comments
 (0)