Skip to content

Commit c9e5409

Browse files
committed
commits reading cache - save intermediate results into a local file (handy when reading a repo with a multi-year history)
1 parent 28ea575 commit c9e5409

File tree

5 files changed

+149
-29
lines changed

5 files changed

+149
-29
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
.idea
22
node_modules
33
coverage
4+
.cache

.prettierignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
dist/**/*
2-
coverage/**/*
2+
coverage/**/*
3+
.cache/**/*

src/cache/resumable-processor.ts

+90
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import fs from "fs/promises";
2+
import path from "path";
3+
4+
interface ProgressCache<R> {
5+
completedItems: { [key: number]: R };
6+
lastProcessedIndex: number;
7+
}
8+
9+
export class ResumableProcessor {
10+
private cachePath: string;
11+
private snapshotInterval: number;
12+
13+
constructor(cachePath = ".cache", snapshotInterval = 100) {
14+
this.cachePath = cachePath;
15+
this.snapshotInterval = snapshotInterval;
16+
}
17+
18+
private async ensureCacheDirectory(): Promise<void> {
19+
await fs.mkdir(this.cachePath, { recursive: true });
20+
}
21+
22+
private getCachePath(jobId: string): string {
23+
const sanitizedJobId = jobId.replace(/\//g, "-");
24+
return path.join(this.cachePath, `${sanitizedJobId}.json`);
25+
}
26+
27+
private async loadCache<R>(jobId: string): Promise<ProgressCache<R>> {
28+
try {
29+
const data = await fs.readFile(this.getCachePath(jobId), "utf-8");
30+
return JSON.parse(data);
31+
} catch {
32+
return { completedItems: {}, lastProcessedIndex: -1 };
33+
}
34+
}
35+
36+
private async saveCache<R>(
37+
jobId: string,
38+
cache: ProgressCache<R>
39+
): Promise<void> {
40+
await fs.writeFile(
41+
this.getCachePath(jobId),
42+
JSON.stringify(cache, null, 2)
43+
);
44+
}
45+
46+
// eslint-disable-next-line max-params
47+
async process<T, R>(
48+
jobId: string,
49+
items: T[],
50+
processFn: (item: T) => Promise<R>,
51+
notifyFn: (result: R) => void
52+
): Promise<R[]> {
53+
await this.ensureCacheDirectory();
54+
const cache = await this.loadCache<R>(jobId);
55+
const results: R[] = new Array(items.length);
56+
57+
// Restore completed items from cache
58+
for (const [index, value] of Object.entries(cache.completedItems)) {
59+
const idx = parseInt(index);
60+
if (idx < items.length) {
61+
results[idx] = value;
62+
notifyFn(value);
63+
}
64+
}
65+
66+
// Process remaining items
67+
for (let i = cache.lastProcessedIndex + 1; i < items.length; i++) {
68+
results[i] = await processFn(items[i]);
69+
cache.completedItems[i] = results[i];
70+
cache.lastProcessedIndex = i;
71+
72+
// Save snapshot at intervals
73+
if ((i + 1) % this.snapshotInterval === 0) {
74+
await this.saveCache(jobId, cache);
75+
}
76+
}
77+
78+
// Final save
79+
await this.saveCache(jobId, cache);
80+
return results;
81+
}
82+
83+
async clearCache(jobId: string): Promise<void> {
84+
try {
85+
await fs.unlink(this.getCachePath(jobId));
86+
} catch {
87+
// Ignore if file doesn't exist
88+
}
89+
}
90+
}

src/git-reader/git-repository.ts

+49-27
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import fs from "fs";
22
import git from "isomorphic-git";
33
import { Readable } from "stream";
44

5+
import { ResumableProcessor } from "../cache/resumable-processor.js";
56
import { time, timeLog } from "../index.js";
67
import { ChangedFile, Commit, ExpandedCommit } from "../interfaces.js";
78

@@ -10,6 +11,8 @@ interface GitReadOptions {
1011
}
1112

1213
export class GitRepository {
14+
private previousCommit: Commit = null;
15+
1316
constructor(private repoPath: string) {}
1417

1518
public async getListOfCommits(): Promise<Commit[]> {
@@ -18,39 +21,58 @@ export class GitRepository {
1821
public async getListOfCommitsWithChangedFiles(
1922
options: GitReadOptions = {}
2023
): Promise<ExpandedCommit[]> {
21-
const results: ExpandedCommit[] = [];
2224
const commits = await this.getListOfCommits();
23-
let previousCommit;
2425
time("getListOfCommitsWithChangedFiles");
25-
for (const c of commits) {
26-
if (!previousCommit) {
27-
previousCommit = c;
28-
continue;
29-
}
30-
time(`getFilesDiff-${previousCommit.oid}-${c.oid}`);
31-
const changedFiles: ChangedFile[] = await this.getFilesDiff(
32-
previousCommit.oid,
33-
c.oid
34-
);
35-
timeLog(`getFilesDiff-${previousCommit.oid}-${c.oid}`);
36-
37-
const result = {
38-
oid: c.oid,
39-
commit: previousCommit,
40-
changedFiles,
41-
};
42-
43-
results.push(result);
44-
45-
if (options.stream) {
46-
options.stream.push(result);
47-
}
4826

49-
previousCommit = c;
50-
}
27+
const taskProcessor = new ResumableProcessor(".cache", 100);
28+
const results: ExpandedCommit[] = await taskProcessor.process(
29+
this.repoPath,
30+
commits,
31+
this.getFilesDiffWithCursor.bind(this, options),
32+
this.updateGitReadStream.bind(this, options)
33+
);
34+
5135
timeLog("getListOfCommitsWithChangedFiles");
5236
return results;
5337
}
38+
private async getFilesDiffWithCursor(
39+
options: GitReadOptions = {},
40+
cursor: Commit
41+
): Promise<ExpandedCommit> {
42+
if (!this.previousCommit) {
43+
this.previousCommit = cursor;
44+
return null;
45+
}
46+
const taskID = `getFilesDiff-${this.previousCommit.oid}-${cursor.oid}`;
47+
time(taskID);
48+
const changedFiles: ChangedFile[] = await this.getFilesDiff(
49+
this.previousCommit.oid,
50+
cursor.oid
51+
);
52+
timeLog(taskID);
53+
54+
const result = {
55+
oid: cursor.oid,
56+
commit: this.previousCommit,
57+
changedFiles,
58+
};
59+
60+
this.updateGitReadStream(options, result);
61+
62+
this.previousCommit = cursor;
63+
64+
return result;
65+
}
66+
private updateGitReadStream(
67+
options: GitReadOptions = {},
68+
result: ExpandedCommit
69+
) {
70+
if (options.stream) {
71+
if (result !== null) {
72+
options.stream.push(result);
73+
}
74+
}
75+
}
5476
private async getFilesDiff(
5577
prevOID: string,
5678
nextOID: string

src/index.ts

+7-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ async function main() {
107107

108108
let commitsCounter = 0;
109109
commitsStream.on("data", (commit: ExpandedCommit) => {
110-
debug("Commit", commit);
110+
log("Commit", commit);
111111

112112
commitsCounter += 1;
113113
intermediateAggregateMonthly.addCommit(commit);
@@ -116,9 +116,15 @@ async function main() {
116116
quarterlyDashboard.updateData(intermediateAggregateQuarterly.getData());
117117
summaryDashboard.setCurrentProgress(commitsCounter, commit);
118118
});
119+
commitsStream.on("error", (err) => {
120+
log("error reading commits", { err });
121+
});
119122
commitsStream.on("end", () => {
120123
log("done reading commits", {});
121124
});
125+
commitsStream.on("close", () => {
126+
log("stream closed", {});
127+
});
122128

123129
// number of commits by author:
124130
const commits = await repo.getListOfCommits();

0 commit comments

Comments
 (0)