`
zc985552943
  • 浏览: 287355 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
Babe4ca6-5e6f-33aa-9078-762ee3ccfb7e
云计算--hadoop
浏览量:11490
5e98c2c1-2a82-3388-bc80-7fca0170bb12
redis解说
浏览量:26649
088014c7-4d3f-39ce-b72e-4ebe7046a134
MongoDB读书笔记
浏览量:15634
D2b74847-c860-3e26-96fe-3fa4498d6348
Maven读书笔记
浏览量:26690
688db20f-402d-3a1d-8188-d6153d6c7465
Java通信
浏览量:13399
社区版块
存档分类
最新评论

05_Java通信_JMS_demo

 
阅读更多

上节简单描述了一下JMS的概念,这节来写个小demo。这样可以比较直观的看到JMS通信的过程。

前期回顾:04_Java通信_JMS概念

在开发这个demo前,首先要下载ActiveMQ来作为消息服务器。

Demo实现功能



 这是我们要实现的一个发布/订阅模型的demo

A发送消息到消息服务器ActiveMQ,然后ActiveMQ将消息发给订阅这个Topic的客户端:A和B

开发流程:

1.要开发一个jms demo首先要下载某厂商的JMS的实现
2.下载ActiveMQ,这是Apache的一个开源MQ系统

下载地址:http://activemq.apache.org/download.html

 

下载历史版本:http://activemq.apache.org/download-archives.html



 3.下载后直接解压

我们开发一个demo直接可以使用这个activemq-all-5.2.0.jar这个jar包

4.还需要jms的jar包:
如果使用Maven,可以使用下面的依赖:

<dependency>
	<groupId>javax.jms</groupId>
	<artifactId>jms</artifactId>
	<version>1.1</version>
</dependency>

 

5.开发一个简单的聊天demo,就想上面那张聊天图一样

package com.jms.test;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;

public class Chat implements javax.jms.MessageListener {
	private TopicSession pubSession;
	private TopicPublisher publisher;
	private TopicConnection conntection;
	private String username;
	
	public Chat(String topicFactory,String topicName,String username) throws Exception{
//		System.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
//		System.setProperty(Context.PROVIDER_URL, "tcp://localhost:61616");
//		
		//使用JNDI获得ConnectionFactory
		InitialContext ctx = new InitialContext();
		TopicConnectionFactory conFactory = (TopicConnectionFactory)ctx.lookup(topicFactory);
		
		//通过ConnectionFactory得到Connection
		TopicConnection connection = conFactory.createTopicConnection();
		
		//得到session
		TopicSession pubSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
		TopicSession subSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
		
		//使用JNDI查找主题Topic
		Topic chatTopic = (Topic)ctx.lookup(topicName);
		
		//通过session和主题Topic获得发布者,订阅者
		TopicPublisher publisher = pubSession.createPublisher(chatTopic);
		TopicSubscriber subscriber = subSession.createSubscriber(chatTopic);
		
		subscriber.setMessageListener(this);
		this.conntection = connection;
		this.pubSession = pubSession;
		this.publisher = publisher;
		this.username = username;
		
		conntection.start();
	}
	@Override
public void onMessage(Message m) {
	try {
//		m.set
		TextMessage tm = (TextMessage)m;
		System.out.println(tm.getText());
	} catch (JMSException e) {
		e.printStackTrace();
	}
	
}
	
	protected void writeMessage(String text) throws Exception{
		TextMessage message = pubSession.createTextMessage();
		message.setText(username+": "+text);
		//消息发送者,发送消息
		publisher.publish(message);
	}
	
	public void close() throws Exception{
		conntection.close();
	}
	
	public static void main(String[] args) throws Exception {
		
		Chat chat = new Chat("TopicCF","topic1","hh");
		
		BufferedReader commandLine = new BufferedReader(new InputStreamReader(System.in));
		
		while(true){
			String s = commandLine.readLine();
			if(s.equalsIgnoreCase("exit")){
				chat.close();
				System.exit(0);
			}else{
				chat.writeMessage(s);
			}
		}
		
	}
}

 

6.配置文件:

java.naming.factory.initial =org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url =tcp://localhost:61616
java.naming.security.principal =system
java.naming.security.credentials =manager
connectionFactoryNames =TopicCF
topic.topic1 =jms.topic1

 7.客户端B:

package com.jms.test;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class ChatA {
	public static void main(String[] args) throws Exception {

		Chat chat = new Chat("TopicCF","topic1","cc");
		
		BufferedReader commandLine = new BufferedReader(new InputStreamReader(System.in));
		
		while(true){
			String s = commandLine.readLine();
			if(s.equalsIgnoreCase("exit")){
				chat.close();
				System.exit(0);
			}else{
				chat.writeMessage(s);
			}
		}
		
	}
}

 

8.如何运行代码:

启动消息服务器ActiveMQ

进入ActiveMQ的bin目录


双击运行:activemq.bat
查看是否启动成功:cmd-->netstat -na|find "61616"
出现:


则启动成功。还可以登陆

admin:http://127.0.0.1:8161/admin/

demo:http://127.0.0.1:8161/demo/

查看。
将配置文件jndi.properties放在classes的根目录。

项目目录结构:



 

代码难点分析:

1.在前面已经介绍过JNDI。我们可以通过JNDI来获取通信对象。
JMS客户端使用一个目录服务(JNDI)来访问ConnectionFactory和Destination(主题和队列)对象。也就是说这两个对象JMS API无法获得。在这一点上,它和连接,会话,生产者,消费者及消息不同。连接,会话,生产者,消费者及消息都是JMS API内部使用工厂模式生产的JNDI为了获得ConnectionFactory和Destination对象提供了一种方便、位置透明、可配置并且可移植的机制,这些对象也称为JMS受管对象,因为它是由系统管理员建立和配置的

2.线程和会话
在程序中创建了两个topicSession:pubSession,subSession。
为什么要创建两个呢,因为JMS规定一个session不能同时在一个以上的线程中运行。
这个例子中有两个线程:
运行writeMessage的主线程(线程所有者:chat应用程序)
运行onMessage的处理线程(线程所有者是JMS提供者所有,即:ActionMQ)

  • 大小: 111.6 KB
  • 大小: 41.6 KB
  • 大小: 5.6 KB
  • 大小: 31.9 KB
  • 大小: 15.7 KB
  • 大小: 25.5 KB
  • 大小: 44.5 KB
  • 大小: 40.8 KB
1
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics