接到需求要将RabbitMQ与项目整合,用来异步发送消息,功能不是很重要,只是用来存储错误日志,本以为很简单的需求,结果开始了踩坑之旅

一、配置RabbitMQ

  • 项目用的是spring boot搭建,所以采用全注解的方式进行整合

1.导入rabbit依赖

注意:
如果依赖需要被几个子工程使用,就将依赖放入父工程,并对其进行版本约束,这里有个坑:

  1. 如果父工程采用的是<dependencyManagement>标签,则其他子工程无法自动引入依赖,需要手动添加(无需指定版本);
  2. 也可以直接另加一个<dependencies>的标签,仅在父工程引入依赖后即可
        <!-- 暂指定RabbitMQ版本-->
        <rabbitMq.version>2.2.11.RELEASE</rabbitMq.version>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>${rabbitMq.version}</version>
    </dependency>
</dependencies>

2.在Config类配置Mq参数

注意:
如果项目采用集群方式搭建,要注意api调用😭
如果启动后无法创建队列,可以手动去RabbitMq管理页面进行创建交换机并绑定队列;
无法自动创建应该是配置问题,可以在配置文件中实例化 RabbitAdmin 解决(还没有试🖐)


/**
 *
 * @Description:  RabbitMQ配置类
 */
@Configuration

public  class RabbitMQConfig {

    private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);
//    服务地址
    @Value("${rabbitmq.addresses}")
    private String addresses;


//    虚拟主机
    @Value("${rabbitmq.virtual-host}")
    private String virtualHost;


    //    账户名
    @Value("${rabbitmq.username}")
    private String username;

//    密码
    @Value("${rabbitmq.password}")
    private String password;

//    路由键名称
    @Value("${rabbitmq.routing-key}")
    private String routingkey;

//    交换机名称
    @Value("${rabbitmq.exchange}")
    private String exchange;

//    队列名称
    @Value("${rabbitmq.queue}")
    private String queue;

//    是否重试
    @Value("${rabbitmq.enabled}")
    private String enabled;

//    最大重试次数
    @Value("${rabbitmq.max-attempts}")
    private String maxAttempts;


    /**
     * 交换机
     *
     * @return
     */
    @Bean
    public DirectExchange exchange() {
        return new DirectExchange(exchange, true, false);
    }

    /**
     * 队列
     *
     * @return
     */
    @Bean
    public Queue queue() {
        Map<String, Object> args = new HashMap<>();

        // x-message-ttl 这里声明当前队列消息存活时间
        //args.put("x-message-ttl", 6000000);
        return new Queue(queue, true, false, false, args);
    }

    /**
     * 声明 oms队列 与 开发交换机的绑定关系
     *
     * @return
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with(routingkey);
    }

    /**
     * 创建连接工厂
     * @return
     */
    @Bean(name = "connectionFactoryMB")
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setAddresses(addresses);
        cachingConnectionFactory.setUsername(username);
        cachingConnectionFactory.setPassword(password);
        cachingConnectionFactory.setVirtualHost(virtualHost);

        log.info("连接工厂设置完成,连接地址{}",addresses);
        log.info("连接工厂设置完成,连接用户{}",username);
        return cachingConnectionFactory;
    }


    /**
     * 实例化RabbitTemplate工具类
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactoryMB) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactoryMB);
        return rabbitTemplate;
    }

    /**
     * 设置明确默认值的监听器工厂
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        //SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其转换成string类型的
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

}

附上参考链接:
[https://blog.csdn.net/weixin_34209406/article/details/92568996 ]
[https://blog.csdn.net/weixin_43744059/article/details/106231698]
[https://blog.csdn.net/qq_35387940/article/details/100514134]
3. 配置生产者

  • 生产者配置相对简单,只需要将 rabbitTemplate 类注入,调用 convertAndSend 方法,参数为交换机名称队列名称发送内容(注意与配置一一对应)
/**
 *
 * @desc:  RabbitMq生产者实现类
 */
@Component
public class FailProducerService implements FailProducer {

    private static final Logger log = LoggerFactory.getLogger(FeiHeProcessService.class);
    //    交换机名称
    @Value("${rabbitmq.exchange}")
    private String exchange;

    //    路由键名称
    @Value("${rabbitmq.routing-key}")
    private String routingKey;


    //    注入RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;


    @Override
    public void convertAndSend(FailureDto FailureDto) {


//       调用RabbitTemplate发送消息
        try {
            String json = JSONObject.toJSONString(FailureDto);
            rabbitTemplate.convertAndSend(exchange, routingKey, json);
            log.info("发送消息内容:{}",json);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

  1. 配置消费者
  • 消费者则更加简单,一个注解搞定 @RabbitListener(指定的队列名)

注意:
配置完消费后,无法消费消息的情况:首先检查队列内有无待消费消息,启动类加上 @EnableRabbit 注解就可以接到了

/**
 *
 * @Description:  RabbitMq消费者接口实现类
 */
@Component
public class FailConsumerService {
    private static final Logger log = LoggerFactory.getLogger(FeiHeProcessService.class);
    private static final String QUEUES = "fail_log_queue";

    @Autowired
    private FailureDto failureDao;

    @RabbitHandler(isDefault = true)
    @RabbitListener(queues = QUEUES)
    public void PrintLog(Message message) {

        if (message != null && message.getBody() != null) {
            log.info("接收到消息为:{}", message);

//        1.解析Json
            FailureDto failureDto = null;
            try {
                failureDto = JSON.parseObject(new String(message.getBody()), FailureLogDto.class);
            } catch (Exception e) {
                e.printStackTrace();
                log.error("Json转换实体失败:", e);
            }
//        2.存库
            try {
                failureDao.insertFailLog(failureDto);
                log.info("保存成功!");
            } catch (Exception e) {
                e.printStackTrace();
                log.error("错误日志保存失败:", e);
            }
        }

    }

}