nutz如何实现redis的读写分离?
4 回复
配jedis sharding连接池,搜一下,前几天才有人配过
来自炫酷的 NutzCN
@wendal 好像没有redis读写分离的,只有集成redis的。有帖子url么?
我们项目使用集群模式的。我这个类可以实现单机和集群。你自己改成nutz的吧
package sunyu.tools.redis;
import org.apache.commons.lang3.StringUtils;
import org.nutz.json.Json;
import org.nutz.lang.Lang;
import org.nutz.lang.Mirror;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import javax.annotation.PostConstruct;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * Jedis helper that can work against either a standalone Redis server or a Redis cluster.
 * <p>
 * At startup ({@link #init()}) it registers a {@link JedisPoolConfig} bean and then, depending
 * on the {@code redis.type} property, either a {@link JedisPool} bean (type 0, standalone) or a
 * {@link JedisCluster} bean (type 1, cluster) in the Spring bean factory, and keeps a reference
 * to the created instance.
 *
 * @author Sun Yu (孙宇)
 */
@Component
public class JedisTools implements ApplicationContextAware {
    // Bound to this class so that log output is attributed to JedisTools.
    private static final Logger logger = LoggerFactory.getLogger(JedisTools.class);

    /** Value of {@code redis.type} selecting a standalone server. */
    private static final int TYPE_STANDALONE = 0;
    /** Value of {@code redis.type} selecting a cluster. */
    private static final int TYPE_CLUSTER = 1;

    private JedisPool jedisPool; // connection pool used in standalone mode
    private JedisCluster jedisCluster; // cluster client used in cluster mode

    @Value("${redis.type}")
    private Integer type; // 0 = standalone, 1 = cluster
    @Value("${redis.cluster}")
    private String cluster; // JSON array of {"host": ..., "port": ...} objects
    @Value("${redis.maxTotal}")
    private String maxTotal;
    @Value("${redis.maxIdle}")
    private String maxIdle;
    @Value("${redis.maxWaitMillis}")
    private String maxWaitMillis;
    @Value("${redis.testOnBorrow}")
    private String testOnBorrow;
    @Value("${redis.host}")
    private String host;
    @Value("${redis.port}")
    private Integer port;
    @Value("${redis.connectionTimeout}")
    private Integer connectionTimeout;
    @Value("${redis.password}")
    private String password;

    // Reflection mirror used by clusterExecute(String, Object...) to invoke JedisCluster methods by name.
    private final Mirror<?> mirror = Mirror.me(JedisCluster.class);
    private ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Registers the Jedis beans that match the configured mode and stores the created client.
     *
     * @throws IllegalStateException if {@code redis.type} resolved to {@code null}
     */
    @PostConstruct
    public void init() {
        if (type == null) {
            // Fail with a readable message instead of an unboxing NullPointerException.
            throw new IllegalStateException("redis.type must be set to 0 (standalone) or 1 (cluster)");
        }
        ConfigurableApplicationContext configurableContext = (ConfigurableApplicationContext) applicationContext;
        DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) configurableContext.getBeanFactory();

        registerPoolConfig(beanFactory);
        if (type == TYPE_CLUSTER) {
            this.jedisCluster = registerCluster(beanFactory);
        } else if (type == TYPE_STANDALONE) {
            this.jedisPool = registerStandalonePool(beanFactory);
        } else {
            logger.warn("Unsupported redis.type {}; neither JedisPool nor JedisCluster was created", type);
        }
    }

    /** Registers the shared "jedisPoolConfig" bean definition from the configured pool properties. */
    private void registerPoolConfig(DefaultListableBeanFactory beanFactory) {
        logger.info("动态注入JedisPoolConfig到spring上下文");
        BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(JedisPoolConfig.class);
        builder.addPropertyValue("maxTotal", maxTotal);
        builder.addPropertyValue("maxIdle", maxIdle);
        builder.addPropertyValue("maxWaitMillis", maxWaitMillis);
        builder.addPropertyValue("testOnBorrow", testOnBorrow);
        beanFactory.registerBeanDefinition("jedisPoolConfig", builder.getRawBeanDefinition());
    }

    /** Registers the "jedisCluster" bean definition and returns the instantiated bean. */
    private JedisCluster registerCluster(DefaultListableBeanFactory beanFactory) {
        logger.info("动态注入JedisCluster到spring上下文");
        BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(JedisCluster.class);
        builder.addConstructorArgValue(parseClusterNodes());
        builder.addConstructorArgReference("jedisPoolConfig");
        builder.setDestroyMethodName("close");
        beanFactory.registerBeanDefinition("jedisCluster", builder.getRawBeanDefinition());
        return (JedisCluster) this.applicationContext.getBean("jedisCluster");
    }

    /** Registers the "jedisPool" bean definition and returns the instantiated bean. */
    private JedisPool registerStandalonePool(DefaultListableBeanFactory beanFactory) {
        logger.info("动态注入JedisPool到spring上下文");
        BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(JedisPool.class);
        builder.addConstructorArgReference("jedisPoolConfig");
        builder.addConstructorArgValue(host);
        builder.addConstructorArgValue(port);
        builder.addConstructorArgValue(connectionTimeout);
        if (StringUtils.isNotBlank(password)) {
            // The password is only passed as a constructor argument when one is configured.
            builder.addConstructorArgValue(password);
        }
        beanFactory.registerBeanDefinition("jedisPool", builder.getRawBeanDefinition());
        return (JedisPool) this.applicationContext.getBean("jedisPool");
    }

    /**
     * Parses the {@code redis.cluster} JSON into the set of cluster seed nodes.
     * <p>
     * Values are read as {@code Object} and converted through their string form, so a port
     * written as a JSON number ({@code "port": 7000}) is accepted as well as a JSON string
     * ({@code "port": "7000"}).
     */
    @SuppressWarnings("unchecked") // shape of the parsed JSON is dictated by the configuration format
    private Set<HostAndPort> parseClusterNodes() {
        Set<HostAndPort> nodes = new HashSet<HostAndPort>();
        List<Map<String, Object>> entries = (List<Map<String, Object>>) Json.fromJson(cluster);
        if (entries != null) {
            for (Map<String, Object> entry : entries) {
                Object nodeHost = entry.get("host");
                String nodePort = String.valueOf(entry.get("port")).trim();
                nodes.add(new HostAndPort(nodeHost == null ? null : nodeHost.toString(), Integer.parseInt(nodePort)));
            }
        }
        if (nodes.isEmpty()) {
            logger.warn("redis.cluster did not yield any node; JedisCluster is created without seed nodes");
        }
        return nodes;
    }

    /** @return the configured mode: 0 for standalone, 1 for cluster */
    public Integer getType() {
        return type;
    }

    /**
     * Borrows a connection from the standalone pool. The caller is responsible for closing it.
     *
     * @return a pooled {@link Jedis} connection
     */
    public Jedis getJedis() {
        return jedisPool.getResource();
    }

    /** @return the standalone pool, or {@code null} if it was not created by {@link #init()} */
    public JedisPool getJedisPool() {
        return jedisPool;
    }

    /** @return the cluster client, or {@code null} if it was not created by {@link #init()} */
    public JedisCluster getJedisCluster() {
        return jedisCluster;
    }

    /**
     * Runs {@code action} with a connection borrowed from the standalone pool and always
     * closes the connection afterwards.
     *
     * @param action callback receiving the borrowed connection
     * @param <T>    result type of the callback
     * @return whatever the callback returns
     * @throws RuntimeException wrapping (via {@code Lang.wrapThrow}) anything the callback throws
     */
    public <T> T execute(JedisCallback<T> action) {
        Jedis jedis = getJedis();
        try {
            return action.doInJedis(jedis);
        } catch (Throwable throwable) {
            throw Lang.wrapThrow(throwable);
        } finally {
            jedis.close();
        }
    }

    /**
     * Runs {@code action} with the cluster client.
     *
     * @param action callback receiving the {@link JedisCluster}
     * @param <T>    result type of the callback
     * @return whatever the callback returns
     * @throws RuntimeException wrapping (via {@code Lang.wrapThrow}) anything the callback throws
     */
    public <T> T clusterExecute(JedisClusterCallback<T> action) {
        JedisCluster jedis = getJedisCluster();
        try {
            return action.doInJedisCluster(jedis);
        } catch (Throwable throwable) {
            throw Lang.wrapThrow(throwable);
        }
    }

    /**
     * Invokes a {@link JedisCluster} method by name through reflection.
     * <p>
     * {@code "keys"} is special-cased and served by {@link #clusterKeys(String)} or
     * {@link #clusterKeys(byte[])}, chosen by the runtime type of the first argument.
     *
     * @param methodName name of the method to invoke
     * @param args       arguments passed to the method
     * @param <T>        expected result type (unchecked)
     * @return the result of the invocation
     * @throws IllegalArgumentException if {@code "keys"} is requested without a pattern
     */
    @SuppressWarnings("unchecked") // result type is chosen by the caller; nothing can be verified here
    public <T> T clusterExecute(String methodName,
                                Object... args) {
        if (!"keys".equals(methodName)) {
            return (T) mirror.invoke(jedisCluster, methodName, args);
        }
        if (args == null || args.length == 0 || args[0] == null) {
            throw new IllegalArgumentException("keys requires a pattern as its first argument");
        }
        if (args[0] instanceof byte[]) {
            // toString() on a byte[] would yield "[B@..." rather than the pattern itself.
            return (T) clusterKeys((byte[]) args[0]);
        }
        return (T) clusterKeys(args[0].toString());
    }

    /**
     * Collects the keys matching {@code pattern} from every node returned by
     * {@code JedisCluster.getClusterNodes()}; provided here because this class does not get a
     * usable {@code keys} from JedisCluster itself.
     * <p>
     * A failure on one node is logged and that node is skipped.
     *
     * @param pattern key pattern passed to each node's {@code keys} command
     * @return the union of the keys reported by the nodes
     */
    public Set<String> clusterKeys(String pattern) {
        Set<String> keys = new HashSet<>();
        for (Map.Entry<String, JedisPool> node : jedisCluster.getClusterNodes().entrySet()) {
            Jedis connection = node.getValue().getResource();
            try {
                keys.addAll(connection.keys(pattern));
            } catch (Exception e) {
                logger.error("keys failed on cluster node {}", node.getKey(), e);
            } finally {
                connection.close();
            }
        }
        return keys;
    }

    /**
     * Binary variant of {@link #clusterKeys(String)}.
     * <p>
     * Note that {@code byte[]} elements are compared by identity in a {@link HashSet}, so equal
     * keys reported by more than one node are not merged.
     *
     * @param pattern key pattern passed to each node's {@code keys} command
     * @return all keys reported by the nodes
     */
    public Set<byte[]> clusterKeys(byte[] pattern) {
        Set<byte[]> keys = new HashSet<>();
        for (Map.Entry<String, JedisPool> node : jedisCluster.getClusterNodes().entrySet()) {
            Jedis connection = node.getValue().getResource();
            try {
                keys.addAll(connection.keys(pattern));
            } catch (Exception e) {
                logger.error("keys failed on cluster node {}", node.getKey(), e);
            } finally {
                connection.close();
            }
        }
        return keys;
    }

    /**
     * Issues {@code flushDB} on every node returned by {@code JedisCluster.getClusterNodes()}.
     * A failure on one node is logged and the remaining nodes are still processed.
     */
    public void clusterFlushDB() {
        for (Map.Entry<String, JedisPool> node : jedisCluster.getClusterNodes().entrySet()) {
            Jedis connection = node.getValue().getResource();
            try {
                connection.flushDB();
            } catch (Exception e) {
                logger.error("flushDB failed on cluster node {}", node.getKey(), e);
            } finally {
                connection.close();
            }
        }
    }

    /**
     * Sums {@code dbSize} over every node returned by {@code JedisCluster.getClusterNodes()}.
     * A node that fails is logged and contributes nothing to the total.
     * <p>
     * NOTE(review): if {@code getClusterNodes()} also lists replica nodes, their keys are
     * counted in addition to the masters' - confirm against the Jedis version in use.
     *
     * @return the summed size
     */
    public Long clusterDbSize() {
        long total = 0L; // primitive accumulator avoids re-boxing on every addition
        for (Map.Entry<String, JedisPool> node : jedisCluster.getClusterNodes().entrySet()) {
            Jedis connection = node.getValue().getResource();
            try {
                total += connection.dbSize();
            } catch (Exception e) {
                logger.error("dbSize failed on cluster node {}", node.getKey(), e);
            } finally {
                connection.close();
            }
        }
        return total;
    }
}
使用的时候,直接调用jedis远程api
@Test
public void add1() {
    // Standalone mode: the callback gets a pooled connection and returns the reply of SET.
    JedisCallback<String> setValue = new JedisCallback<String>() {
        @Override
        public String doInJedis(Jedis connection) throws Throwable {
            return connection.set("key", "值");
        }
    };
    String reply = jedisTools.execute(setValue);
    System.out.println(reply);
}
@Test
public void t6() {
    // Cluster mode: the callback gets the JedisCluster client and returns the value read by GET.
    JedisClusterCallback<String> readValue = new JedisClusterCallback<String>() {
        @Override
        public String doInJedisCluster(JedisCluster clusterClient) throws Throwable {
            return clusterClient.get("jjj");
        }
    };
    String value = jedisTools.clusterExecute(readValue);
    System.out.println(value);
}
添加回复
请先登录