rabbitmq批量处理 -欧洲杯足彩官网

`
hbxflihua
  • 浏览: 653121 次
  • 性别:
  • 来自: 杭州
最近访客
博主相关
  • 博客
  • 微博
  • 相册
  • 收藏
  • 社区版块
    • ( 0)
    • ( 0)
    • ( 1)
    存档分类
    最新评论

    rabbitmq批量处理

    我们通过spring-amqp操作rabbitmq是极其简单的,消息的生产者和消费者只需要如下配置:

    客户端(生产者):connectionfactory、queue、exchange、messageconverter、rabbittemplate。

    服务端(消费者):connectionfactory、queue、exchange、messageconverter、listenercontainer。

     

    如果消息堆积严重,我们可以通过两种方式来处理消息,一种是在服务端开启监听多线程服务(concurrency="10"),另一种是让消息批量出队列。

     

    开启多线程的配置示例如下:

    	 
        	    
    	

     

    批量出队列的示例如下:

    客户端(消息生产者

    import java.math.bigdecimal;
    import org.junit.test;
    import org.springframework.beans.factory.annotation.autowired;
    import com.rd.account.domain.accountlog;
    import com.rd.ifaes.mq.producer.rabbitproducer;
    import com.rd.ifaes.web.basetest;
    /**
     * 消息生产者
     * @author lihua
     * @since 2018-04-08
     *
     */
    public class producer  extends basetest{
    	
    //	@autowired
    //	private rabbittemplate rabbittemplate;
    	
    	//这里对rabbittemplate做了简单的封装,您可以直接使用rabbittemplate
    	@autowired
    	private rabbitproducer rabbitproducer;
    	
    	private static final string queuename = "account_log_batch"; //mqconstant.routing_key_account_log_batch;
    	@test
    	public void main() {
    		for (int i = 0; i < 512; i  ) {
    			accountlog log = new accountlog("001", "001", "asdf", bigdecimal.valueof(i), "remark" i);
    			rabbitproducer.send(queuename, log);
    //			rabbittemplate.convertandsend(queuename, "hello"   i);
    		}
    		
    	}
    	
    }

     

    服务端(消息消费者)

    import java.text.simpledateformat;
    import java.util.arraylist;
    import java.util.date;
    import java.util.list;
    import org.junit.test;
    import org.slf4j.logger;
    import org.slf4j.loggerfactory;
    import org.springframework.amqp.rabbit.core.channelcallback;
    import org.springframework.amqp.rabbit.core.rabbittemplate;
    import org.springframework.beans.factory.annotation.autowired;
    import com.alibaba.fastjson.jsonobject;
    import com.rabbitmq.client.amqp;
    import com.rabbitmq.client.channel;
    import com.rabbitmq.client.queueingconsumer;
    /**
     * 消息消费者
     * @author lihua
     * @since 2018-04-08
     *
     */
    public class consumer extends basetest{
    	
    	private static final logger logger = loggerfactory.getlogger(consumer.class);
    	
    	@autowired
    	private rabbittemplate rabbittemplate;
    	@autowired
    	private accountlogservice accountlogservice;
    	private static final string queuename = "account_log_batch"; //mqconstant.routing_key_account_log_batch;
    	private static final int batch_size = 100;
    	
    	@test
    	public void consumer() {
        	while (true) {
        		rabbittemplate.execute(new channelcallback() {
        			@override
        			public string doinrabbit(channel channel) throws exception {
        				simpledateformat sdf = new simpledateformat("yyyy-mm-dd hh:mm:ss");
        		        try {
        		            final amqp.queue.declareok ok = channel.queuedeclare(queuename, true, false, false, null);
        		            int messagecount = ok.getmessagecount();
        		            logger.info("run consumer {}, msg count {}", sdf.format(new date()), messagecount);
        		            if (messagecount == 0) {
        		                return null;
        		            }
        		            list list = new arraylist<>();
        		            channel.basicqos(batch_size);
        		            queueingconsumer queueingconsumer = new queueingconsumer(channel);
        		            logger.info("channel id {}", integer.tohexstring(system.identityhashcode(channel)));
        		            final string inconsumertag = "test consumer"   sdf.format(new date());
        		            channel.basicconsume(queuename, false, inconsumertag, queueingconsumer);
        		            long messageid = -1;
        		            int dealedcount = 0;
        		            int i = batch_size;
        		            while (i-- > 0) {
        		                queueingconsumer.delivery delivery = queueingconsumer.nextdelivery(batch_size);
        		                if (delivery == null) {
        		                    break;
        		                }
        		                string msg = new string(delivery.getbody());
        		                accountlog log = jsonobject.parseobject(msg, accountlog.class);
        		                list.add(log);
        		                messageid = delivery.getenvelope().getdeliverytag();
        		                logger.info("get message {} delivery id {}", msg, messageid);
        		                dealedcount  ;
        		                if (dealedcount % 5 == 0) {
        		                    channel.basicack(messageid, true);
        		                    logger.info("batch ack message id =>{}", messageid);
        		                    messageid = -1;
        		                }
        		            }
        		            if (messageid > 0) {
        		                channel.basicack(messageid, true);
        		                logger.info("last to ack message id =>{}", messageid);
        		            }
        		            
        		            // 日志入库
        		            accountlogservice.savebatch(list);
        		            
        		        } finally {
        		            logger.info("consumer done {}", sdf.format(new date()));
        		        }
        		        channel.abort();
        				return null;
        			}
        		});
    			
        		try {
        			thread.sleep(5000);
        		} catch (interruptedexception e) {
        			
        		}
    		}
    	}
    }

     

    import org.junit.runner.runwith;
    import org.springframework.test.context.contextconfiguration;
    import org.springframework.test.context.junit4.springjunit4classrunner;
    @runwith(springjunit4classrunner.class)
    @contextconfiguration(locations={"classpath:spring-context.xml"})
    public abstract class basetest {
    }
    

     

    补一个服务端真实案例:

    import java.text.simpledateformat;
    import java.util.arraylist;
    import java.util.date;
    import java.util.list;
    import java.util.concurrent.callable;
    import java.util.concurrent.executorservice;
    import java.util.concurrent.executors;
    import javax.annotation.postconstruct;
    import org.slf4j.logger;
    import org.slf4j.loggerfactory;
    import org.springframework.amqp.rabbit.core.channelcallback;
    import org.springframework.amqp.rabbit.core.rabbittemplate;
    import org.springframework.beans.factory.annotation.autowired;
    import org.springframework.context.annotation.lazy;
    import org.springframework.stereotype.component;
    import com.alibaba.fastjson.jsonobject;
    import com.rabbitmq.client.amqp;
    import com.rabbitmq.client.channel;
    import com.rabbitmq.client.queueingconsumer;
    @component
    @lazy(value=false)
    public class accountlogbatchlistener {
    	
    	@autowired
    	private rabbittemplate rabbittemplate;
    	@autowired
    	private accountlogservice accountlogservice;
    	
    	private static final logger logger = loggerfactory.getlogger(accountlogbatchlistener.class);
    	private static final string queue_name = mqconstant.routing_key_account_log_batch;
    	private static final executorservice executor = executors.newfixedthreadpool(1);
    	private static final int batch_size = 100;
    	
    	@postconstruct
    	public void init(){		
    		executor.submit(new callable() {
    			@override
    			public string call() throws exception {
    				execute();
    				return null;
    			}			
    		});
    	}
    	
    	private void execute(){		
    		while (true) {
    			rabbittemplate.execute(new channelcallback() {
    				@override
    				public string doinrabbit(channel channel) throws exception {
    					simpledateformat sdf = new simpledateformat("yyyy-mm-dd hh:mm:ss");
    					try {		        	
    						final amqp.queue.declareok ok = channel.queuedeclare(queue_name, true, false, false, null);
    						int messagecount = ok.getmessagecount();
    						logger.debug("accountlogbatchlistener {}, msg count {}", sdf.format(new date()), messagecount);
    						if (messagecount == 0) {
    							return null;
    						}
    						list list = new arraylist<>();
    						channel.basicqos(batch_size);
    						queueingconsumer queueingconsumer = new queueingconsumer(channel);
    						logger.debug("channel id {}", integer.tohexstring(system.identityhashcode(channel)));
    						final string inconsumertag = "accountlogbatchlistener {}"   sdf.format(new date());
    						channel.basicconsume(queue_name, false, inconsumertag, queueingconsumer);
    						long messageid = -1;
    						int dealedcount = 0;
    						int i = batch_size;
    						while (i-- > 0) {
    							queueingconsumer.delivery delivery = queueingconsumer.nextdelivery(batch_size);
    							if (delivery == null) {
    								break;
    							}
    							string msg = new string(delivery.getbody());
    							accountlog log = jsonobject.parseobject(msg, accountlog.class);
    							list.add(log);
    							messageid = delivery.getenvelope().getdeliverytag();
    							logger.info(" userid {}, delivery id {}", log.getuserid(), messageid);
    							dealedcount  ;
    							if (dealedcount % 5 == 0) {
    								channel.basicack(messageid, true);
    								logger.debug("batch ack message id =>{}", messageid);
    								messageid = -1;
    							}
    						}
    						if (messageid > 0) {
    							channel.basicack(messageid, true);
    							logger.debug("last to ack message id =>{}", messageid);
    						}
    						
    						// 日志入库
    						accountlogservice.savebatch(list);
    						
    						
    					} finally {
    						logger.info("accountlogbatchlistener done {}", sdf.format(new date()));
    					}
    					channel.abort();
    					return null;
    				}
    			});
    			try {
    				thread.sleep(10000);
    			} catch (interruptedexception e) {
    			}
    		}
    	}
    }

     

    分享到:
    |
    评论

    相关推荐

      用php收发rabbitmq消息,分为send.php存入消息队列和get.php从消息队列中取出并处理。取出采用阻塞模式,需要在命令行下运行。

      主要为大家详细介绍了c#操作rabbitmq的完整实例,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

      极致的性能 :基于 scala 和 java 语言开发,设计中大量使用了批量处理和异步的思想,最高可以每秒处理千万级别的消息。 生态系统兼容性无可匹敌 :kafka 与周边生态系统的兼容性是最好的没有之一,尤其在大数据...

      spring-parentmaven父pom和子pom的版本号批量修改1 设置新的版本号./mvnw versions:set -dnewversion=2.4.12 撤销设置./mvnw versions:revert3 提交设置./mvnw versions:commit4.项目打包(同时处理项目所依赖的包)...

      测试前后支持完善的 hook 机制(用例等待,加解密等处理) 响应结果支持丰富的校验机制 结合 celery框架,无需额外的组件即可实现分布式异步定时自动化测试 测试结果统计报告简洁清晰,附带详尽测试报告 加入性能测试...

      redis批量查询优化 redis高性能集群之twemproxy of redis 数据存储 mongodb nosql简介及mongodb支持的数据类型分析 mongodb可视化客户端及javaapi实践 手写基于mongodb的orm框架 mongodb企业级集欧洲杯足彩官网的解决方案 ...

      今天我们来聊聊 kafka ,主要是带你重新认识一下 kafka,聊一下 kafka 中比较重要的概念和问题。在后面的文章中我会介绍: ...极致的性能 :基于 scala 和 java 语言开发,设计中大量使用了批量处理和异步的思想,最

      群聊批量ack处理,避免因创建过多的超时计时器导致的压力过大 利用leaf-sno 【备注】 1、该资源内项目代码都经过测试运行成功,功能ok的情况下才上传的,请放心下载使用!有问题请及时沟通交流。 2、适用人群:...

      设置npm安装跑步节点app.js 在可用时间内涵盖以下内容: 页面上传csv 显示结果它将上传csv 读取csv并将图像... 使用rabbitmq来排队和处理图像下载上载到s3 使用引导程序等使ui更好编写单元测试用例,端到端测试用例等

      特征cloud foundry logdrain端点ironio项目日志记录端点支持v2的hsdp日志记录api 批量上传消息(最多25条)以获得良好的性能非常精简,仅在32mb ram中运行仅过滤模式弹性apm支持分配logproxy作为分发: docker pull ...

      异常处理tryexcept 网络编程socket介绍 socket通信案例消息发送与接收 第8周 上节回顾 socket实现简单的ssh客户端 socket实现简单的ssh服务端 积极思考正能量 socket实现简单的ssh2 socket粘包 socket粘包深入编码...

      02 什么是异常处理及异常处理的两种方式对比 03 多分支与万能异常 04 异常处理的其他内容 05 什么时候用异常处理 06 什么是socket 07 套接字发展及分类 08 基于tcp协议的套接字编程 09 socket底层工作原理解释 10 ...

    global site tag (gtag.js) - google analytics
    网站地图