Skip to content

Commit c00fdec

Browse files
pratapihemant.patelpratapihemant.patel
pratapihemant.patel
authored and
pratapihemant.patel
committed
initial commit
1 parent 62b3ebe commit c00fdec

File tree

6 files changed

+203
-0
lines changed

6 files changed

+203
-0
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
/bin/
2+
/build/

build.gradle

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
apply plugin: 'java'
2+
apply plugin: 'eclipse'
3+
4+
jar {
5+
baseName = 'rabbitmq-sample'
6+
version = '0.0.1-SNAPSHOT'
7+
}
8+
9+
sourceCompatibility = 1.8
10+
targetCompatibility = 1.8
11+
12+
repositories {
13+
mavenCentral()
14+
}
15+
16+
dependencies {
17+
compile group: 'com.rabbitmq', name: 'amqp-client', version: '3.6.6'
18+
testCompile group: 'junit', name: 'junit', version: '4.+'
19+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.github.rabbitmq;
2+
3+
import java.io.IOException;
4+
5+
import com.rabbitmq.client.Channel;
6+
import com.rabbitmq.client.Connection;
7+
8+
public class CommonUtils {
9+
10+
public static void close(Connection connection) {
11+
12+
try {
13+
if (null != connection) {
14+
connection.close();
15+
}
16+
}
17+
catch (IOException e) {
18+
}
19+
}
20+
21+
public static void close(Channel channel) {
22+
23+
try {
24+
if (null != channel) {
25+
channel.close();
26+
}
27+
}
28+
catch (Exception e) {
29+
}
30+
}
31+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.github.rabbitmq.queue;
2+
3+
interface CommonConstants {
4+
5+
String QUEUE_NAME = "hello";
6+
String HOST_NAME = "localhost";
7+
8+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package com.github.rabbitmq.queue;
2+
3+
import java.io.IOException;
4+
import java.util.concurrent.TimeoutException;
5+
6+
import com.github.rabbitmq.CommonUtils;
7+
import com.rabbitmq.client.AMQP;
8+
import com.rabbitmq.client.Channel;
9+
import com.rabbitmq.client.Connection;
10+
import com.rabbitmq.client.ConnectionFactory;
11+
import com.rabbitmq.client.Consumer;
12+
import com.rabbitmq.client.DefaultConsumer;
13+
import com.rabbitmq.client.Envelope;
14+
15+
public class QueueReceiver implements AutoCloseable {
16+
17+
private Connection connection;
18+
private Channel channel;
19+
20+
public void run() {
21+
22+
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
23+
24+
try {
25+
initConnection();
26+
27+
receive();
28+
29+
}
30+
catch (Exception e) {
31+
e.printStackTrace();
32+
}
33+
34+
}
35+
36+
private void receive() throws IOException {
37+
38+
Consumer consumer = new DefaultConsumer(channel) {
39+
40+
@Override
41+
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
42+
byte[] body) throws IOException {
43+
44+
String message = new String(body, "UTF-8");
45+
System.out.println(" [x] Received '" + message + "'");
46+
}
47+
};
48+
49+
while (true) {
50+
channel.basicConsume(CommonConstants.QUEUE_NAME, true, consumer);
51+
}
52+
}
53+
54+
private void initConnection() throws IOException, TimeoutException {
55+
56+
ConnectionFactory factory = new ConnectionFactory();
57+
factory.setHost(CommonConstants.HOST_NAME);
58+
connection = factory.newConnection();
59+
channel = connection.createChannel();
60+
channel.queueDeclare(CommonConstants.QUEUE_NAME, false, false, false, null);
61+
}
62+
63+
@Override
64+
public void close() {
65+
66+
CommonUtils.close(channel);
67+
CommonUtils.close(connection);
68+
System.out.println("receiver connection closed");
69+
}
70+
71+
public static void main(String[] args) {
72+
73+
try (QueueReceiver receiver = new QueueReceiver()) {
74+
receiver.run();
75+
}
76+
}
77+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package com.github.rabbitmq.queue;
2+
3+
import java.io.IOException;
4+
import java.io.UnsupportedEncodingException;
5+
import java.util.Scanner;
6+
import java.util.concurrent.TimeoutException;
7+
8+
import com.github.rabbitmq.CommonUtils;
9+
import com.rabbitmq.client.Channel;
10+
import com.rabbitmq.client.Connection;
11+
import com.rabbitmq.client.ConnectionFactory;
12+
13+
public class QueueSender implements Runnable, AutoCloseable {
14+
15+
private Connection connection;
16+
private Channel channel;
17+
18+
@Override
19+
public void run() {
20+
21+
System.out.println(" [*] Enter messages to send. To exit press CTRL+C");
22+
try {
23+
initConnection();
24+
send();
25+
}
26+
catch (Exception e) {
27+
e.printStackTrace();
28+
}
29+
}
30+
31+
private void send() throws UnsupportedEncodingException, IOException {
32+
33+
try (Scanner sc = new Scanner(System.in)) {
34+
35+
while (sc.hasNextLine()) {
36+
String message = sc.nextLine();
37+
channel.basicPublish("", CommonConstants.QUEUE_NAME, null, message.getBytes("UTF-8"));
38+
System.out.println("Message Sent: " + message);
39+
}
40+
}
41+
}
42+
43+
private void initConnection() throws IOException, TimeoutException {
44+
45+
ConnectionFactory factory = new ConnectionFactory();
46+
factory.setHost(CommonConstants.HOST_NAME);
47+
connection = factory.newConnection();
48+
channel = connection.createChannel();
49+
channel.queueDeclare(CommonConstants.QUEUE_NAME, false, false, false, null);
50+
}
51+
52+
@Override
53+
public void close() {
54+
55+
CommonUtils.close(channel);
56+
CommonUtils.close(connection);
57+
System.out.println("sender connection closed");
58+
}
59+
60+
public static void main(String[] args) {
61+
62+
try (QueueSender sender = new QueueSender()) {
63+
sender.run();
64+
}
65+
}
66+
}

0 commit comments

Comments
 (0)