2023年5月10日
下载redisjson源码
https://github.com/RedisJSON/RedisJSON/releases
安装rustup
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
rustc --version
编译redisjson
cargo build --release
修改redis.conf配置
loadmodule /usr/local/redis/module/librejson.dylib
重新启动 redis 后显示如下信息即可:
127.0.0.1:6379> module list
1) 1) "name"
2) "ReJSON"
3) "ver"
4) (integer) 20407
2014年4月8日
仅作参考。
编辑 Nutz 的 redis 配置文件:
// Nutz IoC definition of the "jedisConfig" bean; each entry under "fields"
// corresponds to a property of cn.xuetang.common.redis.JedisConfig.
// NOTE(review): comments assume the IoC JSON loader tolerates "//" comments - confirm.
var ioc = {
jedisConfig : {
type : 'cn.xuetang.common.redis.JedisConfig',
fields : {
// Pool sizing: handed to JedisPoolConfig.setMaxTotal / setMaxIdle.
maxTotal : 200,
maxIdle : 10,
// Handed to JedisPoolConfig.setMaxWaitMillis (milliseconds).
maxWaitMillis:10001,
// Handed to JedisPoolConfig.setTestOnBorrow.
testOnBorrow:true,
// Host and port used when the Jedis pools are created.
redisUrl:'127.0.0.1',
redisPort:6379,
// Delay in milliseconds between connection retries in ImageTask.
redisTimeout:1000
}
}
}
package cn.xuetang.common.redis;
/**
 * Mutable settings bean holding the Redis pool and connection parameters.
 *
 * <p>Every property is a plain field with a getter and a setter; the class
 * performs no validation and has no behavior of its own.
 *
 * <p>Created by Wizzer on 14-4-8.
 */
public class JedisConfig {

    // ---- pool sizing / borrowing -------------------------------------------

    private int maxTotal;
    private int maxIdle;
    private int maxWaitMillis;
    private boolean testOnBorrow;

    // ---- server address ----------------------------------------------------

    private String redisUrl;
    private int redisPort;

    /** Interval before reconnecting automatically after Redis disconnects. */
    private int redisTimeout;

    /** @return the configured {@code maxTotal} value */
    public int getMaxTotal() {
        return this.maxTotal;
    }

    /** @param maxTotal the new {@code maxTotal} value */
    public void setMaxTotal(final int maxTotal) {
        this.maxTotal = maxTotal;
    }

    /** @return the configured {@code maxIdle} value */
    public int getMaxIdle() {
        return this.maxIdle;
    }

    /** @param maxIdle the new {@code maxIdle} value */
    public void setMaxIdle(final int maxIdle) {
        this.maxIdle = maxIdle;
    }

    /** @return the configured {@code maxWaitMillis} value */
    public int getMaxWaitMillis() {
        return this.maxWaitMillis;
    }

    /** @param maxWaitMillis the new {@code maxWaitMillis} value */
    public void setMaxWaitMillis(final int maxWaitMillis) {
        this.maxWaitMillis = maxWaitMillis;
    }

    /** @return the configured {@code testOnBorrow} flag */
    public boolean isTestOnBorrow() {
        return this.testOnBorrow;
    }

    /** @param testOnBorrow the new {@code testOnBorrow} flag */
    public void setTestOnBorrow(final boolean testOnBorrow) {
        this.testOnBorrow = testOnBorrow;
    }

    /** @return the configured Redis host */
    public String getRedisUrl() {
        return this.redisUrl;
    }

    /** @param redisUrl the new Redis host */
    public void setRedisUrl(final String redisUrl) {
        this.redisUrl = redisUrl;
    }

    /** @return the configured Redis port */
    public int getRedisPort() {
        return this.redisPort;
    }

    /** @param redisPort the new Redis port */
    public void setRedisPort(final int redisPort) {
        this.redisPort = redisPort;
    }

    /** @return the reconnect interval */
    public int getRedisTimeout() {
        return this.redisTimeout;
    }

