diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml
index b1d622e22c8a3..a909c98f413b1 100644
--- a/.github/workflows/push.yml
+++ b/.github/workflows/push.yml
@@ -63,11 +63,8 @@ jobs:
         node-version: [22.x]
         # Don't forget to update build-native-release
         python-version: [3.11]
-        transpile-worker-threads: [false, true]
       fail-fast: false
-    env:
-      CUBEJS_TRANSPILATION_WORKER_THREADS: ${{ matrix.transpile-worker-threads }}
     steps:
       - id: get-tag-out
        run: echo "$OUT"
@@ -121,16 +118,16 @@ jobs:
       - name: Lerna test
         run: yarn lerna run --concurrency 1 --stream --no-prefix unit
       - name: Fix lcov paths
-        if: (matrix.node-version == '22.x' && matrix.transpile-worker-threads == true)
+        if: (matrix.node-version == '22.x')
         run: |
           ./.github/actions/codecov-fix.sh
       - name: Combine all fixed LCOV files
-        if: (matrix.node-version == '22.x' && matrix.transpile-worker-threads == true)
+        if: (matrix.node-version == '22.x')
         run: |
           echo "" > ./combined-unit.lcov
           find ./packages -type f -name lcov.fixed.info -exec cat {} + >> ./combined-unit.lcov || true
       - name: Upload coverage artifact
-        if: (matrix.node-version == '22.x' && matrix.transpile-worker-threads == true)
+        if: (matrix.node-version == '22.x')
         uses: actions/upload-artifact@v4
         with:
           name: coverage-unit
diff --git a/docs/pages/product/configuration/reference/environment-variables.mdx b/docs/pages/product/configuration/reference/environment-variables.mdx
index 24e79dbaa158d..89800a341c3ac 100644
--- a/docs/pages/product/configuration/reference/environment-variables.mdx
+++ b/docs/pages/product/configuration/reference/environment-variables.mdx
@@ -1318,21 +1318,6 @@ learn more.
 | --------------- | ---------------------- | --------------------- |
 | A valid number  | 86400                  | 86400                 |
 
-## `CUBEJS_TRANSPILATION_WORKER_THREADS`
-
-If `true`, optimizes data model compilation by running critical parts of the
-code in worker threads.
-
-| Possible Values | Default in Development | Default in Production |
-| --------------- | ---------------------- | --------------------- |
-| `true`, `false` | `false`                | `false`               |
-
-<InfoBox>
-
-See [this issue](https://github.com/cube-js/cube/issues/9285) for details.
-
-</InfoBox>
-
 ## `CUBEJS_WEB_SOCKETS`
 
 If `true`, then use WebSocket for data fetching.
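With `CUBEJS_TRANSPILATION_WORKER_THREADS` removed, the worker-thread path becomes the default non-native path, and the remaining knobs are `CUBEJS_TRANSPILATION_WORKER_THREADS_COUNT` and `CUBEJS_TRANSPILATION_NATIVE`, read via the `env-var` accessors kept in the `env.ts` hunk below. A minimal sketch of how these could resolve to a thread count; the CPU-count fallback inside `getThreadsCount` is a hypothetical illustration, since the diff does not show that function's body:

```ts
// Sketch only: mirrors the accessors kept in cubejs-backend-shared/src/env.ts.
import { get } from 'env-var';
import os from 'os';

// Kept: explicit worker count; '0' means "let the compiler decide".
const transpilationWorkerThreadsCount = (): number => get('CUBEJS_TRANSPILATION_WORKER_THREADS_COUNT')
  .default('0')
  .asInt();

// Kept: opt-in native transpilation, which bypasses the worker pool entirely.
const transpilationNative = (): boolean => get('CUBEJS_TRANSPILATION_NATIVE')
  .default('false')
  .asBoolStrict();

// Hypothetical stand-in for the getThreadsCount() used in DataSchemaCompiler:
// fall back to the available cores when no explicit count is configured.
export function getThreadsCount(): number {
  const configured = transpilationWorkerThreadsCount();
  return configured > 0 ? configured : Math.max(1, os.cpus().length - 1);
}
```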
diff --git a/packages/cubejs-backend-shared/src/env.ts b/packages/cubejs-backend-shared/src/env.ts
index 94bbdea86ad3f..d5f2850041313 100644
--- a/packages/cubejs-backend-shared/src/env.ts
+++ b/packages/cubejs-backend-shared/src/env.ts
@@ -223,16 +223,12 @@ const variables: Record<string, (...args: any[]) => any> = {
   nativeOrchestrator: () => get('CUBEJS_TESSERACT_ORCHESTRATOR')
     .default('true')
     .asBoolStrict(),
-  transpilationWorkerThreads: () => get('CUBEJS_TRANSPILATION_WORKER_THREADS')
-    .default('false')
-    .asBoolStrict(),
   allowNonStrictDateRangeMatching: () => get('CUBEJS_PRE_AGGREGATIONS_ALLOW_NON_STRICT_DATE_RANGE_MATCH')
     .default('true')
     .asBoolStrict(),
   transpilationWorkerThreadsCount: () => get('CUBEJS_TRANSPILATION_WORKER_THREADS_COUNT')
     .default('0')
     .asInt(),
-  // This one takes precedence over CUBEJS_TRANSPILATION_WORKER_THREADS
   transpilationNative: () => get('CUBEJS_TRANSPILATION_NATIVE')
     .default('false')
     .asBoolStrict(),
diff --git a/packages/cubejs-schema-compiler/src/compiler/DataSchemaCompiler.js b/packages/cubejs-schema-compiler/src/compiler/DataSchemaCompiler.js
index c198fa4b728d8..47973e4fa391e 100644
--- a/packages/cubejs-schema-compiler/src/compiler/DataSchemaCompiler.js
+++ b/packages/cubejs-schema-compiler/src/compiler/DataSchemaCompiler.js
@@ -4,9 +4,6 @@ import fs from 'fs';
 import os from 'os';
 import path from 'path';
 import syntaxCheck from 'syntax-error';
-import { parse } from '@babel/parser';
-import babelGenerator from '@babel/generator';
-import babelTraverse from '@babel/traverse';
 import R from 'ramda';
 import workerpool from 'workerpool';
 
@@ -114,16 +111,14 @@ export class DataSchemaCompiler {
     const errorsReport = new ErrorReporter(null, [], this.errorReport);
     this.errorsReport = errorsReport;
 
-    const transpilationWorkerThreads = getEnv('transpilationWorkerThreads');
     const transpilationNative = getEnv('transpilationNative');
     const transpilationNativeThreadsCount = getThreadsCount();
     const { compilerId } = this;
 
-    if (!transpilationNative && transpilationWorkerThreads) {
-      const wc = getEnv('transpilationWorkerThreadsCount');
+    if (!transpilationNative) {
       this.workerPool = workerpool.pool(
         path.join(__dirname, 'transpilers/transpiler_worker'),
-        wc > 0 ? { maxWorkers: wc } : undefined,
+        { maxWorkers: transpilationNativeThreadsCount },
       );
     }
 
@@ -132,31 +127,45 @@
     /**
      * @returns {Promise<*>}
      */
     const transpile = async (stage) => {
-      let cubeNames;
-      let cubeSymbols;
-      let transpilerNames;
-      let results;
-
-      if (transpilationNative || transpilationWorkerThreads) {
-        cubeNames = Object.keys(this.cubeDictionary.byId);
-        // We need only cubes and all its member names for transpiling.
-        // Cubes doesn't change during transpiling, but are changed during compilation phase,
-        // so we can prepare them once for every phase.
-        // Communication between main and worker threads uses
-        // The structured clone algorithm (@see https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm)
-        // which doesn't allow passing any function objects, so we need to sanitize the symbols.
-        // Communication with native backend also involves deserialization.
-        cubeSymbols = Object.fromEntries(
-          Object.entries(this.cubeSymbols.symbols)
-            .map(
-              ([key, value]) => [key, Object.fromEntries(
-                Object.keys(value).map((k) => [k, true]),
-              )],
-            ),
-        );
+      const cubeNames = Object.keys(this.cubeDictionary.byId);
+      // We only need the cubes and all their member names for transpiling.
+      // Cubes don't change during transpiling, but they are changed during the compilation phase,
+      // so we can prepare them once for every phase.
+      // Communication between the main and worker threads uses
+      // the structured clone algorithm (@see https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm),
+      // which doesn't allow passing any function objects, so we need to sanitize the symbols.
+      // Communication with the native backend also involves deserialization.
+      const cubeSymbols = Object.fromEntries(
+        Object.entries(this.cubeSymbols.symbols)
+          .map(
+            ([key, value]) => [key, Object.fromEntries(
+              Object.keys(value).map((k) => [k, true]),
+            )],
+          ),
+      );
 
-      // Transpilers are the same for all files within phase.
-      transpilerNames = this.transpilers.map(t => t.constructor.name);
+      // Transpilers are the same for all files within a phase.
+      const transpilerNames = this.transpilers.map(t => t.constructor.name);
+
+      const nonJsFilesTasks = toCompile.filter(file => !file.fileName.endsWith('.js'))
+        .map(f => this.transpileFile(f, errorsReport, { transpilerNames, compilerId }));
+
+      const jsFiles = toCompile.filter(file => file.fileName.endsWith('.js'));
+      let JsFilesTasks = [];
+      let jsChunks = [];
+
+      if (jsFiles.length > 0) {
+        if (jsFiles.length < transpilationNativeThreadsCount * transpilationNativeThreadsCount) {
+          jsChunks = [jsFiles];
+        } else {
+          const baseSize = Math.floor(jsFiles.length / transpilationNativeThreadsCount);
+          for (let i = 0; i < transpilationNativeThreadsCount; i++) {
+            // For the last chunk, we take all the remaining files so we don't lose the extra ones.
+            const start = i * baseSize;
+            const end = (i === transpilationNativeThreadsCount - 1) ? jsFiles.length : start + baseSize;
+            jsChunks.push(jsFiles.slice(start, end));
+          }
+        }
       }
 
       if (transpilationNative) {
@@ -168,36 +177,13 @@
         await this.transpileJsFile(dummyFile, errorsReport, { cubeNames, cubeSymbols, transpilerNames, contextSymbols: CONTEXT_SYMBOLS, compilerId, stage });
 
-        const nonJsFilesTasks = toCompile.filter(file => !file.fileName.endsWith('.js'))
-          .map(f => this.transpileFile(f, errorsReport, { transpilerNames, compilerId }));
-
-        const jsFiles = toCompile.filter(file => file.fileName.endsWith('.js'));
-        let JsFilesTasks = [];
-
-        if (jsFiles.length > 0) {
-          let jsChunks;
-          if (jsFiles.length < transpilationNativeThreadsCount * transpilationNativeThreadsCount) {
-            jsChunks = [jsFiles];
-          } else {
-            const baseSize = Math.floor(jsFiles.length / transpilationNativeThreadsCount);
-            jsChunks = [];
-            for (let i = 0; i < transpilationNativeThreadsCount; i++) {
-              // For the last part, we take the remaining files so we don't lose the extra ones.
-              const start = i * baseSize;
-              const end = (i === transpilationNativeThreadsCount - 1) ? jsFiles.length : start + baseSize;
-              jsChunks.push(jsFiles.slice(start, end));
-            }
-          }
-          JsFilesTasks = jsChunks.map(chunk => this.transpileJsFilesBulk(chunk, errorsReport, { transpilerNames, compilerId }));
-        }
-
-        results = (await Promise.all([...nonJsFilesTasks, ...JsFilesTasks])).flat();
-      } else if (transpilationWorkerThreads) {
-        results = await Promise.all(toCompile.map(f => this.transpileFile(f, errorsReport, { cubeNames, cubeSymbols, transpilerNames })));
+        JsFilesTasks = jsChunks.map(chunk => this.transpileJsFilesBulk(chunk, errorsReport, { transpilerNames, compilerId }));
       } else {
-        results = await Promise.all(toCompile.map(f => this.transpileFile(f, errorsReport, {})));
+        JsFilesTasks = jsChunks.map(chunk => this.transpileJsFilesBulk(chunk, errorsReport, { cubeNames, cubeSymbols, transpilerNames, compilerId, stage }));
       }
+
+      const results = (await Promise.all([...nonJsFilesTasks, ...JsFilesTasks])).flat();
+
       return results.filter(f => !!f);
     };
 
@@ -225,7 +211,7 @@
         errorsReport,
         { cubeNames: [], cubeSymbols: {}, transpilerNames: [], contextSymbols: {}, compilerId: this.compilerId, stage: 0 }
       );
-    } else if (transpilationWorkerThreads && this.workerPool) {
+    } else if (this.workerPool) {
       this.workerPool.terminate();
     }
 
@@ -271,42 +257,62 @@
     }
   }
 
-  /**
-   * Right now it is used only for transpilation in native,
-   * so no checks for transpilation type inside this method
-   */
   async transpileJsFilesBulk(files, errorsReport, { cubeNames, cubeSymbols, contextSymbols, transpilerNames, compilerId, stage }) {
-    // for bulk processing this data may be optimized even more by passing transpilerNames, compilerId only once for a bulk
-    // but this requires more complex logic to be implemented in the native side.
-    // And comparing to the file content sizes, a few bytes of JSON data is not a big deal here
-    const reqDataArr = files.map(file => ({
-      fileName: file.fileName,
-      fileContent: file.content,
-      transpilers: transpilerNames,
-      compilerId,
-      ...(cubeNames && {
-        metaData: {
-          cubeNames,
-          cubeSymbols,
-          contextSymbols,
-          stage
-        },
-      }),
-    }));
-    const res = await transpileJs(reqDataArr);
-
-    return files.map((file, index) => {
-      errorsReport.inFile(file);
-      if (!res[index]) { // This should not happen in theory but just to be safe
-        errorsReport.error(`No transpilation result received for the file ${file.fileName}.`);
-        return undefined;
-      }
-      errorsReport.addErrors(res[index].errors);
-      errorsReport.addWarnings(res[index].warnings);
-      errorsReport.exitFile();
+    const transpilationNative = getEnv('transpilationNative');
 
-      return { ...file, content: res[index].code };
-    });
+    if (transpilationNative) {
+      // For bulk processing, this data could be optimized even more by passing transpilerNames and compilerId
+      // only once per bulk, but this requires more complex logic to be implemented on the native side.
+      // Compared to the file content sizes, though, a few bytes of JSON data is not a big deal here.
+      const reqDataArr = files.map(file => ({
+        fileName: file.fileName,
+        fileContent: file.content,
+        transpilers: transpilerNames,
+        compilerId,
+        ...(cubeNames && {
+          metaData: {
+            cubeNames,
+            cubeSymbols,
+            contextSymbols,
+            stage
+          },
+        }),
+      }));
+      const res = await transpileJs(reqDataArr);
+
+      return files.map((file, index) => {
+        errorsReport.inFile(file);
+        if (!res[index]) { // This should not happen in theory, but just to be safe
+          errorsReport.error(`No transpilation result received for the file ${file.fileName}.`);
+          return undefined;
+        }
+        errorsReport.addErrors(res[index].errors);
+        errorsReport.addWarnings(res[index].warnings);
+        errorsReport.exitFile();
+
+        return { ...file, content: res[index].code };
+      });
+    } else {
+      const request = {
+        files,
+        transpilers: transpilerNames,
+        cubeNames,
+        cubeSymbols,
+      };
+
+      const res = await this.workerPool.exec('transpile', [request]);
+      errorsReport.addErrors(res.errors);
+      errorsReport.addWarnings(res.warnings);
+
+      return files.map((file, index) => {
+        if (!res.content[index] && res.content[index] !== '') { // This should not happen in theory, but just to be safe
+          errorsReport.error(`No transpilation result received for the file ${file.fileName}.`);
+          return undefined;
+        }
+
+        return { ...file, content: res.content[index] };
+      });
+    }
   }
 
   async transpileJsFile(file, errorsReport, { cubeNames, cubeSymbols, contextSymbols, transpilerNames, compilerId, stage }) {
@@ -334,10 +340,9 @@
       errorsReport.exitFile();
 
       return { ...file, content: res[0].code };
-    } else if (getEnv('transpilationWorkerThreads')) {
+    } else {
       const data = {
-        fileName: file.fileName,
-        content: file.content,
+        files: [file],
         transpilers: transpilerNames,
         cubeNames,
         cubeSymbols,
@@ -347,25 +352,7 @@
       errorsReport.addErrors(res.errors);
       errorsReport.addWarnings(res.warnings);
 
-      return { ...file, content: res.content };
-    } else {
-      const ast = parse(
-        file.content,
-        {
-          sourceFilename: file.fileName,
-          sourceType: 'module',
-          plugins: ['objectRestSpread'],
-        },
-      );
-
-      errorsReport.inFile(file);
-      this.transpilers.forEach((t) => {
-        babelTraverse(ast, t.traverseObject(errorsReport));
-      });
-      errorsReport.exitFile();
-
-      const content = babelGenerator(ast, {}, file.content).code;
-      return { ...file, content };
+      return { ...file, content: res.content[0] };
     }
   } catch (e) {
     if (e.toString().indexOf('SyntaxError') !== -1) {
diff --git a/packages/cubejs-schema-compiler/src/compiler/transpilers/transpiler_worker.ts b/packages/cubejs-schema-compiler/src/compiler/transpilers/transpiler_worker.ts
index 93c1c142744f0..2200926bcb854 100644
--- a/packages/cubejs-schema-compiler/src/compiler/transpilers/transpiler_worker.ts
+++ b/packages/cubejs-schema-compiler/src/compiler/transpilers/transpiler_worker.ts
@@ -12,8 +12,10 @@ import { LightweightSymbolResolver } from './LightweightSymbolResolver';
 import { LightweightNodeCubeDictionary } from './LightweightNodeCubeDictionary';
 
 type TransferContent = {
-  fileName: string;
-  content: string;
+  files: {
+    fileName: string;
+    content: string;
+  }[];
   transpilers: string[];
   cubeNames: string[];
   cubeSymbols: Record<string, Record<string, boolean>>;
@@ -34,26 +36,28 @@ const transpile = (data: TransferContent) => {
   cubeDictionary.setCubeNames(data.cubeNames);
   cubeSymbols.setSymbols(data.cubeSymbols);
 
-  const ast = parse(
-    data.content,
-    {
-      sourceFilename: data.fileName,
-      sourceType: 'module',
-      plugins: ['objectRestSpread']
-    },
-  );
+  const content = data.files.map((file) => {
+    const ast = parse(
+      file.content,
+      {
+        sourceFilename: file.fileName,
+        sourceType: 'module',
+        plugins: ['objectRestSpread']
+      },
+    );
 
-  errorsReport.inFile(data);
-  data.transpilers.forEach(transpilerName => {
-    if (transpilers[transpilerName]) {
-      babelTraverse(ast, transpilers[transpilerName].traverseObject(errorsReport));
-    } else {
-      throw new Error(`Transpiler ${transpilerName} not supported`);
-    }
-  });
-  errorsReport.exitFile();
+    errorsReport.inFile(file);
+    data.transpilers.forEach(transpilerName => {
+      if (transpilers[transpilerName]) {
+        babelTraverse(ast, transpilers[transpilerName].traverseObject(errorsReport));
+      } else {
+        throw new Error(`Transpiler ${transpilerName} not supported`);
+      }
+    });
+    errorsReport.exitFile();
 
-  const content = babelGenerator(ast, {}, data.content).code;
+    return babelGenerator(ast, {}, file.content).code;
+  });
 
   return {
     content,
diff --git a/packages/cubejs-schema-compiler/test/integration/postgres/dataschema-compiler.test.ts b/packages/cubejs-schema-compiler/test/integration/postgres/dataschema-compiler.test.ts
index faf68af2a3008..3c33a04673ab1 100644
--- a/packages/cubejs-schema-compiler/test/integration/postgres/dataschema-compiler.test.ts
+++ b/packages/cubejs-schema-compiler/test/integration/postgres/dataschema-compiler.test.ts
@@ -186,7 +186,7 @@ describe('DataSchemaCompiler', () => {
       })
     `;
 
-    it('Should compile 200 schemas in less than 2500ms * 10', async () => {
+    it('Should compile 200 schemas in less than 5000ms * 10', async () => {
       const repeats = 200;
 
       const compilerWith = prepareJsCompiler(schema, { allowJsDuplicatePropsInSchema: false });
@@ -198,7 +198,7 @@
       const end = new Date().getTime();
       const time = end - start;
 
-      expect(time).toBeLessThan(2500 * 10);
+      expect(time).toBeLessThan(5000 * 10);
     });
   });
 });
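For reference, the chunking heuristic that both the native and worker-pool branches of `DataSchemaCompiler` now share: JS files are split into one chunk per thread, the last chunk absorbs the division remainder, and chunking is skipped entirely for small batches. A standalone sketch under those assumptions (the `SchemaFile` type and the driver at the bottom are illustrative, not part of the diff):

```ts
type SchemaFile = { fileName: string; content: string };

// Split jsFiles into one chunk per thread; the last chunk takes the remainder.
function chunkJsFiles(jsFiles: SchemaFile[], threadsCount: number): SchemaFile[][] {
  // Below threadsCount^2 files, the per-chunk overhead outweighs the gain
  // from parallelism, so everything stays in a single chunk (same guard as the diff).
  if (jsFiles.length < threadsCount * threadsCount) {
    return [jsFiles];
  }
  const baseSize = Math.floor(jsFiles.length / threadsCount);
  const chunks: SchemaFile[][] = [];
  for (let i = 0; i < threadsCount; i++) {
    const start = i * baseSize;
    // For the last chunk, take all remaining files so none are lost to rounding.
    const end = i === threadsCount - 1 ? jsFiles.length : start + baseSize;
    chunks.push(jsFiles.slice(start, end));
  }
  return chunks;
}

// 10 files across 3 threads -> chunk sizes [3, 3, 4].
const files = Array.from({ length: 10 }, (_, i) => ({ fileName: `cube${i}.js`, content: '' }));
console.log(chunkJsFiles(files, 3).map(c => c.length));
```

Each chunk then becomes a single `transpile` request to the `workerpool` worker, which is why `TransferContent` now carries a `files` array and the worker answers with `content` as an array aligned index-for-index with the submitted files.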