NutzCN Logo
问答 求助,定时任务跑一会就一直抛出 Datasource is closed异常
发布于 2798天前 作者 tomasWade 6306 次浏览 复制 上一个帖子 下一个帖子
标签:

我写了一个定时任务,用的是spring + nutz(作为dao层),结果发现跑着跑着就一直出现 Datasource is closed。求助是怎么回事。

package com.tadu.schedule.task;

import com.tadu.data.UpdateUserProfile;
import com.tadu.schedule.utils.LogUtils;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.log4j.Logger;
import org.nutz.dao.Dao;
import org.nutz.dao.Sqls;
import org.nutz.dao.sql.Sql;
import org.nutz.dao.sql.SqlCallback;
import org.nutz.lang.Strings;
import org.nutz.trans.Atom;
import org.nutz.trans.Trans;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.LinkedList;
import java.util.List;

/**
 * 用户账户数据相关任务
 */
@Component
public class ImportUserDataTask {
    final Logger logger = org.apache.log4j.Logger.getRootLogger();
    final String updateTimeTableName = "update_time_table";
    final String updateUsersTableName = "update_users";
    final static String EMAIL_SUFFIX = "@NotExists.com";

    @Resource
    Dao bbsDao;

    @Resource
    Dao userReadDao;

    /**
     * 获取上次更新时间
     *
     * @return
     */
    private Timestamp getUpdateTime() {
        Sql updateTimeSql = Sqls.create("select update_time from " + updateTimeTableName);

        SqlCallback updateTimeCallback = new SqlCallback() {
            public Object invoke(Connection conn, ResultSet rs, Sql sql)
                    throws SQLException {
                if (rs.next()) {
                    return rs.getTimestamp("update_time");
                }
                return new Timestamp(0);
            }
        };

        updateTimeSql.setCallback(updateTimeCallback);
        bbsDao.execute(updateTimeSql);

        return updateTimeSql.getObject(Timestamp.class);
    }

    private String checkEmail(String email, int id){
        if(!org.apache.commons.lang.StringUtils.isBlank(email)){
            return email;
        }

        return id + EMAIL_SUFFIX;
    }

    /**
     * 获取BBS是否已经存在此用户
     */
    private boolean userExists(int userId) {
        Sql userExistsSql = Sqls.create("select 1 from pre_common_member where uid = @id");
        userExistsSql.params().set("id", userId);

        SqlCallback updateTimeCallback = new SqlCallback() {
            public Object invoke(Connection conn, ResultSet rs, Sql sql)
                    throws SQLException {
                return rs.next();
            }
        };

        userExistsSql.setCallback(updateTimeCallback);
        bbsDao.execute(userExistsSql);

        return userExistsSql.getBoolean();
    }

    private void refreshUpdateTime(Timestamp updateTime) {
        Sql updateTimeSql = Sqls.create("update " + updateTimeTableName + " set update_time=@updateTime");
        updateTimeSql.params().set("updateTime",updateTime);
        bbsDao.execute(updateTimeSql);
    }

    private List<UpdateUserProfile> getUpdateUsers(Timestamp updateTime) {
        final List<UpdateUserProfile> users = new LinkedList<UpdateUserProfile>();

        Sql querySql = Sqls.create("select * from " + updateUsersTableName + " where update_time > @updateTime");
        SqlCallback callback = new SqlCallback() {
            public Object invoke(Connection conn, ResultSet rs, Sql sql)
                    throws SQLException {
                while (rs.next()) {
                    UpdateUserProfile user = new UpdateUserProfile();
                    int id = rs.getInt("id");
                    user.setEmail(checkEmail(rs.getString("email"), id));
                    user.setUserName(rs.getString("username"));
                    user.setPassword(rs.getString("password"));
                    user.setUserId(id);
                    user.setUpdateTime(rs.getTimestamp("update_time"));
                    users.add(user);
                }
                return null;
            }
        };

        querySql.params().set("updateTime", updateTime);
        querySql.setCallback(callback);
        querySql.addBatch();

        userReadDao.execute(querySql);

        return users;
    }

