关键字:JAVA JMS  Active MQ  XML INI Eclipse Resin

jms服务器 , active MQ 我这一篇应该是第一篇比较详细的用代码+注释介绍的中文文档了
activeMQ是一个开源的C核心的jms服务器,在国外的应用相当广泛.所以这个项目我放弃了jboss4的用数据库作为消息载体的jms服务,改用active MQ

    整个项目的架构如下,用户通过http请求提交消息, webserver 收到提交请求后将消息封装到一个implements Serializable的class 然后通过active mq的 jms服务发送给server端,入库
    另一个进程将处理好的记录从数据库种取出,通过httpconnection发送给 用户提供的数据接收的webservice.



程序中负责接收http请求的servlet

package sms.khan;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.net.URLDecoder;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import sms.khan.mq.smsMQSender;

import comm.khan.LoadIni;
import comm.khan.commTools;

//import javax.naming.*;
//import javax.sql.DataSource;

public class recv extends HttpServlet {

    
private static final long serialVersionUID = 1L;

    
public recv() {

    }

    
private static final String CONTENT_TYPE = "text/html; charset=GBK";

    
// Initialize global variables
    public void init() throws ServletException {

    }

    
public void doGet(HttpServletRequest request, HttpServletResponse response)
             throws
 ServletException, IOException {
        
int ackMode=1;
        String clientID
="null";
        
boolean durable=false;
        String subject
="";
        String url
="";
        String pwd
="";
        String user
=""
        
boolean topic=false;
        
long timeToLive=0;
        
long sleepTime=0
        
        response.setContentType(CONTENT_TYPE);
        PrintWriter out 
= response.getWriter();
        InputStream in 
= null;

        
try {
            in 
= new BufferedInputStream(new FileInputStream("web/config.xml"));
        } 
catch (FileNotFoundException e) {
            e.printStackTrace();
        } 
        
/*获取发送者的信息*/
        SmsData sd 
= new SmsData();
        
if(commTools.existPara(request, "spid"))
            sd.setSpid(request.getParameter(
"spid"));
        
        
if(commTools.existPara(request, "passwd"))
            sd.setPasswd(request.getParameter(
"passwd"));
        
        
if(commTools.existPara(request, "seq"))
            sd.setSeq(request.getParameter(
"seq"));
        
        
if(commTools.existPara(request, "port"))
            sd.setPort(request.getParameter(
"port"));
        
        
if(commTools.existPara(request, "mobile"))
            sd.setMobile(request.getParameter(
"mobile"));
        
        
if(commTools.existPara(request, "service"))
            sd.setService(request.getParameter(
"service"));
        
        
if(commTools.existPara(request, "style"))
            sd.setStyle(request.getParameter(
"style"));
        
        
if(commTools.existPara(request, "content"))
            sd.setContent(URLDecoder.decode(request.getParameter(
"content"),"GB2312"));
        
        
if(commTools.existPara(request, "smscid"))
            sd.setSmscid(request.getParameter(
"smscid"));
        
        
if(commTools.existPara(request, "moseq"))
            sd.setMoseq(request.getParameter(
"moseq"));

        
// TODO 可以在此验证spid,ip,password
        
        XMLAnalyse.parseXml(in);
        
        out.println(
"<html>");
        out.println(
"<head><title>文件读写测试</title></head>");
        out.println(
"<body bgcolor=\"#ffffff\">");
        out.println(
"<p>content:"+sd.getContent()+"</p>");
        out.println(
"<p>ip:"+request.getRemoteHost()+" port:"+request.getServerPort()+"</p>");
        out.println(
"</body></html>");
        
        LoadIni ini 
=new LoadIni(); //装载配置文件
        ini.setFilename("web" + File.separator + "smsmq.ini");
        
try{
          ackMode 
=Integer.parseInt( ini.readINI("ackMode") );
          clientID 
= ini.readINI("clientID");
          durable 
= ini.readINI("durable").equals("true"? true : false;
          subject
=ini.readINI("subject");
          url
= ini.readINI("url");
          pwd
=ini.readINI("pwd");
          user
=ini.readINI("user");
          topic
=ini.readINI("topic").equals("true")? truefalse;
          timeToLive
=Integer.parseInt(ini.readINI("timeToLive"));
          sleepTime
=Integer.parseInt(ini.readINI("sleepTime"));
        }
catch(IOException e){
          e.printStackTrace();
        }
catch(Exception e){
          e.printStackTrace();
        }
       
        //将数据发送到jms
        smsMQSender smq
= new smsMQSender(ackMode, clientID, durable,subject, url,
                                         pwd, user, topic,timeToLive, sleepTime, sd);
        smq.run();
        
        out.flush();
        out.close();
    }

    
    
// Process the HTTP Post request
    public void doPost(HttpServletRequest request, HttpServletResponse response)
            
throws ServletException, IOException {
        doGet(request, response);
    }

    
public void destroy() {

    }
}





ToolSupport.java jms传递的基类

package sms.khan.mq;



import org.activemq.ActiveMQConnection;
import org.activemq.ActiveMQConnectionFactory;
import org.activemq.util.IndentPrinter;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;

/**
 * 消息传输类的抽象接口
 *
 * 
@version $Revision: 1.2 $
 
*/
public class ToolSupport {

    
/**消息发送的目的地*/
    
protected Destination destination;
    
    
/**消息池d名字*/
    
protected String subject = "TOOL.DEFAULT";
    
    
/**点对点传输模式或订阅模式 true 订阅模式,false 点对点模式*/
    
protected boolean topic = false;
    
    
protected String user = ActiveMQConnection.DEFAULT_USER;
    
protected String pwd  = ActiveMQConnection.DEFAULT_PASSWORD;
    
protected String url  = ActiveMQConnection.DEFAULT_BROKER_URL;
    
    
/*消息的transacted模式*/
    
protected boolean transacted = false;
    
    
/**传递模式, 有两种模式: PERSISTENT 和NON_PERSISTENT,PERSISTENT
     *  表示该消息一定要被送到目的地,否则会导致应用错误。NON_PERSISTENT 
     *  表示偶然丢失该消息是被允许的,这两种模式使开发者可以在消息传递的可
     *  靠性和吞吐量之间找到平衡点
*/
    
protected boolean durable = false;
    
    
protected String clientID;
    
    
/**
     * AUTO_ACKNOWLEDGE session将自动地确认收到一则消息。
     * CLIENT_ACKNOWLEDGE 客户端程序将确认收到一则消息,调用这则消息的确认方法。
     * DUPS_OK_ACKNOWLEDGE 这个选项命令session“懒散的”确认消息传递,可以想到,
     *   这将导致消息提供者传递的一些复制消息可能会出错。这种确认的方式只应当用于消
     *   息消费程序可以容忍潜在的副本消息存在的情况。
*/
    
protected int ackMode = Session.AUTO_ACKNOWLEDGE;
    
protected String consumerName = "James";

    
/**建立一个会话
     * 
@param Connection 消息池连接
     * 
@return Session 会话
     * 
@exception Exception*/
    
protected Session createSession(Connection connection) throws Exception {
        Session session 
= connection.createSession(transacted, ackMode);
        
if (topic) {
            destination 
= session.createTopic(subject); //发布-订阅消息模式
        } else {
            destination 
= session.createQueue(subject); //点对点消息队列模式
        }
        
return session;
    }

    
/**与消息池建立连接,并准备通讯
     *
@return Connection 返回连接Connection
     *
@exception JMSException 
     *
@exception Exception*/
    
protected Connection createConnection() throws JMSException, Exception {
        ActiveMQConnectionFactory connectionFactory 
= new ActiveMQConnectionFactory(user, pwd, url);
        Connection connection 
= connectionFactory.createConnection();
        
if (durable && clientID!=null) {
            connection.setClientID(clientID);
        }
        
        connection.start(); 
//开始消息传递
        return connection;
    }

    
/**关闭连接和会话
     * 
@param Connection 连接
     * 
@param Session 会话
     * 
@exception JMSException*/
    
protected void close(Connection connection, Session session) throws JMSException {
        
// lets dump the stats
        
//dumpStats(connection);

        
if (session != null) {
            session.close();
        }
        
if (connection != null) {
            connection.close();
        }
    }

    
/**打印当前连接的状态信息
     *
@param Connection 连接*/
    
protected void dumpStats(Connection connection) {
        ActiveMQConnection c 
= (ActiveMQConnection) connection;
        c.getConnectionStats().dump(
new IndentPrinter());
    }
}



发送jms的class smsMQSender

package sms.khan.mq;


import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.ObjectMessage;
//import javax.jms.TextMessage;

/**发送一个object到指定消息池
 * 
*/
public class smsMQSender extends ToolSupport {
    
private long timeToLive;
    
protected long sleepTime = 0L;
    
private Object object;

    
// "1000 255 null"
    /**构造器
     * 
@param int ackMode 
     * 
@param String ClientID
     * 
@param boolean durable
     * 
@param String subject
     * 
@param String url
     * 
@param String pwd
     * 
@param String user
     * 
@param boolean topic
     * 
@param long timeToLive
     * 
@param long sleepTime
     * 
@param Object object
     * 
*/
    
public smsMQSender(int ackMode, String clientID, boolean durable,
            String subject, String url, String pwd, String user, 
boolean topic,
            
long timeToLive, long sleepTime, Object object) {
        
this.ackMode = ackMode;
        
this.clientID = clientID; //
        this.durable = durable; // false
        this.subject = subject; // TEST.FOO
        this.url = url; // localhost:61616
        this.pwd = pwd;
        
this.user = user;
        
this.topic = topic; // false
        this.timeToLive = timeToLive;
        
this.sleepTime = sleepTime;
        
this.object = object;
    }

    
/** 连接消息池并发送消息 */
    
public void run() {
        
try {
            System.out.println(
"连接到 to URL: " + url);
            System.out.println(
"发送消息到 " + (topic ? "topic" : "queue"+ "" + subject);
            System.out.println(
"使用 " + (durable ? "durable" : "non-durable"+ " 传递模式");
            
if (timeToLive != 0) {
                System.out.println(
"消息的生存期为 " + timeToLive + " ms");
            }
            Connection connection 
= createConnection();// 建立一个消息池的连接
            Session session = createSession(connection); // 建立会话
            MessageProducer producer = createProducer(session); // 建立消息生产者
            sendLoop(session, producer);

            System.out.println(
"完成.");
            close(connection, session);
        } 
catch (Exception e) {
            System.out.println(
"Caught: " + e);
            e.printStackTrace();
        }
    }

    
/**
     * 建立消息生产者
     * 
@param Session 会话
     * 
@return MessageProducer 消息生产者
     * 
@exception JMSException
     
*/
    
protected MessageProducer createProducer(Session session) throws JMSException {
        MessageProducer producer 
= session.createProducer(destination);
        
if (durable) {
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        } 
else {
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        }
        
if (timeToLive != 0)
            producer.setTimeToLive(timeToLive);
        
return producer;
    }
    
    
      
/**循环发送消息
       * 
@param Session
       * 
@param MessageProducer
       * 
@exception Exception*/
      
protected void sendLoop(Session session, MessageProducer producer) throws Exception {
          ObjectMessage message 
= session.createObjectMessage();
          message.setObject((sms.khan.SmsData)object);
          producer.send(message);
          
if(transacted) {
            session.commit();
          }
        

      }
    
    

    
/**
     * 
@param args
     
*/
    
public static void main(String[] args) {
        
// TODO Auto-generated method stub

    }

}




接收jms的类 smsMQRecv .java
package sms.khan.mq;



import java.io.File;
import java.io.IOException;

import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.ObjectMessage;
import javax.jms.Topic;

import comm.khan.LoadIni;

import sms.khan.SmsData;


/**
 * 接收类,消费者模型
 * 
@version $Revision: 1.1.1.1 $
 
*/
public class smsMQRecv extends ToolSupport implements MessageListener, ExceptionListener {

    
protected int count = 0;
    
protected int dumpCount = 10;
    
protected boolean verbose = true;
    
protected int maximumMessages = 0;
    
private boolean pauseBeforeShutdown;
    
private boolean running;
    
private Session session;
    
private long sleepTime = 0;

    
    
/**构造器
     * 
*/
    
public smsMQRecv(){
    
    }
    
    
    
public static void main(String[] args) {
        smsMQRecv tool 
= new smsMQRecv();
        
        LoadIni ini 
=new LoadIni(); //装载配置文件
        ini.setFilename("web" + File.separator + "smsmq.ini");
        
try{
          tool.clientID  
= ini.readINI("clientID");
          tool.durable 
= ini.readINI("durable").equals("true"? true : false;
          tool.subject 
= ini.readINI("subject");
          tool.url 
= ini.readINI("url");
          tool.topic 
= ini.readINI("topic").equals("true")? truefalse;
          tool.sleepTime 
= Integer.parseInt(ini.readINI("sleepTime"));
        }
catch(IOException e){
          e.printStackTrace();
        }
catch(Exception e){
          e.printStackTrace();
        }
        
if (args.length > 4) {
            tool.maximumMessages 
= Integer.parseInt(args[4]);
        }

        
//if (args.length > 6) {
        
//    tool.transacted = "true".equals(args[6]);
        
//}


        tool.run();
    }

    
public void run() {
        
try {
            running 
= true;

            System.out.println(
"连接到服务器: " + url);
            System.out.println(
"Consuming " + (topic ? "topic" : "queue"+ "" + subject);
            System.out.println(
"Using " + (durable ? "durable" : "non-durable"+ " subscription");

            Connection connection 
= createConnection();//建立一个消息池连接
            connection.setExceptionListener(this);
//设置ExceptionListener为this ,因为this implements ExceptionListener
            session = createSession(connection);
            MessageConsumer consumer 
= null;//建立一个消息消费者
            
            
if (durable && topic) {
                consumer 
= session.createDurableSubscriber((Topic) destination, consumerName);
            } 
else {
                consumer 
= session.createConsumer(destination);
            }
            
            
if (maximumMessages <= 0) {
                consumer.setMessageListener(
this);
            }

            
if (maximumMessages > 0) {
                consumeMessagesAndClose(connection, session, consumer);
            }
        } 
catch (Exception e) {
            System.out.println(
"Caught: " + e); 
            e.printStackTrace();
        }
    }

    
/**MessageListener 的abstract 接口
     * 
@param Message 从消息池接收到的消息*/
    
public void onMessage(Message message) {
        
try {
            
if (message instanceof ObjectMessage) {
                ObjectMessage Msg 
= ((ObjectMessage) message);
                SmsData sd 
=  (SmsData)(Msg.getObject());
                System.out.println(sd.getMobile() 
+ sd.getContent());
                
if (verbose) System.out.println("Received: " + sd.getMobile());
            } 
else {
                
if (verbose) System.out.println("Received: " + message);
            }
            
if (transacted) {
                session.commit();
            }

        } 
catch (JMSException e) {
            System.out.println(
"Caught: " + e);
            e.printStackTrace();
        } 
finally {
            
if (sleepTime > 0) {
                
try {
                    Thread.sleep(sleepTime);
                } 
catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    
/**ExceptionListener的abstract 接口
     *
@param JMSException 接收到的JMS异常 */
    
synchronized public void onException(JMSException ex) {
        System.out.println(
"JMS Exception occured.  Shutting down client.");
        running 
= false;
    }

    
/**检测实例是否正在运行*/
    
synchronized boolean isRunning() {
        
return running;
    }

    
protected void consumeMessagesAndClose(Connection connection, Session session,
                          MessageConsumer consumer) 
throws JMSException, IOException {
        System.out.println(
"系统等待,直到接收到 " + maximumMessages + " 条消息再退出");
        
while( isRunning()){
            Message message 
= consumer.receive();
            onMessage(message);
        }
/*
        for (int i = 0; i < maximumMessages && isRunning();) {
            Message message = consumer.receive(10);
            if (message != null) {
                i++;
                onMessage(message);
            }
        }
*/
        System.out.println(
"Closing connection");
        consumer.close();
        session.close();
        connection.close();
        
if (pauseBeforeShutdown) {
            System.out.println(
"Press return to shut down");
            System.in.read();
        }
    }

}


XMLAnalyse.java 分析xml配置文件的类
package sms.khan;


import java.io.*;
import java.io.FileInputStream;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.*;
import javax.xml.parsers.ParserConfigurationException;
import org.xml.sax.SAXException;

public class XMLAnalyse {



  
public XMLAnalyse() {
  }

  
public static void main(String args[]) {
    InputStream in 
= null;
    
//byte[] buf = new byte[255];
    try {
      in 
= new BufferedInputStream(new FileInputStream("web/config.xml"));
        
/*while (in.read(buf) != -1) {
            for (int i = 0; i < 255; i++) {
                if (buf[i] == 0) break;
                System.out.print((char) buf[i]);
                buf[i] = '\0';
            }
        }
        System.out.print('\n');
        in = new BufferedInputStream(new FileInputStream("web/config.xml"));
*/
    } 
catch (FileNotFoundException e) {
        e.printStackTrace();
    } 
//catch (IOException e) {
    
//    e.printStackTrace();
    
//}
    
    XMLAnalyse.parseXml(in);
  }

  
public static int parseXml(InputStream inputstream) {
    DocumentBuilderFactory df 
= DocumentBuilderFactory.newInstance();
    DocumentBuilder doc 
= null;
    
try {
      doc 
= df.newDocumentBuilder();
    } 
catch (ParserConfigurationException e) {
      e.printStackTrace();
    }

    Document document 
= null;
    
try {
      document 
= doc.parse(inputstream);
    } 
catch (DOMException dom) {
      dom.printStackTrace();
    } 
catch (SAXException ioe) {
      ioe.printStackTrace();
    } 
catch (IOException e) {
      e.printStackTrace();
    }
 
    Element root 
= document.getDocumentElement();

    
for (int i = 0; i < root.getChildNodes().getLength(); i++) {
      NodeList nl 
= root.getChildNodes().item(i).getChildNodes();
      
int len= nl.getLength();
      
if ( len > 1) {
        System.out.println(
"处理sp名单");
        
for (int j = 0; j < len; j++) {
            
            String name 
= nl.item(j).getNodeName();
            
if (!name.equals("#text")){//回车和空格会产生的nodename "#text"
              System.out.println("name:"+name +" value:" +nl.item(j).getLastChild().getNodeValue());
            }
        }
      }
    }

    
return 0;
  }


}


smsData.java 数据载体类
package sms.khan;

import java.io.Serializable;

//import comm.khan.commTools;

public class SmsData implements Serializable{

    
/**
     * 
     
*/
    
private static final long serialVersionUID = 1L;
    
    
private String spid = "";
    
private String passwd = "";
    
private String seq = "";
    
private String port = "";
    
private String mobile = "";
    
private String service = "";
    
private String style = "";
    
private String content = "";        
    
private String smscid = "";
    
private String moseq = "";
        
    
public SmsData(){}
    
    
    
public SmsData(String content,String mobile,
            String moseq,String passwd,String port,
            String seq,String service,String smscid,
            String spid,String style) {
        
this.content = content;
        
this.mobile = mobile;
        
this.moseq = moseq;
        
this.passwd = passwd;
        
this.port = port;
        
this.seq = seq;
        
this.service = service;
        
this.smscid = smscid;
        
this.spid = spid;
        
this.style = style;
    }

    
public String getContent() {
        
return content;
    }

    
public void setContent(String content) {
        
this.content = content;
    }

    
public String getMobile() {
        
return mobile;
    }

    
public void setMobile(String mobile) {
        
this.mobile = mobile;
    }

    
public String getMoseq() {
        
return moseq;
    }

    
public void