diff --git a/miscellaneous/message-queue-trigger/nats-jetstream/README.md b/miscellaneous/message-queue-trigger/nats-jetstream/README.md new file mode 100644 index 0000000..172ec1d --- /dev/null +++ b/miscellaneous/message-queue-trigger/nats-jetstream/README.md @@ -0,0 +1,77 @@ +# Message Queue Trigger Demonstration - NATS Jetstream + +## Create Nats Jetstream server +``` +kubectl apply -f jetstream-server.yaml +``` +## Create Producer + +``` +fission environment create --name go --image fission/go-env-1.16 --builder fission/go-builder-1.16 +fission fn create --name producer --env go --src "producer/*" --entrypoint Handler +``` + +## Create Fission function +``` +fission fn create --name helloworld --env go --src hello.go --entrypoint Handler +``` + +## Create Fission trigger + +``` +fission mqt create --name jetstreamtest --function helloworld --mqtype nats-jetstream --mqtkind keda --topic input.created --resptopic output.response-topic --errortopic erroutput.error-topic --maxretries 3 --metadata stream=input --metadata fissionConsumer=fission_consumer --metadata natsServerMonitoringEndpoint=nats-jetstream.default.svc.cluster.local:8222 --metadata natsServer=nats://nats-jetstream.default.svc.cluster.local:4222 --metadata consumer=fission_consumer +``` +## Run the producer +``` +fission fn test --name=producer +``` +### Sample Output +``` +Order with OrderID:1 has been published +Order with OrderID:2 has been published +Order with OrderID:3 has been published +Successfully sent to request-topic +``` + +## Check logs +To verify the status of trigger, we can- + +- check for logs in the fission helloworld function's pod + +``` +$ fission fn pod --name=helloworld +NAME NAMESPACE READY STATUS IP EXECUTORTYPE MANAGED +poolmgr-go-default-6312601-6d6b85ff4f-b8m7g fission-function 2/2 Running 10.244.0.188 poolmgr false +``` +or + +``` +$ kubectl -n fission-function get pod -l functionName=helloworld +NAME READY STATUS RESTARTS AGE +poolmgr-go-default-6312601-6d6b85ff4f-b8m7g 2/2 Terminating 0 30m +``` +### sample output + +``` +$ kubectl -n fission-function logs -f -c go poolmgr-go-default-6312601-6d6b85ff4f-b8m7g +2022/08/24 06:16:17 listening on 8888 ... +2022/08/24 06:42:23 specializing ... +2022/08/24 06:42:23 loading plugin from /userfunc/deployarchive/helloworld-eb3f240a-d6bb-4728-b806-f426ce0e255a-vyh8tf-oa1sgs +2022/08/24 06:42:23 done +Hello Test1 +Hello Test2 +Hello Test3 +``` + +- check jetstream pods logs- + +``` +$ kubectl logs deploy/jetstreamtest +{"level":"info","ts":1661322333.8198879,"caller":"app/main.go:90","msg":"Done processing message","messsage":"Hello Test1"} +{"level":"info","ts":1661322333.8208282,"caller":"app/main.go:90","msg":"Done processing message","messsage":"Hello Test2"} +{"level":"info","ts":1661322333.8217056,"caller":"app/main.go:90","msg":"Done processing message","messsage":"Hello Test3"} +``` + +NOTE: +- Jetstream connector creates a push based subscriber to get the data. Make sure the `consumer` provided in `mqt` is of type pull. Also if the consumer is not present connector will itself create the it. +- The connector needs all the stream mentioned(topic,respTopic,errTopic streams) to be present otherwise it will fail. For this example we have created all these streams in producer function. So before pusblisher publishes the messages it also creates the required stream if not present. \ No newline at end of file diff --git a/miscellaneous/message-queue-trigger/nats-jetstream/hello.go b/miscellaneous/message-queue-trigger/nats-jetstream/hello.go new file mode 100644 index 0000000..4a32c70 --- /dev/null +++ b/miscellaneous/message-queue-trigger/nats-jetstream/hello.go @@ -0,0 +1,28 @@ +package main + +import ( + "fmt" + "io" + "net/http" +) + +// Handler is the entry point for this fission function +func Handler(w http.ResponseWriter, r *http.Request) { // nolint:unused,deadcode + body, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "Error reading request body", + http.StatusInternalServerError) + } + results := string(body) + fmt.Println("Hello: ", results) + _, err = w.Write([]byte("Hello " + results)) + if err != nil { + http.Error(w, "Error writing response", http.StatusInternalServerError) + } +} + +// ErrorHandler is the entry point for this fission function +func ErrorHandler(w http.ResponseWriter, r *http.Request) { // nolint:unused,deadcode + + http.Error(w, "Error reading request body", http.StatusBadRequest) +} diff --git a/miscellaneous/message-queue-trigger/nats-jetstream/jetstream-server.yaml b/miscellaneous/message-queue-trigger/nats-jetstream/jetstream-server.yaml new file mode 100644 index 0000000..45043f2 --- /dev/null +++ b/miscellaneous/message-queue-trigger/nats-jetstream/jetstream-server.yaml @@ -0,0 +1,45 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: nats-jetstream-deployment + labels: + app: nats-jetstream +spec: + replicas: 1 + selector: + matchLabels: + app: nats-jetstream + template: + metadata: + labels: + app: nats-jetstream + spec: + containers: + - name: nats-jetstream + # docker run --network host -p 4222:4222 nats -js -m 8222 + image: nats:latest + args: ["-js","-m", "8222" ] +--- +apiVersion: v1 +kind: Service +metadata: + name: nats-jetstream + labels: + app: nats-jetstream +spec: + selector: + app: nats-jetstream + clusterIP: None + ports: + - name: client + port: 4222 + - name: cluster + port: 6222 + - name: monitor + port: 8222 + - name: metrics + port: 7777 + - name: leafnodes + port: 7422 + - name: gateways + port: 7522 \ No newline at end of file diff --git a/miscellaneous/message-queue-trigger/nats-jetstream/producer/go.mod b/miscellaneous/message-queue-trigger/nats-jetstream/producer/go.mod new file mode 100644 index 0000000..e28381a --- /dev/null +++ b/miscellaneous/message-queue-trigger/nats-jetstream/producer/go.mod @@ -0,0 +1,10 @@ +module github.com/fission/mqtrigger + +go 1.12 + +require ( + github.com/golang/protobuf v1.5.2 // indirect + github.com/nats-io/nats-server/v2 v2.8.4 // indirect + github.com/nats-io/nats.go v1.16.0 + google.golang.org/protobuf v1.28.1 // indirect +) diff --git a/miscellaneous/message-queue-trigger/nats-jetstream/producer/go.sum b/miscellaneous/message-queue-trigger/nats-jetstream/producer/go.sum new file mode 100644 index 0000000..2eafef3 --- /dev/null +++ b/miscellaneous/message-queue-trigger/nats-jetstream/producer/go.sum @@ -0,0 +1,58 @@ +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4= +github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28gth9P+wV2K/zYUUAkJ+55U8cpS0p5I= +github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= +github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBriPUtluB4= +github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4= +github.com/nats-io/nats.go v1.15.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.16.0 h1:zvLE7fGBQYW6MWaFaRdsgm9qT39PJDQoju+DS8KsO1g= +github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd h1:XcWmESyNjXJMLahc3mqVQJcgSTDxFxhETVlfk9uGc38= +golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220111092808-5a964db01320 h1:0jf+tOCoZ3LyutmCOWpVni1chK4VfFLhRsDK7MhqGRY= +golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 h1:GZokNIeuVkl3aZHJchRrr13WCsols02MLUcz1U9is6M= +golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= diff --git a/miscellaneous/message-queue-trigger/nats-jetstream/producer/main.go b/miscellaneous/message-queue-trigger/nats-jetstream/producer/main.go new file mode 100644 index 0000000..785a91d --- /dev/null +++ b/miscellaneous/message-queue-trigger/nats-jetstream/producer/main.go @@ -0,0 +1,111 @@ +package main + +import ( + "fmt" + "log" + "net/http" + "os" + "strconv" + + "github.com/nats-io/nats.go" +) + +const ( + streamName = "input" + streamSubjects = "input.*" + subjectName = "input.created" + responseStreamName = "output" + responseStreamSubject = "output.response-topic" + errorStreamName = "erroutput" + errorstreamSubjects = "erroutput.error-topic" +) + +// Handler is the entry point for this fission function +func Handler(w http.ResponseWriter, r *http.Request) { // nolint:unused,deadcode + + // Connect to NATS + host := "nats://nats-jetstream.default.svc.cluster.local:4222" + + nc, err := nats.Connect(host) + if err != nil { + w.Write([]byte(fmt.Sprintf("error connecting to host: %v", err.Error()))) + return + } + // Creates JetStreamContext + js, err := nc.JetStream() + if err != nil { + w.Write([]byte(fmt.Sprintf("error getting context: %v", err.Error()))) + return + } + + // Creates stream + err = createStream(js, streamName, streamSubjects) + if err != nil { + w.Write([]byte(fmt.Sprintf("error create stream: %v", err.Error()))) + return + } + + // Creates stream + err = createStream(js, responseStreamName, responseStreamSubject) + if err != nil { + w.Write([]byte(fmt.Sprintf("error create stream: %v", err.Error()))) + return + } + + // create output & err stream + err = createStream(js, errorStreamName, errorstreamSubjects) + if err != nil { + w.Write([]byte(fmt.Sprintf("error create stream: %v", err.Error()))) + return + } + + // Create records by publishing messages + err = publishdata(w, js) + if err != nil { + w.Write([]byte(fmt.Sprintf("error in publishing stream: %v", err.Error()))) + return + } + fmt.Println("Published all the messages") + + w.Write([]byte("Successfully sent to request-topic")) +} + +// publishdata publishes data to input stream +func publishdata(w http.ResponseWriter, js nats.JetStreamContext) error { + + no, err := strconv.Atoi(os.Getenv("COUNT")) + if err != nil { + log.Println("invalid count provided. Err: ", err) + no = 3 + err = nil + } + for i := 1; i <= no; i++ { + _, err := js.Publish(subjectName, []byte("Test"+strconv.Itoa(i))) + if err != nil { + log.Println("Error found: ", err) + return err + } + w.Write([]byte(fmt.Sprintf("Order with OrderID:%d has been published\n", i))) + } + return nil +} + +// createStream creates a stream by using JetStreamContext +func createStream(js nats.JetStreamContext, streamName, streamSubjects string) error { + stream, err := js.StreamInfo(streamName) + if err != nil { + log.Println(err) + err = nil + } + if stream == nil { + log.Printf("creating stream %q and subjects %q", streamName, streamSubjects) + _, err = js.AddStream(&nats.StreamConfig{ + Name: streamName, + Subjects: []string{streamSubjects}, + }) + if err != nil { + return err + } + } + return nil +}