Code Bye

JMS 收发消息代码

我是做性能测试的,最近接了一个项目,测试我们这边将要使用的weblogic的JMS中间件,想测一下JMS的收发效率,目标是每秒2.5-3W笔,最TM无语的是还让我们这些做测试的编程小白自己弄环境和写代码.无奈把百度来的代码东拼西凑写好了收发代码,测试了最多速度只有1W笔/秒。
代码见下。
求大神指教下代码还要怎么优化下,有错误的话请指教.
如果你有更好的测试方案,麻烦指教下.
不胜感激!

Sender代码

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

//import JMSrecevier.MyTherad;

public class TestMessage {

    private int currentNum = 0; 
    
    private Session currentObj;
     
    private List<Session> pool;
     
    private Connection conn = null;



    public void sendMessage() throws JMSException, NamingException{
    
    pool = Collections.synchronizedList(new LinkedList<Session>());
        
    try {
    
   
        final String CONNECTION_FACTORY_JNDI = "ConnectionFactory-1" ; 
       
        Context ctx = getInitialContext();
        
        ConnectionFactory connFactory = (ConnectionFactory)ctx.lookup(CONNECTION_FACTORY_JNDI);
       
        
       
        Destination dest = (Destination)ctx.lookup("Queue-1");
        
        conn = connFactory.createConnection();
       
        conn.start();
        
      		for(int i=0;i<2000;i++)
      		{
      			new Thread(new MyTherad(dest)).start();
      		}
    
      			//conn.close();
		    } catch (Exception e) {
		    	e.printStackTrace();
		    }
        
    }     
        
    
    
public class MyTherad  implements Runnable
{
	private Destination dest;
	public MyTherad(Destination dest)
	{
		this.dest=dest;
	}
	@Override
	public void run() {
	try {
			while(true)
			{
				Session	session=getSession();
				MessageProducer sender = session.createProducer(dest);
				sender.setDeliveryMode(DeliveryMode.PERSISTENT);
		        //sender.setTimeToLive(20000);
		        
		        MapMessage msg = session.createMapMessage();
		        
		        msg.setStringProperty("message","hello");
		        
		        sender.send(msg);
				returnSession(session);
				//System.out.println(currentNum);
			}
		} catch (Exception e) {
			e.printStackTrace();
		}

	}


}



    
    private Context getInitialContext(){
        Context ctx = null;
        Properties props = new Properties();
        props.put( Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
        props.put( Context.PROVIDER_URL, "t3://10.243.32.44:8001");    
        try {
             ctx = new InitialContext(props);
        } catch (NamingException e) {
            e.printStackTrace();
        }
        
        return ctx;
    }
    
    
    public static void main(String args[]){
    	TestMessage mp = new TestMessage();
        try {
            mp.sendMessage();
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (NamingException e) {
            e.printStackTrace();
        }
    }
    
    
    public synchronized Session getSession() throws Exception {
        
        if (pool.size() == 0 && currentNum < 2000) {
         
        
         
        currentObj = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
         
        pool.add(currentObj);
        System.out.println(pool.size());
        currentNum++;
         
        } else
         
        if (pool.size() == 0 && currentNum >= 2000) {
         
       
         
        while (pool.size() == 0) {
        }
        currentObj = (Session)pool.remove(0);
         
        } else {
         
        
         
        currentObj = (Session)pool.remove(0);
         
        }
         
        return currentObj;
         
        }
      
        public void returnSession(Session session) {
         
        pool.add(session);
         
        }
        
        public Connection getConn() {
         
        return conn;
         
        } 

}

Receiver代码

package JMS;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;


public class JMSReceiver {
	private int currentNum = 0; // 该对象池当前已创建的对象数目
    
    private Session currentObj;// 该对象池当前可以借出的对象
     
    private List<Session> pool;// 用于存放对象的池
     
    private Connection conn = null;
	 public void receiveMessage() throws JMSException, NamingException{
		 pool = Collections.synchronizedList(new LinkedList<Session>());
	     
		    try {
		        final String CONNECTION_FACTORY_JNDI = "ConnectionFactoryQueue" ; 
		        //
		        Context ctx = getInitialContext();
		        //
		        ConnectionFactory connFactory = (ConnectionFactory)ctx.lookup(CONNECTION_FACTORY_JNDI);
		        //
		        Destination dest = (Destination)ctx.lookup("QueueHezh");
		        
		        conn = connFactory.createConnection();
		        //
		        conn.start();
		       
		        	for(int i=0;i<2000;i++){
		        		new Thread(new MyTherad(dest)).start();
		        	}
		        
		        //conn.close();
            		    } catch (Exception e) {
		     e.printStackTrace();
		    }
		     
		    }
    public class MyTherad  implements Runnable{
    	private Destination dest;
    	public MyTherad(Destination dest){
    		this.dest=dest;
    	}
		@Override
		public void run() {
			try {
				while(true){
			Session	session=getSession();
			MessageConsumer receiver = session.createConsumer(dest);
	        MapMessage msg = (MapMessage)receiver.receive();
	        //System.out.println(msg);
	        returnSession(session);
	        System.out.println(currentNum);
				}
			} catch (Exception e) {
				e.printStackTrace();
			}

		}

    
    }
    private Context getInitialContext(){
        Context ctx = null;
        Properties props = new Properties();
        props.put( Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
        props.put( Context.PROVIDER_URL, "t3://192.168.43.170:7001");    
        try {
             ctx = new InitialContext(props);
        } catch (NamingException e) {
            e.printStackTrace();
        }
        
        return ctx;
    }
    
    public static void main(String args[]){
    
    	JMSReceiver s = new JMSReceiver();
        try {
            s.receiveMessage();
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (NamingException e) {
            e.printStackTrace();
        }
    }
    /**
    
     * session连接池
     
     * @author Li Bangsen 2012-12-19 下午02:34:06
     
     */
     
    public synchronized Session getSession() throws Exception {
     
    if (pool.size() == 0 && currentNum < 100) {
     
    // 如果当前池中无对象可用,而且已创建的对象数目小于所限制的最大值,创建一个新的对象
     
    currentObj = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
     
    pool.add(currentObj);
    System.out.println(pool.size());
    currentNum++;
     
    } else
     
    if (pool.size() == 0 && currentNum >= 100) {
     
    // 如果当前池中无对象可用,而且所创建的对象数目已达到所限制的最大值, 就只能等待其它线程返回对象到池中
     
    while (pool.size() == 0) {
    }
    currentObj = (Session)pool.remove(0);
     
    } else {
     
    // 如果当前池中有可用的对象,就直接从池中取出对象
     
    currentObj = (Session)pool.remove(0);
     
    }
     
    return currentObj;
     
    }
  
    public void returnSession(Session session) {
     
    pool.add(session);
     
    }
    
    public Connection getConn() {
     
    return conn;
     
    } 
    }   
顶一下.
大触求指导1!1
没人吗?!没人吗。!

70分
给你稍微改了一下,主要是两点
1.pool 不用List 而用ArrayLockingQueue
2.单独用一个线程去创建session
3.你这线程数太多了,线程太多会增加上下文切换的开销,反而影响性能,另外就是加剧共享变量的竞争
主要改的是TestMessage类
package jms;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

//import JMSrecevier.MyTherad;

public class TestMessage {

    private ArrayBlockingQueue<Session> pool = new ArrayBlockingQueue<Session>(2000);

    private Connection                  conn = null;

    public void sendMessage() throws JMSException, NamingException {

        try {

            final String CONNECTION_FACTORY_JNDI = "ConnectionFactory-1";

            Context ctx = getInitialContext();

            ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);

            Destination dest = (Destination) ctx.lookup("Queue-1");

            conn = connFactory.createConnection();

            conn.start();
            new Thread(new SessionCreateThread()).start();
            Thread.sleep(500);
            for (int i = 0; i < 100; i++) {
                new Thread(new MyTherad(dest)).start();
            }

            // conn.close();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public class MyTherad implements Runnable {

        private Destination dest;

        public MyTherad(Destination dest){
            this.dest = dest;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Session session = getSession();
                    MessageProducer sender = session.createProducer(dest);
                    sender.setDeliveryMode(DeliveryMode.PERSISTENT);
                    // sender.setTimeToLive(20000);

                    MapMessage msg = session.createMapMessage();

                    msg.setStringProperty("message", "hello");

                    sender.send(msg);
                    returnSession(session);
                    // System.out.println(currentNum);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

        }

    }

    public class SessionCreateThread implements Runnable {

        @Override
        public void run() {

            Session currentObj;
            try {
                currentObj = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                returnSession(currentObj);
            } catch (JMSException e) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e1) {
                    // TODO Auto-generated catch block
                    e1.printStackTrace();
                }
            }

        }
    }

    private Context getInitialContext() {
        Context ctx = null;
        Properties props = new Properties();
        props.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
        props.put(Context.PROVIDER_URL, "t3://10.243.32.44:8001");
        try {
            ctx = new InitialContext(props);
        } catch (NamingException e) {
            e.printStackTrace();
        }

        return ctx;
    }

    public static void main(String args[]) {
        TestMessage mp = new TestMessage();
        try {
            mp.sendMessage();
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (NamingException e) {
            e.printStackTrace();
        }
    }

    public synchronized Session getSession() throws Exception {

        return pool.take();

    }

    public void returnSession(Session session) {

        try {
            pool.put(session);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

    public Connection getConn() {

        return conn;

    }

}


15分
weblogic自带JMS线程池,你测试代码为啥要自己实现一个池来发JMS消息呢?
用一个Test类main里面直接send JMS测试数据,然后loadrunner直接加并发即可
只是我们本机是路由不通他们的测试主机的,用不了loadrunner的,还只能通过在他们的主机上跑代码来测试.上个礼拜测试结果出来了,才 1000笔/秒。
略蛋疼.接下来还要连接数据库来测试,数据库还是altibase,网上这数据库的资源太少了,
引用 4 楼 qingyuan18 的回复:

weblogic自带JMS线程池,你测试代码为啥要自己实现一个池来发JMS消息呢?
用一个Test类main里面直接send JMS测试数据,然后loadrunner直接加并发即可

同时运行几次这个代码  等同于与loadrunner的加并发吗?还是会加大资源消耗?

引用 3 楼 wangxf_8341 的回复:

给你稍微改了一下,主要是两点
1.pool 不用List 而用ArrayLockingQueue
2.单独用一个线程去创建session
3.你这线程数太多了,线程太多会增加上下文切换的开销,反而影响性能,另外就是加剧共享变量的竞争
主要改的是TestMessage类

package jms;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

//import JMSrecevier.MyTherad;

public class TestMessage {

    private ArrayBlockingQueue<Session> pool = new ArrayBlockingQueue<Session>(2000);

    private Connection                  conn = null;

    public void sendMessage() throws JMSException, NamingException {

        try {

            final String CONNECTION_FACTORY_JNDI = "ConnectionFactory-1";

            Context ctx = getInitialContext();

            ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);

            Destination dest = (Destination) ctx.lookup("Queue-1");

            conn = connFactory.createConnection();

            conn.start();
            new Thread(new SessionCreateThread()).start();
            Thread.sleep(500);
            for (int i = 0; i < 100; i++) {
                new Thread(new MyTherad(dest)).start();
            }

            // conn.close();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public class MyTherad implements Runnable {

        private Destination dest;

        public MyTherad(Destination dest){
            this.dest = dest;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Session session = getSession();
                    MessageProducer sender = session.createProducer(dest);
                    sender.setDeliveryMode(DeliveryMode.PERSISTENT);
                    // sender.setTimeToLive(20000);

                    MapMessage msg = session.createMapMessage();

                    msg.setStringProperty("message", "hello");

                    sender.send(msg);
                    returnSession(session);
                    // System.out.println(currentNum);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

        }

    }

    public class SessionCreateThread implements Runnable {

        @Override
        public void run() {

            Session currentObj;
            try {
                currentObj = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                returnSession(currentObj);
            } catch (JMSException e) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e1) {
                    // TODO Auto-generated catch block
                    e1.printStackTrace();
                }
            }

        }
    }

    private Context getInitialContext() {
        Context ctx = null;
        Properties props = new Properties();
        props.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
        props.put(Context.PROVIDER_URL, "t3://10.243.32.44:8001");
        try {
            ctx = new InitialContext(props);
        } catch (NamingException e) {
            e.printStackTrace();
        }

        return ctx;
    }

    public static void main(String args[]) {
        TestMessage mp = new TestMessage();
        try {
            mp.sendMessage();
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (NamingException e) {
            e.printStackTrace();
        }
    }

    public synchronized Session getSession() throws Exception {

        return pool.take();

    }

    public void returnSession(Session session) {

        try {
            pool.put(session);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

    public Connection getConn() {

        return conn;

    }

}

请问下,我想最大化的测出JMS的收发速度,这个速度只要是与代码有关,还是主机性能或者weblogic设置有关呢?
因为我在测试的时候,启用了100个线程但是资源消耗很不明显,速度也很小.


15分
JMS的收发速度与主机性能肯定有关,但是与代码优化和WebLogic设置也有关系。

CodeBye 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权 , 转载请注明JMS 收发消息代码