NutzCN Logo
问答 多线程批量导入、更新、删除数据,有一条线程操作失败,其他线程事务回滚失败.看下我下面的代码,哪里需要改进
发布于 86天前 作者 qq_9ad759a9 181 次浏览 复制 上一个帖子 下一个帖子
标签:
@Test
    public void test5(){
        List<Map> maps = new ArrayList<>();
        Chain chain = Chain.make("name","12312312313");
        Map map = new HashMap();
        map.put("chain",chain);
        map.put("tableName","text_l");
        maps.add(map);
        map = new HashMap();
        chain = Chain.make("name","12324234233");
        map.put("chain",chain);
        map.put("tableName","text_g");
        maps.add(map);
        map = new HashMap();
        chain = Chain.make("name","124234233");
        map.put("chain",chain);
        map.put("tableName","text_k");
        maps.add(map);
        int i = 0;//0成功  1  失败
        try {
            Trans.begin();//开起事务
            try {//开始做事
                // 创建一个线程池
                ThreadPoolExecutor tpl = new
                        ThreadPoolExecutor(0, maps.size(), 60L, TimeUnit.SECONDS,new SynchronousQueue<>());
                try {
                    ArrayList<Future<Boolean>> al = new ArrayList();
                    for (Map sqlList1 :maps) {
                        // 创建有返回值的任务
                        al.add(tpl.submit(new TextThread(dao,sqlList1)));
                    }
                    for(Future<Boolean> future:al){
                        while (true) {
                            /**
                             * 获得future对象之前可以使用isDone()方法检测future是否完成,完成后可以调用get()方法获得future的值,
                             * 如果直接调用get()方法,get()方法将阻塞值线程结束
                             */
                            if (future.isDone()) {
                                try {
                                    i = future.get() == false ? 1:0;
                                    break;
                                } catch (InterruptedException e) {
                                    i = 1;
                                    break;
                                } catch (ExecutionException e) {
                                    i = 1;
                                    break;
                                }
                            }
                        }
                        if (i == 1) {
                            break;
                        }
                    }
                } finally {
                    // 关闭线程池
                    tpl.shutdown();
                }
                if (i == 1) {
                    throw new Exception();
                }
                // 做完了
                Trans.commit();//提交事务
            } catch (Exception ee) {
                Trans.rollback();//出错了,回滚
            } finally {
                Trans.close();//关闭事务
            }
        } catch (Exception e) {
            e.printStackTrace();
            i = 1;
        }
        System.out.println(i == 0 ? "成功" : "失败");
    }
public class TextThread implements Callable<Boolean>{


    protected static final Log log = Logs.get();

    private Ioc ioc;

    private Dao dao;

    private Map map;

    public TextThread(Dao dao,Map map) {
        this.dao = dao;
        this.map = map;
    }

    @Override
    public Boolean call() throws Exception {
        int i;
        Chain chain = (Chain)map.get("chain");
        String tableName = map.get("tableName").toString();
        try {
            dao.insert(tableName,chain);
            i = 0;
        } catch (Exception e) {
            i = 1;
        }
        if (i == 1){
            return false;
        }
        return true;
    }
}
14 回复

Trans仅作用于当前线程!!! 另外启动的线程,是不受事务管理的

@Test
    public void test5(){
        List<NutMap> maps = new ArrayList<>();
        maps.add(new NutMap("name","12312312313"));
        maps.add(new NutMap("name","12312312314"));
        maps.add(new NutMap("name","12312312315"));
        maps.add(new NutMap("name","12312312316"));
        maps.get(0).put(".table", tableName);
        Trans.exec(new Atom() {
            public void run() {
                      dao.insert(maps);
            }
        });
    }

哈哈,我知道这样,我现在就是数据量巨大,想多线程执行,加快速度,所以用的多线程,我的例子只是个看起方便而已

@Test
    public void test5(){
        List<Map> maps = new ArrayList<>();
        Chain chain = Chain.make("name","12312312313");
        Map map = new HashMap();
        map.put("chain",chain);
        map.put("tableName","text_l");
        maps.add(map);
        for (int i = 0; i < 500000; i++) {
            maps.add(map);
        }
        List<List<Map>> lists = ListUtil.split(maps,1000);//把集合分成等分的多个集合
        int i = 0;//0成功  1  失败
        try {
            Trans.begin();//开起事务
            try {//开始做事
                // 创建一个线程池
                ThreadPoolExecutor tpl = new
                        ThreadPoolExecutor(0, maps.size(), 60L, TimeUnit.SECONDS,new SynchronousQueue<>());
                try {
                    ArrayList<Future<Boolean>> al = new ArrayList();
                    for (List<Map> sqlList1 :lists) {
                        // 创建有返回值的任务
                        al.add(tpl.submit(new TextThread(dao,sqlList1)));
                    }
                    for(Future<Boolean> future:al){
                        while (true) {
                            /**
                             * 获得future对象之前可以使用isDone()方法检测future是否完成,完成后可以调用get()方法获得future的值,
                             * 如果直接调用get()方法,get()方法将阻塞值线程结束
                             */
                            if (future.isDone()) {
                                try {
                                    i = future.get() == false ? 1:0;
                                    break;
                                } catch (InterruptedException e) {
                                    i = 1;
                                    break;
                                } catch (ExecutionException e) {
                                    i = 1;
                                    break;
                                }
                            }
                        }
                        if (i == 1) {
                            break;
                        }
                    }
                } finally {
                    // 关闭线程池
                    tpl.shutdown();
                }
                if (i == 1) {
                    throw new Exception();
                }
                // 做完了
                Trans.commit();//提交事务
            } catch (Exception ee) {
                Trans.rollback();//出错了,回滚
            } finally {
                Trans.close();//关闭事务
            }
        } catch (Exception e) {
            e.printStackTrace();
            i = 1;
        }
        System.out.println(i == 0 ? "成功" : "失败");
    }
public class TextThread implements Callable<Boolean>{


    protected static final Log log = Logs.get();

    private Ioc ioc;

    private Dao dao;

    private List<Map> maps;

    public TextThread(Dao dao,List<Map> maps) {
        this.dao = dao;
        this.maps = maps;
    }

    @Override
    public Boolean call() throws Exception {
        int i;
        Chain chain;
        String tableName;
        for (Map map :maps) {
            chain = (Chain)map.get("chain");
            tableName = map.get("tableName").toString();
            try {
                dao.insert(tableName,chain);
                i = 0;
            } catch (Exception e) {
                i = 1;
            }
            if (i == 1){
                return false;
            }
        }
        return true;
    }
}

问题是多线程没法回滚

在call方法里面可以加Trans.exec,实现该工作线程的回滚

加过,但不知道什么时候回滚,想的每个线程加个监听器,都ok了,在commit,不然都回滚,但我不会写啊,尴尬

噢,想到一个办法,NutTransDao

@wendal 有链接吗?我看看怎么使用

@wendal 好的,我先试试,谢谢大神

@wendal ok,已解决,我用的1.63版本,你写的 "首先需要强调一下,当前版本(1.r.60)的NutTxDao属于试用性质,预计下个版本达到release."
预计,1.63还是试用吗

-_- 忘记改文档了

恩,要的,那用了 6666666666666666

添加回复
请先登陆
回到顶部