11# Demo: Kafka Connect Sink
22## Install Connector
3- Send a request to the Kafka Connect REST API to configure it to use Kafka Connect Redis:
3+ Send a request to the Kafka Connect REST API to configure it to use Kafka Connect Redis.
44
5- ### Avro
6- ** IMPORTANT:** The Avro demo utilizes multiple topics in order to work around [ a bug in the Avro console producer] ( https://github.com/confluentinc/schema-registry/issues/898 ) . A fix has been merged but Confluent has not published a new Docker image for it yet (6.1.0+). Kafka Connect Redis works with Avro on a single topic; this is just a problem with the console producer provided by Confluent.
5+ First, expose the Kafka Connect server:
6+ ``` bash
7+ kubectl -n kcr-demo port-forward service/kafka-connect :rest
8+ ```
9+
10+ Kubectl will choose an available port for you that you will need to use for the cURLs (` $PORT ` ).
711
12+ ### Avro
813``` bash
914curl --request POST \
10- --url " $( minikube -n kcr-demo service kafka-connect --url ) /connectors" \
15+ --url " localhost: $PORT /connectors" \
1116 --header ' content-type: application/json' \
1217 --data ' {
1318 "name": "demo-redis-sink-connector",
1419 "config": {
1520 "connector.class": "io.github.jaredpetersen.kafkaconnectredis.sink.RedisSinkConnector",
1621 "key.converter": "io.confluent.connect.avro.AvroConverter",
1722 "key.converter.schema.registry.url": "http://kafka-schema-registry:8081",
23+ "key.converter.key.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
1824 "value.converter": "io.confluent.connect.avro.AvroConverter",
1925 "value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
26+ "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
2027 "tasks.max": "3",
21- "topics": "redis.commands.set,redis.commands.expire,redis.commands.expireat,redis.commands.pexpire,redis.commands.sadd,redis.commands.geoadd,redis.commands.arbitrary ",
28+ "topics": "redis.commands",
2229 "redis.uri": "redis://IEPfIr0eLF7UsfwrIlzy80yUaBG258j9@redis-cluster",
2330 "redis.cluster.enabled": true
2431 }
@@ -28,7 +35,7 @@ curl --request POST \
2835### Connect JSON
2936``` bash
3037curl --request POST \
31- --url " $( minikube -n kcr-demo service kafka-connect --url ) /connectors" \
38+ --url " localhost: $PORT /connectors" \
3239 --header ' content-type: application/json' \
3340 --data ' {
3441 "name": "demo-redis-sink-connector",
@@ -48,17 +55,18 @@ curl --request POST \
4855### Avro
4956Create an interactive ephemeral query pod:
5057``` bash
51- kubectl -n kcr-demo run -it --rm kafka-write-records --image confluentinc/cp-schema-registry:6.0 .0 --command /bin/bash
58+ kubectl -n kcr-demo run -it --rm kafka-write-records --image confluentinc/cp-schema-registry:6.1 .0 --command /bin/bash
5259```
5360
54- Write records to the ` redis.commands ` topics :
61+ Write records to the ` redis.commands ` topic :
5562
5663``` bash
5764kafka-avro-console-producer \
5865 --broker-list kafka-broker-0.kafka-broker:9092 \
5966 --property schema.registry.url=' http://kafka-schema-registry:8081' \
67+ --property value.subject.name.strategy=' io.confluent.kafka.serializers.subject.TopicRecordNameStrategy' \
6068 --property value.schema=' {"namespace":"io.github.jaredpetersen.kafkaconnectredis","name":"RedisSetCommand","type":"record","fields":[{"name":"key","type":"string"},{"name":"value","type":"string"},{"name":"expiration","type":["null",{"name":"RedisSetCommandExpiration","type":"record","fields":[{"name":"type","type":{"name":"RedisSetCommandExpirationType","type":"enum","symbols":["EX","PX","KEEPTTL"]}},{"name":"time","type":["null","long"]}]}],"default":null},{"name":"condition","type":["null",{"name":"RedisSetCommandCondition","type":"enum","symbols":["NX","XX","KEEPTTL"]}],"default":null}]}' \
61- --topic redis.commands.set
69+ --topic redis.commands
6270> {" key" :" {user.1}.username" ," value" :" jetpackmelon22" ," expiration" :null," condition" :null}
6371> {" key" :" {user.2}.username" ," value" :" anchorgoat74" ," expiration" :{" io.github.jaredpetersen.kafkaconnectredis.RedisSetCommandExpiration" :{" type" :" EX" ," time" :{" long" :2100}}}," condition" :{" io.github.jaredpetersen.kafkaconnectredis.RedisSetCommandCondition" :" NX" }}
6472> {" key" :" product.milk" ," value" :" $2 .29" ," expiration" :null," condition" :null}
@@ -70,35 +78,39 @@ kafka-avro-console-producer \
7078kafka-avro-console-producer \
7179 --broker-list kafka-broker-0.kafka-broker:9092 \
7280 --property schema.registry.url=' http://kafka-schema-registry:8081' \
81+ --property value.subject.name.strategy=' io.confluent.kafka.serializers.subject.TopicRecordNameStrategy' \
7382 --property value.schema=' {"namespace":"io.github.jaredpetersen.kafkaconnectredis","name":"RedisExpireCommand","type":"record","fields":[{"name":"key","type":"string"},{"name":"seconds","type":"long"}]}' \
74- --topic redis.commands.expire
83+ --topic redis.commands
7584> {" key" :" product.milk" ," seconds" :1800}
7685```
7786
7887``` bash
7988kafka-avro-console-producer \
8089 --broker-list kafka-broker-0.kafka-broker:9092 \
8190 --property schema.registry.url=' http://kafka-schema-registry:8081' \
91+ --property value.subject.name.strategy=' io.confluent.kafka.serializers.subject.TopicRecordNameStrategy' \
8292 --property value.schema=' {"namespace":"io.github.jaredpetersen.kafkaconnectredis","name":"RedisExpireatCommand","type":"record","fields":[{"name":"key","type":"string"},{"name":"timestamp","type":"long"}]}' \
83- --topic redis.commands.expireat
93+ --topic redis.commands
8494> {" key" :" product.bread" ," timestamp" :4130464553}
8595```
8696
8797``` bash
8898kafka-avro-console-producer \
8999 --broker-list kafka-broker-0.kafka-broker:9092 \
90100 --property schema.registry.url=' http://kafka-schema-registry:8081' \
101+ --property value.subject.name.strategy=' io.confluent.kafka.serializers.subject.TopicRecordNameStrategy' \
91102 --property value.schema=' {"namespace":"io.github.jaredpetersen.kafkaconnectredis","name":"RedisPexpireCommand","type":"record","fields":[{"name":"key","type":"string"},{"name":"milliseconds","type":"long"}]}' \
92- --topic redis.commands.pexpire
103+ --topic redis.commands
93104> {" key" :" product.waffles" ," milliseconds" :1800000}
94105```
95106
96107``` bash
97108kafka-avro-console-producer \
98109 --broker-list kafka-broker-0.kafka-broker:9092 \
99110 --property schema.registry.url=' http://kafka-schema-registry:8081' \
111+ --property value.subject.name.strategy=' io.confluent.kafka.serializers.subject.TopicRecordNameStrategy' \
100112 --property value.schema=' {"namespace":"io.github.jaredpetersen.kafkaconnectredis","name":"RedisSaddCommand","type":"record","fields":[{"name":"key","type":"string"},{"name":"values","type":{"type":"array","items":"string"}}]}' \
101- --topic redis.commands.sadd
113+ --topic redis.commands
102114> {" key" :" {user.1}.interests" ," values" :[" reading" ]}
103115> {" key" :" {user.2}.interests" ," values" :[" sailing" ," woodworking" ," programming" ]}
104116```
@@ -107,17 +119,19 @@ kafka-avro-console-producer \
107119kafka-avro-console-producer \
108120 --broker-list kafka-broker-0.kafka-broker:9092 \
109121 --property schema.registry.url=' http://kafka-schema-registry:8081' \
122+ --property value.subject.name.strategy=' io.confluent.kafka.serializers.subject.TopicRecordNameStrategy' \
110123 --property value.schema=' {"namespace":"io.github.jaredpetersen.kafkaconnectredis","name":"RedisGeoaddCommand","type":"record","fields":[{"name":"key","type":"string"},{"name":"values","type":{"type":"array","items":{"name":"RedisGeoaddCommandGeolocation","type":"record","fields":[{"name":"longitude","type":"double"},{"name":"latitude","type":"double"},{"name":"member","type":"string"}]}}}]}' \
111- --topic redis.commands.geoadd
124+ --topic redis.commands
112125> {" key" :" Sicily" ," values" :[{" longitude" :13.361389," latitude" :13.361389," member" :" Palermo" },{" longitude" :15.087269," latitude" :37.502669," member" :" Catania" }]}
113126```
114127
115128``` bash
116129kafka-avro-console-producer \
117130 --broker-list kafka-broker-0.kafka-broker:9092 \
118131 --property schema.registry.url=' http://kafka-schema-registry:8081' \
132+ --property value.subject.name.strategy=' io.confluent.kafka.serializers.subject.TopicRecordNameStrategy' \
119133 --property value.schema=' {"namespace":"io.github.jaredpetersen.kafkaconnectredis","name":"RedisArbitraryCommand","type":"record","fields":[{"name":"command","type":"string"},{"name":"arguments","type":{"type":"array","items":"string"}}]}' \
120- --topic redis.commands.arbitrary
134+ --topic redis.commands
121135> {" command" :" TS.CREATE" ," arguments" :[" temperature:3:11" , " RETENTION" , " 60" , " LABELS" , " sensor_id" , " 2" , " area_id" , " 32" ]}
122136> {" command" :" TS.ADD" ," arguments" :[" temperature:3:11" , " 1548149181" , " 30" ]}
123137> {" command" :" TS.ADD" ," arguments" :[" temperature:3:11" , " 1548149191" , " 42" ]}
@@ -126,7 +140,7 @@ kafka-avro-console-producer \
126140### Connect JSON
127141Create an interactive ephemeral query pod:
128142``` bash
129- kubectl -n kcr-demo run -it --rm kafka-write-records --image confluentinc/cp-kafka:6.0 .0 --command /bin/bash
143+ kubectl -n kcr-demo run -it --rm kafka-write-records --image confluentinc/cp-kafka:6.1 .0 --command /bin/bash
130144```
131145
132146Write records to the ` redis.commands ` topic:
0 commit comments