    private void insertUser2BBS(UpdateUserProfile user) {
        int id = user.getUserId();
        String userName = user.getUserName();
        String email = user.getEmail();
        String password = user.getPassword();

        if (Strings.isBlank(email)) {
            email = id + "taduNotExists.com";
        }

        String[] insertSqls = {
                "INSERT INTO `pre_common_member` VALUES (@id,@email,@username,'02318014883e9d0e572b9cb7a505b0e9',0,0,0,0,0,10,0,'',1487844042,0,0,'9999',0,0,0,0,0,0,0)",
                "INSERT INTO `pre_common_member_count` VALUES (@id,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0)",
                "INSERT INTO `pre_common_member_field_forum` VALUES (@id,0,26,'','','','','','','')",
                "INSERT INTO `pre_common_member_field_home` VALUES (@id,'','','','',0,0,0,'','','','','','','','','','')",
                "INSERT INTO `pre_common_member_profile` VALUES(@id,'',0,0,0,0,'','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','')",
                "INSERT INTO `pre_common_member_status` VALUES (@id,'Manual Acting','Manual Acting',0,1487844042,1487844042,0,0,0,0,0,0,0,0)",
                "INSERT INTO `pre_ucenter_memberfields` VALUES (@id,'')",
                "INSERT INTO `pre_ucenter_members` VALUES (@id,@username,@password,@email,'','','10.2.3.164',1487844298,0,0,'aa2bcb','')"
        };

        for (int i = 0, size = insertSqls.length; i < size; i++) {
            Sql insertSql = Sqls.create(insertSqls[i]);
            addBatch(insertSql, id, userName, email, password);

            bbsDao.execute(insertSql);
        }
    }

    private void addBatch(Sql insertSql, int id, String userName, String email, String password) {
        insertSql.params().set("id", id);
        insertSql.params().set("username", userName);
        insertSql.params().set("password", password);
        insertSql.params().set("email", email);

        insertSql.addBatch();
    }

    private void updateUser(UpdateUserProfile user) {
        int id = user.getUserId();
        String userName = user.getUserName();
        String email = user.getEmail();
        String password = user.getPassword();

        Sql updateSql = Sqls.create("update pre_common_member set username = @username , password = @password, email = @email where uid = @id");
        addBatch(updateSql, id, userName, email, password);

        Sql uCenterUpdateSql = Sqls.create("update pre_ucenter_members set username = @username , password = @password, email = @email where uid = @id");
        addBatch(uCenterUpdateSql, id, userName, email, password);

        bbsDao.execute(uCenterUpdateSql);
        bbsDao.execute(updateSql);
    }

    /**
     * 更新用户数据
     */
    public void updateUserData() {
        /**
         * 首先获取时间戳,没有则取全部数据
         *
         * 获取数据后根据是否已经存在在BBS库来进行处理
         *  存在则更新几个字段
         *  不存在则插入
         */
        try {
            logger.info("start");
            Timestamp updateTime = getUpdateTime();
            boolean updateUpdateTime = false;
            logger.info("update time:" + updateTime);

            List<UpdateUserProfile> updateUsers = getUpdateUsers(updateTime);

            for (final UpdateUserProfile user : updateUsers) {
                if (user.getUserId() == 1 || "admin".equals(user.getUserName())) {
                    continue;//1为管理员账户,admin为管理员用户名,所以不更
                }
                try {
                    if (user.getUpdateTime().after(updateTime)) {//更新最新更新时间
                        updateUpdateTime = true;
                        updateTime = user.getUpdateTime();
                    }
                    Trans.exec(new Atom() {//放入一个事务
                        public void run() {
                            if (userExists(user.getUserId())) {//存在更新
                                updateUser(user);
                            } else {//不存在则插入
                                insertUser2BBS(user);
                            }
                        }
                    });
                } catch (Exception e) { //记录出错的记录
                    Integer userId = user.getUserId();
                    logger.error("update user:" + userId);
                    logger.error("user update time:" + user.getUpdateTime());
                    logger.error(LogUtils.getErrorFullMessage(e, "error userId:" + userId));
                }

            }//end for

            if (updateUpdateTime) {
                updateTime.setTime(updateTime.getTime() + 1);
                refreshUpdateTime(updateTime);
            }
        } catch (Exception e) {
            e.printStackTrace();
            logger.error(LogUtils.getErrorFullMessage(e, null));
        } finally {
            logger.info("update complete");
        }
    }


}

