|
一,
项目场景需要通过判断网络是否可用,来动态的创建rabbitMQ连接,已达到程序可以通过不同环境,动态部署的要求
二,
判断rabbitMQ服务器是否可用
public static boolean isHostConnectable(String host, int port) {
Socket socket = new Socket();
try {
socket.connect(new InetSocketAddress(host, port));
} catch (IOException e) {
return false;
} finally {
try {
socket.close();
} catch (IOException e) {
}
}
return true;
}
三,
手动管理rabbitMQ的连接,监听,
package com.koala.console.common.utils;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
public class MQUtils {
public static AtomicReference<SimpleMessageListenerContainer> listenerContainerAtomicReference = new AtomicReference<>();
public static AtomicReference<RabbitAdmin> rabbitAdminAtomicReference = new AtomicReference<>();
public static SimpleMessageListenerContainer getMessageListenerContainer(ConnectionFactory connectionFactory) {
if (Objects.isNull(listenerContainerAtomicReference.get())) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
listenerContainerAtomicReference.set(container);
container.setDefaultRequeueRejected(false);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.start();
return container;
}
return listenerContainerAtomicReference.get();
}
public static void setMessageListener(SimpleMessageListenerContainer container, MessageListener messageListener) {
container.setMessageListener(messageListener);
}
public static RabbitAdmin getRabbitAdmin(ConnectionFactory connectionFactory) {
if (Objects.isNull(rabbitAdminAtomicReference.get())) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdminAtomicReference.set(rabbitAdmin);
}
return rabbitAdminAtomicReference.get();
}
public static CachingConnectionFactory getConnectionFactory(String host, Integer port, String userName, String passWord) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(userName);
connectionFactory.setPassword(passWord);
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
}
四,
项目启动时创建rabbitMQ连接和实例,定时任务定时检查服务器是否可用,维护连接可用性
@Configuration
public class ApiConfigRuning implements ApplicationRunner {
@Autowired
private HeartBeatScheduler heartBeatScheduler;
@Override
public void run(ApplicationArguments args) throws Exception {
heartBeatScheduler.createMQ();
if (Objects.nonNull(MQUtils.listenerContainerAtomicReference.get()) && Objects.nonNull(MQUtils.rabbitAdminAtomicReference.get())) {
Queue queue = new Queue(invitationCode);
FanoutExchange exchange = new FanoutExchange(RabbitMQConfiguration.TERMINAL_FANOUT_EXCHANGE_cloud);
MQUtils.rabbitAdminAtomicReference.get().declareQueue(queue);
rabbitAdmin.declareExchange(exchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange));
MQUtils.listenerContainerAtomicReference.get().addQueueNames(invitationCode);
MQUtils.listenerContainerAtomicReference.get().start();
}
}
}
@Slf4j
@Component
public class HeartBeatScheduler {
@Autowired
private TailWindServiceImpl tailWindService;
public void createMQ() {
if (SystemTool.isHostConnectable(varMQConfiguration.getHost(), varMQConfiguration.getPort())) {
CachingConnectionFactory connectionFactory = MQUtils.getConnectionFactory(varMQConfiguration.getHost(), varMQConfiguration.getPort(), varMQConfiguration.getUsername(), varMQConfiguration.getPassword());
if (Objects.isNull(MQUtils.listenerContainerAtomicReference.get())) {
SimpleMessageListenerContainer messageListenerContainer = MQUtils.getMessageListenerContainer(connectionFactory);
MQUtils.setMessageListener(messageListenerContainer, tailWindService);
messageListenerContainer.start();
}
if (Objects.isNull(MQUtils.rabbitAdminAtomicReference.get())) {
RabbitAdmin rabbitAdmin = MQUtils.getRabbitAdmin(connectionFactory);
}
}
}
@Scheduled(fixedDelay = 6000)
public void checkMQ() {
createMQ();
}
}
五,
动态创建队列,处理消息
Queue queue = new Queue(invitationCode);
FanoutExchange exchange = new FanoutExchange(RabbitMQConfiguration.TERMINAL_FANOUT_EXCHANGE_cloud);
if (Objects.nonNull(MQUtils.rabbitAdminAtomicReference.get())) {
MQUtils.rabbitAdminAtomicReference.get().declareQueue(queue);
}
rabbitAdmin.declareExchange(exchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange));
if (Objects.nonNull(MQUtils.listenerContainerAtomicReference.get())) {
MQUtils.listenerContainerAtomicReference.get().addQueueNames(invitationCode);
MQUtils.listenerContainerAtomicReference.get().start();
}
@Service
public class TailWindServiceImpl implements MessageListener {
@Override
public void onMessage(Message message) {
byte[] body = message.getBody();
String s = new String(body);
}
}
|