Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3390,6 +3390,16 @@ class PlanGenerationTestSuite
fn.typedLit(java.time.Duration.ofSeconds(200L)),
fn.typedLit(java.time.Period.ofDays(100)),
fn.typedLit(new CalendarInterval(2, 20, 100L)),
fn.typedLit(
(
java.time.LocalDate.of(2020, 10, 10),
java.time.Instant.ofEpochMilli(1677155519808L),
new java.sql.Timestamp(12345L),
java.time.LocalDateTime.of(2023, 2, 23, 20, 36),
java.sql.Date.valueOf("2023-02-23"),
java.time.Duration.ofSeconds(200L),
java.time.Period.ofDays(100),
new CalendarInterval(2, 20, 100L))),

// Handle parameterized scala types e.g.: List, Seq and Map.
fn.typedLit(Some(1)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ object LiteralValueProtoConverter {
}
}

private def getConverter(dataType: proto.DataType): proto.Expression.Literal => Any = {
private def getScalaConverter(dataType: proto.DataType): proto.Expression.Literal => Any = {
if (dataType.hasShort) { v =>
v.getShort.toShort
} else if (dataType.hasInteger) { v =>
Expand All @@ -316,15 +316,15 @@ object LiteralValueProtoConverter {
} else if (dataType.hasBinary) { v =>
v.getBinary.toByteArray
} else if (dataType.hasDate) { v =>
v.getDate
SparkDateTimeUtils.toJavaDate(v.getDate)
} else if (dataType.hasTimestamp) { v =>
v.getTimestamp
SparkDateTimeUtils.toJavaTimestamp(v.getTimestamp)
} else if (dataType.hasTimestampNtz) { v =>
v.getTimestampNtz
SparkDateTimeUtils.microsToLocalDateTime(v.getTimestampNtz)
} else if (dataType.hasDayTimeInterval) { v =>
v.getDayTimeInterval
SparkIntervalUtils.microsToDuration(v.getDayTimeInterval)
} else if (dataType.hasYearMonthInterval) { v =>
v.getYearMonthInterval
SparkIntervalUtils.monthsToPeriod(v.getYearMonthInterval)
} else if (dataType.hasDecimal) { v =>
Decimal(v.getDecimal.getValue)
} else if (dataType.hasCalendarInterval) { v =>
Expand Down Expand Up @@ -354,7 +354,7 @@ object LiteralValueProtoConverter {
builder.result()
}

makeArrayData(getConverter(array.getElementType))
makeArrayData(getScalaConverter(array.getElementType))
}

def toCatalystMap(map: proto.Expression.Literal.Map): mutable.Map[_, _] = {
Expand All @@ -373,7 +373,7 @@ object LiteralValueProtoConverter {
builder
}

makeMapData(getConverter(map.getKeyType), getConverter(map.getValueType))
makeMapData(getScalaConverter(map.getKeyType), getScalaConverter(map.getValueType))
}

def toCatalystStruct(struct: proto.Expression.Literal.Struct): Any = {
Expand All @@ -392,7 +392,7 @@ object LiteralValueProtoConverter {
val structData = elements
.zip(dataTypes)
.map { case (element, dataType) =>
getConverter(dataType)(element)
getScalaConverter(dataType)(element)
}
.asInstanceOf[scala.collection.Seq[Object]]
.toSeq
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Project [id#0L, id#0L, 1 AS 1#0, null AS NULL#0, true AS true#0, 68 AS 68#0, 9872 AS 9872#0, -8726532 AS -8726532#0, 7834609328726532 AS 7834609328726532#0L, 2.718281828459045 AS 2.718281828459045#0, -0.8 AS -0.8#0, 89.97620 AS 89.97620#0, 89889.7667231 AS 89889.7667231#0, connect! AS connect!#0, T AS T#0, ABCDEFGHIJ AS ABCDEFGHIJ#0, 0x78797A7B7C7D7E7F808182838485868788898A8B8C8D8E AS X'78797A7B7C7D7E7F808182838485868788898A8B8C8D8E'#0, 0x0806 AS X'0806'#0, [8,6] AS ARRAY(8, 6)#0, null AS NULL#0, 2020-10-10 AS DATE '2020-10-10'#0, 8.997620 AS 8.997620#0, 2023-02-23 04:31:59.808 AS TIMESTAMP '2023-02-23 04:31:59.808'#0, 1969-12-31 16:00:12.345 AS TIMESTAMP '1969-12-31 16:00:12.345'#0, 2023-02-23 20:36:00 AS TIMESTAMP_NTZ '2023-02-23 20:36:00'#0, ... 18 more fields]
Project [id#0L, id#0L, 1 AS 1#0, null AS NULL#0, true AS true#0, 68 AS 68#0, 9872 AS 9872#0, -8726532 AS -8726532#0, 7834609328726532 AS 7834609328726532#0L, 2.718281828459045 AS 2.718281828459045#0, -0.8 AS -0.8#0, 89.97620 AS 89.97620#0, 89889.7667231 AS 89889.7667231#0, connect! AS connect!#0, T AS T#0, ABCDEFGHIJ AS ABCDEFGHIJ#0, 0x78797A7B7C7D7E7F808182838485868788898A8B8C8D8E AS X'78797A7B7C7D7E7F808182838485868788898A8B8C8D8E'#0, 0x0806 AS X'0806'#0, [8,6] AS ARRAY(8, 6)#0, null AS NULL#0, 2020-10-10 AS DATE '2020-10-10'#0, 8.997620 AS 8.997620#0, 2023-02-23 04:31:59.808 AS TIMESTAMP '2023-02-23 04:31:59.808'#0, 1969-12-31 16:00:12.345 AS TIMESTAMP '1969-12-31 16:00:12.345'#0, 2023-02-23 20:36:00 AS TIMESTAMP_NTZ '2023-02-23 20:36:00'#0, ... 19 more fields]
+- LocalRelation <empty>, [id#0L, a#0, b#0]
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,114 @@
}
}
}
}, {
"literal": {
"struct": {
"structType": {
"struct": {
"fields": [{
"name": "_1",
"dataType": {
"date": {
}
},
"nullable": true
}, {
"name": "_2",
"dataType": {
"timestamp": {
}
},
"nullable": true
}, {
"name": "_3",
"dataType": {
"timestamp": {
}
},
"nullable": true
}, {
"name": "_4",
"dataType": {
"timestampNtz": {
}
},
"nullable": true
}, {
"name": "_5",
"dataType": {
"date": {
}
},
"nullable": true
}, {
"name": "_6",
"dataType": {
"dayTimeInterval": {
"startField": 0,
"endField": 3
}
},
"nullable": true
}, {
"name": "_7",
"dataType": {
"yearMonthInterval": {
"startField": 0,
"endField": 1
}
},
"nullable": true
}, {
"name": "_8",
"dataType": {
"calendarInterval": {
}
},
"nullable": true
}]
}
},
"elements": [{
"date": 18545
}, {
"timestamp": "1677155519808000"
}, {
"timestamp": "12345000"
}, {
"timestampNtz": "1677184560000000"
}, {
"date": 19411
}, {
"dayTimeInterval": "200000000"
}, {
"yearMonthInterval": 0
}, {
"calendarInterval": {
"months": 2,
"days": 20,
"microseconds": "100"
}
}]
}
},
"common": {
"origin": {
"jvmOrigin": {
"stackTrace": [{
"classLoaderName": "app",
"declaringClass": "org.apache.spark.sql.functions$",
"methodName": "typedLit",
"fileName": "functions.scala"
}, {
"classLoaderName": "app",
"declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
"methodName": "~~trimmed~anonfun~~",
"fileName": "PlanGenerationTestSuite.scala"
}]
}
}
}
}, {
"literal": {
"integer": 1
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import io.grpc.{Context, Status, StatusRuntimeException}
import io.grpc.stub.StreamObserver
import org.apache.commons.lang3.exception.ExceptionUtils

import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.{SparkEnv, SparkException, TaskContext}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert this irrelevant change, @heyihong .

import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
import org.apache.spark.connect.proto
Expand Down