    /** @param redisTimeout the new reconnect interval */
    public void setRedisTimeout(final int redisTimeout) {
        this.redisTimeout = redisTimeout;
    }
}
Nutz入口类加载配置文件,config路径:
/**
 * Nutz MVC entry module; it carries configuration annotations only.
 *
 * <p>The IoC container is assembled by {@code ComboIocProvider} from two
 * loaders: a JSON loader given the argument "config" and an annotation
 * loader given the argument "cn.xuetang".
 */
@Modules(scanPackage = true)
@Ok("raw")
@Fail("http:500")
@IocBy(
    type = ComboIocProvider.class,
    args = {
        "*org.nutz.ioc.loader.json.JsonLoader", "config",
        "*org.nutz.ioc.loader.annotation.AnnotationIocLoader", "cn.xuetang"
    })
@SetupBy(StartSetup.class)
@UrlMappingBy(UrlMappingSet.class)
public class MainModule {
}
初始化配置参数,新建订阅消息处理线程:
/** Redis settings bean; assigned by {@code InitRedisConfig()}. */
public static JedisConfig REDIS_CONFIG;

// Globally shared Redis connection pools; both stay null until they are created.
public static ShardedJedisPool SHARDEDJEDIS_POOL;
public static JedisPool JEDIS_POOL;
/**
 * Initializes Redis support: loads the {@code JedisConfig} bean from the
 * default IoC container, then stores the sharded pool and the plain pool
 * in the global fields.
 */
public static void InitRedisConfig() {
    REDIS_CONFIG = Mvcs.ctx.getDefaultIoc().get(JedisConfig.class);

    // The pools read REDIS_CONFIG, so it has to be assigned before this point.
    final JedisPoolUtil poolUtil = new JedisPoolUtil();
    SHARDEDJEDIS_POOL = poolUtil.getShardedJedisPool();
    JEDIS_POOL = poolUtil.getJedisPool();
}
new Thread(Mvcs.getIoc().get(ImageTask.class)).start();
创建Jedis连接池工具类:
package cn.xuetang.common.redis;
import cn.xuetang.common.config.Globals;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisShardInfo;
import redis.clients.jedis.ShardedJedisPool;
import java.util.ArrayList;
import java.util.List;
/**
 * Factory for Jedis pool objects, configured from {@code Globals.REDIS_CONFIG}.
 *
 * <p>Every factory method reads the host and port directly from the global
 * configuration. Previously they were cached in instance fields that only
 * {@link #shardedJedisPool()} assigned, so calling {@link #jedisPool()} or
 * {@link #jedisShardInfo()} on a fresh instance used host {@code null} and
 * port {@code 0}.
 *
 * <p>Created by Wizzer on 14-4-8.
 */
public class MyJedis {

    /**
     * Builds the pool configuration from the global settings.
     *
     * @return a new {@code JedisPoolConfig} carrying maxTotal, maxIdle,
     *         maxWaitMillis and testOnBorrow
     */
    public JedisPoolConfig jedisPoolConfig() {
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(Globals.REDIS_CONFIG.getMaxTotal());
        jedisPoolConfig.setMaxIdle(Globals.REDIS_CONFIG.getMaxIdle());
        jedisPoolConfig.setMaxWaitMillis(Globals.REDIS_CONFIG.getMaxWaitMillis());
        jedisPoolConfig.setTestOnBorrow(Globals.REDIS_CONFIG.isTestOnBorrow());
        return jedisPoolConfig;
    }

    /**
     * @return shard descriptor for the configured Redis host and port
     */
    public JedisShardInfo jedisShardInfo() {
        return new JedisShardInfo(configuredHost(), configuredPort());
    }

    /**
     * @return a new non-sharded pool for the configured Redis host and port
     */
    public JedisPool jedisPool() {
        return new JedisPool(jedisPoolConfig(), configuredHost(), configuredPort());
    }

