Spring-data-redis: 分布式队列

 

    Redis中list数据结构,具有“双端队列”的特性,同时redis具有持久数据的能力,因此redis实现分布式队列是非常安全可靠的。它类似于JMS中的“Queue”,只不过功能和可靠性(事务性)并没有JMS严格。Redis本身的高性能和"便捷的"分布式设计(replicas,sharding),可以为实现"分布式队列"提供了良好的基础.

    Redis中的队列阻塞时,整个connection都无法继续进行其他操作,因此在基于连接池设计是需要注意。

    我们通过spring-data-redis,来实现“同步队列”,设计风格类似与JMS。不过本实例中,并没有提供关于队列消费之后的消息确认机制,如果你感兴趣可以自己尝试实现它。

    1) Redis中的"队列"为双端队列,基于list数据结构实现,并提供了"队列阻塞"功能.

    2) 如果你期望使用redis做"分布式队列"server,且数据存取较为密集时,务必配置(redis.conf)中关于list数据结构的限制:

Java代码  收藏代码
  1. //当list中数据个数达到阀值是,将会被重构为linkedlist  
  2. //如果队列的存/取速度较为接近,此值可以稍大  
  3. list-max-ziplist-entries 5120  
  4. list-max-ziplist-value 1024  

    3) Redis已经提供了"队列"的持久化能力,无需额外的技术支持

    4) Redis并没有提供JMS语义中"queue"消息的消费确认的功能,即当队列中的消息被redis-client接收之后,并不会执行"确认消息已到达"的操作;如果你的分布式队列,需要严格的消息确认,需要额外的技术支持.

    5) Redis并不能像JMS那样提供高度中心化的"队列"服务集群,它更适合"快速/小巧/及时消费"的情景.

    6) 本例中,对于消息的接收,是在一个后台线程中进行(参见下文RedisQueue),其实我们可以使用线程池的方式来做,以提高性能. 不过此方案,需要基于2个前提:

        A) 如果单个queue中的消息较多,且每条消息的处理时间较长(即消费速度比接收的速度慢)

        B) 如果此线程池可以被多个queue公用线程资源 ,如果一个queue就创建一个线程池,实在是有些浪费且存在不安全问题.

        C) 需要确认,多线程环境中对queue的操作,有可能在客户端层面打乱了队列的顺序,而造成异常.比如线程1从queue中获得data1,线程2从queue中获得data2,有可能因为线程调度的问题,导致data2被优先执行.

 

一.配置文件:

Java代码  收藏代码
  1. <beans xmlns="http://www.springframework.org/schema/beans"   
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  3. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd" default-autowire="byName">  
  4.     <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">  
  5.         <property name="maxActive" value="32"></property>  
  6.         <property name="maxIdle" value="6"></property>  
  7.         <property name="maxWait" value="15000"></property>  
  8.         <property name="minEvictableIdleTimeMillis" value="300000"></property>  
  9.         <property name="numTestsPerEvictionRun" value="3"></property>  
  10.         <property name="timeBetweenEvictionRunsMillis" value="60000"></property>  
  11.         <property name="whenExhaustedAction" value="1"></property>  
  12.     </bean>  
  13.     <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" destroy-method="destroy">  
  14.         <property name="poolConfig" ref="jedisPoolConfig"></property>  
  15.         <property name="hostName" value="127.0.0.1"></property>  
  16.         <property name="port" value="6379"></property>  
  17.         <property name="password" value="0123456"></property>  
  18.         <property name="timeout" value="15000"></property>  
  19.         <property name="usePool" value="true"></property>  
  20.     </bean>  
  21.     <bean id="jedisTemplate" class="org.springframework.data.redis.core.RedisTemplate">  
  22.         <property name="connectionFactory" ref="jedisConnectionFactory"></property>  
  23.         <property name="defaultSerializer">  
  24.             <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>  
  25.         </property>  
  26.     </bean>  
  27.     <bean id="jedisQueueListener" class="com.sample.redis.sdr.QueueListener"/>  
  28.     <bean id="jedisQueue" class="com.sample.redis.sdr.RedisQueue" destroy-method="destroy">  
  29.         <property name="redisTemplate" ref="jedisTemplate"></property>  
  30.         <property name="key" value="user:queue"></property>  
  31.         <property name="listener" ref="jedisQueueListener"></property>  
  32.     </bean>  
  33. </beans>  

二.程序实例:

1) QueueListener:当队列中有数据时,可以执行类似于JMS的回调操作。

Java代码  收藏代码
  1. public interface RedisQueueListener<T> {  
  2.   
  3.     public void onMessage(T value);  
  4. }  
Java代码  收藏代码
  1. public class QueueListener<String> implements RedisQueueListener<String> {  
  2.   
  3.     @Override  
  4.     public void onMessage(String value) {  
  5.         System.out.println(value);  
  6.           
  7.     }  
  8.   
  9. }  

2) RedisQueue:队列操作,内部封装redisTemplate实例;如果配置了“listener”,那么queue将采用“消息回调”的方式执行,listenerThread是一个后台线程,用来自动处理“队列信息”。如果不配置“listener”,那么你可以将redisQueue注入到其他spring bean中,手动去“take”数据即可。

