前面文章《Apache ActiveMQ 负载均衡 》在最后有提到一个场景,就是当AMQ的节点数大于2个的时候(HA + LB),且配置了消息回流的情况下的一些问题。
HA + LB的基本结构如下图:

问题即发生在当生产者将消息投递到Master节点后(AMQ SERVER),消费者与A节点建立连接(Broker),根据AMQ的“预先消费”策略预先消费了一定数量的消息,即A节点消费了Master节点的一部分消息,A节点在将消息转发至消费者Consumer。
消费者在消费过程中,A节点意外宕机,消费者根据failover机制会自动连接至B或C节点,想继续消费剩余的消息(A节点未消费完成的消息)。
那么我们可以按如下的配置方式即可解决该场景下的问题:
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="broker129" dataDirectory="${activemq.data}">
<!-- Destination specific policies using destination names or wildcards -->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">">
</policyEntry>
<policyEntry queue=">" enableAudit="false">
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />
</deadLetterStrategy>
<networkBridgeFilterFactory>
<conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
</networkBridgeFilterFactory>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!--
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
http://activemq.apache.org/jmx.html
-->
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<!--
duplex Broker双工模式,即:Broker可以是消费者也可以是发布者。
该参数同“消息回流”(replayWhenNoConsumers)不同,注意区分。
-->
<!-- networkTTL 即:信息和订阅在网络可以通过的Broker数量。该参数需要根据LB数量合理设置 -->
<networkConnectors>
<networkConnector duplex="true" networkTTL="3" uri="static:(tcp://192.168.137.200:61616)"/>
</networkConnectors>
<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
-->
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
<!--
The systemUsage controls the maximum amount of space the broker will
use before disabling caching and/or slowing down producers. For more information, see:
http://activemq.apache.org/producer-flow-control.html
-->
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="nio://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
<!-- destroy the spring context on shutdown to stop jetty -->
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
</broker>
<!--
Enable web consoles, REST and Ajax APIs and demos
The web consoles requires by default login, you can disable this in the jetty.xml file
Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
-->
<import resource="jetty.xml"/>
</beans>
|