 // Allow CloudWatch to read source maps
 import 'source-map-support/register'
-import { DocumentClient } from 'aws-sdk/clients/dynamodb'
-import { S3Event, S3EventRecord } from 'aws-lambda'
-import S3 from 'aws-sdk/clients/s3'
-import path from 'path'
-import os from 'os'
-import util from 'util'
-import * as stream from 'stream'
-import * as fs from 'fs'
-import neatCsv from 'neat-csv'
-import { v4 as uuidv4 } from 'uuid'
-const pipeline = util.promisify(stream.pipeline)
+import AWSXRay from 'aws-xray-sdk-core'
+import { S3Event } from 'aws-lambda'
+import https from 'https'
+
+// Instead of keeping all of our logic in one file,
+// it is much easier to handle the logic by storing it in the /src folder
+import processCSV from './src/main'
+
+if (process.env.AWS_EXECUTION_ENV) {
+  AWSXRay.captureHTTPsGlobal(https, true)
+}
+
+// Here we are introducing a File Event Port, per Hexagonal Architecture principles
+import parseFileEvent from './src/parse-file-event'
+
+// Since we are going to reference the File structure, { key, bucket }, quite a lot,
+// it is much better to define a proper interface than to rely on the "any" type
+import IFile from './src/IFile'
+
 const TABLE_NAME = process.env.TABLE_NAME || ''
-const documentClient = new DocumentClient()
-const s3 = new S3()
+
 export async function handler(event: S3Event): Promise<any> {
-  const s3Records = event.Records
-  await Promise.all(s3Records.map(async (record: S3EventRecord) => processCSV(record)))
-}
-async function processCSV(record: S3EventRecord): Promise<any> {
-  const downloadPath = path.join(os.tmpdir(), record.s3.object.key)
-  try {
-    const readable = s3.getObject({
-      Bucket: record.s3.bucket.name,
-      Key: record.s3.object.key,
-    }).createReadStream()
-    const writable = fs.createWriteStream(downloadPath, { encoding: 'utf8' })
-    await pipeline(
-      readable,
-      writable
-    )
-  } catch (e) {
-    console.log(e)
-    throw e
-  }
-  const readCsv = fs.createReadStream(downloadPath)
-  const jsonData = await neatCsv(readCsv)
-  await Promise.all(jsonData.map(async (entry: any) => {
-    entry.id = uuidv4()
-    try {
-      return await documentClient.put({
-        TableName: TABLE_NAME,
-        Item: entry,
-      }).promise()
-    } catch (e) {
-      console.error(e)
-      throw e
-    }
-  }))
+
+  // With it we parse the external event into a format more suitable for our business logic
+  const receivedFiles: IFile[] = parseFileEvent(event)
+
+  // Then we simply go through all the files from the event
+  // and pass each one to our main business logic for processing
+  return await Promise.all(receivedFiles.map(async (file: IFile) => processCSV(file, TABLE_NAME)))
 }
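
The diff imports ./src/IFile and ./src/parse-file-event, but their contents are not part of this change. A minimal sketch of what they could look like, assuming the { key, bucket } shape mentioned in the comments and default exports (as implied by the import statements):

// src/IFile.ts -- hypothetical sketch; the actual module is not shown in this diff
export default interface IFile {
  key: string
  bucket: string
}

// src/parse-file-event.ts -- hypothetical sketch of the File Event Port
import { S3Event } from 'aws-lambda'
import IFile from './IFile'

// Translate the AWS-specific S3 event into plain { key, bucket } objects,
// so the business logic never has to know about the Lambda event shape
export default function parseFileEvent(event: S3Event): IFile[] {
  return event.Records.map((record) => ({
    key: record.s3.object.key,
    bucket: record.s3.bucket.name,
  }))
}

Keeping this mapping in one small adapter is the point of the Port: if the trigger ever changes (say, to an SQS queue), only parseFileEvent needs to change, not the business logic.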
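
The removed handler body shows roughly what moved into ./src/main: stream the object from S3 into the Lambda temp directory, parse it with neat-csv, and write each row to DynamoDB with a generated id. A sketch of processCSV under that assumption (the real file is not included in this diff), now taking a file and the table name as arguments instead of reading module-level state:

// src/main.ts -- hypothetical sketch, reconstructed from the removed handler code
import { DocumentClient } from 'aws-sdk/clients/dynamodb'
import S3 from 'aws-sdk/clients/s3'
import path from 'path'
import os from 'os'
import util from 'util'
import * as stream from 'stream'
import * as fs from 'fs'
import neatCsv from 'neat-csv'
import { v4 as uuidv4 } from 'uuid'
import IFile from './IFile'

const pipeline = util.promisify(stream.pipeline)
const documentClient = new DocumentClient()
const s3 = new S3()

export default async function processCSV(file: IFile, tableName: string): Promise<any> {
  // Stream the CSV from S3 into the Lambda's temp directory
  // (path.basename guards against object keys that contain prefixes)
  const downloadPath = path.join(os.tmpdir(), path.basename(file.key))
  await pipeline(
    s3.getObject({ Bucket: file.bucket, Key: file.key }).createReadStream(),
    fs.createWriteStream(downloadPath, { encoding: 'utf8' })
  )

  // Parse the downloaded CSV into an array of row objects
  const rows = await neatCsv(fs.createReadStream(downloadPath))

  // Give every row an id and store it in DynamoDB
  return Promise.all(rows.map((entry: any) => {
    entry.id = uuidv4()
    return documentClient.put({ TableName: tableName, Item: entry }).promise()
  }))
}

Passing TABLE_NAME in as an argument, rather than reading process.env inside src/main, keeps the business logic free of environment concerns and makes it easier to unit test.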