分散トランザクション入門-spring+JTA+Atomikos+Hibernate+JMS
ローカル・トランザクションと分散トランザクションの違いは、ローカル・トランザクションは単一のデータ・ソース・トランザクション(単一のデータベースなど)のみを処理するために使用され、分散トランザクションは、JDBCとJMSが同時に含まれるビジネス・オペレーションや、複数の異なるデータベースにアクセスする必要があるオペレーションなど、さまざまな異機種のデータ・ソースを処理できます.
JavaはJTAを通じて分散トランザクションを完了し、JTA自体は単なる仕様であり、異なるアプリケーションサーバには独自の実装(例えばJbossJTA)が含まれていると同時に、本編で述べるAtomikosのようなアプリケーションサーバから独立した個別のJTA実装も存在する.JTAの本来の理については、ここでは詳しくは述べないが、読者はこの文章を通じて関連知識を知ることができる.
この記事では、オンラインで買い物をしたり、注文をしたりすると、注文データがシステムのデータベースに保存され、物流を手配するために注文情報がメッセージ(Message)で物流部門に送信され、配送されます.
以上の操作は、データベース操作とJMSメッセージ送信を同時に設計し、操作全体を原子操作にするために、分散トランザクションしか選択できません.まずサービス層を設計し、OrderServiceインタフェースを定義します.
単純化のために、非常に単純なレルムオブジェクトOrderを設計します.
JAXBを用いてOrderオブジェクトに対してMarshalとUnmarshalを行うために,OrderクラスにJAXB関連Annotationを加えた.Hibernateを使用してデータの永続化を完了し、Springが提供するJmsTemplateを使用してOrderをxmlに変換した後、TextMessageの形で物流部門のORDERに送信します.QUEUEにあります.
(一)データベースの準備
便宜上、Springが提供するembeddedデータベースを使用します.デフォルトではSpringはバックグラウンド・データベースとしてHSQLを使用します.この例では、HSQLのXA以外のデータ・ソースを使用しますが、Atomikosパッケージを使用しても分散トランザクションに参加できます.
SQLスクリプトはcreateDBに含まれています.sqlファイル:
SpringでのDataSourceの構成は次のとおりです.
(二)ActiveMQの起動
私たちはembeddedのActiveMQを採用し、テストの前にActiveMQが提供するBrokerServiceを起動し、テストの実行が終わった後にBrokerServiceを閉じます.
(三)OrderServiceの実現
OrderServiceインタフェースを実装し、JmsTemplateとHibernateのSessionFactoryインスタンス変数をそれぞれMessageの送信およびデータベース処理に使用するDefaultOrderServiceを作成します.
(四)OrderのMappingプロファイルの作成
(五)Atomikosトランザクションの構成
SpringのIoCコンテナでは、Atomikosから提供されるUserTransactionとTransactionManagerを構成し、SpringのJtaTransactionManagerを構成する必要があります.
(六)JMSの構成
JMSの場合、ActiveMQを分散トランザクションに追加するには、ActiveMQConnectionFactoryではなくActiveMQXAConnectionFactoryを構成し、JmsTemplateを構成する必要があります.また、OrderオブジェクトとXMLの間でMessageConvertorを相互に回転させる必要があります.
(七)テスト
テストでは、まず(2)の方法でActiveMQを起動し、DefaultOrderServiceを呼び出し、最後にデータベースとQUEUEを検証します.
JavaはJTAを通じて分散トランザクションを完了し、JTA自体は単なる仕様であり、異なるアプリケーションサーバには独自の実装(例えばJbossJTA)が含まれていると同時に、本編で述べるAtomikosのようなアプリケーションサーバから独立した個別のJTA実装も存在する.JTAの本来の理については、ここでは詳しくは述べないが、読者はこの文章を通じて関連知識を知ることができる.
この記事では、オンラインで買い物をしたり、注文をしたりすると、注文データがシステムのデータベースに保存され、物流を手配するために注文情報がメッセージ(Message)で物流部門に送信され、配送されます.
以上の操作は、データベース操作とJMSメッセージ送信を同時に設計し、操作全体を原子操作にするために、分散トランザクションしか選択できません.まずサービス層を設計し、OrderServiceインタフェースを定義します.
package davenkin;
public interface OrderService {
public void makeOrder(Order order);
}
単純化のために、非常に単純なレルムオブジェクトOrderを設計します.
@XmlRootElement(name = "Order")
@XmlAccessorType(XmlAccessType.FIELD)
public class Order {
@XmlElement(name = "Id",required = true)
private long id;
@XmlElement(name = "ItemName",required = true)
private String itemName;
@XmlElement(name = "Price",required = true)
private double price;
@XmlElement(name = "BuyerName",required = true)
private String buyerName;
@XmlElement(name = "MailAddress",required = true)
private String mailAddress;
public Order() {
}
JAXBを用いてOrderオブジェクトに対してMarshalとUnmarshalを行うために,OrderクラスにJAXB関連Annotationを加えた.Hibernateを使用してデータの永続化を完了し、Springが提供するJmsTemplateを使用してOrderをxmlに変換した後、TextMessageの形で物流部門のORDERに送信します.QUEUEにあります.
(一)データベースの準備
便宜上、Springが提供するembeddedデータベースを使用します.デフォルトではSpringはバックグラウンド・データベースとしてHSQLを使用します.この例では、HSQLのXA以外のデータ・ソースを使用しますが、Atomikosパッケージを使用しても分散トランザクションに参加できます.
SQLスクリプトはcreateDBに含まれています.sqlファイル:
CREATE TABLE USER_ORDER( ID INT NOT NULL, ITEM_NAME VARCHAR (100) NOT NULL UNIQUE, PRICE DOUBLE NOT NULL, BUYER_NAME CHAR (32) NOT NULL, MAIL_ADDRESS VARCHAR(500) NOT NULL, PRIMARY KEY(ID) );
SpringでのDataSourceの構成は次のとおりです.
<jdbc:embedded-database id="dataSource">
<jdbc:script location="classpath:createDB.sql"/>
</jdbc:embedded-database>
(二)ActiveMQの起動
私たちはembeddedのActiveMQを採用し、テストの前にActiveMQが提供するBrokerServiceを起動し、テストの実行が終わった後にBrokerServiceを閉じます.
@BeforeClass
public static void startEmbeddedActiveMq() throws Exception {
broker = new BrokerService();
broker.addConnector("tcp://localhost:61616");
broker.start();
}
@AfterClass
public static void stopEmbeddedActiveMq() throws Exception {
broker.stop();
}
(三)OrderServiceの実現
OrderServiceインタフェースを実装し、JmsTemplateとHibernateのSessionFactoryインスタンス変数をそれぞれMessageの送信およびデータベース処理に使用するDefaultOrderServiceを作成します.
package davenkin;
import org.hibernate.SessionFactory;
import org.hibernate.classic.Session;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.annotation.Transactional;
public class DefaultOrderService implements OrderService{
private JmsTemplate jmsTemplate;
private SessionFactory sessionFactory;
@Override
@Transactional
public void makeOrder(Order order) {
Session session = sessionFactory.getCurrentSession();
session.save(order);
jmsTemplate.convertAndSend(order);
}
@Required
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
@Required
public void setSessionFactory(SessionFactory sessionFactory) {
this.sessionFactory = sessionFactory;
}
}
(四)OrderのMappingプロファイルの作成
<?xml version="1.0"?>
<!DOCTYPE hibernate-mapping PUBLIC "-//Hibernate/Hibernate Mapping DTD 3.0//EN" "http://hibernate.sourceforge.net/hibernate-mapping-3.0.dtd">
<hibernate-mapping>
<class name="davenkin.Order" table="USER_ORDER">
<id name="id" type="long">
<column name="ID" />
<generator class="increment" />
</id>
<property name="itemName" type="string">
<column name="ITEM_NAME" />
</property>
<property name="price" type="double">
<column name="PRICE"/>
</property>
<property name="buyerName" type="string">
<column name="BUYER_NAME"/>
</property>
<property name="mailAddress" type="string">
<column name="MAIL_ADDRESS"/>
</property>
</class>
</hibernate-mapping>
(五)Atomikosトランザクションの構成
SpringのIoCコンテナでは、Atomikosから提供されるUserTransactionとTransactionManagerを構成し、SpringのJtaTransactionManagerを構成する必要があります.
<bean id="userTransactionService" class="com.atomikos.icatch.config.UserTransactionServiceImp" init-method="init" destroy-method="shutdownForce">
<constructor-arg>
<props>
<prop key="com.atomikos.icatch.service">com.atomikos.icatch.standalone.UserTransactionServiceFactory</prop>
</props>
</constructor-arg>
</bean>
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close" depends-on="userTransactionService">
<property name="forceShutdown" value="false" />
</bean>
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp" depends-on="userTransactionService">
<property name="transactionTimeout" value="300" />
</bean>
<bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager" depends-on="userTransactionService">
<property name="transactionManager" ref="atomikosTransactionManager" />
<property name="userTransaction" ref="atomikosUserTransaction" />
</bean>
<tx:annotation-driven transaction-manager="jtaTransactionManager" />
(六)JMSの構成
JMSの場合、ActiveMQを分散トランザクションに追加するには、ActiveMQConnectionFactoryではなくActiveMQXAConnectionFactoryを構成し、JmsTemplateを構成する必要があります.また、OrderオブジェクトとXMLの間でMessageConvertorを相互に回転させる必要があります.
<bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<bean id="amqConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init">
<property name="uniqueResourceName" value="XAactiveMQ" />
<property name="xaConnectionFactory" ref="jmsXaConnectionFactory" />
<property name="poolSize" value="5"/>
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="amqConnectionFactory"/>
<property name="receiveTimeout" value="2000" />
<property name="defaultDestination" ref="orderQueue"/>
<property name="sessionTransacted" value="true" />
<property name="messageConverter" ref="oxmMessageConverter"/>
</bean>
<bean id="orderQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="ORDER.QUEUE"/>
</bean>
<bean id="oxmMessageConverter" class="org.springframework.jms.support.converter.MarshallingMessageConverter">
<property name="marshaller" ref="marshaller"/>
<property name="unmarshaller" ref="marshaller"/>
</bean>
<oxm:jaxb2-marshaller id="marshaller">
<oxm:class-to-be-bound name="davenkin.Order"/>
</oxm:jaxb2-marshaller>
(七)テスト
テストでは、まず(2)の方法でActiveMQを起動し、DefaultOrderServiceを呼び出し、最後にデータベースとQUEUEを検証します.
@Test
public void makeOrder(){
orderService.makeOrder(createOrder());
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
assertEquals(1, jdbcTemplate.queryForInt("SELECT COUNT(*) FROM USER_ORDER"));
String dbItemName = jdbcTemplate.queryForObject("SELECT ITEM_NAME FROM USER_ORDER", String.class);
String messageItemName = ((Order) jmsTemplate.receiveAndConvert()).getItemName();
assertEquals(dbItemName, messageItemName);
}
@Test(expected = IllegalArgumentException.class)
public void failToMakeOrder()
{
orderService.makeOrder(null);
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
assertEquals(0, jdbcTemplate.queryForInt("SELECT COUNT(*) FROM USER_ORDER"));
assertNull(jmsTemplate.receiveAndConvert());
}