    /**
     * @return a new sharded pool containing the single configured shard
     */
    public ShardedJedisPool shardedJedisPool() {
        List<JedisShardInfo> jedisList = new ArrayList<JedisShardInfo>();
        jedisList.add(jedisShardInfo());
        return new ShardedJedisPool(jedisPoolConfig(), jedisList);
    }

    /** Redis host taken from the global configuration. */
    private static String configuredHost() {
        return Globals.REDIS_CONFIG.getRedisUrl();
    }

    /** Redis port taken from the global configuration. */
    private static int configuredPort() {
        return Globals.REDIS_CONFIG.getRedisPort();
    }
}
实现自己的监听类:
package cn.xuetang.common.redis;
import org.nutz.log.Log;
import org.nutz.log.Logs;
import redis.clients.jedis.JedisPubSub;
/**
 * Subscription listener that logs every pub/sub callback it receives.
 *
 * <p>Created by Wizzer on 14-4-8.
 */
public class MyJedisListenter extends JedisPubSub {

    private static final Log log = Logs.get();

    /** Handles a message received on a subscribed channel. */
    public void onMessage(String channel, String message) {
        String line = channel + "=" + message;
        log.info(line);
    }

    /** Handles the confirmation of a channel subscription. */
    public void onSubscribe(String channel, int subscribedChannels) {
        String line = channel + "=" + subscribedChannels;
        log.info(line);
    }

    /** Handles the confirmation of a channel unsubscription. */
    public void onUnsubscribe(String channel, int subscribedChannels) {
        String line = channel + "=" + subscribedChannels;
        log.info(line);
    }

    /** Handles the confirmation of a pattern subscription. */
    public void onPSubscribe(String pattern, int subscribedChannels) {
        String line = pattern + "=" + subscribedChannels;
        log.info(line);
    }

    /** Handles the confirmation of a pattern unsubscription. */
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        String line = pattern + "=" + subscribedChannels;
        log.info(line);
    }

    /** Handles a message received through a pattern subscription. */
    public void onPMessage(String pattern, String channel, String message) {
        String line = pattern + "=" + channel + "=" + message;
        log.info(line);
    }
}
获取单例Jedis连接池:
package cn.xuetang.common.redis;
import cn.xuetang.common.config.Globals;
import org.nutz.log.Log;
import org.nutz.log.Logs;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.ShardedJedisPool;
import java.util.Date;
/**
 * Lazily creates the shared Jedis pools stored in {@code Globals}.
 *
 * <p>The pools live in static fields, while callers create their own
 * {@code JedisPoolUtil} instances. Locking on the instance therefore gave no
 * mutual exclusion between callers, so both methods now lock on the class.
 *
 * <p>Created by Wizzer on 14-4-8.
 */
public class JedisPoolUtil {
    private final static Log log = Logs.get();

    /**
     * Returns the shared sharded pool, creating it on first use.
     *
     * @return the pool stored in {@code Globals.SHARDEDJEDIS_POOL}
     */
    public ShardedJedisPool getShardedJedisPool() {
        // Class-level lock: the guarded field is static, not per-instance.
        synchronized (JedisPoolUtil.class) {
            if (Globals.SHARDEDJEDIS_POOL == null) {
                MyJedis myJedis = new MyJedis();
                Globals.SHARDEDJEDIS_POOL = myJedis.shardedJedisPool();
            }
            return Globals.SHARDEDJEDIS_POOL;
        }
    }

