关键字:JAVA JMS Active MQ XML INI Eclipse Resin
jms服务器 , active MQ 我这一篇应该是第一篇比较详细的用代码+注释介绍的中文文档了
ActiveMQ是一个开源的、用Java实现的JMS服务器,在国外的应用相当广泛。所以这个项目我放弃了JBoss 4以数据库作为消息载体的JMS服务,改用ActiveMQ。
整个项目的架构如下,用户通过http请求提交消息, webserver 收到提交请求后将消息封装到一个implements Serializable的class 然后通过active mq的 jms服务发送给server端,入库
另一个进程将处理好的记录从数据库中取出,通过HttpConnection发送给用户提供的数据接收webservice。
程序中负责接收http请求的servlet
package sms.khan;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.net.URLDecoder;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import sms.khan.mq.smsMQSender;
import comm.khan.LoadIni;
import comm.khan.commTools;
//import javax.naming.*;
//import javax.sql.DataSource;
/**
 * Servlet that accepts an SMS submission over HTTP, copies the request
 * parameters into an {@link SmsData} carrier, echoes a small HTML
 * acknowledgement page and hands the carrier to {@link smsMQSender} for
 * delivery to the JMS broker.
 *
 * <p>Broker settings are read from <code>web/smsmq.ini</code> on every
 * request; <code>web/config.xml</code> is parsed by {@link XMLAnalyse} purely
 * for its console output.
 */
public class recv extends HttpServlet {
    private static final long serialVersionUID = 1L;

    private static final String CONTENT_TYPE = "text/html; charset=GBK";

    public recv() {
    }

    // Initialize global variables
    public void init() throws ServletException {
    }

    /**
     * Handles a submission: builds the {@link SmsData}, writes the HTML
     * acknowledgement, loads the broker configuration and sends the message.
     *
     * @param request  the HTTP request carrying the SMS fields as parameters
     * @param response the HTTP response that receives the HTML page
     * @throws ServletException declared by the servlet contract
     * @throws IOException      if the response writer cannot be obtained or
     *                          the "content" parameter cannot be decoded
     */
    public void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        response.setContentType(CONTENT_TYPE);
        PrintWriter out = response.getWriter();

        /* Collect the sender's information. */
        SmsData sd = readSmsData(request);

        // TODO: spid, ip and password could be validated here.
        dumpXmlConfig();

        // The content comes straight from the client, so it is escaped before
        // being echoed into the page to prevent markup/script injection.
        out.println("<html>");
        out.println("<head><title>文件读写测试</title></head>");
        out.println("<body bgcolor=\"#ffffff\">");
        out.println("<p>content:" + escapeHtml(sd.getContent()) + "</p>");
        out.println("<p>ip:" + escapeHtml(request.getRemoteHost())
                + " port:" + request.getServerPort() + "</p>");
        out.println("</body></html>");

        // Defaults used when the ini file is missing or incomplete.
        int ackMode = 1;
        String clientID = "null";
        boolean durable = false;
        String subject = "";
        String url = "";
        String pwd = "";
        String user = "";
        boolean topic = false;
        long timeToLive = 0;
        long sleepTime = 0;

        LoadIni ini = new LoadIni(); // load the configuration file
        ini.setFilename("web" + File.separator + "smsmq.ini");
        try {
            ackMode = Integer.parseInt(ini.readINI("ackMode"));
            clientID = ini.readINI("clientID");
            // "true".equals(...) also copes with a missing key.
            durable = "true".equals(ini.readINI("durable"));
            subject = ini.readINI("subject");
            url = ini.readINI("url");
            pwd = ini.readINI("pwd");
            user = ini.readINI("user");
            topic = "true".equals(ini.readINI("topic"));
            // Both values are longs; Integer.parseInt would reject large values.
            timeToLive = Long.parseLong(ini.readINI("timeToLive"));
            sleepTime = Long.parseLong(ini.readINI("sleepTime"));
        } catch (Exception e) {
            // Best effort: keep whatever was read so far and fall back to the
            // defaults for the rest.
            e.printStackTrace();
        }

