Spring Batch – Chunk Processors

In the last post we saw the basics of a chunk-based step, where we had one reader and one writer. The ItemReader is responsible for the input of a step, and the ItemWriter is responsible for generating its output. In this post we will use an ItemProcessor, which is optional but can be used to transform or validate data on its way from the reader to the writer. You can put your business logic here and let the reader and writer do just their intended parts.

Let's start by creating a new Maven project with the following pom.xml:


<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the chunk-processor demo. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<!-- Coordinates of this demo artifact; packaged as a plain jar. -->
<groupId>com.mynotes.spring.batch</groupId>
<artifactId>batch-chunk-processors</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<!-- NOTE(review): the display name differs from the artifactId above; possibly carried over from an earlier project - confirm. -->
<name>batch-basics</name>
<description>Demo project for Spring batch-chunk-processors</description>

<!-- Inherits from the Spring Boot starter parent, version 1.4.3.RELEASE. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.3.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>

<!-- UTF-8 for sources and reports; Java level 1.8. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>

<!-- No versions are declared on the dependencies below; presumably they are resolved through the parent's dependency management - confirm. -->
<dependencies>
<!-- Spring Batch starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- MySQL JDBC driver, matching the jdbc:mysql URL used in application.properties. -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot Maven plugin. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

application.properties for DB


# JDBC connection for the demo: MySQL on localhost:3306, schema "spring_batch",
# with auto-reconnect enabled and SSL switched off.
spring.datasource.url=jdbc:mysql://localhost:3306/spring_batch?autoReconnect=true&useSSL=false
# NOTE(review): credentials are hard-coded here for the local demo; do not reuse
# this file as-is outside a throwaway development database.
spring.datasource.username=root
spring.datasource.password=admin

Our starter class Application.java:


package com.mynotes.spring.batch;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Entry point of the demo. The class is both the Spring Boot application
 * class and the place where batch processing support is enabled.
 */
@EnableBatchProcessing
@SpringBootApplication
public class Application {

    /**
     * Starts the Spring application, using this class as its source.
     *
     * @param args command-line arguments, passed through unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

Let's have two entities – XlsData.java


package com.mynotes.spring.batch;

/**
 * Mutable input item holding a name and an age.
 */
public class XlsData {

    private String name;
    private int age;

    /**
     * Creates an item with both values set.
     *
     * @param name the name to store
     * @param age  the age to store
     */
    public XlsData(String name, int age) {
        this.name = name;
        this.age = age;
    }

    /** @return the stored name */
    public String getName() {
        return this.name;
    }

    /** @return the stored age */
    public int getAge() {
        return this.age;
    }

    /** @param name the new name */
    public void setName(String name) {
        this.name = name;
    }

    /** @param age the new age */
    public void setAge(int age) {
        this.age = age;
    }
}

and Customer.java


package com.mynotes.spring.batch;

import java.util.Calendar;

/**
 * Mutable output item: a name, an age and a creation timestamp.
 */
public class Customer {

    private String name;
    private int age;
    private Calendar creationDate;

    /**
     * Creates a customer with all three values set.
     *
     * @param name         the customer's name
     * @param age          the customer's age
     * @param creationDate the calendar instance to store (kept by reference)
     */
    public Customer(String name, int age, Calendar creationDate) {
        this.name = name;
        this.age = age;
        this.creationDate = creationDate;
    }

    /** @return the stored name */
    public String getName() {
        return this.name;
    }

    /** @return the stored age */
    public int getAge() {
        return this.age;
    }

    /** @return the stored calendar instance itself, not a copy */
    public Calendar getCreationDate() {
        return this.creationDate;
    }

    /** @param name the new name */
    public void setName(String name) {
        this.name = name;
    }

    /** @param age the new age */
    public void setAge(int age) {
        this.age = age;
    }

    /** @param creationDate the new calendar instance to store */
    public void setCreationDate(Calendar creationDate) {
        this.creationDate = creationDate;
    }

    /**
     * Renders all three fields in the form
     * {@code Customer [name=..., age=..., creationDate=...]}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Customer [name=");
        text.append(name);
        text.append(", age=").append(age);
        text.append(", creationDate=").append(creationDate);
        text.append("]");
        return text.toString();
    }
}

In our use case the reader will generate input of type XlsData, the processor transforms this data into the Customer type, and the writer takes in a list of Customers and prints them. Let's write our reader first – MyStepReader.java


package com.mynotes.spring.batch;

import java.util.Iterator;
import java.util.List;
import org.springframework.batch.item.ItemReader;

/**
 * Reader that hands out the elements of a list one at a time.
 *
 * <p>The iterator is obtained once in the constructor, so an instance walks
 * through the list a single time.
 */
public class MyStepReader implements ItemReader<XlsData> {

