收藏 分销(赏)

activemqcpp开发基础手册专业资料.doc

上传人:快乐****生活 文档编号:3032803 上传时间:2024-06-13 格式:DOC 页数:115 大小:341.04KB
下载 相关 举报
activemqcpp开发基础手册专业资料.doc_第1页
第1页 / 共115页
activemqcpp开发基础手册专业资料.doc_第2页
第2页 / 共115页
activemqcpp开发基础手册专业资料.doc_第3页
第3页 / 共115页
activemqcpp开发基础手册专业资料.doc_第4页
第4页 / 共115页
activemqcpp开发基础手册专业资料.doc_第5页
第5页 / 共115页
点击查看更多>>
资源描述

1、Activemq-cpp开发手册丁靖-05-061 引言1.1 编写目迅速学习CMS,提高CMS开发效率,提供一种CMS开发参照手册详细API手册请参照1.2 功能简介Activemq-cpp是一种与ActiveMQ交互通讯C+ API开发库,为C+开发者提供了一种访问ActiveMQ接口。Winkeemq-cpp是一种在Activemq-cpp基本上封装API库,对某些重复机械初始化及销毁清除及某些不关怀细节进行了封装,从而简化了编程。1.3 术语解析ActiveMQ :开源消息队列服务器Broker :消息中介,每个消息队列服务器中至少有一种broker,是消息队列载体Destinatio

2、n :消息在broker上目地Queue :消息队列 Topic :主题 Message :消息 Producer :消息产生者 Consumer :消息消费者Client :客户端,生产者和消费者都在客户端上 Server :Activemq服务器BrokerUri :客户端访问服务器上broker时Uri其他资料请参照2 开发前准备在开发前必要先安装activemq-cpp及winkeemq-cpp库,详细环节参照activemq-cpp安装及使用文档.doc3 CMS 3.1 概述CMS(stands for C+ Messaging Service)是一组C+应用程序接口(C+ API

3、),它提供创立、发送、接受、读取消息服务。定义了一组和Sun公司和它合伙伙伴设计CMS API相似公共应用程序接口和相应语法,使得C+程序可以和其她消息组件进行通信。 CMS是一种与厂商无关 API,用来访问消息收发系统。它类似于 JDBC (Java Database Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库 API,而 CMS 则提供同样与厂商无关访问办法,以访问消息收发服务。CMS 使您可以通过消息收发服务(有时称为消息中介程序或路由器)从一种 CMS 客户机向另一种客户机发送消息。消息是 CMS 中一种类型对象,由两某些构成:报头和消息主体。报头由路

4、由信息以及关于该消息元数据构成。消息主体则携带着应用程序数据或有效负载。依照有效负载类型来划分,可以将消息分为几种类型,它们分别携带:简朴文本 (TextMessage)、可序列化对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),尚有无有效负载消息 (Message)。 消息收发系统是异步,也就是说,CMS 客户机可以发送消息而不必等待回应。比较可知,这完全不同于基于 RPC (基于远程过程)系统,如 EJB 1.1、CORBA 和 Java RMI 引用实现。在 RPC 中,客户机调用服务

5、器上某个分布式对象一种办法。在办法调用返回之前,该客户机被阻塞;该客户机在可以执行下一条指令之前,必要等待办法调用结束。在 CMS 中,客户机将消息发送给一种虚拟通道(主题或队列),而其他 CMS 客户机则预订或监听这个虚拟通道。当 CMS 客户机发送消息时,它并不等待回应。它执行发送操作,然后继续执行下一条指令。消息也许最后转发到一种或许各种客户机,这些客户机都不需要作出回应。 CMS通用接口集合以异步方式发送或接受消息。异步方式接受消息显然是使用间断网络连接客户 机,诸如移动电话和PDA最佳选取。此外, CMS采用一种宽松结合方式整合公司系统办法,其重要目就是创立可以使用跨平台数据信息、可

