接到需求要将RabbitMQ与项目整合,用来异步发送消息,功能不是很重要,只是用来存储错误日志,本以为很简单的需求,结果开始了踩坑之旅
一、配置RabbitMQ
- 项目用的是spring boot搭建,所以采用全注解的方式进行整合
1.导入rabbit依赖
注意:
如果依赖需要被几个子工程使用,就将依赖放入父工程,并对其进行版本约束,这里有个坑:
- 如果父工程采用的是<dependencyManagement>标签,则其他子工程无法自动引入依赖,需要手动添加(无需指定版本);
- 也可以直接另加一个<dependencies>的标签,仅在父工程引入依赖后即可
<!-- 暂指定RabbitMQ版本-->
<rabbitMq.version>2.2.11.RELEASE</rabbitMq.version>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>${rabbitMq.version}</version>
</dependency>
</dependencies>
2.在Config类配置Mq参数
注意:
如果项目采用集群方式搭建,要注意api调用😭
如果启动后无法创建队列,可以手动去RabbitMq管理页面进行创建交换机并绑定队列;
无法自动创建应该是配置问题,可以在配置文件中实例化 RabbitAdmin 解决(还没有试🖐)
/**
*
* @Description: RabbitMQ配置类
*/
@Configuration
public class RabbitMQConfig {
private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);
// 服务地址
@Value("${rabbitmq.addresses}")
private String addresses;
// 虚拟主机
@Value("${rabbitmq.virtual-host}")
private String virtualHost;
// 账户名
@Value("${rabbitmq.username}")
private String username;
// 密码
@Value("${rabbitmq.password}")
private String password;
// 路由键名称
@Value("${rabbitmq.routing-key}")
private String routingkey;
// 交换机名称
@Value("${rabbitmq.exchange}")
private String exchange;
// 队列名称
@Value("${rabbitmq.queue}")
private String queue;
// 是否重试
@Value("${rabbitmq.enabled}")
private String enabled;
// 最大重试次数
@Value("${rabbitmq.max-attempts}")
private String maxAttempts;
/**
* 交换机
*
* @return
*/
@Bean
public DirectExchange exchange() {
return new DirectExchange(exchange, true, false);
}
/**
* 队列
*
* @return
*/
@Bean
public Queue queue() {
Map<String, Object> args = new HashMap<>();
// x-message-ttl 这里声明当前队列消息存活时间
//args.put("x-message-ttl", 6000000);
return new Queue(queue, true, false, false, args);
}
/**
* 声明 oms队列 与 开发交换机的绑定关系
*
* @return
*/
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with(routingkey);
}
/**
* 创建连接工厂
* @return
*/
@Bean(name = "connectionFactoryMB")
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setAddresses(addresses);
cachingConnectionFactory.setUsername(username);
cachingConnectionFactory.setPassword(password);
cachingConnectionFactory.setVirtualHost(virtualHost);
log.info("连接工厂设置完成,连接地址{}",addresses);
log.info("连接工厂设置完成,连接用户{}",username);
return cachingConnectionFactory;
}
/**
* 实例化RabbitTemplate工具类
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactoryMB) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactoryMB);
return rabbitTemplate;
}
/**
* 设置明确默认值的监听器工厂
* @param connectionFactory
* @return
*/
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
//SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其转换成string类型的
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
}
附上参考链接:
[https://blog.csdn.net/weixin_34209406/article/details/92568996 ]
[https://blog.csdn.net/weixin_43744059/article/details/106231698]
[https://blog.csdn.net/qq_35387940/article/details/100514134]
3. 配置生产者
- 生产者配置相对简单,只需要将 rabbitTemplate 类注入,调用 convertAndSend 方法,参数为交换机名称、队列名称、发送内容(注意与配置一一对应)
/**
*
* @desc: RabbitMq生产者实现类
*/
@Component
public class FailProducerService implements FailProducer {
private static final Logger log = LoggerFactory.getLogger(FeiHeProcessService.class);
// 交换机名称
@Value("${rabbitmq.exchange}")
private String exchange;
// 路由键名称
@Value("${rabbitmq.routing-key}")
private String routingKey;
// 注入RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void convertAndSend(FailureDto FailureDto) {
// 调用RabbitTemplate发送消息
try {
String json = JSONObject.toJSONString(FailureDto);
rabbitTemplate.convertAndSend(exchange, routingKey, json);
log.info("发送消息内容:{}",json);
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 配置消费者
- 消费者则更加简单,一个注解搞定 @RabbitListener(指定的队列名)
注意:
配置完消费后,无法消费消息的情况:首先检查队列内有无待消费消息,启动类加上 @EnableRabbit 注解就可以接到了
/**
*
* @Description: RabbitMq消费者接口实现类
*/
@Component
public class FailConsumerService {
private static final Logger log = LoggerFactory.getLogger(FeiHeProcessService.class);
private static final String QUEUES = "fail_log_queue";
@Autowired
private FailureDto failureDao;
@RabbitHandler(isDefault = true)
@RabbitListener(queues = QUEUES)
public void PrintLog(Message message) {
if (message != null && message.getBody() != null) {
log.info("接收到消息为:{}", message);
// 1.解析Json
FailureDto failureDto = null;
try {
failureDto = JSON.parseObject(new String(message.getBody()), FailureLogDto.class);
} catch (Exception e) {
e.printStackTrace();
log.error("Json转换实体失败:", e);
}
// 2.存库
try {
failureDao.insertFailLog(failureDto);
log.info("保存成功!");
} catch (Exception e) {
e.printStackTrace();
log.error("错误日志保存失败:", e);
}
}
}
}