23 changes: 23 additions & 0 deletions Custom_Ingestion_File_Watcher.md
@@ -0,0 +1,23 @@
# Custom Ingestion Architecture

The instructions below show how to configure an AWS Lambda function to create interactions between IDO and AWS.

1. Create a new AWS Lambda function using Python.
2. On the Lambda function's page, add a trigger for the event you want to react to.
3. Role: configure an IAM role with the correct permissions. The permissions needed depend on your use case; some common ones are:
    1. EC2 Access
    2. S3 Read Only
    3. RDS Access
    4. CloudWatch Write
4. VPC config
    1. The function will need access to the subnets associated with the VPC.
    2. Attach a security group appropriate for your use case.
5. Enable monitoring.
6. Code
    1. Libraries: json, urllib.parse, boto3, requests
    2. If you are running queries, you will need to configure the following (see the sketch after this list):
        1. Create an [rds client](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds-data.html)
        2. Depending on the wanted functionality, configure a SQL string
        3. Configure your [execute statement](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds-data.html#RDSDataService.Client.execute_statement) call
7. Call any APIs that are needed using the requests library.
8. Test, and use CloudWatch to debug.
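
As a reference, here is a minimal sketch of what such a handler could look like, assuming the trigger is an S3 object-created event and the database is reachable through the RDS Data API. The cluster ARN, secret ARN, database and table names, and IDO endpoint URL are placeholders rather than values from this repo, and the requests library must be packaged with the function since it is not part of the default Lambda runtime.

```
import json
import urllib.parse

import boto3
import requests

# RDS Data API client (see the rds client link above)
rds = boto3.client("rds-data")

# Placeholder values -- replace with your own resources
CLUSTER_ARN = "arn:aws:rds:us-east-1:123456789012:cluster:example-cluster"
SECRET_ARN = "arn:aws:secretsmanager:us-east-1:123456789012:secret:example-secret"
IDO_API_URL = "https://example-ido-host/api/notify"


def lambda_handler(event, context):
    # Pull the object key out of the S3 trigger payload
    key = urllib.parse.unquote_plus(event["Records"][0]["s3"]["object"]["key"])

    # Run a query through the RDS Data API (execute statement link above)
    response = rds.execute_statement(
        resourceArn=CLUSTER_ARN,
        secretArn=SECRET_ARN,
        database="example_db",
        sql="SELECT id FROM sources WHERE file_key = :key",
        parameters=[{"name": "key", "value": {"stringValue": key}}],
    )

    # Hit any APIs that are needed using the requests library
    requests.post(IDO_API_URL, json={"file": key, "rows": len(response.get("records", []))})

    return {"statusCode": 200, "body": json.dumps({"file": key})}
```

Attach the role and VPC settings from steps 3 and 4 to this function; anything written by print or logging ends up in CloudWatch Logs for debugging.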
2 changes: 1 addition & 1 deletion README.md
@@ -18,4 +18,4 @@ The below instructions are for setting up a development environment in intellij
3. Open GitKraken and clone the [sdk repo](https://github.com/intellio/dataops-sdk-plugins)
4. Open IntelliJ and open the repository that you just cloned. After opening, go to Plugins and add the plugin for sbt
5. Go to File -> Project Structure to ensure that the SDK is Corretto
6. Reference these files as you build out your notebook
90 changes: 70 additions & 20 deletions dataops-excel/README.md
@@ -1,25 +1,72 @@
# **DataOps Excel Plugin**
This code allows you to customize a dataframe from an excel spreadsheet. This enables you to pull out different information
from different sheets and ignore inimportant miscellaneous cells.
This code allows you to customize and ingest a dataframe from an Excel spreadsheet. This enables you to pull out different information
from different sheets and ignore unimportant miscellaneous cells.

Steps
1. Set up [Custom Ingestion](https://intellio.gitbook.io/dataops/configuring-the-data-integration-process/custom-ingestion) and connect the sessionIngestion to your source
2. Copy the code in dataops-excel.scala into a databricks notebook
3. Ensure you have the following libraries
* https://github.com/crealytics/spark-excel (can be installed with Maven)
* intellio dataops sdk (will be at the toplevel of the datalake location)

4. Configure parameters in the source settings for getting excel data
5. Run the ingestion
6. Verify
1. Create a new notebook in your Databricks environment. It can be in any location and have any name. Copy the code from dataops-excel.scala into your notebook.
2. Set up [Custom Ingestion](https://intellio.gitbook.io/dataops/user-manual/sdk/custom-ingestion) and connect the IngestionSession to your source
    1. Configure a custom Connection
    2. Configure the custom source following the directions, and set the notebook path to the location you chose in step 1.
    3. When setting up a cluster configuration, there are two options:
        1. Creating a single cluster: not recommended, but can be used for single-file testing
        2. Configuring a pool: supported in more situations and scales effectively with IDO configurations. See *Configuring a Pool* below.
3. Configure parameters in the source settings for getting Excel data. See *Configuring Parameters* below.
4. Mount the S3 bucket to DBFS. See *Mounting S3 Bucket* below.
5. Pull data and verify the expected functionality.


### Configuring a Pool
1. In your Databricks environment select "Compute" -> "Pools"
2. Create Pool
3. Configure the pool
    1. Name - Excel Ingestion
    2. Min Idle - 0. Note: if you need compute quickly, you can increase this number to keep warmed-up instances available.
    3. Idle instance termination - a case-by-case decision; 30 minutes is a reasonable default.
    4. Enable autoscaling local storage
    5. Choose your instance type. See [here](https://aws.amazon.com/ec2/instance-types/) for help deciding what to use.
    6. Runtime - 7.3 LTS
    7. Availability Zone - same as where IDO is hosted
    8. On-demand/spot composition - All Spot
    9. Max Spot Price - 100. Note: this can be lowered if you want stricter cost savings.
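
If you prefer to script this setup, the sketch below is a rough equivalent of the settings above using the Databricks Instance Pools REST API. It is an illustration only: the workspace URL, token, availability zone, and node type are assumed placeholders you would replace with your own values.

```
import requests

DATABRICKS_HOST = "https://example-workspace.cloud.databricks.com"  # placeholder workspace URL
TOKEN = "dapi-XXXXXXXXXXXX"                                         # placeholder personal access token

pool_config = {
    "instance_pool_name": "Excel Ingestion",
    "min_idle_instances": 0,                        # raise this to keep warmed-up instances
    "idle_instance_autotermination_minutes": 30,
    "enable_elastic_disk": True,                    # autoscaling local storage
    "node_type_id": "i3.xlarge",                    # placeholder instance type
    "preloaded_spark_versions": ["7.3.x-scala2.12"],
    "aws_attributes": {
        "availability": "SPOT",                     # all-spot composition
        "zone_id": "us-east-1a",                    # same AZ as IDO (placeholder)
        "spot_bid_price_percent": 100,
    },
}

resp = requests.post(
    f"{DATABRICKS_HOST}/api/2.0/instance-pools/create",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json=pool_config,
)
resp.raise_for_status()
print(resp.json()["instance_pool_id"])  # the id referenced by instance_pool_id below
```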

### Configuring Parameters
There are two parameter sets you need to configure for custom ingestion. The first relates to the pool you will be linking to, and the second feeds the script.
1. Custom Cluster Params
```
{
  "libraries":[
    {"jar":"{intellio dataops-sdk jar location}"},
    {"jar":"{intellio sparky jar location}"},
    {"maven":{"coordinates":"{most up to date spark excel library}"}}
  ],
  "new_cluster":
    {"spark_conf":
      {"spark.sql.legacy.timeParserPolicy":"LEGACY",
       "spark.hadoop.fs.s3a.experimental.input.fadvise":"sequential",
       "spark.sql.legacy.avro.datetimeRebaseModeInRead":"CORRECTED",
       "spark.sql.legacy.avro.datetimeRebaseModeInWrite":"CORRECTED"
      },
     "num_workers":{can scale to what you need, can default to 2},
     "spark_version":"7.3.x-scala2.12",
     "aws_attributes":{specify the aws instance profile arn},
     "instance_pool_id":"{ID of the pool instance}"}
}
```
{
"DataRows":"A4", //select up the top left cell that your tabular data works with (REQUIRED)
"SheetName":"Sheet1", // sheetname (REQUIRED)
1. Finding the pool instance ID
    1. Navigate to the pool's configuration page in your Databricks workspace. At the end of the URL, following the last '/', there is an ID made up of numbers and letters. Copy that ID into the instance_pool_id field above.

2. Custom Parameters
```
{
"folderPath": "testing/test/madeUpFolder" //Folder location relative to mounted AWS bucket (REQUIRED)
"DataRows":"F99", //select up the top left cell that your tabular data works with (REQUIRED)
"SheetName":"MySheet1", // sheetname (REQUIRED)
"FileKey": "name", //key in the filename that shows that that file should be ingested (REQUIRED)
"FolderName": "ingestableFiles", //The lowest level folder to ingest files from, if its at the bucket level, specify the bucket name (REQUIRED)
"ShouldArchive":"true" // if you would like your file to be archived once it has been ingested (DEFAULT = FALSE)
"ShouldArchive":"true", // if you would like your file to be archived once it has been ingested (DEFAULT = FALSE)
"ShouldInferDataTypes": "true" // if you would like datetypes inferred from the file, if not, everything comes in as a string (DEFAULT = FALSE)
"AddColorColumns":"false" // if you would like to include data about cell coloring (DEFAULT = FALSE)
"SourceName": "{source name from ido}" // source that is ingesting the data (REQUIRED)
"SourceEnvironment": "[dev|prod|test|...]" // Specify whatever environment you are ingesting for (REQUIRED)
"HeaderRows": // (OPTIONAL)
[
{"key_reference":"A1","value":"B1"}, //Use key_reference when using a cell on the sheet as the header
@@ -29,10 +76,13 @@ Steps
}
```

Assumptions and Notes
### Mounting S3 Bucket
You will need to mount your AWS bucket to the Databricks filesystem (DBFS). You can do this with the mount command from the [Databricks S3 documentation](https://docs.databricks.com/data/data-sources/aws/amazon-s3.html).
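
A minimal sketch of the mount step, run once from a Python cell in any notebook in the workspace; the bucket and mount names are placeholders, and this assumes the cluster's instance profile already grants access to the bucket:

```
aws_bucket_name = "my-ido-excel-bucket"   # placeholder bucket name
mount_name = "excel-ingestion"            # files appear under /mnt/excel-ingestion

# Mount the bucket; credentials come from the cluster's instance profile
dbutils.fs.mount(f"s3a://{aws_bucket_name}", f"/mnt/{mount_name}")

# Quick check that the files are visible through the mount point
display(dbutils.fs.ls(f"/mnt/{mount_name}"))
```

The mounted path is what the /mnt/... prefix in dataops-excel.scala lists against, and the FolderPath parameter is resolved relative to it.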

### Assumptions and Notes
1. Each source can only load from one sheet
2. The DataRows cell selected is the top left of the tabular data, and it will ingest any data below that
3. Archive archives file with a timestamp in an 'archive' folder in the same directory
3. 'Archive' archives 2 files into the archive location: the original file and the ingested dataframe. Both are timestamped
4. Hidden Cells are still read in
5. Whatever is shown to the user is what is copied to the dataframe, all formatting included
6. Merged data - it reads the upper left hand cell
5. Whatever is shown to the user is what is copied to the dataframe, all formatting included (unless you infer the schema).
6. For merged data, it reads the upper-left cell of the merged range as the value to report.
9 changes: 0 additions & 9 deletions dataops-excel/build.sbt

This file was deleted.

Binary file removed dataops-excel/lib/dataops-sdk.jar
Binary file not shown.
Binary file not shown.
1 change: 0 additions & 1 deletion dataops-excel/project/build.properties

This file was deleted.

133 changes: 87 additions & 46 deletions dataops-excel/src/main/scala/notebook/dataops-excel.scala
@@ -1,3 +1,4 @@
// Import the necessary libraries
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import com.crealytics.spark.excel._
@@ -6,94 +7,124 @@ import org.apache.spark.sql.functions._
import play.api.libs.json._
import java.io.File
import scala.util.matching._

// Create spark session and link session
// Create spark session
val spark = SparkSession.builder().getOrCreate()
val session = new IngestionSession("Sandbox","CustomExcelIngestion")

// Substantiate custom parameters
val folderName = getStringParameter("FolderName")
val session = new IngestionSession()
// Read in the custom parameters coming from IDO
val folderPath = getStringParameter("FolderPath")
val dataRows = getStringParameter("DataRows")
val sheetName = getStringParameter("SheetName")
val shouldArchive = getStringParameter("ShouldArchive")
val shouldInferDataTypes = getStringParameter("ShouldInferDataTypes")
val fileKey = getStringParameter("FileKey")
val addColorColumns = getStringParameter("AddColorColumns")
val sourceName = getStringParameter("SourceName")
val sourceEnvironment = getStringParameter("SourceEnvironment")
val headerDataRows = (session.customParameters \ "HeaderRows").asOpt[Vector[JsValue]]
val filesFound = dbutils.fs.ls(s"/mnt/$folderName").map((_.path)).toList.filter(x => x.contains(fileKey) && x.contains(folderName))

// Get all files in a list from the location
val filesFound = dbutils.fs.ls(s"/mnt/{mounted location on databricks file system}$folderPath").map((_.path)).toList.filter(x => x.contains(fileKey))

// Read the incoming string parameters, defaulting optional ones and logging an error when a required one is missing
def getStringParameter(paramName: String) : String = {
val incomingParam = (session.customParameters \ paramName).asOpt[String]

//Optional boolean flags can be left empty and will default to false
if(incomingParam == None && paramName == "ShouldArchive") return ""
if(incomingParam == None && (paramName == "ShouldArchive" || paramName == "AddColorColumns" || paramName == "ShouldInferDataTypes")) {
return "false"
}
else if (incomingParam == None){
session.log("Parameter not substantiated: " + paramName)
}
incomingParam.get
}

def ingestAllDFs(): DataFrame = {
filesFound.isEmpty match {
case true => returnEmpty()
case false => FilesToIngest()
}

return incomingParam.get
}

// Send back empty dataframe
def returnEmpty(): DataFrame = {
val schema = StructType(Array(StructField("FileName", StringType, true,Metadata.empty)))
return spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
}

def FilesToIngest(): DataFrame = {
val DFs = filesFound.map(x => ingestDf(x))
return UnionDFs(DFs.head, DFs.tail)
//Main ingestion point
def FilesToIngest(): Unit = {
// Guard against the case where no matching files were found
if(filesFound.isEmpty){
return session.ingest(returnEmpty)
}
else{
//Ingest the first file with the existing session
val firstFileDf = ingestDf(filesFound.head)
session.ingest( () => firstFileDf )

//archive the first file if requested
if (shouldArchive.toBoolean){
archive(firstFileDf,filesFound.head)
}

//ingest and archive any others with the specified environment and source name
for((fileLocation) <- filesFound.tail){
val session = new IngestionSession(sourceEnvironment, sourceName)
val df = ingestDf(fileLocation)
session.ingest(() => df)

if (shouldArchive.toBoolean){
archive(df,fileLocation)
}
}
}
}

def UnionDFs(df: DataFrame, DFs: List[DataFrame]): DataFrame = DFs match {
case Nil => df
case x :: xs => UnionDFs(df.union(x),xs)
//Archive the data sources
def archive(returnedDataSet: DataFrame, fileLocation: String): Any = {
val fileName = fileLocation.split("/")(fileLocation.split("/").size - 1)
val exportTimeStamped = constructExport(fileLocation.split("/"),fileName)
val exportRawData = constructMove(fileLocation.split("/"),fileName)

returnedDataSet.write
.format("com.crealytics.spark.excel")
.option("header", "true")
.option("dataAddress", "'" + sheetName + "'!" + dataRows)
.save(exportTimeStamped)

dbutils.fs.mv(fileLocation, exportRawData)
}

// Ingest the Excel Sheet
def ingestDf(fileLocation: String): DataFrame = {
def ingestDf(fileLocation: String): (DataFrame) = {
//Start the Session
session.log("Starting Ingestion")

// Read in the Tabular Data
val df = spark.read
.format("com.crealytics.spark.excel")
.option("header", "true")
.option("addColorColumns", addColorColumns)
.option("usePlainNumberFormat", shouldInferDataTypes)
.option("inferSchema", shouldInferDataTypes)
.option("dataAddress", "'" + sheetName + "'!" + dataRows)
.load(fileLocation)

//Get the FileName and add the file header
val fileName = fileLocation.split("/")(fileLocation.split("/").size - 1)
val dfWithHeader = df.withColumn("FileName", lit(fileName)).withColumn("FileName&Sheet", lit(fileName + "/" + sheetName))

val returnedDataSet = getFinalDataSet(headerDataRows, dfWithHeader,fileLocation)

//Write out to archive
if (shouldArchive != "" && shouldArchive.toBoolean){
returnedDataSet.write
.format("com.crealytics.spark.excel")
.option("header", "true")
.option("dataAddress", "'" + sheetName + "'!" + dataRows)
.save(constructExport(fileLocation.split("/"),fileName))

//delete the file TODO - ask Jacob why it fails
//dbutils.fs.rm(fileLocation)
}
return returnedDataSet
return (returnedDataSet)
}

//If the header data is none then return the current DF, else add on the headers
def getFinalDataSet(HeaderData: AnyRef, dfWithHeader: DataFrame, fileLocation: String) : DataFrame = {
HeaderData match {
case None => dfWithHeader
case _ => HandleHeaderData(dfWithHeader,fileLocation)
}
}

//New header Stuff
//New header Specs
def HandleHeaderData(dfWithHeader: DataFrame, fileLocation: String) : DataFrame = {
val headerMap = scala.collection.mutable.Map[String,String]()
val mappedValues = headerDataRows.get.map(x => getValues(x,headerMap,fileLocation))
Expand All @@ -102,9 +133,9 @@ def HandleHeaderData(dfWithHeader: DataFrame, fileLocation: String) : DataFrame
val completeDF = dfWithHeader.selectExpr(headerSeq:_*)
return completeDF
}

//get the values into the map
def getValues(Item: JsValue,headerMap: scala.collection.mutable.Map[String, String], fileLocation: String){
def getValues(Item: JsValue, headerMap: scala.collection.mutable.Map[String, String], fileLocation: String){
val value = (Item \ "value").as[String]
if((Item \ "key_custom").asOpt[String] != None){
headerMap.put((Item \ "key_custom").as[String], lookupCell(value, fileLocation))
Expand All @@ -114,22 +145,32 @@ def getValues(Item: JsValue,headerMap: scala.collection.mutable.Map[String, Stri
session.log("Header Cell Mapping not correct")
}
}

// Look up a single header cell on the sheet and return its value as a string
def lookupCell(cell: String, fileLocation: String) : String = {
val headerDataDF = spark.read
.format("com.crealytics.spark.excel")
.option("header", "false")
.option("addColorColumns", addColorColumns)
.option("usePlainNumberFormat", shouldInferDataTypes)
.option("inferSchema", shouldInferDataTypes)
.option("dataAddress", "'" + sheetName + "'!" + cell + ":" + cell)
.load(fileLocation)
return headerDataDF.first().getString(0)
}

// Rebuild the file path, replacing the file name with an 'archive/' name tagged '_Original_', the sheet name, and a timestamp
def constructMove(fileLocation: Array[String], fileName: String): String = {
fileLocation.head match {
case `fileName` => "archive/" + fileName.split("\\.")(0) + "_Original_" + sheetName + "_" + java.time.LocalDate.now + "_" + java.time.LocalTime.now + "." + fileName.split("\\.")(1)
case _ => fileLocation.head + "/" + constructMove(fileLocation.tail, fileName)
}
}

//Export file to archive folder with timestamp
def constructExport(fileLocation: Array[String], fileName: String): String = {
fileLocation.head match {
case `fileName` => "archive/" + fileName.split("\\.")(0) + "_" + sheetName + "_" + java.time.LocalDate.now + "_" + java.time.LocalTime.now + "." + fileName.split("\\.")(1)
case _ => fileLocation.head + "/" + constructExport(fileLocation.tail, fileName)
}
}

session.ingest(ingestAllDFs)
// Entry point: ingest (and optionally archive) every file found
FilesToIngest()