        // Send the data to JMS.
        smsMQSender smq = new smsMQSender(ackMode, clientID, durable, subject, url,
                pwd, user, topic, timeToLive, sleepTime, sd);
        smq.run();

        out.flush();
        out.close();
    }

    // Process the HTTP Post request
    public void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        doGet(request, response);
    }

    public void destroy() {
    }

    /** Copies every SMS request parameter that is present into a new SmsData. */
    private SmsData readSmsData(HttpServletRequest request) throws IOException {
        SmsData sd = new SmsData();
        if (commTools.existPara(request, "spid")) {
            sd.setSpid(request.getParameter("spid"));
        }
        if (commTools.existPara(request, "passwd")) {
            sd.setPasswd(request.getParameter("passwd"));
        }
        if (commTools.existPara(request, "seq")) {
            sd.setSeq(request.getParameter("seq"));
        }
        if (commTools.existPara(request, "port")) {
            sd.setPort(request.getParameter("port"));
        }
        if (commTools.existPara(request, "mobile")) {
            sd.setMobile(request.getParameter("mobile"));
        }
        if (commTools.existPara(request, "service")) {
            sd.setService(request.getParameter("service"));
        }
        if (commTools.existPara(request, "style")) {
            sd.setStyle(request.getParameter("style"));
        }
        if (commTools.existPara(request, "content")) {
            // NOTE(review): the parameter is URL-decoded a second time here,
            // presumably because clients double-encode the GB2312 text - confirm.
            sd.setContent(URLDecoder.decode(request.getParameter("content"), "GB2312"));
        }
        if (commTools.existPara(request, "smscid")) {
            sd.setSmscid(request.getParameter("smscid"));
        }
        if (commTools.existPara(request, "moseq")) {
            sd.setMoseq(request.getParameter("moseq"));
        }
        return sd;
    }

    /**
     * Parses web/config.xml through XMLAnalyse and always releases the file
     * handle. A missing file is reported and skipped instead of passing a null
     * stream to the parser.
     */
    private void dumpXmlConfig() {
        InputStream in = null;
        try {
            in = new BufferedInputStream(new FileInputStream("web/config.xml"));
            XMLAnalyse.parseXml(in);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /** Escapes the characters that are significant in HTML text and attributes. */
    private static String escapeHtml(String text) {
        if (text == null) {
            return "null"; // same text string concatenation would have produced
        }
        StringBuffer escaped = new StringBuffer(text.length() + 16);
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            switch (c) {
            case '&':
                escaped.append("&amp;");
                break;
            case '<':
                escaped.append("&lt;");
                break;
            case '>':
                escaped.append("&gt;");
                break;
            case '"':
                escaped.append("&quot;");
                break;
            case '\'':
                escaped.append("&#39;");
                break;
            default:
                escaped.append(c);
            }
        }
        return escaped.toString();
    }
}
ToolSupport.java jms传递的基类
package sms.khan.mq;
import org.activemq.ActiveMQConnection;
import org.activemq.ActiveMQConnectionFactory;
import org.activemq.util.IndentPrinter;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
/**
 * Base class shared by the JMS sender and receiver: holds the broker
 * settings and provides helpers to open a connection, create a session with
 * its destination, and close both again.
 *
 * @version $Revision: 1.2 $
 */
