springBoot整合rabbitMQ,通过网络,动态创建连接,监听,队列等(四)

论坛 期权论坛 脚本     
匿名技术用户   2021-1-5 08:50   103   0

一,

项目场景需要通过判断网络是否可用,来动态的创建rabbitMQ连接,已达到程序可以通过不同环境,动态部署的要求

二,

判断rabbitMQ服务器是否可用

 public static boolean isHostConnectable(String host, int port) {
        Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress(host, port));
        } catch (IOException e) {
            return false;
        } finally {
            try {
                socket.close();
            } catch (IOException e) {
            }
        }
        return true;
    }

三,

手动管理rabbitMQ的连接,监听,

package com.koala.console.common.utils;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

public class MQUtils {

    public static AtomicReference<SimpleMessageListenerContainer> listenerContainerAtomicReference = new AtomicReference<>();

    public static AtomicReference<RabbitAdmin> rabbitAdminAtomicReference = new AtomicReference<>();

    public static SimpleMessageListenerContainer getMessageListenerContainer(ConnectionFactory connectionFactory) {
        if (Objects.isNull(listenerContainerAtomicReference.get())) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            listenerContainerAtomicReference.set(container);
            container.setDefaultRequeueRejected(false);
            container.setAcknowledgeMode(AcknowledgeMode.AUTO);
            container.start();
            return container;
        }
        return listenerContainerAtomicReference.get();
    }

    public static void setMessageListener(SimpleMessageListenerContainer container, MessageListener messageListener) {
        container.setMessageListener(messageListener);
    }

    public static RabbitAdmin getRabbitAdmin(ConnectionFactory connectionFactory) {
        if (Objects.isNull(rabbitAdminAtomicReference.get())) {
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            rabbitAdminAtomicReference.set(rabbitAdmin);
        }
        return rabbitAdminAtomicReference.get();
    }

    public static CachingConnectionFactory getConnectionFactory(String host, Integer port, String userName, String passWord) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(passWord);
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }
}

四,

项目启动时创建rabbitMQ连接和实例,定时任务定时检查服务器是否可用,维护连接可用性

@Configuration
public class ApiConfigRuning implements ApplicationRunner {

@Autowired
    private HeartBeatScheduler heartBeatScheduler;

@Override
    public void run(ApplicationArguments args) throws Exception {

heartBeatScheduler.createMQ();
        if (Objects.nonNull(MQUtils.listenerContainerAtomicReference.get()) && Objects.nonNull(MQUtils.rabbitAdminAtomicReference.get())) {
            Queue queue = new Queue(invitationCode);
            FanoutExchange exchange = new FanoutExchange(RabbitMQConfiguration.TERMINAL_FANOUT_EXCHANGE_cloud);
            MQUtils.rabbitAdminAtomicReference.get().declareQueue(queue);
            rabbitAdmin.declareExchange(exchange);
            rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange));
            MQUtils.listenerContainerAtomicReference.get().addQueueNames(invitationCode);
            MQUtils.listenerContainerAtomicReference.get().start();
        }
}

}
@Slf4j
@Component
public class HeartBeatScheduler {

@Autowired
    private TailWindServiceImpl tailWindService;

public void createMQ() {
        if (SystemTool.isHostConnectable(varMQConfiguration.getHost(), varMQConfiguration.getPort())) {
            CachingConnectionFactory connectionFactory = MQUtils.getConnectionFactory(varMQConfiguration.getHost(), varMQConfiguration.getPort(), varMQConfiguration.getUsername(), varMQConfiguration.getPassword());
            if (Objects.isNull(MQUtils.listenerContainerAtomicReference.get())) {
                SimpleMessageListenerContainer messageListenerContainer = MQUtils.getMessageListenerContainer(connectionFactory);
                MQUtils.setMessageListener(messageListenerContainer, tailWindService);
                messageListenerContainer.start();
            }
            if (Objects.isNull(MQUtils.rabbitAdminAtomicReference.get())) {
                RabbitAdmin rabbitAdmin = MQUtils.getRabbitAdmin(connectionFactory);
            }
        }
    }

    @Scheduled(fixedDelay = 6000)
    public void checkMQ() {
        createMQ();
    }

}

五,

动态创建队列,处理消息

 Queue queue = new Queue(invitationCode);
            FanoutExchange exchange = new FanoutExchange(RabbitMQConfiguration.TERMINAL_FANOUT_EXCHANGE_cloud);
            if (Objects.nonNull(MQUtils.rabbitAdminAtomicReference.get())) {
                MQUtils.rabbitAdminAtomicReference.get().declareQueue(queue);
            }
            rabbitAdmin.declareExchange(exchange);
            rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange));
            if (Objects.nonNull(MQUtils.listenerContainerAtomicReference.get())) {
                MQUtils.listenerContainerAtomicReference.get().addQueueNames(invitationCode);
                MQUtils.listenerContainerAtomicReference.get().start();
            }
@Service
public class TailWindServiceImpl implements MessageListener {

@Override
    public void onMessage(Message message) {

        byte[] body = message.getBody();
        String s = new String(body);
}

}

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:7942463
帖子:1588486
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP