歡迎來到Spring Batch示例。Spring Batch是一個用於執行批次作業的spring框架模塊。我們可以使用Spring Batch來處理一系列的作業。
Spring Batch示例
在深入了解Spring Batch示例程序之前,讓我們對spring batch術語有一些了解。
- A job can consist of ‘n’ number of steps. Each step contains Read-Process-Write task or it can have single operation, which is called tasklet.
- 讀取-處理-寫入基本上是從數據庫、CSV等來源讀取數據,然後處理數據並將其寫入數據庫、CSV、XML等來源。
- Tasklet意味著執行單個任務或操作,例如在處理完成後清理連接、釋放資源。
- 可以將讀取-處理-寫入和tasklets鏈接在一起來運行作業。
Spring Batch示例
讓我們考慮一個實際的示例,用於實現Spring Batch。為了實現此目的,我們將考慮以下情景。需要將包含數據的CSV文件轉換為XML,數據和標籤將根據列名命名。以下是用於Spring Batch示例的重要工具和庫。
- Apache Maven 3.5.0 – 用於專案建置和依賴管理。
- Eclipse Oxygen Release 4.7.0 – 用於創建 Spring Batch Maven 應用程式的集成開發環境。
- Java 1.8
- Spring Core 4.3.12.RELEASE
- Spring OXM 4.3.12.RELEASE
- Spring JDBC 4.3.12.RELEASE
- Spring Batch 3.0.8.RELEASE
- MySQL Java Driver 5.1.25 – 根據您的 MySQL 安裝使用。這是 Spring Batch 元數據表所需的。
Spring Batch 範例目錄結構
下面的圖片展示了我們 Spring Batch 範例專案中的所有組件。
Spring Batch Maven 依賴項
以下是 pom.xml 文件的內容,包含了我們 Spring Batch 範例專案所需的所有依賴項。
<project xmlns="https://maven.apache.org/POM/4.0.0" xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="https://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.journaldev.spring</groupId>
<artifactId>SpringBatchExample</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>SpringBatchDemo</name>
<url>https://maven.apache.org</url>
<properties>
<jdk.version>1.8</jdk.version>
<spring.version>4.3.12.RELEASE</spring.version>
<spring.batch.version>3.0.8.RELEASE</spring.batch.version>
<mysql.driver.version>5.1.25</mysql.driver.version>
<junit.version>4.11</junit.version>
</properties>
<dependencies>
<!-- Spring Core -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- Spring jdbc, for database -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- Spring XML to/back object -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- MySQL database driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.driver.version}</version>
</dependency>
<!-- Spring Batch dependencies -->
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>${spring.batch.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-infrastructure</artifactId>
<version>${spring.batch.version}</version>
</dependency>
<!-- Spring Batch unit test -->
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<version>${spring.batch.version}</version>
</dependency>
<!-- Junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.thoughtworks.xstream</groupId>
<artifactId>xstream</artifactId>
<version>1.4.10</version>
</dependency>
</dependencies>
<build>
<finalName>spring-batch</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.9</version>
<configuration>
<downloadSources>true</downloadSources>
<downloadJavadocs>false</downloadJavadocs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>${jdk.version}</source>
<target>${jdk.version}</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Spring 批次處理 CSV 輸入檔案
這裡是我們範例 CSV 檔案的內容,用於 Spring 批次處理。
1001,Tom,Moody, 29/7/2013
1002,John,Parker, 30/7/2013
1003,Henry,Williams, 31/7/2013
Spring 批次作業配置
我們必須在配置文件中定義 spring bean 和 spring 批次作業。以下是 job-batch-demo.xml
檔案的內容,這是 Spring 批次專案中最重要的部分。
<beans xmlns="https://www.springframework.org/schema/beans"
xmlns:batch="https://www.springframework.org/schema/batch" xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="https://www.springframework.org/schema/batch
https://www.springframework.org/schema/batch/spring-batch-3.0.xsd
https://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans-4.3.xsd
">
<import resource="../config/context.xml" />
<import resource="../config/database.xml" />
<bean id="report" class="com.journaldev.spring.model.Report"
scope="prototype" />
<bean id="itemProcessor" class="com.journaldev.spring.CustomItemProcessor" />
<batch:job id="DemoJobXMLWriter">
<batch:step id="step1">
<batch:tasklet>
<batch:chunk reader="csvFileItemReader" writer="xmlItemWriter"
processor="itemProcessor" commit-interval="10">
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
<bean id="csvFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="classpath:csv/input/report.csv" />
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean
class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="names" value="id,firstname,lastname,dob" />
</bean>
</property>
<property name="fieldSetMapper">
<bean class="com.journaldev.spring.ReportFieldSetMapper" />
<!-- if no data type conversion, use BeanWrapperFieldSetMapper to map
by name <bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
<property name="prototypeBeanName" value="report" /> </bean> -->
</property>
</bean>
</property>
</bean>
<bean id="xmlItemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
<property name="resource" value="file:xml/outputs/report.xml" />
<property name="marshaller" ref="reportMarshaller" />
<property name="rootTagName" value="report" />
</bean>
<bean id="reportMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
<property name="classesToBeBound">
<list>
<value>com.journaldev.spring.model.Report</value>
</list>
</property>
</bean>
</beans>
- 我們使用
FlatFileItemReader
讀取 CSV 檔案,使用CustomItemProcessor
處理數據,並使用StaxEventItemWriter
寫入 XML 檔案。 batch:job
– 這個標籤定義我們要創建的作業。Id 屬性指定作業的 ID。我們可以在單個 xml 檔案中定義多個作業。batch:step
– 這個標籤用於定義 Spring 批次作業的不同步驟。- Spring Batch 框架提供了兩種不同的處理方式,分別是「TaskletStep 導向」和「Chunk 導向」。在這個例子中,Chunk 導向的方式是指逐個讀取數據並創建 ‘chunks’,這些 ‘chunks’ 將在事務邊界內進行寫出。
- reader: 用於讀取數據的 Spring Bean。在這個例子中,我們使用了
csvFileItemReader
Bean,它是FlatFileItemReader
的實例。 - processor: 這是用於處理數據的類。在這個例子中,我們使用了
CustomItemProcessor
。 - writer: 用於將數據寫入 XML 文件的 Bean。
- commit-interval: 此屬性定義了一次處理後將提交的 chunk 大小。基本上,這意味著 ItemReader 將逐個讀取數據,而 ItemProcessor 也將以同樣的方式處理它,但只有當 ItemWriter 的數據大小等於 commit-interval 的大小時,才會寫入數據。
- 作為此項目的一部分使用的三個重要接口是來自
org.springframework.batch.item
包的ItemReader
、ItemProcessor
和ItemWriter
。
Spring Batch 模型類
首先,我們將CSV檔案讀入到Java物件中,然後使用JAXB將其寫入到XML檔案中。以下是我們的模型類別,其中包含所需的JAXB註解。
package com.journaldev.spring.model;
import java.util.Date;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
@XmlRootElement(name = "record")
public class Report {
private int id;
private String firstName;
private String lastName;
private Date dob;
@XmlAttribute(name = "id")
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
@XmlElement(name = "firstname")
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
@XmlElement(name = "lastname")
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
@XmlElement(name = "dob")
public Date getDob() {
return dob;
}
public void setDob(Date dob) {
this.dob = dob;
}
@Override
public String toString() {
return "Report [id=" + id + ", firstname=" + firstName + ", lastName=" + lastName + ", DateOfBirth=" + dob
+ "]";
}
}
請注意,模型類別的欄位應與Spring Batch映射器配置中定義的相同,即在我們的案例中:property name="names" value="id,firstname,lastname,dob"
。
Spring Batch FieldSetMapper
A custom FieldSetMapper
is needed to convert a Date. If no data type conversion is required, then only BeanWrapperFieldSetMapper
should be used to map the values by name automatically. The java class which extends FieldSetMapper
is ReportFieldSetMapper
.
package com.journaldev.spring;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;
import com.journaldev.spring.model.Report;
public class ReportFieldSetMapper implements FieldSetMapper {
private SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy");
public Report mapFieldSet(FieldSet fieldSet) throws BindException {
Report report = new Report();
report.setId(fieldSet.readInt(0));
report.setFirstName(fieldSet.readString(1));
report.setLastName(fieldSet.readString(2));
// 預設格式為yyyy-MM-dd
// fieldSet.readDate(4);
String date = fieldSet.readString(3);
try {
report.setDob(dateFormat.parse(date));
} catch (ParseException e) {
e.printStackTrace();
}
return report;
}
}
Spring Batch項目處理器
現在,根據作業配置,itemProcessor將在itemWriter之前觸發。我們為此創建了一個名為CustomItemProcessor.java的類別。
package com.journaldev.spring;
import org.springframework.batch.item.ItemProcessor;
import com.journaldev.spring.model.Report;
public class CustomItemProcessor implements ItemProcessor<Report, Report> {
public Report process(Report item) throws Exception {
System.out.println("Processing..." + item);
String fname = item.getFirstName();
String lname = item.getLastName();
item.setFirstName(fname.toUpperCase());
item.setLastName(lname.toUpperCase());
return item;
}
}
我們可以在ItemProcessor實現中操作數據,正如您所見,我正在將名字的值轉換為大寫。
Spring配置檔案
在我们的Spring批处理配置文件中,我们已导入两个额外的配置文件 – context.xml
和 database.xml
。
<beans xmlns="https://www.springframework.org/schema/beans"
xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
https://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans-4.3.xsd">
<!-- stored job-meta in memory -->
<!--
<bean id="jobRepository"
class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="transactionManager" />
</bean>
-->
<!-- stored job-meta in database -->
<bean id="jobRepository"
class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
<property name="dataSource" ref="dataSource" />
<property name="transactionManager" ref="transactionManager" />
<property name="databaseType" value="mysql" />
</bean>
<bean id="transactionManager"
class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
<bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
</beans>
- jobRepository – JobRepository 负责将每个Java对象存储到其正确的元数据表中,用于Spring批处理。
- transactionManager – 这负责在提交间隔的大小和已处理数据相等时提交事务。
- jobLauncher – 这是Spring批处理的核心。该接口包含用于触发作业的run方法。
<beans xmlns="https://www.springframework.org/schema/beans"
xmlns:jdbc="https://www.springframework.org/schema/jdbc" xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="https://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans-4.3.xsd
https://www.springframework.org/schema/jdbc
https://www.springframework.org/schema/jdbc/spring-jdbc-4.3.xsd">
<!-- connect to database -->
<bean id="dataSource"
class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<property name="driverClassName" value="com.mysql.jdbc.Driver" />
<property name="url" value="jdbc:mysql://localhost:3306/Test" />
<property name="username" value="test" />
<property name="password" value="test123" />
</bean>
<bean id="transactionManager"
class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
<!-- create job-meta tables automatically -->
<!-- <jdbc:initialize-database data-source="dataSource"> <jdbc:script location="org/springframework/batch/core/schema-drop-mysql.sql"
/> <jdbc:script location="org/springframework/batch/core/schema-mysql.sql"
/> </jdbc:initialize-database> -->
</beans>
Spring Batch使用一些元数据表来存储批处理作业信息。我们可以从Spring批处理配置中获取它们的创建,但建议通过执行SQL文件手动完成,正如您在上面的注释代码中所见。从安全性的角度来看,最好不要给予Spring批处理数据库用户DDL执行权限。
Spring Batch Tables
Spring Batch 表格非常接近於 Java 中代表它們的領域對象。例如 – JobInstance、JobExecution、JobParameters 和 StepExecution 分別映射到 BATCH_JOB_INSTANCE、BATCH_JOB_EXECUTION、BATCH_JOB_EXECUTION_PARAMS 和 BATCH_STEP_EXECUTION。ExecutionContext 同時映射到 BATCH_JOB_EXECUTION_CONTEXT 和 BATCH_STEP_EXECUTION_CONTEXT。JobRepository 負責將每個 Java 對象保存和存儲到其正確的表格中。以下是每個元數據表的詳細信息。
- Batch_job_instance:BATCH_JOB_INSTANCE 表格包含與 JobInstance 相關的所有信息。
- Batch_job_execution_params:BATCH_JOB_EXECUTION_PARAMS 表格包含與 JobParameters 對象相關的所有信息。
- Batch_job_execution:BATCH_JOB_EXECUTION 表格包含與 JobExecution 對象相關的數據。每次運行作業時都會添加一個新行。
- Batch_step_execution:BATCH_STEP_EXECUTION 表格包含與 StepExecution 對象相關的所有信息。
- Batch_job_execution_context:BATCH_JOB_EXECUTION_CONTEXT 表格包含與作業的 ExecutionContext 相關的數據。對於每個 JobExecution 恰好有一個 Job ExecutionContext,它包含該特定作業執行所需的所有作業級數據。此數據通常表示必須在故障後檢索的狀態,以便 JobInstance 可以從失敗的地方重新啟動。
- Batch_step_execution_context:BATCH_STEP_EXECUTION_CONTEXT表保存与步骤的ExecutionContext相关的数据。每个StepExecution都有一个ExecutionContext,它包含特定步骤执行所需持久化的所有数据。这些数据通常表示在故障发生后必须检索的状态,以便JobInstance可以从故障发生的地方重新启动。
- Batch_job_execution_seq:此表保存作业的数据执行顺序。
- Batch_step_execution_seq:此表保存步骤执行的顺序数据。
- Batch_job_seq:此表保存作业顺序的数据,以防我们有多个作业,将获得多个行。
Spring Batch测试程序
我们的Spring Batch示例项目已准备就绪,最后一步是编写一个测试类将其执行为Java程序。
package com.journaldev.spring;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class App {
public static void main(String[] args) {
String[] springConfig = { "spring/batch/jobs/job-batch-demo.xml" };
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(springConfig);
JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
Job job = (Job) context.getBean("DemoJobXMLWriter");
JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis())
.toJobParameters();
try {
JobExecution execution = jobLauncher.run(job, jobParameters);
System.out.println("Exit Status : " + execution.getStatus());
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Done");
context.close();
}
}
只需运行上述程序,您将获得以下的输出XML。
<?xml version="1.0" encoding="UTF-8"?><report><record id="1001"><dob>2013-07-29T00:00:00+05:30</dob><firstname>TOM</firstname><lastname>MOODY</lastname></record><record id="1002"><dob>2013-07-30T00:00:00+05:30</dob><firstname>JOHN</firstname><lastname>PARKER</lastname></record><record id="1003"><dob>2013-07-31T00:00:00+05:30</dob><firstname>HENRY</firstname><lastname>WILLIAMS</lastname></record></report>
这就是Spring Batch示例的全部内容,您可以从下面的链接下载最终项目。
参考: 官方指南
Source:
https://www.digitalocean.com/community/tutorials/spring-batch-example