首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 软件管理 > 软件架构设计 >

Java事务处理全解析(1至八)

2013-07-01 
Java事务处理全解析(一至八)package davenkin.step1_failureimport javax.sql.DataSourceimport java.sq

Java事务处理全解析(一至八)

package davenkin.step1_failure;import javax.sql.DataSource;import java.sql.Connection;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;public class FailureBankDao {    private DataSource dataSource;    public FailureBankDao(DataSource dataSource) {        this.dataSource = dataSource;    }    public void withdraw(int bankId, int amount) throws SQLException {        Connection connection = dataSource.getConnection();        PreparedStatement selectStatement = connection.prepareStatement("SELECT BANK_AMOUNT FROM BANK_ACCOUNT WHERE BANK_ID = ?");        selectStatement.setInt(1, bankId);        ResultSet resultSet = selectStatement.executeQuery();        resultSet.next();        int previousAmount = resultSet.getInt(1);        resultSet.close();        selectStatement.close();        int newAmount = previousAmount - amount;        PreparedStatement updateStatement = connection.prepareStatement("UPDATE BANK_ACCOUNT SET BANK_AMOUNT = ? WHERE BANK_ID = ?");        updateStatement.setInt(1, newAmount);        updateStatement.setInt(2, bankId);        updateStatement.execute();        updateStatement.close();        connection.close();    }}

?

FailureBankDao的withdraw方法,从银行账户表(BANK_ACCOUNT)中帐号为bankId的用户账户中取出数量为amount的金额。

采用同样的方法,定义保险账户的DAO类如下:

package davenkin.step1_failure;import javax.sql.DataSource;import java.sql.Connection;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;public class FailureInsuranceDao {    private DataSource dataSource;    public FailureInsuranceDao(DataSource dataSource){        this.dataSource = dataSource;    }    public void deposit(int insuranceId, int amount) throws SQLException {        Connection connection = dataSource.getConnection();        PreparedStatement selectStatement = connection.prepareStatement("SELECT INSURANCE_AMOUNT FROM INSURANCE_ACCOUNT WHERE INSURANCE_ID = ?");        selectStatement.setInt(1, insuranceId);        ResultSet resultSet = selectStatement.executeQuery();        resultSet.next();        int previousAmount = resultSet.getInt(1);        resultSet.close();        selectStatement.close();        int newAmount = previousAmount + amount;        PreparedStatement updateStatement = connection.prepareStatement("UPDATE INSURANCE_ACCOUNT SET INSURANCE_AMOUNT = ? WHERE INSURANCE_ID = ?");        updateStatement.setInt(1, newAmount);        updateStatement.setInt(2, insuranceId);        updateStatement.execute();        updateStatement.close();        connection.close();    }}

?

FailureInsuranceDao类的deposit方法向保险账户表(INSURANCE_ACCOUNT)存入amount数量的金额, 这样在BankService中,我们可以先调用FailureBankDao的withdraw方法取出一定金额的存款,再调用 FailureInsuranceDao的deposit方法将该笔存款存入保险账户表中,一切看似OK,实现BankService接口如下:

package davenkin.step1_failure;import davenkin.BankService;import javax.sql.DataSource;import java.sql.Connection;import java.sql.SQLException;public class FailureBankService implements BankService{    private FailureBankDao failureBankDao;    private FailureInsuranceDao failureInsuranceDao;    private DataSource dataSource;    public FailureBankService(DataSource dataSource) {        this.dataSource = dataSource;    }    public void transfer(int fromId, int toId, int amount) {        Connection connection = null;        try {            connection = dataSource.getConnection();            connection.setAutoCommit(false);            failureBankDao.withdraw(fromId, amount);            failureInsuranceDao.deposit(toId, amount);            connection.commit();        } catch (Exception e) {            try {                assert connection != null;                connection.rollback();            } catch (SQLException e1) {                e1.printStackTrace();            }        } finally {            try            {                assert connection != null;                connection.close();            } catch (SQLException e)            {                e.printStackTrace();            }        }    }    public void setFailureBankDao(FailureBankDao failureBankDao) {        this.failureBankDao = failureBankDao;    }    public void setFailureInsuranceDao(FailureInsuranceDao failureInsuranceDao) {        this.failureInsuranceDao = failureInsuranceDao;    }}

?

在FailureBankService的transfer方法中,我们首先通过DataSource获得Connection,然后调用 connection.setAutoCommit(false)已开启手动提交模式,如果一切顺利,则commit,如果出现异常,则 rollback。 接下来,开始测试我们的BankService吧。

为了准备测试数据,我们定义个BankFixture类,该类负责在每次测试之前准备测试数据,分别向银行账户(1111)和保险账户(2222) 中均存入1000元。BankFixture还提供了两个helper方法(getBankAmount和getInsuranceAmount)帮助我 们从数据库中取出数据以做数据验证。我们使用HSQL数据库的in-memory模式,这样不用启动数据库server,方便测试。 BankFixture类定义如下:

