Skip to content

Commit cafc845

Browse files
committed
add weka_api function
terraform definitions for WekaApi wip wip 2 wip 3 wip 4 wip 5 terraform deletion moved management function to be under functions folder remove unused env delete ManagementRequest, use WekaApiRequest instead weka-api lambda fix terraform-docs: automated action chore: update lambdas_version lambda dist - dev PR comments don't fetch creds on WekaApi, just pass the creds on the Payload if exist return json.RawMessage from management call (response will be parsed by client) return json.RawMessage from management call (response will be parsed by client) remove isSupportedMethod check wip wip include output wip refactor WekaApi to management package fix wip use MakeWekaApi wip use MakeWekaApi (contd.)
1 parent 90c7be9 commit cafc845

File tree

8 files changed

+230
-130
lines changed

8 files changed

+230
-130
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -801,6 +801,7 @@ The `helper_commands` part in the output provides lambda call that can be used t
801801
| [aws_lambda_function.status_lambda](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lambda_function) | resource |
802802
| [aws_lambda_function.terminate_lambda](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lambda_function) | resource |
803803
| [aws_lambda_function.transient_lambda](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lambda_function) | resource |
804+
| [aws_lambda_function.weka_api_lambda](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lambda_function) | resource |
804805
| [aws_lambda_permission.invoke_lambda_permission](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lambda_permission) | resource |
805806
| [aws_launch_template.launch_template](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/launch_template) | resource |
806807
| [aws_lb.alb](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lb) | resource |