6、移植公司级应用程序,而把开发人力解放出来。CMS消息服务支持两种消息模型:Point-to-Point消息(P2P)和发布订阅消息(Publish Subscribe messaging,简称Pub/Sub)。CMS规范并不规定供应商同步支持这两种消息模型,但开发者应当熟悉这两种消息模型优势与缺陷。P2P消息模型是在点对点之间传递消息时使用。如果应用程序开发者但愿每一条消息都可以被解决,那么应当使用P2P消息模型。与Pub/Sub消息模型不同,P2P消息总是可以被传送到指定位置。Pub/Sub模型在一到多消息广播时使用。如果一定限度消息传递不可靠性可以被接受话,那么应用程序开发者也可以使用Pu

7、b/Sub消息模型。换句话说,它合用于所有消息消费程序并不规定可以收到所有信息或者消息消费程序并不想接受到任何消息状况。CMS通过容许创立持久订阅来简化时间有关性,虽然消息预订者未激活也可以接受到消息。此外,使用 持久订阅还可通过队列提供灵活性和可靠性,而依然容许消息被发给许多接受者。 Topic Subscriber topic Subscriber = topicSession.createDurableSubscriber(topic,subscriptionName);Connection对象表达了到两种消息模型中任一种消息系统连接。服务器端和客户机端对象规定管理创立CMS连接状态。连

8、接是由 Connection Factory创立并且通过JNDI查寻定位。 /获得用于 P2P QueueConnectionFactory QueueConnectionFactory = queueConnectionFactory();Context messaging = new InitialContext();QueueConnectionFactory = (QueueConnectionFactory) Messaging.lookup(“QueueConnectionFactory”); /获得用于 pub/sub TopicConnectionFactory TopicCo

9、nnectonFactory topicConnectionFactory; Context messaging = new InitialContext(); topicConnectionFactory= (TopicConnectionFactory)messaging.lookup(“TopicConnectionFactory”);注意:用于P2P代码和用于PublishSubscribe代码非常相似。如果session被标记为transactional话,确认消息就通过确认和校正来自动地解决。如果session没有标记为 transactional,你有三个用于消息确认选项。 AU

10、TO_ACKNOWLEDGE session将自动地确认收到一则消息。 CLIENT_ACKNOWLEDGE 客户端程 序将确认收到一则消息,调用这则消息确认办法。 DUPS_OK_ACKNOWLEDGE 这个选项命令session“懒散”确认消息传递,可以想到,这将导致消息提供者传递某些复制消息也许会出错。这种确认方式只应当用于消息消费程序 可以容忍潜在副本消息存在状况。 queueSession =queueConnection.createQueueSession(false,session.AUTO_ACKNOWLEDGE);/P2P topicSession = topicConne

11、ction.createTopicSession(false,session.AUTO_ACKNOWLEDGE);/Pub-Sub注意:在本例中,一种session目从连结中创立,非值指出session是non-transactional,并且 session将自动地确认收到一则消息。CMS当前有两种传递消息方式。标记为NON_PERSISTENT消息最多投递一次,而标记 为PERSISTENT消息将使用暂存后再转送机理投递。如果一种CMS服务离线,那么持久性消息不会丢失但是得等到这个服务恢复联机时才会被传递。 因此默认消息传递方式是非持久性。虽然使用非持久性消息也许减少内务和需要存储器,并且

12、这种传递方式只有当你不需要接受所有消息时才使用。虽然 CMS规范并不需要CMS供应商实现消息优先级路线,但是它需要递送加快消息优先于普通级别消息。CMS定义了从0到9优先级路线级别,0是最低 优先级而9则是最高。更特殊是0到4是正常优先级变化幅度,而5到9是加快优先级变化幅度。举例来说: topicPublisher.publish (message,DeliveryMode.PERSISTENT,8,10000);/Pub-Sub 或 queueSender.send(message,DeliveryMode.PERSISTENT,8,10000);/P2P 这个代码片断,有两种消息模型,映

13、射递送方式是持久,优先级为加快型,生存周期是10000 (以毫秒度量 )。如果生存周期设立为零,这则消息将永远不会过期。当消息需要时间限制否则将使其无效时,设立生存周期是有用。CMS定义了五种不同消息正文格式,以及调用消息类型,容许你发送并接受以某些不同形式数据,提供既有消息格式某些级别兼容性。 StreamMessage - Java原始值数据流 MapMessage-一套名称-值对 TextMessage-一种字符串对象 ObjectMessage-一种序列化 Java对象 BytesMessage-一种未解释字节数据流CMS应用程序接口提供用于创立每种类型消息和设立荷载办法例如,为了在一