    /** Position in the source list; advanced by every successful read. */
    private final Iterator<XlsData> remaining;

    /**
     * @param dataList the items to hand out, in iteration order
     */
    public MyStepReader(List<XlsData> dataList) {
        this.remaining = dataList.iterator();
    }

    /**
     * @return the next item, or {@code null} once every item has been returned
     */
    @Override
    public XlsData read() throws Exception {
        return remaining.hasNext() ? remaining.next() : null;
    }
}

Next is our processor, which transforms this data. Here we need to implement ItemProcessor with generics for the type that comes in and the type that goes out. MyStepProcessor1.java


package com.mynotes.spring.batch;

import java.util.Calendar;

import org.springframework.batch.item.ItemProcessor;

/**
 * Processor that converts an {@code XlsData} item into a {@code Customer}.
 */
public class MyStepProcessor1 implements ItemProcessor<XlsData, Customer> {

    /**
     * Copies name and age from the input and stamps the result with a
     * calendar obtained at the moment of processing.
     *
     * @param item the item to convert
     * @return a new customer built from {@code item}
     */
    @Override
    public Customer process(XlsData item) throws Exception {
        return new Customer(item.getName(), item.getAge(), Calendar.getInstance());
    }
}

and our writer – MyStepWriter.java


package com.mynotes.spring.batch;

import java.util.List;
import org.springframework.batch.item.ItemWriter;

/**
 * Writer that prints each chunk of customers to standard output.
 */
public class MyStepWriter implements ItemWriter<Customer> {

    /**
     * Prints the number of items received, then one line per item.
     *
     * @param items the items passed to this call
     */
    @Override
    public void write(List<? extends Customer> items) throws Exception {
        System.out.println("Writer chunk size: " + items.size());
        items.forEach(customer -> System.out.println("Writer::::::" + customer));
    }
}

Let's write our job configuration, where we wire in all of these – JobConfiguration.java


package com.mynotes.spring.batch;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Wires a reader, a single processor and a writer into one chunk-oriented
 * step, and wraps that step in a job.
 */
@Configuration
public class JobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * @return a reader pre-loaded with five hard-coded sample rows
     */
    @Bean
    public MyStepReader myReader() {
        // The reader only iterates over the list, so a fixed-size list is enough.
        List<XlsData> samples = Arrays.asList(
                new XlsData("aaa", 24),
                new XlsData("bbb", 27),
                new XlsData("ccc", 28),
                new XlsData("ddd", 21),
                new XlsData("eee", 30));
        return new MyStepReader(samples);
    }

    /** @return the writer that prints customers */
    @Bean
    public MyStepWriter myWriter() {
        return new MyStepWriter();
    }

    /** @return the processor that turns {@code XlsData} into {@code Customer} */
    @Bean
    public MyStepProcessor1 itemProcessor1() {
        return new MyStepProcessor1();
    }

    /**
     * @return the step named "step1": chunk size 2, reading {@code XlsData}
     *         and writing {@code Customer}
     */
    @Bean
    public Step step1() {
        MyStepReader reader = myReader();
        MyStepProcessor1 processor = itemProcessor1();
        MyStepWriter writer = myWriter();

        return stepBuilderFactory.get("step1")
                .<XlsData, Customer>chunk(2)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    /**
     * @return the job named "chunkProcessor1", consisting of {@link #step1()}
     *         and using a {@code RunIdIncrementer}
     */
    @Bean
    public Job transitionJob() {
        RunIdIncrementer incrementer = new RunIdIncrementer();
        return jobBuilderFactory.get("chunkProcessor1")
                .incrementer(incrementer)
                .start(step1())
                .build();
    }
}

As you can see above, we created the reader with some dummy values. We arranged the reader, processor and writer in our step1 method with a chunk size of 2. Then we made a job with this step. Running this application:

spring-batch-processors1

ItemProcessors can be used for a variety of things – validation, filtering, transformation and business logic. If we have to do all of these, we could write one big processor class, or we could divide the work further by creating multiple processors and chaining them. This way we can reuse a processor in some other step too. This chaining can be done with CompositeItemProcessor. Let's see this in action. Creating a filtering processor – MyStepProcessor2.java


package com.mynotes.spring.batch;

import org.springframework.batch.item.ItemProcessor;

/**
 * Filtering processor: lets through only items whose age is above 25.
 */
public class MyStepProcessor2 implements ItemProcessor<XlsData, XlsData> {