package davenkin;import org.junit.Before;import javax.sql.DataSource;import java.sql.*;public class BankFixture{    protected final DataSource dataSource = DataSourceFactory.createDataSource();    @Before    public void setUp() throws SQLException    {        Connection connection = dataSource.getConnection();        Statement statement = connection.createStatement();        statement.execute("DROP TABLE BANK_ACCOUNT IF EXISTS");        statement.execute("DROP TABLE INSURANCE_ACCOUNT IF EXISTS");        statement.execute("CREATE TABLE BANK_ACCOUNT (\n" +                "BANK_ID INT,\n" +                "BANK_AMOUNT INT,\n" +                "PRIMARY KEY(BANK_ID)\n" +                ");");        statement.execute("CREATE TABLE INSURANCE_ACCOUNT (\n" +                "INSURANCE_ID INT,\n" +                "INSURANCE_AMOUNT INT,\n" +                "PRIMARY KEY(INSURANCE_ID)\n" +                ");");        statement.execute("INSERT INTO BANK_ACCOUNT VALUES (1111, 1000);");        statement.execute("INSERT INTO INSURANCE_ACCOUNT VALUES (2222, 1000);");        statement.close();        connection.close();    }    protected int getBankAmount(int bankId) throws SQLException    {        Connection connection = dataSource.getConnection();        PreparedStatement selectStatement = connection.prepareStatement("SELECT BANK_AMOUNT FROM BANK_ACCOUNT WHERE BANK_ID = ?");        selectStatement.setInt(1, bankId);        ResultSet resultSet = selectStatement.executeQuery();        resultSet.next();        int amount = resultSet.getInt(1);        resultSet.close();        selectStatement.close();        connection.close();        return amount;    }    protected int getInsuranceAmount(int insuranceId) throws SQLException    {        Connection connection = dataSource.getConnection();        PreparedStatement selectStatement = connection.prepareStatement("SELECT INSURANCE_AMOUNT FROM INSURANCE_ACCOUNT WHERE INSURANCE_ID = ?");        selectStatement.setInt(1, insuranceId);        ResultSet resultSet = selectStatement.executeQuery();        resultSet.next();        int amount = resultSet.getInt(1);        resultSet.close();        selectStatement.close();        connection.close();        return amount;    }}

?

编写的Junit测试继承自BankFixture类,测试代码如下:

package davenkin.step1_failure;import davenkin.BankFixture;import org.junit.Test;import java.sql.SQLException;import static junit.framework.Assert.assertEquals;public class FailureBankServiceTest extends BankFixture{    @Test    public void transferSuccess() throws SQLException    {        FailureBankDao failureBankDao = new FailureBankDao(dataSource);        FailureInsuranceDao failureInsuranceDao = new FailureInsuranceDao(dataSource);        FailureBankService bankService = new FailureBankService(dataSource);        bankService.setFailureBankDao(failureBankDao);        bankService.setFailureInsuranceDao(failureInsuranceDao);        bankService.transfer(1111, 2222, 200);        assertEquals(800, getBankAmount(1111));        assertEquals(1200, getInsuranceAmount(2222));    }    @Test    public void transferFailure() throws SQLException    {        FailureBankDao failureBankDao = new FailureBankDao(dataSource);        FailureInsuranceDao failureInsuranceDao = new FailureInsuranceDao(dataSource);        FailureBankService bankService = new FailureBankService(dataSource);        bankService.setFailureBankDao(failureBankDao);        bankService.setFailureInsuranceDao(failureInsuranceDao);        int toNonExistId = 3333;        bankService.transfer(1111, toNonExistId, 200);        assertEquals(1000, getInsuranceAmount(2222));        assertEquals(1000, getBankAmount(1111));    }}

?

运行测试,第一个测试(transferSuccess)成功,第二个测试(transferFailure)失败。

分析错误,原因在于:我们分别从FailureBankService,FailureBankDao和FailureInsuranceDao中 调用了三次dataSource.getConnection(),亦即我们创建了三个不同的Connection对象,而Java事务是作用于 Connection之上的,所以从在三个地方我们开启了三个不同的事务,而不是同一个事务。

第一个测试之所以成功,是因为在此过程中没有任何异常发生。虽然在FailureBankService中将Connection的提交模式改为了 手动提交,但是由于两个DAO使用的是各自的Connection对象,所以两个DAO中的Connection依然为默认的自动提交模式。

在第二个测试中,我们给出一个不存在的保险账户id(toNonExistId),就是为了使程序产生异常,然后在assertion语句中验证两 张表均没有任何变化,但是测试在第二个assertion语句处出错。发生异常时,银行账户中的金额已经减少,而虽然程序发生了rollback,但是调 用的是FailureBankService中Connection的rollback,而不是FailureInsuranceDao中 Connection的,对保险账户的操作根本就没有执行,所以保险账户中依然为1000,而银行账户却变为了800。

因此,为了使两个DAO在同一个事务中,我们应该在整个事务处理过程中使用一个Connection对象,在下一篇文章中,我们将讲到通过共享Connection对象的方式达到事务处理的目的。

在本系列的上一篇文 章中,我们看到了一个典型的事务处理失败的案例,其主要原因在于,service层和各个DAO所使用的Connection是不一样的,而JDBC中事 务处理的作用对象正是Connection对象,所以不同DAO中的操作不在同一个事务里面,从而导致事务失败。从中我们得出了教训:要避免这种失败,我 们可以使所有操作共享一个Connection对象,这样应该就没有问题了。

?

请通过以下方式下载本系列文章的github源代码:

git clone?https://github.com/davenkin/java_transaction_workshop.git

?

在本篇文章中,我们将看到一个成功的,但是丑陋的事务处理方案,它的基本思路是:在service层创建Connection对象,再将该Connection传给各个DAO类,这样就完成了Connection共享的目的。

?

