How can we share data between the different steps of a Job in Spring Batch?

Digging into Spring Batch, I'd like to know how we can share data between the different steps of a Job.

Can we use JobRepository for this? If yes, how can we do that?

Is there any other way of doing/achieving the same?


From a step, you can put data into the StepExecutionContext. Then, with a listener, you can promote data from StepExecutionContext to JobExecutionContext.

This JobExecutionContext is available in all the following steps.
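Here is a plain-Java model of that promotion. It does not use Spring Batch at all: two HashMaps stand in for the step and job ExecutionContexts, and `PromotionModel` and `promote` are hypothetical names. Only the listed keys survive the step. Everything else stays in the step context.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of step-to-job context promotion.
// Maps stand in for ExecutionContext; no Spring Batch classes are used.
class PromotionModel {

    // Copies the listed keys from the step context to the job context,
    // roughly what a promotion listener does once the step has finished.
    static void promote(Map<String, Object> stepContext,
                        Map<String, Object> jobContext,
                        List<String> keys) {
        for (String key : keys) {
            if (stepContext.containsKey(key)) {
                jobContext.put(key, stepContext.get(key));
            }
        }
    }

    // Runs "step 1" and returns the job context that "step 2" would see.
    static Map<String, Object> runTwoSteps() {
        Map<String, Object> jobContext = new HashMap<>();

        // Step 1 writes into its own step context
        Map<String, Object> step1Context = new HashMap<>();
        step1Context.put("fileName", "input.csv");
        step1Context.put("scratch", 42); // not listed, so not promoted
        promote(step1Context, jobContext, List.of("fileName"));

        // Step 2 only sees what was promoted
        return jobContext;
    }
}
```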

Be careful: data must be short. These contexts are saved in the JobRepository by serialization, and the length is limited (2500 chars, if I remember correctly).

So these contexts are good for sharing strings or simple values, but not for sharing collections or huge amounts of data.

Sharing huge amounts of data is not the philosophy of Spring Batch: a job is a set of distinct actions, not one huge business processing unit.

The job repository is used indirectly for passing data between steps. Jean-Philippe is right that the best way to do that is to put data into the StepExecutionContext and then use the verbosely named ExecutionContextPromotionListener to promote the step execution context keys to the JobExecutionContext.

It's helpful to note that there is a listener for promoting JobParameter keys to a StepExecutionContext as well (the even more verbosely named JobParameterExecutionContextCopyListener); you will find that you use these a lot if your job steps aren't completely independent of one another.

Otherwise you're left passing data between steps using even more elaborate schemes, like JMS queues or (heaven forbid) hard-coded file locations.
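The JobParameterExecutionContextCopyListener mentioned above works in the opposite direction, before a step starts. Below is a minimal plain-Java sketch of that idea. `ParameterCopyModel` is a hypothetical name, maps stand in for JobParameters and the step ExecutionContext, and copying everything when no keys are given mirrors what I believe the real listener does when its keys are not set.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model (no Spring Batch): copy selected job
// parameters into a fresh step context before the step runs.
class ParameterCopyModel {

    static Map<String, Object> beforeStep(Map<String, Object> jobParameters, String... keys) {
        Map<String, Object> stepContext = new HashMap<>();
        if (keys.length == 0) {
            // No keys configured: copy every job parameter
            stepContext.putAll(jobParameters);
        } else {
            // Otherwise copy only the listed keys that actually exist
            for (String key : keys) {
                if (jobParameters.containsKey(key)) {
                    stepContext.put(key, jobParameters.get(key));
                }
            }
        }
        return stepContext;
    }
}
```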

As to the size of data that is passed in the context, I would also suggest that you keep it small (but I haven't any specifics on the

You can use a Java bean object:

  1. Execute one step.
  2. Store the result in the Java object.
  3. The next step refers to the same Java object to get the result stored by step 1.

This way you can store a huge collection of data if you want.
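A minimal sketch of the shared-object approach, assuming a hypothetical `SharedResultHolder` class that both steps receive (in a Spring application it would typically be a bean injected into both steps). The steps are modeled as plain Runnables. Nothing here is persisted, so the data does not survive a failed and restarted job.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical holder shared by two steps; lives only in memory.
class SharedResultHolder {
    private final List<String> results = new ArrayList<>();

    synchronized void add(String value) {
        results.add(value);
    }

    // Returns a copy so readers cannot modify the stored list
    synchronized List<String> snapshot() {
        return new ArrayList<>(results);
    }
}

class TwoStepDemo {
    static List<String> run() {
        SharedResultHolder holder = new SharedResultHolder();

        // Step 1 stores its results in the shared holder
        Runnable step1 = () -> {
            for (int i = 1; i <= 3; i++) {
                holder.add("row-" + i);
            }
        };

        // Step 2 reads what step 1 stored
        List<String> seenByStep2 = new ArrayList<>();
        Runnable step2 = () -> seenByStep2.addAll(holder.snapshot());

        step1.run();
        step2.run();
        return seenByStep2;
    }
}
```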

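Here is what I did to save an object that is accessible throughout the steps.

  1. Created a listener that puts the object into the job context:
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component("myJobListener")
public class MyJobListener implements JobExecutionListener {
    @Autowired
    private SomeService someService; // your own service that computes the value

    @Override
    public void beforeJob(JobExecution jobExecution) {
        String myValue = someService.getValue();
        // The job ExecutionContext is shared by every step of this job execution
        jobExecution.getExecutionContext().putString("MY_VALUE", myValue);
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        // nothing to do after the job
    }
}
  2. Registered the listener in the job definition:
<listeners>
    <listener ref="myJobListener"/>
</listeners>
  3. Consumed the value in a step using the @BeforeStep annotation:
@BeforeStep
public void initializeValues(StepExecution stepExecution) {
    // Read the value MyJobListener stored before the first step started
    String value = stepExecution.getJobExecution().getExecutionContext().getString("MY_VALUE");
    // ... use value in this step
}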

I would say you have 3 options:

  1. Put the data in the StepExecutionContext and promote it to the JobExecutionContext, so every later step can access it. As noted, you must respect the size limit.
  2. Create a @JobScope bean, add the data to that bean, and @Autowired it wherever it is needed. The drawback is that it is an in-memory structure: if the job fails the data is lost, which might cause problems with restartability.
  3. We had larger datasets that needed to be processed across steps (read each line of a CSV and write it to the DB, read from the DB, aggregate, and send to an API), so we decided to model the data in a new table in the same DB as the Spring Batch metadata tables, keep the IDs in the JobContext, access them when needed, and delete that temporary table when the job finishes successfully.
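Option 3 can be sketched in plain Java as follows. This is a model under stated assumptions: a static HashMap stands in for the temporary table, and `TempTableModel`, `runJob` and the `tempKey` context key are hypothetical names. The rows are keyed by a job instance id so the key stays the same across restarts. The point is that the job context only ever holds the small key, never the rows.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of "big data in a temp table, small key in the job context".
// A static HashMap stands in for the temporary database table.
class TempTableModel {
    static final Map<String, List<String>> tempTable = new HashMap<>();

    // Returns true when the job completes successfully.
    static boolean runJob(String jobInstanceId, boolean failInStep2) {
        Map<String, Object> jobContext = new HashMap<>();

        // Step 1: write the bulk rows to the temp store, keep only the key in the context
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            rows.add("line " + i);
        }
        tempTable.put(jobInstanceId, rows);
        jobContext.put("tempKey", jobInstanceId);

        // Step 2: look the rows up through the key stored in the context
        List<String> loaded = tempTable.get(jobContext.get("tempKey"));
        if (failInStep2 || loaded == null) {
            return false; // failure: the temp rows are left in place
        }
        // ... aggregate `loaded` and send it on here

        // Job finished successfully: delete the temporary data
        tempTable.remove(jobInstanceId);
        return true;
    }
}
```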

I was given a task to invoke batch jobs one after another. Each job depends on another: the first job's result is needed to execute the next job. I was searching for how to pass data after a job execution, and I found that ExecutionContextPromotionListener comes in handy.

1) I added an ExecutionContextPromotionListener bean like this:

@Bean
public ExecutionContextPromotionListener promotionListener()
{
    ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
    listener.setKeys( new String[] { "entityRef" } );
    return listener;
}

2) Then I attached the listener to my step:

Step step = builder.faultTolerant()
.skipPolicy( policy )
.listener( writer )
.listener( promotionListener() )
.listener( skiplistener )
.stream( skiplistener )
.build();

3) I added a StepExecution field to my writer implementation and populated it in @BeforeStep:

@BeforeStep
public void saveStepExecution( StepExecution stepExecution )
{
    this.stepExecution = stepExecution;
}

4) At the end of my writer step, I put the values into the step execution context under the promoted key:

ExecutionContext lStepContext = this.stepExecution.getExecutionContext();
lStepContext.put( "entityRef", lMap );

5) After the job execution, I retrieved the values from lExecution.getExecutionContext() and populated the job response with them.

6) From the job response object, I get the values and use them to populate what the rest of the jobs require.

The code above promotes data from the step to the job ExecutionContext using ExecutionContextPromotionListener. It can be done in any step.
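Steps 5 and 6 boil down to: read the finished job's context, then feed the value into the next job. In real code that would roughly be a `JobLauncher.run(...)` call returning a `JobExecution` whose `getExecutionContext()` you read, plus a `JobParametersBuilder` for the next job. The sketch below models only the data flow in plain Java. `JobChainModel`, its methods and the value `"ENT-001"` are hypothetical, and passing the value as a job parameter is an assumption (the answer above routes it through its own job response object).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of chaining two jobs through the job context.
class JobChainModel {

    // Stands in for job 1; returns its final job execution context
    static Map<String, Object> runFirstJob() {
        Map<String, Object> jobContext = new HashMap<>();
        jobContext.put("entityRef", "ENT-001"); // value promoted from a step
        return jobContext;
    }

    // Stands in for job 2, which needs the value produced by job 1
    static String runSecondJob(Map<String, String> jobParameters) {
        return "processing " + jobParameters.get("entityRef");
    }

    static String runChain() {
        Map<String, Object> firstContext = runFirstJob();
        // Turn the promoted value into a parameter of the next job
        Map<String, String> secondParams = new HashMap<>();
        secondParams.put("entityRef", String.valueOf(firstContext.get("entityRef")));
        return runSecondJob(secondParams);
    }
}
```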

Use ExecutionContextPromotionListener:

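import java.util.List;

import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemWriter;

public class YourItemWriter implements ItemWriter<Object> {

    private StepExecution stepExecution;

    // Spring Batch 4.x signature; in 5.x write() receives a Chunk<? extends Object>
    @Override
    public void write(List<? extends Object> items) throws Exception {
        // Some business logic that produces someObject

        // Put your data into the step ExecutionContext
        ExecutionContext stepContext = this.stepExecution.getExecutionContext();
        stepContext.put("someKey", someObject);
    }

    @BeforeStep
    public void saveStepExecution(final StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }
}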

Now you need to register promotionListener on the step that writes the data:

@Bean
public Step step1() {
    return stepBuilder
        .get("step1")
        .<Company, Company>chunk(10)
        .reader(reader())
        .processor(processor())
        .writer(writer())
        .listener(promotionListener())
        .build();
}


@Bean
public ExecutionContextPromotionListener promotionListener() {
    ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
    listener.setKeys(new String[] {"someKey"});
    listener.setStrict(true);
    return listener;
}
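As far as I know, `setStrict(true)` makes the listener fail when one of the configured keys is missing from the step context, and by default promotion only happens when the step's exit status is COMPLETED. The following plain-Java model illustrates those two rules. `StrictPromotionModel` is a hypothetical name, maps replace ExecutionContext, no Spring Batch classes are used, and the exception message is my own rather than Spring Batch's.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of a strict promotion listener (no Spring Batch).
class StrictPromotionModel {

    // Promotes keys only after a COMPLETED step; with strict=true a missing
    // key is an error instead of being silently skipped.
    static void afterStep(String exitCode, Map<String, Object> stepContext,
                          Map<String, Object> jobContext, String[] keys, boolean strict) {
        if (!"COMPLETED".equals(exitCode)) {
            return; // nothing is promoted for steps that did not complete
        }
        for (String key : keys) {
            if (stepContext.containsKey(key)) {
                jobContext.put(key, stepContext.get(key));
            } else if (strict) {
                throw new IllegalArgumentException("Key not found in step context: " + key);
            }
        }
    }
}
```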

Now, in step2, get your data from the job ExecutionContext:

public class RetrievingItemWriter implements ItemWriter<Object> {

    private Object someObject;

    public void write(List<? extends Object> items) throws Exception {
        // ...
    }

    @BeforeStep
    public void retrieveInterstepData(StepExecution stepExecution) {
        // Promoted keys are available from the job ExecutionContext
        JobExecution jobExecution = stepExecution.getJobExecution();
        ExecutionContext jobContext = jobExecution.getExecutionContext();
        this.someObject = jobContext.get("someKey");
    }
}

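If you are working with tasklets, use the first line below to read from the job ExecutionContext. StepContext.getJobExecutionContext() returns a read-only Map, so to put data you have to go through the JobExecution, as in the second line:

List<YourObject> yourObjects = (List<YourObject>) chunkContext.getStepContext().getJobExecutionContext().get("someKey");

chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext().put("someKey", yourObjects);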

You can store data in a simple object, like this:

AnyObject yourObject = new AnyObject();

public Job build(Step step1, Step step2) {
    return jobBuilderFactory.get("jobName")
        .incrementer(new RunIdIncrementer())
        .start(step1)
        .next(step2)
        .build();
}

public Step step1() {
    return stepBuilderFactory.get("step1Name")
        .<Some, Any> chunk(someInteger1)
        .reader(itemReader1())
        .processor(itemProcessor1())
        .writer(itemWriter1(yourObject))
        .build();
}

public Step step2() {
    return stepBuilderFactory.get("step2Name")
        .<Some, Any> chunk(someInteger2)
        .reader(itemReader2())
        .processor(itemProcessor2(yourObject))
        .writer(itemWriter2())
        .build();
}

Just add data to the object in the writer (or any other component) and read it at any stage of the next step.

As Nenad Bozic said in his third option, you can use temporary tables to share data between steps. Using the context does essentially the same thing (it is written to a table and loaded back in the next step), but if you write into temp tables yourself, you can clean them up at the end of the job.

Another very simple approach, left here for future reference:

class MyTasklet implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) {
        // Write directly into the job ExecutionContext
        getExecutionContext(chunkContext).put("foo", "bar");
        return RepeatStatus.FINISHED;
    }
}

and

class MyOtherTasklet implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) {
        // Read what MyTasklet stored in an earlier step
        Object foo = getExecutionContext(chunkContext).get("foo");
        return RepeatStatus.FINISHED;
    }
}

getExecutionContext here is:

ExecutionContext getExecutionContext(ChunkContext chunkContext) {
    // Walk from the chunk context up to the job's ExecutionContext
    return chunkContext.getStepContext()
            .getStepExecution()
            .getJobExecution()
            .getExecutionContext();
}

Put it in a superclass, in an interface as a default method, or simply paste it into your Tasklets.

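Spring Batch creates metadata tables for itself (such as batch_job_execution, batch_job_execution_context, batch_step_execution, etc.).

I have tested (using PostgreSQL) that you can store at least 51,428 characters of data in one column (batch_job_execution_context.serialized_context). It could be more; that is just how much I tested. This fits the schema: the short_context column holds at most 2,500 characters, while the full serialized context goes into serialized_context, which is a TEXT/CLOB column.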
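If you want a rough idea of whether your data fits under the 2500-character figure mentioned in the first answer, you can measure a serialized form of it. This sketch is only an estimate under assumptions: it uses plain Java serialization plus Base64 (`ContextSizeEstimate` is a hypothetical helper), while the string Spring Batch actually stores depends on the serializer your version and configuration use (JSON-based or Java-serialization-based).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;

// Hypothetical helper: estimates the size of a context-like map as a
// Base64-encoded Java-serialized string. Not Spring Batch's own serializer.
class ContextSizeEstimate {

    static int serializedLength(HashMap<String, Object> context) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(context);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Base64 turns every 3 bytes into 4 characters
        return Base64.getEncoder().encodeToString(bytes.toByteArray()).length();
    }

    // Builds a context holding `rows` short strings under "mydatakey"
    static HashMap<String, Object> sampleContext(int rows) {
        ArrayList<String> lines = new ArrayList<>();
        for (int i = 0; i < rows; i++) {
            lines.add("customer-" + i);
        }
        HashMap<String, Object> context = new HashMap<>();
        context.put("mydatakey", lines);
        return context;
    }
}
```

Comparing `serializedLength(sampleContext(1))` with `serializedLength(sampleContext(5000))` shows how quickly a list of a few thousand short strings grows past that size.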

When you are using Tasklets for your step (like class MyTasklet implements Tasklet) and override the execute method (which returns a RepeatStatus), you have direct access to the ChunkContext.

class MyTasklet implements Tasklet {

    @Override
    public RepeatStatus execute(@NonNull StepContribution contribution,
                                @NonNull ChunkContext chunkContext) {
        List<MyObject> myObjects = getObjectsFromSomewhereAndUseThemInNextStep();
        // Store the list in the job ExecutionContext; it must be serializable
        // because the context is persisted in the job repository
        chunkContext.getStepContext().getStepExecution()
                .getJobExecution()
                .getExecutionContext()
                .put("mydatakey", myObjects);
        return RepeatStatus.FINISHED;
    }
}

Now, in another step with a different Tasklet, you can access those objects:

class MyOtherTasklet implements Tasklet {

    @Override
    public RepeatStatus execute(@NonNull StepContribution contribution,
                                @NonNull ChunkContext chunkContext) {
        @SuppressWarnings("unchecked")
        List<MyObject> myObjects = (List<MyObject>)
                chunkContext.getStepContext().getStepExecution()
                        .getJobExecution()
                        .getExecutionContext()
                        .get("mydatakey");
        // ... use myObjects here
        return RepeatStatus.FINISHED;
    }
}

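Or, if you don't have a Tasklet but a Reader/Writer/Processor, you can inject the value with late binding. The bean has to be step-scoped so that the jobExecutionContext expression is resolved when the step starts:

@Component
@StepScope
class MyReader implements ItemReader<MyObject> {

    @Value("#{jobExecutionContext['mydatakey']}")
    List<MyObject> myObjects;   // and now myObjects are available in here

    private int next = 0;

    @Override
    public MyObject read() throws Exception {
        // Hand out the objects one at a time; null signals the end of the input
        return next < myObjects.size() ? myObjects.get(next++) : null;
    }
}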

Simple solution using Tasklets. No need to access the execution context. I used a map as the data element to move around. (Kotlin code.)

Tasklet

class MyTasklet : Tasklet {

    lateinit var myMap: MutableMap<String, String>

    override fun execute(contribution: StepContribution, chunkContext: ChunkContext): RepeatStatus? {
        myMap.put("key", "some value")
        return RepeatStatus.FINISHED
    }
}

Batch configuration

@Configuration
@EnableBatchProcessing
class BatchConfiguration {

    @Autowired
    lateinit var jobBuilderFactory: JobBuilderFactory

    @Autowired
    lateinit var stepBuilderFactory: StepBuilderFactory

    var myMap: MutableMap<String, String> = mutableMapOf()

    @Bean
    fun jobSincAdUsuario(): Job {
        return jobBuilderFactory
            .get("my-SO-job")
            .incrementer(RunIdIncrementer())
            .start(stepMyStep())
            .next(stepMyOtherStep())
            .build()
    }

    @Bean
    fun stepMyStep() = stepBuilderFactory.get("MyTaskletStep")
        .tasklet(myTaskletAsBean())
        .build()

    @Bean
    fun myTaskletAsBean(): MyTasklet {
        val tasklet = MyTasklet()
        tasklet.myMap = myMap      // collection gets visible in the tasklet
        return tasklet
    }
}

Then in MyOtherStep you can replicate the same idiom seen in MyStep. This other Tasklet will see the data created in MyStep.

Important:

  • Tasklets are created via a @Bean fun so that they can use @Autowired.
  • For a more robust implementation, the tasklet should implement InitializingBean with:
override fun afterPropertiesSet() {
    // A lateinit property is never null; check that it was initialized instead
    check(this::myMap.isInitialized) { "myMap must be set before calling the tasklet" }
}