NutzCN Logo
问答 nutz如何实现redis的读写分离?
发布于 3066天前 作者 qq_b93ca712 2638 次浏览 复制 上一个帖子 下一个帖子
标签:

nutz如何实现redis的读写分离?

4 回复

配jedis sharding连接池,搜一下,前几天才有人配过

来自炫酷的 NutzCN

@wendal 好像没有redis读写分离的,只有集成redis的。有帖子url么?

如果是redis-sentinel做主从, 把JedisSentinelPool配置成一个bean就好了

我们项目使用集群模式的。我这个类可以实现单机和集群。你自己改成nutz的吧

package sunyu.tools.redis;

import org.apache.commons.lang3.StringUtils;
import org.nutz.json.Json;
import org.nutz.lang.Lang;
import org.nutz.lang.Mirror;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import javax.annotation.PostConstruct;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * jedis工具类
 * <p>
 * 可以适应单机和集群模式
 *
 * @author 孙宇
 */
@Component
public class JedisTools implements ApplicationContextAware {

    private static final Logger logger = LoggerFactory.getLogger(Thread.currentThread().getClass());

    private JedisPool jedisPool;//单机模式的jedis连接池

    private JedisCluster jedisCluster;//集群模式的jedis连接池;内部已经实现了连接池

    @Value("${redis.type}")
    private Integer type;//0单机1集群
    @Value("${redis.cluster}")
    private String cluster;
    @Value("${redis.maxTotal}")
    private String maxTotal;
    @Value("${redis.maxIdle}")
    private String maxIdle;
    @Value("${redis.maxWaitMillis}")
    private String maxWaitMillis;
    @Value("${redis.testOnBorrow}")
    private String testOnBorrow;
    @Value("${redis.host}")
    private String host;
    @Value("${redis.port}")
    private Integer port;
    @Value("${redis.connectionTimeout}")
    private Integer connectionTimeout;
    @Value("${redis.password}")
    private String password;

    private Mirror<?> mirror = Mirror.me(JedisCluster.class);//反射镜像

    private ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @PostConstruct
    public void init() {
        logger.info("动态注入JedisPoolConfig到spring上下文");
        ConfigurableApplicationContext configurableApplicationContext = (ConfigurableApplicationContext) applicationContext;
        DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) configurableApplicationContext.getBeanFactory();
        BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(JedisPoolConfig.class);
        beanDefinitionBuilder.addPropertyValue("maxTotal", maxTotal);
        beanDefinitionBuilder.addPropertyValue("maxIdle", maxIdle);
        beanDefinitionBuilder.addPropertyValue("maxWaitMillis", maxWaitMillis);
        beanDefinitionBuilder.addPropertyValue("testOnBorrow", testOnBorrow);
        defaultListableBeanFactory.registerBeanDefinition("jedisPoolConfig", beanDefinitionBuilder.getRawBeanDefinition());//注册jedisPoolConfig

