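Integrating Spring Batch with Spring Boot

Introduction

  Spring Batch is a framework for bulk data processing: it reads a large volume of data, processes it, and writes it out in a specified form. For example, it can batch-insert the rows of a CSV file (millions or even tens of millions of rows are not a problem) into a database. Detailed walkthroughs of this are rare, both in print and online, so this post explains the framework and puts it into practice at the same time (all of the code has been run). We start with Spring Boot's support for the Batch framework and then work through the code step by step.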

 


 

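I. Spring Boot support for the Batch framework

1. Components of the Spring Batch framework

  1) JobRepository: the container in which Jobs are registered; it is configured with the database properties.

  2) JobLauncher: the interface used to launch a Job.

  3) Job: the task that is actually executed; it contains one or more Steps.

  4) Step: a single step, made up of ItemReader -> ItemProcessor -> ItemWriter.

  5) ItemReader: reads data and maps data fields to an entity class. For example, it reads person records from a CSV file and maps them to the fields of the Person entity.

  6) ItemProcessor: the interface for processing data. It can also validate data (by setting a validator that uses JSR-303 (hibernate-validator) annotations), for example converting the Chinese gender values 男/女 to M/F while checking that the age field meets the requirements.

  7) ItemWriter: the interface for writing data out. It is configured with the data source and a parameterized SQL insert statement.

These seven components only need to be registered one by one in a configuration class, and the configuration class must carry the @EnableBatchProcessing annotation.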

@Configuration
@EnableBatchProcessing // Enable batch processing support
@Import(DruidDBConfig.class) // Bring in the DataSource
public class CsvBatchConfig {

}

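2. Batch processing flow chart

The flow chart for this section explains why the configuration class is defined the way it is; see the code in the hands-on section for details.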
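The read -> process -> write flow of a Step can be sketched in plain Java. This is a toy model under stated assumptions, not the Spring Batch API: `ChunkFlowSketch` and `runStep` are hypothetical names, and the real framework additionally takes care of transactions, restarts, and error handling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

/** Toy model of a chunk-oriented step (hypothetical, not the Spring Batch API). */
class ChunkFlowSketch {

    /**
     * Reads up to chunkSize items, processes each of them, then hands the
     * whole chunk to the "writer". Returns the chunks in the order written.
     */
    static <I, O> List<List<O>> runStep(Iterator<I> reader,
                                        Function<I, O> processor,
                                        int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<O>> written = new ArrayList<>();
        while (reader.hasNext()) {
            // 1) read: collect at most chunkSize items
            List<I> items = new ArrayList<>();
            while (items.size() < chunkSize && reader.hasNext()) {
                items.add(reader.next());
            }
            // 2) process: transform the items one by one
            List<O> processed = new ArrayList<>();
            for (I item : items) {
                processed.add(processor.apply(item));
            }
            // 3) write: one write call per chunk
            written.add(processed);
        }
        return written;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Zhangsan", "Lisi", "Wangwu", "Zhaoliu", "Zhouqi");
        List<List<String>> chunks = runStep(names.iterator(), name -> name.toUpperCase(), 2);
        System.out.println(chunks); // [[ZHANGSAN, LISI], [WANGWU, ZHAOLIU], [ZHOUQI]]
    }
}
```

With five records and a chunk size of 2, the writer is called three times; with a chunk size at least as large as the file, it is called once.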

 

II. Hands-on

1. Add dependencies

1) Spring Batch dependency

<!--  spring batch -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>

2) Validator dependency

<!-- hibernate validator -->
<dependency>
    <groupId>org.hibernate</groupId>
    <artifactId>hibernate-validator</artifactId>
    <version>6.0.7.Final</version>
</dependency>

3) MySQL + Druid dependencies

<!-- mysql connector-->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.35</version>
</dependency>
<!-- alibaba dataSource -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.12</version>
</dependency>

4) Test dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
</dependency>

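2. application.yml configuration

When a job is launched, Spring Batch relies on its own metadata tables, whose names start with batch. These tables do not exist at first, so the application configuration file has to tell Spring Batch to create them.

spring:
  # batch
  batch:
    job:
      # Defined Jobs run automatically at startup by default (true); when set to false, they must be started with jobLauncher.run
      enabled: false
    # Create the default Spring Batch tables in the database; if this is not always, an error reports that the tables do not exist
    initialize-schema: always
    # Prefix for the batch tables
#    table-prefix: csv-batch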

3. Data source configuration

spring:
  datasource:
    username: root
    password: 1234
    url: jdbc:mysql://127.0.0.1:3306/db_base?useSSL=false&serverTimezone=UTC&characterEncoding=utf8
    driver-class-name: com.mysql.jdbc.Driver

Register the DruidDBConfig configuration class; it is later pulled into the batch configuration class with @Import.

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.support.http.StatViewServlet;
import com.alibaba.druid.support.http.WebStatFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.boot.web.servlet.ServletRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

/**
 * @author jian
 * @date 2019/4/20
 * @description Custom DataSource
 *
 */
@Configuration
public class DruidDBConfig {

    private Logger logger = LoggerFactory.getLogger(DruidDBConfig.class);

    @Value("${spring.datasource.url}")
    private String dbUrl;

    @Value("${spring.datasource.username}")
    private String username;

    @Value("${spring.datasource.password}")
    private String password;

    @Value("${spring.datasource.driver-class-name}")
    private String driverClassName;

    /* @Value("${spring.datasource.initialSize}")
    private int initialSize;

    @Value("${spring.datasource.minIdle}")
    private int minIdle;

    @Value("${spring.datasource.maxActive}")
    private int maxActive;

    @Value("${spring.datasource.maxWait}")
    private int maxWait;

    @Value("${spring.datasource.timeBetweenEvictionRunsMillis}")
    private int timeBetweenEvictionRunsMillis;

    @Value("${spring.datasource.minEvictableIdleTimeMillis}")
    private int minEvictableIdleTimeMillis;

    @Value("${spring.datasource.validationQuery}")
    private String validationQuery;

    @Value("${spring.datasource.testWhileIdle}")
    private boolean testWhileIdle;

    @Value("${spring.datasource.testOnBorrow}")
    private boolean testOnBorrow;

    @Value("${spring.datasource.testOnReturn}")
    private boolean testOnReturn;

    @Value("${spring.datasource.poolPreparedStatements}")
    private boolean poolPreparedStatements;

    @Value("${spring.datasource.maxPoolPreparedStatementPerConnectionSize}")
    private int maxPoolPreparedStatementPerConnectionSize;

    @Value("${spring.datasource.filters}")
    private String filters;

    @Value("${spring.datasource.connectionProperties}")
    private String connectionProperties;*/

    @Bean
    @Primary // Highest priority when injected
    public DataSource dataSource() {
        DruidDataSource dataSource = new DruidDataSource();
        logger.info("-------->dataSource[url=" + dbUrl + " ,username=" + username + "]");
        dataSource.setUrl(dbUrl);
        dataSource.setUsername(username);
        dataSource.setPassword(password);
        dataSource.setDriverClassName(driverClassName);

        /*  // configuration
        dataSource.setInitialSize(initialSize);
        dataSource.setMinIdle(minIdle);
        dataSource.setMaxActive(maxActive);
        dataSource.setMaxWait(maxWait);
        dataSource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
        dataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        dataSource.setValidationQuery(validationQuery);
        dataSource.setTestWhileIdle(testWhileIdle);
        dataSource.setTestOnBorrow(testOnBorrow);
        dataSource.setTestOnReturn(testOnReturn);
        dataSource.setPoolPreparedStatements(poolPreparedStatements);
        dataSource.setMaxPoolPreparedStatementPerConnectionSize(maxPoolPreparedStatementPerConnectionSize);
        try {
            dataSource.setFilters(filters);
        } catch (SQLException e) {
            logger.error("druid configuration initialization filter", e);
        }
        dataSource.setConnectionProperties(connectionProperties);*/

        return dataSource;
    }

    @Bean
    public ServletRegistrationBean druidServletRegistrationBean() {
        ServletRegistrationBean servletRegistrationBean = new ServletRegistrationBean();
        servletRegistrationBean.setServlet(new StatViewServlet());
        servletRegistrationBean.addUrlMappings("/druid/*");
        return servletRegistrationBean;
    }

    /**
     * Register the Druid filter
     *
     * @return
     */
    @Bean
    public FilterRegistrationBean duridFilterRegistrationBean() {
        FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean();
        filterRegistrationBean.setFilter(new WebStatFilter());
        Map<String, String> initParams = new HashMap<String, String>();
        // Requests to ignore
        initParams.put("exclusions", "*.js,*.gif,*.jpg,*.bmp,*.png,*.css,*.ico,/druid/*");
        filterRegistrationBean.setInitParameters(initParams);
        filterRegistrationBean.addUrlPatterns("/*");
        return filterRegistrationBean;
    }

}


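4. Write the batch configuration class

The configuration class simply registers each Spring Batch component; further explanations are given in the code comments.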

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

/**
 *
 * @author jian
 * @date 2019/4/28
 * @description Spring Batch configuration for batch-processing a CSV file; it registers the following components
 * Spring Batch components:
 * 1) JobRepository: the container in which jobs are registered
 * 2) JobLauncher: the interface used to launch a job
 * 3) Job: the task actually executed, containing one or more Steps
 * 4) Step: a step made up of ItemReader, ItemProcessor and ItemWriter
 * 5) ItemReader: the interface for reading data
 * 6) ItemProcessor: the interface for processing data
 * 7) ItemWriter: the interface for writing data out
 *
 *
 */
@Configuration
@EnableBatchProcessing // Enable batch processing support
@Import(DruidDBConfig.class) // Bring in the DataSource
public class CsvBatchConfig {
    private Logger logger = LoggerFactory.getLogger(CsvBatchConfig.class);

    /**
     * ItemReader: reads the file and maps each line to the entity
     * @return
     */
    @Bean
    public ItemReader<Person> reader() {
        // Read the CSV file with FlatFileItemReader; each line is one record
        FlatFileItemReader<Person> reader = new FlatFileItemReader<>();
        // Location of the file on the classpath
        reader.setResource(new ClassPathResource("person.csv"));

        // Map the CSV columns to the entity
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "name", "age", "gender"});
        BeanWrapperFieldSetMapper<Person> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(Person.class);
        DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);
        reader.setLineMapper(lineMapper);
        return reader;
    }

    /**
     * ItemProcessor: processes and validates the data
     * @return
     */
    @Bean
    public ItemProcessor<Person, Person> processor() {
        CvsItemProcessor cvsItemProcessor = new CvsItemProcessor();
        // Set the validator
        cvsItemProcessor.setValidator(csvBeanValidator());
        return cvsItemProcessor;
    }

    /**
     * Register the validator
     * @return
     */
    @Bean
    public CsvBeanValidator<Person> csvBeanValidator() {
        return new CsvBeanValidator<>();
    }

    /**
     * ItemWriter: sets the DataSource and the batch insert SQL, then writes to the database
     * @param dataSource
     * @return
     */
    @Bean
    public ItemWriter<Person> writer(DataSource dataSource) {
        // Use JdbcBatchItemWriter to write the data to the database
        JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<>();
        // SQL statement with named parameters, filled from the Person bean properties
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
        String sql = "insert into person values(:id,:name,:age,:gender)";
        writer.setSql(sql);
        writer.setDataSource(dataSource);
        return writer;
    }

    /**
     * JobRepository: sets the database and registers the Job container
     * @param dataSource
     * @param transactionManager
     * @return
     * @throws Exception
     */
    @Bean
    public JobRepository cvsJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDatabaseType("mysql");
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        jobRepositoryFactoryBean.setDataSource(dataSource);
        return jobRepositoryFactoryBean.getObject();
    }

    /**
     * JobLauncher definition
     * @param dataSource
     * @param transactionManager
     * @return
     * @throws Exception
     */
    @Bean
    public SimpleJobLauncher csvJobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        // Set the JobRepository
        jobLauncher.setJobRepository(cvsJobRepository(dataSource, transactionManager));
        return jobLauncher;
    }

    /**
     * Job definition
     * @param jobs
     * @param step
     * @return
     */
    @Bean
    public Job importJob(JobBuilderFactory jobs, Step step) {
        return jobs.get("importCsvJob")
                .incrementer(new RunIdIncrementer())
                .flow(step)
                .end()
                .listener(csvJobListener())
                .build();
    }

    /**
     * Register the job listener
     * @return
     */
    @Bean
    public CsvJobListener csvJobListener() {
        return new CsvJobListener();
    }

    /**
     * Step definition: ItemReader -> ItemProcessor -> ItemWriter, i.e. read data -> process and validate data -> write data
     * @param stepBuilderFactory
     * @param reader
     * @param writer
     * @param processor
     * @return
     */
    @Bean
    public Step step(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader,
                     ItemWriter<Person> writer, ItemProcessor<Person, Person> processor) {
        return stepBuilderFactory
                .get("step")
                // Chunk mechanism: items are read and processed one at a time and accumulated;
                // once the chunk size is reached, the whole chunk is handed to the writer in one write
                .<Person, Person>chunk(65000)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

}


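5. Define the processor

The processor implements the ItemProcessor interface (here by extending ValidatingItemProcessor) and overrides the process method. The input argument is the item read by the ItemReader, and the returned item is passed on to the ItemWriter.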

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.validator.ValidatingItemProcessor;
import org.springframework.batch.item.validator.ValidationException;

/**
 * @author jian
 * @date 2019/4/28
 * @description
 * Processes and validates the CSV data.
 * Overrides process: the input argument is the item read by the ItemReader, and the returned item is passed to the ItemWriter
 */
public class CvsItemProcessor extends ValidatingItemProcessor<Person> {
    private Logger logger = LoggerFactory.getLogger(CvsItemProcessor.class);

    @Override
    public Person process(Person item) throws ValidationException {
        // super.process() must be called for the custom validator to run
        logger.info("processor start validating...");
        super.process(item);

        // Data processing, e.g. convert the Chinese gender value to M/F
        if ("男".equals(item.getGender())) {
            item.setGender("M");
        } else {
            item.setGender("F");
        }
        logger.info("processor end validating...");
        return item;
    }

}
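Note that the else branch of the processor maps every value other than 男 to F, including blanks and typos. If that is not wanted, a stricter mapping could reject unknown values instead. The following is a minimal sketch of that alternative, not the code used in this post; `GenderMappingSketch` and `toCode` are hypothetical names.

```java
/** Stricter gender mapping (hypothetical alternative to the else branch). */
class GenderMappingSketch {

    /** Maps 男 to M and 女 to F; anything else is rejected rather than defaulted. */
    static String toCode(String gender) {
        if (gender == null) {
            throw new IllegalArgumentException("gender is missing");
        }
        switch (gender.trim()) {
            case "男":
                return "M";
            case "女":
                return "F";
            default:
                throw new IllegalArgumentException("unknown gender: " + gender);
        }
    }

    public static void main(String[] args) {
        System.out.println(toCode("男")); // M
        System.out.println(toCode("女")); // F
    }
}
```

Inside a processor, the thrown exception would fail the step in the same way a validation error does, so bad rows are noticed instead of being silently stored as F.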

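6. Define the validator

The validator uses JSR-303 (hibernate-validator) annotations to check whether the data read by the ItemReader meets the requirements. If it does not, a ValidationException is thrown and the rest of the batch job is not carried out.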

import org.springframework.batch.item.validator.ValidationException;
import org.springframework.batch.item.validator.Validator;
import org.springframework.beans.factory.InitializingBean;

import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import javax.validation.ValidatorFactory;
import java.util.Set;

/**
 *
 * @author jian
 * @date 2019/4/28
 * @param <T>
 * @description Validator: uses JSR-303 (hibernate-validator) annotations to check whether the data read by the ItemReader meets the requirements.
 */
public class CsvBeanValidator<T> implements Validator<T>, InitializingBean {

    private javax.validation.Validator validator;

    /**
     * Initialize the JSR-303 Validator
     * @throws Exception
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory();
        validator = validatorFactory.usingContext().getValidator();
    }

    /**
     * Validate the data with the JSR-303 validator
     * @param value
     * @throws ValidationException
     */
    @Override
    public void validate(T value) throws ValidationException {
        Set<ConstraintViolation<T>> constraintViolations = validator.validate(value);
        if (!constraintViolations.isEmpty()) {
            // Collect every violation message into one exception message
            StringBuilder message = new StringBuilder();
            for (ConstraintViolation<T> constraintViolation : constraintViolations) {
                message.append(constraintViolation.getMessage()).append("\n");
            }
            throw new ValidationException(message.toString());
        }
    }

}


7. Define the listener

To monitor Job execution, define a class that implements JobExecutionListener and bind it to the Job in the Job's bean definition.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

/**
 * @author jian
 * @date 2019/4/28
 * @description
 * Monitors Job execution: implements JobExecutionListener and is bound to the Job in the Job's bean definition
 */
public class CsvJobListener implements JobExecutionListener {
    private Logger logger = LoggerFactory.getLogger(CsvJobListener.class);
    private long startTime;
    private long endTime;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        startTime = System.currentTimeMillis();
        logger.info("job process start...");
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        endTime = System.currentTimeMillis();
        logger.info("job process end...");
        logger.info("elapsed time: " + (endTime - startTime) + "ms");
    }

}

 

III. Testing

1. The person.csv file

In a CSV file, commas separate the fields and each line break ends one record.

1,Zhangsan,21,男
2,Lisi,22,女
3,Wangwu,23,男
4,Zhaoliu,24,男
5,Zhouqi,25,女

Put the file under resources; the ItemReader reads it from that classpath location.
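How one line of this file turns into named fields can be illustrated in plain Java. This is only a sketch of the idea behind the tokenizer configured with the names id, name, age and gender, not the Spring Batch implementation: `CsvLineSketch` and `tokenize` are hypothetical names, and quoted fields are not handled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java illustration of name-based field mapping (hypothetical helper). */
class CsvLineSketch {

    // Column names in file order, matching the names given to the tokenizer
    static final String[] NAMES = {"id", "name", "age", "gender"};

    /** Splits one comma-separated line and pairs each value with its column name. */
    static Map<String, String> tokenize(String line) {
        String[] values = line.split(",", -1); // -1 keeps trailing empty fields
        if (values.length != NAMES.length) {
            throw new IllegalArgumentException(
                    "expected " + NAMES.length + " fields but found " + values.length + ": " + line);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < NAMES.length; i++) {
            fields.put(NAMES[i], values[i].trim());
        }
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("1,Zhangsan,21,男"));
        // {id=1, name=Zhangsan, age=21, gender=男}
    }
}
```

In the real configuration, the field set mapper then copies these named values into the Person properties of the same names.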

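2. The Person entity

The fields of person.csv correspond to this entity, and validation annotations can be added to it. For example, @Size sets the allowed length range of a field; a value outside that range is caught by validation, and the batch will not proceed.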

import javax.validation.constraints.Size;
import java.io.Serializable;

public class Person implements Serializable {
    // serialVersionUID must be static to be recognized by Java serialization
    private static final long serialVersionUID = 1L;

    private String id;
    @Size(min = 2, max = 8)
    private String name;
    private int age;
    private String gender;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getGender() {
        return gender;
    }

    public void setGender(String gender) {
        this.gender = gender;
    }

    @Override
    public String toString() {
        return "Person{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                ", age=" + age +
                ", gender='" + gender + '\'' +
                '}';
    }

}


3. Database table

CREATE TABLE `person` (
  `id` int(11) NOT NULL,
  `name` varchar(10) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  `gender` varchar(2) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1

The table is empty at first.

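4. Test class

The test needs the launcher and the job injected. Parameters can be bound at launch time through JobParameters, which keeps the job flexible; finally, JobLauncher.run executes the batch job.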

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class BatchTest {

    @Autowired
    SimpleJobLauncher jobLauncher;

    @Autowired
    Job importJob;

    @Test
    public void test() throws Exception {
        // Late-bound parameters: bind values through JobParameters
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(importJob, jobParameters);
    }

}
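One reason to pass a changing value such as the current time is that Spring Batch identifies a job instance by the job name together with its identifying parameters, and it refuses to run an instance that has already completed. The toy model below illustrates only that idea. It is not how the JobRepository is implemented; `JobInstanceSketch` and `launch` are hypothetical names, and the fixed values 1L and 2L stand in for timestamps.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Toy model of job-instance identity (hypothetical, not the JobRepository). */
class JobInstanceSketch {

    // Keys of the job instances that have already completed
    private final Set<String> completed = new HashSet<>();

    /**
     * Returns true if the job may run, false if an instance with the same
     * name and parameters has already completed.
     */
    boolean launch(String jobName, Map<String, Object> parameters) {
        // TreeMap gives a stable key regardless of parameter insertion order
        String key = jobName + "|" + new TreeMap<>(parameters);
        return completed.add(key);
    }

    public static void main(String[] args) {
        JobInstanceSketch repository = new JobInstanceSketch();
        Map<String, Object> fixed = Collections.<String, Object>singletonMap("file", "person.csv");
        System.out.println(repository.launch("importCsvJob", fixed)); // true
        System.out.println(repository.launch("importCsvJob", fixed)); // false
        // A parameter that differs on every run creates a new instance each time
        System.out.println(repository.launch("importCsvJob",
                Collections.<String, Object>singletonMap("time", 1L))); // true
        System.out.println(repository.launch("importCsvJob",
                Collections.<String, Object>singletonMap("time", 2L))); // true
    }
}
```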

5. Test results

....
2019-05-09 15:23:39.576 INFO 18296 --- [main] com.lijian.test.BatchTest : Started BatchTest in 6.214 seconds (JVM running for 7.185)
2019-05-09 15:23:39.939 INFO 18296 --- [main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=importCsvJob]] launched with the following parameters: [{time=1557386619763}]
2019-05-09 15:23:39.982 INFO 18296 --- [main] com.lijian.config.batch.CsvJobListener : job process start...
2019-05-09 15:23:40.048 INFO 18296 --- [main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step]
2019-05-09 15:23:40.214 INFO 18296 --- [main] c.lijian.config.batch.CvsItemProcessor : processor start validating...
2019-05-09 15:23:40.282 INFO 18296 --- [main] c.lijian.config.batch.CvsItemProcessor : processor end validating...
2019-05-09 15:23:40.283 INFO 18296 --- [main] c.lijian.config.batch.CvsItemProcessor : processor start validating...
2019-05-09 15:23:40.283 INFO 18296 --- [main] c.lijian.config.batch.CvsItemProcessor : processor end validating...
2019-05-09 15:23:40.283 INFO 18296 --- [main] c.lijian.config.batch.CvsItemProcessor : processor start validating...
2019-05-09 15:23:40.283 INFO 18296 --- [main] c.lijian.config.batch.CvsItemProcessor : processor end validating...
2019-05-09 15:23:40.283 INFO 18296 --- [main] c.lijian.config.batch.CvsItemProcessor : processor start validating...
2019-05-09 15:23:40.283 INFO 18296 --- [main] c.lijian.config.batch.CvsItemProcessor : processor end validating...
2019-05-09 15:23:40.283 INFO 18296 --- [main] c.lijian.config.batch.CvsItemProcessor : processor start validating...
2019-05-09 15:23:40.284 INFO 18296 --- [main] c.lijian.config.batch.CvsItemProcessor : processor end validating...
2019-05-09 15:23:40.525 INFO 18296 --- [main] com.lijian.config.batch.CsvJobListener : job process end...
2019-05-09 15:23:40.526 INFO 18296 --- [main] com.lijian.config.batch.CsvJobListener : elapsed time: 543ms
2019-05-09 15:23:40.548 INFO 18296 --- [main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=importCsvJob]] completed with the following parameters: [{time=1557386619763}] and the following status: [COMPLETED]
2019-05-09 15:23:40.564 INFO 18296 --- [Thread-5] com.alibaba.druid.pool.DruidDataSource : {dataSource-1} closed

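Check the data in the table: select * from person;

To insert more data and test whether the validator takes effect, change person.csv to the following content:

6,springbatch,24,男
7,springboot,23,女

Because the JSR validation annotation in the entity class constrains the length of name, i.e. @Size(min=2, max=8), validation fails with an error and the batch is not carried out.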

...
Started BatchTest in 5.494 seconds (JVM running for 6.41)
2019-05-09 15:30:02.147  INFO 20368 --- [main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=importCsvJob]] launched with the following parameters: [{time=1557387001499}]
2019-05-09 15:30:02.247  INFO 20368 --- [main] com.lijian.config.batch.CsvJobListener   : job process start...
2019-05-09 15:30:02.503  INFO 20368 --- [main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step]
2019-05-09 15:30:02.683  INFO 20368 --- [main] c.lijian.config.batch.CvsItemProcessor   : processor start validating...
2019-05-09 15:30:02.761 ERROR 20368 --- [main] o.s.batch.core.step.AbstractStep         : Encountered an error executing step step in job importCsvJob

org.springframework.batch.item.validator.ValidationException: size must be between 2 and 8
...
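To see why both new rows would be rejected, the length rule can be reproduced without any validation library. This is a minimal sketch of what @Size(min = 2, max = 8) checks for a String, not hibernate-validator itself; `SizeCheckSketch`, `isValid` and `violations` are hypothetical names, and the message text is modelled on the log line above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Dependency-free illustration of the name length rule (hypothetical helper). */
class SizeCheckSketch {

    static final int MIN = 2;
    static final int MAX = 8;

    /** Same rule as @Size(min = 2, max = 8) on a String; null counts as valid. */
    static boolean isValid(String name) {
        return name == null || (name.length() >= MIN && name.length() <= MAX);
    }

    /** Builds one message per name that breaks the rule. */
    static List<String> violations(List<String> names) {
        List<String> messages = new ArrayList<>();
        for (String name : names) {
            if (!isValid(name)) {
                messages.add(name + " (length " + name.length()
                        + "): size must be between " + MIN + " and " + MAX);
            }
        }
        return messages;
    }

    public static void main(String[] args) {
        for (String message : violations(Arrays.asList("Zhangsan", "springbatch", "springboot"))) {
            System.out.println(message);
        }
        // springbatch (length 11): size must be between 2 and 8
        // springboot (length 10): size must be between 2 and 8
    }
}
```

In the actual run the step stops at the first violation, which is why the log shows only one "processor start validating..." line before the error.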