Skip to content

Commit bef51d5

Browse files
committed
wip examples 23.4 fixed consumer
1 parent 6070f19 commit bef51d5

File tree

4 files changed

+19
-16
lines changed

4 files changed

+19
-16
lines changed

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ To run `OKafka application` against Oracle Database, a database user must be cre
2222

2323
```roomsql
2424
create user <user> identified by <password>
25-
GRANT CONNECT , RESOURCE to user;
25+
GRANT AQ_USER_ROLE to user;
26+
GRANT CONNECT, RESOURCE, unlimited tablespace to user;
2627
GRANT EXECUTE on DBMS_AQ to user;
2728
GRANT EXECUTE on DBMS_AQADM to user;
2829
GRANT EXECUTE on DBMS_AQIN to user;
@@ -33,7 +34,8 @@ GRANT SELECT on GV_$INSTANCE to user;
3334
GRANT SELECT on GV_$LISTENER_NETWORK to user;
3435
GRANT SELECT on GV_$PDBS to user;
3536
GRANT SELECT on USER_QUEUE_PARTITION_ASSIGNMENT_TABLE to user;
36-
exec DBMS_AQADM.GRANT_PRIV_FOR_RM_PLAN('user');
37+
GRANT SELECT on SYS.DBA_RSRC_PLAN_DIRECTIVES to user;
38+
EXEC DBMS_AQADM.GRANT_PRIV_FOR_RM_PLAN('user');
3739
```
3840

3941
Once user is created and above privileges are granted, connect to Oracle Database as this user and create a Transactional Event Queue using below PL/SQL script. One can also use `KafkaAdmin` interface as shown in `CreateTopic.java` in `examples` directory to create a Transactional Event Queue.

examples/consumer/src/main/java/org/oracle/okafka/examples/ConsumerOKafka.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
public class ConsumerOKafka {
2323
public static void main(String[] args) {
24-
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG");
24+
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "INFO");
2525

2626
// Get application properties
2727
Properties appProperties = null;
@@ -43,20 +43,21 @@ public static void main(String[] args) {
4343
KafkaConsumer<String , String> consumer = new KafkaConsumer<>(appProperties);
4444
consumer.subscribe(Arrays.asList(topic));
4545

46-
while(true) {
46+
4747
try {
48-
ConsumerRecords <String, String> records = consumer.poll(Duration.ofMillis(10000));
48+
while(true) {
49+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
4950

50-
for (ConsumerRecord<String, String> record : records)
51-
System.out.printf("partition = %d, offset = %d, key = %d, value =%s\n ", record.partition(), record.offset(), record.key(), record.value());
51+
for (ConsumerRecord<String, String> record : records)
52+
System.out.printf("partition = %d, offset = %d, key = %s, value =%s\n ", record.partition(), record.offset(), record.key(), record.value());
5253

53-
if(records != null && records.count() > 0) {
54-
System.out.println("Committing records" + records.count());
55-
consumer.commitSync();
56-
}
57-
else {
58-
System.out.println("No Record Fetched. Retrying in 1 second");
59-
Thread.sleep(1000);
54+
if (records != null && records.count() > 0) {
55+
System.out.println("Committing records" + records.count());
56+
consumer.commitSync();
57+
} else {
58+
System.out.println("No Record Fetched. Retrying in 1 second");
59+
Thread.sleep(1000);
60+
}
6061
}
6162
}catch(Exception e)
6263
{
@@ -66,7 +67,7 @@ public static void main(String[] args) {
6667
finally {
6768
consumer.close();
6869
}
69-
}
70+
7071
}
7172

7273
private static java.util.Properties getProperties() throws IOException {

examples/consumer/src/main/resources/config.properties

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
#bootstrap.servers=<server address:server port>
66
#oracle.service.name=<oracle database service>
77
#oracle.net.tns_admin=<location of ojdbc.properties file>
8-
security.protocol="PLAINTEXT"
98
bootstrap.servers=84.235.172.160:1521
109
oracle.service.name="freepdb1"
1110
oracle.net.tns_admin=/Users/pasimoes/Work/Oracle/Code/okafka/23.4/okafka/examples

examples/producer/src/main/java/org/oracle/okafka/examples/ProducerOKafka.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.oracle.okafka.examples;
99

1010
import org.oracle.okafka.clients.producer.KafkaProducer;
11+
1112
import org.apache.kafka.common.header.internals.RecordHeader;
1213
import org.apache.kafka.clients.producer.Producer;
1314
import org.apache.kafka.clients.producer.ProducerRecord;

0 commit comments

Comments
 (0)