14、种队列创立并发送一种 TextMessage实例,你可以使用下列语句: TextMessage message = queueSession.createTextMessage();message.setText(textMsg);以异步方式接受消息,需要创立一种消息监听器然后注册一种或各种使用MessageConsumerCMS MessageListener接口。会话(主题或队列)负责产生某些消息,这些消息被传送到使用onMessage办法监听者那里。 Using namespace cms; class ExampleListener :public MessageListener /把

15、消息强制转化为TextMessage格式 public void onMessage(Message message) TextMessage textMsg = null; / 打开并解决这段消息 当咱们创立QueueReceiver和TopicSubscriber时,咱们传递消息选取器字符串: /P2P QueueReceiver QueueReceiver receiver;receiver = session.createReceiver(queue,selector); /Pub-Sub TopicSubscriber TopicSubscriber subscriber;subsc

16、riber = session.createSubscriber(topic,selector);为了启动消息交付,无论是Pub/Sub还是P2P,都需要调用start办法。 TopicConnection.start();/pub-sub QueueConnection.start();/P2P 当一条消息被捕获时,这条消息做为一条必要被强制转化为恰当消息类型普通Message对象到达。如TextMessagevoid onMessage(const Message* message) TextMessage txtMsg=dynamic_cast(message); /对 txtMsg做某

17、些解决停止消息传递,无论是Pub/Sub还是P2P,都调用stop办法。 TopicConnection. stop ();/pub-sub QueueConnection. stop ();/P2P 3.2 接口描述CMS 支持两种消息类型P2P 和Pub/Sub,分别称作:P2P Domain 和Pub/Sub Domain,这两种接口都继承统一CMS Parent 接口,CMS 重要接口如下所示:CMS ParentConnectionFactoryConnectionDestinationSessionMessageProducerMessageConsumer 如下是对这些接口简朴描

18、述: ConnectionFactory :连接工厂,CMS 用它创立连接 Connection :CMS 客户端到CMS Provider 连接 Destination :消息目地 Session: 一种发送或接受消息线程 MessageProducer: 由Session 对象创立用来发送消息对象 MessageConsumer: 由Session 对象创立用来接受消息对象3.3 CMS消息模型CMS 消息由如下几某些构成:消息头,属性,消息体。 消息头(Header) - 消息头包括消息辨认信息和路由信息,消息头包括某些原则属性如:CMSDestination,CMSMessageID

19、等。 消息头由谁设立CMSDestinationsend 或 publish 办法CMSDeliveryModesend 或 publish 办法CMSExpirationsend 或 publish 办法CMSPrioritysend 或 publish 办法CMSMessageIDsend 或 publish 办法CMSTimestampsend 或 publish 办法CMSCorrelationID客户CMSReplyTo客户CMSType客户CMSRedeliveredCMS Provider 属性(Properties) - 除了消息头中定义好原则属性外,CMS 提供一种机制增长新

20、属性到消息头中,这种新属性包括如下几种: 1. 应用需要用到属性; 2. 消息头中原有某些可选属性; 3. CMS Provider 需要用到属性。 原则CMS 消息头包括如下属性: CMSDestination -消息发送目地 CMSDeliveryMode -传递模式, 有两种模式: PERSISTENT 和NON_PERSISTENT,PERSISTENT 表达该消息一定要被送到目地,否则会导致应用错误。NON_PERSISTENT 表达偶尔丢失该消息是被容许,这两种模式使开发者可以在消息传递可靠性和吞吐量之间找到平衡点。 CMSMessageID 唯一辨认每个消息标记,由CMS Pro

21、vider 产生。 CMSTimestamp 一种消息被提交给CMS Provider 到消息被发出时间。 CMSCorrelationID 用来连接到此外一种消息,典型应用是在回答消息中连接到原消息。 CMSReplyTo 提供本消息回答消息目地址。 CMSRedelivered 如果一种客户端收到一种设立了CMSRedelivered 属性消息,则表达也许该客户端曾经在早些时候收到过该消息,但并没有签收(acknowledged)。 CMSType 消息类型辨认符。 CMSExpiration 消息过期时间,等于QueueSender send 办法中timeToLive 值或TopicP

