最近在做业务开发的时候,线上出现了定时任务实行失败的异常,查看日志,最主要的原因是提示: connection holder is null, 经过代码排查了,是由于druid的连接池导致,所以记录下这个原因排查详细过程。
环境配置
由于之前的项目是比较老的,所以对druid的版本相对要老一点,使用的是1.1.7版本,具体maven如下:
<dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.1.7</version> </dependency>
示例师范
druid配置
示例采用的properties的配置方式, 具体配置如下:
url=jdbc:mysql://192.168.3.37:3306/learn username=root password=mysql filters=config maxActive=20 initialSize=1 maxWait=6000 minIdle=1 timeBetweenEvictionRunsMillis=60000 minEvictableIdleTimeMillis=30000 testWhileIdle=true testOnBorrow=true testOnReturn=true poolPreparedStatements=true maxOpenPreparedStatements=20 asyncInit=true removeAbandoned=true
以上的配置是比较常规的配置信息, 其中跟此次实践有关的,主要为removeAbandoned=true配置信息, 将在后面讲解
示例代码
package com.druid;
import com.alibaba.druid.pool.DruidDataSourceFactory;
import javax.sql.DataSource;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class DruidDataSourceTest {
public static void main(String[] args) {
// 记载配置文件
InputStream is = DruidDataSourceTest.class.getResourceAsStream("/druid.properties");
Properties properties = new Properties();
Connection connection = null;
DataSource dataSource = null;
try {
properties.load(is);
System.out.println("加载配置w完成...");
// 创建factory
dataSource = DruidDataSourceFactory.createDataSource(properties);
System.out.println("连接池创建成功");
// 获取Connection
connection = dataSource.getConnection();
connection.setAutoCommit(false);
// 模拟处理业务逻辑
System.out.println("执行业务逻辑..");
PreparedStatement statement = connection.prepareStatement("select * from user");
statement.execute();
Thread.sleep(TimeUnit.MINUTES.toMillis(6));
System.out.println("业务逻辑执行完成..");
connection.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (connection != null) {
connection.close();
}
} catch (SQLException e) {
// ignore
}
}
}
}
这段代码使用了Thread.sleep的方式模拟了一个业务逻辑,跟遇到的处理任务处理批量任务比较类似,然后我们将任务跑起来,在任务执行完成后,将的到一下的错误信息:
java.sql.SQLException: connection holder is null
at com.alibaba.druid.pool.DruidPooledConnection.checkStateInternal(DruidPooledConnection.java:1157)
at com.alibaba.druid.pool.DruidPooledConnection.checkState(DruidPooledConnection.java:1148)
at com.alibaba.druid.pool.DruidPooledConnection.commit(DruidPooledConnection.java:743)
at com.druid.DruidDataSourceTest.main(DruidDataSourceTest.java:38)
源码分析
在以上的还原现场的情况下,和我们预期的到的很相似,因此我们通过查看源码的方式查看,为什么会的到这样的结果。
DruidDatasourceFactory
这个类看名字基本上就能够知道,作为工厂创建DruidDatasource对象,可以简单看一下代码:
public static DataSource createDataSource(Properties properties) throws Exception {
return createDataSource((Map) properties);
}
public static DataSource createDataSource(Map properties) throws Exception {
DruidDataSource dataSource = new DruidDataSource();
config(dataSource, properties);
return dataSource;
}
public static void config(DruidDataSource dataSource, Map<?, ?> properties) throws SQLException {
...
value = (String) properties.get(PROP_INIT);
if ("true".equals(value)) {
dataSource.init();
}
}
在类中最重要的就是config方法,config方法主要是从map中读取各种配置,然后加载到Datasource对象中,便于直接使用Datasource获取数据库连接信息。在config最后一段代码中,可以看出,Datasource本身包含了初始化init()方法,该方法会在Datasource正式使用的时候执行必要的初始化操作,该方法调用可以通过配置init=true控制或者再获取connection的时候执行延迟初始化。
DruidDatasource
该类是作为核心类,因为我们使用最多的还是通过Datasource.getConnection()方法,因此直接查看getConnection()方法源码:
@Override
public DruidPooledConnection getConnection() throws SQLException {
return getConnection(maxWait);
}
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
init();
if (filters.size() > 0) {
FilterChainImpl filterChain = new FilterChainImpl(this);
return filterChain.dataSource_connect(this, maxWaitMillis);
} else {
return getConnectionDirect(maxWaitMillis);
}
}
在获取Connection链接的时候,都会优先执行初始化的操作.
init()
在Datasource正式投入使用的时候,都会执行init()方法,完成初始化操作。直接看init()源码:
public void init() throws SQLException {
// 是否已经初始化完成, 如果是则直接返回
if (inited) {
return;
}
// 获取锁,保证只有线程执行初始化操作
final ReentrantLock lock = this.lock;
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
throw new SQLException("interrupt", e);
}
boolean init = false;
try {
if (inited) {
return;
}
initStackTrace = Utils.toString(Thread.currentThread().getStackTrace());
this.id = DruidDriver.createDataSourceId();
if (this.id > 1) {
long delta = (this.id - 1) * 100000;
this.connectionIdSeed.addAndGet(delta);
this.statementIdSeed.addAndGet(delta);
this.resultSetIdSeed.addAndGet(delta);
this.transactionIdSeed.addAndGet(delta);
}
if (this.jdbcUrl != null) {
this.jdbcUrl = this.jdbcUrl.trim();
initFromWrapDriverUrl();
}
// 初始化filter对象
for (Filter filter : filters) {
filter.init(this);
}
// 获取数据库类型
if (this.dbType == null || this.dbType.length() == 0) {
this.dbType = JdbcUtils.getDbType(jdbcUrl, null);
}
if (JdbcConstants.MYSQL.equals(this.dbType)
|| JdbcConstants.MARIADB.equals(this.dbType)
|| JdbcConstants.ALIYUN_ADS.equals(this.dbType)) {
boolean cacheServerConfigurationSet = false;
if (this.connectProperties.containsKey("cacheServerConfiguration")) {
cacheServerConfigurationSet = true;
} else if (this.jdbcUrl.indexOf("cacheServerConfiguration") != -1) {
cacheServerConfigurationSet = true;
}
if (cacheServerConfigurationSet) {
this.connectProperties.put("cacheServerConfiguration", "true");
}
}
..... // 省略验证代码
// 通过SPI的方式加载服务, 具体是什么,不在关注范围内
initFromSPIServiceLoader();
// 加载driver类
if (this.driver == null) {
if (this.driverClass == null || this.driverClass.isEmpty()) {
this.driverClass = JdbcUtils.getDriverClassName(this.jdbcUrl);
}
if (MockDriver.class.getName().equals(driverClass)) {
driver = MockDriver.instance;
} else {
driver = JdbcUtils.createDriver(driverClassLoader, driverClass);
}
} else {
if (this.driverClass == null) {
this.driverClass = driver.getClass().getName();
}
}
initCheck();
initExceptionSorter();
initValidConnectionChecker();
validationQueryCheck();
if (isUseGlobalDataSourceStat()) {
dataSourceStat = JdbcDataSourceStat.getGlobal();
if (dataSourceStat == null) {
dataSourceStat = new JdbcDataSourceStat("Global", "Global", this.dbType);
JdbcDataSourceStat.setGlobal(dataSourceStat);
}
if (dataSourceStat.getDbType() == null) {
dataSourceStat.setDbType(this.dbType);
}
} else {
dataSourceStat = new JdbcDataSourceStat(this.name, this.jdbcUrl, this.dbType, this.connectProperties);
}
dataSourceStat.setResetStatEnable(this.resetStatEnable);
// 初始化连接池管理缓存,其实就是数组
connections = new DruidConnectionHolder[maxActive];
evictConnections = new DruidConnectionHolder[maxActive];
keepAliveConnections = new DruidConnectionHolder[maxActive];
SQLException connectError = null;
boolean asyncInit = this.asyncInit && createScheduler == null;
if (!asyncInit) {
try {
// init connections
for (int i = 0, size = getInitialSize(); i < size; ++i) {
PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
connections[poolingCount] = holder;
incrementPoolingCount();
}
if (poolingCount > 0) {
poolingPeak = poolingCount;
poolingPeakTime = System.currentTimeMillis();
}
} catch (SQLException ex) {
LOG.error("init datasource error, url: " + this.getUrl(), ex);
connectError = ex;
}
}
createAndLogThread();
createAndStartCreatorThread();
// 重点关注这里,连接池对connection链接的管理方式
createAndStartDestroyThread();
initedLatch.await();
init = true;
initedTime = new Date();
// 注册监控mbean对象
registerMbean();
if (connectError != null && poolingCount == 0) {
throw connectError;
}
// 如果是长连接,则通过线程池管理空闲链接
if (keepAlive) {
// async fill to minIdle
if (createScheduler != null) {
for (int i = 0; i < minIdle; ++i) {
createTaskCount++;
CreateConnectionTask task = new CreateConnectionTask();
this.createSchedulerFuture = createScheduler.submit(task);
}
} else {
this.emptySignal();
}
}
} catch (SQLException e) {
LOG.error("{dataSource-" + this.getID() + "} init error", e);
throw e;
} catch (InterruptedException e) {
throw new SQLException(e.getMessage(), e);
} catch (RuntimeException e){
LOG.error("{dataSource-" + this.getID() + "} init error", e);
throw e;
} catch (Error e){
LOG.error("{dataSource-" + this.getID() + "} init error", e);
throw e;
}
finally {
// 标记初始化完成
inited = true;
lock.unlock();
if (init && LOG.isInfoEnabled()) {
String msg = "{dataSource-" + this.getID();
if (this.name != null && !this.name.isEmpty()) {
msg += ",";
msg += this.name;
}
msg += "} inited";
LOG.info(msg);
}
}
}
这里主要关注下createAndStartDestroyThread()方法,
createAndStartDestroyThread
protected void createAndStartDestroyThread() {
// 创建链接销毁任务对象
destroyTask = new DestroyTask();
// 如果绑定了线程,则使用连接池执行任务
if (destroyScheduler != null) {
long period = timeBetweenEvictionRunsMillis;
if (period <= 0) {
period = 1000;
}
// 如果没有配置周期,则为1秒钟,在以上配置中,我们设置的为60秒
destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destroyTask, period, period,
TimeUnit.MILLISECONDS);
initedLatch.countDown();
return;
}
// 如果没有绑定线程池,则使用单独的线程执行,该类其实跟线程池实现类型都是一致的。都是在一定周期后执行链接移除任务
String threadName = "Druid-ConnectionPool-Destroy-" + System.identityHashCode(this);
destroyConnectionThread = new DestroyConnectionThread(threadName);
destroyConnectionThread.start();
}
通过上面分析可以得知,连接池对链接的管理是通过单据的线程周期的管理,这个执行周期如下:
- 判断是否配置
timeBetweenEvictionRunsMillis,如果配置,则该配置的时间作为周期轮询检测 - 如果没有配置,默认为
1秒一次对连接池链接管理销毁
DestroyTask
最终链接的销毁是通过该类来实现和完成的,具体代码如下:
public class DestroyTask implements Runnable {
@Override
public void run() {
shrink(true, keepAlive);
// 判断是否对连接池链接执行销毁, 依据removeAbandoned=true开启
if (isRemoveAbandoned()) {
removeAbandoned();
}
}
}
在任务中其实需要removeAbandoned=true配置开启,然后才会执行removeAbandoned()方法,该方法其实在主类Datasource之中。
removeAbandoned()
public int removeAbandoned() {
int removeCount = 0;
// 获取当前时间
long currrentNanos = System.nanoTime();
List<DruidPooledConnection> abandonedList = new ArrayList<DruidPooledConnection>();
activeConnectionLock.lock();
try {
Iterator<DruidPooledConnection> iter = activeConnections.keySet().iterator();
for (; iter.hasNext();) {
DruidPooledConnection pooledConnection = iter.next();
if (pooledConnection.isRunning()) {
continue;
}
// 判断链接第一次获取时间倒现在经历的间隔时间
long timeMillis = (currrentNanos - pooledConnection.getConnectedTimeNano()) / (1000 * 1000);
// 如果间隔时间大于removeAbandonedTimeoutMillis的时间,默认为5分钟时,将链接从当前活跃链接池中移除,并加入销毁列表
if (timeMillis >= removeAbandonedTimeoutMillis) {
iter.remove();
pooledConnection.setTraceEnable(false);
abandonedList.add(pooledConnection);
}
}
} finally {
activeConnectionLock.unlock();
}
if (abandonedList.size() > 0) {
// 遍历需要销毁的链接对象
for (DruidPooledConnection pooledConnection : abandonedList) {
final ReentrantLock lock = pooledConnection.lock;
lock.lock();
try {
// 如果链接已经失效,则停止处理
if (pooledConnection.isDisable()) {
continue;
}
} finally {
lock.unlock();
}
// 关闭链接。这里就是产生问题的根源,也就是说当超过了removeAbandonedTimeOutMillis时间的链接会被最终关闭
JdbcUtils.close(pooledConnection);
pooledConnection.abandond();
removeAbandonedCount++;
removeCount++;
if (isLogAbandoned()) {
StringBuilder buf = new StringBuilder();
buf.append("abandon connection, owner thread: ");
buf.append(pooledConnection.getOwnerThread().getName());
buf.append(", connected at : ");
buf.append(pooledConnection.getConnectedTimeMillis());
buf.append(", open stackTrace\n");
StackTraceElement[] trace = pooledConnection.getConnectStackTrace();
for (int i = 0; i < trace.length; i++) {
buf.append("\tat ");
buf.append(trace[i].toString());
buf.append("\n");
}
buf.append("ownerThread current state is " + pooledConnection.getOwnerThread().getState()
+ ", current stackTrace\n");
trace = pooledConnection.getOwnerThread().getStackTrace();
for (int i = 0; i < trace.length; i++) {
buf.append("\tat ");
buf.append(trace[i].toString());
buf.append("\n");
}
LOG.error(buf.toString());
}
}
}
return removeCount;
}
从该方法的分析,其实当链接的使用时间超过removeAbandonedTimeOutMillis时间的时候,将会从可用连接池中移除,然后被关闭。链接的关闭操作,也是比较简单的。
DruidPooledConnection
链接中的关闭操作,主要代码如下
@Override
public void close() throws SQLException {
if (this.disable) {
return;
}
// 数据库连接持有对象
DruidConnectionHolder holder = this.holder;
if (holder == null) {
if (dupCloseLogEnable) {
LOG.error("dup close");
}
return;
}
DruidAbstractDataSource dataSource = holder.getDataSource();
// 判断当前关闭线程和持有链接线程是否为同一个线程
boolean isSameThread = this.getOwnerThread() == Thread.currentThread();
if (!isSameThread) {
// 如果不是,则异步关闭
dataSource.setAsyncCloseConnectionEnable(true);
}
if (dataSource.isAsyncCloseConnectionEnable()) {
// 执行关闭逻辑, 关闭逻辑其实都差不多, 都会判断是否有filter, 然后执行recycle()方法
syncClose();
return;
}
for (ConnectionEventListener listener : holder.getConnectionEventListeners()) {
listener.connectionClosed(new ConnectionEvent(this));
}
List<Filter> filters = dataSource.getProxyFilters();
if (filters.size() > 0) {
FilterChainImpl filterChain = new FilterChainImpl(dataSource);
filterChain.dataSource_recycle(this);
} else {
recycle();
}
this.disable = true;
}
public void recycle() throws SQLException {
if (this.disable) {
return;
}
DruidConnectionHolder holder = this.holder;
if (holder == null) {
if (dupCloseLogEnable) {
LOG.error("dup close");
}
return;
}
if (!this.abandoned) {
DruidAbstractDataSource dataSource = holder.getDataSource();
dataSource.recycle(this);
}
// 将holder设置为空
this.holder = null;
conn = null;
transactionInfo = null;
// 关闭状态置为true
closed = true;
}
因此这里就讲到了为什么会有holder为空的逻辑,这里代码分析就算完成了。我们再次回到开始的错误connection holder is null的提示,这其实是因为holder已经被设置为空,然后我再去commit()事务的时候,就会出现这样的问题:
private void checkStateInternal() throws SQLException {
// 判断holder是否为空
if (holder == null) {
if (disableError != null) {
throw new SQLException("connection holder is null", disableError);
} else {
throw new SQLException("connection holder is null");
}
}
// 判断链接是否已经关闭
if (closed) {
if (disableError != null) {
throw new SQLException("connection closed", disableError);
} else {
throw new SQLException("connection closed");
}
}
// 判断链接是否已被禁用
if (disable) {
if (disableError != null) {
throw new SQLException("connection disabled", disableError);
} else {
throw new SQLException("connection disabled");
}
}
}
getConnection()
是否所有的连接都会受到连接池的管理呢?其实不是,在上面代码中,连接销毁任务都是针对activeConnections进行的,而这个链接,只有在removeAbandoned开始时才会在获取链接的时候加入到activeConnections列表中,具体代码如下:
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
int notFullTimeoutRetryCnt = 0;
for (;;) {
..... 省略链接创建流程代码
// 当removeAbandoned=true的时候,将创建好的链接加入到activeConnections中
if (removeAbandoned) {
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
poolableConnection.connectStackTrace = stackTrace;
poolableConnection.setConnectedTimeNano();
poolableConnection.traceEnable = true;
activeConnectionLock.lock();
try {
activeConnections.put(poolableConnection, PRESENT);
} finally {
activeConnectionLock.unlock();
}
}
if (!this.defaultAutoCommit) {
poolableConnection.setAutoCommit(false);
}
return poolableConnection;
}
}
总结
因此通过上面的解析,我们不难看出,因此该问题的主要原因主要有两个:
- 同一个数据库连接使用时间过长,导致连接池释放链接
removeAbandonedTimeOutMillis时间比业务处理时间短。
因此可以看出,要解决这个问题,也可以通过两个途径解决:
- 控制数据库连接的使用时间,对处理过长业务逻辑进行优化,对批量任务,每个任务单独使用数据连接
- 将
removeAbandonedTimeOutMillis时间调长。这种方案我是不太建议,因为这样的话,可能会导致其他业务无法获取到连接的情况
其他版本
其实这个提示在druid比较高的版本已经被优化了,因为对数连接池释放了数据库连接之后,连接应该是处理关闭的,所以在较高版本的druid上,会提示connection closed.