Android教程網
  1. 首頁
  2. Android 技術
  3. Android 手機
  4. Android 系統教程
  5. Android 游戲
 Android教程網 >> Android技術 >> 關於Android編程 >> 架構設計:系統間通信(27)——其他消息中間件及場景應用(上)

架構設計:系統間通信(27)——其他消息中間件及場景應用(上)

編輯:關於Android編程

1、概述

目前業界有很多消息中間件可供大家選擇,主要分為兩類:需要付費的商業軟件和開源共享的非商業軟件。對於商業軟件您和您的團隊可以選擇IBM WebSphere集成的MQ功能,也可以選擇Oracle WebLogic集成的MQ功能。本文首先介紹除Apache ActiveMQ以外的兩款開源共享的消息中間件產品,然後列舉三個實際的業務常見,為讀者介紹如何在這些實際業務中使用消息中間件解決問題。

2、RabbitMQ及特性

RabbitMQ基於Erlang語言開發和運行。它與Apache ActiveMQ有很多相同的特性,例如RabbitMQ完整支持多種消息協議:AMQP、STOMP、MQTT、HTTP,我們使用RabbitMQ時會默認使用AMQP1.0 協議。當然,RabbitMQ作為Apache ActiveMQ最主要的競品之一也有其獨特的功能特性。例如RabbitMQ支持一套特有的Routing-Exchange消息路由規則。這套規則可以按照消息內容,自動將消息歸類到不同的消息隊列中。關於這套Routing-Exchange消息路由規則可參見我另一篇文章的詳細介紹:架構設計:系統間通信(20)——MQ:消息協議(下)

2-1、RabbitMQ軟件特性

下面我們來看看RabbitMQ官網上對這款消息中間件軟件的特性介紹:

Reliability(可靠性):

RabbitMQ offers a variety of features to let you trade off performance with reliability, including persistence, delivery acknowledgements, publisher confirms, and high availability.

RabbitMQ支持消息持久化、消息重試操作(比ActiveMQ的相關功能還要強大)、消息回執確認規則、消息生產者發送確認機制(實際上是消息生產者端的一種事務機制)和高可用性HA(多節點熱備方案)等特性來提供RabbitMQ服務的高可靠性。

Flexible Routing(靈活的路由規則):

Messages are routed through exchanges before arriving at queues. RabbitMQ features several built-in exchange types for typical routing logic. For more complex routing you can bind exchanges together or even write your own exchange type as a plugin.

這就是我們提到的RabbitMQ所支持的一套特有的Routing-Exchange消息路由規則。一定注意這套規則不是AMQP協議規范提供的。關於個消息路由規則可參見我另一篇文章的詳細介紹:架構設計:系統間通信(20)——MQ:消息協議(下)

Clustering(RabbitMQ服務集群):

Several RabbitMQ servers on a local network can be clustered together, forming a single logical broker。

RabbitMQ服務集群主要解決的問題是單個RabbitMQ服務節點的性能瓶頸。關於RabbmitMQ集群的搭建過程由於本文篇幅限制,我將隨後安排時間為各位讀者詳細介紹。

Plugin System & Federation(支持第三方插件模塊,其中RabbitMQ Federation [插件]需要特別說明):

RabbitMQ ships with a variety of plugins extending it in different ways, and you can also write your own.

For servers that need to be more loosely and unreliably connected than clustering allows, RabbitMQ offers a federation model.

