Skip to content

Commit 2d92243

Browse files
authored
AQ JMS Examples with transactional semantics (#437)
* AQ JMS Examples with atomic DML operation * Added License Text, log exceptions and remove hardcoded password.
1 parent 9f2b4a0 commit 2d92243

File tree

5 files changed

+285
-0
lines changed

5 files changed

+285
-0
lines changed

txeventq/jms-example/pom.xml

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0"
2+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
4+
http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
<groupId>com.oracle.jms.example</groupId>
7+
<artifactId>AQ-JMS-Examples</artifactId>
8+
<version>1.0</version>
9+
<packaging>jar</packaging>
10+
<name>AQ JMS Examples</name>
11+
<url>http://www.oracle.com</url>
12+
<dependencies>
13+
<!-- https://mvnrepository.com/artifact/com.oracle.database.messaging/aqapi-jakarta -->
14+
<dependency>
15+
<groupId>com.oracle.database.messaging</groupId>
16+
<artifactId>aqapi-jakarta</artifactId>
17+
<version>23.3.1.0</version>
18+
</dependency>
19+
<!-- https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc11 -->
20+
<dependency>
21+
<groupId>com.oracle.database.jdbc</groupId>
22+
<artifactId>ojdbc11</artifactId>
23+
<version>23.3.0.23.09</version>
24+
</dependency>
25+
<!-- https://mvnrepository.com/artifact/jakarta.transaction/jakarta.transaction-api -->
26+
<dependency>
27+
<groupId>jakarta.transaction</groupId>
28+
<artifactId>jakarta.transaction-api</artifactId>
29+
<version>2.0.1</version>
30+
</dependency>
31+
<!-- https://mvnrepository.com/artifact/jakarta.jms/jakarta.jms-api -->
32+
<dependency>
33+
<groupId>jakarta.jms</groupId>
34+
<artifactId>jakarta.jms-api</artifactId>
35+
<version>3.1.0</version>
36+
</dependency>
37+
</dependencies>
38+
</project>

txeventq/jms-example/sql/stupTxEQ.sql

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
** Setup User and Queues for AQ-JMS
3+
**
4+
** Copyright (c) 2019, 2025 Oracle and/or its affiliates.
5+
** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
6+
*/
7+
8+
-- Create database user
9+
create user aqjmsuser identified by Welcome_123#;
10+
grant connect, resource to aqjmsuser;
11+
grant execute on dbms_aq to aqjmsuser;
12+
grant execute on dbms_aqadm to aqjmsuser;
13+
grant execute on dbms_aqin to aqjmsuser;
14+
grant unlimited tablespace to aqjmsuser;
15+
16+
-- Create Transactional Event Queue TOPIC_IN and TOPIC_OUT. Add a consumer Consumer1 for both
17+
Declare
18+
subscriber sys.aq$_agent;
19+
Begin
20+
subscriber := sys.aq$_agent('Consumer1', NULL, NULL);
21+
dbms_aqadm.create_transactional_event_queue(queue_name=>'aqjmsuser.TOPIC_IN', multiple_consumers=>TRUE);
22+
dbms_aqadm.start_queue('aqjmsuser.TOPIC_IN');
23+
End;
24+
/
25+
26+
Declare
27+
subscriber sys.aq$_agent;
28+
Begin
29+
subscriber := sys.aq$_agent('Consumer1', NULL, NULL);
30+
dbms_aqadm.create_transactional_event_queue(queue_name=>'aqjmsuser.TOPIC_OUT', multiple_consumers=>TRUE);
31+
dbms_aqadm.start_queue('aqjmsuser.TOPIC_OUT');
32+
End;
33+
/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
** JMS Exactly-Once processing.
3+
**
4+
** Copyright (c) 2019, 2025 Oracle and/or its affiliates.
5+
** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
6+
*/
7+
8+
package com.oracle.jms.example.transactional;
9+
10+
public class JmsExactlyOnceConsumeProcessProduce {
11+
public static void main(String[] args) {
12+
try {
13+
oracle.jdbc.pool.OracleDataSource ods = new oracle.jdbc.pool.OracleDataSource();
14+
ods.setURL("jdbc:oracle:thin:@//<HOST>:<PORT>/<SERVICE_NAME>");
15+
ods.setUser("aqjmsuser");
16+
ods.setPassword("<PASSWORD>"); // Password for aqjmsuser
17+
jakarta.jms.TopicConnectionFactory cf = oracle.jakarta.jms.AQjmsFactory.getTopicConnectionFactory(ods);
18+
jakarta.jms.TopicConnection jmsConn = cf.createTopicConnection();
19+
jakarta.jms.Session jmsSession = jmsConn.createTopicSession(true, jakarta.jms.Session.CLIENT_ACKNOWLEDGE);
20+
jakarta.jms.Topic topicIn = jmsSession.createTopic("TOPIC_IN");
21+
jakarta.jms.MessageConsumer consumer = jmsSession.createDurableSubscriber(topicIn, "Consumer1");
22+
jakarta.jms.Topic topicOut = jmsSession.createTopic("TOPIC_OUT");
23+
jakarta.jms.MessageProducer jmsProducer = jmsSession.createProducer(topicOut);
24+
jmsConn.start();
25+
jakarta.jms.Message msgConsumed = null;
26+
try {
27+
msgConsumed = consumer.receive(); //Consume message from TOPIC_IN topic
28+
java.sql.Connection dbConn = ((oracle.jakarta.jms.AQjmsSession) jmsSession).getDBConnection();
29+
// Perform database operations for consumed message
30+
String resultMessage = processMessage(msgConsumed,dbConn);
31+
jakarta.jms.Message msgProduce = jmsSession.createTextMessage("PROCESSED:"+ resultMessage);
32+
jmsProducer.send(msgProduce); //Send message to TOPIC_OUT
33+
// Commit receive from TOPIC_IN ,Database operation and send to TOPIC_OUT
34+
jmsSession.commit();
35+
System.out.println("Successfully Consumed one message from TOPIC_OUT and produced into TOPIC_IN");
36+
37+
} catch(Exception e){
38+
System.out.println("Exception while consuming, processing or producing JMS Message: " + e);
39+
e.printStackTrace();
40+
try {
41+
if(msgConsumed != null) {
42+
jmsSession.rollback();
43+
}
44+
}catch(Exception rollbackE) {
45+
System.out.println("Exception during rollback : " + rollbackE);
46+
rollbackE.printStackTrace();
47+
}
48+
}finally {
49+
try {
50+
jmsSession.close();
51+
}catch(Exception closeE) {
52+
System.out.println("Exception while clossing JMS Session: " + closeE);
53+
closeE.printStackTrace();
54+
}
55+
try {
56+
jmsConn.close();
57+
}catch(Exception closeE) {
58+
System.out.println("Exception while clossing JMS Connection: " + closeE);
59+
closeE.printStackTrace();
60+
}
61+
}
62+
} catch (Exception e) {
63+
System.out.println("Exception while setting up JMS Consumer and JMS Producer: " + e);
64+
e.printStackTrace();
65+
}
66+
}
67+
68+
private static String processMessage(jakarta.jms.Message msg, java.sql.Connection dbConn) throws Exception
69+
{
70+
//Application specific DML Operation using dbConn.
71+
//Intentionally left blank
72+
73+
return msg.getBody(String.class);
74+
}
75+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
** JMS Transactional Consumer example
3+
**
4+
** Copyright (c) 2019, 2025 Oracle and/or its affiliates.
5+
** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
6+
*/
7+
8+
package com.oracle.jms.example.transactional;
9+
10+
public class JmsTransactionalConsumer {
11+
12+
public static void main(String[] args) {
13+
try {
14+
oracle.jdbc.pool.OracleDataSource ods = new oracle.jdbc.pool.OracleDataSource();
15+
ods.setURL("jdbc:oracle:thin:@//<HOST>:<PORT>/<SERVICE_NAME>");
16+
ods.setUser("aqjmsuser");
17+
ods.setPassword("<PASSWORD>"); // Password for aqjmsuser
18+
jakarta.jms.TopicConnectionFactory cf = oracle.jakarta.jms.AQjmsFactory.getTopicConnectionFactory(ods);
19+
jakarta.jms.TopicConnection jmsConn = cf.createTopicConnection();
20+
jakarta.jms.Session jmsSession = jmsConn.createTopicSession(true, jakarta.jms.Session.CLIENT_ACKNOWLEDGE);
21+
jakarta.jms.Topic topic = jmsSession.createTopic("TOPIC_OUT");
22+
jakarta.jms.MessageConsumer consumer = jmsSession.createDurableSubscriber(topic, "Consumer1");
23+
jmsConn.start();
24+
jakarta.jms.Message msg = null;
25+
try {
26+
//Consume message from Oracle Transactional Event Queue
27+
msg = consumer.receive();
28+
java.sql.Connection dbConn = ((oracle.jakarta.jms.AQjmsSession) jmsSession).getDBConnection();
29+
// Perform database operations
30+
processMessage(msg,dbConn);
31+
// Commit database operation and the consumption of the message
32+
jmsSession.commit();
33+
System.out.println("Successfully consumed one Message from topic TOPIC_OUT");
34+
} catch(Exception e) {
35+
System.out.println("Exception while consuming JMS Message: "+ e);
36+
e.printStackTrace();
37+
try {
38+
if(msg != null) {
39+
jmsSession.rollback();
40+
}
41+
}catch(Exception rollbackE) {
42+
System.out.println("Exception during rollback of consumed message: " + rollbackE);
43+
rollbackE.printStackTrace();
44+
}
45+
}finally {
46+
try {
47+
jmsSession.close();
48+
}catch(Exception closeE) {
49+
System.out.println("Exception while clossing JMS Session: " + closeE);
50+
closeE.printStackTrace();
51+
}
52+
try {
53+
jmsConn.close();
54+
}catch(Exception closeE) {
55+
System.out.println("Exception while clossing JMS Connection: " + closeE);
56+
closeE.printStackTrace();
57+
}
58+
}
59+
} catch (Exception e) {
60+
System.out.println("Exception while setting up JMS Consumer: " + e);
61+
e.printStackTrace();
62+
}
63+
}
64+
65+
private static void processMessage(jakarta.jms.Message msg, java.sql.Connection dbConn) throws Exception
66+
{
67+
//Application specific DML Operation
68+
//Intentionally left blank
69+
70+
}
71+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
** JMS Transactional Producer example
3+
**
4+
** Copyright (c) 2019, 2025 Oracle and/or its affiliates.
5+
** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
6+
*/
7+
8+
package com.oracle.jms.example.transactional;
9+
10+
public class JmsTransactionalProducer {
11+
public static void main(String[] args) {
12+
try {
13+
oracle.jdbc.pool.OracleDataSource ods = new oracle.jdbc.pool.OracleDataSource();
14+
ods.setURL("jdbc:oracle:thin:@//<HOST>:<PORT>/<SERVICE_NAME>");
15+
ods.setUser("aqjmsuser");
16+
ods.setPassword("<PASSWORD>"); // Password for aqjmsuser
17+
jakarta.jms.TopicConnectionFactory cf = oracle.jakarta.jms.AQjmsFactory.getTopicConnectionFactory(ods);
18+
jakarta.jms.TopicConnection jmsConn = cf.createTopicConnection();
19+
jakarta.jms.Session jmsSession = jmsConn.createTopicSession(true, jakarta.jms.Session.CLIENT_ACKNOWLEDGE);
20+
jakarta.jms.Topic topic = jmsSession.createTopic("TOPIC_IN");
21+
jakarta.jms.MessageProducer jmsProducer = jmsSession.createProducer(topic);
22+
jakarta.jms.Message msg = jmsSession.createTextMessage("JMS Test Message");
23+
// Get database connection which will be used to produce a message.
24+
java.sql.Connection dbConn = ((oracle.jakarta.jms.AQjmsSession)jmsSession).getDBConnection();
25+
try {
26+
// Perform database operations
27+
processMessage(msg, dbConn);
28+
// Send a message to Oracle Transactional Event Queue.
29+
jmsProducer.send(msg);
30+
// Commit Send and database operation
31+
jmsSession.commit();
32+
System.out.println("Successfully Produced one Message into topic TOPIC_IN");
33+
}catch(Exception e) {
34+
System.out.println("Exception while producing a message: " + e);
35+
e.printStackTrace();
36+
try {
37+
jmsSession.rollback();
38+
}catch(Exception rollbackE) {
39+
System.out.println("Exception during rollback of JMS Session. " + rollbackE);
40+
rollbackE.printStackTrace();
41+
}
42+
}finally {
43+
try {
44+
jmsSession.close();
45+
}catch(Exception closeE) {
46+
System.out.println("Exception while clossing JMS Session: " + closeE);
47+
closeE.printStackTrace();
48+
}
49+
try {
50+
jmsConn.close();
51+
}catch(Exception closeE) {
52+
System.out.println("Exception while clossing JMS Connection: " + closeE);
53+
closeE.printStackTrace();
54+
}
55+
}
56+
} catch (Exception e) {
57+
System.out.println("Exception while setting up JMS Producer" + e);
58+
e.printStackTrace();
59+
}
60+
}
61+
62+
private static void processMessage(jakarta.jms.Message msg, java.sql.Connection dbConn) throws Exception
63+
{
64+
//Application specific DML Operation
65+
//Intentionally left blank
66+
67+
}
68+
}

0 commit comments

Comments
 (0)