How to Implement Message Sending and Receiving with RabbitMQ in a Java Project

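How do you implement message sending and receiving with RabbitMQ in a Java project? This can be daunting without prior experience, so this article walks through a working implementation step by step.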


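AMQP (Advanced Message Queuing Protocol) is an open standard application-layer protocol designed for message-oriented middleware. Message middleware is mainly used to decouple components: the sender of a message does not need to know that the consumer exists, and vice versa.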

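The main features of AMQP are message orientation, queuing, routing (including point-to-point and publish/subscribe), reliability, and security.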
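To make the routing idea concrete, here is a minimal in-memory sketch of direct (point-to-point) routing. It is not RabbitMQ code: `DirectExchangeSketch`, `bind` and `publish` are names invented for this illustration, and a real broker does the same job across the network, with persistence and acknowledgements on top.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// In-memory illustration only; all names here are hypothetical.
final class DirectExchangeSketch {
    // routing key -> queue bound under that key
    private final Map<String, Queue<String>> bindings = new HashMap<>();

    // Bind a queue under a routing key and return it so a consumer can poll it.
    Queue<String> bind(String routingKey) {
        return bindings.computeIfAbsent(routingKey, k -> new ArrayDeque<>());
    }

    // The sender names only the routing key, never the consumer.
    // Returns false when no queue is bound to the key (the message is dropped).
    boolean publish(String routingKey, String message) {
        Queue<String> queue = bindings.get(routingKey);
        if (queue == null) {
            return false;
        }
        queue.add(message);
        return true;
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        Queue<String> orders = exchange.bind("orders");
        exchange.publish("orders", "order #1 created");
        exchange.publish("unknown", "nobody is listening");
        System.out.println(orders.poll()); // prints "order #1 created"
    }
}
```

The publisher and the consumer share nothing but the routing key, which is the decoupling described above.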

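RabbitMQ is an open-source AMQP implementation. The server is written in Erlang and supports many clients and protocols, such as Python, Ruby, .NET, Java, JMS, C, PHP, ActionScript, XMPP, and STOMP, as well as AJAX. It is used to store and forward messages in distributed systems and performs well in terms of ease of use, scalability, and high availability.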

This article does not cover AMQP or RabbitMQ fundamentals; please look them up online.

It implements message sending and receiving on top of spring-rabbit.

see http://www.rabbitmq.com/tutorials/tutorial-one-java.html

see http://www.springsource.org/spring-amqp

The Java code for sending and receiving messages through RabbitMQ is shown below, starting with the Maven dependencies:


 	
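	<dependency>
		<groupId>com.rabbitmq</groupId>
		<artifactId>amqp-client</artifactId>
		<version>2.8.2</version>
	</dependency>
	<dependency>
		<groupId>org.springframework.amqp</groupId>
		<artifactId>spring-amqp</artifactId>
		<version>1.1.1.RELEASE</version>
	</dependency>
	<dependency>
		<groupId>org.springframework.amqp</groupId>
		<artifactId>spring-rabbit</artifactId>
		<version>1.1.1.RELEASE</version>
	</dependency>
	<dependency>
		<groupId>com.caucho</groupId>
		<artifactId>hessian</artifactId>
		<version>4.0.7</version>
	</dependency>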
 

First, we need a holder object for passing messages between the app and RabbitMQ.

public class EventMessage implements Serializable{
	private String queueName;
	private String exchangeName;
	private byte[] eventData;
	public EventMessage(String queueName, String exchangeName, byte[] eventData) {
		this.queueName = queueName;
		this.exchangeName = exchangeName;
		this.eventData = eventData;
	}
	public EventMessage() {
	}	
	public String getQueueName() {
		return queueName;
	}
	public String getExchangeName() {
		return exchangeName;
	}
	public byte[] getEventData() {
		return eventData;
	}
	@Override
	public String toString() {
		return "EventMessage [queueName=" + queueName + ", exchangeName="
				+ exchangeName + ", eventData=" + Arrays.toString(eventData)
				+ "]";
	}
}

To send and receive this message holder, we also need a factory for serialization and deserialization.

public interface CodecFactory {
	byte[] serialize(Object obj) throws IOException;
	Object deSerialize(byte[] in) throws IOException;
}

Below is the codec implementation class. It uses Hessian, but you can choose any serialization method you like.

public class HessionCodecFactory implements CodecFactory {
	private final Logger logger = Logger.getLogger(HessionCodecFactory.class);
	@Override
	public byte[] serialize(Object obj) throws IOException {
		ByteArrayOutputStream baos = null;
		HessianOutput output = null;
		try {
			baos = new ByteArrayOutputStream(1024);
			output = new HessianOutput(baos);
			output.startCall();
			output.writeObject(obj);
			output.completeCall();
		} catch (final IOException ex) {
			throw ex;
		} finally {
			if (output != null) {
				try {
					baos.close();
				} catch (final IOException ex) {
					this.logger.error("Failed to close stream.", ex);
				}
			}
		}
		return baos != null ? baos.toByteArray() : null;
	}
	@Override
	public Object deSerialize(byte[] in) throws IOException {
		Object obj = null;
		ByteArrayInputStream bais = null;
		HessianInput input = null;
		try {
			bais = new ByteArrayInputStream(in);
			input = new HessianInput(bais);
			input.startReply();
			obj = input.readObject();
			input.completeReply();
		} catch (final IOException ex) {
			throw ex;
		} catch (final Throwable e) {
			this.logger.error("Failed to decode object.", e);
		} finally {
			if (input != null) {
				try {
					bais.close();
				} catch (final IOException ex) {
					this.logger.error("Failed to close stream.", ex);
				}
			}
		}
		return obj;
	}
}
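Because the codec sits behind the CodecFactory interface, switching the serialization method only means supplying another implementation. The sketch below uses built-in Java serialization instead of Hessian. `JdkCodecFactory` is a hypothetical name that is not part of the original project, and the interface is repeated only so that the example compiles on its own.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Repeated from the article so that the sketch is self-contained.
interface CodecFactory {
    byte[] serialize(Object obj) throws IOException;
    Object deSerialize(byte[] in) throws IOException;
}

// Hypothetical alternative codec based on java.io serialization.
final class JdkCodecFactory implements CodecFactory {
    @Override
    public byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
        // Closing the ObjectOutputStream flushes everything into baos.
        try (ObjectOutputStream out = new ObjectOutputStream(baos)) {
            out.writeObject(obj);
        }
        return baos.toByteArray();
    }

    @Override
    public Object deSerialize(byte[] in) throws IOException {
        try (ObjectInputStream input = new ObjectInputStream(new ByteArrayInputStream(in))) {
            return input.readObject();
        } catch (ClassNotFoundException e) {
            // Surface the failure through the exception type the interface declares.
            throw new IOException("Unknown class in serialized data", e);
        }
    }

    public static void main(String[] args) throws IOException {
        CodecFactory codec = new JdkCodecFactory();
        byte[] bytes = codec.serialize("hello world");
        System.out.println(codec.deSerialize(bytes));
    }
}
```

Objects passed through this codec must implement Serializable, and built-in Java deserialization should only be applied to data from trusted producers.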

Next, implement sending first. Add an interface dedicated to sending:

public interface EventTemplate {
	void send(String queueName,String exchangeName,Object eventContent) throws SendRefuseException;
	void send(String queueName,String exchangeName,Object eventContent,CodecFactory codecFactory) throws SendRefuseException;
}

SendRefuseException is a custom exception class for send failures.
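The article does not show this class. A minimal sketch, assuming it is a checked exception (the send methods declare it with `throws`) that offers the three constructors used by the code in this article, might look like this:

```java
// Hypothetical definition; the original class is not shown in the article.
class SendRefuseException extends Exception {
    private static final long serialVersionUID = 1L;

    // Used when the arguments are rejected before anything is sent.
    public SendRefuseException(String message) {
        super(message);
    }

    // Used to wrap a serialization failure.
    public SendRefuseException(Throwable cause) {
        super(cause);
    }

    // Used to wrap a failure reported by the AMQP template.
    public SendRefuseException(String message, Throwable cause) {
        super(message, cause);
    }
}
```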
Below is the implementation class of EventTemplate; its main job is to convert the data into an EventMessage.

public class DefaultEventTemplate implements EventTemplate {
	private static final Logger logger = Logger.getLogger(DefaultEventTemplate.class);
	private AmqpTemplate eventAmqpTemplate;
	private CodecFactory defaultCodecFactory;
//	private DefaultEventController eec;
//	public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,
//			CodecFactory defaultCodecFactory, DefaultEventController eec) {
//		this.eventAmqpTemplate = eopAmqpTemplate;
//		this.defaultCodecFactory = defaultCodecFactory;
//		this.eec = eec;
//	}
	public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,CodecFactory defaultCodecFactory) {
		this.eventAmqpTemplate = eopAmqpTemplate;
		this.defaultCodecFactory = defaultCodecFactory;
	}
	@Override
	public void send(String queueName, String exchangeName, Object eventContent)
			throws SendRefuseException {
		this.send(queueName, exchangeName, eventContent, defaultCodecFactory);
	} 
	@Override
	public void send(String queueName, String exchangeName, Object eventContent,
			CodecFactory codecFactory) throws SendRefuseException {
		if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName)) {
			throw new SendRefuseException("queueName exchangeName can not be empty.");
		}
//		if (!eec.beBinded(exchangeName, queueName))
//			eec.declareBinding(exchangeName, queueName);
		byte[] eventContentBytes = null;
		if (codecFactory == null) {
			if (eventContent == null) {
				logger.warn("Find eventContent is null,are you sure...");
			} else {
				throw new SendRefuseException(
						"codecFactory must not be null ,unless eventContent is null");
			}
		} else {
			try {
				eventContentBytes = codecFactory.serialize(eventContent);
			} catch (IOException e) {
				throw new SendRefuseException(e);
			}
		}
		// Build the EventMessage
		EventMessage msg = new EventMessage(queueName, exchangeName,
				eventContentBytes);
		try {
			eventAmqpTemplate.convertAndSend(exchangeName, queueName, msg);
		} catch (AmqpException e) {
			logger.error("send event fail. Event Message : [" + eventContent + "]", e);
			throw new SendRefuseException("send event fail", e);
		}
	}
}

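The commented-out parts will be used later; their main purpose is to guard against sending to an exchange or queue that has not been declared beforehand.
Next we implement receiving messages.
First we need a consumer interface that every consumer implements: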

public interface EventProcesser {
	public void process(Object e);
}

To hand different types of messages to the corresponding handler, we also need a message-handling adapter.

/**
 * POJO delegate for MessageListenerAdapter.
 * Message-handling adapter. Its main responsibilities:
 * 1. Bind each message type to its processor and cache the binding locally,
 *    e.g. hand every message of queue01 + exchange01 to processor A.
 * 2. Dispatch incoming messages by calling the processor that owns them.
 */
public class MessageAdapterHandler {
	private static final Logger logger = Logger.getLogger(MessageAdapterHandler.class);
	private ConcurrentMap<String, EventProcessorWrap> epwMap;
	public MessageAdapterHandler() {
		this.epwMap = new ConcurrentHashMap<String, EventProcessorWrap>();
	}
	public void handleMessage(EventMessage eem) {
		logger.debug("Receive an EventMessage: [" + eem + "]");
		// First check whether the received message is null; this can happen in some error cases
		if (eem == null) {
			logger.warn("Receive an null EventMessage, it may product some errors, and processing message is canceled.");
			return;
		}
		if (StringUtils.isEmpty(eem.getQueueName()) || StringUtils.isEmpty(eem.getExchangeName())) {
			logger.warn("The EventMessage's queueName and exchangeName is empty, this is not allowed, and processing message is canceled.");
			return;
		}
		// Decode the payload and hand it to the matching processor
		EventProcessorWrap eepw = epwMap.get(eem.getQueueName()+"|"+eem.getExchangeName());
		if (eepw == null) {
			logger.warn("Receive an EopEventMessage, but no processor can do it.");
			return;
		}
		try {
			eepw.process(eem.getEventData());
		} catch (IOException e) {
			logger.error("Event content can not be Deserialized, check the provided CodecFactory.",e);
			return;
		}
	}
	protected void add(String queueName, String exchangeName, EventProcesser processor,CodecFactory codecFactory) {
		if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName) || processor == null || codecFactory == null) {
			throw new RuntimeException("queueName and exchangeName can not be empty,and processor or codecFactory can not be null.");
		}
		EventProcessorWrap epw = new EventProcessorWrap(codecFactory,processor);
		EventProcessorWrap oldProcessorWrap = epwMap.putIfAbsent(queueName + "|" + exchangeName, epw);
		if (oldProcessorWrap != null) {
			logger.warn("The processor of this queue and exchange exists, and the new one can't be add");
		}
	}
	protected Set<String> getAllBinding() {
		Set<String> keySet = epwMap.keySet();
		return keySet;
	}
	protected static class EventProcessorWrap {
		private CodecFactory codecFactory;
		private EventProcesser eep;
		protected EventProcessorWrap(CodecFactory codecFactory, EventProcesser eep) {
			this.codecFactory = codecFactory;
			this.eep = eep;
		}
		public void process(byte[] eventData) throws IOException{
			Object obj = codecFactory.deSerialize(eventData);
			eep.process(obj);
		}
	}
}

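The core of the adapter is a concurrent map keyed by "queueName|exchangeName", where the first processor registered for a key wins. The stand-alone sketch below isolates that idea without Spring or RabbitMQ. `DispatchRegistrySketch`, `register` and `dispatch` are hypothetical names, and a plain `Consumer<Object>` stands in for EventProcesser plus its codec.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

// Stand-alone illustration of the "queue|exchange" -> processor registry.
final class DispatchRegistrySketch {
    private final ConcurrentMap<String, Consumer<Object>> processors = new ConcurrentHashMap<>();

    private static String key(String queueName, String exchangeName) {
        return queueName + "|" + exchangeName;
    }

    // Returns true if the processor was registered, false if one already
    // exists for this queue and exchange (the existing one is kept).
    boolean register(String queueName, String exchangeName, Consumer<Object> processor) {
        return processors.putIfAbsent(key(queueName, exchangeName), processor) == null;
    }

    // Returns true if a processor handled the payload, false if none is registered.
    boolean dispatch(String queueName, String exchangeName, Object payload) {
        Consumer<Object> processor = processors.get(key(queueName, exchangeName));
        if (processor == null) {
            return false;
        }
        processor.accept(payload);
        return true;
    }

    public static void main(String[] args) {
        DispatchRegistrySketch registry = new DispatchRegistrySketch();
        List<Object> received = new ArrayList<>();
        registry.register("QUEUE_TEST", "EXCHANGE_DIRECT_TEST", received::add);
        registry.dispatch("QUEUE_TEST", "EXCHANGE_DIRECT_TEST", "hello world");
        System.out.println(received); // prints "[hello world]"
    }
}
```

Using `putIfAbsent` makes registration safe when several threads add processors at once, which is presumably why the adapter relies on it as well.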
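This is how messages are handled in the normal case. Exceptions that occur while receiving RabbitMQ messages also need to be monitored, so add a class dedicated to handling such errors.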

public class MessageErrorHandler implements ErrorHandler{
	private static final Logger logger = Logger.getLogger(MessageErrorHandler.class);
	@Override
	public void handleError(Throwable t) {
		logger.error("RabbitMQ happen a error:" + t.getMessage(), t);
	}
}

Next, we need a class dedicated to the settings for communicating with RabbitMQ, such as the host and port.

public class EventControlConfig {
	private final static int DEFAULT_PORT = 5672;
	private final static String DEFAULT_USERNAME = "guest";
	private final static String DEFAULT_PASSWORD = "guest";
	private final static int DEFAULT_PROCESS_THREAD_NUM = Runtime.getRuntime().availableProcessors() * 2;
	private static final int PREFETCH_SIZE = 1;
	private String serverHost ;
	private int port = DEFAULT_PORT;
	private String username = DEFAULT_USERNAME;
	private String password = DEFAULT_PASSWORD;
	private String virtualHost;
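	/**
	 * Timeout for establishing the connection to RabbitMQ
	 */
	private int connectionTimeout = 0;
	/**
	 * Number of event-message processing threads; defaults to CPU cores * 2
	 */
	private int eventMsgProcessNum;
	/**
	 * Prefetch count used when consuming messages
	 */
	private int prefetchSize;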
	
	public EventControlConfig(String serverHost) {
		this(serverHost,DEFAULT_PORT,DEFAULT_USERNAME,DEFAULT_PASSWORD,null,0,DEFAULT_PROCESS_THREAD_NUM,PREFETCH_SIZE,new HessionCodecFactory());
	}
	public EventControlConfig(String serverHost, int port, String username,
			String password, String virtualHost, int connectionTimeout,
			int eventMsgProcessNum,int prefetchSize,CodecFactory defaultCodecFactory) {
		this.serverHost = serverHost;
		this.port = port>0?port:DEFAULT_PORT;
		this.username = username;
		this.password = password;
		this.virtualHost = virtualHost;
		this.connectionTimeout = connectionTimeout>0?connectionTimeout:0;
		this.eventMsgProcessNum = eventMsgProcessNum>0?eventMsgProcessNum:DEFAULT_PROCESS_THREAD_NUM;
		this.prefetchSize = prefetchSize>0?prefetchSize:PREFETCH_SIZE;
	}
	public String getServerHost() {
		return serverHost;
	}
	public int getPort() {
		return port;
	}
	public String getUsername() {
		return username;
	}
	public String getPassword() {
		return password;
	}
	public String getVirtualHost() {
		return virtualHost;
	}
	public int getConnectionTimeout() {
		return connectionTimeout;
	}
	public int getEventMsgProcessNum() {
		return eventMsgProcessNum;
	}
	public int getPrefetchSize() {
		return prefetchSize;
	}
}

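The concrete sending and receiving code is now in place. The next and most important part is managing and controlling the communication with RabbitMQ.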

public interface EventController {
	/**
	 * Starts the controller
	 */
	void start();
	/**
	 * Returns the send template
	 */
	EventTemplate getEopEventTemplate();
	/**
	 * Binds a consumer to the given exchange and queue
	 */
	EventController add(String queueName, String exchangeName, EventProcesser eventProcesser);
	/* in the map, the key is the queue name and the value is the exchange name */
	EventController add(Map<String, String> bindings, EventProcesser eventProcesser);
}

Its implementation class is as follows:

/**
 * Controller for communicating with RabbitMQ. Its main responsibilities:
 * 1. Establish the connection to RabbitMQ.
 * 2. Declare exchanges and queues and the bindings between them.
 * 3. Start the message listener container and bind each message processor to its exchange and queue.
 * 4. Hold the send template and a local cache of all exchanges, queues and bindings.
 * @author yangyong
 */
public class DefaultEventController implements EventController {
	private CachingConnectionFactory rabbitConnectionFactory;
	private EventControlConfig config;
	private RabbitAdmin rabbitAdmin;
	private CodecFactory defaultCodecFactory = new HessionCodecFactory();
	private SimpleMessageListenerContainer msgListenerContainer; // rabbitMQ msg listener container
	private MessageAdapterHandler msgAdapterHandler = new MessageAdapterHandler();
	private MessageConverter serializerMessageConverter = new SerializerMessageConverter(); // specified directly
	// exchange cache, key is exchangeName
	private Map<String, DirectExchange> exchanges = new HashMap<String, DirectExchange>();
	// queue cache, key is queueName
	private Map<String, Queue> queues = new HashMap<String, Queue>();
	// cache of queue-to-exchange bindings, value is exchangeName | queueName
	private Set<String> binded = new HashSet<String>();
	private EventTemplate eventTemplate; // event sending client used by the app
	private AtomicBoolean isStarted = new AtomicBoolean(false);
	private static DefaultEventController defaultEventController;
	public synchronized static DefaultEventController getInstance(EventControlConfig config){
		if(defaultEventController==null){
			defaultEventController = new DefaultEventController(config);
		}
		return defaultEventController;
	}
	private DefaultEventController(EventControlConfig config){
		if (config == null) {
			throw new IllegalArgumentException("Config can not be null.");
		}
		this.config = config;
		initRabbitConnectionFactory();
		// Initialize the AmqpAdmin
		rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory);
		// Initialize the RabbitTemplate
		RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
		rabbitTemplate.setMessageConverter(serializerMessageConverter);
		eventTemplate = new DefaultEventTemplate(rabbitTemplate,defaultCodecFactory, this);
	}
	/**
	 * Initializes the RabbitMQ connection
	 */
	private void initRabbitConnectionFactory() {
		rabbitConnectionFactory = new CachingConnectionFactory();
		rabbitConnectionFactory.setHost(config.getServerHost());
		rabbitConnectionFactory.setChannelCacheSize(config.getEventMsgProcessNum());
		rabbitConnectionFactory.setPort(config.getPort());
		rabbitConnectionFactory.setUsername(config.getUsername());
		rabbitConnectionFactory.setPassword(config.getPassword());
		if (!StringUtils.isEmpty(config.getVirtualHost())) {
			rabbitConnectionFactory.setVirtualHost(config.getVirtualHost());
		}
	}
	/**
	 * Shuts the controller down
	 */
	public synchronized void destroy() throws Exception {
		if (!isStarted.get()) {
			return;
		}
		msgListenerContainer.stop();
		eventTemplate = null;
		rabbitAdmin = null;
		rabbitConnectionFactory.destroy();
	}
	@Override
	public void start() {
		if (isStarted.get()) {
			return;
		}
		Set<String> mapping = msgAdapterHandler.getAllBinding();
		for (String relation : mapping) {
			// keys have the form queueName|exchangeName
			String[] relaArr = relation.split("\\|");
			declareBinding(relaArr[1], relaArr[0]);
		}
		initMsgListenerAdapter();
		isStarted.set(true);
	}
	/**
	 * Initializes the message listener container
	 */
	private void initMsgListenerAdapter(){
		MessageListener listener = new MessageListenerAdapter(msgAdapterHandler,serializerMessageConverter);
		msgListenerContainer = new SimpleMessageListenerContainer();
		msgListenerContainer.setConnectionFactory(rabbitConnectionFactory);
		msgListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
		msgListenerContainer.setMessageListener(listener);
		msgListenerContainer.setErrorHandler(new MessageErrorHandler());
		msgListenerContainer.setPrefetchCount(config.getPrefetchSize()); // prefetch count for each consumer
		msgListenerContainer.setConcurrentConsumers(config.getEventMsgProcessNum());
		msgListenerContainer.setTxSize(config.getPrefetchSize()); // number of messages handled per transaction
		msgListenerContainer.setQueues(queues.values().toArray(new Queue[queues.size()]));
		msgListenerContainer.start();
	}
	@Override
	public EventTemplate getEopEventTemplate() {
		return eventTemplate;
	}
	@Override
	public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser) {
		return add(queueName, exchangeName, eventProcesser, defaultCodecFactory);
	}
	public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser,CodecFactory codecFactory) {
		msgAdapterHandler.add(queueName, exchangeName, eventProcesser, codecFactory);
		if(isStarted.get()){
			initMsgListenerAdapter();
		}
		return this;
	}
	@Override
	public EventController add(Map<String, String> bindings, EventProcesser eventProcesser) {
		return add(bindings, eventProcesser,defaultCodecFactory);
	}
	public EventController add(Map<String, String> bindings, EventProcesser eventProcesser, CodecFactory codecFactory) {
		for(Map.Entry<String, String> item: bindings.entrySet())
			msgAdapterHandler.add(item.getKey(),item.getValue(), eventProcesser,codecFactory);
		return this;
	}
	/**
	 * Whether the exchange and queue are already bound
	 */
	protected boolean beBinded(String exchangeName, String queueName) {
		return binded.contains(exchangeName+"|"+queueName);
	}
	/**
	 * Declares the exchange, the queue and the binding between them
	 */
	protected synchronized void declareBinding(String exchangeName, String queueName) {
		String bindRelation = exchangeName+"|"+queueName;
		if (binded.contains(bindRelation)) return;
		boolean needBinding = false;
		DirectExchange directExchange = exchanges.get(exchangeName);
		if(directExchange == null) {
			directExchange = new DirectExchange(exchangeName, true, false, null);
			exchanges.put(exchangeName, directExchange);
			rabbitAdmin.declareExchange(directExchange); // declare the exchange
			needBinding = true;
		}
		Queue queue = queues.get(queueName);
		if(queue == null) {
			queue = new Queue(queueName, true, false, false);
			queues.put(queueName, queue);
			rabbitAdmin.declareQueue(queue); // declare the queue
			needBinding = true;
		}
		if(needBinding) {
			Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queueName); // bind the queue to the exchange
			rabbitAdmin.declareBinding(binding); // declare the binding
			binded.add(bindRelation);
		}
	}
}
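The local caches exist so that each exchange, queue and binding is declared on the broker only once. The stand-alone sketch below isolates that bookkeeping. `BindingCacheSketch` and its `declared` log are hypothetical and merely record what would be sent to a real admin API. One deliberate difference, stated as an assumption: the controller above declares a binding only when the exchange or the queue is new, whereas this sketch declares it whenever the exchange/queue pair has not been seen before.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Stand-alone illustration; "declared" stands in for calls to a broker admin API.
final class BindingCacheSketch {
    private final Set<String> exchanges = new HashSet<>();
    private final Set<String> queues = new HashSet<>();
    private final Set<String> bound = new HashSet<>();
    // Log of the declarations that would be sent to the broker, in order.
    final List<String> declared = new ArrayList<>();

    synchronized void declareBinding(String exchangeName, String queueName) {
        String relation = exchangeName + "|" + queueName;
        if (!bound.add(relation)) {
            return; // this pair is already bound, nothing to do
        }
        if (exchanges.add(exchangeName)) {
            declared.add("exchange:" + exchangeName);
        }
        if (queues.add(queueName)) {
            declared.add("queue:" + queueName);
        }
        declared.add("binding:" + relation);
    }

    public static void main(String[] args) {
        BindingCacheSketch cache = new BindingCacheSketch();
        cache.declareBinding("EXCHANGE_DIRECT_TEST", "QUEUE_TEST");
        cache.declareBinding("EXCHANGE_DIRECT_TEST", "QUEUE_TEST"); // ignored, already bound
        System.out.println(cache.declared);
    }
}
```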

Done. The commented-out code in DefaultEventTemplate can now be uncommented. The last step is unit testing; to test passing objects, create a PO:

@SuppressWarnings("serial")
public class People implements Serializable{
	private int id;
	private String name;
	private boolean male;
	private People spouse;
	private List friends;
	public int getId() {
		return id;
	}
	public void setId(int id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public boolean isMale() {
		return male;
	}
	public void setMale(boolean male) {
		this.male = male;
	}
	public People getSpouse() {
		return spouse;
	}
	public void setSpouse(People spouse) {
		this.spouse = spouse;
	}
	public List getFriends() {
		return friends;
	}
	public void setFriends(List friends) {
		this.friends = friends;
	}
	@Override
	public String toString() {
		return "People[id="+id+",name="+name+",male="+male+"]";
	}
}

Create the unit test:

public class RabbitMqTest{
	private String defaultHost = "127.0.0.1";
	private String defaultExchange = "EXCHANGE_DIRECT_TEST";
	private String defaultQueue = "QUEUE_TEST";
	private DefaultEventController controller;
	private EventTemplate eventTemplate;
	@Before
	public void init() throws IOException{
		EventControlConfig config = new EventControlConfig(defaultHost);
		controller = DefaultEventController.getInstance(config);
		eventTemplate = controller.getEopEventTemplate();
		controller.add(defaultQueue, defaultExchange, new ApiProcessEventProcessor());
		controller.start();
	}
	@Test
	public void sendString() throws SendRefuseException{
		eventTemplate.send(defaultQueue, defaultExchange, "hello world");
	}	
	@Test
	public void sendObject() throws SendRefuseException{
		eventTemplate.send(defaultQueue, defaultExchange, mockObj());
	}	
	@Test
	public void sendTemp() throws SendRefuseException, InterruptedException{
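		String tempExchange = "EXCHANGE_DIRECT_TEST_TEMP";// an exchange that has not been declared before
		String tempQueue = "QUEUE_TEST_TEMP";// a queue that has not been declared before
		eventTemplate.send(tempQueue, tempExchange, mockObj());
		// After a successful send no message is received yet; a consumer still has to be bound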
		controller.add(tempQueue, tempExchange, new ApiProcessEventProcessor());
	}	
	@After
	public void end() throws InterruptedException{
		Thread.sleep(2000);
	}	
	private People mockObj(){
		People jack = new People();
		jack.setId(1);
		jack.setName("JACK");
		jack.setMale(true);
		
		List friends = new ArrayList<>();
		friends.add(jack);
		People hanMeiMei = new People();
		hanMeiMei.setId(1);
		hanMeiMei.setName("韩梅梅");
		hanMeiMei.setMale(false);
		hanMeiMei.setFriends(friends);
		
		People liLei = new People();
		liLei.setId(2);
		liLei.setName("李雷");
		liLei.setMale(true);
		liLei.setFriends(friends);
		liLei.setSpouse(hanMeiMei);
		hanMeiMei.setSpouse(liLei);
		return hanMeiMei;
	}
	class ApiProcessEventProcessor implements EventProcesser{
		@Override
		public void process(Object e) {// the consumer here just prints the message
			Assert.assertNotNull(e);
			System.out.println(e);
			if(e instanceof People){
				People people = (People)e;
				System.out.println(people.getSpouse());
				System.out.println(people.getFriends());
			}
		}
	}
}
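The test above waits for the asynchronous consumer with a fixed Thread.sleep(2000). A possible alternative is to let the consumer signal a CountDownLatch and wait on it with a timeout. The sketch below shows only that waiting pattern, using a plain executor thread as a stand-in for the listener container. `LatchWaitSketch` and `sendAndAwait` are hypothetical names and nothing here talks to RabbitMQ.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class LatchWaitSketch {
    // Hands the payload to a simulated asynchronous consumer and waits until
    // the consumer has seen it. Returns the received payload, or null if
    // nothing arrived within the timeout.
    static Object sendAndAwait(Object payload, long timeoutMillis) throws InterruptedException {
        CountDownLatch received = new CountDownLatch(1);
        AtomicReference<Object> seen = new AtomicReference<>();
        ExecutorService consumerThread = Executors.newSingleThreadExecutor();
        try {
            // Stand-in for the listener container invoking a processor.
            consumerThread.submit(() -> {
                seen.set(payload);
                received.countDown();
            });
            boolean arrived = received.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return arrived ? seen.get() : null;
        } finally {
            consumerThread.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sendAndAwait("hello world", 2000));
    }
}
```

In a real test the processor would call `countDown()` inside `process(Object e)`, and the test would fail if `await` returns false, so the test finishes as soon as the message arrives instead of always sleeping for two seconds.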