RabbitMQ支持第三方擴展插件,在RabbitMQ的官網上(http://www.rabbitmq.com/plugins.html)列舉了各種由RabbitMQ官方開發的插件,以及實驗性質的插件,包括(但不限於):rabbitmq_federation、rabbitmq_management、rabbitmq_mqtt、rabbitmq_stomp、rabbitmq_tracing等等。您還可以按照RabbitMQ提供的插件規范,開發您自己的RabbitMQ-Plugins。特別說明一下rabbitmq_federation 插件:這個插件允許您在多個RabbitMQ Clusters之間傳遞消息。

Multi-protocol(多協議支持):

RabbitMQ supports messaging over a variety of messaging protocols.

上文已經提到,RabbitMQ完整支持多種消息協議,包括:AMQP(默認使用該協議)、STOMP、MQTT、HTTP。其中一些協議要安裝相應的插件進行支持,例如rabbitmq_stomp插件。

Many Clients(多客戶端/多語言支持):

There are RabbitMQ clients for almost any language you can think of.

您可以想到的各種編程語言都可以作為RabbitMQ的客戶端進行連接,包括(但不限於):Java 、.NET 、Ruby、 Python 、PHP、 JavaScript、Scala、Groovy……

Tracing(消息追溯):

If your messaging system is misbehaving, RabbitMQ offers tracing support to let you find out what’s going on.

如果您發現發送給RabbitMQ的消息存在異常(如發送到了錯誤的隊列中,發送給了錯誤的訂閱者等等),RabbitMQ提供了消息處理過程追溯功能,以便幫助開發人員分析錯誤原因。

2-2、RabbitMQ使用概要

RabbitMQ的安裝過程非常簡單:由於RabbitMQ是基於Erlang語言開發並運行的,所以安裝RabbitMQ的第一步是安裝Erlang運行環境。您可以在https://www.erlang-solutions.com/resources 下載最新Erlang版本進行安裝(注意不同的RabbitMQ版本有不同的Erlang最低版本要求,筆者使用的RabbitMQ版本為V3.5.4,Erlang版本為V18.0);

接下來您可以在RabbitMQ官方(http://www.rabbitmq.com/)下載各種RabbitMQ的安裝版本,建議直接使用各種操作系統對應的rpm文件進行安裝即可。安裝完成後,可以使用15672端口訪問RabbitMQ的管理界面(默認的用戶名和密碼都是guest)

這裡寫圖片描述

以下代碼演示了如何使用RabbitMQ的客戶端開發包,進行消息生產和消費。RabbitMQ的客戶端開發包可以在RabbitMQ官網進行下載(http://www.rabbitmq.com/java-client.html),也可以使用Mavean官方庫進行導入:


    com.rabbitmq
    amqp-client
    3.5.4
RabbitMQ消息生產者
package com.yinwenjie.test.testRabbitMQ;

import java.io.IOException;
import java.util.Date;
import java.util.UUID;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 這個測試類,用於模擬消息生成者,每100毫秒,向rabbit集群寫入不同的消息
 * @author yinwenjie
 */
public class RabbitProducerThread implements Runnable {
    /**
     * 日志
     */
    private static final Log LOGGER = LogFactory.getLog(RabbitProducerThread.class);

    public static void main(String[] args) throws Exception {
        new Thread(new RabbitProducerThread()).start();
    }

    public void run() {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        //連接集群節點就這麼設置
        // connectionFactory.newConnection(addrs)
        Connection conn = null;
        Channel producerChannel = null;
        try {
            conn = connectionFactory.newConnection();
            producerChannel = conn.createChannel();
        } catch (Exception e) {
            RabbitProducerThread.LOGGER.error(e.getMessage() , e);
            System.exit(-1);
        }

        //然後每隔100毫秒,發送一條數據
        while(true) {
            // 消息的唯一編號
            String uuid = UUID.randomUUID().toString();
            String message = uuid + ";time=" + new Date().getTime();
            //設置一些參數
            BasicProperties properties = new BasicProperties().builder().type("String").
                    contentType("text").contentEncoding("UTF-8").messageId(uuid).build();
            try {
                //第一個參數是exchange交換器的名字
                //第二個參數是進行消息路由的關鍵key
                producerChannel.basicPublish("com.ai.sboss.arrangement.event", "com.ai.sboss.arrangement.event.queues", properties, message.getBytes());
                RabbitProducerThread.LOGGER.info("消息發送:" + message);
            } catch (IOException e) {
                RabbitProducerThread.LOGGER.error(e.getMessage() , e);
            }

            synchronized (this) {
                try {
                    this.wait(3000);
                } catch (InterruptedException e) {
                    RabbitProducerThread.LOGGER.error(e.getMessage() , e);
                }
            }
        }
    }
}
RabbitMQ消息消費者
package com.yinwenjie.test.testRabbitMQ;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * 這個測試類,用於模擬消息消費者
 * @author yinwenjie
 */
public class RabbitConsumerThread implements Runnable {

    /**
     * 日志
     */
    private static final Log LOGGER = LogFactory.getLog(RabbitConsumerThread.class);

    public static void main(String[] args) throws Exception {
        new Thread(new RabbitConsumerThread()).start();
    }

    public void run() {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        //連接集群節點就這麼設置
        //connectionFactory.newConnection(addrs)
        Connection conn = null;
        Channel consumerChannel = null;
        try {
            conn = connectionFactory.newConnection();
            consumerChannel = conn.createChannel();
        } catch (Exception e) {
            RabbitConsumerThread.LOGGER.error(e.getMessage() , e);
            System.exit(-1);
        }

        //開始監控消息,(ack是手動的)
        QueueingConsumer queueingConsumer = null; 
        try {
            queueingConsumer = new QueueingConsumer(consumerChannel);
            // 設置消費者訂閱的消息隊列名:com.ai.sboss.arrangement.event.queues
            consumerChannel.basicConsume("com.ai.sboss.arrangement.event.queues", false, queueingConsumer);
        } catch (IOException e) {
            RabbitConsumerThread.LOGGER.error(e.getMessage() , e);
            System.exit(-1);
        }

        //停頓200毫秒,才處理下一條,以便模擬事件處理對應的消耗事件
        while(true) {
            QueueingConsumer.Delivery delivery = null;
            try {
                delivery = queueingConsumer.nextDelivery();
            } catch (Exception e) {
                RabbitConsumerThread.LOGGER.error(e.getMessage() , e);
            }
            long deliverytag = delivery.getEnvelope().getDeliveryTag();
            byte[] messageBytes = delivery.getBody();
            BasicProperties properties = delivery.getProperties();
            String message = new String(messageBytes);
            RabbitConsumerThread.LOGGER.info("收到事件======message = " + message + " | properties =" +properties);

            //這裡停頓200毫秒,模擬業務時間
            synchronized (this) {
                try {
                    this.wait(200);
                } catch (InterruptedException e) {
                    RabbitConsumerThread.LOGGER.error(e.getMessage() , e);
                }
            }

            RabbitConsumerThread.LOGGER.info("事件message = " + message + "處理完成,發送ack。等待下一條消息");
            try {
                // 發送ack:消息處理成功的確認信號
                consumerChannel.basicAck(deliverytag, false);
            } catch (IOException e) {
                RabbitConsumerThread.LOGGER.error(e.getMessage() , e);
            }
        }
    }
}

3、場景應用——電子政務平台:駕駛人違法記錄同步功能

3-1、業務場景說明

這裡寫圖片描述

只是一個為了匯總全國機動車違法記錄而設計的多系統數據同步功能。最主要的功能是進行違法記錄的上傳以及在各省間同步跨省違法記錄。在進行架構設計之前,我們首先需要了解一些關於整個系統業務背景:任何系統設計都不能脫離系統實際業務背景而存在<喎?/kf/ware/vc/" target="_blank" class="keylink">vc3Ryb25nPqOhPC9wPgoKCjxwPjxzdHJvbmc+ytfPyNX7uPbPtc2zt9bOqsirufrPtc2zus0zMrj2yqG8ts+1zbM8L3N0cm9uZz6jutPJ09rDv7j2yqG2vNPQt/u6z7jDyqHKtbzKx+m/9rXEoaK0psDtuf2zzM3qyKuyu82stcTOpbeovMfCvLSmwO2y2df3oaOyosfSw7+49sqhtcS83bncz7XNs7Xn19O7r83GvfjH6b/20rKyu76hz+DNrKO609C1xMqh0tG+rdff1NrBy8irufq1xMewwdCjrLv5sb7Jz8v509C83bnc0rXO8cr9vt22vNLRvq3T68irufrPtc2zyrXP1sHLzayyvaO709C1xMqhv8nE3LLFv6rKvL2oyeijrMn11sG2vMO709DX1Ly6tcTOpbeovMfCvLXn19PQxc+ioaM8L3A+CjxwPjxzdHJvbmc+zqW3qLzHwrzQxc+itcTNrLK9uf2zzLfWzqrJz9DQzayyvbrNz8LQ0M2ssr08L3N0cm9uZz6jurzdyrvIy86lt6i8x8K80MXPotDo0qq008qhvLbPtc2zyrXP1rW9yKu5+s+1zbO1xM2ssr2jqNbB09rKx8qhvLbPtc2zyLfIz86lt6jQxc+iyrHBory0vfjQ0M2ssr2jrLu5ysfKoby2z7XNs9TaxLO49rnMtqi1xMqxvOTW3MbazbPSu7340NDNrLK9o6zV4r7Nyse4+Lj3yqG8ts+1zbPX1Ly6tcS0ptbDyKjBy6Opo6zV4tH5tcTNrLK9uf2zzLPGzqo8c3Ryb25nPsnP0NDNrLK9PC9zdHJvbmc+oaPI57n7xLO49s6lt6jV38rH1NqxvsqhzqW3qLXEo6zEx8O01rG907340NDJz9DQzayyvb7Nv8nS1MHLo7vI57n7xLO49s6lt6jV38rH1NrN4sqhzqW3qKOsxMfDtLP9wcu9+NDQyc/Q0M2ssr3N4qOsPHN0cm9uZz61scirufrPtc2zt6LP1tXiysfSu8z10uy12M6lt6i8x8K8o6yyosfSzqW3qNXfye233daky/nU2sqh0tG+rb3TyOvBy8irufrPtc2zPC9zdHJvbmc+o6y+zdDo0qrNqLn9yKu5+s+1zbO9q9XizPXOpbeovMfCvDxzdHJvbmc+z/LOpbeo1d/J7bfd1qTL+dTayqE8L3N0cm9uZz61xMqhvLbPtc2zvfjQ0M2ssr2jrNXi0fm1xM2ssr25/bPMs8bOqjxzdHJvbmc+z8LQ0M2ssr08L3N0cm9uZz6hozwvcD4KPHA+PHN0cm9uZz7I57n7xLPKobXEz7XNs9DCvdPI68HLyKu5+s+1zbOjrMTHw7TIq7n6z7XNs9Do0qrU2tXiuPbKobXEz7XNs82ssr25psTcxKO/6de8sbi6w7rzo6y9q9XiuPbKob3TyOvIq7n6z7XNs8ewy/nP4LnYtcS/58qh0uy12M6lt6i8x8K8yKuyv7340NDSu7TOyc/Q0M2ssr26zc/C0NDNrLK9PC9zdHJvbmc+oaPEx8O0yrLDtL3QobDKoby2z7XNs9e8sbi6w6GxxNijv7K70ru2qL3TyOvIq7n6z7XNs7XEw7+49sqhvLbPtc2ztrzE3MGiv8zOyLaotcS5pNf3o6zIzrrOz7XNs7a809DSu7j2zsi2qNbcxtqho9Ta1eK49s7ItqjW3MbaxNqjrL+qt6LNxbbT0OjSqs3qs8nW7sjnuduy7M+1zbO5pNf3x+m/9qGitffV+7mmxNzEo7/ptcTUy8vj0NTE3KGivfjQ0MjtvP5CdWfQ3rjEoaK9+NDQyO28/rLZ1/e5/bPM08W7r7XItci5pNf3oaM8L3A+CjxwPsHtzeK498qhtcTI7bz+uanTpsnMsru+odK70fmjrMq508O1xL+qt6LT79HU0rKyu8/gzayho8v50tTU2r+8wse908jrt72wuMqxo6zQ6NKqt72wuNans9a24NbWseCzzNPv0dSjrLvy1d/Kx8q508O24NbW0+/R1La81qez1rXE0rvW1s2o08PQrdLpoaPB7c3iyqG8ts+1zbO6zcirufrPtc2z06a4w76hv8nE3LXEvfjQ0NK1zvHN0fHuo6zV4tH5ssW/ydLUsaPWpMqhvLbPtc2ztcTI7bz+uanTpsnMsrux2M6qwcvKtc/WzqW3qLzHwrzJz9DQzayyvbrNz8LQ0M2ssr25psTc16jDxbj8uMSx4LPM0+/R1KOs0rKyu7HYzqrBy8q1z9bS1MnPuabE3Neow8W199X7yqG8ts+1zbO1xLnM09DStc7xuf2zzLrNz7XNs7zcubmjqNPQtcTKsbry0vLOqry8yvXOyszitffV+9K1zvG5/bPMv827p7e9yse++LbUsru74bTw06a1xKOpoaM8L3A+CjxwPtPJ09rKx9Hdyr6zob6wo6zEv7XEysfR3cq+z/vPos+1zbPW0LzkvP7U2tXiuPbQ6Mfzs6G+sMq1z9a3vbC41tC1xNf308Oho8v50tTO0sPHvNnJ6NX7uPbQ6Mfzu7e+s8rHvt/T0KGw0qrKtc/WzqW3qLzHwrzQxc+izayyvaGxtcTHsNbDuabE3C/HsNbDzPW8/rXEoaPV4tCpx7DWw7mmxNwvx7DWw7u3vrOw/MCoo6i1q7K7z97T2qOpo7rIq7n6vN3Ku8jLu/mxvrW1sLjQxc+iv+KjqNXisr+31tDFz6K/ycTc0rLKx82ouf2498qhvLbPtc2zzayyvbb4wLSjqaGiyKu5+sjLv9rJ7bfd0MXPor/itcihozwvcD4KCgoKCjxoMiBpZD0="3-2總體設計思路">3-2、總體設計思路

以上業務場景是一個典型的需要使用支持事務的消息中間件的應用場景——追求消息到達和處理的穩定性,您可以使用本章我們詳細介紹的ActiveMQ也可以使用上一節介紹的RabbitMQ:因為他們都支持多語言接入,都提供消息事務支持,都支持消費者側的消息回執確認。另外,這個業務場景中也要兼顧一定的數據吞吐量。

這裡寫圖片描述

在已有的系統中加入消息隊列服務最大的目的是保持已有系統的原始架構不作調整。不作調整的原因可能是因為原有系統由於設計不當已經不可能再做大的調整,否則將付出無法承受的代價;也可能是由於非技術原因,技術團隊沒有相應的權限調整已有架構設計。

采用消息隊列服務方案的另一個優點是可以緩解數據洪峰。在這個示例場景中最典型的體現就是:需求中明確的提到,當一個省級系統新接入時,需要進行一次完整的違法記錄的上行同步和下行同步。這樣的話有可能在這個省級系統上積累了7、8年的違法記錄會被同步到全國系統,這個過程可能會出現一定的數據堆積。但是由於我們給出的消息服務中間件的數據持久化性能較為強勁(請參見下一小節的詳細設計),所以數據同步壓力基本上不會傳遞到上層系統的業務處理層。

分析場景中對於省級系統接入的需求描述,技術層面上最大的幾個問題是:不同省級系統采用的架構不一樣,使用的編程語言不一樣,技術團隊水平不一樣。為了保證接入方案的安全效果、性能效果和工作效率,全國系統應該為省級系統提供不同的語言開發包和集成文檔(類似於集成微信/支付寶/淘寶等開放平台);根據經驗,全國系統應首先為各省級系統優先提供JAVA和C#的集成開發包。

開發包中主要對連接消息服務隊列的行為進行封裝、對上行消息和下行消息的文本格式進行規范(保證各省系統上行消息的文本格式是一致的,保證各省收到的下行消息都是上級系統所統一的格式)、 對消息的加密和解密協議進行封裝、對消息發送過程和消息訂閱過程進行封裝(包括消息生產者進行上行消息的發送和消息消費者進行下行消息的接收)。另外,為了保證傳輸過程文本消息的通訊安全,開發包中還封裝了SSL加密/解密過程。

最後,由於要保證所有的上行消息和下行消息一定會被目標系統正常處理。所以這些消息都應該是PERSISTENT Meaage形式的消息。並且無論是上行消息還是下行消息,都應該在超出重試次數後被放置到“死信隊列”(Dead Letter Queue),以便進行人工干預。重試次數應該設置為2——3次左右,因為ActiveMQ默認重發6次(redeliveryCounter==6)的值過大,在消息出現問題時重試次數過多會嚴重影響消息中間件服務的處理效率。

3-3、消息隊列服務詳細設計

下面我們來具體分析一下在這個實例場景下消息隊列服務部分的架構設計(即上圖中“基於ActiveMQ的消息隊列服務”部分的設計)。架構詳細設計部分分為硬件結構設計和軟件規則設計部分,我們首先討論硬件設計部分的方案。

3-3-1、硬件方案部分

其中硬件部分的設計來源於上一節文章中已經提到的ActiveMQ服務集群的綜合應用(《架構設計:系統間通信(26)——ActiveMQ集群方案(下)》),為了保證每個ActiveMQ節點都能高效工作,我們還按照上文提到的ActiveMQ服務單節點的性能優化原則進行了相應配置(《架構設計:系統間通信(22)——提高ActiveMQ工作性能(上)》)。

這裡寫圖片描述

在這個示例的應用場景中,雖然高並發性並不是建設方主要追求的。但如上文所述,為了保證在數據洪峰出現時數據處理壓力不傳遞給業務服務,並且ActiveMQ服務集群能夠盡快完成數據洪峰的吞吐工作(在建設方預算允許的情況下),我們為每一組ActiveMQ M/S集群選擇了IBM的基於SAN(Storage Area Network)的共享存儲解決方案。其中使用的IBM Storwize V7000存儲盤陣設置成RIDA5模式,並配置20TB存儲空間。

實際上在這個示例場景中,之所以采用這樣的硬件設計方案更是為了在有限的篇幅內為讀者講解更多的設計方式。由於使用了基於SAN的共享存儲方案,所以之前提到的LevelDB + zookeeper的熱備方案就不必再使用了(當然LevelDB + zookeeper的方案也是可選方案)。為了節約成本,也可以多組SAN共享存儲使用用一台FC 光交換機和一台存儲盤陣,但是這樣可能出現因為FC光交換機的單點故障或者磁盤陣列單點故障導致整個集群宕機的情況

這裡寫圖片描述

3-3-2、軟件規則部分

在前文提到,由於省級系統都使用了全國系統統一提供的開發包進行上行消息和下行消息的處理,所以接入消息同步功能的所有系統都不必擔心消息文本的格式問題;那麼在ActiveMQ消息隊列服務的業務規則部分,最重要的規則就是如何規劃上行消息和下行消息存儲的隊列。

這裡寫圖片描述

如上圖所示所有省級系統的上行消息同時共享一個消息隊列,這是因為這些省級系統都是使用上級系統統一提供的開發包進行二次開發,所以無論哪個省級系統向上同步的消息格式都是一致的(且進行了內容加密),所以它們可以共享一個消息隊列,並由上級系統使用一套相同的處理邏輯進行接受。

當上級系統發現有跨省產生的違法記錄時,就需要通過下行隊列將這個違法記錄發送給違法者所在省的省級系統,這些下行信息由於有不同的消費者(省級系統),且這些消費者所涉及的業務處理邏輯都可能不一樣,所以應該使用不同的消息隊列來發送針對不同省級系統的下行隊列。另外,這樣的消息下發機制還可以保證在省級系統出現故障時,下行消息不會丟失——直到這些下行消息被對應的省級系統正確處理。

3-4、主要代碼片段

由於整個方案需要相當的代碼編寫工作,所以不可能在這個示例場景中演示所有的代碼實現。為了讓讀者能夠了解其中更細節的實現情況,在這個小節中我們重點演示主要的代碼實現片段(使用Java語言)。包括省級系統開發包中如何進行上行隊列的連接,如何開始監聽下行隊列——只有同時成功建立上行隊列連接和下行隊列連接,才能認為信息同步模塊啟動成功了

為了保證信息同步模塊獨立於現有系統的其他功能模塊進行工作,應該使用專門的新線程建立上行隊列連接和下行隊列連接:

package mq.test.blog;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQQueue;

/**
 * 這個啟動器用於啟動上行隊列和下行隊列的連接。
 * 上行隊列是一個獨立的線程,下行隊列也是一個獨立的線程。
 * 
 * 另外,上行隊列和下行隊列都可以使用同一個session
 * @author yinwenjie
 */
public class ClientStartup implements Runnable {

    /**
     * 下行隊列名稱(可存放於配置文件中)
     */
    private String downStream = "downStream";

    /**
     * 保證整個進程只有一個Producer被創建和使用
     */
    private static MessageProducer PRODUCER;

    /**
     * 標示該啟動器是否正常連接到消息中間件服務
     */
    private static boolean ISSTARTED = false;

    /**
     * 這個靜態方法用於從ClientStartup啟動器中獲取整個進程中唯一一個消息生產者。
     * 注意,為了保證該進程其它線程安全獲取ClientStartup.PRODUCER,
     * 所以只有等待run()方法成功運行完成,該ClientStartup.PRODUCER才能被其它線程拿到。
     * @return
     */
    public static MessageProducer getNewInstanceProducer() {
        synchronized (ClientStartup.class) {
            while(!ClientStartup.ISSTARTED) {
                try {
                    ClientStartup.class.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace(System.out);
                }
            }
        }

        return ClientStartup.PRODUCER;
    }

    @Override
    public void run() {
        // 開發包中對於消息中間件服務的連接一定要使用故障轉移
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://192.168.61.138:61616,tcp://192.168.61.139:61616)");
        // 這是上行消息隊列
        ActiveMQQueue upstreamQueue = new ActiveMQQueue("upstream");
        // 這是下行消息隊列
        // 下行消息隊列必須由上級系統創建,並且在下級系統使用的開發包所對應的配置文件中進行配置
        ActiveMQQueue downStreamQueue = new ActiveMQQueue(this.downStream);

        //============開始創建
        Connection connection = null;
        Session session = null;
        try {
            //ack優化選項
            connectionFactory.setOptimizeAcknowledge(true);
            connectionFactory.setProducerWindowSize(2048000);
            connectionFactory.setSendAcksAsync(true);
            //ack信息最大發送周期
            connectionFactory.setOptimizeAcknowledgeTimeOut(5000);
            //連接屬性優化:設置重試次數為2
            RedeliveryPolicy redeliveryPolicy = connectionFactory.getRedeliveryPolicy();
            redeliveryPolicy.setMaximumRedeliveries(2);
            //連接屬性優化:設置預取數量
            ActiveMQPrefetchPolicy prefetchPolicy =  connectionFactory.getPrefetchPolicy();
            prefetchPolicy.setQueuePrefetch(20);
            //設置獲取消息的線程池大小
            connectionFactory.setMaxThreadPoolSize(2);
            connection = connectionFactory.createQueueConnection();
            //連接
            connection.start();

            //建立會話(設置一個帶有事務特性的會話)
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        } catch(Exception e) {
            e.printStackTrace(System.out);
            return;
        }

        //===========首先進行訂閱消費者連接(下行消息隊列的連接)
        //注意,正式代碼中不應該允許JMS Client創建一個新的隊列
        //所以應該使用其它方式(例如其他查詢接口),在創建前判斷隊列是否已經存在
        MessageConsumer consumer;
        try {
            consumer = session.createConsumer(downStreamQueue);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    /*
                     * 這裡進行正式業務的處理
                     * */
                }
            });
        } catch (JMSException e) {
            e.printStackTrace(System.out);
            // 一旦出錯,就關閉整個連接,退出啟動過程
            try {
                connection.close();
            } catch (JMSException e1) {
                e.printStackTrace(System.out);
            }
            return;
        }

        //==========然後創建消息生產者倆呢及(上行消息隊列的連接)
        try {
            ClientStartup.PRODUCER = session.createProducer(upstreamQueue);
        } catch (JMSException e) {
            e.printStackTrace(System.out);
            // 一旦出錯,就關閉整個連接,退出啟動過程
            try {
                connection.close();
            } catch (JMSException e1) {
                e.printStackTrace(System.out);
            }
            return;
        }

        //==========通知其他線程可以獲取producer了
        ClientStartup.ISSTARTED = true;
        synchronized (ClientStartup.class) {
            ClientStartup.class.notify();
        }

        //==========鎖定該線程
        synchronized (this) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace(System.out);
            }
        }
    }

    public static void main(String[] args) {
        new Thread(new ClientStartup()).start();
    }
}