Java代码  收藏代码
  1. public class RedisQueue<T> implements InitializingBean,DisposableBean{  
  2.     private RedisTemplate redisTemplate;  
  3.     private String key;  
  4.     private int cap = Short.MAX_VALUE;//最大阻塞的容量,超过容量将会导致清空旧数据  
  5.     private byte[] rawKey;  
  6.     private RedisConnectionFactory factory;  
  7.     private RedisConnection connection;//for blocking  
  8.     private BoundListOperations<String, T> listOperations;//noblocking  
  9.       
  10.     private Lock lock = new ReentrantLock();//基于底层IO阻塞考虑  
  11.       
  12.     private RedisQueueListener listener;//异步回调  
  13.     private Thread listenerThread;  
  14.       
  15.     private boolean isClosed;  
  16.       
  17.     public void setRedisTemplate(RedisTemplate redisTemplate) {  
  18.         this.redisTemplate = redisTemplate;  
  19.     }  
  20.   
  21.     public void setListener(RedisQueueListener listener) {  
  22.         this.listener = listener;  
  23.     }  
  24.   
  25.     public void setKey(String key) {  
  26.         this.key = key;  
  27.     }  
  28.       
  29.   
  30.     @Override  
  31.     public void afterPropertiesSet() throws Exception {  
  32.         factory = redisTemplate.getConnectionFactory();  
  33.         connection = RedisConnectionUtils.getConnection(factory);  
  34.         rawKey = redisTemplate.getKeySerializer().serialize(key);  
  35.         listOperations = redisTemplate.boundListOps(key);  
  36.         if(listener != null){  
  37.             listenerThread = new ListenerThread();  
  38.             listenerThread.setDaemon(true);  
  39.             listenerThread.start();  
  40.         }  
  41.     }  
  42.       
  43.       
  44.     /** 
  45.      * blocking 
  46.      * remove and get last item from queue:BRPOP 
  47.      * @return 
  48.      */  
  49.     public T takeFromTail(int timeout) throws InterruptedException{   
  50.         lock.lockInterruptibly();  
  51.         try{  
  52.             List<byte[]> results = connection.bRPop(timeout, rawKey);  
  53.             if(CollectionUtils.isEmpty(results)){  
  54.                 return null;  
  55.             }  
  56.             return (T)redisTemplate.getValueSerializer().deserialize(results.get(1));  
  57.         }finally{  
  58.             lock.unlock();  
  59.         }  
  60.     }  
  61.       
  62.     public T takeFromTail() throws InterruptedException{  
  63.         return takeFromTail(0);  
  64.     }  
  65.       
  66.     /** 
  67.      * 从队列的头,插入 
  68.      */  
  69.     public void pushFromHead(T value){  
  70.         listOperations.leftPush(value);  
  71.     }  
  72.       
  73.     public void pushFromTail(T value){  
  74.         listOperations.rightPush(value);  
  75.     }  
  76.       
  77.     /** 
  78.      * noblocking 
  79.      * @return null if no item in queue 
  80.      */  
  81.     public T removeFromHead(){  
  82.         return listOperations.leftPop();  
  83.     }  
  84.       
  85.     public T removeFromTail(){  
  86.         return listOperations.rightPop();  
  87.     }  
  88.       
  89.     /** 
  90.      * blocking 
  91.      * remove and get first item from queue:BLPOP 
  92.      * @return 
  93.      */  
  94.     public T takeFromHead(int timeout) throws InterruptedException{  
  95.         lock.lockInterruptibly();  
  96.         try{  
  97.             List<byte[]> results = connection.bLPop(timeout, rawKey);  
  98.             if(CollectionUtils.isEmpty(results)){  
  99.                 return null;  
  100.             }  
  101.             return (T)redisTemplate.getValueSerializer().deserialize(results.get(1));  
  102.         }finally{  
  103.             lock.unlock();  
  104.         }  
  105.     }  
  106.       
  107.     public T takeFromHead() throws InterruptedException{  
  108.         return takeFromHead(0);  
  109.     }  
  110.   
  111.     @Override  
  112.     public void destroy() throws Exception {  
  113.         if(isClosed){  
  114.             return;  
  115.         }  
  116.         shutdown();  
  117.         RedisConnectionUtils.releaseConnection(connection, factory);  
  118.     }  
  119.       
  120.     private void shutdown(){  
  121.         try{  
  122.             listenerThread.interrupt();  
  123.         }catch(Exception e){  
  124.             //  
  125.         }  
  126.     }  
  127.       
  128.     class ListenerThread extends Thread {  
  129.           
  130.         @Override  
  131.         public void run(){  
  132.             try{  
  133.                 while(true){  
  134.                     T value = takeFromHead();//cast exception? you should check.  
  135.                     //逐个执行  
  136.                     if(value != null){  
  137.                         try{  
  138.                             listener.onMessage(value);  
  139.                         }catch(Exception e){  
  140.                             //  
  141.                         }  
  142.                     }  
  143.                 }  
  144.             }catch(InterruptedException e){  
  145.                 //  
  146.             }  
  147.         }  
  148.     }  
  149.       
  150. }  

    3) 使用与测试:

Java代码  收藏代码
  1. public static void main(String[] args) throws Exception{  
  2.     ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring-redis-beans.xml");  
  3.     RedisQueue<String> redisQueue = (RedisQueue)context.getBean("jedisQueue");  
  4.     redisQueue.pushFromHead("test:app");  
  5.     Thread.sleep(15000);  
  6.     redisQueue.pushFromHead("test:app");  
  7.     Thread.sleep(15000);  
  8.     redisQueue.destroy();  
  9. }  

    在程序运行期间,你可以通过redis-cli(客户端窗口)执行“lpush”,你会发现程序的控制台仍然能够正常打印队列信息。

 

 

 

 

 

2
0
分享到:
评论
2 楼 QING____ 2015-03-11  
myprincejava 写道
楼主,你这个jedis和spring-data-redis都是什么版本?

比较旧的版本,貌似是m1,不过最新release的版本中,可能部分接口已经改变。此文仅供参考。
1 楼 myprincejava 2015-03-11  
楼主,你这个jedis和spring-data-redis都是什么版本?