修改两个DAO类,使他们都接受一个Connection对象,定义UglyBankDao类如下:

package davenkin.step2_ugly;import java.sql.Connection;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;public class UglyBankDao{    public void withdraw(int bankId, int amount, Connection connection) throws SQLException    {        PreparedStatement selectStatement = connection.prepareStatement("SELECT BANK_AMOUNT FROM BANK_ACCOUNT WHERE BANK_ID = ?");        selectStatement.setInt(1, bankId);        ResultSet resultSet = selectStatement.executeQuery();        resultSet.next();        int previousAmount = resultSet.getInt(1);        resultSet.close();        selectStatement.close();        int newAmount = previousAmount - amount;        PreparedStatement updateStatement = connection.prepareStatement("UPDATE BANK_ACCOUNT SET BANK_AMOUNT = ? WHERE BANK_ID = ?");        updateStatement.setInt(1, newAmount);        updateStatement.setInt(2, bankId);        updateStatement.execute();        updateStatement.close();    }}

?

使用同样的方法,定义UglyInsuranceDao类:

package davenkin.step2_ugly;import java.sql.Connection;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;public class UglyInsuranceDao{    public void deposit(int insuranceId, int amount, Connection connection) throws SQLException    {        PreparedStatement selectStatement = connection.prepareStatement("SELECT INSURANCE_AMOUNT FROM INSURANCE_ACCOUNT WHERE INSURANCE_ID = ?");        selectStatement.setInt(1, insuranceId);        ResultSet resultSet = selectStatement.executeQuery();        resultSet.next();        int previousAmount = resultSet.getInt(1);        resultSet.close();        selectStatement.close();        int newAmount = previousAmount + amount;        PreparedStatement updateStatement = connection.prepareStatement("UPDATE INSURANCE_ACCOUNT SET INSURANCE_AMOUNT = ? WHERE INSURANCE_ID = ?");        updateStatement.setInt(1, newAmount);        updateStatement.setInt(2, insuranceId);        updateStatement.execute();        updateStatement.close();    }}

?

然后修改Service类,在UglyBankService类的transfer方法中,首先创建一个Connection对象,然后在将该对象 依次传给UglyBankDao的withdraw方法和UglyInsuranceDao类的deposit方法,这样service层和DAO层使用 相同的Connection对象。定义UglyBankService类如下:

package davenkin.step2_ugly;import davenkin.BankService;import javax.sql.DataSource;import java.sql.Connection;import java.sql.SQLException;public class UglyBankService implements BankService{    private DataSource dataSource;    private UglyBankDao uglyBankDao;    private UglyInsuranceDao uglyInsuranceDao;    public UglyBankService(DataSource dataSource)    {        this.dataSource = dataSource;    }    public void transfer(int fromId, int toId, int amount)    {        Connection connection = null;        try        {            connection = dataSource.getConnection();            connection.setAutoCommit(false);            uglyBankDao.withdraw(fromId, amount, connection);            uglyInsuranceDao.deposit(toId, amount, connection);            connection.commit();        } catch (Exception e)        {            try            {                assert connection != null;                connection.rollback();            } catch (SQLException e1)            {                e1.printStackTrace();            }        } finally        {            try            {                assert connection != null;                connection.close();            } catch (SQLException e)            {                e.printStackTrace();            }        }    }    public void setUglyBankDao(UglyBankDao uglyBankDao)    {        this.uglyBankDao = uglyBankDao;    }    public void setUglyInsuranceDao(UglyInsuranceDao uglyInsuranceDao)    {        this.uglyInsuranceDao = uglyInsuranceDao;    }}

?

通过上面共享Connection对象的方法虽然可以完成事务处理的目的,但是这样做法是丑陋的,原因在于:为了完成事务处理的目的,我们需要将一 个底层的Connection类在service层和DAO层之间进行传递,而DAO层的方法也要接受这个Connection对象,这种做法显然是不好 的,这就是典型的API污染。

?

在下一篇博文中,我们将讲到如何在不传递Connection对象的情况下完成和本文相同的事务处理功能。

在本系列的上一篇文章中我们讲到,要实现在同一个事务中使用相同的Connection对象,我们可以通过传递Connection对象的方式达到共享的目的,但是这种做法是丑陋的。在本篇文章中,我们将引入另外一种机制(ConnectionHolder)来完成事务管理。

?

这是一个关于Java事务处理的系列文章,请通过以下方式下载github源代码:

git clone?https://github.com/davenkin/java_transaction_workshop.git

?

ConnectionHolder的工作机制是:我们将Connection对象放在一个全局公用的地方,然后在不同的操作中都从这个地方取得 Connection,从而完成Connection共享的目的,这也是一种ServiceLocator模式,有点像JNDI。定义一个 ConnectionHolder类如下:

package davenkin.step3_connection_holder;import javax.sql.DataSource;import java.sql.Connection;import java.sql.SQLException;import java.util.HashMap;import java.util.Map;public class ConnectionHolder{    private Map<DataSource, Connection> connectionMap = new HashMap<DataSource, Connection>();    public Connection getConnection(DataSource dataSource) throws SQLException    {        Connection connection = connectionMap.get(dataSource);        if (connection == null || connection.isClosed())        {            connection = dataSource.getConnection();            connectionMap.put(dataSource, connection);        }        return connection;    }    public void removeConnection(DataSource dataSource)    {        connectionMap.remove(dataSource);    }}

?