    /**
     * Returns the shared non-sharded pool, creating it on first use.
     *
     * @return the pool stored in {@code Globals.JEDIS_POOL}
     */
    public JedisPool getJedisPool() {
        // Class-level lock: the guarded field is static, not per-instance.
        synchronized (JedisPoolUtil.class) {
            if (Globals.JEDIS_POOL == null) {
                MyJedis myJedis = new MyJedis();
                Globals.JEDIS_POOL = myJedis.jedisPool();
            }
            return Globals.JEDIS_POOL;
        }
    }
}
数据入列同时发布订阅消息:
// Push the payload onto the "image" list, then notify subscribers on "newimage".
// Each borrowed connection is handed back to its pool even when the call fails.
// NOTE(review): on Jedis versions where close() returns the connection to the
// pool, close() can replace returnResource() - confirm against the version in use.
ShardedJedis shardedJedis = Globals.SHARDEDJEDIS_POOL.getResource();
try {
    txt.put("appid", appInfo.getId());
    shardedJedis.lpush("image", Json.toJson(txt));
} finally {
    Globals.SHARDEDJEDIS_POOL.returnResource(shardedJedis);
}
// The pool field is Globals.JEDIS_POOL; "JEDISPOOL" does not exist.
Jedis jedis = Globals.JEDIS_POOL.getResource();
try {
    jedis.publish("newimage", "true");
} finally {
    Globals.JEDIS_POOL.returnResource(jedis);
}
处理订阅消息的类:
package cn.xuetang.common.task;
import cn.xuetang.common.action.BaseAction;
import cn.xuetang.common.config.Globals;
import cn.xuetang.common.redis.JedisPoolUtil;
import cn.xuetang.common.redis.MyJedisListenter;
import cn.xuetang.common.util.DateUtil;
import cn.xuetang.modules.baby.bean.Baby_image;
import cn.xuetang.modules.baby.bean.Baby_info;
import cn.xuetang.modules.user.bean.User_conn_wx;
import org.nutz.dao.Cnd;
import org.nutz.dao.Dao;
import org.nutz.ioc.loader.annotation.Inject;
import org.nutz.ioc.loader.annotation.IocBean;
import org.nutz.json.Json;
import org.nutz.lang.Strings;
import org.nutz.log.Log;
import org.nutz.log.Logs;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.ShardedJedis;
import java.util.*;
/**
 * Background task that subscribes to the "newimage" channel and, on every
 * message, pops entries from the "image" list until it no longer exists.
 *
 * <p>Created by Wizzer on 14-3-31.
 */
@IocBean
public class ImageTask extends BaseAction implements Runnable {
    @Inject
    protected Dao dao;
    private final static Log log = Logs.get();

    /**
     * Waits until a Redis connection can be borrowed, then blocks in
     * {@code subscribe} on the "newimage" channel.
     *
     * <p>Between failed connection attempts the thread sleeps for the
     * configured {@code redisTimeout} milliseconds. If the thread is
     * interrupted while sleeping, the interrupt flag is restored and the
     * task ends.
     */
    public void run() {
        try {
            log.info("ImageTask start!");
            final JedisPoolUtil jedisPoolUtil = new JedisPoolUtil();

            // Retry until a connection is available. The connection that proves
            // Redis is reachable is kept and reused instead of being discarded.
            ShardedJedis borrowed = null;
            while (borrowed == null) {
                try {
                    borrowed = jedisPoolUtil.getShardedJedisPool().getResource();
                } catch (Exception e) {
                    log.info("The redis connection is not successful,wait " + Globals.REDIS_CONFIG.getRedisTimeout() + "ms try again.");
                    try {
                        // Thread.sleep rather than Object.wait: wait() requires
                        // holding the monitor and would throw immediately here.
                        Thread.sleep(Globals.REDIS_CONFIG.getRedisTimeout());
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }

            // NOTE(review): both connections are held for the lifetime of the
            // subscription and are not returned to their pools when it ends.
            final ShardedJedis shardedJedis = borrowed;
            final Jedis jedis = jedisPoolUtil.getJedisPool().getResource();
            MyJedisListenter listenter = new MyJedisListenter() {
                @Override
                public void onMessage(String channel, String message) {
                    // Business logic goes here: handle each popped entry.
                    while (shardedJedis.exists("image")) {
                        String data = shardedJedis.rpop("image");
                    }
                }
            };
            // Blocks this thread while the subscription is active.
            jedis.subscribe(listenter, "newimage");
        } catch (Exception e) {
            log.error(e);
        }
    }
}