22、ublisher publish 办法中timeToLive 值加上发送时刻GMT 时间值。如果timeToLive值等于零,则CMSExpiration 被设为零,表达该消息永但是期。如果发送后,在消息过期时间之后消息还没有被发送到目地,则该消息被清除。 CMSPriority 消息优先级,从0-9 十个级别,0-4 是普通消息,5-9 是加急消息。CMS 不规定CMS Provider 严格按照这十个优先级发送消息,但必要保证加急消息要先于普通消息到达。 消息体(Body) - CMS API 定义了4种消息体格式,也叫消息类型,你可以使用不同形式发送接受数据并可以兼容既有消息格式,下面描

23、述这4种类型: 消息类型消息体TextMessagestring对象,如xml文献内容MapMessage名/值对集合,名是string对象,值类型可以是c+任何基本类型BytesMessage字节流ObjectMessage对象类型Message没有消息体,只有消息头和属性。下例演示创立并发送一种TextMessage到一种队列: TextMessage message = queueSession.createTextMessage();message.setText(msg_text);/ msg_text is a Stringmessage.setCMSType(“text”);qu

24、eueSender.send(message);下例演示接受消息并转换为适当消息类型: Message* m = queueReceiver.receive();If (m-getCMSType() = “text”)TextMessage txt=dynamic_cast(m);/ do somethingelse4 消息生产者客户端消息生产者产生消息并将消息发送到broker上队列或主题中。要使消息生产者生产消息被消息消费者消费,必要满足两个条件:生产者和消费者必要连接到同一种Broker,即BrokerUri中主机名和端口相似生产者和消费者必要具备相似destination,即同一种队列

25、名或主题名4.1 使用activemq-cpp来创立消息生产者4.1.1 头文献及名字空间#include #include #include #include #include #include #include #include using namespace activemq;using namespace activemq:core;using namespace cms;using namespace std;4.1.2 创立一种生产者类class SimpleProducer private: Connection* connection; /连接对象 Session* sessi

26、on; /会话 Destination* destination; /消息目地 MessageProducer* producer;/消息生产者 bool useTopic; /与否采用采用主题模式 bool clientAck; /与否自动确认消息接受 unsigned int numMessages; /生产消息数 std:string brokerURI; /连接borker uri std:string destURI; /队列或主题名public:/./构造函数SimpleProducer( const std:string& brokerURI, unsigned int numM

27、essages, const std:string& destURI, bool useTopic = false, bool clientAck = false ) connection = NULL; session = NULL; destination = NULL; producer = NULL; this-numMessages = numMessages; this-useTopic = useTopic; this-brokerURI = brokerURI; this-destURI = destURI; this-clientAck = clientAck;initial

28、ize();virtual SimpleProducer() cleanup();4.1.3 初始化及销毁/ 初始化private:Virtual void initialize()try / 创立连接工厂 ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory( brokerURI ); / 创立一种到broker连接 connection = connectionFactory-createConnection(); connection-start(); / 关闭连接工厂 delete co

29、nnectionFactory; / 创立一种会话 if( clientAck ) /消息接受后由消费者客户端确认 session= connection-createSession( Session:CLIENT_ACKNOWLEDGE ); else /消息接受自动确认 session = connection-createSession( Session:AUTO_ACKNOWLEDGE ); / 创立一种队列或主题 (Topic or Queue) if( useTopic ) destination = session-createTopic( destURI ); else des

30、tination = session-createQueue( destURI ); / 创立生产者并设定消息传送模式 producer = session-createProducer( destination ); producer-setDeliveryMode( DeliveryMode:NON_PERSISTENT );catch ( CMSException& e ) e.printStackTrace(); / 销毁void cleanup() / Destroy resources. try if( destination != NULL ) delete destinatio

31、n; catch ( CMSException& e ) e.printStackTrace(); destination = NULL; try if( producer != NULL ) delete producer; catch ( CMSException& e ) e.printStackTrace(); producer = NULL; / Close open resources. try if( session != NULL ) session-close(); if( connection != NULL ) connection-close(); catch ( CM

32、SException& e ) e.printStackTrace(); try if( session != NULL ) delete session; catch ( CMSException& e ) e.printStackTrace(); session = NULL; try if( connection != NULL ) delete connection; catch ( CMSException& e ) e.printStackTrace(); connection = NULL; 4.1.4 生产一种消息并发送到队列中public :void send() / 消息内

