Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
943c016
add aws-msk-iam-sasl-signer-go as requirement
qswinson Aug 18, 2025
4eeaa51
Add MSK IAM auth for control plane
qswinson Aug 18, 2025
aae9ec0
Add MSK IAM auth for data plane
qswinson Aug 18, 2025
e781313
Update proto/contract
qswinson Aug 18, 2025
487d0dc
fix compile error
qswinson Aug 20, 2025
c696d5f
add support for assuming an AWS IAM role to connect to MSK
qswinson Sep 4, 2025
095833c
Merge remote-tracking branch 'source-origin/main' into aws-msk-iam
qswinson Sep 4, 2025
3b0e459
remove change from beta api
qswinson Sep 4, 2025
491975e
more generated files
qswinson Sep 4, 2025
f7f07a7
simplify data-plane changes
qswinson Sep 4, 2025
ccac58d
fix lint
qswinson Sep 4, 2025
56f5548
second pass at generated code
qswinson Sep 4, 2025
c5f9f7a
docs
qswinson Sep 4, 2025
4e12348
use correct protoc version
qswinson Sep 5, 2025
598e215
Merge remote-tracking branch 'source-origin/main' into aws-msk-iam
qswinson Sep 9, 2025
f1edfb5
fix unit tests
qswinson Sep 9, 2025
4c5a285
fix formatting
qswinson Sep 9, 2025
0dff494
remove debug
qswinson Sep 17, 2025
6370dba
code review refactor and unit tests
qswinson Sep 19, 2025
43b2924
Merge remote-tracking branch 'source-origin/main' into aws-msk-iam
qswinson Sep 19, 2025
a751f26
code review fixes
qswinson Oct 1, 2025
55e8cde
Merge remote-tracking branch 'source-origin/main' into aws-msk-iam
qswinson Oct 1, 2025
6bed236
fix case
qswinson Oct 1, 2025
b98f8ed
remove setupEnv and cleanupEnv
qswinson Oct 7, 2025
36ea131
Merge remote-tracking branch 'source-origin/main' into aws-msk-iam
qswinson Oct 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ spec:
type: boolean
x-kubernetes-map-type: atomic
type:
description: Type of saslType, defaults to plain (vs SCRAM-SHA-512 or SCRAM-SHA-256)
description: Type of saslType, defaults to plain (vs SCRAM-SHA-512 or SCRAM-SHA-256 or OAUTHBEARER)
type: object
properties:
secretKeyRef:
Expand Down
78 changes: 58 additions & 20 deletions control-plane/pkg/apis/bindings/v1/kafka_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,22 +92,41 @@ func (kfb *KafkaBinding) Do(ctx context.Context, ps *duckv1.WithPod) {
spec.InitContainers[i].Env = append(spec.InitContainers[i].Env, corev1.EnvVar{
Name: "KAFKA_NET_SASL_ENABLE",
Value: "true",
}, corev1.EnvVar{
Name: "KAFKA_NET_SASL_USER",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: kfb.Spec.Net.SASL.User.SecretKeyRef,
},
}, corev1.EnvVar{
Name: "KAFKA_NET_SASL_PASSWORD",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: kfb.Spec.Net.SASL.Password.SecretKeyRef,
},
}, corev1.EnvVar{
Name: "KAFKA_NET_SASL_TYPE",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: kfb.Spec.Net.SASL.Type.SecretKeyRef,
},
})
if kfb.Spec.Net.SASL.User.SecretKeyRef != nil {
spec.InitContainers[i].Env = append(spec.InitContainers[i].Env, corev1.EnvVar{
Name: "KAFKA_NET_SASL_USER",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: kfb.Spec.Net.SASL.User.SecretKeyRef,
},
}, corev1.EnvVar{
Name: "KAFKA_NET_SASL_PASSWORD",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: kfb.Spec.Net.SASL.Password.SecretKeyRef,
},
})
}
if kfb.Spec.Net.SASL.TokenProvider.SecretKeyRef != nil {
spec.Containers[i].Env = append(spec.Containers[i].Env, corev1.EnvVar{
Name: "KAFKA_NET_SASL_TOKEN_PROVIDER",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: kfb.Spec.Net.SASL.TokenProvider.SecretKeyRef,
},
})
}
if kfb.Spec.Net.SASL.RoleARN.SecretKeyRef != nil {
spec.Containers[i].Env = append(spec.Containers[i].Env, corev1.EnvVar{
Name: "KAFKA_NET_SASL_ROLE_ARN",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: kfb.Spec.Net.SASL.RoleARN.SecretKeyRef,
},
})
}
}
if kfb.Spec.Net.TLS.Enable {
spec.InitContainers[i].Env = append(spec.InitContainers[i].Env, corev1.EnvVar{
Expand Down Expand Up @@ -142,22 +161,41 @@ func (kfb *KafkaBinding) Do(ctx context.Context, ps *duckv1.WithPod) {
spec.Containers[i].Env = append(spec.Containers[i].Env, corev1.EnvVar{
Name: "KAFKA_NET_SASL_ENABLE",
Value: "true",
}, corev1.EnvVar{
Name: "KAFKA_NET_SASL_USER",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: kfb.Spec.Net.SASL.User.SecretKeyRef,
},
}, corev1.EnvVar{
Name: "KAFKA_NET_SASL_PASSWORD",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: kfb.Spec.Net.SASL.Password.SecretKeyRef,
},
}, corev1.EnvVar{
Name: "KAFKA_NET_SASL_TYPE",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: kfb.Spec.Net.SASL.Type.SecretKeyRef,
},
})
if kfb.Spec.Net.SASL.User.SecretKeyRef != nil {
spec.Containers[i].Env = append(spec.Containers[i].Env, corev1.EnvVar{
Name: "KAFKA_NET_SASL_USER",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: kfb.Spec.Net.SASL.User.SecretKeyRef,
},
}, corev1.EnvVar{
Name: "KAFKA_NET_SASL_PASSWORD",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: kfb.Spec.Net.SASL.Password.SecretKeyRef,
},
})
}
if kfb.Spec.Net.SASL.TokenProvider.SecretKeyRef != nil {
spec.Containers[i].Env = append(spec.Containers[i].Env, corev1.EnvVar{
Name: "KAFKA_NET_SASL_TOKEN_PROVIDER",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: kfb.Spec.Net.SASL.TokenProvider.SecretKeyRef,
},
})
}
if kfb.Spec.Net.SASL.RoleARN.SecretKeyRef != nil {
spec.Containers[i].Env = append(spec.Containers[i].Env, corev1.EnvVar{
Name: "KAFKA_NET_SASL_ROLE_ARN",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: kfb.Spec.Net.SASL.RoleARN.SecretKeyRef,
},
})
}
}
if kfb.Spec.Net.TLS.Enable {
spec.Containers[i].Env = append(spec.Containers[i].Env, corev1.EnvVar{
Expand Down
12 changes: 11 additions & 1 deletion control-plane/pkg/apis/bindings/v1/kafka_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,17 @@ type KafkaSASLSpec struct {
// +optional
Password SecretValueFromSource `json:"password,omitempty"`

// Type of saslType, defaults to plain (vs SCRAM-SHA-512 or SCRAM-SHA-256)
// RoleARN is the Kubernetes secret containing the ARN of the IAM role to assume.
// Only used if saslType is OAUTHBEARER and tokenProvider is MSKRoleAccessTokenProvider.
// +optional
RoleARN SecretValueFromSource `json:"roleARN,omitempty"`

// Token Provider is the Kubernetes secret containing the OAUTHBEARER
// token provider function. Only used if saslType is OAUTHBEARER.
// +optional
TokenProvider SecretValueFromSource `json:"tokenProvider,omitempty"`

// Type of saslType, defaults to plain (vs SCRAM-SHA-512 or SCRAM-SHA-256 or OAUTHBEARER).
// +optional
Type SecretValueFromSource `json:"type,omitempty"`
}
Expand Down
2 changes: 2 additions & 0 deletions control-plane/pkg/apis/bindings/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 34 additions & 19 deletions control-plane/pkg/contract/contract.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions control-plane/pkg/reconciler/consumergroup/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ func hasNetSpecAuthConfig(auth *kafkainternals.Auth) bool {
auth.NetSpec.TLS.Key.SecretKeyRef != nil ||
auth.NetSpec.SASL.User.SecretKeyRef != nil ||
auth.NetSpec.SASL.Password.SecretKeyRef != nil ||
auth.NetSpec.SASL.RoleARN.SecretKeyRef != nil ||
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have you been using this with the source or also with the broker ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have been using this only with the broker.

auth.NetSpec.SASL.TokenProvider.SecretKeyRef != nil ||
auth.NetSpec.SASL.Type.SecretKeyRef != nil)
}

Expand Down
60 changes: 60 additions & 0 deletions control-plane/pkg/security/oauth/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2025 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package oauth

import (
"os"
)

const (
saslTokenProviderKey = "tokenProvider"
saslRoleARNKey = "roleARN"
saslAWSRegion = "awsRegion"

// Environment variable names
awsRegionEnvVar = "AWS_REGION"
awsDefaultRegionEnvVar = "AWS_DEFAULT_REGION"

// Default AWS region
defaultAWSRegion = "us-east-1"

// Token provider types
mskAccessTokenProvider = "MSKAccessTokenProvider"
mskRoleAccessTokenProvider = "MSKRoleAccessTokenProvider"

// MSK IAM Auth constants
knativeEventingUserAgent = "knative-eventing"
)

func getAWSRegion(data map[string][]byte) string {
if region, ok := data[saslAWSRegion]; ok && len(region) > 0 {
return string(region)
}

awsRegion := os.Getenv(awsRegionEnvVar)
if awsRegion != "" {
return awsRegion
}

// Fallback to AWS_DEFAULT_REGION if AWS_REGION is not set
awsRegion = os.Getenv(awsDefaultRegionEnvVar)
if awsRegion != "" {
return awsRegion
}

return defaultAWSRegion
}
40 changes: 40 additions & 0 deletions control-plane/pkg/security/oauth/msk_access_token_issuer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2025 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package oauth

import (
"context"

"github.com/aws/aws-msk-iam-sasl-signer-go/signer"
)

// mskAccessTokenIssuer implements TokenIssuer for the MSK access token
type mskAccessTokenIssuer struct {
region string
}

func (m *mskAccessTokenIssuer) IssueToken(ctx context.Context) (string, error) {
token, _, err := signer.GenerateAuthToken(ctx, m.region)
return token, err
}

func NewMSKAccessTokenIssuer(data map[string][]byte) (*mskAccessTokenIssuer, error) {
region := getAWSRegion(data)
return &mskAccessTokenIssuer{
region: region,
}, nil
}
Loading
Loading