从ConnectionHolder类中可以看出,我们维护了一个键为DataSource、值为Connection的Map,这主要用于使 ConnectionHolder可以服务多个DataSource。在调用getConnection方法时传入了一个DataSource对象,如果 Map里面已经存在该DataSource对应的Connection,则直接返回该Connection,否则,调用DataSource的 getConnection方法获得一个新的Connection,再将其加入到Map中,最后返回该Connection。这样在同一个事务过程中,我 们先后从ConnectionHolder中取得的Connection是相同的,除非在中途我们调用了ConnectionHolder的 removeConnection方法将当前Connection移除掉或者调用了Connection.close()将Connection关闭,然 后在后续的操作中再次调用ConnectionHolder的getConnection方法,此时返回的则是一个新的Connection对象,从而导 致事务处理失败,你应该不会做出这种中途移除或关闭Connection的事情。

?

然而,虽然我们不会自己手动地在中途移除或者关闭Conncetion对象(当然,在事务处理末尾我们应该关闭Conncetion),我们却无法 阻止其他线程这么做。比如,ConnectionHolder类是可以在多个线程中同时使用的,并且这些线程使用了同一个DataSource,其中一个 线程使用完Connection后便将其关闭,而此时另外一个线程正试图使用这个Connection,问题就出来了。因此,上面的 ConnectionHolder不是线程安全的。

?

为了获得线程安全的ConnectionHolder类,我们可以引入Java提供的ThreadLocal类,该类保证一个类的实例变量在各个线 程中都有一份单独的拷贝,从而不会影响其他线程中的实例变量。定义一个SingleThreadConnectionHolder类如下:

package davenkin.step3_connection_holder;import javax.sql.DataSource;import java.sql.Connection;import java.sql.SQLException;public class SingleThreadConnectionHolder{    private static ThreadLocal<ConnectionHolder> localConnectionHolder = new ThreadLocal<ConnectionHolder>();    public static Connection getConnection(DataSource dataSource) throws SQLException    {        return getConnectionHolder().getConnection(dataSource);    }    public static void removeConnection(DataSource dataSource)    {        getConnectionHolder().removeConnection(dataSource);    }    private static ConnectionHolder getConnectionHolder()    {        ConnectionHolder connectionHolder = localConnectionHolder.get();        if (connectionHolder == null)        {            connectionHolder = new ConnectionHolder();            localConnectionHolder.set(connectionHolder);        }        return connectionHolder;    }}

?

有了一个线程安全的SingleThreadConnectionHolder类,我们便可以在service层和各个DAO中使用该类来获取Connection对象:

package davenkin.step3_connection_holder;import javax.sql.DataSource;import java.sql.Connection;import java.sql.SQLException;public class TransactionManager{    private DataSource dataSource;    public TransactionManager(DataSource dataSource)    {        this.dataSource = dataSource;    }    public final void start() throws SQLException    {        Connection connection = getConnection();        connection.setAutoCommit(false);    }    public final void commit() throws SQLException    {        Connection connection = getConnection();        connection.commit();    }    public final void rollback()    {        Connection connection = null;        try        {            connection = getConnection();            connection.rollback();        } catch (SQLException e)        {            throw new RuntimeException("Couldn't rollback on connection[" + connection + "].", e);        }    }    public final void close()    {        Connection connection = null;        try        {            connection = getConnection();            connection.setAutoCommit(true);            connection.setReadOnly(false);            connection.close();            SingleThreadConnectionHolder.removeConnection(dataSource);        } catch (SQLException e)        {            throw new RuntimeException("Couldn't close connection[" + connection + "].", e);        }    }    private Connection getConnection() throws SQLException    {        return SingleThreadConnectionHolder.getConnection(dataSource);    }}

?

可以看出,TransactionManager对象也维护了一个DataSource实例变量,并且也是通过 SingleThreadConnectionHolder来获取Connection对象的。然后我们在service类中使用该 TransactionManager:

package davenkin.step3_connection_holder;import davenkin.BankService;import javax.sql.DataSource;public class ConnectionHolderBankService implements BankService{    private TransactionManager transactionManager;    private ConnectionHolderBankDao connectionHolderBankDao;    private ConnectionHolderInsuranceDao connectionHolderInsuranceDao;    public ConnectionHolderBankService(DataSource dataSource)    {        transactionManager = new TransactionManager(dataSource);        connectionHolderBankDao = new ConnectionHolderBankDao(dataSource);        connectionHolderInsuranceDao = new ConnectionHolderInsuranceDao(dataSource);    }    public void transfer(int fromId, int toId, int amount)    {        try        {            transactionManager.start();            connectionHolderBankDao.withdraw(fromId, amount);            connectionHolderInsuranceDao.deposit(toId, amount);            transactionManager.commit();        } catch (Exception e)        {            transactionManager.rollback();        } finally        {            transactionManager.close();        }    }}

?

在ConnectionHolderBankService中,我们使用TransactionManager来管理事务,由于 TransactionManger和两个DAO类都是使用SingleThreadConnectionHolder来获取Connection,故他 们在整个事务处理过程中使用了相同的Connection对象,事务处理成功。我们也可以看到,在两个DAO的withdraw和deposit方法没有 接受和业务无关的对象,消除了API污染;另外,使用TransactionManager来管理事务,使Service层代码也变简洁了。

?

在下一篇文章中,我们将讲到使用Template模式来完成事务处理。