以下是报错详细信息

[2017-04-25 20:15:34] error userId:93622748
org.nutz.dao.DaoException: java.sql.SQLException: Data source is closed
        at org.nutz.dao.impl.sql.run.NutDaoRunner._runWithTransaction(NutDaoRunner.java:115)
        at org.nutz.dao.impl.sql.run.NutDaoRunner._run(NutDaoRunner.java:88)
        at org.nutz.dao.impl.sql.run.NutDaoRunner$1.run(NutDaoRunner.java:74)
        at org.nutz.trans.Trans.exec(Trans.java:174)
        at org.nutz.dao.impl.sql.run.NutDaoRunner.run(NutDaoRunner.java:72)
        at org.nutz.dao.impl.DaoSupport.run(DaoSupport.java:240)
        at org.nutz.dao.impl.DaoSupport._exec(DaoSupport.java:248)
        at org.nutz.dao.impl.DaoSupport.execute(DaoSupport.java:236)
        at org.nutz.dao.impl.NutDao.execute(NutDao.java:973)
        at com.tadu.schedule.task.ImportUserDataTask.userExists(ImportUserDataTask.java:88)
        at com.tadu.schedule.task.ImportUserDataTask.access$100(ImportUserDataTask.java:29)
        at com.tadu.schedule.task.ImportUserDataTask$4.run(ImportUserDataTask.java:215)
        at org.nutz.trans.Trans.exec(Trans.java:174)
        at org.nutz.trans.Trans.exec(Trans.java:132)
        at com.tadu.schedule.task.ImportUserDataTask.updateUserData(ImportUserDataTask.java:213)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:65)
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
        at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: Data source is closed
        at org.apache.commons.dbcp.BasicDataSource.createDataSource(BasicDataSource.java:1362)
        at org.apache.commons.dbcp.BasicDataSource.getConnection(BasicDataSource.java:1044)
        at org.nutz.trans.NutTransaction.getConnection(NutTransaction.java:84)
        at org.nutz.dao.impl.sql.run.NutDaoRunner._runWithTransaction(NutDaoRunner.java:100)
        ... 28 more


12 回复

用的什么数据源

数据源的配置,只贴了一个DAO的,另外一个除了占位符名字用的不一样其他都一样

<!-- bbs DataSource -->
    <bean id="bbsDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="${bbsDatabaseUrl}"/>
        <property name="username" value="${bbsDatabaseUsername}"/>
        <property name="password" value="${bbsDatabasePassword}"/>
    </bean>

    <!-- bbs DAO -->
    <bean id="bbsDao" class="org.nutz.dao.impl.NutDao">
        <property name="dataSource" ref="bbsDataSource"/>
    </bean>

你本地可以重现吗? debug一下BasicDataSource的close方法,看看是哪里调用的

加 检查超时的那个 处理 就应该不会有问题了 你查下 这种数据源怎么配置那个属性

mysql 8 小时自动断开的问题?

1 本地条数少的时候是不会重现的,当条数多的时候应该才会出现,我正在尝试多来些数据重现
2 不是必须用这个数据源的,下面我会换个数据源试试
3 超时觉得不太可能,因为超时是说长时间不用才会断开,而这个是在跑的过程中出现的,一直在用连接

并不是超时, 是数据源被close的

有地方关闭了 数据源

这个 估计不是容易debug ,尝试换个druid 数据源看看?如果再有问题 建议检查逻辑
@tomaswade

另外 这种你可以用批量执行

bbsDao.execute(uCenterUpdateSql);
        bbsDao.execute(updateSql);
bbsDao.execute(uCenterUpdateSql,updateSql);

谢谢各位回复,我现在正在用org.springframework.jdbc.datasource.DriverManagerDataSource 试

跑到现在,也没出问题,应该是搞定了,谢谢各位

添加回复
请先登陆
回到顶部