Skip to content

Commit ab4a541

Browse files
author
YunaiV
committed
增加 spring cloud stream rabbitmq 示例
1 parent 51bcfb7 commit ab4a541

File tree

7 files changed

+198
-0
lines changed

7 files changed

+198
-0
lines changed
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>labx-10</artifactId>
7+
<groupId>cn.iocoder.springboot.labs</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>labx-10-sc-stream-rabbitmq-consumer-error-handler</artifactId>
13+
14+
<properties>
15+
<maven.compiler.target>1.8</maven.compiler.target>
16+
<maven.compiler.source>1.8</maven.compiler.source>
17+
<spring.boot.version>2.2.4.RELEASE</spring.boot.version>
18+
<spring.cloud.version>Hoxton.SR1</spring.cloud.version>
19+
</properties>
20+
21+
<!--
22+
引入 Spring Boot、Spring Cloud、Spring Cloud Alibaba 三者 BOM 文件,进行依赖版本的管理,防止不兼容。
23+
在 https://dwz.cn/mcLIfNKt 文章中,Spring Cloud Alibaba 开发团队推荐了三者的依赖关系
24+
-->
25+
<dependencyManagement>
26+
<dependencies>
27+
<dependency>
28+
<groupId>org.springframework.boot</groupId>
29+
<artifactId>spring-boot-starter-parent</artifactId>
30+
<version>${spring.boot.version}</version>
31+
<type>pom</type>
32+
<scope>import</scope>
33+
</dependency>
34+
<dependency>
35+
<groupId>org.springframework.cloud</groupId>
36+
<artifactId>spring-cloud-dependencies</artifactId>
37+
<version>${spring.cloud.version}</version>
38+
<type>pom</type>
39+
<scope>import</scope>
40+
</dependency>
41+
</dependencies>
42+
</dependencyManagement>
43+
44+
<dependencies>
45+
<!-- 引入 SpringMVC 相关依赖,并实现对其的自动配置 -->
46+
<dependency>
47+
<groupId>org.springframework.boot</groupId>
48+
<artifactId>spring-boot-starter-web</artifactId>
49+
</dependency>
50+
51+
<!-- 引入 Spring Cloud Stream RabbitMQ 相关依赖,将 RabbitMQ 作为消息队列,并实现对其的自动配置 -->
52+
<dependency>
53+
<groupId>org.springframework.cloud</groupId>
54+
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
55+
</dependency>
56+
</dependencies>
57+
58+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package cn.iocoder.springcloud.labx10.rabbitmqdemo.consumerdemo;
2+
3+
import cn.iocoder.springcloud.labx10.rabbitmqdemo.consumerdemo.listener.MySink;
4+
import org.springframework.boot.SpringApplication;
5+
import org.springframework.boot.autoconfigure.SpringBootApplication;
6+
import org.springframework.cloud.stream.annotation.EnableBinding;
7+
8+
@SpringBootApplication
9+
@EnableBinding(MySink.class)
10+
public class ConsumerApplication {
11+
12+
public static void main(String[] args) {
13+
SpringApplication.run(ConsumerApplication.class, args);
14+
}
15+
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package cn.iocoder.springcloud.labx10.rabbitmqdemo.consumerdemo.listener;
2+
3+
import cn.iocoder.springcloud.labx10.rabbitmqdemo.consumerdemo.message.Demo01Message;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
import org.springframework.cloud.stream.annotation.StreamListener;
7+
import org.springframework.integration.annotation.ServiceActivator;
8+
import org.springframework.integration.context.IntegrationContextUtils;
9+
import org.springframework.messaging.handler.annotation.Payload;
10+
import org.springframework.messaging.support.ErrorMessage;
11+
import org.springframework.stereotype.Component;
12+
13+
@Component
14+
public class Demo01Consumer {
15+
16+
private Logger logger = LoggerFactory.getLogger(getClass());
17+
18+
@StreamListener(MySink.DEMO01_INPUT)
19+
public void onMessage(@Payload Demo01Message message) {
20+
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
21+
// <X> 注意,此处抛出一个 RuntimeException 异常,模拟消费失败
22+
throw new RuntimeException("我就是故意抛出一个异常");
23+
}
24+
25+
@ServiceActivator(inputChannel = "DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01.errors")
26+
public void handleError(ErrorMessage errorMessage) {
27+
logger.error("[handleError][payload:{}]", errorMessage.getPayload().getMessage());
28+
logger.error("[handleError][originalMessage:{}]", errorMessage.getOriginalMessage());
29+
logger.error("[handleError][headers:{}]", errorMessage.getHeaders());
30+
}
31+
32+
@StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) // errorChannel
33+
public void globalHandleError(ErrorMessage errorMessage) {
34+
logger.error("[globalHandleError][payload:{}]", errorMessage.getPayload().getMessage());
35+
logger.error("[globalHandleError][originalMessage:{}]", errorMessage.getOriginalMessage());
36+
logger.error("[globalHandleError][headers:{}]", errorMessage.getHeaders());
37+
}
38+
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package cn.iocoder.springcloud.labx10.rabbitmqdemo.consumerdemo.listener;
2+
3+
import org.springframework.cloud.stream.annotation.Input;
4+
import org.springframework.messaging.SubscribableChannel;
5+
6+
public interface MySink {
7+
8+
String DEMO01_INPUT = "demo01-input";
9+
10+
@Input(DEMO01_INPUT)
11+
SubscribableChannel demo01Input();
12+
13+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package cn.iocoder.springcloud.labx10.rabbitmqdemo.consumerdemo.message;
2+
3+
/**
4+
* 示例 01 的 Message 消息
5+
*/
6+
public class Demo01Message {
7+
8+
/**
9+
* 编号
10+
*/
11+
private Integer id;
12+
13+
public Demo01Message setId(Integer id) {
14+
this.id = id;
15+
return this;
16+
}
17+
18+
public Integer getId() {
19+
return id;
20+
}
21+
22+
@Override
23+
public String toString() {
24+
return "Demo01Message{" +
25+
"id=" + id +
26+
'}';
27+
}
28+
29+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
spring:
2+
application:
3+
name: demo-consumer-application
4+
cloud:
5+
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
6+
stream:
7+
# Binder 配置项,对应 BinderProperties Map
8+
binders:
9+
rabbit001:
10+
type: rabbit # 设置 Binder 的类型
11+
environment: # 设置 Binder 的环境配置
12+
# 如果是 RabbitMQ 类型的时候,则对应的是 RabbitProperties 类
13+
spring:
14+
rabbitmq:
15+
host: 127.0.0.1 # RabbitMQ 服务的地址
16+
port: 5672 # RabbitMQ 服务的端口
17+
username: guest # RabbitMQ 服务的账号
18+
password: guest # RabbitMQ 服务的密码
19+
# Binding 配置项,对应 BindingProperties Map
20+
bindings:
21+
demo01-input:
22+
destination: DEMO-TOPIC-01 # 目的地。这里使用 RabbitMQ Exchange
23+
content-type: application/json # 内容格式。这里使用 JSON
24+
group: demo01-consumer-group-DEMO-TOPIC-01 # 消费者分组
25+
binder: rabbit001 # 设置使用的 Binder 名字
26+
# Consumer 配置项,对应 ConsumerProperties 类
27+
consumer:
28+
max-attempts: 3 # 重试次数,默认为 3 次。
29+
back-off-initial-interval: 3000 # 重试间隔的初始值,单位毫秒,默认为 1000
30+
back-off-multiplier: 2.0 # 重试间隔的递乘系数,默认为 2.0
31+
back-off-max-interval: 10000 # 重试间隔的最大值,单位毫秒,默认为 10000
32+
# RabbitMQ 自定义 Binding 配置项,对应 RabbitBindingProperties Map
33+
rabbit:
34+
bindings:
35+
demo01-input:
36+
# RabbitMQ Consumer 配置项,对应 RabbitConsumerProperties 类
37+
consumer:
38+
auto-bind-dlq: true # 是否创建对应的死信队列,并进行绑定,默认为 false。
39+
republish-to-dlq: true # 消费失败的消息发布到对应的死信队列时,是否添加异常异常的信息到消息头
40+
41+
server:
42+
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

labx-10/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
<module>labx-10-sc-stream-rabbitmq-consumer-delay</module>
2020

2121
<module>labx-10-sc-stream-rabbitmq-consumer-retry</module>
22+
<module>labx-10-sc-stream-rabbitmq-consumer-error-handler</module>
2223
</modules>
2324

2425

0 commit comments

Comments
 (0)