在本系列的上一篇文章中,我们讲到了使用TransactionManger和ConnectionHolder完成线程安全的事务管理,在本篇中,我们将在此基础上引入Template模式进行事务管理。

?

这是一个关于Java事务处理的系列文章,请通过以下方式下载github源代码:

git clone?https://github.com/davenkin/java_transaction_workshop.git

?

Template模式大家应该都很熟悉,比如Spring就提供了许多Template,像JdbcTemplate和JmsTemplate等。 Template模式的基本思想是:在超类里将完成核心功能的方法声明为抽象方法,留给子类去实现,而在超类中完成一些通用操作,比如JMS的 Session的建立和数据库事务的准备工作等。

?

在本篇文章中,我们使用一个Template类来帮助管理事务,定义TransactionTemplate类如下:

package davenkin.step4_transaction_template;import davenkin.step3_connection_holder.TransactionManager;import javax.sql.DataSource;public abstract class TransactionTemplate{    private TransactionManager transactionManager;    protected TransactionTemplate(DataSource dataSource)    {        transactionManager = new TransactionManager(dataSource);    }    public void doJobInTransaction()    {        try        {            transactionManager.start();            doJob();            transactionManager.commit();        } catch (Exception e)        {            transactionManager.rollback();        } finally        {            transactionManager.close();        }    }    protected abstract void doJob() throws Exception;}

?

在TransactionTemplate类中定义一个doJobInTransaction方法,在该方法中首先使用 TransactionManager开始事务,然后调用doJob方法完成业务功能,doJob方法为抽象方法,完成业务功能的子类应该实现该方法,最 后,根据doJob方法执行是否成功决定commit事务或是rollback事务。

?

然后定义使用TransactionTemplate的TransactionTemplateBankService:

package davenkin.step4_transaction_template;import davenkin.BankService;import davenkin.step3_connection_holder.ConnectionHolderBankDao;import davenkin.step3_connection_holder.ConnectionHolderInsuranceDao;import javax.sql.DataSource;public class TransactionTemplateBankService implements BankService{    private DataSource dataSource;    private ConnectionHolderBankDao connectionHolderBankDao;    private ConnectionHolderInsuranceDao connectionHolderInsuranceDao;    public TransactionTemplateBankService(DataSource dataSource)    {        this.dataSource = dataSource;        connectionHolderBankDao = new ConnectionHolderBankDao(dataSource);        connectionHolderInsuranceDao = new ConnectionHolderInsuranceDao(dataSource);    }    public void transfer(final int fromId, final int toId, final int amount)    {        new TransactionTemplate(dataSource)        {            @Override            protected void doJob() throws Exception            {                connectionHolderBankDao.withdraw(fromId, amount);                connectionHolderInsuranceDao.deposit(toId, amount);            }        }.doJobInTransaction();    }}

?

在TransactionTemplateBankService的transfer方法中,我们创建了一个匿名的 TtransactionTemplate类,并且实现了doJob方法,在doJob方法中调用两个DAO完成业务操作,然后调用调用 TransactionTemplateBankService的doJobInTransaction方法。

?

由于TransactionTemplate只是对上一篇文章中事务处理的一层封装,故TransactionManager和两个DAO类都保持和上一篇中的一样,此时他们都使用SingleThreadConnectionHolder获得Connection,故事务处理成功。

?

在下一篇文章中,我们会讲到使用Java的动态代理来完成事务处理,这也是Spring管理事务的典型方法。

在本系列的上一篇文 章中,我们讲到了使用Template模式进行事务管理,这固然是一种很好的方法,但是不那么完美的地方在于我们依然需要在service层中编写和事务 处理相关的代码,即我们需要在service层中声明一个TransactionTemplate。在本篇文章中,我们将使用Java提供的动态代理 (Dynamic Proxy)功能来完成事务处理,你将看到无论是在service层还是DAO层都不会有事务处理代码,即他们根本就意识不到事务处理的存在。使用动态代 理完成事务处理也是AOP的一种典型应用。

?

这是一个关于Java事务处理的系列文章,请通过以下方式下载github源代码:

git clone?https://github.com/davenkin/java_transaction_workshop.git

?

Java动态代理的基本原理为:被代理对象需要实现某个接口(这是前提),代理对象会拦截对被代理对象的方法调用,在其中可以全然抛弃被代理对象的 方法实现而完成另外的功能,也可以在被代理对象方法调用的前后增加一些额外的功能。在本篇文章中,我们将拦截service层的transfer方法,在 其调用之前加入事务准备工作,然后调用原来的transfer方法,之后根据transfer方法是否执行成功决定commit还是rollback。

?

首先定义一个TransactionEnabledProxyManager类:

package davenkin.step5_transaction_proxy;import davenkin.step3_connection_holder.TransactionManager;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;public class TransactionEnabledProxyManager{    private TransactionManager transactionManager;    public TransactionEnabledProxyManager(TransactionManager transactionManager)    {        this.transactionManager = transactionManager;    }    public Object proxyFor(Object object)    {        return Proxy.newProxyInstance(object.getClass().getClassLoader(), object.getClass().getInterfaces(), new TransactionInvocationHandler(object, transactionManager));    }}class TransactionInvocationHandler implements InvocationHandler{    private Object proxy;    private TransactionManager transactionManager;    TransactionInvocationHandler(Object object, TransactionManager transactionManager)    {        this.proxy = object;        this.transactionManager = transactionManager;    }    public Object invoke(Object o, Method method, Object[] objects) throws Throwable    {        transactionManager.start();        Object result = null;        try        {            result = method.invoke(proxy, objects);            transactionManager.commit();        } catch (Exception e)        {            transactionManager.rollback();        } finally        {            transactionManager.close();        }        return result;    }}

