
Commit 477c7d3

[SDP] Sink et al.
1 parent 93c3e1c commit 477c7d3

10 files changed: +185 -93 lines changed


docs/declarative-pipelines/DataflowGraph.md

Lines changed: 2 additions & 1 deletion
@@ -8,6 +8,7 @@
 
 * <span id="flows"> [Flow](Flow.md)s
 * <span id="tables"> [Table](Table.md)s
+* <span id="sinks"> [Sink](Sink.md)s
 * <span id="views"> [View](View.md)s
 
 `DataflowGraph` is created when:

@@ -26,7 +27,7 @@ output: Map[TableIdentifier, Output]
 
 Learn more in the [Scala Language Specification]({{ scala.spec }}/05-classes-and-objects.html#lazy).
 
-`output` is a collection of unique `Output`s ([Table](Table.md)s) by their `TableIdentifier`.
+`output` is a collection of unique `Output`s ([tables](#tables) and [sinks](#sinks)) by their `TableIdentifier`.
 
 ---
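
The diff above does not show how `output` is assembled. A minimal, hypothetical sketch of a map of unique outputs (tables and sinks) keyed by `TableIdentifier`, with simplified stand-in types (not the actual Spark source):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// Hypothetical, simplified model: both tables and sinks are Outputs with an identifier.
trait Output { def identifier: TableIdentifier }
final case class Table(identifier: TableIdentifier) extends Output
final case class Sink(identifier: TableIdentifier) extends Output

class DataflowGraph(tables: Seq[Table], sinks: Seq[Sink]) {
  // Unique Outputs (tables and sinks) by their TableIdentifier,
  // computed lazily on first access.
  lazy val output: Map[TableIdentifier, Output] =
    (tables ++ sinks).map(o => o.identifier -> o).toMap
}
```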

docs/declarative-pipelines/GraphElementRegistry.md

Lines changed: 5 additions & 4 deletions
@@ -4,21 +4,22 @@
 
 ## Contract
 
-### register_dataset { #register_dataset }
+### Register Output { #register_output }
 
 ```py
-register_dataset(
+register_output(
   self,
-  dataset: Dataset,
+  output: Output,
 ) -> None
 ```
 
 See:
 
-* [SparkConnectGraphElementRegistry](SparkConnectGraphElementRegistry.md#register_dataset)
+* [SparkConnectGraphElementRegistry](SparkConnectGraphElementRegistry.md#register_output)
 
 Used when:
 
+* [create_sink](./index.md#create_sink) is used
 * [@create_streaming_table](./index.md#create_streaming_table), [@table](./index.md#table), [@materialized_view](./index.md#materialized_view), [@temporary_view](./index.md#temporary_view) decorators are used
 
 ### register_flow { #register_flow }

docs/declarative-pipelines/GraphRegistrationContext.md

Lines changed: 56 additions & 21 deletions
@@ -24,17 +24,29 @@ Eventually, `GraphRegistrationContext` [becomes a DataflowGraph](#toDataflowGrap
 toDataflowGraph: DataflowGraph
 ```
 
-`toDataflowGraph` creates a new [DataflowGraph](DataflowGraph.md) with the [tables](#tables), [views](#views), and [flows](#flows) fully-qualified, resolved, and de-duplicated.
+`toDataflowGraph` creates a new [DataflowGraph](DataflowGraph.md) with the [tables](#tables), [views](#views), [sinks](#sinks) and [flows](#flows) fully-qualified, resolved, and de-duplicated.
 
 ??? note "AnalysisException"
-    `toDataflowGraph` reports an `AnalysisException` for a `GraphRegistrationContext` with no [tables](#tables) and no `PersistedView`s (in the [views](#views) registry).
+    `toDataflowGraph` reports an `AnalysisException` when this `GraphRegistrationContext` is [empty](#isPipelineEmpty).
 
 ---
 
 `toDataflowGraph` is used when:
 
 * `PipelinesHandler` is requested to [start a pipeline run](PipelinesHandler.md#startRun)
 
+### isPipelineEmpty { #isPipelineEmpty }
+
+```scala
+isPipelineEmpty: Boolean
+```
+
+`isPipelineEmpty` is `true` when this pipeline (this `GraphRegistrationContext`) is empty, i.e., all of the following are met:
+
+1. No [tables](#tables) registered
+1. No [PersistedView](PersistedView.md)s registered (among the [views](#views))
+1. No [sinks](#sinks) registered
+
 ### assertNoDuplicates { #assertNoDuplicates }
 
 ```scala
@@ -65,48 +77,71 @@ Flow [flow_name] was found in multiple datasets: [dataset_names]
 
 `GraphRegistrationContext` creates an empty registry of [Table](Table.md)s when [created](#creating-instance).
 
-A new [Table](Table.md) is added when [registerTable](#registerTable).
+A new [Table](Table.md) is added when `GraphRegistrationContext` is requested to [register a table](#registerTable).
 
 ## Views { #views }
 
 `GraphRegistrationContext` creates an empty registry of [View](View.md)s when [created](#creating-instance).
 
+## Sinks { #sinks }
+
+`GraphRegistrationContext` creates an empty registry of [Sink](Sink.md)s when [created](#creating-instance).
+
 ## Flows { #flows }
 
 `GraphRegistrationContext` creates an empty registry of [UnresolvedFlow](UnresolvedFlow.md)s when [created](#creating-instance).
 
-## Register Table { #registerTable }
+## Register Flow { #registerFlow }
 
 ```scala
-registerTable(
-  tableDef: Table): Unit
+registerFlow(
+  flowDef: UnresolvedFlow): Unit
 ```
 
-`registerTable` adds the given [Table](Table.md) to the [tables](#tables) registry.
+`registerFlow` adds the given [UnresolvedFlow](UnresolvedFlow.md) to the [flows](#flows) registry.
 
 ---
 
-`registerTable` is used when:
+`registerFlow` is used when:
 
-* `PipelinesHandler` is requested to [define a dataset](PipelinesHandler.md#defineDataset)
+* `PipelinesHandler` is requested to [define a flow](PipelinesHandler.md#defineFlow)
+* `SqlGraphRegistrationContext` is requested to [process the following logical commands](SqlGraphRegistrationContext.md#processSqlQuery):
+    * [CREATE FLOW ... AS INSERT INTO ... BY NAME](../logical-operators/CreateFlowCommand.md)
+    * [CREATE MATERIALIZED VIEW ... AS](../logical-operators/CreateMaterializedViewAsSelect.md)
+    * [CREATE STREAMING TABLE ... AS](../logical-operators/CreateStreamingTableAsSelect.md)
+    * [CREATE TEMPORARY VIEW](../logical-operators/CreateViewCommand.md)
+    * [CREATE VIEW](../logical-operators/CreateView.md)
 
-## Register Flow { #registerFlow }
+## Register Sink { #registerSink }
 
 ```scala
-registerFlow(
-  flowDef: UnresolvedFlow): Unit
+registerSink(
+  sinkDef: Sink): Unit
 ```
 
-`registerFlow` adds the given [UnresolvedFlow](UnresolvedFlow.md) to the [flows](#flows) registry.
+`registerSink` adds the given [Sink](Sink.md) to the [sinks](#sinks) registry.
 
 ---
 
-`registerFlow` is used when:
+`registerSink` is used when:
 
-* `PipelinesHandler` is requested to [define a flow](PipelinesHandler.md#defineFlow)
-* `SqlGraphRegistrationContext` is requested to [handle the following logical commands](SqlGraphRegistrationContext.md#processSqlQuery):
-    * [CreateFlowCommand](SqlGraphRegistrationContext.md#CreateFlowCommand)
-    * [CreateMaterializedViewAsSelect](SqlGraphRegistrationContext.md#CreateMaterializedViewAsSelect)
-    * [CreateView](SqlGraphRegistrationContext.md#CreateView)
-    * [CreateStreamingTableAsSelect](SqlGraphRegistrationContext.md#CreateStreamingTableAsSelect)
-    * [CreateViewCommand](SqlGraphRegistrationContext.md#CreateViewCommand)
+* `PipelinesHandler` is requested to [define an output](PipelinesHandler.md#defineOutput)
+
+## Register Table { #registerTable }
+
+```scala
+registerTable(
+  tableDef: Table): Unit
+```
+
+`registerTable` adds the given [Table](Table.md) to the [tables](#tables) registry.
+
+---
+
+`registerTable` is used when:
+
+* `PipelinesHandler` is requested to [define an output](PipelinesHandler.md#defineOutput)
+* `SqlGraphRegistrationContext` is requested to [process the following logical commands](SqlGraphRegistrationContext.md#processSqlQuery):
+    * [CREATE MATERIALIZED VIEW ... AS](../logical-operators/CreateMaterializedViewAsSelect.md)
+    * [CREATE STREAMING TABLE ... AS](../logical-operators/CreateStreamingTableAsSelect.md)
+    * [CREATE STREAMING TABLE](../logical-operators/CreateStreamingTable.md)
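
Taken together, the registries and the `register*` methods above form a small mutable-registry class. A minimal, hypothetical sketch with simplified stand-in types (not the actual Spark source):

```scala
import scala.collection.mutable

// Simplified stand-ins for the real pipeline types.
final case class Table(name: String)
sealed trait View
final case class PersistedView(name: String) extends View
final case class TemporaryView(name: String) extends View
final case class Sink(name: String)
final case class UnresolvedFlow(name: String)

class GraphRegistrationContext {
  // Empty registries, created with the instance.
  val tables = mutable.ListBuffer.empty[Table]
  val views  = mutable.ListBuffer.empty[View]
  val sinks  = mutable.ListBuffer.empty[Sink]
  val flows  = mutable.ListBuffer.empty[UnresolvedFlow]

  def registerTable(tableDef: Table): Unit = tables += tableDef
  def registerFlow(flowDef: UnresolvedFlow): Unit = flows += flowDef
  def registerSink(sinkDef: Sink): Unit = sinks += sinkDef

  // Empty pipeline: no tables, no persisted views, and no sinks (cf. isPipelineEmpty).
  def isPipelineEmpty: Boolean =
    tables.isEmpty &&
      !views.exists(_.isInstanceOf[PersistedView]) &&
      sinks.isEmpty
}
```

Registering a single table, persisted view, or sink is enough to make `isPipelineEmpty` return `false`.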

docs/declarative-pipelines/PersistedView.md

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+# PersistedView
+
+`PersistedView` is...FIXME

docs/declarative-pipelines/PipelinesHandler.md

Lines changed: 35 additions & 23 deletions
@@ -26,16 +26,17 @@ handlePipelinesCommand(
 |-----------------|-------------|-----------|
 | `CREATE_DATAFLOW_GRAPH` | [Creates a new dataflow graph](#CREATE_DATAFLOW_GRAPH) | [pyspark.pipelines.spark_connect_pipeline](spark_connect_pipeline.md#create_dataflow_graph) |
 | `DROP_DATAFLOW_GRAPH` | [Drops a pipeline](#DROP_DATAFLOW_GRAPH) ||
-| `DEFINE_DATASET` | [Defines a dataset](#DEFINE_DATASET) | [SparkConnectGraphElementRegistry](SparkConnectGraphElementRegistry.md#register_dataset) |
+| `DEFINE_OUTPUT` | [Defines an output](#DEFINE_OUTPUT) (a table, a materialized view, a temporary view or a sink) | [SparkConnectGraphElementRegistry](SparkConnectGraphElementRegistry.md#register_output) |
 | `DEFINE_FLOW` | [Defines a flow](#DEFINE_FLOW) | [SparkConnectGraphElementRegistry](SparkConnectGraphElementRegistry.md#register_flow) |
 | `START_RUN` | [Starts a pipeline run](#START_RUN) | [pyspark.pipelines.spark_connect_pipeline](spark_connect_pipeline.md#start_run) |
 | `DEFINE_SQL_GRAPH_ELEMENTS` | [DEFINE_SQL_GRAPH_ELEMENTS](#DEFINE_SQL_GRAPH_ELEMENTS) | [SparkConnectGraphElementRegistry](SparkConnectGraphElementRegistry.md#register_sql) |
 
-`handlePipelinesCommand` reports an `UnsupportedOperationException` for incorrect commands:
+??? warning "UnsupportedOperationException"
+    `handlePipelinesCommand` reports an `UnsupportedOperationException` for incorrect commands:
 
-```text
-[other] not supported
-```
+    ```text
+    [other] not supported
+    ```
 
 ---
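
The table above amounts to a single dispatch on the command type. A hypothetical sketch of such a dispatch, with a simplified command model (not the actual Spark Connect handler):

```scala
// Hypothetical, simplified model of the pipeline commands and their dispatch.
sealed trait PipelineCommand
case object CreateDataflowGraph extends PipelineCommand
case object DropDataflowGraph extends PipelineCommand
final case class DefineOutput(name: String) extends PipelineCommand
final case class DefineFlow(name: String) extends PipelineCommand
case object StartRun extends PipelineCommand
final case class DefineSqlGraphElements(sql: String) extends PipelineCommand
final case class Other(name: String) extends PipelineCommand

def handlePipelinesCommand(cmd: PipelineCommand): Unit = cmd match {
  case CreateDataflowGraph       => println("create dataflow graph")
  case DropDataflowGraph         => println("drop dataflow graph")
  case DefineOutput(name)        => println(s"define output: $name")
  case DefineFlow(name)          => println(s"define flow: $name")
  case StartRun                  => println("start run")
  case DefineSqlGraphElements(_) => println("define SQL graph elements")
  case Other(other) =>
    // Mirrors the UnsupportedOperationException reported for unrecognized commands.
    throw new UnsupportedOperationException(s"$other not supported")
}
```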

@@ -51,15 +52,15 @@ handlePipelinesCommand(
 
 [handlePipelinesCommand](#handlePipelinesCommand)...FIXME
 
-### <span id="DefineDataset"> DEFINE_DATASET { #DEFINE_DATASET }
+### <span id="DefineOutput"> DEFINE_OUTPUT { #DEFINE_OUTPUT }
 
 [handlePipelinesCommand](#handlePipelinesCommand) prints out the following INFO message to the logs:
 
 ```text
-Define pipelines dataset cmd received: [cmd]
+Define pipelines output cmd received: [cmd]
 ```
 
-`handlePipelinesCommand` [defines a dataset](#defineDataset).
+`handlePipelinesCommand` [defines an output](#defineOutput) and responds with a resolved dataset (with a catalog and a database when specified).
 
 ### <span id="DefineFlow"> DEFINE_FLOW { #DEFINE_FLOW }
 
@@ -94,8 +95,8 @@ startRun(
   sessionHolder: SessionHolder): Unit
 ```
 
-??? note "`START_RUN` Pipeline Command"
-    `startRun` is used when `PipelinesHandler` is requested to handle [proto.PipelineCommand.CommandTypeCase.START_RUN](#START_RUN) command.
+??? note "START_RUN Pipeline Command"
+    `startRun` is used to handle the [START_RUN](#START_RUN) pipeline command.
 
 `startRun` finds the [GraphRegistrationContext](GraphRegistrationContext.md) by `dataflowGraphId` in the [DataflowGraphRegistry](DataflowGraphRegistry.md) (in the given `SessionHolder`).
 
@@ -113,6 +114,9 @@ createDataflowGraph(
   spark: SparkSession): String
 ```
 
+??? note "CREATE_DATAFLOW_GRAPH Pipeline Command"
+    `createDataflowGraph` is used to handle the [CREATE_DATAFLOW_GRAPH](#CREATE_DATAFLOW_GRAPH) pipeline command.
+
 `createDataflowGraph` gets the catalog (from the given `CreateDataflowGraph` if defined in the [pipeline specification file](index.md#pipeline-specification-file)) or prints out the following INFO message to the logs and uses the current catalog instead.
 
 ```text
@@ -127,40 +131,48 @@ No default database was supplied. Falling back to the current database: [current
 
 In the end, `createDataflowGraph` [creates a dataflow graph](DataflowGraphRegistry.md#createDataflowGraph) (in the session's [DataflowGraphRegistry](DataflowGraphRegistry.md)).
 
-## defineSqlGraphElements { #defineSqlGraphElements }
+## Define SQL Datasets { #defineSqlGraphElements }
 
 ```scala
 defineSqlGraphElements(
   cmd: proto.PipelineCommand.DefineSqlGraphElements,
   session: SparkSession): Unit
 ```
 
+??? note "DEFINE_SQL_GRAPH_ELEMENTS Pipeline Command"
+    `defineSqlGraphElements` is used to handle the [DEFINE_SQL_GRAPH_ELEMENTS](#DEFINE_SQL_GRAPH_ELEMENTS) pipeline command.
+
 `defineSqlGraphElements` [looks up the GraphRegistrationContext for the dataflow graph ID](DataflowGraphRegistry.md#getDataflowGraphOrThrow) (from the given `DefineSqlGraphElements` command and in the given `SessionHolder`).
 
 `defineSqlGraphElements` creates a new [SqlGraphRegistrationContext](SqlGraphRegistrationContext.md) (for the `GraphRegistrationContext`) to [process the SQL definition file](SqlGraphRegistrationContext.md#processSqlFile).
 
-## Define Dataset (Table or View) { #defineDataset }
+## Define Output { #defineOutput }
 
 ```scala
-defineDataset(
-  dataset: proto.PipelineCommand.DefineDataset,
-  sparkSession: SparkSession): Unit
+defineOutput(
+  output: proto.PipelineCommand.DefineOutput,
+  sessionHolder: SessionHolder): TableIdentifier
 ```
 
-`defineDataset` looks up the [GraphRegistrationContext](DataflowGraphRegistry.md#getDataflowGraphOrThrow) for the given `dataset` (or throws a `SparkException` if not found).
+??? note "DEFINE_OUTPUT Pipeline Command"
+    `defineOutput` is used to handle the [DEFINE_OUTPUT](#DEFINE_OUTPUT) pipeline command.
+
+`defineOutput` looks up the [GraphRegistrationContext](DataflowGraphRegistry.md#getDataflowGraphOrThrow) for the dataflow graph ID of the given `output` (or throws a `SparkException` if not found).
 
-`defineDataset` branches off based on the `dataset` type:
+`defineOutput` branches off based on the `output` type:
 
 | Dataset Type | Action |
 |--------------|--------|
 | `MATERIALIZED_VIEW` or `TABLE` | [Registers a table](GraphRegistrationContext.md#registerTable) |
 | `TEMPORARY_VIEW` | [Registers a view](GraphRegistrationContext.md#registerView) |
+| `SINK` | [Registers a sink](GraphRegistrationContext.md#registerSink) |
 
-For unknown types, `defineDataset` reports an `IllegalArgumentException`:
+??? warning "IllegalArgumentException"
+    For unknown types, `defineOutput` reports an `IllegalArgumentException`:
 
-```text
-Unknown dataset type: [type]
-```
+    ```text
+    Unknown output type: [type]
+    ```
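
A hypothetical sketch of the branching described above, over a simplified model of the output types (not the actual Spark source):

```scala
// Hypothetical, simplified model of the output types and the dispatch above.
sealed trait OutputType
case object MaterializedView extends OutputType
case object Table extends OutputType
case object TemporaryView extends OutputType
case object Sink extends OutputType
final case class Unknown(name: String) extends OutputType

def defineOutput(outputType: OutputType): Unit = outputType match {
  case MaterializedView | Table => println("register table") // GraphRegistrationContext.registerTable
  case TemporaryView            => println("register view")  // GraphRegistrationContext.registerView
  case Sink                     => println("register sink")  // GraphRegistrationContext.registerSink
  case Unknown(name) =>
    throw new IllegalArgumentException(s"Unknown output type: $name")
}
```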

 ## Define Flow { #defineFlow }
 
@@ -172,7 +184,7 @@ defineFlow(
 ```
 
 ??? note "DEFINE_FLOW Pipeline Command"
-    `defineFlow` is used to handle [DEFINE_FLOW](#DEFINE_FLOW).
+    `defineFlow` is used to handle the [DEFINE_FLOW](#DEFINE_FLOW) pipeline command.
 
 `defineFlow` looks up the [GraphRegistrationContext](DataflowGraphRegistry.md#getDataflowGraphOrThrow) for the given `flow` (or throws a `SparkException` if not found).
 
@@ -185,7 +197,7 @@ defineFlow(
 
 `defineFlow` [creates a flow identifier](GraphIdentifierManager.md#parseTableIdentifier) (for the `flow` name).
 
-??? note "AnalysisException"
+??? warning "AnalysisException"
     `defineFlow` reports an `AnalysisException` if the given `flow` is not an implicit flow, but is defined with a multi-part identifier.
 
 In the end, `defineFlow` [registers a flow](GraphRegistrationContext.md#registerFlow) (with a proper [FlowFunction](FlowAnalysis.md#createFlowFunctionFromLogicalPlan)).
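
The implicit-flow check above can be illustrated with a small, hypothetical sketch (helper name and message are illustrative; the real code reports an `AnalysisException`):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// Hypothetical sketch: a user-defined (non-implicit) flow must use a
// single-part name; multi-part identifiers are rejected.
def validateFlowIdentifier(identifier: TableIdentifier, isImplicitFlow: Boolean): Unit = {
  val isMultiPart = identifier.catalog.isDefined || identifier.database.isDefined
  require(isImplicitFlow || !isMultiPart,
    s"Flow ${identifier.unquotedString} must not be defined with a multi-part identifier")
}
```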

docs/declarative-pipelines/Sink.md

Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
+# Sink
+
+`Sink` is an [extension](#contract) of the [GraphElement](GraphElement.md) and [Output](Output.md) abstractions for [pipeline sinks](#implementations) that can define their [write format](#format) and [options](#options).
+
+## Contract
+
+### Format { #format }
+
+```scala
+format: String
+```
+
+Used when:
+
+* `PipelinesHandler` is requested to [define a sink (output)](PipelinesHandler.md#defineOutput)
+* `SinkWrite` is requested to [start a stream](SinkWrite.md#startStream)
+
+### Options { #options }
+
+```scala
+options: Map[String, String]
+```
+
+Used when:
+
+* `PipelinesHandler` is requested to [define a sink (output)](PipelinesHandler.md#defineOutput)
+* `SinkWrite` is requested to [start a stream](SinkWrite.md#startStream)
+
+## Implementations
+
+* [SinkImpl](SinkImpl.md)

docs/declarative-pipelines/SinkImpl.md

Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
+# SinkImpl
+
+`SinkImpl` is a [Sink](Sink.md).
+
+## Creating Instance
+
+`SinkImpl` takes the following to be created:
+
+* <span id="identifier"> [TableIdentifier](GraphElement.md#identifier)
+* <span id="format"> [Format](Sink.md#format)
+* <span id="options"> [Options](Sink.md#options)
+* <span id="origin"> [QueryOrigin](GraphElement.md#origin)
+
+`SinkImpl` is created when:
+
+* `PipelinesHandler` is requested to [defineOutput](PipelinesHandler.md#defineOutput)
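
A hypothetical, simplified model of the `Sink` contract and its `SinkImpl` implementation (stand-in `QueryOrigin`, `GraphElement`, and `Output` types; not the actual Spark source):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// Hypothetical stand-ins for the real pipeline abstractions.
final case class QueryOrigin()  // placeholder for the real QueryOrigin
trait GraphElement { def identifier: TableIdentifier; def origin: QueryOrigin }
trait Output

// Simplified Sink contract: a write format plus writer options.
trait Sink extends GraphElement with Output {
  def format: String
  def options: Map[String, String]
}

// Simplified SinkImpl with the four constructor arguments listed above.
final case class SinkImpl(
    identifier: TableIdentifier,
    format: String,
    options: Map[String, String],
    origin: QueryOrigin) extends Sink

// Example: the kind of value defineOutput could conceptually build for a SINK output.
val eventsSink = SinkImpl(
  identifier = TableIdentifier("events_sink"),
  format = "kafka",
  options = Map("kafka.bootstrap.servers" -> "localhost:9092", "topic" -> "events"),
  origin = QueryOrigin())
```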

docs/declarative-pipelines/SinkWrite.md

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+# SinkWrite
+
+`SinkWrite` is...FIXME
