Java事务处理全解析(一至八)
package davenkin.step1_failure;import javax.sql.DataSource;import java.sql.Connection;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;public class FailureBankDao { private DataSource dataSource; public FailureBankDao(DataSource dataSource) { this.dataSource = dataSource; } public void withdraw(int bankId, int amount) throws SQLException { Connection connection = dataSource.getConnection(); PreparedStatement selectStatement = connection.prepareStatement("SELECT BANK_AMOUNT FROM BANK_ACCOUNT WHERE BANK_ID = ?"); selectStatement.setInt(1, bankId); ResultSet resultSet = selectStatement.executeQuery(); resultSet.next(); int previousAmount = resultSet.getInt(1); resultSet.close(); selectStatement.close(); int newAmount = previousAmount - amount; PreparedStatement updateStatement = connection.prepareStatement("UPDATE BANK_ACCOUNT SET BANK_AMOUNT = ? WHERE BANK_ID = ?"); updateStatement.setInt(1, newAmount); updateStatement.setInt(2, bankId); updateStatement.execute(); updateStatement.close(); connection.close(); }}
?
FailureBankDao的withdraw方法,从银行账户表(BANK_ACCOUNT)中帐号为bankId的用户账户中取出数量为amount的金额。
采用同样的方法,定义保险账户的DAO类如下:
package davenkin.step1_failure;import javax.sql.DataSource;import java.sql.Connection;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;public class FailureInsuranceDao { private DataSource dataSource; public FailureInsuranceDao(DataSource dataSource){ this.dataSource = dataSource; } public void deposit(int insuranceId, int amount) throws SQLException { Connection connection = dataSource.getConnection(); PreparedStatement selectStatement = connection.prepareStatement("SELECT INSURANCE_AMOUNT FROM INSURANCE_ACCOUNT WHERE INSURANCE_ID = ?"); selectStatement.setInt(1, insuranceId); ResultSet resultSet = selectStatement.executeQuery(); resultSet.next(); int previousAmount = resultSet.getInt(1); resultSet.close(); selectStatement.close(); int newAmount = previousAmount + amount; PreparedStatement updateStatement = connection.prepareStatement("UPDATE INSURANCE_ACCOUNT SET INSURANCE_AMOUNT = ? WHERE INSURANCE_ID = ?"); updateStatement.setInt(1, newAmount); updateStatement.setInt(2, insuranceId); updateStatement.execute(); updateStatement.close(); connection.close(); }}
?
FailureInsuranceDao类的deposit方法向保险账户表(INSURANCE_ACCOUNT)存入amount数量的金额, 这样在BankService中,我们可以先调用FailureBankDao的withdraw方法取出一定金额的存款,再调用 FailureInsuranceDao的deposit方法将该笔存款存入保险账户表中,一切看似OK,实现BankService接口如下:
package davenkin.step1_failure;import davenkin.BankService;import javax.sql.DataSource;import java.sql.Connection;import java.sql.SQLException;public class FailureBankService implements BankService{ private FailureBankDao failureBankDao; private FailureInsuranceDao failureInsuranceDao; private DataSource dataSource; public FailureBankService(DataSource dataSource) { this.dataSource = dataSource; } public void transfer(int fromId, int toId, int amount) { Connection connection = null; try { connection = dataSource.getConnection(); connection.setAutoCommit(false); failureBankDao.withdraw(fromId, amount); failureInsuranceDao.deposit(toId, amount); connection.commit(); } catch (Exception e) { try { assert connection != null; connection.rollback(); } catch (SQLException e1) { e1.printStackTrace(); } } finally { try { assert connection != null; connection.close(); } catch (SQLException e) { e.printStackTrace(); } } } public void setFailureBankDao(FailureBankDao failureBankDao) { this.failureBankDao = failureBankDao; } public void setFailureInsuranceDao(FailureInsuranceDao failureInsuranceDao) { this.failureInsuranceDao = failureInsuranceDao; }}
?
在FailureBankService的transfer方法中,我们首先通过DataSource获得Connection,然后调用 connection.setAutoCommit(false)已开启手动提交模式,如果一切顺利,则commit,如果出现异常,则 rollback。 接下来,开始测试我们的BankService吧。
为了准备测试数据,我们定义个BankFixture类,该类负责在每次测试之前准备测试数据,分别向银行账户(1111)和保险账户(2222) 中均存入1000元。BankFixture还提供了两个helper方法(getBankAmount和getInsuranceAmount)帮助我 们从数据库中取出数据以做数据验证。我们使用HSQL数据库的in-memory模式,这样不用启动数据库server,方便测试。 BankFixture类定义如下:
package davenkin;import org.junit.Before;import javax.sql.DataSource;import java.sql.*;public class BankFixture{ protected final DataSource dataSource = DataSourceFactory.createDataSource(); @Before public void setUp() throws SQLException { Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement(); statement.execute("DROP TABLE BANK_ACCOUNT IF EXISTS"); statement.execute("DROP TABLE INSURANCE_ACCOUNT IF EXISTS"); statement.execute("CREATE TABLE BANK_ACCOUNT (\n" + "BANK_ID INT,\n" + "BANK_AMOUNT INT,\n" + "PRIMARY KEY(BANK_ID)\n" + ");"); statement.execute("CREATE TABLE INSURANCE_ACCOUNT (\n" + "INSURANCE_ID INT,\n" + "INSURANCE_AMOUNT INT,\n" + "PRIMARY KEY(INSURANCE_ID)\n" + ");"); statement.execute("INSERT INTO BANK_ACCOUNT VALUES (1111, 1000);"); statement.execute("INSERT INTO INSURANCE_ACCOUNT VALUES (2222, 1000);"); statement.close(); connection.close(); } protected int getBankAmount(int bankId) throws SQLException { Connection connection = dataSource.getConnection(); PreparedStatement selectStatement = connection.prepareStatement("SELECT BANK_AMOUNT FROM BANK_ACCOUNT WHERE BANK_ID = ?"); selectStatement.setInt(1, bankId); ResultSet resultSet = selectStatement.executeQuery(); resultSet.next(); int amount = resultSet.getInt(1); resultSet.close(); selectStatement.close(); connection.close(); return amount; } protected int getInsuranceAmount(int insuranceId) throws SQLException { Connection connection = dataSource.getConnection(); PreparedStatement selectStatement = connection.prepareStatement("SELECT INSURANCE_AMOUNT FROM INSURANCE_ACCOUNT WHERE INSURANCE_ID = ?"); selectStatement.setInt(1, insuranceId); ResultSet resultSet = selectStatement.executeQuery(); resultSet.next(); int amount = resultSet.getInt(1); resultSet.close(); selectStatement.close(); connection.close(); return amount; }}
?
编写的Junit测试继承自BankFixture类,测试代码如下:
package davenkin.step1_failure;import davenkin.BankFixture;import org.junit.Test;import java.sql.SQLException;import static junit.framework.Assert.assertEquals;public class FailureBankServiceTest extends BankFixture{ @Test public void transferSuccess() throws SQLException { FailureBankDao failureBankDao = new FailureBankDao(dataSource); FailureInsuranceDao failureInsuranceDao = new FailureInsuranceDao(dataSource); FailureBankService bankService = new FailureBankService(dataSource); bankService.setFailureBankDao(failureBankDao); bankService.setFailureInsuranceDao(failureInsuranceDao); bankService.transfer(1111, 2222, 200); assertEquals(800, getBankAmount(1111)); assertEquals(1200, getInsuranceAmount(2222)); } @Test public void transferFailure() throws SQLException { FailureBankDao failureBankDao = new FailureBankDao(dataSource); FailureInsuranceDao failureInsuranceDao = new FailureInsuranceDao(dataSource); FailureBankService bankService = new FailureBankService(dataSource); bankService.setFailureBankDao(failureBankDao); bankService.setFailureInsuranceDao(failureInsuranceDao); int toNonExistId = 3333; bankService.transfer(1111, toNonExistId, 200); assertEquals(1000, getInsuranceAmount(2222)); assertEquals(1000, getBankAmount(1111)); }}
?
运行测试,第一个测试(transferSuccess)成功,第二个测试(transferFailure)失败。
分析错误,原因在于:我们分别从FailureBankService,FailureBankDao和FailureInsuranceDao中 调用了三次dataSource.getConnection(),亦即我们创建了三个不同的Connection对象,而Java事务是作用于 Connection之上的,所以从在三个地方我们开启了三个不同的事务,而不是同一个事务。
第一个测试之所以成功,是因为在此过程中没有任何异常发生。虽然在FailureBankService中将Connection的提交模式改为了 手动提交,但是由于两个DAO使用的是各自的Connection对象,所以两个DAO中的Connection依然为默认的自动提交模式。
在第二个测试中,我们给出一个不存在的保险账户id(toNonExistId),就是为了使程序产生异常,然后在assertion语句中验证两 张表均没有任何变化,但是测试在第二个assertion语句处出错。发生异常时,银行账户中的金额已经减少,而虽然程序发生了rollback,但是调 用的是FailureBankService中Connection的rollback,而不是FailureInsuranceDao中 Connection的,对保险账户的操作根本就没有执行,所以保险账户中依然为1000,而银行账户却变为了800。
因此,为了使两个DAO在同一个事务中,我们应该在整个事务处理过程中使用一个Connection对象,在下一篇文章中,我们将讲到通过共享Connection对象的方式达到事务处理的目的。
在本系列的上一篇文 章中,我们看到了一个典型的事务处理失败的案例,其主要原因在于,service层和各个DAO所使用的Connection是不一样的,而JDBC中事 务处理的作用对象正是Connection对象,所以不同DAO中的操作不在同一个事务里面,从而导致事务失败。从中我们得出了教训:要避免这种失败,我 们可以使所有操作共享一个Connection对象,这样应该就没有问题了。
?
请通过以下方式下载本系列文章的github源代码:
git clone?https://github.com/davenkin/java_transaction_workshop.git
?
在本篇文章中,我们将看到一个成功的,但是丑陋的事务处理方案,它的基本思路是:在service层创建Connection对象,再将该Connection传给各个DAO类,这样就完成了Connection共享的目的。
?
修改两个DAO类,使他们都接受一个Connection对象,定义UglyBankDao类如下:
package davenkin.step2_ugly;import java.sql.Connection;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;public class UglyBankDao{ public void withdraw(int bankId, int amount, Connection connection) throws SQLException { PreparedStatement selectStatement = connection.prepareStatement("SELECT BANK_AMOUNT FROM BANK_ACCOUNT WHERE BANK_ID = ?"); selectStatement.setInt(1, bankId); ResultSet resultSet = selectStatement.executeQuery(); resultSet.next(); int previousAmount = resultSet.getInt(1); resultSet.close(); selectStatement.close(); int newAmount = previousAmount - amount; PreparedStatement updateStatement = connection.prepareStatement("UPDATE BANK_ACCOUNT SET BANK_AMOUNT = ? WHERE BANK_ID = ?"); updateStatement.setInt(1, newAmount); updateStatement.setInt(2, bankId); updateStatement.execute(); updateStatement.close(); }}
?
使用同样的方法,定义UglyInsuranceDao类:
package davenkin.step2_ugly;import java.sql.Connection;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;public class UglyInsuranceDao{ public void deposit(int insuranceId, int amount, Connection connection) throws SQLException { PreparedStatement selectStatement = connection.prepareStatement("SELECT INSURANCE_AMOUNT FROM INSURANCE_ACCOUNT WHERE INSURANCE_ID = ?"); selectStatement.setInt(1, insuranceId); ResultSet resultSet = selectStatement.executeQuery(); resultSet.next(); int previousAmount = resultSet.getInt(1); resultSet.close(); selectStatement.close(); int newAmount = previousAmount + amount; PreparedStatement updateStatement = connection.prepareStatement("UPDATE INSURANCE_ACCOUNT SET INSURANCE_AMOUNT = ? WHERE INSURANCE_ID = ?"); updateStatement.setInt(1, newAmount); updateStatement.setInt(2, insuranceId); updateStatement.execute(); updateStatement.close(); }}
?
然后修改Service类,在UglyBankService类的transfer方法中,首先创建一个Connection对象,然后在将该对象 依次传给UglyBankDao的withdraw方法和UglyInsuranceDao类的deposit方法,这样service层和DAO层使用 相同的Connection对象。定义UglyBankService类如下:
package davenkin.step2_ugly;import davenkin.BankService;import javax.sql.DataSource;import java.sql.Connection;import java.sql.SQLException;public class UglyBankService implements BankService{ private DataSource dataSource; private UglyBankDao uglyBankDao; private UglyInsuranceDao uglyInsuranceDao; public UglyBankService(DataSource dataSource) { this.dataSource = dataSource; } public void transfer(int fromId, int toId, int amount) { Connection connection = null; try { connection = dataSource.getConnection(); connection.setAutoCommit(false); uglyBankDao.withdraw(fromId, amount, connection); uglyInsuranceDao.deposit(toId, amount, connection); connection.commit(); } catch (Exception e) { try { assert connection != null; connection.rollback(); } catch (SQLException e1) { e1.printStackTrace(); } } finally { try { assert connection != null; connection.close(); } catch (SQLException e) { e.printStackTrace(); } } } public void setUglyBankDao(UglyBankDao uglyBankDao) { this.uglyBankDao = uglyBankDao; } public void setUglyInsuranceDao(UglyInsuranceDao uglyInsuranceDao) { this.uglyInsuranceDao = uglyInsuranceDao; }}
?
通过上面共享Connection对象的方法虽然可以完成事务处理的目的,但是这样做法是丑陋的,原因在于:为了完成事务处理的目的,我们需要将一 个底层的Connection类在service层和DAO层之间进行传递,而DAO层的方法也要接受这个Connection对象,这种做法显然是不好 的,这就是典型的API污染。
?
在下一篇博文中,我们将讲到如何在不传递Connection对象的情况下完成和本文相同的事务处理功能。
在本系列的上一篇文章中我们讲到,要实现在同一个事务中使用相同的Connection对象,我们可以通过传递Connection对象的方式达到共享的目的,但是这种做法是丑陋的。在本篇文章中,我们将引入另外一种机制(ConnectionHolder)来完成事务管理。
?
这是一个关于Java事务处理的系列文章,请通过以下方式下载github源代码:
git clone?https://github.com/davenkin/java_transaction_workshop.git
?
ConnectionHolder的工作机制是:我们将Connection对象放在一个全局公用的地方,然后在不同的操作中都从这个地方取得 Connection,从而完成Connection共享的目的,这也是一种ServiceLocator模式,有点像JNDI。定义一个 ConnectionHolder类如下:
package davenkin.step3_connection_holder;import javax.sql.DataSource;import java.sql.Connection;import java.sql.SQLException;import java.util.HashMap;import java.util.Map;public class ConnectionHolder{ private Map<DataSource, Connection> connectionMap = new HashMap<DataSource, Connection>(); public Connection getConnection(DataSource dataSource) throws SQLException { Connection connection = connectionMap.get(dataSource); if (connection == null || connection.isClosed()) { connection = dataSource.getConnection(); connectionMap.put(dataSource, connection); } return connection; } public void removeConnection(DataSource dataSource) { connectionMap.remove(dataSource); }}
?
从ConnectionHolder类中可以看出,我们维护了一个键为DataSource、值为Connection的Map,这主要用于使 ConnectionHolder可以服务多个DataSource。在调用getConnection方法时传入了一个DataSource对象,如果 Map里面已经存在该DataSource对应的Connection,则直接返回该Connection,否则,调用DataSource的 getConnection方法获得一个新的Connection,再将其加入到Map中,最后返回该Connection。这样在同一个事务过程中,我 们先后从ConnectionHolder中取得的Connection是相同的,除非在中途我们调用了ConnectionHolder的 removeConnection方法将当前Connection移除掉或者调用了Connection.close()将Connection关闭,然 后在后续的操作中再次调用ConnectionHolder的getConnection方法,此时返回的则是一个新的Connection对象,从而导 致事务处理失败,你应该不会做出这种中途移除或关闭Connection的事情。
?
然而,虽然我们不会自己手动地在中途移除或者关闭Conncetion对象(当然,在事务处理末尾我们应该关闭Conncetion),我们却无法 阻止其他线程这么做。比如,ConnectionHolder类是可以在多个线程中同时使用的,并且这些线程使用了同一个DataSource,其中一个 线程使用完Connection后便将其关闭,而此时另外一个线程正试图使用这个Connection,问题就出来了。因此,上面的 ConnectionHolder不是线程安全的。
?
为了获得线程安全的ConnectionHolder类,我们可以引入Java提供的ThreadLocal类,该类保证一个类的实例变量在各个线 程中都有一份单独的拷贝,从而不会影响其他线程中的实例变量。定义一个SingleThreadConnectionHolder类如下:
package davenkin.step3_connection_holder;import javax.sql.DataSource;import java.sql.Connection;import java.sql.SQLException;public class SingleThreadConnectionHolder{ private static ThreadLocal<ConnectionHolder> localConnectionHolder = new ThreadLocal<ConnectionHolder>(); public static Connection getConnection(DataSource dataSource) throws SQLException { return getConnectionHolder().getConnection(dataSource); } public static void removeConnection(DataSource dataSource) { getConnectionHolder().removeConnection(dataSource); } private static ConnectionHolder getConnectionHolder() { ConnectionHolder connectionHolder = localConnectionHolder.get(); if (connectionHolder == null) { connectionHolder = new ConnectionHolder(); localConnectionHolder.set(connectionHolder); } return connectionHolder; }}
?
有了一个线程安全的SingleThreadConnectionHolder类,我们便可以在service层和各个DAO中使用该类来获取Connection对象:
package davenkin.step3_connection_holder;import javax.sql.DataSource;import java.sql.Connection;import java.sql.SQLException;public class TransactionManager{ private DataSource dataSource; public TransactionManager(DataSource dataSource) { this.dataSource = dataSource; } public final void start() throws SQLException { Connection connection = getConnection(); connection.setAutoCommit(false); } public final void commit() throws SQLException { Connection connection = getConnection(); connection.commit(); } public final void rollback() { Connection connection = null; try { connection = getConnection(); connection.rollback(); } catch (SQLException e) { throw new RuntimeException("Couldn't rollback on connection[" + connection + "].", e); } } public final void close() { Connection connection = null; try { connection = getConnection(); connection.setAutoCommit(true); connection.setReadOnly(false); connection.close(); SingleThreadConnectionHolder.removeConnection(dataSource); } catch (SQLException e) { throw new RuntimeException("Couldn't close connection[" + connection + "].", e); } } private Connection getConnection() throws SQLException { return SingleThreadConnectionHolder.getConnection(dataSource); }}
?
可以看出,TransactionManager对象也维护了一个DataSource实例变量,并且也是通过 SingleThreadConnectionHolder来获取Connection对象的。然后我们在service类中使用该 TransactionManager:
package davenkin.step3_connection_holder;import davenkin.BankService;import javax.sql.DataSource;public class ConnectionHolderBankService implements BankService{ private TransactionManager transactionManager; private ConnectionHolderBankDao connectionHolderBankDao; private ConnectionHolderInsuranceDao connectionHolderInsuranceDao; public ConnectionHolderBankService(DataSource dataSource) { transactionManager = new TransactionManager(dataSource); connectionHolderBankDao = new ConnectionHolderBankDao(dataSource); connectionHolderInsuranceDao = new ConnectionHolderInsuranceDao(dataSource); } public void transfer(int fromId, int toId, int amount) { try { transactionManager.start(); connectionHolderBankDao.withdraw(fromId, amount); connectionHolderInsuranceDao.deposit(toId, amount); transactionManager.commit(); } catch (Exception e) { transactionManager.rollback(); } finally { transactionManager.close(); } }}
?
在ConnectionHolderBankService中,我们使用TransactionManager来管理事务,由于 TransactionManger和两个DAO类都是使用SingleThreadConnectionHolder来获取Connection,故他 们在整个事务处理过程中使用了相同的Connection对象,事务处理成功。我们也可以看到,在两个DAO的withdraw和deposit方法没有 接受和业务无关的对象,消除了API污染;另外,使用TransactionManager来管理事务,使Service层代码也变简洁了。
?
在下一篇文章中,我们将讲到使用Template模式来完成事务处理。
在本系列的上一篇文章中,我们讲到了使用TransactionManger和ConnectionHolder完成线程安全的事务管理,在本篇中,我们将在此基础上引入Template模式进行事务管理。
?
这是一个关于Java事务处理的系列文章,请通过以下方式下载github源代码:
git clone?https://github.com/davenkin/java_transaction_workshop.git
?
Template模式大家应该都很熟悉,比如Spring就提供了许多Template,像JdbcTemplate和JmsTemplate等。 Template模式的基本思想是:在超类里将完成核心功能的方法声明为抽象方法,留给子类去实现,而在超类中完成一些通用操作,比如JMS的 Session的建立和数据库事务的准备工作等。
?
在本篇文章中,我们使用一个Template类来帮助管理事务,定义TransactionTemplate类如下:
package davenkin.step4_transaction_template;import davenkin.step3_connection_holder.TransactionManager;import javax.sql.DataSource;public abstract class TransactionTemplate{ private TransactionManager transactionManager; protected TransactionTemplate(DataSource dataSource) { transactionManager = new TransactionManager(dataSource); } public void doJobInTransaction() { try { transactionManager.start(); doJob(); transactionManager.commit(); } catch (Exception e) { transactionManager.rollback(); } finally { transactionManager.close(); } } protected abstract void doJob() throws Exception;}
?
在TransactionTemplate类中定义一个doJobInTransaction方法,在该方法中首先使用 TransactionManager开始事务,然后调用doJob方法完成业务功能,doJob方法为抽象方法,完成业务功能的子类应该实现该方法,最 后,根据doJob方法执行是否成功决定commit事务或是rollback事务。
?
然后定义使用TransactionTemplate的TransactionTemplateBankService:
package davenkin.step4_transaction_template;import davenkin.BankService;import davenkin.step3_connection_holder.ConnectionHolderBankDao;import davenkin.step3_connection_holder.ConnectionHolderInsuranceDao;import javax.sql.DataSource;public class TransactionTemplateBankService implements BankService{ private DataSource dataSource; private ConnectionHolderBankDao connectionHolderBankDao; private ConnectionHolderInsuranceDao connectionHolderInsuranceDao; public TransactionTemplateBankService(DataSource dataSource) { this.dataSource = dataSource; connectionHolderBankDao = new ConnectionHolderBankDao(dataSource); connectionHolderInsuranceDao = new ConnectionHolderInsuranceDao(dataSource); } public void transfer(final int fromId, final int toId, final int amount) { new TransactionTemplate(dataSource) { @Override protected void doJob() throws Exception { connectionHolderBankDao.withdraw(fromId, amount); connectionHolderInsuranceDao.deposit(toId, amount); } }.doJobInTransaction(); }}
?
在TransactionTemplateBankService的transfer方法中,我们创建了一个匿名的 TtransactionTemplate类,并且实现了doJob方法,在doJob方法中调用两个DAO完成业务操作,然后调用调用 TransactionTemplateBankService的doJobInTransaction方法。
?
由于TransactionTemplate只是对上一篇文章中事务处理的一层封装,故TransactionManager和两个DAO类都保持和上一篇中的一样,此时他们都使用SingleThreadConnectionHolder获得Connection,故事务处理成功。
?
在下一篇文章中,我们会讲到使用Java的动态代理来完成事务处理,这也是Spring管理事务的典型方法。
在本系列的上一篇文 章中,我们讲到了使用Template模式进行事务管理,这固然是一种很好的方法,但是不那么完美的地方在于我们依然需要在service层中编写和事务 处理相关的代码,即我们需要在service层中声明一个TransactionTemplate。在本篇文章中,我们将使用Java提供的动态代理 (Dynamic Proxy)功能来完成事务处理,你将看到无论是在service层还是DAO层都不会有事务处理代码,即他们根本就意识不到事务处理的存在。使用动态代 理完成事务处理也是AOP的一种典型应用。
?
这是一个关于Java事务处理的系列文章,请通过以下方式下载github源代码:
git clone?https://github.com/davenkin/java_transaction_workshop.git
?
Java动态代理的基本原理为:被代理对象需要实现某个接口(这是前提),代理对象会拦截对被代理对象的方法调用,在其中可以全然抛弃被代理对象的 方法实现而完成另外的功能,也可以在被代理对象方法调用的前后增加一些额外的功能。在本篇文章中,我们将拦截service层的transfer方法,在 其调用之前加入事务准备工作,然后调用原来的transfer方法,之后根据transfer方法是否执行成功决定commit还是rollback。
?
首先定义一个TransactionEnabledProxyManager类:
package davenkin.step5_transaction_proxy;import davenkin.step3_connection_holder.TransactionManager;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;public class TransactionEnabledProxyManager{ private TransactionManager transactionManager; public TransactionEnabledProxyManager(TransactionManager transactionManager) { this.transactionManager = transactionManager; } public Object proxyFor(Object object) { return Proxy.newProxyInstance(object.getClass().getClassLoader(), object.getClass().getInterfaces(), new TransactionInvocationHandler(object, transactionManager)); }}class TransactionInvocationHandler implements InvocationHandler{ private Object proxy; private TransactionManager transactionManager; TransactionInvocationHandler(Object object, TransactionManager transactionManager) { this.proxy = object; this.transactionManager = transactionManager; } public Object invoke(Object o, Method method, Object[] objects) throws Throwable { transactionManager.start(); Object result = null; try { result = method.invoke(proxy, objects); transactionManager.commit(); } catch (Exception e) { transactionManager.rollback(); } finally { transactionManager.close(); } return result; }}
?
通过调用该类的proxyFor方法,传入需要被代理的对象(本例中为service对象),返回一个代理对象。此后,在调用代理对象的 transfer方法时,会自动调用TransactionIvocationHandler的invoke方法,在该方法中,我们首先开始事务,然后执 行:
package davenkin.step5_transaction_proxy;import davenkin.BankService;import davenkin.step3_connection_holder.ConnectionHolderBankDao;import davenkin.step3_connection_holder.ConnectionHolderInsuranceDao;import javax.sql.DataSource;public class BareBankService implements BankService{ private ConnectionHolderBankDao connectionHolderBankDao; private ConnectionHolderInsuranceDao connectionHolderInsuranceDao; public BareBankService(DataSource dataSource) { connectionHolderBankDao = new ConnectionHolderBankDao(dataSource); connectionHolderInsuranceDao = new ConnectionHolderInsuranceDao(dataSource); } public void transfer(final int fromId, final int toId, final int amount) { try { connectionHolderBankDao.withdraw(fromId, amount); connectionHolderInsuranceDao.deposit(toId, amount); } catch (Exception e) { throw new RuntimeException(); } }}
?
如何,上面的BareBankService中没有任何事务处理的影子,我们只需关注核心业务逻辑即可。
?
然后在客户代码中,我们需要先创建代理对象(这在Spring中通常是通过配置实现的):
@Test public void transferFailure() throws SQLException { TransactionEnabledProxyManager transactionEnabledProxyManager = new TransactionEnabledProxyManager(new TransactionManager(dataSource)); BankService bankService = new BareBankService(dataSource); BankService proxyBankService = (BankService) transactionEnabledProxyManager.proxyFor(bankService); int toNonExistId = 3333; proxyBankService.transfer(1111, toNonExistId, 200); assertEquals(1000, getBankAmount(1111)); assertEquals(1000, getInsuranceAmount(2222)); }
?
在上面的测试代码中,我们首先创建一个BareBankService对象,然后调用 transactionEnabledProxyManager的proxyFor方法生成对原BareBankService对象的代理对象,最后在代 理对象上调用transfer方法,测试运行成功。
?
可以看到,通过以上动态代理实现,BareBankService中的所有public方法都被代理了,即他们都被加入到事务中。这对于 service层中的所有方法都需要和数据库打交道的情况是可以的,本例即如此(有且只有一个transfer方法),然而对于service层中不需要 和数据库打交道的public方法,这样做虽然也不会出错,但是却显得多余。在下一篇文章中,我们将讲到使用Java注解(annotation)的方式来声明一个方法是否需要事务,就像Spring中的Transactional注解一样。
在本系列的上一篇文 章中,我们讲到了使用动态代理的方式完成事务处理,这种方式将service层的所有public方法都加入到事务中,这显然不是我们需要的,需要代理的 只是那些需要操作数据库的方法。在本篇中,我们将讲到如何使用Java注解(Annotation)来标记需要事务处理的方法。
?
这是一个关于Java事务处理的系列文章,请通过以下方式下载github源代码:
git clone?https://github.com/davenkin/java_transaction_workshop.git
?
首先定义Transactional注解:
package davenkin.step6_annotation;import java.lang.annotation.ElementType;import java.lang.annotation.Retention;import java.lang.annotation.RetentionPolicy;import java.lang.annotation.Target;@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)public @interface Transactional{}
?
使用注解标记事务的基本原理为:依然使用上一篇中 讲到的动态代理的方式,只是在InvocationHandler的invoke方法中,首先判断被代理的方法是否标记有Transactional注 解,如果没有则直接调用method.invoke(proxied, objects),否则,先准备事务,在调用method.invoke(proxied, objects),然后根据该方法是否执行成功调用commit或rollback。定义 TransactionEnabledAnnotationProxyManager如下:
package davenkin.step6_annotation;import davenkin.step3_connection_holder.TransactionManager;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;public class TransactionEnabledAnnotationProxyManager{ private TransactionManager transactionManager; public TransactionEnabledAnnotationProxyManager(TransactionManager transactionManager) { this.transactionManager = transactionManager; } public Object proxyFor(Object object) { return Proxy.newProxyInstance(object.getClass().getClassLoader(), object.getClass().getInterfaces(), new AnnotationTransactionInvocationHandler(object, transactionManager)); }}class AnnotationTransactionInvocationHandler implements InvocationHandler{ private Object proxied; private TransactionManager transactionManager; AnnotationTransactionInvocationHandler(Object object, TransactionManager transactionManager) { this.proxied = object; this.transactionManager = transactionManager; } public Object invoke(Object proxy, Method method, Object[] objects) throws Throwable { Method originalMethod = proxied.getClass().getMethod(method.getName(), method.getParameterTypes()); if (!originalMethod.isAnnotationPresent(Transactional.class)) { return method.invoke(proxied, objects); } transactionManager.start(); Object result = null; try { result = method.invoke(proxied, objects); transactionManager.commit(); } catch (Exception e) { transactionManager.rollback(); } finally { transactionManager.close(); } return result; }}
?
可以看到,在AnnotationTransactionInvocationHandler的invoke方法中,我们首先获得原service 的transfer方法,然后根据originalMethod.isAnnotationPresent(Transactional.class)判 断该方法是否标记有Transactional注解,如果没有,则任何额外功能都不加,直接调用原来service的transfer方法;否则,将其加 入到事务处理中。
?
在service层中,我们只需将需要加入事务处理的方法用Transactional注解标记就行了:
package davenkin.step6_annotation;import davenkin.BankService;import davenkin.step3_connection_holder.ConnectionHolderBankDao;import davenkin.step3_connection_holder.ConnectionHolderInsuranceDao;import javax.sql.DataSource;public class AnnotationBankService implements BankService{ private ConnectionHolderBankDao connectionHolderBankDao; private ConnectionHolderInsuranceDao connectionHolderInsuranceDao; public AnnotationBankService(DataSource dataSource) { connectionHolderBankDao = new ConnectionHolderBankDao(dataSource); connectionHolderInsuranceDao = new ConnectionHolderInsuranceDao(dataSource); } @Transactional public void transfer(final int fromId, final int toId, final int amount) { try { connectionHolderBankDao.withdraw(fromId, amount); connectionHolderInsuranceDao.deposit(toId, amount); } catch (Exception e) { throw new RuntimeException(); } }}
?
然后执行测试:
@Test public void transferFailure() throws SQLException { TransactionEnabledAnnotationProxyManager transactionEnabledAnnotationProxyManager = new TransactionEnabledAnnotationProxyManager(new TransactionManager(dataSource)); BankService bankService = new AnnotationBankService(dataSource); BankService proxyBankService = (BankService) transactionEnabledAnnotationProxyManager.proxyFor(bankService); int toNonExistId = 3333; proxyBankService.transfer(1111, toNonExistId, 200); assertEquals(1000, getBankAmount(1111)); assertEquals(1000, getInsuranceAmount(2222)); }
?
测试运行成功,如果将AnnotationBankService中transfer方法的Transactional注解删除,那么以上测试将抛 出RuntimeException异常,该异常为transfer方法中我们人为抛出的,也即由于此时没有事务来捕捉异常,程序便直接抛出该异常而终止 运行。在下一篇(本系列最后一篇)文章中,我们将讲到分布式事务的一个入门例子。
在本系列先前的文章中,我们主要讲解了JDBC对本地事务的处理,本篇文章将讲到一个分布式事务的例子。
?
请通过以下方式下载github源代码:
git clone https://github.com/davenkin/jta-atomikos-hibernate-activemq.git
本地事务和分布式事务的区别在于:本地事务只用于处理单一数据源事务(比如单个数据库),分布式事务可以处理多种异构的数据源,比如某个业务操作中同时包含了JDBC和JMS或者某个操作需要访问多个不同的数据库。
?
Java通过JTA完成分布式事务,JTA本身只是一种规范,不同的应用服务器都包含有自己的实现(比如JbossJTA),同时还存在独立于应用服务器的单独JTA实现,比如本篇中要讲到的Atomikos。对于JTA的原理,这里不细讲,读者可以通过这篇文章了解相关知识。
?
在本篇文章中,我们将实现以下一个应用场景:你在网上购物,下了订单之后,订单数据将保存在系统的数据库中,同时为了安排物流,订单信息将以消息(Message)的方式发送到物流部门以便送货。
?
以上操作同时设计到数据库操作和JMS消息发送,为了使整个操作成为一个原子操作,我们只能选择分布式事务。我们首先设计一个service层,定义OrderService接口:
@XmlRootElement(name = "Order")@XmlAccessorType(XmlAccessType.FIELD)public class Order { @XmlElement(name = "Id",required = true) private long id; @XmlElement(name = "ItemName",required = true) private String itemName; @XmlElement(name = "Price",required = true) private double price; @XmlElement(name = "BuyerName",required = true) private String buyerName; @XmlElement(name = "MailAddress",required = true) private String mailAddress; public Order() { }
?
为了采用JAXB对Order对象进行Marshal和Unmarshal,我们在Order类中加入了JAXB相关的Annotation。 我们将使用Hibernate来完成数据持久化,然后使用Spring提供的JmsTemplate将Order转成xml后以TextMessage的 形式发送到物流部门的ORDER.QUEUE中。
?
(一)准备数据库
为了方便,我们将采用Spring提供的embedded数据库,默认情况下Spring采用HSQL作为后台数据库,虽然在本例中我们将采用HSQL的非XA的DataSource,但是通过Atomikos包装之后依然可以参与分布式事务。
SQL脚本包含在createDB.sql文件中:
CREATE TABLE USER_ORDER(ID INT NOT NULL,ITEM_NAME VARCHAR (100) NOT NULL UNIQUE,PRICE DOUBLE NOT NULL,BUYER_NAME CHAR (32) NOT NULL,MAIL_ADDRESS VARCHAR(500) NOT NULL,PRIMARY KEY(ID));
?
在Spring中配置DataSource如下:
@BeforeClass public static void startEmbeddedActiveMq() throws Exception { broker = new BrokerService(); broker.addConnector("tcp://localhost:61616"); broker.start(); } @AfterClass public static void stopEmbeddedActiveMq() throws Exception { broker.stop(); }
?
(三)实现OrderService
创建一个DefaultOrderService,该类实现了OrderService接口,并维护一个JmsTemplate和一个Hibernate的SessionFactory实例变量,分别用于Message的发送和数据库处理。
package davenkin;import org.hibernate.SessionFactory;import org.hibernate.classic.Session;import org.springframework.beans.factory.annotation.Required;import org.springframework.jms.core.JmsTemplate;import org.springframework.transaction.annotation.Transactional;public class DefaultOrderService implements OrderService{ private JmsTemplate jmsTemplate; private SessionFactory sessionFactory; @Override @Transactional public void makeOrder(Order order) { Session session = sessionFactory.getCurrentSession(); session.save(order); jmsTemplate.convertAndSend(order); } @Required public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } @Required public void setSessionFactory(SessionFactory sessionFactory) { this.sessionFactory = sessionFactory; }}
?
(四)创建Order的Mapping配置文件
<?xml version="1.0"?><!DOCTYPE hibernate-mapping PUBLIC "-//Hibernate/Hibernate Mapping DTD 3.0//EN" "http://hibernate.sourceforge.net/hibernate-mapping-3.0.dtd"><hibernate-mapping> <class name="davenkin.Order" table="USER_ORDER"> <id name="id" type="long"> <column name="ID" /> <generator class="increment" /> </id> <property name="itemName" type="string"> <column name="ITEM_NAME" /> </property> <property name="price" type="double"> <column name="PRICE"/> </property> <property name="buyerName" type="string"> <column name="BUYER_NAME"/> </property> <property name="mailAddress" type="string"> <column name="MAIL_ADDRESS"/> </property> </class></hibernate-mapping>
?
(五)配置Atomikos事务
在Spring的IoC容器中,我们需要配置由Atomikos提供的UserTransaction和TransactionManager,然后再配置Spring的JtaTransactionManager:
<bean id="userTransactionService" class="com.atomikos.icatch.config.UserTransactionServiceImp" init-method="init" destroy-method="shutdownForce"> <constructor-arg> <props> <prop key="com.atomikos.icatch.service">com.atomikos.icatch.standalone.UserTransactionServiceFactory</prop> </props> </constructor-arg> </bean> <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close" depends-on="userTransactionService"> <property name="forceShutdown" value="false" /> </bean> <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp" depends-on="userTransactionService"> <property name="transactionTimeout" value="300" /> </bean> <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager" depends-on="userTransactionService"> <property name="transactionManager" ref="atomikosTransactionManager" /> <property name="userTransaction" ref="atomikosUserTransaction" /> </bean> <tx:annotation-driven transaction-manager="jtaTransactionManager" />
?
(六)配置JMS
对于JMS,为了能使ActiveMQ加入到分布式事务中,我们需要配置ActiveMQXAConnectionFactory,而不是 ActiveMQConnectionFactory,然后再配置JmsTemplate,此外还需要配置MessageConvertor在Order 对象和XML之间互转。
<bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <bean id="amqConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init"> <property name="uniqueResourceName" value="XAactiveMQ" /> <property name="xaConnectionFactory" ref="jmsXaConnectionFactory" /> <property name="poolSize" value="5"/> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="amqConnectionFactory"/> <property name="receiveTimeout" value="2000" /> <property name="defaultDestination" ref="orderQueue"/> <property name="sessionTransacted" value="true" /> <property name="messageConverter" ref="oxmMessageConverter"/> </bean> <bean id="orderQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="ORDER.QUEUE"/> </bean> <bean id="oxmMessageConverter" class="org.springframework.jms.support.converter.MarshallingMessageConverter"> <property name="marshaller" ref="marshaller"/> <property name="unmarshaller" ref="marshaller"/> </bean> <oxm:jaxb2-marshaller id="marshaller"> <oxm:class-to-be-bound name="davenkin.Order"/> </oxm:jaxb2-marshaller>
?
(七)测试
在测试中,我们首先通过(二)中的方法启动ActiveMQ,再调用DefaultOrderService,最后对数据库和QUEUE进行验证:
@Test public void makeOrder(){ orderService.makeOrder(createOrder()); JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource); assertEquals(1, jdbcTemplate.queryForInt("SELECT COUNT(*) FROM USER_ORDER")); String dbItemName = jdbcTemplate.queryForObject("SELECT ITEM_NAME FROM USER_ORDER", String.class); String messageItemName = ((Order) jmsTemplate.receiveAndConvert()).getItemName(); assertEquals(dbItemName, messageItemName); } @Test(expected = IllegalArgumentException.class) public void failToMakeOrder() { orderService.makeOrder(null); JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource); assertEquals(0, jdbcTemplate.queryForInt("SELECT COUNT(*) FROM USER_ORDER")); assertNull(jmsTemplate.receiveAndConvert()); }
?
?
?
?