diff --git a/README.md b/README.md
index ba68e07..2ac2a07 100644
--- a/README.md
+++ b/README.md
@@ -59,7 +59,11 @@ export GOOGLE_APPLICATION_CREDENTIALS="/home/user/Downloads/[FILE_NAME].json"
 
 More information: https://cloud.google.com/docs/authentication/getting-started
 
-> Now modify `schema/CONSTANTS.js` to point to your own projectId, dataset and table names.
+Set the following environment variables:
+- `PROJECT_ID`: the BigQuery project ID to use
+- `DATASET_NAME`: the BigQuery dataset to use
+- `TRANSACTION_TABLE_NAME`: the BigQuery table to write to for transactions
+- `LEDGER_TABLE_NAME`: the BigQuery table to write to for ledgers
 
 # Create schema
 
@@ -73,3 +77,11 @@ You can invoke the script from a node enabled environment by setting these envir
 
 - `NODE`: the rippled node (`wss://...`) to connect to, default: **wss://s2.ripple.com**
 - `LEDGER`: the ledger index to start fetching transactions from, default: **32570**
+- `PROJECT_ID`: the BigQuery project ID to use
+- `DATASET_NAME`: the BigQuery dataset to use
+- `MODE`: `transactions` will insert transaction data, `ledgers` will insert ledger data
+- `TABLE_NAME`: the BigQuery table to write to
+
+## systemd integration
+
+Use the `exampleService.service` file as a template for writing a Linux `systemd` service.
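The README's new configuration section boils down to exporting the four variables and running the schema script. A minimal invocation might look like this (credentials path, project, dataset and table names are illustrative):

```sh
export GOOGLE_APPLICATION_CREDENTIALS="/home/user/Downloads/credentials.json"
export PROJECT_ID=my-project
export DATASET_NAME=my_dataset
export TRANSACTION_TABLE_NAME=transactions
export LEDGER_TABLE_NAME=ledgers
node applySchema.js
```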
diff --git a/applySchema.js b/applySchema.js
index d17e38a..1a29b3e 100644
--- a/applySchema.js
+++ b/applySchema.js
@@ -1,55 +1,58 @@
 const {
-  PROJECT_ID,
-  DATASET_NAME,
-  TRANSACTION_TABLE_NAME,
-  LEDGER_TABLE_NAME,
   transactionSchema,
   ledgerSchema,
 } = require('./schema')
 
 const BigQuery = require('@google-cloud/bigquery')
-const bigquery = new BigQuery({ projectId: PROJECT_ID })
-
-const createTable = async (name, schema) => {
-  return new Promise((resolve, reject) => {
-    bigquery.dataset(DATASET_NAME).createTable(name, { schema: schema })
-      .then(r => {
-        console.log(` -- BigQuery Table ${r[0].id} created`)
-        resolve()
-      })
-      .catch(e => {
-        reject(e)
-      })
-  })
-}
-const deleteTable = async (name) => {
-  return new Promise((resolve, reject) => {
-    bigquery.dataset(DATASET_NAME).table(name).delete().then(() => {
-      console.log(` -- BigQuery Table ${name} removed`)
-      resolve()
-    }).catch(e => {
-      if (e.errors[0].reason === 'notFound') {
-        resolve()
-      } else{
-        reject(e)
-      }
-    })
-  })
+async function createTable(name, schema, bigquery, dataset) {
+  const result = await bigquery.dataset(dataset).createTable(name, { schema })
+  console.log(` -- BigQuery Table ${result[0].id} created`)
 }
 
-const recreateTable = async (name, schema) => {
-  console.log(`Dropping and creating table [ ${name} ] in dataset [ ${DATASET_NAME} ] @ Google BigQuery`)
+async function deleteTable(name, bigquery, dataset) {
+  try {
+    await bigquery.dataset(dataset).table(name).delete()
+    console.log(` -- BigQuery Table ${name} removed`)
+  } catch(e) {
+    if (e.errors?.[0]?.reason === 'notFound') {
+      console.log(`  Table ${name} doesn't yet exist - nothing to delete`)
+    } else {
+      throw e
+    }
+  }
+}
 
-  await deleteTable(name)
-  await createTable(name, schema)
+async function recreateTable(name, schema, bigquery, dataset) {
+  console.log(`Dropping and creating table [ ${name} ] in dataset [ ${dataset} ] @ Google BigQuery`)
+  await deleteTable(name, bigquery, dataset)
+  await createTable(name, schema, bigquery, dataset)
 }
 
-(async () => {
+async function main() {
+  const dbDetails = {
+    projectID: process.env.PROJECT_ID?.trim(),
+    datasetName: process.env.DATASET_NAME?.trim(),
+    txTableName: process.env.TRANSACTION_TABLE_NAME?.trim(),
+    ledgerTableName: process.env.LEDGER_TABLE_NAME?.trim(),
+  }
+
+  const hasInvalidValue = Object.values(dbDetails).some((value) => {
+    return typeof value !== 'string' || value.length === 0
+  })
+  if (hasInvalidValue) {
+    console.error('Missing or empty env var: PROJECT_ID, DATASET_NAME, TRANSACTION_TABLE_NAME and LEDGER_TABLE_NAME are required')
+    process.exit(1)
+  }
+
+  const bigquery = new BigQuery({ projectId: dbDetails.projectID })
+
   await Promise.all([
-    recreateTable(TRANSACTION_TABLE_NAME, transactionSchema),
-    recreateTable(LEDGER_TABLE_NAME, ledgerSchema),
+    recreateTable(dbDetails.txTableName, transactionSchema, bigquery, dbDetails.datasetName),
+    recreateTable(dbDetails.ledgerTableName, ledgerSchema, bigquery, dbDetails.datasetName),
   ])
 
-  console.log(`Done\n`)
-})()
+  process.exit(0)
+}
+
+main().catch((e) => { console.error(e); process.exit(1) })
diff --git a/exampleService.service b/exampleService.service
new file mode 100644
index 0000000..0ccbe9d
--- /dev/null
+++ b/exampleService.service
@@ -0,0 +1,13 @@
+[Unit]
+Description=Transaction ETL
+
+[Service]
+Environment=GOOGLE_APPLICATION_CREDENTIALS=/home/myuser/secret.json NODE=wss://s1.ripple.com:51233 LEDGER=75443456 PROJECT_ID=my-project DATASET_NAME=my_dataset TABLE_NAME=transactions MODE=transactions
+ExecStart=/usr/bin/node /path/to/this/repo/index.js
+TimeoutStopSec=60
+KillSignal=SIGTERM
+RestartSec=4320
+Restart=on-failure
+
+[Install]
+WantedBy=multi-user.target
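To use the template, one would typically copy it under a real unit name, adjust the `Environment` and `ExecStart` paths, and enable it (the unit name below is illustrative):

```sh
sudo cp exampleService.service /etc/systemd/system/xrpl-etl.service
sudo systemctl daemon-reload
sudo systemctl enable --now xrpl-etl.service
journalctl -u xrpl-etl.service -f   # follow the importer's logs
```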
The + // commandline input is supposed to represent the _next_ ledger, so we need + // to subtract one. + const lastDBLedger = await dbFunc({ bigquery, ...dbDetails }) + const cmdLineStartLedger = typeof process.env.LEDGER === 'undefined' ? 32570 : parseInt(process.env.LEDGER) + if (lastDBLedger >= cmdLineStartLedger) { + console.log(`BigQuery History at ledger [ ${lastDBLedger} ], > StartLedger.\n Forcing StartLedger at:\n >>> ${lastDBLedger+1}\n\n`) + return lastDBLedger + } + console.log(`Starting at ledger ${cmdLineStartLedger}`) + return Math.max(cmdLineStartLedger - 1, 1) +} + +async function main() { + const dbDetails = getDBDetails() + if (dbDetails == null) { + console.error('One or more BigQuery parameters are invalid or were omitted') + process.exit(1) + } + const bigquery = new BigQuery({ projectId: dbDetails.projectID }) -console.log('Fetch XRPL transactions into Google BigQuery') + const modeDetails = getModeDetails() + if (modeDetails == null) { + console.error('Invalid mode') + process.exit(1) + } + console.log(modeDetails.message) -const Client = new XrplClient(XRPLNodeUrl) - -Client.ready().then(Connection => { - let Stopped = false - let LastLedger = 0 + let lastLedger = await getInitialLastLedger(modeDetails.lastDBLedgerFunc, bigquery, dbDetails) + // Setup client + const xrplNodeUrl = typeof process.env.NODE === 'undefined' ? 'wss://s2.ripple.com' : process.env.NODE.trim() + const client = new XrplClient(xrplNodeUrl) + await client.ready() console.log('Connected to the XRPL') - let retryTimeout = 60 * 60 * 12 - - const fetchLedgerTransactions = (ledger_index) => { - return new Promise((resolve, reject) => { - return Connection.send({ - command: 'ledger', - ledger_index: parseInt(ledger_index), - transactions: true, - expand: false - }, 10).then(Result => { - if (typeof Result.ledger.transactions === 'undefined' || Result.ledger.transactions.length === 0) { - // Do nothing - resolve({ ledger_index: ledger_index, transactions: [] }) - return - } else { - if (Result.ledger.transactions.length > 200) { - // Lots of data. Per TX - console.log(`<<< MANY TXS at ledger ${ledger_index}: [[ ${Result.ledger.transactions.length} ]], processing per-tx...`) - let transactions = Result.ledger.transactions.map(Tx => { - return Connection.send({ - command: 'tx', - transaction: Tx - }, 10) - }) - Promise.all(transactions).then(r => { - let allTxs = r.filter(t => { - return typeof t.error === 'undefined' && typeof t.meta !== 'undefined' && typeof t.meta.TransactionResult !== 'undefined' - }) - console.log('>>> ALL TXS FETCHED:', allTxs.length) - resolve({ ledger_index: ledger_index, transactions: allTxs.map(t => { - return Object.assign(t, { - metaData: t.meta - }) - }) }) - return - }) - } else { - // Fetch at once. - resolve(new Promise((resolve, reject) => { - Connection.send({ - command: 'ledger', - ledger_index: parseInt(ledger_index), - transactions: true, - expand: true - }, 10).then(Result => { - resolve({ ledger_index: ledger_index, transactions: Result.ledger.transactions }) - return - }).catch(reject) - })) - } - } - return - }).catch(reject) - }) + const clientSender = async (args) => { + return await client.send(args, XrplRequestOptions) } - const run = (ledger_index) => { - return fetchLedgerTransactions(ledger_index).then(Result => { - let txCount = Result.transactions.length - console.log(`${txCount > 0 ? 'Transactions in' : ' '.repeat(15)} ${Result.ledger_index}: `, txCount > 0 ? 
-  const run = (ledger_index) => {
-    return fetchLedgerTransactions(ledger_index).then(Result => {
-      let txCount = Result.transactions.length
-      console.log(`${txCount > 0 ? 'Transactions in' : ' '.repeat(15)} ${Result.ledger_index}: `, txCount > 0 ? txCount : '-')
-
-      if (txCount > 0) {
-        let Transactions = Result.transactions.map(Tx => {
-          let _Tx = {
-            LedgerIndex: Result.ledger_index
-          }
-          // Auto mapping for 1:1 fields (non RECORD)
-          transactionSchema.forEach(SchemaNode => {
-            if (typeof Tx[SchemaNode.description] !== 'undefined'
-              && Tx[SchemaNode.description] !== null
-              && typeof Tx[SchemaNode.description] !== 'object'
-              && SchemaNode.description === SchemaNode.name
-            ) {
-              let Value = Tx[SchemaNode.description]
-              if (typeof Value === 'string' && typeof SchemaNode.type !== 'STRING') {
-                if (SchemaNode.type === 'INTEGER') {
-                  Value = parseInt(Value)
-                }
-                if (SchemaNode.type === 'FLOAT') {
-                  Value = parseFloat(Value)
-                }
-              }
-              Object.assign(_Tx, {
-                [SchemaNode.name]: Value
-              })
-            }
-            if (SchemaNode.description.match(/^metaData\./)
-              && typeof Tx.metaData[SchemaNode.name] !== 'undefined'
-              && Tx.metaData[SchemaNode.name] !== null
-              && typeof Tx.metaData[SchemaNode.name] !== 'object'
-              && SchemaNode.name !== 'DeliveredAmount'
-            ) {
-              Object.assign(_Tx, {
-                [SchemaNode.name]: Tx.metaData[SchemaNode.name]
-              })
-            }
-          })
-
-          if (typeof Tx.metaData.DeliveredAmount === 'undefined' && typeof Tx.metaData.delivered_amount !== 'undefined') {
-            Tx.metaData.DeliveredAmount = Tx.metaData.delivered_amount
-          }
-          if (typeof Tx.metaData.DeliveredAmount !== 'undefined') {
-            let DeliveredAmount = parseInt(Tx.metaData.DeliveredAmount)
-            if (!isNaN(DeliveredAmount)) {
-              Object.assign(_Tx, {
-                DeliveredAmount: DeliveredAmount
-              })
-            }
-          }
-
-          if (typeof Tx.Memos !== 'undefined') {
-            Object.assign(_Tx, {
-              Memos: Tx.Memos.map(m => {
-                let n = { Memo: {} }
-                if (typeof m.Memo !== 'undefined') {
-                  if (typeof m.Memo.MemoData !== 'undefined') n.Memo.MemoData = m.Memo.MemoData
-                  if (typeof m.Memo.MemoFormat !== 'undefined') n.Memo.MemoData = m.Memo.MemoFormat
-                  if (typeof m.Memo.MemoType !== 'undefined') n.Memo.MemoData = m.Memo.MemoType
-                }
-                return n
-              })
-            })
-          }
-
-          if (Tx.NFTokenOffers != null) {
-            _Tx.NFTokenOffers = Tx.NFTokenOffers
-          }
-
-          if (Tx.metaData != null) {
-            _Tx.Metadata = JSON.stringify(Tx.metaData)
-          }
-
-          CurrencyFields.forEach(CurrencyField => {
-            if (typeof Tx[CurrencyField] === 'string') {
-              Object.assign(_Tx, {
-                [CurrencyField + 'XRP']: parseInt(Tx[CurrencyField])
-              })
-            }
-            if (typeof Tx[CurrencyField] === 'object' && typeof Tx[CurrencyField].currency !== 'undefined') {
-              Object.assign(_Tx, {
-                [CurrencyField + 'DEX']: {
-                  currency: Tx[CurrencyField].currency,
-                  issuer: Tx[CurrencyField].issuer,
-                  value: parseFloat(Tx[CurrencyField].value)
-                }
-              })
-            }
-          })
-
-          // Special handling for timestamps
-          _Tx._InsertedAt = bigquery.timestamp(new Date())
-
-          return _Tx
-        })
-
-        // console.dir(Transactions[0], { depth: null })
-        // process.exit(1)
-
-        bigquery.dataset(DATASET_NAME).table(TRANSACTION_TABLE_NAME).insert(Transactions)
-          .then(r => {
-            console.log(`Inserted rows`, r)
-            LastLedger = Result.ledger_index
-            // process.exit(0)
-          })
-          .catch(err => {
-            if (err && err.name === 'PartialFailureError') {
-              if (err.errors && err.errors.length > 0) {
-                console.log('Insert errors:')
-                err.errors.forEach(err => console.dir(err, { depth: null }))
-                process.exit(1)
-              }
-            } else {
-              console.error('ERROR:', err)
-              process.exit(1)
-            }
-          })
-      }
-
-      // retryTimeout = 0
-
-      if (Stopped) {
-        return
-      }
-
-      return run(ledger_index + 1)
-    }).catch(e => {
-      console.log(e)
-      process.exit(1)
-
-      // retryTimeout += 500
-      // if (retryTimeout > 5000) retryTimeout = 5000
-      console.log(`Oops... Retry in ${retryTimeout / 1000} sec.`)
-      setTimeout(() => {
-        return run(ledger_index)
-      }, retryTimeout * 1000)
+  // main loop
+  const task = async () => {
+    lastLedger = await modeDetails.processFunc({
+      clientSender,
+      lastLedger,
+      bigquery,
+      dbDetails,
     })
   }
+  await Task.start(task)
 
-  console.log(`Starting at ledger [ ${StartLedger} ], \n  Checking last ledger in BigQuery...`)
+  // Cleanup
+  console.log('Disconnecting from ledger')
+  await client.close()
+  console.log('Disconnected')
+  console.log(`\nLast ledger: [ ${lastLedger} ]\n\nRun your next job with ENV: "LEDGER=${lastLedger+1}"\n\n`)
 
-  bigquery.query({
-    query: `SELECT
-      COUNT(1) as TxCount,
-      MIN(LedgerIndex) as MinLedger,
-      MAX(LedgerIndex) as MaxLedger,
-      COUNT(DISTINCT LedgerIndex) as LedgersWithTxCount
-    FROM
-      ${PROJECT_ID}.${DATASET_NAME}.${TRANSACTION_TABLE_NAME}`,
-    useLegacySql: false, // Use standard SQL syntax for queries.
-  }).then(r => {
-    if (r[0][0].MaxLedger > StartLedger) {
-      console.log(`BigQuery History at ledger [ ${r[0][0].MaxLedger} ], > StartLedger.\n  Forcing StartLedger at:\n  >>> ${r[0][0].MaxLedger+1}\n\n`)
-      run(r[0][0].MaxLedger + 1)
-    } else{
-      run(StartLedger)
-    }
-  }).catch(e => {
-    console.log('Google BigQuery Error', e)
-    process.exit(1)
-  })
+  process.exit(Task.errored ? 1 : 0)
+}
 
-  process.on('SIGINT', function() {
-    console.log(`\nGracefully shutting down from SIGINT (Ctrl+C)\n -- Wait for remaining BigQuery inserts and XRPL Connection close...`);
-
-    Stopped = true
-    Connection.close()
-    if (LastLedger > 0) {
-      console.log(`\nLast ledger: [ ${LastLedger} ]\n\nRun your next job with ENV: "LEDGER=${LastLedger+1}"\n\n`)
-    }
-  })
-})
+process.on('SIGINT', onRequestStop)
+process.on('SIGTERM', onRequestStop)
+
+main().catch((e) => { console.error(e); process.exit(1) })
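The start-ledger arithmetic in `getInitialLastLedger` is easiest to see with numbers. A small runnable sketch of the two branches (values are illustrative):

```js
// Illustrative re-statement of getInitialLastLedger's two branches.
function initialLastLedger(lastDBLedger, cmdLineStartLedger) {
  if (lastDBLedger >= cmdLineStartLedger) return lastDBLedger
  return Math.max(cmdLineStartLedger - 1, 1)
}

console.log(initialLastLedger(100, 90) + 1)  // 101: resume right after BigQuery's history
console.log(initialLastLedger(100, 200) + 1) // 200: honour LEDGER as the _next_ ledger to fetch
```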
diff --git a/ledgerInfo.js b/ledgerInfo.js
deleted file mode 100644
index edc430a..0000000
--- a/ledgerInfo.js
+++ /dev/null
@@ -1,122 +0,0 @@
-const {
-  PROJECT_ID,
-  DATASET_NAME,
-  LEDGER_TABLE_NAME,
-} = require('./schema')
-
-const XrplClient = require('xrpl-client').XrplClient
-const BigQuery = require('@google-cloud/bigquery')
-const bigquery = new BigQuery({ projectId: PROJECT_ID })
-
-const XRPLNodeUrl = typeof process.env.NODE === 'undefined' ? 'wss://s2.ripple.com' : process.env.NODE.trim()
-const StartLedger = typeof process.env.LEDGER === 'undefined' ? 32570 : parseInt(process.env.LEDGER)
-
-console.log('Fetch XRPL Ledger Info into Google BigQuery')
-
-const Client = new XrplClient(XRPLNodeUrl)
-
-Client.ready().then(Connection => {
-  let Stopped = false
-  let LastLedger = 0
-
-  console.log('Connected to the XRPL')
-  let retryTimeout = 60 * 60 * 12
-
-  const fetchLedger = (ledger_index) => {
-    return new Promise((resolve, reject) => {
-      return Connection.send({
-        command: 'ledger',
-        ledger_index: parseInt(ledger_index),
-        transactions: false,
-        expand: false
-      }).then(Result => {
-        resolve(Result)
-        return
-      }).catch(reject)
-    })
-  }
-
-  const run = (ledger_index) => {
-    return fetchLedger(ledger_index).then(Result => {
-      console.log(`${Result.ledger_index}`)
-      // console.log(Result)
-      bigquery.dataset(DATASET_NAME).table(LEDGER_TABLE_NAME).insert([{
-        LedgerIndex: parseInt(Result.ledger.ledger_index),
-        hash: Result.ledger.hash,
-        CloseTime: bigquery.timestamp(new Date(Date.parse(Result.ledger.close_time_human)).toISOString().replace('T', ' ').replace(/[^0-9]+$/, '')),
-        CloseTimeTimestamp: Result.ledger.close_time,
-        CloseTimeHuman: Result.ledger.close_time_human,
-        TotalCoins: parseInt(Result.ledger.totalCoins),
-        ParentHash: Result.ledger.parent_hash,
-        AccountHash: Result.ledger.account_hash,
-        TransactionHash: Result.ledger.transaction_hash,
-        _InsertedAt: bigquery.timestamp(new Date()),
-      }])
-        .then(r => {
-          console.log(`Inserted rows`, r)
-          LastLedger = Result.ledger_index
-          // process.exit(0)
-        })
-        .catch(err => {
-          if (err && err.name === 'PartialFailureError') {
-            if (err.errors && err.errors.length > 0) {
-              console.log('Insert errors:')
-              err.errors.forEach(err => console.dir(err, { depth: null }))
-              process.exit(1)
-            }
-          } else {
-            console.error('ERROR:', err)
-            process.exit(1)
-          }
-        })
-
-// retryTimeout = 0
-
-      if (Stopped) {
-        return
-      }
-
-      return run(ledger_index + 1)
-    }).catch(e => {
-      console.log(e)
-      process.exit(1)
-
-      // retryTimeout += 500
-// if (retryTimeout > 5000) retryTimeout = 5000
-      console.log(`Oops... Retry in ${retryTimeout / 1000} sec.`)
-      setTimeout(() => {
-        return run(ledger_index)
-      }, retryTimeout * 1000)
-    })
-  }
-
-  console.log(`Starting at ledger [ ${StartLedger} ], \n  Checking last ledger in BigQuery...`)
-
-  bigquery.query({
-    query: `SELECT
-      MAX(LedgerIndex) as MaxLedger
-    FROM
-      ${PROJECT_ID}.${DATASET_NAME}.${LEDGER_TABLE_NAME}`,
-    useLegacySql: false, // Use standard SQL syntax for queries.
-  }).then(r => {
-    if (r[0][0].MaxLedger > StartLedger) {
-      console.log(`BigQuery History at ledger [ ${r[0][0].MaxLedger} ], > StartLedger.\n  Forcing StartLedger at:\n  >>> ${r[0][0].MaxLedger+1}\n\n`)
-      run(r[0][0].MaxLedger + 1)
-    } else{
-      run(StartLedger)
-    }
-  }).catch(e => {
-    console.log('Google BigQuery Error', e)
-    process.exit(1)
-  })
-
-  process.on('SIGINT', function() {
-    console.log(`\nGracefully shutting down from SIGINT (Ctrl+C)\n -- Wait for remaining BigQuery inserts and XRPL Connection close...`);
-
-    Stopped = true
-    Connection.close()
-    if (LastLedger > 0) {
-      console.log(`\nLast ledger: [ ${LastLedger} ]\n\nRun your next job with ENV: "LEDGER=${LastLedger+1}"\n\n`)
-    }
-  })
-})
diff --git a/package.json b/package.json
index d65d3f9..28f53a2 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
   "name": "fetch-xrpl-transactions",
-  "version": "0.9.0",
+  "version": "1.0.0",
   "description": "Fetch XRPL transactions and store in Google BigQuery",
   "main": "./index.js",
   "repository": {
diff --git a/run.md b/run.md
deleted file mode 100644
index 44b12cd..0000000
--- a/run.md
+++ /dev/null
@@ -1,5 +0,0 @@
-forever --spinSleepTime 4320000 --minUptime 4320000 index.js # 46251014
-forever --spinSleepTime 4320000 --minUptime 4320000 ledgerInfo.js # 46249654
-
-# root 21505 0.0 2.9 977228 57888 pts/1 Sl+ Apr02 3:23 node /usr/local/bin/forever --spinSleepTime 4320000 --minUptime 4320000 index.js
-# root 21525 0.0 2.8 977500 57112 pts/3 Sl+ Apr02 3:16 node /usr/local/bin/forever --spinSleepTime 4320000 --minUptime 4320000 ledgerInfo.js
diff --git a/schema/CONSTANTS.js b/schema/CONSTANTS.js
deleted file mode 100644
index 360ff31..0000000
--- a/schema/CONSTANTS.js
+++ /dev/null
@@ -1,12 +0,0 @@
-// Put your BQ info here
-const PROJECT_ID = ''
-const DATASET_NAME = ''
-const TRANSACTION_TABLE_NAME = ''
-const LEDGER_TABLE_NAME = ''
-
-module.exports = {
-  PROJECT_ID: PROJECT_ID,
-  DATASET_NAME: DATASET_NAME,
-  TRANSACTION_TABLE_NAME: TRANSACTION_TABLE_NAME,
-  LEDGER_TABLE_NAME: LEDGER_TABLE_NAME,
-}
diff --git a/schema/index.js b/schema/index.js
index d34f684..b6701a8 100644
--- a/schema/index.js
+++ b/schema/index.js
@@ -1,9 +1,3 @@
-const {
-  PROJECT_ID,
-  DATASET_NAME,
-  TRANSACTION_TABLE_NAME,
-  LEDGER_TABLE_NAME,
-} = require('./CONSTANTS')
 const {
   transactionSchema,
   CurrencyFields,
@@ -11,10 +5,6 @@ const {
 const { ledgerSchema } = require('./ledgers')
 
 module.exports = {
-  PROJECT_ID: PROJECT_ID,
-  DATASET_NAME: DATASET_NAME,
-  TRANSACTION_TABLE_NAME: TRANSACTION_TABLE_NAME,
-  LEDGER_TABLE_NAME: LEDGER_TABLE_NAME,
   transactionSchema: transactionSchema,
   ledgerSchema: ledgerSchema,
   CurrencyFields: CurrencyFields,
diff --git a/tasks/index.js b/tasks/index.js
new file mode 100644
index 0000000..79b0ed9
--- /dev/null
+++ b/tasks/index.js
@@ -0,0 +1,17 @@
+const { TaskRunner } = require('./taskRunner')
+const {
+  process: ledgerInfoProcess,
+  getLastDBLedger: ledgerGetLastDBLedger,
+} = require('./ledgerInfo')
+const {
+  process: transactionInfoProcess,
+  getLastDBLedger: transactionGetLastDBLedger,
+} = require('./transactionInfo')
+
+module.exports = {
+  TaskRunner,
+  ledgerInfoProcess,
+  ledgerGetLastDBLedger,
+  transactionInfoProcess,
+  transactionGetLastDBLedger,
+}
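Both task modules deliberately export the same two-function interface, which is what lets `index.js` treat `MODE` as a simple lookup. A hypothetical third mode would only need to satisfy the same shape (a sketch, not part of this diff):

```js
// tasks/someNewMode.js (hypothetical)
async function process({ clientSender, lastLedger, bigquery, dbDetails }) {
  // Fetch ledger lastLedger + 1 via clientSender, write rows to
  // dbDetails.datasetName / dbDetails.tableName, then report progress:
  return lastLedger + 1 // the ledger that is now safely stored
}

async function getLastDBLedger({ bigquery, projectID, datasetName, tableName }) {
  const result = await bigquery.query({
    query: `SELECT MAX(LedgerIndex) as MaxLedger FROM ${projectID}.${datasetName}.${tableName}`,
    useLegacySql: false,
  })
  return result[0][0].MaxLedger // highest ledger already stored
}

module.exports = { process, getLastDBLedger }
```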
diff --git a/tasks/ledgerInfo.js b/tasks/ledgerInfo.js
new file mode 100644
index 0000000..d939f06
--- /dev/null
+++ b/tasks/ledgerInfo.js
@@ -0,0 +1,76 @@
+async function fetchLedger(clientSender, ledgerIndex) {
+  try {
+    return await clientSender({
+      command: 'ledger',
+      ledger_index: parseInt(ledgerIndex),
+      transactions: false,
+      expand: false,
+    })
+  } catch(e) {
+    console.error('Ledger fetching error', e)
+    throw e
+  }
+}
+
+async function insertDBLedger(ledgerResult, bigquery, dbDetails) {
+  try {
+    await bigquery.dataset(dbDetails.datasetName).table(dbDetails.tableName).insert([{
+      LedgerIndex: parseInt(ledgerResult.ledger.ledger_index),
+      hash: ledgerResult.ledger.hash,
+      CloseTime: bigquery.timestamp(new Date(Date.parse(ledgerResult.ledger.close_time_human)).toISOString().replace('T', ' ').replace(/[^0-9]+$/, '')),
+      CloseTimeTimestamp: ledgerResult.ledger.close_time,
+      CloseTimeHuman: ledgerResult.ledger.close_time_human,
+      TotalCoins: parseInt(ledgerResult.ledger.totalCoins),
+      ParentHash: ledgerResult.ledger.parent_hash,
+      AccountHash: ledgerResult.ledger.account_hash,
+      TransactionHash: ledgerResult.ledger.transaction_hash,
+      _InsertedAt: bigquery.timestamp(new Date()),
+    }])
+  } catch(err) {
+    if (err && err.name === 'PartialFailureError') {
+      if (err.errors && err.errors.length > 0) {
+        console.error('Insert errors:')
+        err.errors.forEach(err => console.dir(err, { depth: null }))
+      }
+      throw err
+    } else {
+      console.error('ERROR:', err)
+      throw err
+    }
+  }
+}
+
+async function process(args) {
+  const {
+    clientSender,
+    lastLedger,
+    bigquery,
+    dbDetails,
+  } = args
+  const ledgerResult = await fetchLedger(clientSender, lastLedger + 1)
+  await insertDBLedger(ledgerResult, bigquery, dbDetails)
+  const resultLastLedger = ledgerResult.ledger_index
+  console.log(`${resultLastLedger} inserted`)
+  return resultLastLedger
+}
+
+async function getLastDBLedger(args) {
+  const { bigquery, projectID, datasetName, tableName } = args
+  let result
+  try {
+    result = await bigquery.query({
+      query: `SELECT MAX(LedgerIndex) as MaxLedger
+        FROM ${projectID}.${datasetName}.${tableName}`,
+      useLegacySql: false,
+    })
+  } catch(e) {
+    console.error('Google BigQuery Error', e)
+    throw e
+  }
+  return result[0][0].MaxLedger
+}
+
+module.exports = {
+  process,
+  getLastDBLedger,
+}
diff --git a/tasks/taskRunner.js b/tasks/taskRunner.js
new file mode 100644
index 0000000..65ea43d
--- /dev/null
+++ b/tasks/taskRunner.js
@@ -0,0 +1,27 @@
+class TaskRunner {
+  shouldRun = true
+  errored = false
+
+  constructor() {
+  }
+
+  async start(task) {
+    while (this.shouldRun) {
+      try {
+        await task()
+      } catch(e) {
+        console.error('Error:', e)
+        this.errored = true
+        break
+      }
+    }
+  }
+
+  stop() {
+    this.shouldRun = false
+  }
+}
+
+module.exports = {
+  TaskRunner,
+}
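`TaskRunner` is just a stop-flag around a loop: `start` keeps awaiting the task until `stop()` flips the flag or the task throws. A minimal usage sketch:

```js
const { TaskRunner } = require('./tasks')

const runner = new TaskRunner()
process.on('SIGINT', () => runner.stop()) // finish the in-flight task, then leave the loop

let n = 0
runner.start(async () => {
  // one unit of work per iteration; throwing sets runner.errored and breaks the loop
  console.log('tick', n++)
  await new Promise((resolve) => setTimeout(resolve, 1000))
})
```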
diff --git a/tasks/transactionInfo.js b/tasks/transactionInfo.js
new file mode 100644
index 0000000..b204970
--- /dev/null
+++ b/tasks/transactionInfo.js
@@ -0,0 +1,203 @@
+const {
+  transactionSchema,
+  CurrencyFields,
+} = require('../schema')
+
+async function fetchLedgerTransactions(clientSender, ledgerIndex) {
+  const result = await clientSender({
+    command: 'ledger',
+    ledger_index: ledgerIndex,
+    transactions: true,
+    expand: false,
+  })
+
+  if (typeof result.ledger.transactions === 'undefined' || result.ledger.transactions.length === 0) {
+    return { ledger_index: ledgerIndex, transactions: [] }
+  }
+
+  if (result.ledger.transactions.length <= 200) {
+    const txResults = await clientSender({
+      command: 'ledger',
+      ledger_index: ledgerIndex,
+      transactions: true,
+      expand: true,
+    })
+    return { ledger_index: ledgerIndex, transactions: txResults.ledger.transactions }
+  }
+
+  console.log(`<<< MANY TXS at ledger ${ledgerIndex}: [[ ${result.ledger.transactions.length} ]], processing per-tx...`)
+  const txPromises = result.ledger.transactions.map((tx) => {
+    return clientSender({
+      command: 'tx',
+      transaction: tx,
+    })
+  })
+  const txResults = (await Promise.all(txPromises)).filter((tx) => {
+    return typeof tx.error === 'undefined' && typeof tx.meta !== 'undefined' && typeof tx.meta.TransactionResult !== 'undefined'
+  }).map((tx) => {
+    return Object.assign(tx, {
+      metaData: tx.meta,
+    })
+  })
+  console.log('>>> ALL TXS FETCHED:', txResults.length)
+  return { ledger_index: ledgerIndex, transactions: txResults }
+}
+
+async function insertIntoDB(txs, bigquery, dbDetails) {
+  try {
+    await bigquery.dataset(dbDetails.datasetName).table(dbDetails.tableName).insert(txs)
+  } catch(err) {
+    if (err && err.name === 'PartialFailureError') {
+      if (err.errors && err.errors.length > 0) {
+        console.error('Insert errors:')
+        err.errors.forEach(err => console.dir(err, { depth: null }))
+      }
+      throw err
+    } else {
+      console.error('ERROR:', err)
+      throw err
+    }
+  }
+}
+
+function formatTxForDB(tx) {
+  const _Tx = {}
+
+  // Auto mapping for 1:1 fields (non RECORD)
+  transactionSchema.forEach(SchemaNode => {
+    if (typeof tx[SchemaNode.description] !== 'undefined'
+      && tx[SchemaNode.description] !== null
+      && typeof tx[SchemaNode.description] !== 'object'
+      && SchemaNode.description === SchemaNode.name
+    ) {
+      let Value = tx[SchemaNode.description]
+      if (typeof Value === 'string' && SchemaNode.type !== 'STRING') {
+        if (SchemaNode.type === 'INTEGER') {
+          Value = parseInt(Value)
+        }
+        if (SchemaNode.type === 'FLOAT') {
+          Value = parseFloat(Value)
+        }
+      }
+      Object.assign(_Tx, {
+        [SchemaNode.name]: Value,
+      })
+    }
+    if (SchemaNode.description.match(/^metaData\./)
+      && typeof tx.metaData[SchemaNode.name] !== 'undefined'
+      && tx.metaData[SchemaNode.name] !== null
+      && typeof tx.metaData[SchemaNode.name] !== 'object'
+      && SchemaNode.name !== 'DeliveredAmount'
+    ) {
+      Object.assign(_Tx, {
+        [SchemaNode.name]: tx.metaData[SchemaNode.name],
+      })
+    }
+  })
+
+  if (typeof tx.metaData.DeliveredAmount === 'undefined' && typeof tx.metaData.delivered_amount !== 'undefined') {
+    tx.metaData.DeliveredAmount = tx.metaData.delivered_amount
+  }
+  if (typeof tx.metaData.DeliveredAmount !== 'undefined') {
+    const DeliveredAmount = parseInt(tx.metaData.DeliveredAmount)
+    if (!isNaN(DeliveredAmount)) {
+      Object.assign(_Tx, {
+        DeliveredAmount: DeliveredAmount
+      })
+    }
+  }
+
+  if (typeof tx.Memos !== 'undefined') {
+    Object.assign(_Tx, {
+      Memos: tx.Memos.map(m => {
+        const n = { Memo: {} }
+        if (typeof m.Memo !== 'undefined') {
+          if (typeof m.Memo.MemoData !== 'undefined') n.Memo.MemoData = m.Memo.MemoData
+          if (typeof m.Memo.MemoFormat !== 'undefined') n.Memo.MemoFormat = m.Memo.MemoFormat
+          if (typeof m.Memo.MemoType !== 'undefined') n.Memo.MemoType = m.Memo.MemoType
+        }
+        return n
+      })
+    })
+  }
+
+  if (tx.NFTokenOffers != null) {
+    _Tx.NFTokenOffers = tx.NFTokenOffers
+  }
+
+  if (tx.metaData != null) {
+    _Tx.Metadata = JSON.stringify(tx.metaData)
+  }
+
+  CurrencyFields.forEach(CurrencyField => {
+    if (typeof tx[CurrencyField] === 'string') {
+      Object.assign(_Tx, {
+        [CurrencyField + 'XRP']: parseInt(tx[CurrencyField]),
+      })
+    }
+    if (typeof tx[CurrencyField] === 'object' && typeof tx[CurrencyField].currency !== 'undefined') {
+      Object.assign(_Tx, {
+        [CurrencyField + 'DEX']: {
+          currency: tx[CurrencyField].currency,
+          issuer: tx[CurrencyField].issuer,
+          value: parseFloat(tx[CurrencyField].value),
+        }
+      })
+    }
+  })
+
+  return _Tx
+}
+
+async function process(args) {
+  const {
+    clientSender,
+    lastLedger,
+    bigquery,
+    dbDetails,
+  } = args
+  const ledgerResult = await fetchLedgerTransactions(clientSender, lastLedger + 1)
+  const txCount = ledgerResult.transactions.length
+
+  if (txCount === 0) {
+    console.log(`${ledgerResult.ledger_index}: no transactions to insert`)
+    return ledgerResult.ledger_index
+  }
+
+  const txs = ledgerResult.transactions.map((tx) => {
+    return Object.assign(formatTxForDB(tx), {
+      LedgerIndex: ledgerResult.ledger_index,
+      _InsertedAt: bigquery.timestamp(new Date()),
+    })
+  })
+
+  await insertIntoDB(txs, bigquery, dbDetails)
+  console.log(`${ledgerResult.ledger_index}: inserted ${txCount} transactions`)
+  return ledgerResult.ledger_index
+}
+
+async function getLastDBLedger(args) {
+  const { bigquery, projectID, datasetName, tableName } = args
+  let result
+  try {
+    result = await bigquery.query({
+      query: `SELECT
+          COUNT(1) as TxCount,
+          MIN(LedgerIndex) as MinLedger,
+          MAX(LedgerIndex) as MaxLedger,
+          COUNT(DISTINCT LedgerIndex) as LedgersWithTxCount
+        FROM
+          ${projectID}.${datasetName}.${tableName}`,
+      useLegacySql: false,
+    })
+  } catch(e) {
+    console.error('Google BigQuery Error', e)
+    throw e
+  }
+  return result[0][0].MaxLedger
+}
+
+module.exports = {
+  process,
+  getLastDBLedger,
+}
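One detail of `formatTxForDB` worth calling out: XRPL "Amount-like" fields arrive in two encodings, and the `CurrencyFields` loop splits them into separate BigQuery columns. An illustrative run of that logic (the issuer address is made up):

```js
// XRP amounts arrive as strings of drops -> flat INTEGER column.
const drops = '1000000'
console.log({ AmountXRP: parseInt(drops) }) // { AmountXRP: 1000000 }

// Issued currencies arrive as objects -> RECORD column.
const iou = { currency: 'USD', issuer: 'rExampleIssuerAddress', value: '1.5' }
console.log({
  AmountDEX: { currency: iou.currency, issuer: iou.issuer, value: parseFloat(iou.value) },
}) // { AmountDEX: { currency: 'USD', issuer: 'rExampleIssuerAddress', value: 1.5 } }
```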