
Commit a101977

Kafka connector example (#292)
* Add example for producing and consuming to and from a Kafka topic while using the Oracle TxEventQ Sink/Source Connector.
* Add to the main ReadMe file.
* Modify config file.
* Fix setupTeq.sql script to take in seed database as a parameter value.
* Add additional parameter for specifying the pluggable database to create.
1 parent c9f0953 commit a101977

File tree

14 files changed: +534 additions, 0 deletions

README.md

Lines changed: 1 addition & 0 deletions
| [spatial](./spatial) | Spatial features examples |
| [sql](./sql) | SQL examples |
| [sqldeveloper](./sqldeveloper) | [SQL Developer](http://www.oracle.com/technetwork/developer-tools/sql-developer/) examples |
| [txeventq](./txeventq) | TxEventQ examples |

## Documentation

You can find the online documentation of the Oracle Database under [docs.oracle.com/en/database/](http://docs.oracle.com/en/database/)
Lines changed: 71 additions & 0 deletions
# Oracle TxEventQ Connectors

Repository for demonstrating how to use the Oracle TxEventQ Connectors. The repository contains a Kafka client application
that can produce messages to, or consume messages from, a specified Kafka topic. While the Kafka client is producing to the
Kafka topic, the Oracle TxEventQ Sink Connector can run and enqueue the messages from that topic into the specified TxEventQ.
After the Sink Connector has finished enqueuing messages from the Kafka topic, the Oracle TxEventQ Source Connector can be
used to dequeue the messages from the specified TxEventQ into another Kafka topic. While the Source Connector is dequeuing
messages into that Kafka topic, the Kafka client consumer should be running to consume from it.
## Getting started

To use the Oracle TxEventQ Connectors, Kafka version 3.1.0 or later will need to be downloaded and installed on
a server. Refer to [Apache Kafka](https://kafka.apache.org/) for information on how to start Kafka. The Oracle TxEventQ
Connectors require Oracle Database 21c or later in order to create a Transactional Event Queue. Download the
[Oracle TxEventQ Connector](https://mvnrepository.com/artifact/com.oracle.database.messaging/txeventq-connector) from Maven.
Read the following [Readme](https://github.com/oracle/okafka/tree/master/connectors) file for how to set up and use the Oracle TxEventQ Connector.
## Setting up database

Clone the project from the repository. Open a bash window and change the directory to the location where the cloned project has been saved.

Copy `initdb.sh.example` to `initdb.sh` and mark it as executable, i.e. `chmod a+x ./initdb.sh`

Modify `initdb.sh` to fill in the hostname, port, name of the CDB, service domain, sys password, user, user password, the name of the seed database, and the name of the
pluggable database to create. The user and user password can be for an existing user or for a new user to create. To (re)initialize the database, run `./initdb.sh`

The `initdb.sh` script will create a transactional event queue named **TEQ** with the required privileges that are discussed in the
[Oracle TxEventQ Connector](https://mvnrepository.com/artifact/com.oracle.database.messaging/txeventq-connector) Readme. Use this queue
name **TEQ** when creating the properties files for the Oracle TxEventQ Sink and Source Connectors.

**Note**: `sqlplus` is required and must be in your path.
## Usage

In `kafka_client/src/main/resources` there are two properties files, `config.properties` and `log4j.properties`, that can
be modified if required.

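The contents of `config.properties` are not shown in this diff; as a minimal sketch of what a Kafka client properties file of this kind typically contains (all values below are assumptions for a local single-broker setup, not taken from the commit):

```properties
# Hypothetical config.properties for a local single-broker setup; adjust to your environment.
bootstrap.servers=localhost:9092
group.id=txeventq-example
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
```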
Start the Kafka broker by starting ZooKeeper and the Kafka server as described in the [Oracle TxEventQ Connector](https://mvnrepository.com/artifact/com.oracle.database.messaging/txeventq-connector) Readme.
Create two different Kafka topics with 10 partitions each, one for the producer to use and one for the consumer to use.

If running Kafka in a Windows environment, open a command prompt and change to the directory where Kafka has been installed.

Run the following command to create a topic:

```bash
.\bin\windows\kafka-topics.bat --create --topic <name of topic> --bootstrap-server localhost:9092 --partitions 10
```

If running Kafka in a Linux environment, open a terminal and change to the directory where Kafka has been installed.

Run the following command to create a topic:

```bash
bin/kafka-topics.sh --create --topic <name of topic> --bootstrap-server localhost:9092 --partitions 10
```

We will use the kafka_client to produce some messages to a specified topic and use the Oracle TxEventQ Sink Connector to enqueue
the messages into the specified transactional event queue.

Start the Oracle TxEventQ Sink Connector. Open a command prompt, change to the kafka_client directory, and run the
following command: `./runproducer.sh <name of kafka topic> <number of messages to produce>`.

Next, we can use the kafka_client to consume messages from a specified topic that the TxEventQ Source Connector has been dequeuing into.

Stop the Oracle TxEventQ Sink Connector and start the Oracle TxEventQ Source Connector. Have the Source Connector
dequeue messages from the transactional event queue that the Sink Connector just enqueued into and put them into the specified
Kafka topic. Open a command prompt, change to the kafka_client directory, and run the following
command: `./runconsumer.sh <name of kafka topic>`. The name of the Kafka topic specified here should be the one the Source Connector is
dequeuing into.

Lines changed: 29 additions & 0 deletions
```bash
#!/bin/bash

HOST=<host>
PORT=<portNum>

# CDB
SERVICE=<serviceName>
SERVICE_DOMAIN=<domain of service>

# sys password
SYSPASSWRD=<sys password>

# Specify an existing user and password or a user and password that will be created.
USER=<database user>
USER_PWD=<database user password>

# Specify the seed database
SEED_DB=<name of seed database>

# Name of the pluggable database to be created
PLUGGABLE_DB=<name of pluggable database>

sqlplus="sqlplus sys/${SYSPASSWRD}@'(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=tcp)(HOST=${HOST})(PORT=${PORT})))(CONNECT_DATA=(SERVICE_NAME=${SERVICE}.${SERVICE_DOMAIN})(SERVER=DEDICATED)))' as sysdba @./sql/setupTeq.sql ${USER} ${USER_PWD} ${SEED_DB} ${PLUGGABLE_DB}"

echo quit | $sqlplus

sqlplusTEQ="sqlplus ${USER}/${USER_PWD}@'(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=tcp)(HOST=${HOST})(PORT=${PORT})))(CONNECT_DATA=(SERVICE_NAME=cdb1_pdb1.${SERVICE_DOMAIN})(SERVER=DEDICATED)))' @./sql/createTEQ.sql"

echo quit | $sqlplusTEQ
```

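The script connects with a full connect descriptor assembled from its variables. A minimal sketch of how that descriptor expands, using hypothetical host and service values in place of the placeholders:

```bash
#!/bin/bash
# Hypothetical values standing in for the <...> placeholders in initdb.sh.
HOST=dbhost.example.com
PORT=1521
SERVICE=ORCLCDB
SERVICE_DOMAIN=example.com

# The same TNS-style descriptor initdb.sh hands to sqlplus, built via shell expansion.
DESCRIPTOR="(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=tcp)(HOST=${HOST})(PORT=${PORT})))(CONNECT_DATA=(SERVICE_NAME=${SERVICE}.${SERVICE_DOMAIN})(SERVER=DEDICATED)))"

echo "${DESCRIPTOR}"
```

Running the sketch prints the expanded descriptor, which is useful for checking the connect string before pointing the real script at a database.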
Lines changed: 70 additions & 0 deletions
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.oracle.kafka.teq</groupId>
  <artifactId>kafka_client</artifactId>
  <version>1.0-SNAPSHOT</version>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.10.1</version>
        <configuration>
          <source>17</source>
          <target>17</target>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.4.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>2.0.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>2.0.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-api -->
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>2.18.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core -->
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>2.18.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.googlecode.json-simple/json-simple -->
    <dependency>
      <groupId>com.googlecode.json-simple</groupId>
      <artifactId>json-simple</artifactId>
      <version>1.1.1</version>
    </dependency>
    <dependency>
      <groupId>javax.json</groupId>
      <artifactId>javax.json-api</artifactId>
      <version>1.1.4</version>
    </dependency>
    <dependency>
      <groupId>com.github.javafaker</groupId>
      <artifactId>javafaker</artifactId>
      <version>1.0.2</version>
    </dependency>
  </dependencies>
</project>
```

Lines changed: 9 additions & 0 deletions
```bash
#!/usr/bin/env bash

echo compile...

# pass <topicName> as the arg

mvn -q clean compile exec:java \
  -Dexec.mainClass="com.oracle.kafka.teq.Application" \
  -Dexec.args="consumer $1"
```

Lines changed: 9 additions & 0 deletions
```bash
#!/usr/bin/env bash

echo compile...

# pass <topicName> <numOfRecsToProduce> as args

mvn -q clean compile exec:java \
  -Dexec.mainClass="com.oracle.kafka.teq.Application" \
  -Dexec.args="producer $1 $2"
```

Lines changed: 34 additions & 0 deletions
```java
package com.oracle.kafka.teq;

import java.util.Locale;

public class Application {

    public static void main(String[] args) throws Exception {
        String errorStr = "ERROR: Enter the first parameter as Producer or Consumer and specify the topic name as the second parameter.";

        // Both the mode and the topic name are required before args[1] is read.
        if (args.length < 2) {
            System.out.println(errorStr);
            return;
        }

        String mode = args[0];
        String topicName = args[1];
        int produceMsgCount = 0;
        if (args.length > 2)
            produceMsgCount = Integer.parseInt(args[2]);

        switch (mode.toLowerCase(Locale.ROOT)) {
        case "consumer":
            System.out.println("Starting the Consumer\n");
            new Consumer().runConsumer(topicName);
            break;
        case "producer":
            System.out.println("Starting the Producer\n");
            new Producer().runProducer(topicName, produceMsgCount);
            break;
        default:
            System.out.println(errorStr);
        }
    }
}
```

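The argument validation above can be exercised in isolation. A minimal standalone sketch of the same check (the `ArgCheckSketch` class and `parseMode` helper are hypothetical, not part of the commit):

```java
import java.util.Locale;

// Hypothetical standalone sketch of Application's argument validation logic.
public class ArgCheckSketch {

    // Returns the normalized mode, or null when the required args are missing.
    static String parseMode(String[] args) {
        if (args.length < 2) {
            return null; // need both a mode and a topic name
        }
        return args[0].toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(parseMode(new String[] { "Producer", "topic1", "50" })); // producer
        System.out.println(parseMode(new String[] { "Consumer" })); // null
    }
}
```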
Lines changed: 93 additions & 0 deletions
```java
package com.oracle.kafka.teq;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.log4j.Logger;
import org.json.simple.JSONObject;

import java.util.*;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

public class Consumer {

    private final int BLOCK_TIMEOUT_MS = 3000;
    private KafkaConsumer<String, String> kafkaConsumer = null;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    static Logger log = Logger.getLogger(Consumer.class.getName());

    public Consumer() throws Exception {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    shutdown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }

    /**
     * Retrieves a collection of ConsumerRecords from the specified topic.
     *
     * @param topicName The topic to consume from
     *
     * @throws Exception The Exception that will get thrown when an error occurs
     */
    public void runConsumer(String topicName) throws Exception {
        // keep running forever or until shutdown() is called from another thread.
        try {
            initiateKafkaConsumer().subscribe(List.of(topicName));
            while (!closed.get()) {
                ConsumerRecords<String, String> records = initiateKafkaConsumer()
                        .poll(Duration.ofMillis(BLOCK_TIMEOUT_MS));
                if (records.count() == 0) {
                    log.info("No message to consume.");
                }

                for (ConsumerRecord<String, String> recordConsumed : records) {
                    HashMap<String, String> msgLogInfo = new HashMap<>();
                    msgLogInfo.put("topic", topicName);
                    msgLogInfo.put("key", recordConsumed.key());
                    msgLogInfo.put("message", recordConsumed.value());
                    log.info(new JSONObject(msgLogInfo).toJSONString());
                }
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get())
                throw e;
        }
    }

    /**
     * Shuts down the Kafka consumer that was initiated.
     *
     * @throws Exception The Exception that will get thrown when an error occurs
     */
    public void shutdown() throws Exception {
        closed.set(true);
        log.info("Consumer is shutting down.");
        initiateKafkaConsumer().wakeup();
    }

    /**
     * Initiates the Kafka consumer.
     *
     * @return The Kafka consumer that has been initiated.
     * @throws Exception The Exception that will get thrown when an error occurs
     */
    private KafkaConsumer<String, String> initiateKafkaConsumer() throws Exception {
        if (this.kafkaConsumer == null) {
            Properties props = Utility.getProperties();
            this.kafkaConsumer = new KafkaConsumer<>(props);
        }
        return this.kafkaConsumer;
    }
}
```
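`Consumer` builds its `KafkaConsumer` from `Utility.getProperties()`, a helper that is not part of this diff. A minimal sketch of what such a helper might return (the `UtilitySketch` class name and every property value below are assumptions, not taken from the commit):

```java
import java.util.Properties;

// Hypothetical sketch of the Utility helper referenced by Consumer; all values are assumptions.
public class UtilitySketch {

    // Returns the connection and deserialization settings a KafkaConsumer<String, String> would need.
    public static Properties getProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "txeventq-example");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(getProperties().getProperty("bootstrap.servers"));
    }
}
```

In the real project these values would come from the `config.properties` file in `kafka_client/src/main/resources` rather than being hard-coded.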
