Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update doc for reader-processor #135

Merged
merged 1 commit into from
Feb 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 152 additions & 2 deletions doc/en/step/item-stream-flux-reader-processor-writer.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
- [Create a tasklet without a processor](#create-a-tasklet-without-a-processor)
- [Java](#java-1)
- [Kotlin](#kotlin-1)
- [Use a callback](#use-a-callback)
- [Create a tasklet without a writer](#create-a-tasklet-without-a-writer)
- [Java](#java-2)
- [Kotlin](#kotlin-2)
- [Use a callback](#use-a-callback)
- [Java](#java-3)
- [Kotlin](#kotlin-3)

Spring uses a reactive library called [Reactor](https://projectreactor.io/), which provides streams for various types of data using `Flux`. For `ItemReader` in Spring Batch to use the data read using `Flux`, you need to extract each item from `Flux` and return it.

Expand Down Expand Up @@ -326,9 +329,156 @@ open class TestJobConfig(
}
```

## Create a tasklet without a writer

If you need only `ItemStreamReader` and `ItemProcessor` without a writer, you can inherit `ItemStreamFluxReaderProcessor` to define `ItemStreamReader` and `ItemProcessor` in a single class.

### Java

In Java, you can convert a tasklet defined using `AdapterFactory` to `ItemStreamReader` and `ItemProcessor`.

```java
@Component
@StepScope
class SampleTasklet implements ItemStreamFluxReaderProcessor<Integer, String> {

@Value("#{jobParameters['totalCount']}")
private long totalCount;

private int count = 0;

@NonNull
@Override
public Flux<? extends Integer> readFlux(@NonNull ExecutionContext executionContext) {
System.out.println("totalCount: " + totalCount);
return Flux.generate(sink -> {
if (count < totalCount) {
sink.next(count);
++count;
} else {
sink.complete();
}
});
}

@Override
public String process(@NonNull Integer item) {
return "'" + item.toString() + "'";
}
}
```

```java
@Configuration
public class TestJobConfig {

@Bean
public Job testJob(
SampleTasklet sampleTasklet,
JobRepository jobRepository,
PlatformTransactionManager transactionManager
) {
return new JobBuilder("testJob", jobRepository)
.start(
new StepBuilder("testStep", jobRepository)
.<Integer, String>chunk(3, transactionManager)
.reader(AdapterFactory.itemStreamReader(sampleTasklet))
.processor(AdapterFactory.itemProcessor(sampleTasklet))
.writer(chunk -> System.out.println(chunk.getItems()))
.build()
)
.build();
}
}
```

You can statically import the method of `AdapterFactory` for better readability.

```java
import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamReader;
import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamWriter;

...

@Configuration
public class TestJobConfig {

@Bean
public Job testJob(
SampleTasklet sampleTasklet,
JobRepository jobRepository,
PlatformTransactionManager transactionManager
) {
return new JobBuilder("testJob", jobRepository)
.start(
new StepBuilder("testStep", jobRepository)
.<Integer, String>chunk(3, transactionManager)
.reader(itemStreamReader(sampleTasklet))
.processor(itemProcessor(sampleTasklet))
.writer(chunk -> System.out.println(chunk.getItems()))
.build()
)
.build();
}
}
```

### Kotlin

In Kotlin, you can easily convert a tasklet defined using an extension function in Spring Batch Plus to `ItemStreamReader` and `ItemProcessor`.

```Kotlin
@Component
@StepScope
open class SampleTasklet(
@Value("#{jobParameters['totalCount']}") private var totalCount: Long,
) : ItemStreamFluxReaderProcessor<Int, String> {
private var count = 0

override fun readFlux(executionContext: ExecutionContext): Flux<out Int> {
println("totalCount: $totalCount")
return Flux.generate { sink ->
if (count < totalCount) {
sink.next(count)
++count
} else {
sink.complete()
}
}
}

override fun process(item: Int): String? {
return "'$item'"
}
}
```

```Kotlin
@Configuration
open class TestJobConfig(
private val batch: BatchDsl,
private val transactionManager: PlatformTransactionManager,
) {
@Bean
open fun testJob(
sampleTasklet: SampleTasklet,
): Job = batch {
job("testJob") {
step("testStep") {
chunk<Int, String>(3, transactionManager) {
reader(sampleTasklet.asItemStreamReader())
processor(sampleTasklet.asItemProcessor())
writer { chunk -> println(chunk.items) }
}
}
}
}
}
```

## Use a callback

You can define a callback method for `ItemStream` of `ItemStreamWriter` in `ItemStreamFluxReaderProcessorWriter` and `ItemStreamFluxReaderWriter`. You can selectively define a callback method.
You can define a callback method for `ItemStream`. You can selectively define a callback method.

### Java

Expand Down
160 changes: 158 additions & 2 deletions doc/en/step/item-stream-iterable-reader-processor-writer.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
- [Create a tasklet without a processor](#create-a-tasklet-without-a-processor)
- [Java](#java-1)
- [Kotlin](#kotlin-1)
- [Use a callback](#use-a-callback)
- [Create a tasklet without a writer](#create-a-tasklet-without-a-writer)
- [Java](#java-2)
- [Kotlin](#kotlin-2)
- [Use a callback](#use-a-callback)
- [Java](#java-3)
- [Kotlin](#kotlin-3)

A chunk-oriented step in Spring Batch consists of `ItemReader`, `ItemProcessor`, and `ItemWriter`, which are usually defined separately and then assembled to define a step. However, there are some issues with this approach: it is difficult to share data between `ItemReader`, `ItemProcessor`, and `ItemWriter`, and you need to see each respective file to understand the batch flow. Also, if the classes are not reused, they can make the elements of a job less coherent.

Expand Down Expand Up @@ -328,9 +331,162 @@ open class TestJobConfig(
}
```

## Create a tasklet without a writer

If you need only `ItemStreamReader` and `ItemProcessor` without a writer, you can inherit `ItemStreamIterableReaderProcessor` to define `ItemStreamReader` and `ItemProcessor` in a single class.

### Java

In Java, you can convert a tasklet defined using `AdapterFactory` to `ItemStreamReader` and `ItemProcessor`.

```java
@Component
@StepScope
class SampleTasklet implements ItemStreamIterableReaderProcessor<Integer, String> {

@Value("#{jobParameters['totalCount']}")
private long totalCount;

private int count = 0;

@NonNull
@Override
public Iterable<? extends Integer> readIterable(@NonNull ExecutionContext executionContext) {
System.out.println("totalCount: " + totalCount);
return () -> new Iterator<>() {
@Override
public boolean hasNext() {
return count < totalCount;
}

@Override
public Integer next() {
return count++;
}
};
}

@Override
public String process(@NonNull Integer item) {
return "'" + item.toString() + "'";
}
}
```

```java
@Configuration
public class TestJobConfig {

@Bean
public Job testJob(
SampleTasklet sampleTasklet,
JobRepository jobRepository,
PlatformTransactionManager transactionManager
) {
return new JobBuilder("testJob", jobRepository)
.start(
new StepBuilder("testStep", jobRepository)
.<Integer, String>chunk(3, transactionManager)
.reader(AdapterFactory.itemStreamReader(sampleTasklet))
.processor(AdapterFactory.itemProcessor(sampleTasklet))
.writer(chunk -> System.out.println(chunk.getItems()))
.build()
)
.build();
}
}
```

You can statically import the method of `AdapterFactory` for better readability.

```java
import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamReader;
import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamWriter;

...

@Configuration
public class TestJobConfig {

@Bean
public Job testJob(
SampleTasklet sampleTasklet,
JobRepository jobRepository,
PlatformTransactionManager transactionManager
) {
return new JobBuilder("testJob", jobRepository)
.start(
new StepBuilder("testStep", jobRepository)
.<Integer, String>chunk(3, transactionManager)
.reader(itemStreamReader(sampleTasklet))
.processor(itemProcessor(sampleTasklet))
.writer(chunk -> System.out.println(chunk.getItems()))
.build()
)
.build();
}
}
```

### Kotlin

In Kotlin, you can easily convert a tasklet defined using an extension function in Spring Batch Plus to `ItemStreamReader` and `ItemProcessor`.

```Kotlin
@Component
@StepScope
open class SampleTasklet(
@Value("#{jobParameters['totalCount']}") private var totalCount: Long,
) : ItemStreamIterableReaderProcessor<Int, String> {
private var count = 0

override fun readIterable(executionContext: ExecutionContext): Iterable<Int> {
println("totalCount: $totalCount")
return Iterable {
object : Iterator<Int> {
override fun hasNext(): Boolean {
return count < totalCount
}

override fun next(): Int {
return count++
}
}
}
}

override fun process(item: Int): String? {
return "'$item'"
}
}
```

```Kotlin
@Configuration
open class TestJobConfig(
private val batch: BatchDsl,
private val transactionManager: PlatformTransactionManager,
) {
@Bean
open fun testJob(
sampleTasklet: SampleTasklet,
): Job = batch {
job("testJob") {
step("testStep") {
chunk<Int, String>(3, transactionManager) {
reader(sampleTasklet.asItemStreamReader())
processor(sampleTasklet.asItemProcessor())
writer { chunk -> println(chunk.items) }
}
}
}
}
}
```

## Use a callback

You can define a callback method for `ItemStream` of `ItemStreamWriter` in `ItemStreamIterableReaderProcessorWriter` and `ItemStreamIterableReaderWriter`. You can selectively define a callback method.
You can define a callback method for `ItemStream`. You can selectively define a callback method.

### Java

Expand Down
Loading