您好、欢迎来到现金彩票网!
当前位置:红彩会 > 分派优先级 >

JMS ActiveMQ研究文档

发布时间:2019-07-12 16:41 来源:未知 编辑:admin

  当前,CORBA、DCOM、RMI等RPC中间件技术已广泛应用于各个领域。但是面对规模和复杂度都越来越高的分布式系统,这些技术也显示出其局限性:(1)同步通信:客户发出调用后,必须等待服务对象完成处理并返回结果后才能继续执行;(2)客户和服务对象的生命周期紧密耦合:客户进程和服务对象进程 都必须正常运行;如果由于服务对象崩溃或者网络故障导致客户的请求不可达,客户会接收到异常;(3)点对点通信:客户的一次调用只发送给某个单独的目标对象。

  面向消息的中间件(Message Oriented Middleware,MOM)较好的解决了以上问题。发送者将消息发送给消息服务器,消息服务器将消息存放在若干队列中,在合适的时候再将消息转发给接收者。这种模式下,发送和接收是异步的,发送者无需等待;二者的生命周期未必相同:发送消息的时候接收者不一定运行,接收消息的时候发送者也不一定运行; 一对多通信:对于一个消息可以有多个接收者。

  JAVA 消息服务(JMS)定义了Java 中访问消息中间件的接口。JMS 只是接口,并没有给予实现,实现JMS 接口的消息中间件称为JMS Provider,例如ActiveMQ。

  在 JMS 编程模型中,JMS 客户端(组件或应用程序)通过 JMS 消息服务交换消息。消息生产者将消息发送至消息服务,消息消费者则从消息服务接收这些消息。这些消息传送操作是使用一组实现 JMS 应用编程接口 (API) 的对象(由 JMS Provide提供)来执行的。

  在 JMS 编程模型中,JMS 客户端使用 ConnectionFactory 对象创建一个连接,向消息服务发送消息以及从消息服务接收消息均是通过此连接来进行。Connection 是客户端与消息服务的活动连接。创建连接时,将分配通信资源以及验证客户端。这是一个相当重要的对象,大多数客户端均使用一个连接来进行所有的消息传送。

  连接用于创建会话。Session 是一个用于生成和使用消息的单线程上下文。它用于创建发送的生产者和接收消息的消费者,并为所发送的消息定义发送顺序。会话通过大量确认选项或通过事务来支持可靠传送。

  客户端使用 MessageProducer 向指定的物理目标(在 API 中表示为目标身份对象)发送消息。生产者可指定一个默认传送模式(持久性消息与非持久性消息)、优先级和有效期值,以控制生产者向物理目标发送的所有消息。

  同样,客户端使用 MessageConsumer 对象从指定的物理目标(在 API 中表示为目标对象)接收消息。消费者可使用消息选择器,借助它,消息服务可以只向消费者发送与选择标准匹配的那些消息。

  消费者可以支持同步或异步消息接收。异步使用可通过向消费者注册 MessageListener 来实现。当会话线程调用 MessageListener 对象的 onMessage 方法时,客户端将使用消息。

  JMS 支持两种截然不同的消息传送模型:PTP(即点对点模型)和Pub/Sub(即发布/订阅模型),分别称作:PTP Domain 和Pub/Sub Domain。

  PTP(使用Queue即队列目标) 消息从一个生产者传送至一个消费者。在此传送模型中,目标是一个队列。消息首先被传送至队列目标,然后根据队列传送策略,从该队列将消息传送至向此队列进行注册的某一个消费者,一次只传送一条消息。可以向队列目标发送消息的生产者的数量没有限制,但每条消息只能发送至、并由一个消费者成功使用。如果没有已经向队列目标注册的消费者,队列将保留它收到的消息,并在某个消费者向该队列进行注册时将消息传送给该消费者。

  Pub/Sub(使用Topic即主题目标) 消息从一个生产者传送至任意数量的消费者。在此传送模型中,目标是一个主题。消息首先被传送至主题目标,然后传送至所有已订阅此主题的活动消费者。可以向主题目标发送消息的生产者的数量没有限制,并且每个消息可以发送至任意数量的订阅消费者。主题目标也支持持久订阅的概念。持久订阅表示消费者已向主题目标进行注册,但在消息传送时此消费者可以处于非活动状态。当此消费者再次处于活动状态时,它将接收此信息。如果没有已经向主题目标注册的消费者,主题不保留其接收到的消息,除非有非活动消费者注册了持久订阅。

  这两种消息传送模型使用表示不同编程域的 API 对象(其语义稍有不同)进行处理,如下所示:

  使用图表第一列中列出的统一域对象编写点对点和发布/订阅消息传送。这是首选方法(JMS 1.1规范)。然而,为了符合早期的 JMS 1.02b 规范,可以使用PTP域对象编写点对点消息传送,使用Pub/Sub域对象编制发布/订阅消息传送。

  消息头包含消息的识别信息和路由信息,消息头包含一些标准的属性如:JMSDestination,JMSMessageID等。

  除了消息头中定义好的标准属性外,JMS 提供一种机制增加新属性到消息头中,这种新属性包含以下几种:

  传送模式, 有两种模式: PERSISTENT 和NON_PERSISTENT,PERSISTENT 表示该消息一定要被送到目的地,否则会导致应用错误。NON_PERSISTENT 表示偶然丢失该消息是被允许的,这两种模式使开发者可以在消息传送的可靠性和吞吐量之间找到平衡点。

  消息过期时间,等于Destination 的send 方法中的timeToLive 值加上发送时刻的GMT 时间值。如果timeToLive值等于零,则JMSExpiration 被设为零,表示该消息永不过期。如果发送后,在消息过期时间之后消息还没有被发送到目的地,则该消息被清除。

  消息优先级,从0-9 十个级别,0-4 是普通消息,5-9 是加急消息。JMS 不要求JMS Provider 严格按照这十个优先级发送消息,但必须保证加急消息要先于普通消息到达。

  如果一个客户端收到一个设置了JMSRedelivered 属性的消息,则表示可能客户端曾经在早些时候收到过该消息,但并没有签收(acknowledged)。

  JMS API 定义了5种消息体格式,也叫消息类型,可以使用不同形式发送接收数据并可以兼容现有的消息格式,下面描述这5种类型:

  PTP(Point-to-Point)模型是基于队列的,生产者发消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。和邮件系统中的邮箱一样,队列可以包含各种消息,JMS Provider 提供工具管理队列的创建、删除。JMS PTP 模型定义了客户端如何向队列发送消息,从队列接收消息,浏览队列中的消息。

  客户端用ConnectionFactory 创建Connection 对象。

  一个到JMS Provider 的连接,客户端可以用Connection 创建Session 来发送和接收消息。

  客户端用Session 创建MessageProducer 和MessageConsumer对象。如果在Session 关闭时,有一些消息已经被收到,但还没有被签收(acknowledged),那么,当消费者下次连接到相同的队列时,这些消息还会被再次接收。

  客户端用Session 创建Destination 对象。此处的目标为队列,队列由队列名识别。临时队列只能由创建它的Connection 所创建的消费者消费,但是任何生产者都可向临时队列发送消息。

  客户端用MessageConsumer 接收队列中的消息,如果用户在receive方法中设定了消息选择条件,那么不符合条件的消息会留在队列中,不会被接收到。

  队列可以长久地保存消息直到消费者收到消息。消费者不需要因为担心消息会丢失而时刻和队列保持激活的连接状态,充分体现了异步传输模式的优势。

  JMS Pub/Sub 模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作主题(topic)。

  主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(subscribe) 从主题订阅消息。主题使得消息订阅者和消息发布者保持互相独立,不需要接触即可保证消息的传送。

  消息订阅分为非持久订阅(non-durable subscription)和持久订阅(durable subscrip-tion),非持久订阅只有当客户端处于激活状态,也就是和JMS Provider 保持连接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到。持久订阅时,客户端向JMS 注册一个识别自己身份的ID,当这个客户端处于离线时,JMS Provider 会为这个ID 保存所有发送到主题的消息,当客户再次连接到JMS Provider时,会根据自己的ID 得到所有当自己处于离线时发送到主题的消息。

  客户端用ConnectionFactory 创建Connection 对象。

  一个到JMS Provider 的连接,客户端可以用Connection 创建Session 来发送和接收消息。

  客户端用Session 创建MessageProducer 和MessageConsumer对象。它还提供持久订阅主题,或使用unsubscribe 方法取消消息的持久订阅。

  客户端用Session 创建Destination 对象。此处的目标为主题,主题由主题名识别。临时主题只能由创建它的Connection所创建的消费者消费。临时主题不能提供持久订阅功能。JMS 没有给出主题的组织和层次结构的定义,由JMS Provider 自己定义。

  客户端用MessageConsumer 接收发布到主题上的消息。可以在receive 中设置消息过滤功能,这样,不符合要求的消息不会被接收。

  非持久订阅状态下,不能恢复或重新派送一个未签收的消息。只有持久订阅才能恢复或重新派送一个未签收的消息。

  当所有的消息必须被接收,则用持久订阅模式。当丢失消息能够被容忍,则用非持久订阅模式。

  注:此项仅Solaris8需要安装,Solaris9已自带此uuid头文件和库文件

  注:无需自行编译,直接从下载对应solaris版本和cpu的gcc-3.4.6包文件即可

  广义上说,一个JMS 应用是几个JMS 客户端交换消息,开发JMS 客户端应用由以下几步构成:

  在成功创建正确的ConnectionFactory后,下一步将是创建一个连接,它是JMS定义的一个接口。ConnectionFactory负责返回可以与底层消息传递系统进行通信的Connection实现。通常客户端只使用单一连接。根据JMS文档,Connection的目的是“利用JMS提供者封装开放的连接”,以及表示“客户端与提供者服务例程之间的开放TCP/IP套接字”。该文档还指出Connection应该是进行客户端身份验证的地方,除了其他一些事项外,客户端还可以指定惟一标志符。

  当一个Connection被创建时,它的传输默认是关闭的,必须使用start方法开启。

  Session可以被事务化,也可以不被事务化,通常,可以通过向Connection上的适当创建方法传递一个布尔参数对此进行设置。

  Destination是一个客户端用来指定生产消息目标和消费消息来源的对象。

  其中messageSelector为消息选择器;noLocal标志默认为false,当设置为true时限制消费者只能接收和自己相同的连接(Connection)所发布的消息,此标志只适用于主题,不适用于队列;name标识订阅主题所对应的订阅名称,持久订阅时需要设置此参数。

  消息的同步接收是指客户端主动去接收消息,客户端可以采用MessageConsumer 的receive方法去接收下一个消息。

  JMS提供了一种机制,使用它,消息服务可根据消息选择器中的标准来执行消息过滤。生产者可在消息中放入应用程序特有的属性,而消费者可使用基于这些属性的选择标准来表明对消息是否感兴趣。这就简化了客户端的工作,并避免了向不需要这些消息的消费者传送消息的开销。然而,它也使得处理选择标准的消息服务增加了一些额外开销。

  消息选择器是用于MessageConsumer的过滤器,可以用来过滤传入消息的属性和消息头部分(但不过滤消息体),并确定是否将实际消费该消息。按照JMS文档的说法,消息选择器是一些字符串,它们基于某种语法,而这种语法是SQL-92的子集。可以将消息选择器作为MessageConsumer创建的一部分。

  该选择器检查了传入消息的JMSType属性,并确定了这个属性的值是否等于TOPIC_PUBLISHER。如果相等,则消息被消费;如果不相等,那么消息会被忽略。

  JMS程序的最终目的是生产和消费的消息能被其他程序使用,JMS的 Message是一个既简单又不乏灵活性的基本格式,允许创建不同平台上符合非JMS程序格式的消息。Message由以下几部分组成:消息头,属性和消息体。

  发送消息最可靠的方法就是在事务中发送持久性的消息,ActiveMQ默认发送持久性消息。结束事务有两种方法:提交或者回滚。当一个事务提交,消息被处理。如果事务中有一个步骤失败,事务就回滚,这个事务中的已经执行的动作将被撤销。

  接收消息最可靠的方法就是在事务中接收信息,不管是从PTP模式的非临时队列接收消息还是从Pub/Sub模式持久订阅中接收消息。

  对于其他程序,低可靠性可以降低开销和提高性能,例如发送消息时可以更改消息的优先级或者指定消息的过期时间。

  消息传送的可靠性越高,需要的开销和带宽就越多。性能和可靠性之间的折衷是设计时要重点考虑的一个方面。可以选择生成和使用非持久性消息来获得最佳性能。另一方面,也可以通过生成和使用持久性消息并使用事务会话来获得最佳可靠性。在这两种极端之间有许多选择,这取决于应用程序的要求。

  客户端成功接收一条消息的标志是这条消息被签收。成功接收一条消息一般包括如下三个阶段:

  3.消息被签收。签收可以由ActiveMQ发起,也可以由客户端发起,取决于Session签收模式的设置。

  在带事务的Session中,签收自动发生在事务提交时。如果事务回滚,所有已经接收的消息将会被再次传送。

  在不带事务的Session中,一条消息何时和如何被签收取决于Session的设置。

  当客户端从receive或onMessage成功返回时,Session自动签收客户端的这条消息的收条。在AUTO_ACKNOWLEDGE的Session中,同步接收receive是上述三个阶段的一个例外,在这种情况下,收条和签收紧随在处理消息之后发生。

  客户端通过调用消息的acknowledge方法签收消息。在这种情况下,签收发生在Session层面:签收一个已消费的消息会自动地签收这个Session所有已消费消息的收条。

  此选项指示Session不必确保对传送消息的签收。它可能引起消息的重复,但是降低了Session的开销,所以只有客户端能容忍重复的消息,才可使用(如果ActiveMQ再次传送同一消息,那么消息头中的JMSRedelivered将被设置为true)。

  对队列来说,如果当一个Session终止时它接收了消息但是没有签收,那么ActiveMQ将保留这些消息并将再次传送给下一个进入队列的消费者。

  对主题来说,如果持久订阅用户终止时,它已消费未签收的消息也将被保留,直到再次传送给这个用户。对于非持久订阅,AtiveMQ在用户Session关闭时将删除这些消息。

  如果使用队列和持久订阅,并且Session没有使用事务,那么可以使用Session的recover方法停止Session,再次启动后将收到它第一条没有签收的消息,事实上,重启后Session一系列消息的传送都是以上一次最后一条已签收消息的下一条为起点。如果这时有消息过期或者高优先级的消息到来,那么这时消息的传送将会和最初的有所不同。对于非持久订阅用户,重启后,ActiveMQ有可能删除所有没有签收的消息。

  这是ActiveMQ的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。这意味着在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。

  保证这些消息最多被传送一次。对于这些消息,可靠性并非主要的考虑因素。此模式并不要求持久性的数据存储,也不保证消息服务由于某种原因导致失败后消息不会丢失。

  1.使用setDeliveryMode方法,这样所有的消息都采用此传送模式;

  如果不指定传送模式,那么默认是持久性消息。如果容忍消息丢失,那么使用非持久性消息可以改善性能和减少存储的开销。

  通常,可以确保将单个会话向目标发送的所有消息按其发送顺序传送至消费者。然而,如果为这些消息分配了不同的优先级,消息传送系统将首先尝试传送优先级较高的消息。

  1.使用setDeliveryMode方法,这样所有的消息都采用此传送模式;

  消息优先级从0-9十个级别,0-4是普通消息,5-9是加急消息。如果不指定优先级,则默认为4。JMS不要求严格按照这十个优先级发送消息,但必须保证加急消息要先于普通消息到达。

  默认情况下,消息永不会过期。如果消息在特定周期内失去意义,那么可以设置过期时间。

  消息过期时间,send 方法中的timeToLive 值加上发送时刻的GMT 时间值。如果timeToLive值等于零,则JMSExpiration 被设为零,表示该消息永不过期。如果发送后,在消息过期时间之后消息还没有被发送到目的地,则该消息被清除。

  ActiveMQ通过createTemporaryQueue和createTemporaryTopic创建临时目标,这些目标持续到创建它的Connection关闭。只有创建临时目标的Connection所创建的客户端才可以从临时目标中接收消息,但是任何的生产者都可以向临时目标中发送消息。如果关闭了创建此目标的Connection,那么临时目标被关闭,内容也将消失。

  某些客户端需要一个目标来接收对发送至其他客户端的消息的回复。这时可以使用临时目标。Message的属性之一是JMSReplyTo属性,这个属性就是用于这个目的的。可以创建一个临时的Destination,并把它放入Message的JMSReplyTo属性中,收到该消息的消费者可以用它来响应生产者。

  消费者接收这条消息时,会从JMSReplyTo字段中提取临时Destination,并且会通过应用程序构造一个MessageProducer,以便将响应消息发送回生产者。这展示了如何使用JMS Message的属性,并显示了私有的临时Destination的有用之处。它还展示了客户端可以既是消息的生产者,又可以是消息的消费者。

  通过为发布者设置PERSISTENT传送模式,为订阅者时使用持久订阅,这样可以保证Pub/Sub程序接收所有发布的消息。

  消息订阅分为非持久订阅(non-durable subscription)和持久订阅(durable subscription),非持久订阅只有当客户端处于激活状态,也就是和ActiveMQ保持连接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到。持久订阅时,客户端向ActiveMQ注册一个识别自己身份的ID,当这个客户端处于离线时,ActiveMQ会为这个ID 保存所有发送到主题的消息,当客户端再次连接到ActiveMQ时,会根据自己的ID 得到所有当自己处于离线时发送到主题的消息。持久订阅会增加开销,同一时间在持久订阅中只有一个激活的用户。

  其中messageSelector为消息选择器;noLocal标志默认为false,当设置为true时限制消费者只能接收和自己相同的连接(Connection)所发布的消息,此标志只适用于主题,不适用于队列;name标识订阅主题所对应的订阅名称,持久订阅时需要设置此参数。

  在事务中生成或使用消息时,ActiveMQ跟踪各个发送和接收过程,并在客户端发出提交事务的调用时完成这些操作。如果事务中特定的发送或接收操作失败,则出现异常。客户端代码通过忽略异常、重试操作或回滚整个事务来处理异常。在事务提交时,将完成所有成功的操作。在事务进行回滚时,将取消所有成功的操作。

  本地事务的范围始终为一个会话。也就是说,可以将单个会话的上下文中执行的一个或多个生产者或消费者操作组成一个本地事务。

  不但单个会话可以访问 Queue 或 Topic (任一类型的 Destination ),而且单个会话实例可以用来操纵一个或多个队列以及一个或多个主题,一切都在单个事务中进行。这意味着单个会话可以(例如)创建队列和主题中的生产者,然后使用单个事务来同时发送队列和主题中的消息。因为单个事务跨越两个目标,所以,要么队列和主题的消息都得到发送,要么都未得到发送。类似地,单个事务可以用来接收队列中的消息并将消息发送到主题上,反过来也可以。

  由于事务的范围只能为单个的会话,因此不存在既包括消息生成又包括消息使用的端对端事务。(换句话说,至目标的消息传送和随后进行的至客户端的消息传送不能放在同一个事务中。)

  ActiveMQ支持生产者以同步或异步模式发送消息。使用不同的模式对send方法的反应时间有巨大的影响,反映时间是衡量ActiveMQ吞吐量的重要因素,使用异步发送可以提高系统的性能。

  在默认大多数情况下,AcitveMQ是以异步模式发送消息。例外的情况:在没有使用事务的情况下,生产者以PERSISTENT传送模式发送消息。在这种情况下,send方法都是同步的,并且一直阻塞直到ActiveMQ发回确认消息:消息已经存储在持久性数据存储中。这种确认机制保证消息不会丢失,但会造成生产者阻塞从而影响反应时间。

  高性能的程序一般都能容忍在故障情况下丢失少量数据。如果编写这样的程序,可以通过使用异步发送来提高吞吐量(甚至在使用PERSISTENT传送模式的情况下)。

  在ActiveMQ4中,支持ActiveMQ以同步或异步模式向消费者分派消息。这样的意义:可以以异步模式向处理消息慢的消费者分配消息;以同步模式向处理消息快的消费者分配消息。

  ActiveMQ默认以同步模式分派消息,这样的设置可以提高性能。但是对于处理消息慢的消费者,需要以异步模式分派。

  在ActveMQ分布式环境中,在有消费者存在的情况下,如果更希望ActveMQ发送消息给消费者而不是其他的ActveMQ到ActveMQ的传送,可以如下设置:

  ActiveMQ维护队列消息的顺序并顺序把消息分派给消费者。但是如果建立了多个Session和MessageConsumer,那么同一时刻多个线程同时从一个队列中接收消息时就并不能保证处理时有序。

  有时候有序处理消息是非常重要的。ActiveMQ4支持独占的消费。ActiveMQ挑选一个MessageConsumer,并把一个队列中所有消息按顺序分派给它。如果消费者发生故障,那么ActiveMQ将自动故障转移并选择另一个消费者。可以如下设置:

  在1.1版本之后,ActiveMQ支持混合目标技术。它允许在一个JMS目标中使用一组JMS目标。

  例如可以利用混合目标在同一操作中用向12个队列发送同一条消息或者在同一操作中向一个主题和一个队列发送同一条消息。

  如果在一个目标中混合不同类别的目标,可以通过使用“queue://”和“topic://”前缀来识别不同的目标。

  ActiveMQ的目标之一就是高性能的数据传送,所以ActiveMQ使用“预取限制”来控制有多少消息能及时的传送给任何地方的消费者。

  一旦预取数量达到限制,那么就不会有消息被分派给这个消费者直到它发回签收消息(用来标识所有的消息已经被处理)。

  可以为每个消费者指定消息预取。如果有大量的消息并且希望更高的性能,那么可以为这个消费者增大预取值。如果有少量的消息并且每条消息的处理都要花费很长的时间,那么可以设置预取值为1,这样同一时间,ActiveMQ只会为这个消费者分派一条消息。

  进入%JAVA_HOME%/bin,双击jconsole.exe即出现如下画面,在对话框中输入ActiveMQ服务主机的地址,JXM的端口和主机登陆帐号。

  测试中发现,当C++客户端异常退出时(即没有正常调用close函数关闭连接),ActiveMQ并不能检测到C++客户端的连接已经中断,这时如果向队列中发送消息,那么第一条消息就会丢失,这时ActiveMQ才能检测到这个连接是中断的。

  在ActiveMQ论坛反应此问题后,开发人员答复并建议使用CLIENT_ACKNOWLEDGE签收模式。但是此模式会造成消息重复接收。

  测试选用上述两个未正式发布的版本,未选用正式发布的ActiveMQ 4.1.0 Release版本是因为此版本bug较多。

  在测试中发现,如果重启其中一台机器上的ActiveMQ,其他机器的ActiveMQ有可能会打印:

http://m3-ctech.com/fenpaiyouxianji/617.html
锟斤拷锟斤拷锟斤拷QQ微锟斤拷锟斤拷锟斤拷锟斤拷锟斤拷锟斤拷微锟斤拷
关于我们|联系我们|版权声明|网站地图|
Copyright © 2002-2019 现金彩票 版权所有