Spring Batch 4.0.0 已发布,Spring Batch 是一个轻量级的,完全面向 Spring 的批处理框架,可以应用于企业级大量的数据处理系统。Spring Batch 以 POJO 和 Spring 框架为基础,使开发者更容易的访问和利用企业级服务。Spring Batch 可以提供大量的,可重复的数据处理功能,包括日志记录/跟踪,事务管理,作业处理统计工作重新启动、跳过,和资源管理等重要功能。
Spring Batch 4.0 是自2014年 3.0 发布以来的又一个主要版本。3.0 发布至今,Spring 生态系统发生了很大变化,4.0 版本旨在让 Spring Batch 更新这些变化。
New Baseline
Spring Batch 3 基于 Spring Framework 4 ,Spring Batch 4 重新审视了它的依赖关系树,将它与即将到来的 Spring Boot 2 依赖关系树进行了更新,包括将 Spring Framework 5 和 Java 8 作为基线要求。
改进的 Java 配置 如前所述,Spring Batch 4 是 Spring Boot 发布以来的第一个主要版本。在这个版本中,改善了让用户的 Java 配置体验。所有 ItemReader 和 ItemWriter 实现现在都可以使用构建器。使用 Spring Batch 3 的 Java 配置功能,将需要如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Bean public FlatFileItemReader<Foo> reader (Resource resource) throws Exception { FlatFileItemReader<Foo> reader = new FlatFileItemReader <>(); reader.setName(“fooReader”); reader.setResource(resource); BeanWrapperFieldSetMapper<Foo> fieldSetMapper = new BeanWrapperFieldSetMapper <>(); fieldSetMapper.setTargetType(Foo.class); fieldSetMapper.afterPropertiesSet(); DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer (); tokenizer.setNames(new String [] {“first”, “second”, “third”}); tokenizer.afterPropertiesSet(); DefaultLineMapper lineMapper = new DefaultLineMapper (); lineMapper.setLineTokenizer(tokenizer); lineMapper.setFieldSetMapper(fieldSetMapper); reader.setLineMapper(lineMapper); return reader; }
使用 Spring Batch 4,配置简化如下:
1 2 3 4 5 6 7 8 9 10 @Bean public FlatFileItemReader<Foo> reader (Resource resource) { return new FlatFileItemReaderBuilder <Foo>() .name(“fooReader”) .resource(resource) .delimited() .names(new String []{“first”, “second”, “third”}) .targetType(Foo.class) .build(); }
开始 创建批处理服务 本指南将引导您完成创建基本批处理驱动解决方案的过程。
你要建什么 您将构建一个服务,该服务从CSV电子表格中导入数据,用自定义代码对其进行转换,并将最终结果存储在数据库中。
你需要什么 大约15分钟
最喜欢的文本编辑器或IDE JDK 1.8或以后
Gradle 2.3或Maven 3.0+
您还可以直接将代码导入IDE中:
Spring Tool Suite (STS)
IntelliJ IDEA
如何完成本指南 像大多数Spring一样入门指南,您可以从头开始并完成每个步骤,也可以绕过您已经熟悉的基本设置步骤。无论哪种方式,你最终都会得到工作代码。
到白手兴家,继续前进用Gradle建造…
到跳过基础,做以下工作:
下载并解压缩本指南的源存储库,或用Git:
1 git clone https://github.com/spring-guides/gs-batch-processing.git
cd into gs-batch-processing/initial
跳到创建一个business类…
当你完成中的代码检查结果,gs-batch-processing/complete.
用Gradle创建 首先,您设置了一个基本的构建脚本。在使用Spring构建应用程序时,您可以使用任何您喜欢的构建系统,但是您需要使用的代码用Gradle和Maven包括在这里。如果您对这两种情况都不熟悉,请参阅用Gradle构建Java项目或使用Maven构建Java项目…
创建目录结构 在您选择的项目目录中,创建以下子目录结构;例如,使用mkdir-p src/main/java/hello关于*nix系统:
1 2 3 4 └── src └── main └── java └── hello
创建一个Gradle构建文件
下面是initial Gradle build file….
build.gradle
1 2 3 4 5 6 7 buildscript{repositories{mavenCentral() }dependencies{classpath("org.springframework.boot:spring-boot-gradle-plugin:1.5.9.RELEASE" ) } }apply plugin: 'java' apply plugin: 'eclipse' apply plugin: 'idea' apply plugin: 'org.springframework.boot' jar{baseName= 'gs-batch-processing' version= '0.1.0' }repositories{mavenCentral() }sourceCompatibility= 1. 8targetCompatibility= 1. 8dependencies{compile("org.springframework.boot:spring-boot-starter-batch" )compile("org.hsqldb:hsqldb" )testCompile("junit:junit" ) }
Spring Boot gradle plugin提供了许多方便的功能:
它收集类路径上的所有JAR,并构建一个可运行的“über-jar”,这使得执行和传输服务更加方便。
它搜索public static void main()方法标记为可运行的类。
它提供了一个内置的依赖项解析器,它将版本号设置为匹配。Spring Boot dependencies您可以覆盖任何版本,但它将默认为Boot所选的一组版本。
用Maven构建 首先,您设置了一个基本的构建脚本。在使用Spring构建应用程序时,您可以使用任何您喜欢的构建系统,但是您需要使用的代码马文包括在这里。如果您不熟悉Maven,请参阅使用Maven构建Java项目…
创建目录结构
在您选择的项目目录中,创建以下子目录结构;例如,使用mkdir-p src/main/java/hello关于*nix系统:
1 2 3 4 └── src └── main └── java └── hello
pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 <?xml version="1.0" encoding="UTF-8" ?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <groupId > org.springframework</groupId > <artifactId > gs-batch-processing</artifactId > <version > 0.1.0</version > <parent > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-parent</artifactId > <version > 1.5.9.RELEASE</version > </parent > <properties > <java.version > 1.8</java.version > </properties > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-batch</artifactId > </dependency > <dependency > <groupId > org.hsqldb</groupId > <artifactId > hsqldb</artifactId > </dependency > </dependencies > <build > <plugins > <plugin > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-maven-plugin</artifactId > </plugin > </plugins > </build > </project >
大Spring Boot Maven插件提供了许多方便的功能:
它收集类路径上的所有JAR,并构建一个可运行的“über-jar”,这使得执行和传输服务更加方便。
它搜索public static void main()方法标记为可运行的类。
它提供了一个内置的依赖项解析器,它将版本号设置为匹配。Spring Boot dependencies您可以覆盖任何版本,但它将默认为Boot所选的一组版本。
用IDE构建 阅读如何将本指南直接导入Spring Tool Suite…
阅读如何使用本指南IntelliJ IDEA…
业务数据 通常,您的客户或业务分析师提供电子表格。在这种情况下,你自己编出来的。
SRC/Main/Resources/Sample-data.csv
1 2 3 4 5 Jill,Doe Joe,Doe Justin,Doe Jane,Doe John,Doe
此电子表格包含每行的名称和姓氏,以逗号分隔。正如您所看到的,这是Spring处理的一个非常常见的模式。
接下来,编写一个SQL脚本来创建一个表来存储数据。
SRC/main/resources/schema-all.sql
1 2 DROP TABLE people IF EXISTS ;CREATE TABLE people(person_id BIGINT IDENTITY NOT NULL PRIMARY KEY,first_name VARCHAR (20 ),last_name VARCHAR (20 ));
Spring Boot模式schema-@@platform@@.sql启动时自动。-all是所有平台的默认设置。
创建一个business类 现在您已经看到了数据输入和输出的格式,您可以编写代码来表示一行数据.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 packagehello; public class Person { private StringlastName; private StringfirstName; public Person () { } public Person (StringfirstName, StringlastName) { this .firstName=firstName; this .lastName=lastName; } public voidsetFirstName (StringfirstName) { this .firstName=firstName; } public StringgetFirstName () { returnfirstName; } public StringgetLastName () { returnlastName; } public voidsetLastName (StringlastName) { this .lastName=lastName; } @Override public StringtoString () { return "firstName: " +firstName+ ", lastName: " +lastName; } }
您可以实例化person通过构造函数,或通过设置属性,使用姓和名初始化。
批处理中的一个常见范例是摄取数据,转换数据,然后将其输送到其他地方。在这里,您将编写一个简单的转换器,将名称转换为大写。
SRC/main/java/hello/PersonItemProcessor.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 packagehello; importorg.slf4j.Logger; importorg.slf4j.LoggerFactory; importorg.springframework.batch.item.ItemProcessor; public class PersonItemProcessor implements ItemProcessor <Person, Person> { private static final Loggerlog= LoggerFactory.getLogger(PersonItemProcessor.class); @Override public Personprocess (final Personperson) throws Exception { final StringfirstName=person.getFirstName().toUpperCase(); final StringlastName=person.getLastName().toUpperCase(); final PersontransformedPerson= new Person (firstName,lastName);log.info("Converting (" +person+ ") into (" +transformedPerson+ ")" ); returntransformedPerson; } }
PersonItemProcessor器实现Spring批处理ItemProcessor接口。这使代码很容易连接到一个批处理作业中,您可以在本指南中进一步对其进行定义。根据接口,您将收到一个传入的person对象,然后将其转换upper-cased Person.
不需要输入和输出类型相同。事实上,在读取一个数据源之后,有时应用程序的数据流需要不同的数据类型。
把一批工作放在一起 现在,您将实际的批处理作业组合在一起。SpringBatch提供了许多实用程序类,这些类减少了编写自定义代码的需要。相反,您可以专注于业务逻辑。
SRC/main/java/hello/BatchConfiguration.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 packagehello; importjavax.sql.DataSource; importorg.springframework.batch.core.Job; importorg.springframework.batch.core.JobExecutionListener; importorg.springframework.batch.core.Step; importorg.springframework.batch.core.configuration.annotation.EnableBatchProcessing; importorg.springframework.batch.core.configuration.annotation.JobBuilderFactory; importorg.springframework.batch.core.configuration.annotation.StepBuilderFactory; importorg.springframework.batch.core.launch.support.RunIdIncrementer; importorg.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider; importorg.springframework.batch.item.database.JdbcBatchItemWriter; importorg.springframework.batch.item.file.FlatFileItemReader; importorg.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper; importorg.springframework.batch.item.file.mapping.DefaultLineMapper; importorg.springframework.batch.item.file.transform.DelimitedLineTokenizer; importorg.springframework.beans.factory.annotation.Autowired; importorg.springframework.context.annotation.Bean; importorg.springframework.context.annotation.Configuration; importorg.springframework.core.io.ClassPathResource; importorg.springframework.jdbc.core.JdbcTemplate; @Configuration @EnableBatchProcessing public class BatchConfiguration { @Autowired public JobBuilderFactoryjobBuilderFactory; @Autowired public StepBuilderFactorystepBuilderFactory; @Autowired public DataSourcedataSource; @Bean public FlatFileItemReader<Person>reader() { FlatFileItemReader<Person>reader= new FlatFileItemReader <Person>();reader.setResource(new ClassPathResource ("sample-data.csv" ));reader.setLineMapper(new DefaultLineMapper <Person>() {{setLineTokenizer(new DelimitedLineTokenizer () {{setNames(new String [] { "firstName" , "lastName" }); }});setFieldSetMapper(new BeanWrapperFieldSetMapper <Person>() {{setTargetType(Person.class); }}); }}); returnreader; } @Bean public PersonItemProcessorprocessor () { return new PersonItemProcessor (); } @Bean public JdbcBatchItemWriter<Person>writer() { JdbcBatchItemWriter<Person>writer= new JdbcBatchItemWriter <Person>();writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider <Person>());writer.setSql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)" );writer.setDataSource(dataSource); returnwriter; } @Bean public JobimportUserJob (JobCompletionNotificationListenerlistener) { returnjobBuilderFactory.get("importUserJob" ) .incrementer(new RunIdIncrementer ()) .listener(listener) .flow(step1()) .end() .build(); } @Bean public Stepstep1 () { returnstepBuilderFactory.get("step1" ) .<Person, Person>chunk(10 ) .reader(reader()) .processor(processor()) .writer(writer()) .build(); } }
首先,@EnableBatchProcessing注释添加了许多支持作业的关键bean,并为您节省了大量的腿工作。此示例使用基于内存的数据库(由@EnableBatchProcessing),这意味着当它完成时,数据就会消失。
把它分解:
SRC/main/java/hello/BatchConfiguration.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Bean public FlatFileItemReader<Person>reader() { FlatFileItemReader<Person>reader= new FlatFileItemReader <Person>();reader.setResource(new ClassPathResource ("sample-data.csv" ));reader.setLineMapper(new DefaultLineMapper <Person>() {{setLineTokenizer(new DelimitedLineTokenizer () {{setNames(new String [] { "firstName" , "lastName" }); }});setFieldSetMapper(new BeanWrapperFieldSetMapper <Person>() {{setTargetType(Person.class); }}); }}); returnreader; } @Bean public PersonItemProcessorprocessor () { return new PersonItemProcessor (); } @Bean public JdbcBatchItemWriter<Person>writer() { JdbcBatchItemWriter<Person>writer= new JdbcBatchItemWriter <Person>();writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider <Person>());writer.setSql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)" );writer.setDataSource(dataSource); returnwriter; }
第一个代码块定义输入、处理器和输出。—reader()创建ItemReader.它查找一个名为sample-data.csv并使用足够的信息解析每一行项,从而将其转换为person…processor()创建我们的PersonItemProcessor您在前面定义,目的是大写数据。—write(DataSource)创建ItemWriter。此对象针对JDBC目标,并自动获取由@EnableBatchProcessing它包括插入单个person由Javabean属性驱动。
下一部分重点关注实际的作业配置。
SRC/main/java/hello/BatchConfiguration.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Bean public JobimportUserJob (JobCompletionNotificationListenerlistener) { returnjobBuilderFactory.get("importUserJob" ) .incrementer(new RunIdIncrementer ()) .listener(listener) .flow(step1()) .end() .build(); } @Bean public Stepstep1 () { returnstepBuilderFactory.get("step1" ) .<Person, Person>chunk(10 ) .reader(reader()) .processor(processor()) .writer(writer()) .build(); }
第一个方法定义作业,三定义一个步骤。作业是由步骤构建的,每个步骤都可以包含一个阅读器、一个处理器和一个编写器。
在此作业定义中,您需要一个增量器,因为作业使用数据库来维护执行状态。然后列出每个步骤,其中此作业只有一个步骤。作业结束,JavaAPI生成一个配置完美的作业。
在步骤定义中,定义一次写入多少数据。在这种情况下,它一次最多写十条记录。接下来,使用前面注入的位来配置读取器、处理器和写入器。
chunk()是前缀<Person,Person>因为这是一个通用的方法。这表示处理的每个“块”的输入和输出类型,并与ItemReader和ItemWriter...
SRC/main/java/hello/JobCompletionNotificationListener.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 packagehello; importjava.sql.ResultSet; importjava.sql.SQLException; importjava.util.List; importorg.slf4j.Logger; importorg.slf4j.LoggerFactory; importorg.springframework.batch.core.BatchStatus; importorg.springframework.batch.core.JobExecution; importorg.springframework.batch.core.listener.JobExecutionListenerSupport; importorg.springframework.beans.factory.annotation.Autowired; importorg.springframework.jdbc.core.JdbcTemplate; importorg.springframework.jdbc.core.RowMapper; importorg.springframework.stereotype.Component; @Component public class JobCompletionNotificationListener extends JobExecutionListenerSupport { private static final Loggerlog= LoggerFactory.getLogger(JobCompletionNotificationListener.class); private final JdbcTemplatejdbcTemplate; @Autowired public JobCompletionNotificationListener (JdbcTemplatejdbcTemplate) { this .jdbcTemplate=jdbcTemplate; } @Override public voidafterJob (JobExecutionjobExecution) { if (jobExecution.getStatus() == BatchStatus.COMPLETED) {log.info("!!! JOB FINISHED! Time to verify the results" ); List<Person>results=jdbcTemplate.query("SELECT first_name, last_name FROM people" , new RowMapper <Person>() { @Override public PersonmapRow (ResultSetrs, introw) throws SQLException { return new Person (rs.getString(1 ),rs.getString(2 )); } }); for (Personperson:results) {log.info("Found <" +person+ "> in the database." ); } } } }
此代码监听作业为BatchStatus.COMPLETED,然后使用JdbcTemplate检查结果。
使应用程序可执行 虽然批处理可以嵌入到Web应用程序和WAR文件中,但下面演示的更简单的方法创建了一个独立的应用程序。您将所有东西打包到一个可执行的JAR文件中,该文件由一个好的旧Java驱动。main()方法。
src/main/java/hello/Application.java
1 2 3 4 5 6 7 8 9 10 11 12 packagehello; importorg.springframework.boot.SpringApplication; importorg.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class Application { public static voidmain (String[]args) throws Exception { SpringApplication.run(Application.class,args); } }
@SpringBootApplication是一个方便的注释,它添加了以下所有内容:
@Configuration将类标记为应用程序上下文的bean定义的源。
@EnableAutoConfiguration告诉SpringBoot根据类路径设置、其他bean和各种属性设置开始添加bean。
通常你会添加@EnableWebMvc对于SpringMVC应用程序,但是SpringBoot在看到Springwebmvc在类路径上。这会将应用程序标记为web应用程序,并激活关键行为,例如设置DispatcherServlet…
@ComponentScan告诉Spring在hello包,让它找到控制器。
main()方法使用SpringBoot的SpringApplication.run()方法来启动应用程序。您注意到没有一行XML吗?不web.xml也要归档。这个Web应用程序是100%纯Java,您不必处理配置任何管道或基础设施的问题。
出于演示目的,有一些代码可以创建一个JdbcTemplate,查询数据库,并打印批处理作业插入的人员的名称。
构建一个可执行的JAR
您可以使用Gradle或Maven从命令行运行应用程序。或者,您可以构建一个包含所有必需的依赖项、类和资源的单个可执行JAR文件,并运行该文件。这使得在整个开发生命周期、跨不同环境等将服务作为应用程序进行发布、版本和部署变得非常容易。
如果使用Gradle 则可以使用./gradlew bootRun.或者您可以使用以下方法构建JAR文件./gradlew build然后可以运行JAR文件:
1 java -jar build/libs/gs-batch-processing-0.1.0.jar
如果您使用的是Maven 则可以使用./mvnw Spring-boot:运行.或者您可以用./mvnw clean package然后可以运行JAR文件:
1 java -jar target/gs-batch-processing-0.1.0.jar
上面的过程将创建一个可运行的JAR。你也可以选择构建一个经典的WAR文件相反。
该职务为每个被转换的人打印一行。作业运行后,还可以看到查询数据库的输出。
1 2 3 4 5 6 7 8 9 10 Converting (firstName: Jill, lastName: Doe) into (firstName: JILL, lastName: DOE) Converting (firstName: Joe, lastName: Doe) into (firstName: JOE, lastName: DOE) Converting (firstName: Justin, lastName: Doe) into (firstName: JUSTIN, lastName: DOE) Converting (firstName: Jane, lastName: Doe) into (firstName: JANE, lastName: DOE) Converting (firstName: John, lastName: Doe) into (firstName: JOHN, lastName: DOE) Found in the database. Found in the database. Found in the database. Found in the database. Found in the database.
摘要 恭喜你!您构建了一个批处理作业,该作业从电子表格中摄取数据,并对其进行处理,并将其写入数据库。
See also
The following guides may also be helpful:
Building an Application with Spring Boot
Accessing Data with GemFire
Accessing Data with JPA
Accessing Data with MongoDB
Accessing data with MySQL