3-5、其它說明

安全性考量:在正式環境中使用消息隊列中間件服務一定要做相關的安全性設置。包括啟用消息隊列服務的用戶名和密碼、啟用消息隊列服務自帶的SSL加密設置。如果您使用的消息隊列服務不自帶SSL加密,則一定要自己進行加密。幸運的是,如果您使用的是ActiveMQ,那麼以上兩種安全性要求都可以滿足。甚至ActiveMQ還支持為每一個隊列單獨進行用戶名和密碼設置。

錯誤數據的處理:在正式環境中使用消息隊列中間件服務一定要假設會發生傳輸的消息由於各種業務原因導致的消費者處理錯誤的情況。所以對超出redeliveryCounter重試次數的錯誤消息一定要轉存到另外的“待處理區域”,並在後續進行人工干預。在ActiveMQ中這個“待處理區域”就是死消息隊列:ActiveMQ.DLQ。

在產品預算內賦予消息服務中間件最大的可用性:類似於ActiveMQ、RabbitMQ這樣的消息隊列中間件,其目的並不是一味地追求單位時間內消息數據的吞吐量/並發量的處理能力。它們的功能中涵蓋了諸多功能:事務機制、確認機制、重試機制、熱備機制等等,都是為了一個更重要的功能目的:保證消息完整可達。所以您和您的團隊一定要按照業務特性來確定是否適合使用這樣的中間件服務,並且您需要在預算范圍內為您的消息服務中間件配置多個服務節點、多個存儲單元,以便保證消息隊列中間件能夠完成它的任務——消息完整可達。

下文我們一起討論一下那些專門解決高數據吞吐性能問題的消息中間件產品以及它們的應用場景。

(接下文)

  1. 上一頁:
  2. 下一頁:
熱門文章
閱讀排行版
Copyright © Android教程網 All Rights Reserved