33、容 string text = (string)Hello world!thread ; for( std:size_t ix=0;ixcreateTextMessage( text ); / 发送消息 printf( Sent message #%d n,ix+1 ); producer-send( message ); / 释放消息 delete message;4.1.5 发送消息主程序Int main(void) / broker uristd:string brokerURI = tcp:/127.0.0.1:61616 ?wireFormat=openwire &transport

34、.useAsyncSend=true/ 发送消息数unsigned int numMessages = ;/ 消息队列名std:string destURI = TEST.FOO;/ 使用队列模式bool useTopics = false; /初始化一种消息生产者对象并发送消息 SimpleProducer producer( brokerURI,numMessages,destURI,useTopics );producer.send();return 0;4.1.6 总结综上例子可知,每次发送一种消息到消息队列中都需要定义一种生产者类,并完毕一种机械初始化及销毁过程。为了提高软件开发效率

35、,可以模仿生产者类定义一种消息发送者类,封装有关细节,并编译成共享库以供使用,简化编程过程。4.2 使用winkeemq-cpp来创立消息生产者4.2.1 头文献及名字空间#include using namespace winkeemq;using namespace std;4.2.2 发送消息主程序int main(int argc,char* argv)/ broker uri std:string brokerURI = tcp:/192.168.1.179:61616 ?wireFormat=openwire &wireFormat.maxInactivityDuration=0

36、&soKeepAlive=true &transport.useAsyncSend=true;/ 队列名string mqName=mm.mq;/ 创立一种消息发送对象(采用队列模式,每次只发一种消息)MessageSender ms(brokerURI,1,false,mqName);string body=”hello worldn”;/ 创立一种文本消息TextMessage* msg=dynamic_cast (ms.createMessag(MessageSender:TEXT_MESSAGE);/ 设定消息体内容 msg-setText(body); / 发送消息 ms.sendM

37、essage();/ 销毁消息 ms.deleteMessage();4.2.3 总结由上述例子可看出,采用winkeemq-cpp后裔码量精简了诸多,开发员不需要关怀那些机械初始化细节。要创立一种消息生产者,只需要给定Broker uri,队列名,消息目模式,然后调用MessageSendercreateMessage()创立一种详细类型消息,createMessage()参数是一种在MessageSender中定义一种无名enum, 指明消息类型。调用MessageSendersendMessage()发送消息到broker中,最后销毁消息5 消息消费者客户端消息消费者从Broker上队列

38、或主题中取出消息并做相应解决。5.1 使用activemq-cpp来创立消息消息者5.1.1 头文献及名字空间#include #include #include #include #include #include #include #include #include #include #include using namespace activemq;using namespace activemq:core;using namespace cms;using namespace std;5.1.2 创立一种生产者类class SimpleAsyncConsumer :public Exc

39、eptionListener, public MessageListener private: Connection* connection; /连接对象 Session* session; /会话 Destination* destination; /消息目地 MessageConsumer* consumer;;/消息消费者 bool useTopic; /与否采用采用主题模式 bool clientAck; /与否自动确认消息接受 std:string brokerURI; /连接borker uri std:string destURI; /队列或主题名public:/./构造函数 SimpleAsyncConsumer( const std:string& brokerURI, const std:string& destURI, bool useTopic = false, bool clientAck = false ) connection = NULL; session = NULL; destination = NULL; consumer = NULL; this-useTopic = useTopic; t

展开阅读全文
相似文档                                   自信AI助手自信AI助手
猜你喜欢                                   自信AI导航自信AI导航
搜索标签

当前位置:首页 > 包罗万象 > 大杂烩

移动网页_全站_页脚广告1

关于我们      便捷服务       自信AI       AI导航        获赠5币

©2010-2024 宁波自信网络信息技术有限公司  版权所有

客服电话:4008-655-100  投诉/维权电话:4009-655-100

gongan.png浙公网安备33021202000488号   

icp.png浙ICP备2021020529号-1  |  浙B2-20240490  

关注我们 :gzh.png    weibo.png    LOFTER.png 

客服