        if (type == 1) {
            logger.info("动态注入JedisCluster到spring上下文");

            Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>();

            List<Map<String, String>> clusterList = (List<Map<String, String>>) Json.fromJson(cluster);
            if (clusterList != null && clusterList.size() > 0) {
                for (Map<String, String> c : clusterList) {
                    jedisClusterNodes.add(new HostAndPort(c.get("host"), Integer.parseInt(c.get("port"))));
                }
            }

            beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(JedisCluster.class);
            beanDefinitionBuilder.addConstructorArgValue(jedisClusterNodes);
            beanDefinitionBuilder.addConstructorArgReference("jedisPoolConfig");
            beanDefinitionBuilder.setDestroyMethodName("close");
            defaultListableBeanFactory.registerBeanDefinition("jedisCluster", beanDefinitionBuilder.getRawBeanDefinition());//注册jedisCluster

            this.jedisCluster = (JedisCluster) this.applicationContext.getBean("jedisCluster");
        } else if (type == 0) {
            logger.info("动态注入JedisPool到spring上下文");

            beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(JedisPool.class);
            beanDefinitionBuilder.addConstructorArgReference("jedisPoolConfig");
            beanDefinitionBuilder.addConstructorArgValue(host);
            beanDefinitionBuilder.addConstructorArgValue(port);
            beanDefinitionBuilder.addConstructorArgValue(connectionTimeout);
            if (StringUtils.isNotBlank(password)) {
                beanDefinitionBuilder.addConstructorArgValue(password);
            }
            defaultListableBeanFactory.registerBeanDefinition("jedisPool", beanDefinitionBuilder.getRawBeanDefinition());//注册jedisPool

            this.jedisPool = (JedisPool) this.applicationContext.getBean("jedisPool");
        }
    }


    public Integer getType() {
        return type;
    }

    public Jedis getJedis() {
        return jedisPool.getResource();
    }

    public JedisPool getJedisPool() {
        return jedisPool;
    }

    public JedisCluster getJedisCluster() {
        return jedisCluster;
    }

    public <T> T execute(JedisCallback<T> action) {
        Jedis jedis = getJedis();
        try {
            return action.doInJedis(jedis);
        } catch (Throwable throwable) {
            throw Lang.wrapThrow(throwable);
        } finally {
            jedis.close();
        }
    }

    public <T> T clusterExecute(JedisClusterCallback<T> action) {
        JedisCluster jedis = getJedisCluster();
        try {
            return action.doInJedisCluster(jedis);
        } catch (Throwable throwable) {
            throw Lang.wrapThrow(throwable);
        }
    }

    public <T> T clusterExecute(String methodName,
                                Object... args) {
        if (methodName.equals("keys")) {
            return (T) clusterKeys(args[0].toString());
        } else {
            return (T) mirror.invoke(jedisCluster, methodName, args);
        }
    }

    /**
     * 由于JedisCluster没有实现keys操作,这里自己实现以下
     *
     * @param pattern
     *
     * @return
     */
    public Set<String> clusterKeys(String pattern) {
        Set<String> keys = new HashSet<>();
        Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
        for (String k : clusterNodes.keySet()) {
            JedisPool jp = clusterNodes.get(k);
            Jedis connection = jp.getResource();
            try {
                keys.addAll(connection.keys(pattern));
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                connection.close();
            }
        }
        return keys;
    }

    public Set<byte[]> clusterKeys(byte[] pattern) {
        Set<byte[]> keys = new HashSet<>();
        Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
        for (Object k : clusterNodes.keySet()) {
            JedisPool jp = clusterNodes.get(k);
            Jedis connection = jp.getResource();
            try {
                keys.addAll(connection.keys(pattern));
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                connection.close();
            }
        }
        return keys;
    }

    public void clusterFlushDB() {
        Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
        for (Object k : clusterNodes.keySet()) {
            JedisPool jp = clusterNodes.get(k);
            Jedis connection = jp.getResource();
            try {
                connection.flushDB();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                connection.close();
            }
        }
    }

    public Long clusterDbSize() {
        Long total = 0L;
        Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
        for (Object k : clusterNodes.keySet()) {
            JedisPool jp = clusterNodes.get(k);
            Jedis connection = jp.getResource();
            try {
                total += connection.dbSize();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                connection.close();
            }
        }
        return total;
    }

}

使用的时候,直接调用jedis远程api

@Test
    public void add1() {
        String status = jedisTools.execute(new JedisCallback<String>() {
            @Override
            public String doInJedis(Jedis jedis) throws Throwable {
                return jedis.set("key", "值");
            }
        });
        System.out.println(status);
    }
@Test
    public void t6() {
        String result = jedisTools.clusterExecute(new JedisClusterCallback<String>() {
            @Override
            public String doInJedisCluster(JedisCluster jedis) throws Throwable {
                return jedis.get("jjj");
            }
        });
        System.out.println(result);
    }
添加回复
请先登陆
回到顶部