25
25
package org .hatdex .hat .she .models
26
26
27
27
import akka .actor .ActorSystem
28
- import akka .stream .alpakka .awslambda .scaladsl .AwsLambdaFlow
29
- import akka .stream .scaladsl .{ Sink , Source }
30
28
import akka .stream .{ ActorMaterializer , Materializer }
31
29
import io .dataswift .models .hat .EndpointDataBundle
32
30
import io .dataswift .models .hat .applications .Version
@@ -35,11 +33,11 @@ import org.hatdex.hat.api.service.RemoteExecutionContext
35
33
import org .joda .time .DateTime
36
34
import play .api .libs .json .{ Format , Json }
37
35
import play .api .{ Configuration , Logger }
38
- import software .amazon .awssdk .auth .credentials .ContainerCredentialsProvider
39
36
import software .amazon .awssdk .core .SdkBytes
40
37
import software .amazon .awssdk .regions .Region
41
38
import software .amazon .awssdk .services .lambda .LambdaAsyncClient
42
39
import software .amazon .awssdk .services .lambda .model .{ InvokeRequest , InvokeResponse }
40
+ import software .amazon .awssdk .auth .credentials .DefaultCredentialsProvider
43
41
44
42
import javax .inject .Inject
45
43
import scala .concurrent .{ ExecutionContext , Future }
@@ -166,7 +164,7 @@ class AwsLambdaExecutor @Inject() (
166
164
LambdaAsyncClient
167
165
.builder()
168
166
.region(Region .of(configuration.get[String ](" she.aws.region" )))
169
- .credentialsProvider(ContainerCredentialsProvider .builder().build ())
167
+ .credentialsProvider(DefaultCredentialsProvider .create ())
170
168
.build()
171
169
172
170
actorSystem.registerOnTermination(lambdaClient.close())
@@ -175,7 +173,38 @@ class AwsLambdaExecutor @Inject() (
175
173
request : InvokeRequest
176
174
)(implicit jsonFormatter : Format [T ]): Future [T ] =
177
175
if (mock) Future .successful(null .asInstanceOf [T ])
178
- else
176
+ else {
177
+ lambdaClient.invoke{request}.get match {
178
+ case r : InvokeResponse if r.functionError() == null =>
179
+ logger.debug(s """ Function responded with:
180
+ | Status: ${r.statusCode()}
181
+ | Body: ${r.payload().asUtf8String()}
182
+ | Logs: ${Option (r.logResult()).map(log => java.util.Base64 .getDecoder.decode(log))}
183
+ """ .stripMargin)
184
+ val jsResponse =
185
+ Json .parse(r.payload().asUtf8String()).validate[T ] recover {
186
+ case e =>
187
+ val message = s " Error parsing lambda response: $e"
188
+ logger.error(message)
189
+ logger.error(s " Unable to parse: ${r.payload().asUtf8String()}" )
190
+ throw DataFormatException (message)
191
+ }
192
+ Future (jsResponse.get)
193
+ case r : InvokeResponse if r.functionError() != null =>
194
+ val message =
195
+ s " Retrieving SHE function Response Error: ${r.functionError()}"
196
+ logger.error(message)
197
+ throw new ApiException (message)
198
+ case r =>
199
+ val message =
200
+ s " Retrieving SHE function Response FAILED: $r, ${r.payload().asUtf8String()}"
201
+ logger.error(message)
202
+ throw new ApiException (message)
203
+ }
204
+ }
205
+
206
+
207
+ /*
179
208
Source
180
209
.single(request)
181
210
.via(AwsLambdaFlow(1)(lambdaClient))
@@ -200,5 +229,6 @@ class AwsLambdaExecutor @Inject() (
200
229
s"Retrieving SHE function configuration failed: $r, ${r.payload().asUtf8String()}"
201
230
logger.error(message)
202
231
throw new ApiException(message)
203
- }
232
+ }*/
233
+
204
234
}
0 commit comments