public class ToolSupport {
    /** Destination messages are sent to or consumed from; set by {@link #createSession}. */
    protected Destination destination;
    /** Name of the queue or topic. */
    protected String subject = "TOOL.DEFAULT";
    /** Messaging model: true = publish/subscribe (topic), false = point-to-point (queue). */
    protected boolean topic = false;
    protected String user = ActiveMQConnection.DEFAULT_USER;
    protected String pwd = ActiveMQConnection.DEFAULT_PASSWORD;
    protected String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    /** Whether sessions are created as transacted. */
    protected boolean transacted = false;
    /**
     * Durability switch. In this class it only decides whether
     * {@link #clientID} is applied to the connection; the subclasses in this
     * project additionally use it to choose between PERSISTENT and
     * NON_PERSISTENT delivery (sender) and to create a durable topic
     * subscriber (receiver). PERSISTENT trades throughput for guaranteed
     * delivery, NON_PERSISTENT tolerates occasional message loss.
     */
    protected boolean durable = false;
    protected String clientID;
    /**
     * Session acknowledgement mode.
     * AUTO_ACKNOWLEDGE: the session acknowledges receipt automatically.
     * CLIENT_ACKNOWLEDGE: the client acknowledges by calling the message's
     * acknowledge method.
     * DUPS_OK_ACKNOWLEDGE: the session acknowledges lazily, which may cause
     * duplicate deliveries; use only when the consumer tolerates duplicates.
     */
    protected int ackMode = Session.AUTO_ACKNOWLEDGE;
    protected String consumerName = "James";

    /**
     * Creates a session on the given connection and resolves
     * {@link #destination} as a topic or queue named {@link #subject}.
     *
     * @param connection an open broker connection
     * @return the new session
     * @exception Exception if the session or destination cannot be created
     */
    protected Session createSession(Connection connection) throws Exception {
        Session session = connection.createSession(transacted, ackMode);
        if (topic) {
            destination = session.createTopic(subject); // publish/subscribe model
        } else {
            destination = session.createQueue(subject); // point-to-point queue model
        }
        return session;
    }