?

通过调用该类的proxyFor方法,传入需要被代理的对象(本例中为service对象),返回一个代理对象。此后,在调用代理对象的 transfer方法时,会自动调用TransactionIvocationHandler的invoke方法,在该方法中,我们首先开始事务,然后执 行:

package davenkin.step5_transaction_proxy;import davenkin.BankService;import davenkin.step3_connection_holder.ConnectionHolderBankDao;import davenkin.step3_connection_holder.ConnectionHolderInsuranceDao;import javax.sql.DataSource;public class BareBankService implements BankService{    private ConnectionHolderBankDao connectionHolderBankDao;    private ConnectionHolderInsuranceDao connectionHolderInsuranceDao;    public BareBankService(DataSource dataSource)    {        connectionHolderBankDao = new ConnectionHolderBankDao(dataSource);        connectionHolderInsuranceDao = new ConnectionHolderInsuranceDao(dataSource);    }    public void transfer(final int fromId, final int toId, final int amount)    {        try        {            connectionHolderBankDao.withdraw(fromId, amount);            connectionHolderInsuranceDao.deposit(toId, amount);        } catch (Exception e)        {            throw new RuntimeException();        }    }}

?

如何,上面的BareBankService中没有任何事务处理的影子,我们只需关注核心业务逻辑即可。

?

然后在客户代码中,我们需要先创建代理对象(这在Spring中通常是通过配置实现的):

  @Test    public void transferFailure() throws SQLException    {        TransactionEnabledProxyManager transactionEnabledProxyManager = new TransactionEnabledProxyManager(new TransactionManager(dataSource));        BankService bankService = new BareBankService(dataSource);        BankService proxyBankService = (BankService) transactionEnabledProxyManager.proxyFor(bankService);        int toNonExistId = 3333;        proxyBankService.transfer(1111, toNonExistId, 200);        assertEquals(1000, getBankAmount(1111));        assertEquals(1000, getInsuranceAmount(2222));    }

?

在上面的测试代码中,我们首先创建一个BareBankService对象,然后调用 transactionEnabledProxyManager的proxyFor方法生成对原BareBankService对象的代理对象,最后在代 理对象上调用transfer方法,测试运行成功。

?

可以看到,通过以上动态代理实现,BareBankService中的所有public方法都被代理了,即他们都被加入到事务中。这对于 service层中的所有方法都需要和数据库打交道的情况是可以的,本例即如此(有且只有一个transfer方法),然而对于service层中不需要 和数据库打交道的public方法,这样做虽然也不会出错,但是却显得多余。在下一篇文章中,我们将讲到使用Java注解(annotation)的方式来声明一个方法是否需要事务,就像Spring中的Transactional注解一样。

在本系列的上一篇文 章中,我们讲到了使用动态代理的方式完成事务处理,这种方式将service层的所有public方法都加入到事务中,这显然不是我们需要的,需要代理的 只是那些需要操作数据库的方法。在本篇中,我们将讲到如何使用Java注解(Annotation)来标记需要事务处理的方法。

?

这是一个关于Java事务处理的系列文章,请通过以下方式下载github源代码:

git clone?https://github.com/davenkin/java_transaction_workshop.git

?

首先定义Transactional注解:

package davenkin.step6_annotation;import java.lang.annotation.ElementType;import java.lang.annotation.Retention;import java.lang.annotation.RetentionPolicy;import java.lang.annotation.Target;@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)public @interface Transactional{}

?

使用注解标记事务的基本原理为:依然使用上一篇中 讲到的动态代理的方式,只是在InvocationHandler的invoke方法中,首先判断被代理的方法是否标记有Transactional注 解,如果没有则直接调用method.invoke(proxied, objects),否则,先准备事务,在调用method.invoke(proxied, objects),然后根据该方法是否执行成功调用commit或rollback。定义 TransactionEnabledAnnotationProxyManager如下:

package davenkin.step6_annotation;import davenkin.step3_connection_holder.TransactionManager;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;public class TransactionEnabledAnnotationProxyManager{    private TransactionManager transactionManager;    public TransactionEnabledAnnotationProxyManager(TransactionManager transactionManager)    {        this.transactionManager = transactionManager;    }    public Object proxyFor(Object object)    {        return Proxy.newProxyInstance(object.getClass().getClassLoader(), object.getClass().getInterfaces(), new AnnotationTransactionInvocationHandler(object, transactionManager));    }}class AnnotationTransactionInvocationHandler implements InvocationHandler{    private Object proxied;    private TransactionManager transactionManager;    AnnotationTransactionInvocationHandler(Object object, TransactionManager transactionManager)    {        this.proxied = object;        this.transactionManager = transactionManager;    }    public Object invoke(Object proxy, Method method, Object[] objects) throws Throwable    {        Method originalMethod = proxied.getClass().getMethod(method.getName(), method.getParameterTypes());        if (!originalMethod.isAnnotationPresent(Transactional.class))        {            return method.invoke(proxied, objects);        }        transactionManager.start();        Object result = null;        try        {            result = method.invoke(proxied, objects);            transactionManager.commit();        } catch (Exception e)        {            transactionManager.rollback();        } finally        {            transactionManager.close();        }        return result;    }}

?

可以看到,在AnnotationTransactionInvocationHandler的invoke方法中,我们首先获得原service 的transfer方法,然后根据originalMethod.isAnnotationPresent(Transactional.class)判 断该方法是否标记有Transactional注解,如果没有,则任何额外功能都不加,直接调用原来service的transfer方法;否则,将其加 入到事务处理中。

?

在service层中,我们只需将需要加入事务处理的方法用Transactional注解标记就行了:

package davenkin.step6_annotation;import davenkin.BankService;import davenkin.step3_connection_holder.ConnectionHolderBankDao;import davenkin.step3_connection_holder.ConnectionHolderInsuranceDao;import javax.sql.DataSource;public class AnnotationBankService implements BankService{    private ConnectionHolderBankDao connectionHolderBankDao;    private ConnectionHolderInsuranceDao connectionHolderInsuranceDao;    public AnnotationBankService(DataSource dataSource)    {        connectionHolderBankDao = new ConnectionHolderBankDao(dataSource);        connectionHolderInsuranceDao = new ConnectionHolderInsuranceDao(dataSource);    }    @Transactional    public void transfer(final int fromId, final int toId, final int amount)    {        try        {            connectionHolderBankDao.withdraw(fromId, amount);            connectionHolderInsuranceDao.deposit(toId, amount);        } catch (Exception e)        {            throw new RuntimeException();        }    }}

?

然后执行测试:

    @Test    public void transferFailure() throws SQLException    {        TransactionEnabledAnnotationProxyManager transactionEnabledAnnotationProxyManager = new TransactionEnabledAnnotationProxyManager(new TransactionManager(dataSource));        BankService bankService = new AnnotationBankService(dataSource);        BankService proxyBankService = (BankService) transactionEnabledAnnotationProxyManager.proxyFor(bankService);        int toNonExistId = 3333;        proxyBankService.transfer(1111, toNonExistId, 200);        assertEquals(1000, getBankAmount(1111));        assertEquals(1000, getInsuranceAmount(2222));    }

?

测试运行成功,如果将AnnotationBankService中transfer方法的Transactional注解删除,那么以上测试将抛 出RuntimeException异常,该异常为transfer方法中我们人为抛出的,也即由于此时没有事务来捕捉异常,程序便直接抛出该异常而终止 运行。在下一篇(本系列最后一篇)文章中,我们将讲到分布式事务的一个入门例子。

在本系列先前的文章中,我们主要讲解了JDBC对本地事务的处理,本篇文章将讲到一个分布式事务的例子。

?

请通过以下方式下载github源代码:

git clone https://github.com/davenkin/jta-atomikos-hibernate-activemq.git

本地事务和分布式事务的区别在于:本地事务只用于处理单一数据源事务(比如单个数据库),分布式事务可以处理多种异构的数据源,比如某个业务操作中同时包含了JDBC和JMS或者某个操作需要访问多个不同的数据库。

?

Java通过JTA完成分布式事务,JTA本身只是一种规范,不同的应用服务器都包含有自己的实现(比如JbossJTA),同时还存在独立于应用服务器的单独JTA实现,比如本篇中要讲到的Atomikos。对于JTA的原理,这里不细讲,读者可以通过这篇文章了解相关知识。

?

在本篇文章中,我们将实现以下一个应用场景:你在网上购物,下了订单之后,订单数据将保存在系统的数据库中,同时为了安排物流,订单信息将以消息(Message)的方式发送到物流部门以便送货。

?

以上操作同时设计到数据库操作和JMS消息发送,为了使整个操作成为一个原子操作,我们只能选择分布式事务。我们首先设计一个service层,定义OrderService接口:

@XmlRootElement(name = "Order")@XmlAccessorType(XmlAccessType.FIELD)public class Order {    @XmlElement(name = "Id",required = true)    private long id;    @XmlElement(name = "ItemName",required = true)    private String itemName;    @XmlElement(name = "Price",required = true)    private double price;    @XmlElement(name = "BuyerName",required = true)    private String buyerName;    @XmlElement(name = "MailAddress",required = true)    private String mailAddress;    public Order() {    }

?

为了采用JAXB对Order对象进行Marshal和Unmarshal,我们在Order类中加入了JAXB相关的Annotation。 我们将使用Hibernate来完成数据持久化,然后使用Spring提供的JmsTemplate将Order转成xml后以TextMessage的 形式发送到物流部门的ORDER.QUEUE中。

?

(一)准备数据库

为了方便,我们将采用Spring提供的embedded数据库,默认情况下Spring采用HSQL作为后台数据库,虽然在本例中我们将采用HSQL的非XA的DataSource,但是通过Atomikos包装之后依然可以参与分布式事务。

SQL脚本包含在createDB.sql文件中:

CREATE TABLE USER_ORDER(ID INT NOT NULL,ITEM_NAME VARCHAR (100) NOT NULL UNIQUE,PRICE DOUBLE NOT NULL,BUYER_NAME CHAR (32) NOT NULL,MAIL_ADDRESS VARCHAR(500) NOT NULL,PRIMARY KEY(ID));

?

在Spring中配置DataSource如下:

  @BeforeClass    public static void startEmbeddedActiveMq() throws Exception {        broker = new BrokerService();        broker.addConnector("tcp://localhost:61616");        broker.start();    }    @AfterClass    public static void stopEmbeddedActiveMq() throws Exception {        broker.stop();    }

?

(三)实现OrderService

创建一个DefaultOrderService,该类实现了OrderService接口,并维护一个JmsTemplate和一个Hibernate的SessionFactory实例变量,分别用于Message的发送和数据库处理。

package davenkin;import org.hibernate.SessionFactory;import org.hibernate.classic.Session;import org.springframework.beans.factory.annotation.Required;import org.springframework.jms.core.JmsTemplate;import org.springframework.transaction.annotation.Transactional;public class DefaultOrderService  implements OrderService{    private JmsTemplate jmsTemplate;    private SessionFactory sessionFactory;    @Override    @Transactional    public void makeOrder(Order order) {        Session session = sessionFactory.getCurrentSession();        session.save(order);        jmsTemplate.convertAndSend(order);    }    @Required    public void setJmsTemplate(JmsTemplate jmsTemplate) {        this.jmsTemplate = jmsTemplate;    }    @Required    public void setSessionFactory(SessionFactory sessionFactory) {        this.sessionFactory = sessionFactory;    }}

?

(四)创建Order的Mapping配置文件

<?xml version="1.0"?><!DOCTYPE hibernate-mapping PUBLIC "-//Hibernate/Hibernate Mapping DTD 3.0//EN"        "http://hibernate.sourceforge.net/hibernate-mapping-3.0.dtd"><hibernate-mapping>    <class name="davenkin.Order" table="USER_ORDER">        <id name="id" type="long">            <column name="ID" />            <generator class="increment" />        </id>        <property name="itemName" type="string">            <column name="ITEM_NAME" />        </property>        <property name="price" type="double">            <column name="PRICE"/>        </property>        <property name="buyerName" type="string">            <column name="BUYER_NAME"/>        </property>        <property name="mailAddress" type="string">            <column name="MAIL_ADDRESS"/>        </property>    </class></hibernate-mapping>

?

(五)配置Atomikos事务

在Spring的IoC容器中,我们需要配置由Atomikos提供的UserTransaction和TransactionManager,然后再配置Spring的JtaTransactionManager:

  <bean id="userTransactionService" class="com.atomikos.icatch.config.UserTransactionServiceImp" init-method="init" destroy-method="shutdownForce">        <constructor-arg>            <props>                <prop key="com.atomikos.icatch.service">com.atomikos.icatch.standalone.UserTransactionServiceFactory</prop>            </props>        </constructor-arg>    </bean>    <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close" depends-on="userTransactionService">        <property name="forceShutdown" value="false" />    </bean>    <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp" depends-on="userTransactionService">        <property name="transactionTimeout" value="300" />    </bean>    <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager" depends-on="userTransactionService">        <property name="transactionManager" ref="atomikosTransactionManager" />        <property name="userTransaction" ref="atomikosUserTransaction" />    </bean>    <tx:annotation-driven transaction-manager="jtaTransactionManager" />

?

(六)配置JMS

对于JMS,为了能使ActiveMQ加入到分布式事务中,我们需要配置ActiveMQXAConnectionFactory,而不是 ActiveMQConnectionFactory,然后再配置JmsTemplate,此外还需要配置MessageConvertor在Order 对象和XML之间互转。

    <bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">        <property name="brokerURL" value="tcp://localhost:61616" />    </bean>    <bean id="amqConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init">        <property name="uniqueResourceName" value="XAactiveMQ" />        <property name="xaConnectionFactory" ref="jmsXaConnectionFactory" />        <property name="poolSize" value="5"/>    </bean>    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">        <property name="connectionFactory" ref="amqConnectionFactory"/>        <property name="receiveTimeout" value="2000" />        <property name="defaultDestination" ref="orderQueue"/>        <property name="sessionTransacted" value="true" />        <property name="messageConverter" ref="oxmMessageConverter"/>    </bean>    <bean id="orderQueue" class="org.apache.activemq.command.ActiveMQQueue">        <constructor-arg value="ORDER.QUEUE"/>    </bean>    <bean id="oxmMessageConverter"          class="org.springframework.jms.support.converter.MarshallingMessageConverter">        <property name="marshaller" ref="marshaller"/>        <property name="unmarshaller" ref="marshaller"/>    </bean>    <oxm:jaxb2-marshaller id="marshaller">        <oxm:class-to-be-bound name="davenkin.Order"/>    </oxm:jaxb2-marshaller>

?

(七)测试

在测试中,我们首先通过(二)中的方法启动ActiveMQ,再调用DefaultOrderService,最后对数据库和QUEUE进行验证:

   @Test    public void makeOrder(){        orderService.makeOrder(createOrder());        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);        assertEquals(1, jdbcTemplate.queryForInt("SELECT COUNT(*) FROM USER_ORDER"));        String dbItemName = jdbcTemplate.queryForObject("SELECT ITEM_NAME FROM USER_ORDER", String.class);        String messageItemName = ((Order) jmsTemplate.receiveAndConvert()).getItemName();        assertEquals(dbItemName, messageItemName);    }    @Test(expected = IllegalArgumentException.class)    public void failToMakeOrder()    {        orderService.makeOrder(null);        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);        assertEquals(0, jdbcTemplate.queryForInt("SELECT COUNT(*) FROM USER_ORDER"));        assertNull(jmsTemplate.receiveAndConvert());    }

?

?

?

?

热点排行