Java EE Batch Processingに触ってみた
公式リファレンス: http://www.oracle.com/technetwork/articles/java/batch-1965499.html
参考(jBatch): http://qiita.com/opengl-8080/items/0293bcff97261cb7d0cf
Java EE7?から追加されたっぽいJSR352のBatch Processingが色々気になってたのでやってみた
でBatch Processingを動かすにはJSR352に対応したなコンテナ(Glassfishとか)が必要になるっぽいけど、めんどくさいのでSpring BatchがJSR352に対応している模様なのでそれを利用して実行する方式を使う
SampleItemReader.java
package sample;
import javax.batch.api.chunk.ItemReader;
import javax.batch.api.chunk.AbstractItemReader;
import javax.inject.Named;
@Named("batch-reader")
public class SampleItemReader extends AbstractItemReader {
private int count = 0;
@Override
public Object readItem() throws Exception {
count++;
return count < 10 ? "hoge" : null;
}
}
上記のjBatch参考に書いてけど、後々設定ファイルで参照するクラスは@Namedで指定した名前でも参照させる事が可能な模様。あと公式ではItemReaderっていうインターフェースを実装しているけど、今回はそれの抽象クラスにあたると思われるAbstractItemReaderを継承して実装
おそらくはここでnullを返す事をすればバッチチャンク終了となり、すでにあるバッチチャンクをItemWriterに流して終了的な感じになるのではと
**** SampleItemProcessor.java
package sample;
import javax.batch.api.chunk.ItemProcessor;
public class SampleItemProcessor implements ItemProcessor {
@Override
public Object processItem(Object item) {
return ((String)item).toUpperCase();
}
}
ItemReader#readItemで返されたデータをここでなんらかの処理をしてItemWriterに受け流す役割な感じなのではと。でItemWriterに受け流す際には指定したチャンクサイズ?(item-count)に応じてItemWriterが作用するような仕組みっぽい
SampleItemWriter.java
package sample;
import java.util.List;
import javax.batch.api.chunk.AbstractItemWriter;
public class SampleItemWriter extends AbstractItemWriter {
@Override
public void writeItems(List<Object> items) {
// ItemProcessor側で処理したデータをチャンクサイズに応じてたまった分がこちらで処理出来るので、書き出し処理等を実装
}
}
とまぁBatch Processingを動かすのに必要なのは書いたので
Main.java
import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchRuntime;
public class Main {
public static void main(String[] args) {
JobOperator job = BatchRuntime.getJobOperator();
job.start("sample", new java.util.Properties());
}
}
な感じで実行してバッチ処理を開始させる事が出来る。但し設定ファイル関連をまだ書いてないので実行してもエラーになる
META-INF/batch-jobs/sample.xml
<?xml version="1.0" encoding="UTF-8"?>
<job id="sample" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
<step id="sample-chunk">
<chunk item-count="5">
<!-- <reader ref="sample.SampleItemReader" /> -->
<!-- @Namedで指定した名前でも指定可能 -->
<reader ref="batch-reader" />
<processor ref="sample.SampleItemProcessor" />
<writer ref="sample.SampleItemWriter" />
</chunk>
</step>
</job>
JobOperator#startの引数で指定したジョブ名と同等の名前なxmlファイルを作るべきな模様。んまぁここまでならJSR352に対応したコンテナで実行すれば出来るんだと思われる(但し、Webコンテナに置く場合はこのファイルの配置場所は異なる。詳しくは上記参考リンク)
んでSpring Batchで動かしてみるのでそれに応じたSpringな設定が必要になる(※別記事にて補足済み)
baseContext.xml
<?xml version="1.0" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://xmlns.jcp.org/xml/ns/javaee
http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.1.xsd">
<import resource="database.xml" />
<context:component-scan base-package="sample" />
<bean id="jobParametersConverter" class="org.springframework.batch.core.jsr.JsrJobParametersConverter">
<constructor-arg ref="dataSource" />
</bean>
<bean
id="jobExplorer"
class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean"
p:dataSource-ref="dataSource"
p:tablePrefix="BATCH_" />
<bean
id="jobRepository"
class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"
p:dataSource-ref="dataSource"
p:transactionManager-ref="transactionManager"
p:maxVarCharLength="2000"
p:isolationLevelForCreate="ISOLATION_SERIALIZABLE" />
</beans>
database.xmlは
<?xml version="1.0" ?>
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:jdbc="http://www.springframework.org/schema/jdbc"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/jdbc
http://www.springframework.org/schema/jdbc/spring-jdbc-3.2.xsd">
<bean
id="dataSource"
class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<property name="driverClassName" value="com.mysql.jdbc.Driver" />
<property name="url" value="jdbc:mysql://localhost:3306/sample" />
<property name="username" value="kinjouj" />
<property name="password" value="1234" />
</bean>
<bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
<jdbc:initialize-database data-source="dataSource">
<jdbc:script location="org/springframework/batch/core/schema-drop-mysql.sql" />
<jdbc:script location="org/springframework/batch/core/schema-mysql.sql" />
</jdbc:initialize-database>
</beans>
なSpring JDBCに関わる設定とSpring Batchで使うデータベース上の定義が必要になるSQLを実行するような仕組みをやっておく
実行
実装して処理の流れなログとってみると
情報: Executing step: [sample-chunk] 12 28, 2014 4:32:26 午後 sample.SampleItemReader readItem 情報: readItem: 0 12 28, 2014 4:32:26 午後 sample.SampleItemProcessor processItem 情報: processItem: hoge 12 28, 2014 4:32:26 午後 sample.SampleItemReader readItem 情報: readItem: 1 12 28, 2014 4:32:26 午後 sample.SampleItemProcessor processItem 情報: processItem: hoge 12 28, 2014 4:32:26 午後 sample.SampleItemReader readItem 情報: readItem: 2 12 28, 2014 4:32:26 午後 sample.SampleItemProcessor processItem 情報: processItem: hoge 12 28, 2014 4:32:26 午後 sample.SampleItemReader readItem 情報: readItem: 3 12 28, 2014 4:32:26 午後 sample.SampleItemProcessor processItem 情報: processItem: hoge 12 28, 2014 4:32:26 午後 sample.SampleItemReader readItem 情報: readItem: 4 12 28, 2014 4:32:26 午後 sample.SampleItemProcessor processItem 情報: processItem: hoge 12 28, 2014 4:32:26 午後 sample.SampleItemWriter writeItems 情報: writeItems: 5 12 28, 2014 4:32:26 午後 sample.SampleItemReader readItem 情報: readItem: 5 12 28, 2014 4:32:26 午後 sample.SampleItemProcessor processItem 情報: processItem: hoge 12 28, 2014 4:32:26 午後 sample.SampleItemReader readItem 情報: readItem: 6 12 28, 2014 4:32:26 午後 sample.SampleItemProcessor processItem 情報: processItem: hoge 12 28, 2014 4:32:26 午後 sample.SampleItemReader readItem 情報: readItem: 7 12 28, 2014 4:32:26 午後 sample.SampleItemProcessor processItem 情報: processItem: hoge 12 28, 2014 4:32:26 午後 sample.SampleItemReader readItem 情報: readItem: 8 12 28, 2014 4:32:26 午後 sample.SampleItemProcessor processItem 情報: processItem: hoge 12 28, 2014 4:32:26 午後 sample.SampleItemReader readItem 情報: readItem: 9 12 28, 2014 4:32:26 午後 sample.SampleItemWriter writeItems 情報: writeItems: 9
てな感じで実行結果になる。item-countで指定した分のチャンクサイズに応じてItemWriter#writeItemsが作用するようになっている模様。またreadItemがnullを返した場合にはprocessItemは発生せずにすでにたまっているチャンクをそのままwriteItemsに作用するように受け流している模様
んまぁそんなざっくりとした感じでやってみた。まだざっくりとした調査しかしてないので今後も色々勉強しつつ書く予定。今回はこんくらいで