2014年4月8日
Nutz:结合Jedis实现Redis消息订阅和缓存队列(支持自动重连)
仅作参考。
编辑 Nutz 的 Redis 配置文件:
// Nutz IoC definition of the `jedisConfig` bean; each entry in `fields`
// is injected into the same-named property of JedisConfig.
var ioc = {
    jedisConfig : {
        type : 'cn.xuetang.common.redis.JedisConfig',
        fields : {
            // Pool limits, copied into JedisPoolConfig when the pools are built.
            maxTotal : 200,
            maxIdle : 10,
            maxWaitMillis:10001,
            testOnBorrow:true,
            // Address of the Redis server.
            redisUrl:'127.0.0.1',
            redisPort:6379,
            // Interval in milliseconds between connection retries.
            redisTimeout:1000
        }
    }
}
package cn.xuetang.common.redis; /** * Created by Wizzer on 14-4-8. */ public class JedisConfig { private int maxTotal; private int maxIdle; private int maxWaitMillis; private boolean testOnBorrow; private String redisUrl; private int redisPort; private int redisTimeout;//redis断开后自动重新连接间隔时间 public int getMaxTotal() { return maxTotal; } public void setMaxTotal(int maxTotal) { this.maxTotal = maxTotal; } public int getMaxIdle() { return maxIdle; } public void setMaxIdle(int maxIdle) { this.maxIdle = maxIdle; } public int getMaxWaitMillis() { return maxWaitMillis; } public void setMaxWaitMillis(int maxWaitMillis) { this.maxWaitMillis = maxWaitMillis; } public boolean isTestOnBorrow() { return testOnBorrow; } public void setTestOnBorrow(boolean testOnBorrow) { this.testOnBorrow = testOnBorrow; } public String getRedisUrl() { return redisUrl; } public void setRedisUrl(String redisUrl) { this.redisUrl = redisUrl; } public int getRedisPort() { return redisPort; } public void setRedisPort(int redisPort) { this.redisPort = redisPort; } public int getRedisTimeout() { return redisTimeout; } public void setRedisTimeout(int redisTimeout) { this.redisTimeout = redisTimeout; } }
Nutz入口类加载配置文件,config路径:
/**
 * Nutz MVC entry module. It declares no members of its own; everything is
 * expressed through annotations. The IoC container combines a JSON loader
 * reading from the {@code config} path with an annotation loader scanning
 * the {@code cn.xuetang} package.
 */
@Modules(scanPackage = true)
@Ok("raw")
@Fail("http:500")
@IocBy(
        type = ComboIocProvider.class,
        args = {
                "*org.nutz.ioc.loader.json.JsonLoader", "config",
                "*org.nutz.ioc.loader.annotation.AnnotationIocLoader", "cn.xuetang"
        })
@SetupBy(StartSetup.class)
@UrlMappingBy(UrlMappingSet.class)
public class MainModule {
}
初始化配置参数,新建订阅消息处理线程:
public static JedisConfig REDIS_CONFIG; //声明全局的redis连接池 public static ShardedJedisPool SHARDEDJEDIS_POOL=null; public static JedisPool JEDIS_POOL=null; public static void InitRedisConfig() {//初始化redis REDIS_CONFIG = Mvcs.ctx.getDefaultIoc().get(JedisConfig.class); JedisPoolUtil jedisPoolUtil=new JedisPoolUtil(); SHARDEDJEDIS_POOL = jedisPoolUtil.getShardedJedisPool(); JEDIS_POOL = jedisPoolUtil.getJedisPool(); } new Thread(Mvcs.getIoc().get(ImageTask.class)).start();
创建Jedis连接池工具类:
package cn.xuetang.common.redis;

import cn.xuetang.common.config.Globals;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisShardInfo;
import redis.clients.jedis.ShardedJedisPool;

import java.util.ArrayList;
import java.util.List;

/**
 * Factory for the Jedis pool objects, built from {@code Globals.REDIS_CONFIG}.
 *
 * Every method reads host and port straight from the configuration.
 * Previously they were cached in instance fields that only
 * {@link #shardedJedisPool()} assigned, so calling {@link #jedisPool()} or
 * {@link #jedisShardInfo()} on a fresh instance used host {@code null} and
 * port {@code 0}.
 *
 * Created by Wizzer on 14-4-8.
 */
public class MyJedis {

    /**
     * Builds the pool settings shared by both pool types.
     *
     * @return a new pool configuration carrying maxTotal, maxIdle,
     *         maxWaitMillis and testOnBorrow from the Redis configuration
     */
    public JedisPoolConfig jedisPoolConfig() {
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(Globals.REDIS_CONFIG.getMaxTotal());
        jedisPoolConfig.setMaxIdle(Globals.REDIS_CONFIG.getMaxIdle());
        jedisPoolConfig.setMaxWaitMillis(Globals.REDIS_CONFIG.getMaxWaitMillis());
        jedisPoolConfig.setTestOnBorrow(Globals.REDIS_CONFIG.isTestOnBorrow());
        return jedisPoolConfig;
    }

    /**
     * @return shard descriptor pointing at the configured Redis host and port
     */
    public JedisShardInfo jedisShardInfo() {
        return new JedisShardInfo(
                Globals.REDIS_CONFIG.getRedisUrl(),
                Globals.REDIS_CONFIG.getRedisPort());
    }

    /**
     * @return a new plain connection pool for the configured Redis host and port
     */
    public JedisPool jedisPool() {
        return new JedisPool(
                jedisPoolConfig(),
                Globals.REDIS_CONFIG.getRedisUrl(),
                Globals.REDIS_CONFIG.getRedisPort());
    }

    /**
     * @return a new sharded pool containing a single shard, the configured server
     */
    public ShardedJedisPool shardedJedisPool() {
        List<JedisShardInfo> jedisList = new ArrayList<JedisShardInfo>();
        jedisList.add(jedisShardInfo());
        return new ShardedJedisPool(jedisPoolConfig(), jedisList);
    }
}
实现自己的监听类:
package cn.xuetang.common.redis; import org.nutz.log.Log; import org.nutz.log.Logs; import redis.clients.jedis.JedisPubSub; /** * 订阅监听类 * Created by Wizzer on 14-4-8. */ public class MyJedisListenter extends JedisPubSub { private final static Log log = Logs.get(); // 取得订阅的消息后的处理 public void onMessage(String channel, String message) { log.info(channel + "=" + message); } // 初始化订阅时候的处理 public void onSubscribe(String channel, int subscribedChannels) { log.info(channel + "=" + subscribedChannels); } // 取消订阅时候的处理 public void onUnsubscribe(String channel, int subscribedChannels) { log.info(channel + "=" + subscribedChannels); } // 初始化按表达式的方式订阅时候的处理 public void onPSubscribe(String pattern, int subscribedChannels) { log.info(pattern + "=" + subscribedChannels); } // 取消按表达式的方式订阅时候的处理 public void onPUnsubscribe(String pattern, int subscribedChannels) { log.info(pattern + "=" + subscribedChannels); } // 取得按表达式的方式订阅的消息后的处理 public void onPMessage(String pattern, String channel, String message) { log.info(pattern + "=" + channel + "=" + message); } }
获取单例Jedis连接池:
package cn.xuetang.common.redis;

import cn.xuetang.common.config.Globals;
import org.nutz.log.Log;
import org.nutz.log.Logs;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.ShardedJedisPool;

import java.util.Date;

/**
 * Lazily creates the application-wide Jedis pools held in {@link Globals}.
 *
 * The pools live in static fields while callers create their own
 * {@code JedisPoolUtil} instances, so creation is guarded by a lock shared
 * by all instances. The previous instance-level {@code synchronized}
 * locked a different monitor per instance and could let two threads
 * each create a pool.
 *
 * Created by Wizzer on 14-4-8.
 */
public class JedisPoolUtil {
    private final static Log log = Logs.get();

    /** Guards lazy creation of both static pools across all instances. */
    private static final Object POOL_LOCK = new Object();

    /**
     * Returns the shared sharded pool, creating it on first use.
     *
     * @return the pool stored in {@code Globals.SHARDEDJEDIS_POOL}
     */
    public ShardedJedisPool getShardedJedisPool() {
        synchronized (POOL_LOCK) {
            if (Globals.SHARDEDJEDIS_POOL == null) {
                MyJedis myJedis = new MyJedis();
                Globals.SHARDEDJEDIS_POOL = myJedis.shardedJedisPool();
            }
            return Globals.SHARDEDJEDIS_POOL;
        }
    }

    /**
     * Returns the shared plain pool, creating it on first use.
     *
     * @return the pool stored in {@code Globals.JEDIS_POOL}
     */
    public JedisPool getJedisPool() {
        synchronized (POOL_LOCK) {
            if (Globals.JEDIS_POOL == null) {
                MyJedis myJedis = new MyJedis();
                Globals.JEDIS_POOL = myJedis.jedisPool();
            }
            return Globals.JEDIS_POOL;
        }
    }
}
数据入列同时发布订阅消息:
// Push the payload onto the "image" list, then announce it on "newimage".
// Each borrowed connection is handed back to its pool in a finally block;
// previously neither was returned, so every call consumed two pooled
// connections for good.
ShardedJedis shardedJedis = Globals.SHARDEDJEDIS_POOL.getResource();
try {
    txt.put("appid", appInfo.getId());
    shardedJedis.lpush("image", Json.toJson(txt));
} finally {
    Globals.SHARDEDJEDIS_POOL.returnResource(shardedJedis);
}

// The pool field is Globals.JEDIS_POOL (the snippet previously referenced a
// non-existent Globals.JEDISPOOL).
Jedis jedis = Globals.JEDIS_POOL.getResource();
try {
    jedis.publish("newimage", "true");
} finally {
    Globals.JEDIS_POOL.returnResource(jedis);
}
处理订阅消息的类:
package cn.xuetang.common.task;

import cn.xuetang.common.action.BaseAction;
import cn.xuetang.common.config.Globals;
import cn.xuetang.common.redis.JedisPoolUtil;
import cn.xuetang.common.redis.MyJedisListenter;
import cn.xuetang.common.util.DateUtil;
import cn.xuetang.modules.baby.bean.Baby_image;
import cn.xuetang.modules.baby.bean.Baby_info;
import cn.xuetang.modules.user.bean.User_conn_wx;
import org.nutz.dao.Cnd;
import org.nutz.dao.Dao;
import org.nutz.ioc.loader.annotation.Inject;
import org.nutz.ioc.loader.annotation.IocBean;
import org.nutz.json.Json;
import org.nutz.lang.Strings;
import org.nutz.log.Log;
import org.nutz.log.Logs;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.ShardedJedis;

import java.util.*;

/**
 * Background task that waits until Redis is reachable, subscribes to the
 * "newimage" channel and drains the "image" list whenever a message arrives.
 *
 * NOTE(review): once {@code subscribe} fails or returns, {@link #run()}
 * logs and ends; nothing here re-subscribes afterwards.
 *
 * Created by Wizzer on 14-3-31.
 */
@IocBean
public class ImageTask extends BaseAction implements Runnable {
    @Inject
    protected Dao dao;
    private final static Log log = Logs.get();

    /**
     * Blocks until a Redis connection can be borrowed, then subscribes to
     * "newimage". The subscribe call blocks the thread for as long as the
     * subscription lasts. Both borrowed connections are handed back to
     * their pools when the method ends.
     */
    public void run() {
        final JedisPoolUtil jedisPoolUtil = new JedisPoolUtil();
        ShardedJedis queueConnection = null;
        Jedis subscriberConnection = null;
        boolean failed = false;
        try {
            log.info("ImageTask start!");

            queueConnection = awaitQueueConnection(jedisPoolUtil);
            if (queueConnection == null) {
                // Interrupted while waiting for Redis: stop quietly.
                return;
            }
            subscriberConnection = jedisPoolUtil.getJedisPool().getResource();

            final ShardedJedis shardedJedis = queueConnection;
            MyJedisListenter listenter = new MyJedisListenter() {
                @Override
                public void onMessage(String channel, String message) {
                    // Business logic goes here: consume every queued entry.
                    while (shardedJedis.exists("image")) {
                        String data = shardedJedis.rpop("image");
                    }
                }
            };
            subscriberConnection.subscribe(listenter, "newimage");
        } catch (Exception e) {
            failed = true;
            log.error(e);
        } finally {
            release(jedisPoolUtil, queueConnection, subscriberConnection, failed);
        }
    }

    /**
     * Retries borrowing a sharded connection until it succeeds, sleeping
     * {@code redisTimeout} milliseconds between attempts.
     *
     * The delay uses {@code Thread.sleep}; the previous {@code wait(...)}
     * was called without owning the monitor, so it threw immediately and
     * the loop retried with no pause. The successful connection is
     * returned to the caller instead of being borrowed a second time,
     * which used to leak the probe connection.
     *
     * @return a borrowed connection, or {@code null} if the thread was
     *         interrupted while waiting
     */
    private ShardedJedis awaitQueueConnection(JedisPoolUtil jedisPoolUtil) {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                return jedisPoolUtil.getShardedJedisPool().getResource();
            } catch (Exception e) {
                log.info("The redis connection is not successful,wait " + Globals.REDIS_CONFIG.getRedisTimeout() + "ms try again.");
            }
            try {
                Thread.sleep(Globals.REDIS_CONFIG.getRedisTimeout());
            } catch (InterruptedException e) {
                // Preserve the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
                return null;
            }
        }
        return null;
    }

    /**
     * Hands borrowed connections back to their pools. After a failure they
     * are returned as broken so the pool discards them.
     */
    private void release(JedisPoolUtil jedisPoolUtil, ShardedJedis queueConnection,
                         Jedis subscriberConnection, boolean failed) {
        try {
            if (subscriberConnection != null) {
                if (failed) {
                    jedisPoolUtil.getJedisPool().returnBrokenResource(subscriberConnection);
                } else {
                    jedisPoolUtil.getJedisPool().returnResource(subscriberConnection);
                }
            }
        } catch (Exception e) {
            log.error(e);
        }
        try {
            if (queueConnection != null) {
                if (failed) {
                    jedisPoolUtil.getShardedJedisPool().returnBrokenResource(queueConnection);
                } else {
                    jedisPoolUtil.getShardedJedisPool().returnResource(queueConnection);
                }
            }
        } catch (Exception e) {
            log.error(e);
        }
    }
}