    /**
     * @param item the item to examine
     * @return a fresh copy of {@code item} when its age is greater than 25,
     *         otherwise {@code null}
     */
    @Override
    public XlsData process(XlsData item) throws Exception {
        if (item.getAge() <= 25) {
            return null;
        }
        return new XlsData(item.getName(), item.getAge());
    }
}

Above we return only the items whose age is greater than 25. Otherwise null is returned, which tells Spring Batch to filter the item out and not call any further processors or the writer for it. Now let's chain this in our JobConfiguration.


package com.mynotes.spring.batch;

import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Wires a reader, a composite processor (filter, then convert) and a writer
 * into one chunk-oriented step, and wraps that step in a job.
 */
@Configuration
public class JobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * @return a reader pre-loaded with five hard-coded sample rows
     */
    @Bean
    public MyStepReader myReader() {
        List<XlsData> samples = new ArrayList<XlsData>();
        samples.add(new XlsData("aaa", 24));
        samples.add(new XlsData("bbb", 27));
        samples.add(new XlsData("ccc", 28));
        samples.add(new XlsData("ddd", 21));
        samples.add(new XlsData("eee", 30));

        MyStepReader reader = new MyStepReader(samples);
        return reader;
    }

    /** @return the writer that prints customers */
    @Bean
    public MyStepWriter myWriter() {
        return new MyStepWriter();
    }

    /**
     * Stand-alone conversion processor bean. Nothing in this class calls it;
     * the composite below creates its own delegate instances.
     *
     * @return the processor that turns {@code XlsData} into {@code Customer}
     */
    @Bean
    public MyStepProcessor1 itemProcessor1() {
        return new MyStepProcessor1();
    }

    /**
     * Builds the processor chain: {@code MyStepProcessor2} first, then
     * {@code MyStepProcessor1}.
     *
     * @return the composite processor, with {@code afterPropertiesSet()}
     *         already invoked on it
     * @throws Exception propagated from {@code afterPropertiesSet()}
     */
    @Bean
    public CompositeItemProcessor<XlsData, Customer> itemProcessor() throws Exception {
        // The delegates do not share an output type, hence the wildcard.
        List<ItemProcessor<XlsData, ?>> chain = new ArrayList<>(2);
        chain.add(new MyStepProcessor2());
        chain.add(new MyStepProcessor1());

        CompositeItemProcessor<XlsData, Customer> composite = new CompositeItemProcessor<>();
        composite.setDelegates(chain);
        composite.afterPropertiesSet();
        return composite;
    }

    /**
     * @return the step named "step1": chunk size 2, reading {@code XlsData},
     *         processing through {@link #itemProcessor()} and writing
     *         {@code Customer}
     * @throws Exception propagated from {@link #itemProcessor()}
     */
    @Bean
    public Step step1() throws Exception {
        MyStepReader reader = myReader();
        CompositeItemProcessor<XlsData, Customer> processor = itemProcessor();
        MyStepWriter writer = myWriter();

        return stepBuilderFactory.get("step1")
                .<XlsData, Customer>chunk(2)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    /**
     * @return the job named "chunkProcessor2", consisting of {@link #step1()}
     *         and using a {@code RunIdIncrementer}
     * @throws Exception propagated from {@link #step1()}
     */
    @Bean
    public Job transitionJob() throws Exception {
        RunIdIncrementer incrementer = new RunIdIncrementer();
        return jobBuilderFactory.get("chunkProcessor2")
                .incrementer(incrementer)
                .start(step1())
                .build();
    }

}

Take a look at the itemProcessor method, which returns a CompositeItemProcessor<XlsData, Customer>. The generics tell us that the input type will be XlsData and the output will be Customer. Then we created a List of our processors. We used the ? wildcard since the processors don't all have the same input and output types; there is a type switch in between. We then set this list on the CompositeItemProcessor and returned it. Running the application:

spring-batch-processors2

That's it for processors. The code for this post can be taken from my spring-batch git repository.

Advertisements
%d bloggers like this: