Skip to content

feat(schema-compiler): Bulk JS files transpilation in worker threads #9644

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,8 @@ jobs:
node-version: [22.x]
# Don't forget to update build-native-release
python-version: [3.11]
transpile-worker-threads: [false, true]
fail-fast: false

env:
CUBEJS_TRANSPILATION_WORKER_THREADS: ${{ matrix.transpile-worker-threads }}
steps:
- id: get-tag-out
run: echo "$OUT"
Expand Down Expand Up @@ -121,16 +118,16 @@ jobs:
- name: Lerna test
run: yarn lerna run --concurrency 1 --stream --no-prefix unit
- name: Fix lcov paths
if: (matrix.node-version == '22.x' && matrix.transpile-worker-threads == true)
if: (matrix.node-version == '22.x')
run: |
./.github/actions/codecov-fix.sh
- name: Combine all fixed LCOV files
if: (matrix.node-version == '22.x' && matrix.transpile-worker-threads == true)
if: (matrix.node-version == '22.x')
run: |
echo "" > ./combined-unit.lcov
find ./packages -type f -name lcov.fixed.info -exec cat {} + >> ./combined-unit.lcov || true
- name: Upload coverage artifact
if: (matrix.node-version == '22.x' && matrix.transpile-worker-threads == true)
if: (matrix.node-version == '22.x')
uses: actions/upload-artifact@v4
with:
name: coverage-unit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1318,21 +1318,6 @@ learn more.
| --------------- | ---------------------- | --------------------- |
| A valid number | 86400 | 86400 |

## `CUBEJS_TRANSPILATION_WORKER_THREADS`

If `true`, optimizes data model compilation by running critical parts of the
code in worker threads.

| Possible Values | Default in Development | Default in Production |
| --------------- | ---------------------- | --------------------- |
| `true`, `false` | `false` | `false` |

<ReferenceBox>

See [this issue](https://github.com/cube-js/cube/issues/9285) for details.

</ReferenceBox>

## `CUBEJS_WEB_SOCKETS`

If `true`, then use WebSocket for data fetching.
Expand Down
4 changes: 0 additions & 4 deletions packages/cubejs-backend-shared/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,16 +223,12 @@ const variables: Record<string, (...args: any) => any> = {
nativeOrchestrator: () => get('CUBEJS_TESSERACT_ORCHESTRATOR')
.default('true')
.asBoolStrict(),
transpilationWorkerThreads: () => get('CUBEJS_TRANSPILATION_WORKER_THREADS')
.default('false')
.asBoolStrict(),
allowNonStrictDateRangeMatching: () => get('CUBEJS_PRE_AGGREGATIONS_ALLOW_NON_STRICT_DATE_RANGE_MATCH')
.default('true')
.asBoolStrict(),
transpilationWorkerThreadsCount: () => get('CUBEJS_TRANSPILATION_WORKER_THREADS_COUNT')
.default('0')
.asInt(),
// This one takes precedence over CUBEJS_TRANSPILATION_WORKER_THREADS
transpilationNative: () => get('CUBEJS_TRANSPILATION_NATIVE')
.default('false')
.asBoolStrict(),
Expand Down
217 changes: 102 additions & 115 deletions packages/cubejs-schema-compiler/src/compiler/DataSchemaCompiler.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ import fs from 'fs';
import os from 'os';
import path from 'path';
import syntaxCheck from 'syntax-error';
import { parse } from '@babel/parser';
import babelGenerator from '@babel/generator';
import babelTraverse from '@babel/traverse';
import R from 'ramda';
import workerpool from 'workerpool';

Expand Down Expand Up @@ -114,16 +111,14 @@ export class DataSchemaCompiler {
const errorsReport = new ErrorReporter(null, [], this.errorReport);
this.errorsReport = errorsReport;

const transpilationWorkerThreads = getEnv('transpilationWorkerThreads');
const transpilationNative = getEnv('transpilationNative');
const transpilationNativeThreadsCount = getThreadsCount();
const { compilerId } = this;

if (!transpilationNative && transpilationWorkerThreads) {
const wc = getEnv('transpilationWorkerThreadsCount');
if (!transpilationNative) {
this.workerPool = workerpool.pool(
path.join(__dirname, 'transpilers/transpiler_worker'),
wc > 0 ? { maxWorkers: wc } : undefined,
{ maxWorkers: transpilationNativeThreadsCount },
);
}

Expand All @@ -132,31 +127,45 @@ export class DataSchemaCompiler {
* @returns {Promise<*>}
*/
const transpile = async (stage) => {
let cubeNames;
let cubeSymbols;
let transpilerNames;
let results;

if (transpilationNative || transpilationWorkerThreads) {
cubeNames = Object.keys(this.cubeDictionary.byId);
// We need only cubes and all its member names for transpiling.
// Cubes doesn't change during transpiling, but are changed during compilation phase,
// so we can prepare them once for every phase.
// Communication between main and worker threads uses
// The structured clone algorithm (@see https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm)
// which doesn't allow passing any function objects, so we need to sanitize the symbols.
// Communication with native backend also involves deserialization.
cubeSymbols = Object.fromEntries(
Object.entries(this.cubeSymbols.symbols)
.map(
([key, value]) => [key, Object.fromEntries(
Object.keys(value).map((k) => [k, true]),
)],
),
);
const cubeNames = Object.keys(this.cubeDictionary.byId);
// We only need the cubes and all their member names for transpiling.
// Cubes don't change during transpiling, but they are changed during the compilation phase,
// so we can prepare them once for each phase.
// Communication between main and worker threads uses
// The structured clone algorithm (@see https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm)
// which doesn't allow passing any function objects, so we need to sanitize the symbols.
// Communication with native backend also involves deserialization.
const cubeSymbols = Object.fromEntries(
Object.entries(this.cubeSymbols.symbols)
.map(
([key, value]) => [key, Object.fromEntries(
Object.keys(value).map((k) => [k, true]),
)],
),
);

// Transpilers are the same for all files within phase.
transpilerNames = this.transpilers.map(t => t.constructor.name);
// Transpilers are the same for all files within phase.
const transpilerNames = this.transpilers.map(t => t.constructor.name);

const nonJsFilesTasks = toCompile.filter(file => !file.fileName.endsWith('.js'))
.map(f => this.transpileFile(f, errorsReport, { transpilerNames, compilerId }));

const jsFiles = toCompile.filter(file => file.fileName.endsWith('.js'));
let JsFilesTasks = [];
let jsChunks = [];

if (jsFiles.length > 0) {
if (jsFiles.length < transpilationNativeThreadsCount * transpilationNativeThreadsCount) {
jsChunks = [jsFiles];
} else {
const baseSize = Math.floor(jsFiles.length / transpilationNativeThreadsCount);
for (let i = 0; i < transpilationNativeThreadsCount; i++) {
// For the last part, we take the remaining files so we don't lose the extra ones.
const start = i * baseSize;
const end = (i === transpilationNativeThreadsCount - 1) ? jsFiles.length : start + baseSize;
jsChunks.push(jsFiles.slice(start, end));
}
}
}

if (transpilationNative) {
Expand All @@ -168,36 +177,13 @@ export class DataSchemaCompiler {

await this.transpileJsFile(dummyFile, errorsReport, { cubeNames, cubeSymbols, transpilerNames, contextSymbols: CONTEXT_SYMBOLS, compilerId, stage });

const nonJsFilesTasks = toCompile.filter(file => !file.fileName.endsWith('.js'))
.map(f => this.transpileFile(f, errorsReport, { transpilerNames, compilerId }));

const jsFiles = toCompile.filter(file => file.fileName.endsWith('.js'));
let JsFilesTasks = [];

if (jsFiles.length > 0) {
let jsChunks;
if (jsFiles.length < transpilationNativeThreadsCount * transpilationNativeThreadsCount) {
jsChunks = [jsFiles];
} else {
const baseSize = Math.floor(jsFiles.length / transpilationNativeThreadsCount);
jsChunks = [];
for (let i = 0; i < transpilationNativeThreadsCount; i++) {
// For the last part, we take the remaining files so we don't lose the extra ones.
const start = i * baseSize;
const end = (i === transpilationNativeThreadsCount - 1) ? jsFiles.length : start + baseSize;
jsChunks.push(jsFiles.slice(start, end));
}
}
JsFilesTasks = jsChunks.map(chunk => this.transpileJsFilesBulk(chunk, errorsReport, { transpilerNames, compilerId }));
}

results = (await Promise.all([...nonJsFilesTasks, ...JsFilesTasks])).flat();
} else if (transpilationWorkerThreads) {
results = await Promise.all(toCompile.map(f => this.transpileFile(f, errorsReport, { cubeNames, cubeSymbols, transpilerNames })));
JsFilesTasks = jsChunks.map(chunk => this.transpileJsFilesBulk(chunk, errorsReport, { transpilerNames, compilerId }));
} else {
results = await Promise.all(toCompile.map(f => this.transpileFile(f, errorsReport, {})));
JsFilesTasks = jsChunks.map(chunk => this.transpileJsFilesBulk(chunk, errorsReport, { cubeNames, cubeSymbols, transpilerNames, compilerId, stage }));
}

const results = (await Promise.all([...nonJsFilesTasks, ...JsFilesTasks])).flat();

return results.filter(f => !!f);
};

Expand Down Expand Up @@ -225,7 +211,7 @@ export class DataSchemaCompiler {
errorsReport,
{ cubeNames: [], cubeSymbols: {}, transpilerNames: [], contextSymbols: {}, compilerId: this.compilerId, stage: 0 }
);
} else if (transpilationWorkerThreads && this.workerPool) {
} else if (this.workerPool) {
this.workerPool.terminate();
}

Expand Down Expand Up @@ -271,42 +257,62 @@ export class DataSchemaCompiler {
}
}

/**
* Right now it is used only for transpilation in native,
* so no checks for transpilation type inside this method
*/
async transpileJsFilesBulk(files, errorsReport, { cubeNames, cubeSymbols, contextSymbols, transpilerNames, compilerId, stage }) {
// For bulk processing, this data could be optimized even further by passing transpilerNames and compilerId only once per bulk,
// but that would require more complex logic to be implemented on the native side.
// And compared to the file content sizes, a few extra bytes of JSON data is not a big deal here.
const reqDataArr = files.map(file => ({
fileName: file.fileName,
fileContent: file.content,
transpilers: transpilerNames,
compilerId,
...(cubeNames && {
metaData: {
cubeNames,
cubeSymbols,
contextSymbols,
stage
},
}),
}));
const res = await transpileJs(reqDataArr);

return files.map((file, index) => {
errorsReport.inFile(file);
if (!res[index]) { // This should not happen in theory but just to be safe
errorsReport.error(`No transpilation result received for the file ${file.fileName}.`);
return undefined;
}
errorsReport.addErrors(res[index].errors);
errorsReport.addWarnings(res[index].warnings);
errorsReport.exitFile();
const transpilationNative = getEnv('transpilationNative');

return { ...file, content: res[index].code };
});
if (transpilationNative) {
// For bulk processing, this data could be optimized even further by passing transpilerNames and compilerId only once per bulk,
// but that would require more complex logic to be implemented on the native side.
// And compared to the file content sizes, a few extra bytes of JSON data is not a big deal here.
const reqDataArr = files.map(file => ({
fileName: file.fileName,
fileContent: file.content,
transpilers: transpilerNames,
compilerId,
...(cubeNames && {
metaData: {
cubeNames,
cubeSymbols,
contextSymbols,
stage
},
}),
}));
const res = await transpileJs(reqDataArr);

return files.map((file, index) => {
errorsReport.inFile(file);
if (!res[index]) { // This should not happen in theory but just to be safe
errorsReport.error(`No transpilation result received for the file ${file.fileName}.`);
return undefined;
}
errorsReport.addErrors(res[index].errors);
errorsReport.addWarnings(res[index].warnings);
errorsReport.exitFile();

return { ...file, content: res[index].code };
});
} else {
const request = {
files,
transpilers: transpilerNames,
cubeNames,
cubeSymbols,
};

const res = await this.workerPool.exec('transpile', [request]);
errorsReport.addErrors(res.errors);
errorsReport.addWarnings(res.warnings);

return files.map((file, index) => {
if (!res.content[index] && res.content[index] !== '') { // This should not happen in theory but just to be safe
errorsReport.error(`No transpilation result received for the file ${file.fileName}.`);
return undefined;
}

return { ...file, content: res.content[index] };
});
}
}

async transpileJsFile(file, errorsReport, { cubeNames, cubeSymbols, contextSymbols, transpilerNames, compilerId, stage }) {
Expand Down Expand Up @@ -334,10 +340,9 @@ export class DataSchemaCompiler {
errorsReport.exitFile();

return { ...file, content: res[0].code };
} else if (getEnv('transpilationWorkerThreads')) {
} else {
const data = {
fileName: file.fileName,
content: file.content,
files: [file],
transpilers: transpilerNames,
cubeNames,
cubeSymbols,
Expand All @@ -347,25 +352,7 @@ export class DataSchemaCompiler {
errorsReport.addErrors(res.errors);
errorsReport.addWarnings(res.warnings);

return { ...file, content: res.content };
} else {
const ast = parse(
file.content,
{
sourceFilename: file.fileName,
sourceType: 'module',
plugins: ['objectRestSpread'],
},
);

errorsReport.inFile(file);
this.transpilers.forEach((t) => {
babelTraverse(ast, t.traverseObject(errorsReport));
});
errorsReport.exitFile();

const content = babelGenerator(ast, {}, file.content).code;
return { ...file, content };
return { ...file, content: res.content[0] };
}
} catch (e) {
if (e.toString().indexOf('SyntaxError') !== -1) {
Expand Down
Loading
Loading