lambdas.tf

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ locals {
55
s3_key = var.lambdas_custom_s3_key != null ? var.lambdas_custom_s3_key : "${var.lambdas_dist}/${var.lambdas_version}.zip"
66
functions = toset([
77
"deploy", "clusterize", "report", "clusterize-finalization", "status", "scale-down", "fetch", "terminate",
8-
"transient", "join-nfs-finalization"
8+
"transient", "join-nfs-finalization", "weka-api"
99
])
1010
enable_lambda_vpc = var.enable_lambda_vpc_config ? 1 : 0
1111
obs_prefix = lookup(var.custom_prefix, "obs", var.prefix)
@@ -291,6 +291,7 @@ resource "aws_lambda_function" "status_lambda" {
291291
STATE_KEY = local.state_key
292292
NFS_STATE_KEY = local.nfs_state_key
293293
CLUSTER_NAME = var.cluster_name
294+
WEKA_API_LAMBDA = aws_lambda_function.weka_api.function_name
294295
MANAGEMENT_LAMBDA = aws_lambda_function.management.function_name
295296
USERNAME_ID = aws_secretsmanager_secret.weka_username.id
296297
DEPLOYMENT_PASSWORD_ID = aws_secretsmanager_secret.weka_deployment_password.id
@@ -302,6 +303,39 @@ resource "aws_lambda_function" "status_lambda" {
302303
depends_on = [aws_cloudwatch_log_group.lambdas_log_group]
303304
}
304305

306+
resource "aws_lambda_function" "weka_api" {
307+
function_name = "${var.prefix}-${var.cluster_name}-weka-api-lambda"
308+
s3_bucket = local.s3_bucket
309+
s3_key = local.s3_key
310+
handler = local.handler_name
311+
role = local.lambda_iam_role_arn
312+
memory_size = 128
313+
timeout = 20
314+
runtime = "provided.al2"
315+
architectures = ["arm64"]
316+
dynamic "vpc_config" {
317+
for_each = range(0, local.enable_lambda_vpc)
318+
content {
319+
security_group_ids = local.sg_ids
320+
subnet_ids = local.subnet_ids
321+
}
322+
}
323+
environment {
324+
variables = {
325+
LAMBDA = "weka-api"
326+
CLUSTER_NAME = var.cluster_name
327+
MANAGEMENT_LAMBDA = aws_lambda_function.management.function_name
328+
USERNAME_ID = aws_secretsmanager_secret.weka_username.id
329+
DEPLOYMENT_PASSWORD_ID = aws_secretsmanager_secret.weka_deployment_password.id
330+
ADMIN_PASSWORD_ID = aws_secretsmanager_secret.weka_password.id
331+
USE_SECRETMANAGER_ENDPOINT = var.secretmanager_use_vpc_endpoint
332+
}
333+
}
334+
tags = var.tags_map
335+
depends_on = [aws_cloudwatch_log_group.lambdas_log_group]
336+
}
337+
338+
305339
resource "aws_lambda_function" "fetch_lambda" {
306340
function_name = "${local.lambda_prefix}-${var.cluster_name}-fetch-lambda"
307341
s3_bucket = local.s3_bucket
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package management
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"github.com/weka/go-cloud-lib/connectors"
8+
"github.com/weka/go-cloud-lib/lib/jrpc"
9+
"github.com/weka/go-cloud-lib/lib/weka"
10+
"github.com/weka/go-cloud-lib/logging"
11+
)
12+
13+
type WekaApiRequest struct {
14+
Method weka.JrpcMethod `json:"method"`
15+
Params map[string]string `json:"params"`
16+
}
17+
18+
type ManagementRequest struct {
19+
WekaApiRequest
20+
21+
Username string `json:"username"`
22+
Password string `json:"password"`
23+
BackendPrivateIps []string `json:"backend_private_ips"`
24+
}
25+
26+
func CallJRPC(ctx context.Context, request ManagementRequest) (json.RawMessage, error) {
27+
logger := logging.LoggerFromCtx(ctx)
28+
logger.Debug().Msg("CallJRPC > Start")
29+
logger.Info().Msgf("CallJRPC > method %s", request.Method)
30+
31+
var jrpcResponse json.RawMessage
32+
33+
logger.Debug().Msgf("CallJRPC > Username: %s", request.Username)
34+
jrpcBuilder := func(ip string) *jrpc.BaseClient {
35+
return connectors.NewJrpcClient(ctx, ip, weka.ManagementJrpcPort, request.Username, request.Password)
36+
}
37+
38+
ips := request.BackendPrivateIps
39+
if len(ips) == 0 {
40+
return nil, fmt.Errorf("CallJRPC - backend private ips are empty")
41+
}
42+
logger.Debug().Msgf("CallJRPC > BackendPrivateIps: %v", ips)
43+
44+
var params interface{}
45+
if request.Params != nil {
46+
params = request.Params
47+
} else {
48+
params = struct{}{}
49+
}
50+
51+
jpool := &jrpc.Pool{
52+
Ips: ips,
53+
Clients: map[string]*jrpc.BaseClient{},
54+
Active: "",
55+
Builder: jrpcBuilder,
56+
Ctx: ctx,
57+
}
58+
59+
if err := jpool.Call(request.Method, params, &jrpcResponse); err != nil {
60+
return nil, fmt.Errorf("CallJRPC - call [%s] failed > %w", request.Method, err)
61+
}
62+
return jrpcResponse, nil
63+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package weka_api
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/rs/zerolog/log"
7+
"github.com/weka/aws-tf/modules/deploy_weka/lambdas/common"
8+
"github.com/weka/aws-tf/modules/deploy_weka/lambdas/functions/management"
9+
"github.com/weka/go-cloud-lib/logging"
10+
"os"
11+
"strconv"
12+
)
13+
14+
func MakeWekaApiRequest[T any](ctx context.Context, wr *management.WekaApiRequest) (response *T, err error) {
15+
logger := logging.LoggerFromCtx(ctx)
16+
logger.Debug().Msg("MakeWekaApiRequest > Start")
17+
18+
r, err := invokeManagementLambda[T](ctx, wr)
19+
if err != nil {
20+
return nil, fmt.Errorf("MakeWekaApiRequest > lambda invocation failed: %v", err)
21+
}
22+
return r, nil
23+
}
24+
25+
func invokeManagementLambda[T any](ctx context.Context, wr *management.WekaApiRequest) (response *T, err error) {
26+
logger := logging.LoggerFromCtx(ctx)
27+
logger.Debug().Msg("invokeManagementLambda > Start")
28+
logger.Info().Msgf("invokeManagementLambda > Params: %s", wr.Params)
29+
30+
managementLambdaName := os.Getenv("MANAGEMENT_LAMBDA")
31+
if managementLambdaName == "" {
32+
return nil, fmt.Errorf("MANAGEMENT_LAMBDA is not set")
33+
}
34+
if wr.Method == "" {
35+
return nil, fmt.Errorf("weka-api method is not set")
36+
}
37+
38+
useSecretManagerEndpoint, err := strconv.ParseBool(os.Getenv("USE_SECRETMANAGER_ENDPOINT"))
39+
if err != nil {
40+
log.Warn().Msg("Failed to parse USE_SECRETMANAGER_ENDPOINT, assuming false")
41+
}
42+
var username, password string
43+
if !useSecretManagerEndpoint {
44+
log.Info().Msg("Secret manager endpoint not in use, sending credentials in body")
45+
usernameId := os.Getenv("USERNAME_ID")
46+
deploymentPasswordId := os.Getenv("DEPLOYMENT_PASSWORD_ID")
47+
adminPasswordId := os.Getenv("ADMIN_PASSWORD_ID")
48+
creds, err := common.GetDeploymentOrAdminUsernameAndPassword(usernameId, deploymentPasswordId, adminPasswordId)
49+
if err != nil {
50+
return nil, fmt.Errorf("invokeManagementLambda > GetDeploymentOrAdminUsernameAndPassword: %w", err)
51+
}
52+
53+
username = creds.Username
54+
password = creds.Password
55+
}
56+
57+
clusterName := os.Getenv("CLUSTER_NAME")
58+
if clusterName == "" {
59+
return nil, fmt.Errorf("CLUSTER_NAME is not set")
60+
}
61+
ips, err := common.GetBackendsPrivateIps(clusterName, "backend")
62+
if err != nil {
63+
return nil, fmt.Errorf("invokeManagementLambda > GetBackendsPrivateIps: %w", err)
64+
}
65+
log.Info().Msgf("invokeManagementLambda > Backend private IPs: %v", ips)
66+
67+
logger.Debug().Msgf("invokeManagementLambda > Username: %s", username)
68+
69+
managementRequest := management.ManagementRequest{
70+
WekaApiRequest: management.WekaApiRequest{
71+
Method: wr.Method,
72+
Params: wr.Params,
73+
},
74+
BackendPrivateIps: ips,
75+
Username: username,
76+
Password: password, // empty string is interpreted as no credentials
77+
}
78+
79+
response, err = common.InvokeLambdaFunction[T](managementLambdaName, managementRequest)
80+
if err != nil {
81+
wrappedError := fmt.Errorf("invokeManagementLambda >: %w", err)
82+
log.Error().Err(wrappedError).Send()
83+
}
84+
85+
return response, nil
86+
87+
}

lambdas/main.go

Lines changed: 30 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"context"
55
"fmt"
6+
"github.com/weka/go-cloud-lib/lib/weka"
67
"os"
78
"strconv"
89
"strings"
@@ -13,8 +14,10 @@ import (
1314
"github.com/weka/aws-tf/modules/deploy_weka/lambdas/common"
1415
"github.com/weka/aws-tf/modules/deploy_weka/lambdas/connectors"
1516
lambdas "github.com/weka/aws-tf/modules/deploy_weka/lambdas/functions/fetch"
17+
"github.com/weka/aws-tf/modules/deploy_weka/lambdas/functions/management"
1618
"github.com/weka/aws-tf/modules/deploy_weka/lambdas/functions/terminate"
17-
"github.com/weka/aws-tf/modules/deploy_weka/lambdas/management"
19+
"github.com/weka/aws-tf/modules/deploy_weka/lambdas/functions/weka_api"
20+
1821
"github.com/weka/go-cloud-lib/logging"
1922
"github.com/weka/go-cloud-lib/scale_down"
2023

@@ -315,55 +318,17 @@ func getClusterStatus(ctx context.Context, stateTable, stateTableHashKey, stateK
315318
return clusterStatus, nil
316319
}
317320

318-
clusterName := os.Getenv("CLUSTER_NAME")
319-
if clusterName == "" {
320-
return protocol.ClusterStatus{}, fmt.Errorf("CLUSTER_NAME is not set")
321-
}
322-
ips, err := common.GetBackendsPrivateIps(clusterName, "backend")
323-
if err != nil {
324-
return protocol.ClusterStatus{}, fmt.Errorf("getClusterStatus > GetBackendsPrivateIps: %w", err)
325-
}
326-
log.Info().Msgf("GetClusterStatus > Backend private IPs: %v", ips)
327-
328-
managementLambdaName := os.Getenv("MANAGEMENT_LAMBDA")
329-
if managementLambdaName == "" {
330-
return protocol.ClusterStatus{}, fmt.Errorf("MANAGEMENT_LAMBDA is not set")
331-
}
332-
333-
useSecretManagerEndpoint, err := strconv.ParseBool(os.Getenv("USE_SECRETMANAGER_ENDPOINT"))
334-
if err != nil {
335-
log.Warn().Msg("Failed to parse USE_SECRETMANAGER_ENDPOINT, assuming false")
336-
}
337-
var username, password string
338-
if !useSecretManagerEndpoint {
339-
log.Info().Msg("Secret manager endpoint not in use, sending credentials in body")
340-
usernameId := os.Getenv("USERNAME_ID")
341-
deploymentPasswordId := os.Getenv("DEPLOYMENT_PASSWORD_ID")
342-
adminPasswordId := os.Getenv("ADMIN_PASSWORD_ID")
343-
creds, err := common.GetDeploymentOrAdminUsernameAndPassword(usernameId, deploymentPasswordId, adminPasswordId)
344-
if err != nil {
345-
return protocol.ClusterStatus{}, fmt.Errorf("getClusterStatus > GetDeploymentOrAdminUsernameAndPassword: %w", err)
346-
}
347-
348-
username = creds.Username
349-
password = creds.Password
350-
}
351-
352-
managementRequest := management.ManagementRequest{
353-
Type: "status",
354-
WekaStatusRequest: management.WekaStatusRequest{
355-
BackendPrivateIps: ips,
356-
Username: username,
357-
Password: password, // empty string is interpreted as no credentials
358-
},
321+
wekaApiRequest := management.WekaApiRequest{
322+
Method: weka.JrpcStatus,
359323
}
360324
var wekaStatus *protocol.WekaStatus
361-
wekaStatus, err = common.InvokeLambdaFunction[protocol.WekaStatus](managementLambdaName, managementRequest)
325+
wekaStatus, err = weka_api.MakeWekaApiRequest[protocol.WekaStatus](ctx, &wekaApiRequest)
362326
if err != nil {
363-
wrappedError := fmt.Errorf("getClusterStatus > InvokeLambdaFunction: %w", err)
327+
wrappedError := fmt.Errorf("getClusterStatus > MakeWekaApiRequest: %w", err)
364328
log.Error().Err(wrappedError).Send()
365329
wekaStatus = &protocol.WekaStatus{}
366330
}
331+
367332
clusterStatus.WekaStatus = *wekaStatus
368333

369334
return clusterStatus, nil
@@ -423,30 +388,28 @@ func scaleDownHandler(ctx context.Context, info protocol.HostGroupInfoResponse)
423388
return scale_down.ScaleDown(ctx, info)
424389
}
425390

426-
func managementHandler(ctx context.Context, req management.ManagementRequest) (protocol.WekaStatus, error) {
427-
switch req.Type {
428-
case "status":
429-
useSecretManagerEndpoint, err := strconv.ParseBool(os.Getenv("USE_SECRETMANAGER_ENDPOINT"))
391+
func managementHandler(ctx context.Context, req management.ManagementRequest) (interface{}, error) {
392+
useSecretManagerEndpoint, err := strconv.ParseBool(os.Getenv("USE_SECRETMANAGER_ENDPOINT"))
393+
if err != nil {
394+
log.Warn().Msg("Failed to parse USE_SECRETMANAGER_ENDPOINT, assuming false")
395+
}
396+
if useSecretManagerEndpoint && req.Password == "" {
397+
usernameId := os.Getenv("USERNAME_ID")
398+
deploymentPasswordId := os.Getenv("DEPLOYMENT_PASSWORD_ID")
399+
adminPasswordId := os.Getenv("ADMIN_PASSWORD_ID")
400+
creds, err := common.GetDeploymentOrAdminUsernameAndPassword(usernameId, deploymentPasswordId, adminPasswordId)
430401
if err != nil {
431-
log.Warn().Msg("Failed to parse USE_SECRETMANAGER_ENDPOINT, assuming false")
402+
log.Error().Err(err).Send()
403+
return nil, err
432404
}
433-
if useSecretManagerEndpoint && req.Password == "" {
434-
usernameId := os.Getenv("USERNAME_ID")
435-
deploymentPasswordId := os.Getenv("DEPLOYMENT_PASSWORD_ID")
436-
adminPasswordId := os.Getenv("ADMIN_PASSWORD_ID")
437-
creds, err := common.GetDeploymentOrAdminUsernameAndPassword(usernameId, deploymentPasswordId, adminPasswordId)
438-
if err != nil {
439-
log.Error().Err(err).Send()
440-
return protocol.WekaStatus{}, err
441-
}
442-
req.Username = creds.Username
443-
req.Password = creds.Password
444-
}
445-
return management.GetWekaStatus(ctx, req.WekaStatusRequest)
446-
default:
447-
log.Error().Msgf("Invalid management type: %s", req.Type)
448-
return protocol.WekaStatus{}, fmt.Errorf("invalid management type: %s", req.Type)
405+
req.Username = creds.Username
406+
req.Password = creds.Password
449407
}
408+
return management.CallJRPC(ctx, req)
409+
}
410+
411+
func wekaApiHandler(ctx context.Context, req management.WekaApiRequest) (interface{}, error) {
412+
return weka_api.MakeWekaApiRequest[interface{}](ctx, &req)
450413
}
451414

452415
func main() {
@@ -473,6 +436,8 @@ func main() {
473436
lambda.Start(transientHandler)
474437
case "management":
475438
lambda.Start(managementHandler)
439+
case "weka-api":
440+
lambda.Start(wekaApiHandler)
476441
default:
477442
lambda.Start(func() error { return fmt.Errorf("unsupported lambda command") })
478443
}

0 commit comments

Comments
 (0)