我写了一个定时任务,用的是 Spring + Nutz(作为 DAO 层),结果发现任务运行一段时间后就一直抛出 "Data source is closed" 异常。请问这是怎么回事?
package com.tadu.schedule.task;
import com.tadu.data.UpdateUserProfile;
import com.tadu.schedule.utils.LogUtils;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.log4j.Logger;
import org.nutz.dao.Dao;
import org.nutz.dao.Sqls;
import org.nutz.dao.sql.Sql;
import org.nutz.dao.sql.SqlCallback;
import org.nutz.lang.Strings;
import org.nutz.trans.Atom;
import org.nutz.trans.Trans;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.LinkedList;
import java.util.List;
/**
* 用户账户数据相关任务
*/
@Component
public class ImportUserDataTask {
final Logger logger = org.apache.log4j.Logger.getRootLogger();
final String updateTimeTableName = "update_time_table";
final String updateUsersTableName = "update_users";
final static String EMAIL_SUFFIX = "@NotExists.com";
@Resource
Dao bbsDao;
@Resource
Dao userReadDao;
/**
* 获取上次更新时间
*
* @return
*/
private Timestamp getUpdateTime() {
Sql updateTimeSql = Sqls.create("select update_time from " + updateTimeTableName);
SqlCallback updateTimeCallback = new SqlCallback() {
public Object invoke(Connection conn, ResultSet rs, Sql sql)
throws SQLException {
if (rs.next()) {
return rs.getTimestamp("update_time");
}
return new Timestamp(0);
}
};
updateTimeSql.setCallback(updateTimeCallback);
bbsDao.execute(updateTimeSql);
return updateTimeSql.getObject(Timestamp.class);
}
private String checkEmail(String email, int id){
if(!org.apache.commons.lang.StringUtils.isBlank(email)){
return email;
}
return id + EMAIL_SUFFIX;
}
/**
* 获取BBS是否已经存在此用户
*/
private boolean userExists(int userId) {
Sql userExistsSql = Sqls.create("select 1 from pre_common_member where uid = @id");
userExistsSql.params().set("id", userId);
SqlCallback updateTimeCallback = new SqlCallback() {
public Object invoke(Connection conn, ResultSet rs, Sql sql)
throws SQLException {
return rs.next();
}
};
userExistsSql.setCallback(updateTimeCallback);
bbsDao.execute(userExistsSql);
return userExistsSql.getBoolean();
}
private void refreshUpdateTime(Timestamp updateTime) {
Sql updateTimeSql = Sqls.create("update " + updateTimeTableName + " set update_time=@updateTime");
updateTimeSql.params().set("updateTime",updateTime);
bbsDao.execute(updateTimeSql);
}
private List<UpdateUserProfile> getUpdateUsers(Timestamp updateTime) {
final List<UpdateUserProfile> users = new LinkedList<UpdateUserProfile>();
Sql querySql = Sqls.create("select * from " + updateUsersTableName + " where update_time > @updateTime");
SqlCallback callback = new SqlCallback() {
public Object invoke(Connection conn, ResultSet rs, Sql sql)
throws SQLException {
while (rs.next()) {
UpdateUserProfile user = new UpdateUserProfile();
int id = rs.getInt("id");
user.setEmail(checkEmail(rs.getString("email"), id));
user.setUserName(rs.getString("username"));
user.setPassword(rs.getString("password"));
user.setUserId(id);
user.setUpdateTime(rs.getTimestamp("update_time"));
users.add(user);
}
return null;
}
};
querySql.params().set("updateTime", updateTime);
querySql.setCallback(callback);
querySql.addBatch();
userReadDao.execute(querySql);
return users;
}
private void insertUser2BBS(UpdateUserProfile user) {
int id = user.getUserId();
String userName = user.getUserName();
String email = user.getEmail();
String password = user.getPassword();
if (Strings.isBlank(email)) {
email = id + "taduNotExists.com";
}
String[] insertSqls = {
"INSERT INTO `pre_common_member` VALUES (@id,@email,@username,'02318014883e9d0e572b9cb7a505b0e9',0,0,0,0,0,10,0,'',1487844042,0,0,'9999',0,0,0,0,0,0,0)",
"INSERT INTO `pre_common_member_count` VALUES (@id,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0)",
"INSERT INTO `pre_common_member_field_forum` VALUES (@id,0,26,'','','','','','','')",
"INSERT INTO `pre_common_member_field_home` VALUES (@id,'','','','',0,0,0,'','','','','','','','','','')",
"INSERT INTO `pre_common_member_profile` VALUES(@id,'',0,0,0,0,'','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','','')",
"INSERT INTO `pre_common_member_status` VALUES (@id,'Manual Acting','Manual Acting',0,1487844042,1487844042,0,0,0,0,0,0,0,0)",
"INSERT INTO `pre_ucenter_memberfields` VALUES (@id,'')",
"INSERT INTO `pre_ucenter_members` VALUES (@id,@username,@password,@email,'','','10.2.3.164',1487844298,0,0,'aa2bcb','')"
};
for (int i = 0, size = insertSqls.length; i < size; i++) {
Sql insertSql = Sqls.create(insertSqls[i]);
addBatch(insertSql, id, userName, email, password);
bbsDao.execute(insertSql);
}
}
private void addBatch(Sql insertSql, int id, String userName, String email, String password) {
insertSql.params().set("id", id);
insertSql.params().set("username", userName);
insertSql.params().set("password", password);
insertSql.params().set("email", email);
insertSql.addBatch();
}
private void updateUser(UpdateUserProfile user) {
int id = user.getUserId();
String userName = user.getUserName();
String email = user.getEmail();
String password = user.getPassword();
Sql updateSql = Sqls.create("update pre_common_member set username = @username , password = @password, email = @email where uid = @id");
addBatch(updateSql, id, userName, email, password);
Sql uCenterUpdateSql = Sqls.create("update pre_ucenter_members set username = @username , password = @password, email = @email where uid = @id");
addBatch(uCenterUpdateSql, id, userName, email, password);
bbsDao.execute(uCenterUpdateSql);
bbsDao.execute(updateSql);
}
/**
* 更新用户数据
*/
public void updateUserData() {
/**
* 首先获取时间戳,没有则取全部数据
*
* 获取数据后根据是否已经存在在BBS库来进行处理
* 存在则更新几个字段
* 不存在则插入
*/
try {
logger.info("start");
Timestamp updateTime = getUpdateTime();
boolean updateUpdateTime = false;
logger.info("update time:" + updateTime);
List<UpdateUserProfile> updateUsers = getUpdateUsers(updateTime);
for (final UpdateUserProfile user : updateUsers) {
if (user.getUserId() == 1 || "admin".equals(user.getUserName())) {
continue;//1为管理员账户,admin为管理员用户名,所以不更
}
try {
if (user.getUpdateTime().after(updateTime)) {//更新最新更新时间
updateUpdateTime = true;
updateTime = user.getUpdateTime();
}
Trans.exec(new Atom() {//放入一个事务
public void run() {
if (userExists(user.getUserId())) {//存在更新
updateUser(user);
} else {//不存在则插入
insertUser2BBS(user);
}
}
});
} catch (Exception e) { //记录出错的记录
Integer userId = user.getUserId();
logger.error("update user:" + userId);
logger.error("user update time:" + user.getUpdateTime());
logger.error(LogUtils.getErrorFullMessage(e, "error userId:" + userId));
}
}//end for
if (updateUpdateTime) {
updateTime.setTime(updateTime.getTime() + 1);
refreshUpdateTime(updateTime);
}
} catch (Exception e) {
e.printStackTrace();
logger.error(LogUtils.getErrorFullMessage(e, null));
} finally {
logger.info("update complete");
}
}
}
以下是报错的详细信息:
[2017-04-25 20:15:34] error userId:93622748
org.nutz.dao.DaoException: java.sql.SQLException: Data source is closed
at org.nutz.dao.impl.sql.run.NutDaoRunner._runWithTransaction(NutDaoRunner.java:115)
at org.nutz.dao.impl.sql.run.NutDaoRunner._run(NutDaoRunner.java:88)
at org.nutz.dao.impl.sql.run.NutDaoRunner$1.run(NutDaoRunner.java:74)
at org.nutz.trans.Trans.exec(Trans.java:174)
at org.nutz.dao.impl.sql.run.NutDaoRunner.run(NutDaoRunner.java:72)
at org.nutz.dao.impl.DaoSupport.run(DaoSupport.java:240)
at org.nutz.dao.impl.DaoSupport._exec(DaoSupport.java:248)
at org.nutz.dao.impl.DaoSupport.execute(DaoSupport.java:236)
at org.nutz.dao.impl.NutDao.execute(NutDao.java:973)
at com.tadu.schedule.task.ImportUserDataTask.userExists(ImportUserDataTask.java:88)
at com.tadu.schedule.task.ImportUserDataTask.access$100(ImportUserDataTask.java:29)
at com.tadu.schedule.task.ImportUserDataTask$4.run(ImportUserDataTask.java:215)
at org.nutz.trans.Trans.exec(Trans.java:174)
at org.nutz.trans.Trans.exec(Trans.java:132)
at com.tadu.schedule.task.ImportUserDataTask.updateUserData(ImportUserDataTask.java:213)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:65)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: Data source is closed
at org.apache.commons.dbcp.BasicDataSource.createDataSource(BasicDataSource.java:1362)
at org.apache.commons.dbcp.BasicDataSource.getConnection(BasicDataSource.java:1044)
at org.nutz.trans.NutTransaction.getConnection(NutTransaction.java:84)
at org.nutz.dao.impl.sql.run.NutDaoRunner._runWithTransaction(NutDaoRunner.java:100)
... 28 more