-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMultipleHandlersSupport.scala
252 lines (225 loc) · 8.46 KB
/
MultipleHandlersSupport.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
package org.encalmo.lambda
import org.encalmo.utils.JsonUtils.*
import ujson.Arr
import ujson.Obj
import upickle.default.*
import scala.util.control.NonFatal
final case class UnsupportedRequestError(message: String) extends Exception(message) with ApiGatewayBadRequestException
final case class UnsupportedEventError(message: String) extends Exception(message)
trait MultipleHandlersSupport extends EventHandler, EventHandlerTag {
private val functionNameRegex = "\"(?:function|functionName|handler)\"(?:\\s*):(?:\\s*)\"(.+?)\"".r
private val requestPathRegex = "\"path\"(?:\\s*):(?:\\s*)\"(.+?)\"".r
final override inline def getEventHandlerTag(event: String): Option[String] =
try {
functionNameRegex
.findFirstMatchIn(event)
.orElse(requestPathRegex.findFirstMatchIn(event))
.flatMap(m => Option(m.group(1)))
} catch { case NonFatal(_) => None }
def apiGatewayRequestHandlers: Iterable[ApiGatewayRequestHandler[ApplicationContext]]
def sqsEventHandlers: Iterable[SqsEventHandler[ApplicationContext]]
def genericEventHandlers: Iterable[GenericEventHandler[ApplicationContext]]
lazy val sqsEventHandlersMap: Map[String, SqsEventHandler[ApplicationContext]] =
sqsEventHandlers
.map(handler =>
(
handler.functionName
.getOrElse {
val className = handler.getClass.getSimpleName()
if (className.endsWith("Handler")) then className.dropRight(7)
else className
},
handler
)
)
.toMap
lazy val genericEventHandlersMap: Map[String, GenericEventHandler[ApplicationContext]] = genericEventHandlers
.map(handler =>
(
handler.functionName
.getOrElse {
val className = handler.getClass.getSimpleName()
if (className.endsWith("Handler")) then className.dropRight(7)
else className
},
handler
)
)
.toMap
final override def handleRequest(input: String)(using LambdaContext, ApplicationContext): String =
parseInput(input).match {
case request: ApiGatewayRequest =>
try {
apiGatewayRequestHandlers
.foldLeft[Option[ApiGatewayResponse]](None)((result, stub) => result.orElse(stub.handleRequest(request)))
.map(_.writeAsString)
.getOrElse(
throw UnsupportedRequestError(
s"${request.httpMethod} ${request.path}"
)
)
} catch handleApiGatewayHandlerException(input)
case event: SqsEvent =>
println(s"Processing ${event.Records.size} record(s)")
event.Records.zipWithIndex.foreach { (record, index) =>
try {
val recordJson = record.maybeParseBodyAsJson
recordJson.flatMap(maybeFunctionName).match {
case Some(functionName) =>
sqsEventHandlersMap
.get(functionName)
.flatMap { handler =>
handler.handleRecord(getFunctionInput(record))
}
.getOrElse(
throw UnsupportedEventError(write(event))
)
case None =>
if (sqsEventHandlersMap.size == 1)
then
sqsEventHandlersMap.head._2
.handleRecord(record)
.getOrElse(
throw UnsupportedEventError(write(event))
)
else
throw UnsupportedEventError(
"Ambiguous SQS event cannot be processed because 'handler' parameter is missing. Add \"handler\":\"{sqsHandlerName}\" field to your record body."
)
}
} catch handleSqsEventHandlerException(input)
}
"" // returning empty string always
case event: ujson.Value =>
try {
maybeFunctionName(event).match {
case Some(functionName) =>
genericEventHandlersMap
.get(functionName)
.flatMap(_.handleEvent(getFunctionInput(event)))
.getOrElse(
throw UnsupportedEventError(write(event))
)
case None =>
if (genericEventHandlersMap.size == 1)
then
genericEventHandlersMap.head._2
.handleEvent(event)
.getOrElse(
throw UnsupportedEventError(write(event))
)
else
throw UnsupportedEventError(
"Ambiguous generic event cannot be processed because 'handler' parameter is missing. Add \"handler\":\"{genericHandlerName}\" field to your event body."
)
}
} catch handleGenericEventHandlerException(input)
case unsupported: String =>
handleUnsupportedInputType(unsupported)
}
def handleUnsupportedInputType(input: String)(using
lambdaContext: LambdaContext
): String =
throw UnsupportedRequestError(input)
/** Provide custom ApiGateway error handling implementation here. */
def handleApiGatewayHandlerException(input: String)(
exception: Throwable
)(using lambdaContext: LambdaContext): String = exception match {
case e: ApiGatewayRequestBodyParseException =>
ApiGatewayResponse(
statusCode = 400,
body = "Invalid request body.",
headers = Map("Content-Type" -> "text/plain"),
isBase64Encoded = false
).writeAsString
case e =>
ApiGatewayResponse(
statusCode = 502,
body = s"""Request ID: ${lambdaContext.requestId}
|
|Exception
|------------------
|${e.getClass().getName()}
|${e.getMessage()}${e
.getStackTrace()
.take(10)
.mkString("\n")}
|
|Input
|------------------
|$input
|""".stripMargin,
headers = Map("Content-Type" -> "text/plain"),
isBase64Encoded = false
).writeAsString
}
def handleSqsEventHandlerException(input: String)(
e: Throwable
)(using lambdaContext: LambdaContext): Unit =
println(s"""Request ID: ${lambdaContext.requestId}
|
|Exception
|------------------
|${e.getClass().getName()}
|${e.getMessage()}${e
.getStackTrace()
.take(10)
.mkString("\n")}
|
|Input
|------------------
|$input
|""".stripMargin)
def handleGenericEventHandlerException(input: String)(
exception: Throwable
)(using lambdaContext: LambdaContext): String =
new Error(
errorCode = Error.getErrorCode(exception),
errorMessage = exception.getMessage()
).writeAsString
/** Simple and effective method to decide on the type of the event. */
private inline def parseInput(input: String) =
if (input.contains("\"Records\""))
then input.readAs[SqsEvent]
else if (input.contains("\"httpMethod\""))
then input.readAs[ApiGatewayRequest]
else
try (ujson.read(input))
catch {
case e => input
}
private inline def maybeFunctionName(json: ujson.Value): Option[String] =
json
.maybeString("functionName")
.orElse(json.maybeString("handlerName"))
.orElse(json.maybeString("handler"))
.orElse(json.maybeString("function"))
private inline def getFunctionInput(event: ujson.Value): ujson.Value =
event
.get("functionInputParts")
.orElse(event.get("handlerInputParts"))
.match {
case Some(ujson.Arr(values)) =>
values.foldLeft(ujson.Obj()) { (a, v) =>
v match {
case Obj(newFields) => Obj.from(a.value ++ newFields)
case other =>
throw new Exception(
s"Expected functionInputParts array to contain only objects, but got ${other.getClass().getSimpleName()} $other"
)
}
}
case Some(value) => value
case None =>
event
.get("functionInput")
.orElse(event.get("handlerInput"))
.getOrElse(event)
}
private inline def getFunctionInput(record: SqsEvent.Record): SqsEvent.Record =
record.copy(body =
record.body.maybeReadAsJson
.map(e => getFunctionInput(e).writeAsString)
.getOrElse(record.body)
)
}