    /**
     * Connects to the broker and starts message delivery.
     *
     * @return the started connection
     * @exception JMSException if connecting, setting the client id or starting fails
     * @exception Exception declared for compatibility with existing callers
     */
    protected Connection createConnection() throws JMSException, Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);
        Connection connection = connectionFactory.createConnection();
        try {
            if (durable && clientID != null) {
                connection.setClientID(clientID);
            }
            connection.start(); // begin message delivery
        } catch (JMSException e) {
            // Do not leak a half-initialised connection; report the original failure.
            try {
                connection.close();
            } catch (JMSException closeFailure) {
                closeFailure.printStackTrace();
            }
            throw e;
        }
        return connection;
    }

    /**
     * Closes the session and then the connection. The connection is closed
     * even when closing the session fails.
     *
     * @param connection the connection to close, may be null
     * @param session    the session to close, may be null
     * @exception JMSException if either close operation fails
     */
    protected void close(Connection connection, Session session) throws JMSException {
        // lets dump the stats
        //dumpStats(connection);
        try {
            if (session != null) {
                session.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

    /**
     * Prints the statistics of the given connection.
     *
     * @param connection the connection; must be an ActiveMQConnection
     */
    protected void dumpStats(Connection connection) {
        ActiveMQConnection c = (ActiveMQConnection) connection;
        c.getConnectionStats().dump(new IndentPrinter());
    }
}
发送jms的class smsMQSender
package sms.khan.mq;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.ObjectMessage;
//import javax.jms.TextMessage;
/**
 * Sends a single serializable object to the configured queue or topic.
 */
public class smsMQSender extends ToolSupport {
    /** Message time-to-live in milliseconds; 0 leaves the producer default untouched. */
    private long timeToLive;
    /** Stored from the constructor; not used by the sending code in this class. */
    protected long sleepTime = 0L;
    /** Payload; must implement java.io.Serializable to be sent. */
    private Object object;

    /**
     * Creates a sender with the complete broker configuration.
     *
     * @param ackMode    session acknowledgement mode
     * @param clientID   JMS client id (applied only when durable)
     * @param durable    true for PERSISTENT delivery, false for NON_PERSISTENT
     * @param subject    queue or topic name, e.g. TEST.FOO
     * @param url        broker URL, e.g. localhost:61616
     * @param pwd        broker password
     * @param user       broker user
     * @param topic      true to send to a topic, false to send to a queue
     * @param timeToLive message lifetime in milliseconds, 0 for the default
     * @param sleepTime  kept for configuration symmetry with the receiver
     * @param object     the payload to send
     */
    public smsMQSender(int ackMode, String clientID, boolean durable,
            String subject, String url, String pwd, String user, boolean topic,
            long timeToLive, long sleepTime, Object object) {
        this.ackMode = ackMode;
        this.clientID = clientID;
        this.durable = durable;
        this.subject = subject;
        this.url = url;
        this.pwd = pwd;
        this.user = user;
        this.topic = topic;
        this.timeToLive = timeToLive;
        this.sleepTime = sleepTime;
        this.object = object;
    }

    /**
     * Connects to the broker, sends the payload and releases the connection.
     * Failures are logged to the console and not propagated; the session and
     * connection are closed on both the success and the failure path.
     */
    public void run() {
        Connection connection = null;
        Session session = null;
        try {
            System.out.println("连接到 to URL: " + url);
            System.out.println("发送消息到 " + (topic ? "topic" : "queue") + ": " + subject);
            System.out.println("使用 " + (durable ? "durable" : "non-durable") + " 传递模式");
            if (timeToLive != 0) {
                System.out.println("消息的生存期为 " + timeToLive + " ms");
            }
            connection = createConnection(); // connect to the broker
            session = createSession(connection); // open the session
            MessageProducer producer = createProducer(session); // create the producer
            sendLoop(session, producer);
            System.out.println("完成.");
        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        } finally {
            // Previously skipped whenever sending failed, leaking the connection.
            try {
                close(connection, session);
            } catch (JMSException e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }
    }

    /**
     * Creates the message producer for the current destination and applies
     * the delivery mode and time-to-live.
     *
     * @param session the open session
     * @return the configured producer
     * @exception JMSException if the producer cannot be created or configured
     */
    protected MessageProducer createProducer(Session session) throws JMSException {
        MessageProducer producer = session.createProducer(destination);
        if (durable) {
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        } else {
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        }
        if (timeToLive != 0) {
            producer.setTimeToLive(timeToLive);
        }
        return producer;
    }

    /**
     * Wraps the payload in an ObjectMessage and sends it once, committing
     * the session when it is transacted.
     *
     * @param session  the open session
     * @param producer the producer to send with
     * @exception Exception if the message cannot be created or sent, or the
     *                      payload is not serializable
     */
    protected void sendLoop(Session session, MessageProducer producer) throws Exception {
        ObjectMessage message = session.createObjectMessage();
        // setObject only needs Serializable, so any serializable payload is
        // accepted rather than SmsData alone.
        message.setObject((java.io.Serializable) object);
        producer.send(message);
        if (transacted) {
            session.commit();
        }
    }

    /**
     * @param args unused
     */
    public static void main(String[] args) {
        // TODO Auto-generated method stub
    }
}
接收jms的类 smsMQRecv.java
package sms.khan.mq;
import java.io.File;
import java.io.IOException;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.ObjectMessage;
import javax.jms.Topic;
import comm.khan.LoadIni;
import sms.khan.SmsData;
/**
 * JMS consumer. Depending on {@link #maximumMessages} it either registers
 * itself as an asynchronous MessageListener (value &lt;= 0) or polls the
 * destination synchronously on the calling thread (value &gt; 0).
 *
 * @version $Revision: 1.1.1.1 $
 */
public class smsMQRecv extends ToolSupport implements MessageListener, ExceptionListener {
    /** Not referenced by the code in this class. */
    protected int count = 0;
    /** Not referenced by the code in this class. */
    protected int dumpCount = 10;
    /** When true, every received message is logged to the console. */
    protected boolean verbose = true;
    /** Mode switch: &lt;= 0 selects listener mode, &gt; 0 selects the blocking receive loop. */
    protected int maximumMessages = 0;
    private boolean pauseBeforeShutdown;
    /** Written by the JMS exception callback and read by the receive loop. */
    private volatile boolean running;
    private Session session;
    /** Pause in milliseconds after each handled message; 0 disables it. */
    private long sleepTime = 0;

    /** Creates a receiver with the ToolSupport defaults. */
    public smsMQRecv() {
    }

    /**
     * Loads the broker settings from web/smsmq.ini and starts consuming.
     *
     * @param args optional; when more than four arguments are given, args[4]
     *             is parsed as maximumMessages
     */
    public static void main(String[] args) {
        smsMQRecv tool = new smsMQRecv();
        LoadIni ini = new LoadIni(); // load the configuration file
        ini.setFilename("web" + File.separator + "smsmq.ini");
        try {
            tool.clientID = ini.readINI("clientID");
            // "true".equals(...) also copes with a missing key.
            tool.durable = "true".equals(ini.readINI("durable"));
            tool.subject = ini.readINI("subject");
            tool.url = ini.readINI("url");
            tool.topic = "true".equals(ini.readINI("topic"));
            // sleepTime is a long; Integer.parseInt would reject large values.
            tool.sleepTime = Long.parseLong(ini.readINI("sleepTime"));
        } catch (Exception e) {
            // Best effort: continue with the defaults for anything not read.
            e.printStackTrace();
        }
        if (args.length > 4) {
            tool.maximumMessages = Integer.parseInt(args[4]);
        }
        tool.run();
    }

    /**
     * Connects to the broker and starts consuming in the selected mode.
     * Failures are logged to the console; if setup fails, whatever was opened
     * is closed again.
     */
    public void run() {
        Connection connection = null;
        try {
            running = true;
            System.out.println("连接到服务器: " + url);
            System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);
            System.out.println("Using " + (durable ? "durable" : "non-durable") + " subscription");
            connection = createConnection(); // connect to the broker
            connection.setExceptionListener(this); // this class implements ExceptionListener
            session = createSession(connection);
            MessageConsumer consumer;
            if (durable && topic) {
                consumer = session.createDurableSubscriber((Topic) destination, consumerName);
            } else {
                consumer = session.createConsumer(destination);
            }
            if (maximumMessages > 0) {
                consumeMessagesAndClose(connection, session, consumer);
            } else {
                // Listener mode: the connection stays open and onMessage is
                // invoked by the JMS provider.
                consumer.setMessageListener(this);
            }
        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
            running = false;
            // Release anything opened before the failure.
            try {
                close(connection, session);
            } catch (JMSException closeFailure) {
                closeFailure.printStackTrace();
            }
        }
    }

    /**
     * MessageListener callback: logs the message, commits a transacted
     * session and then pauses for sleepTime milliseconds.
     *
     * @param message the message received from the destination
     */
    public void onMessage(Message message) {
        try {
            if (message instanceof ObjectMessage) {
                ObjectMessage objectMessage = (ObjectMessage) message;
                SmsData sd = (SmsData) objectMessage.getObject();
                System.out.println(sd.getMobile() + sd.getContent());
                if (verbose) {
                    System.out.println("Received: " + sd.getMobile());
                }
            } else {
                if (verbose) {
                    System.out.println("Received: " + message);
                }
            }
            if (transacted) {
                session.commit();
            }
        } catch (JMSException e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        } finally {
            if (sleepTime > 0) {
                try {
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                    // Restore the flag so the owning thread can still see the interrupt.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * ExceptionListener callback: logs the failure and stops the receive loop.
     *
     * @param ex the JMS exception reported by the connection
     */
    synchronized public void onException(JMSException ex) {
        System.out.println("JMS Exception occured. Shutting down client.");
        if (ex != null) {
            ex.printStackTrace();
        }
        running = false;
    }

    /** Reports whether this receiver is still running. */
    synchronized boolean isRunning() {
        return running;
    }

    /**
     * Receives messages synchronously until the receiver stops running or the
     * consumer is closed, then closes consumer, session and connection.
     *
     * <p>NOTE(review): despite the console text, the loop does not stop after
     * maximumMessages messages; an earlier counting variant was disabled by
     * the author. Confirm which behaviour is intended.
     *
     * @param connection the open connection, closed on exit
     * @param session    the open session, closed on exit
     * @param consumer   the consumer to receive from, closed on exit
     * @exception JMSException if receiving or closing fails
     * @exception IOException  if reading from the console fails
     */
    protected void consumeMessagesAndClose(Connection connection, Session session,
            MessageConsumer consumer) throws JMSException, IOException {
        System.out.println("系统等待,直到接收到 " + maximumMessages + " 条消息再退出");
        try {
            while (isRunning()) {
                Message message = consumer.receive();
                if (message == null) {
                    // receive() returns null once the consumer has been closed;
                    // continuing would spin forever on null messages.
                    break;
                }
                onMessage(message);
            }
        } finally {
            System.out.println("Closing connection");
            try {
                consumer.close();
            } finally {
                close(connection, session);
            }
        }
        if (pauseBeforeShutdown) {
            System.out.println("Press return to shut down");
            System.in.read();
        }
    }
}
XMLAnalyse.java 分析xml配置文件的类
package sms.khan;
import java.io.*;
import java.io.FileInputStream;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.*;
import javax.xml.parsers.ParserConfigurationException;
import org.xml.sax.SAXException;
/**
 * Reads an XML configuration document and prints, for every second-level
 * group that has more than one child node, the name and value of each child
 * element to the console.
 */
public class XMLAnalyse {
    public XMLAnalyse() {
    }

    /**
     * Parses web/config.xml and prints its entries.
     *
     * @param args unused
     */
    public static void main(String args[]) {
        InputStream in = null;
        try {
            in = new BufferedInputStream(new FileInputStream("web/config.xml"));
            XMLAnalyse.parseXml(in);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } finally {
            // Always release the file handle.
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Parses the XML document on the given stream and prints its entries.
     * The stream is not closed; that remains the caller's responsibility.
     *
     * @param inputstream the XML document to read, may be null
     * @return 0 when the document was parsed and printed, -1 when the stream
     *         is null or the document could not be parsed
     */
    public static int parseXml(InputStream inputstream) {
        if (inputstream == null) {
            // The parser rejects a null stream with an exception; report it instead.
            System.err.println("XMLAnalyse.parseXml: no input stream");
            return -1;
        }

        DocumentBuilder builder;
        try {
            builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
        } catch (ParserConfigurationException e) {
            e.printStackTrace();
            return -1;
        }

        Document document;
        try {
            document = builder.parse(inputstream);
        } catch (DOMException e) {
            e.printStackTrace();
            return -1;
        } catch (SAXException e) {
            e.printStackTrace();
            return -1;
        } catch (IOException e) {
            e.printStackTrace();
            return -1;
        }

        Element root = document.getDocumentElement();
        NodeList groups = root.getChildNodes();
        for (int i = 0; i < groups.getLength(); i++) {
            NodeList entries = groups.item(i).getChildNodes();
            int len = entries.getLength();
            if (len > 1) {
                System.out.println("处理sp名单");
                for (int j = 0; j < len; j++) {
                    Node entry = entries.item(j);
                    // Line breaks and spaces show up as "#text" nodes; only
                    // elements carry configuration values.
                    if (entry.getNodeType() != Node.ELEMENT_NODE) {
                        continue;
                    }
                    // An empty element has no children at all.
                    Node last = entry.getLastChild();
                    String value = (last == null) ? null : last.getNodeValue();
                    System.out.println("name:" + entry.getNodeName() + " value:" + value);
                }
            }
        }
        return 0;
    }
}
SmsData.java 数据载体类
package sms.khan;
import java.io.Serializable;
//import comm.khan.commTools;
public class SmsData implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String spid = "";
private String passwd = "";
private String seq = "";
private String port = "";
private String mobile = "";
private String service = "";
private String style = "";
private String content = "";
private String smscid = "";
private String moseq = "";
public SmsData(){}
public SmsData(String content,String mobile,
String moseq,String passwd,String port,
String seq,String service,String smscid,
String spid,String style) {
this.content = content;
this.mobile = mobile;
this.moseq = moseq;
this.passwd = passwd;
this.port = port;
this.seq = seq;
this.service = service;
this.smscid = smscid;
this.spid = spid;
this.style = style;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public String getMobile() {
return mobile;
}
public void setMobile(String mobile) {
this.mobile = mobile;
}
public String getMoseq() {
return moseq;
}
public void