diff --git a/spring-batch-plus/src/integrationTest/java/com/navercorp/spring/batch/plus/item/adapter/ItemStreamReaderProcessorWriterIntegrationTest.java b/spring-batch-plus/src/integrationTest/java/com/navercorp/spring/batch/plus/item/adapter/ItemStreamReaderProcessorWriterIntegrationTest.java index 979ce2c4..9db9bbff 100644 --- a/spring-batch-plus/src/integrationTest/java/com/navercorp/spring/batch/plus/item/adapter/ItemStreamReaderProcessorWriterIntegrationTest.java +++ b/spring-batch-plus/src/integrationTest/java/com/navercorp/spring/batch/plus/item/adapter/ItemStreamReaderProcessorWriterIntegrationTest.java @@ -24,11 +24,14 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; import javax.sql.DataSource; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.RepeatedTest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; @@ -53,10 +56,15 @@ import reactor.core.publisher.Flux; +@SuppressWarnings("unchecked") class ItemStreamReaderProcessorWriterIntegrationTest { + private static final int TEST_REPEAT_COUNT = 5; + + private static final Logger logger = LoggerFactory.getLogger(ItemStreamReaderProcessorWriterIntegrationTest.class); + private static int onOpenReadCallCount = 0; - private static int readFluxCallCount = 0; + private static int readContextCallCount = 0; private static int onUpdateReadCallCount = 0; private static int onCloseReadCallCount = 0; @@ -67,10 +75,14 @@ class ItemStreamReaderProcessorWriterIntegrationTest { private static int onUpdateWriteCallCount = 0; private static int onCloseWriteCallCount = 0; + private static int itemCount = 0; + private static int chunkCount = 0; + private static int expectedWriteCount = 0; + @BeforeEach void beforeEach() { onOpenReadCallCount = 0; - readFluxCallCount = 0; + readContextCallCount = 0; onUpdateReadCallCount = 0; onCloseReadCallCount = 0; @@ -80,10 +92,15 @@ void beforeEach() { writeCallCount = 0; onUpdateWriteCallCount = 0; onCloseWriteCallCount = 0; + + itemCount = ThreadLocalRandom.current().nextInt(10, 100); + chunkCount = ThreadLocalRandom.current().nextInt(1, 10); + expectedWriteCount = (int)Math.ceil((double)itemCount / (double)chunkCount); + + logger.debug("itemCount: {}, chunkCount: {}, writeCount: {}", itemCount, chunkCount, expectedWriteCount); } - @SuppressWarnings("unchecked") - @Test + @RepeatedTest(TEST_REPEAT_COUNT) void testReaderProcessorWriter() throws Exception { // given AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(TestConfiguration.class); @@ -94,7 +111,7 @@ void testReaderProcessorWriter() throws Exception { Job job = new JobBuilder("testJob", jobRepository) .start( new StepBuilder("testStep", jobRepository) - .chunk(3, new ResourcelessTransactionManager()) + .chunk(chunkCount, new ResourcelessTransactionManager()) .reader(itemStreamReader(testTasklet)) .processor(itemProcessor(testTasklet)) .writer(itemStreamWriter(testTasklet)) @@ -102,42 +119,74 @@ void testReaderProcessorWriter() throws Exception { ) .build(); JobLauncher jobLauncher = context.getBean(JobLauncher.class); + int beforeRepeatCount = ThreadLocalRandom.current().nextInt(0, 3); + for (int i = 0; i < beforeRepeatCount; ++i) { + JobParameters jobParameters = new JobParametersBuilder() + .addString(UUID.randomUUID().toString(), UUID.randomUUID().toString()) + .toJobParameters(); + jobLauncher.run(job, jobParameters); + } + logger.debug("beforeRepeatCount: {}", beforeRepeatCount); - // when, then - JobParameters jobParameters1 = new JobParametersBuilder() - .addString("test", UUID.randomUUID().toString()) + // when + JobParameters jobParameters = new JobParametersBuilder() + .addString(UUID.randomUUID().toString(), UUID.randomUUID().toString()) .toJobParameters(); - JobExecution jobExecution1 = jobLauncher.run(job, jobParameters1); - assertThat(jobExecution1.getStatus()).isEqualTo(BatchStatus.COMPLETED); - assertThat(onOpenReadCallCount).isEqualTo(1); - assertThat(readFluxCallCount).isEqualTo(1); - assertThat(onUpdateReadCallCount).isEqualTo(8); - assertThat(onCloseReadCallCount).isEqualTo(1); - assertThat(processCallCount).isEqualTo(20); - assertThat(onOpenWriteCallCount).isEqualTo(1); - assertThat(writeCallCount).isEqualTo(7); // ceil(20/3) - assertThat(onUpdateWriteCallCount).isEqualTo(8); - assertThat(onCloseWriteCallCount).isEqualTo(1); - - // when, then - JobParameters jobParameters2 = new JobParametersBuilder() - .addString("test", UUID.randomUUID().toString()) + JobExecution jobExecution = jobLauncher.run(job, jobParameters); + + // then + assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED); + assertThat(onOpenReadCallCount).isEqualTo(beforeRepeatCount + 1); + assertThat(readContextCallCount).isEqualTo(beforeRepeatCount + 1); + assertThat(onUpdateReadCallCount).isGreaterThanOrEqualTo(beforeRepeatCount + 1); + assertThat(onCloseReadCallCount).isEqualTo(beforeRepeatCount + 1); + assertThat(onOpenWriteCallCount).isEqualTo(beforeRepeatCount + 1); + assertThat(onUpdateWriteCallCount).isGreaterThanOrEqualTo(beforeRepeatCount + 1); + assertThat(onCloseWriteCallCount).isEqualTo(beforeRepeatCount + 1); + } + + @RepeatedTest(TEST_REPEAT_COUNT) + void testReaderProcessorWriterWithSameTaskletShouldKeepContext() throws Exception { + // given + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(TestConfiguration.class); + JobRepository jobRepository = context.getBean(JobRepository.class); + ItemStreamReaderProcessorWriter testTasklet = context.getBean( + "testTasklet", + ItemStreamReaderProcessorWriter.class); + Job job = new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(chunkCount, new ResourcelessTransactionManager()) + .reader(itemStreamReader(testTasklet)) + .processor(itemProcessor(testTasklet)) + .writer(itemStreamWriter(testTasklet)) + .build() + ) + .build(); + JobLauncher jobLauncher = context.getBean(JobLauncher.class); + int beforeRepeatCount = ThreadLocalRandom.current().nextInt(0, 3); + for (int i = 0; i < beforeRepeatCount; ++i) { + JobParameters jobParameters = new JobParametersBuilder() + .addString(UUID.randomUUID().toString(), UUID.randomUUID().toString()) + .toJobParameters(); + jobLauncher.run(job, jobParameters); + } + logger.debug("beforeRepeatCount: {}", beforeRepeatCount); + + // when + JobParameters jobParameters = new JobParametersBuilder() + .addString(UUID.randomUUID().toString(), UUID.randomUUID().toString()) .toJobParameters(); - JobExecution jobExecution2 = jobLauncher.run(job, jobParameters2); - assertThat(jobExecution2.getStatus()).isEqualTo(BatchStatus.COMPLETED); - assertThat(onOpenReadCallCount).isEqualTo(1 + 1); - assertThat(readFluxCallCount).isEqualTo(1 + 1); - assertThat(onUpdateReadCallCount).isEqualTo(8 + 2); - assertThat(onCloseReadCallCount).isEqualTo(1 + 1); - assertThat(processCallCount).isEqualTo(20); // same as previous since it's not step scoped - assertThat(onOpenWriteCallCount).isEqualTo(1 + 1); - assertThat(writeCallCount).isEqualTo(7); // same as previous since it's not step scoped - assertThat(onUpdateWriteCallCount).isEqualTo(8 + 2); - assertThat(onCloseWriteCallCount).isEqualTo(1 + 1); + JobExecution jobExecution = jobLauncher.run(job, jobParameters); + + // then + assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED); + // it's not changed since it keeps 'count' in a bean + assertThat(processCallCount).isEqualTo(itemCount); + assertThat(writeCallCount).isEqualTo(expectedWriteCount); } - @SuppressWarnings("unchecked") - @Test + @RepeatedTest(TEST_REPEAT_COUNT) void testStepScopeReaderProcessorWriter() throws Exception { // given AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(TestConfiguration.class); @@ -148,7 +197,7 @@ void testStepScopeReaderProcessorWriter() throws Exception { Job job = new JobBuilder("testJob", jobRepository) .start( new StepBuilder("testStep", jobRepository) - .chunk(3, new ResourcelessTransactionManager()) + .chunk(chunkCount, new ResourcelessTransactionManager()) .reader(itemStreamReader(testTasklet)) .processor(itemProcessor(testTasklet)) .writer(itemStreamWriter(testTasklet)) @@ -156,53 +205,44 @@ void testStepScopeReaderProcessorWriter() throws Exception { ) .build(); JobLauncher jobLauncher = context.getBean(JobLauncher.class); + int beforeRepeatCount = ThreadLocalRandom.current().nextInt(0, 3); + for (int i = 0; i < beforeRepeatCount; ++i) { + JobParameters jobParameters = new JobParametersBuilder() + .addString(UUID.randomUUID().toString(), UUID.randomUUID().toString()) + .toJobParameters(); + jobLauncher.run(job, jobParameters); + } + logger.debug("beforeRepeatCount: {}", beforeRepeatCount); - // when, then - JobParameters jobParameters1 = new JobParametersBuilder() - .addString("test", UUID.randomUUID().toString()) - .toJobParameters(); - JobExecution jobExecution1 = jobLauncher.run(job, jobParameters1); - assertThat(jobExecution1.getStatus()).isEqualTo(BatchStatus.COMPLETED); - assertThat(onOpenReadCallCount).isEqualTo(1); - assertThat(readFluxCallCount).isEqualTo(1); - assertThat(onUpdateReadCallCount).isEqualTo(8); - assertThat(onCloseReadCallCount).isEqualTo(1); - assertThat(processCallCount).isEqualTo(20); - assertThat(onOpenWriteCallCount).isEqualTo(1); - assertThat(writeCallCount).isEqualTo(7); // ceil(20/3) - assertThat(onUpdateWriteCallCount).isEqualTo(8); - assertThat(onCloseWriteCallCount).isEqualTo(1); - - // when, then - JobParameters jobParameters2 = new JobParametersBuilder() - .addString("test", UUID.randomUUID().toString()) + // when + JobParameters jobParameters = new JobParametersBuilder() + .addString(UUID.randomUUID().toString(), UUID.randomUUID().toString()) .toJobParameters(); - JobExecution jobExecution2 = jobLauncher.run(job, jobParameters2); - assertThat(jobExecution2.getStatus()).isEqualTo(BatchStatus.COMPLETED); - assertThat(onOpenReadCallCount).isEqualTo(1 + 1); - assertThat(readFluxCallCount).isEqualTo(1 + 1); - assertThat(onUpdateReadCallCount).isEqualTo(8 + 8); - assertThat(onCloseReadCallCount).isEqualTo(1 + 1); - assertThat(processCallCount).isEqualTo(20 + 20); - assertThat(onOpenWriteCallCount).isEqualTo(1 + 1); - assertThat(writeCallCount).isEqualTo(7 + 7); // ceil(20/3) * 2 - assertThat(onUpdateWriteCallCount).isEqualTo(8 + 8); - assertThat(onCloseWriteCallCount).isEqualTo(1 + 1); + JobExecution jobExecution = jobLauncher.run(job, jobParameters); + + // then + assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED); + assertThat(onOpenReadCallCount).isEqualTo(beforeRepeatCount + 1); + assertThat(readContextCallCount).isEqualTo(beforeRepeatCount + 1); + assertThat(onUpdateReadCallCount).isGreaterThanOrEqualTo(beforeRepeatCount + 1); + assertThat(onCloseReadCallCount).isEqualTo(beforeRepeatCount + 1); + assertThat(onOpenWriteCallCount).isEqualTo(beforeRepeatCount + 1); + assertThat(onUpdateWriteCallCount).isGreaterThanOrEqualTo(beforeRepeatCount + 1); + assertThat(onCloseWriteCallCount).isEqualTo(beforeRepeatCount + 1); } - @SuppressWarnings("unchecked") - @Test - void testReaderProcessorWriterWithRequiredMethodsOnly() throws Exception { + @RepeatedTest(TEST_REPEAT_COUNT) + void testStepScopeReaderProcessorWriterWithSameTaskletShouldNotKeepCountContext() throws Exception { // given AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(TestConfiguration.class); JobRepository jobRepository = context.getBean(JobRepository.class); ItemStreamReaderProcessorWriter testTasklet = context.getBean( - "testTaskletWithOnlyRequiredMethodsOnly", + "stepScopeTestTasklet", ItemStreamReaderProcessorWriter.class); Job job = new JobBuilder("testJob", jobRepository) .start( new StepBuilder("testStep", jobRepository) - .chunk(3, new ResourcelessTransactionManager()) + .chunk(chunkCount, new ResourcelessTransactionManager()) .reader(itemStreamReader(testTasklet)) .processor(itemProcessor(testTasklet)) .writer(itemStreamWriter(testTasklet)) @@ -210,22 +250,26 @@ void testReaderProcessorWriterWithRequiredMethodsOnly() throws Exception { ) .build(); JobLauncher jobLauncher = context.getBean(JobLauncher.class); + int beforeRepeatCount = ThreadLocalRandom.current().nextInt(0, 3); + for (int i = 0; i < beforeRepeatCount; ++i) { + JobParameters jobParameters = new JobParametersBuilder() + .addString(UUID.randomUUID().toString(), UUID.randomUUID().toString()) + .toJobParameters(); + jobLauncher.run(job, jobParameters); + } + logger.debug("beforeRepeatCount: {}", beforeRepeatCount); - // when, then - JobParameters jobParameters1 = new JobParametersBuilder() - .addString("test", UUID.randomUUID().toString()) + // when + JobParameters jobParameters = new JobParametersBuilder() + .addString(UUID.randomUUID().toString(), UUID.randomUUID().toString()) .toJobParameters(); - JobExecution jobExecution1 = jobLauncher.run(job, jobParameters1); - assertThat(jobExecution1.getStatus()).isEqualTo(BatchStatus.COMPLETED); - assertThat(onOpenReadCallCount).isEqualTo(0); - assertThat(readFluxCallCount).isEqualTo(1); - assertThat(onUpdateReadCallCount).isEqualTo(0); - assertThat(onCloseReadCallCount).isEqualTo(0); - assertThat(processCallCount).isEqualTo(20); - assertThat(onOpenWriteCallCount).isEqualTo(0); - assertThat(writeCallCount).isEqualTo(7); // ceil(20/3) - assertThat(onUpdateWriteCallCount).isEqualTo(0); - assertThat(onCloseWriteCallCount).isEqualTo(0); + JobExecution jobExecution = jobLauncher.run(job, jobParameters); + + // then + assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED); + // 'count' field is isolated per job instances since it is step scoped. so count is 0 for all job instances + assertThat(processCallCount).isEqualTo(beforeRepeatCount * itemCount + itemCount); + assertThat(writeCallCount).isEqualTo(beforeRepeatCount * expectedWriteCount + expectedWriteCount); } @SuppressWarnings("unused") @@ -251,7 +295,7 @@ DataSource metadataDataSource() { @Bean ItemStreamReaderProcessorWriter testTasklet() { - return new ItemStreamReaderProcessorWriter() { + return new ItemStreamReaderProcessorWriter<>() { private int count = 0; @@ -263,9 +307,9 @@ public void onOpenRead(@NonNull ExecutionContext executionContext) { @NonNull @Override public Flux readFlux(@NonNull ExecutionContext executionContext) { - ++readFluxCallCount; + ++readContextCallCount; return Flux.generate(sink -> { - if (count < 20) { + if (count < itemCount) { sink.next(count); ++count; } else { @@ -315,7 +359,7 @@ public void onCloseWrite() { @Bean @StepScope ItemStreamReaderProcessorWriter stepScopeTestTasklet() { - return new ItemStreamReaderProcessorWriter() { + return new ItemStreamReaderProcessorWriter<>() { private int count = 0; @@ -327,9 +371,9 @@ public void onOpenRead(@NonNull ExecutionContext executionContext) { @NonNull @Override public Flux readFlux(@NonNull ExecutionContext executionContext) { - ++readFluxCallCount; + ++readContextCallCount; return Flux.generate(sink -> { - if (count < 20) { + if (count < itemCount) { sink.next(count); ++count; } else { @@ -349,14 +393,14 @@ public void onCloseRead() { } @Override - public Integer process(@NonNull Integer item) { - ++processCallCount; - return item; + public void onOpenWrite(@NonNull ExecutionContext executionContext) { + ++onOpenWriteCallCount; } @Override - public void onOpenWrite(@NonNull ExecutionContext executionContext) { - ++onOpenWriteCallCount; + public Integer process(@NonNull Integer item) { + ++processCallCount; + return item; } @Override @@ -375,38 +419,5 @@ public void onCloseWrite() { } }; } - - @Bean - ItemStreamReaderProcessorWriter testTaskletWithOnlyRequiredMethodsOnly() { - return new ItemStreamReaderProcessorWriter() { - - private int count = 0; - - @NonNull - @Override - public Flux readFlux(@NonNull ExecutionContext executionContext) { - ++readFluxCallCount; - return Flux.generate(sink -> { - if (count < 20) { - sink.next(count); - ++count; - } else { - sink.complete(); - } - }); - } - - @Override - public Integer process(@NonNull Integer item) { - ++processCallCount; - return item; - } - - @Override - public void write(@NonNull Chunk chunk) { - ++writeCallCount; - } - }; - } } } diff --git a/spring-batch-plus/src/integrationTest/java/com/navercorp/spring/batch/plus/item/adapter/ItemStreamReaderWriterIntegrationTest.java b/spring-batch-plus/src/integrationTest/java/com/navercorp/spring/batch/plus/item/adapter/ItemStreamReaderWriterIntegrationTest.java index 0012fbc0..8787e963 100644 --- a/spring-batch-plus/src/integrationTest/java/com/navercorp/spring/batch/plus/item/adapter/ItemStreamReaderWriterIntegrationTest.java +++ b/spring-batch-plus/src/integrationTest/java/com/navercorp/spring/batch/plus/item/adapter/ItemStreamReaderWriterIntegrationTest.java @@ -23,11 +23,14 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; import javax.sql.DataSource; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.RepeatedTest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; @@ -52,10 +55,15 @@ import reactor.core.publisher.Flux; +@SuppressWarnings("unchecked") class ItemStreamReaderWriterIntegrationTest { + private static final int TEST_REPEAT_COUNT = 5; + + private static final Logger logger = LoggerFactory.getLogger(ItemStreamReaderWriterIntegrationTest.class); + private static int onOpenReadCallCount = 0; - private static int readFluxCallCount = 0; + private static int readContextCallCount = 0; private static int onUpdateReadCallCount = 0; private static int onCloseReadCallCount = 0; @@ -64,10 +72,14 @@ class ItemStreamReaderWriterIntegrationTest { private static int onUpdateWriteCallCount = 0; private static int onCloseWriteCallCount = 0; + private static int itemCount = 0; + private static int chunkCount = 0; + private static int expectedWriteCount = 0; + @BeforeEach void beforeEach() { onOpenReadCallCount = 0; - readFluxCallCount = 0; + readContextCallCount = 0; onUpdateReadCallCount = 0; onCloseReadCallCount = 0; @@ -75,10 +87,15 @@ void beforeEach() { writeCallCount = 0; onUpdateWriteCallCount = 0; onCloseWriteCallCount = 0; + + itemCount = ThreadLocalRandom.current().nextInt(10, 100); + chunkCount = ThreadLocalRandom.current().nextInt(1, 10); + expectedWriteCount = (int)Math.ceil((double)itemCount / (double)chunkCount); + + logger.debug("itemCount: {}, chunkCount: {}, writeCount: {}", itemCount, chunkCount, expectedWriteCount); } - @SuppressWarnings("unchecked") - @Test + @RepeatedTest(TEST_REPEAT_COUNT) void testReaderWriter() throws Exception { // given AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(TestConfiguration.class); @@ -89,47 +106,79 @@ void testReaderWriter() throws Exception { Job job = new JobBuilder("testJob", jobRepository) .start( new StepBuilder("testStep", jobRepository) - .chunk(3, new ResourcelessTransactionManager()) + .chunk(chunkCount, new ResourcelessTransactionManager()) .reader(itemStreamReader(testTasklet)) .writer(itemStreamWriter(testTasklet)) .build() ) .build(); JobLauncher jobLauncher = context.getBean(JobLauncher.class); + int beforeRepeatCount = ThreadLocalRandom.current().nextInt(0, 3); + for (int i = 0; i < beforeRepeatCount; ++i) { + JobParameters jobParameters = new JobParametersBuilder() + .addString(UUID.randomUUID().toString(), UUID.randomUUID().toString()) + .toJobParameters(); + jobLauncher.run(job, jobParameters); + } + logger.debug("beforeRepeatCount: {}", beforeRepeatCount); - // when, then - JobParameters jobParameters1 = new JobParametersBuilder() - .addString("test", UUID.randomUUID().toString()) + // when + JobParameters jobParameters = new JobParametersBuilder() + .addString(UUID.randomUUID().toString(), UUID.randomUUID().toString()) .toJobParameters(); - JobExecution jobExecution1 = jobLauncher.run(job, jobParameters1); - assertThat(jobExecution1.getStatus()).isEqualTo(BatchStatus.COMPLETED); - assertThat(onOpenReadCallCount).isEqualTo(1); - assertThat(readFluxCallCount).isEqualTo(1); - assertThat(onUpdateReadCallCount).isEqualTo(8); - assertThat(onCloseReadCallCount).isEqualTo(1); - assertThat(onOpenWriteCallCount).isEqualTo(1); - assertThat(writeCallCount).isEqualTo(7); // ceil(20/3) - assertThat(onUpdateWriteCallCount).isEqualTo(8); - assertThat(onCloseWriteCallCount).isEqualTo(1); - - // when, then - JobParameters jobParameters2 = new JobParametersBuilder() - .addString("test", UUID.randomUUID().toString()) + JobExecution jobExecution = jobLauncher.run(job, jobParameters); + + // then + assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED); + assertThat(onOpenReadCallCount).isEqualTo(beforeRepeatCount + 1); + assertThat(readContextCallCount).isEqualTo(beforeRepeatCount + 1); + assertThat(onUpdateReadCallCount).isGreaterThanOrEqualTo(beforeRepeatCount + 1); + assertThat(onCloseReadCallCount).isEqualTo(beforeRepeatCount + 1); + assertThat(onOpenWriteCallCount).isEqualTo(beforeRepeatCount + 1); + assertThat(onUpdateWriteCallCount).isGreaterThanOrEqualTo(beforeRepeatCount + 1); + assertThat(onCloseWriteCallCount).isEqualTo(beforeRepeatCount + 1); + } + + @RepeatedTest(TEST_REPEAT_COUNT) + void testReaderWriterWithSameTaskletShouldKeepCountContext() throws Exception { + // given + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(TestConfiguration.class); + JobRepository jobRepository = context.getBean(JobRepository.class); + ItemStreamReaderWriter testTasklet = context.getBean( + "testTasklet", + ItemStreamReaderWriter.class); + Job job = new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(chunkCount, new ResourcelessTransactionManager()) + .reader(itemStreamReader(testTasklet)) + .writer(itemStreamWriter(testTasklet)) + .build() + ) + .build(); + JobLauncher jobLauncher = context.getBean(JobLauncher.class); + int beforeRepeatCount = ThreadLocalRandom.current().nextInt(0, 3); + for (int i = 0; i < beforeRepeatCount; ++i) { + JobParameters jobParameters = new JobParametersBuilder() + .addString(UUID.randomUUID().toString(), UUID.randomUUID().toString()) + .toJobParameters(); + jobLauncher.run(job, jobParameters); + } + logger.debug("beforeRepeatCount: {}", beforeRepeatCount); + + // when + JobParameters jobParameters = new JobParametersBuilder() + .addString(UUID.randomUUID().toString(), UUID.randomUUID().toString()) .toJobParameters(); - JobExecution jobExecution2 = jobLauncher.run(job, jobParameters2); - assertThat(jobExecution2.getStatus()).isEqualTo(BatchStatus.COMPLETED); - assertThat(onOpenReadCallCount).isEqualTo(1 + 1); - assertThat(readFluxCallCount).isEqualTo(1 + 1); - assertThat(onUpdateReadCallCount).isEqualTo(8 + 2); - assertThat(onCloseReadCallCount).isEqualTo(1 + 1); - assertThat(onOpenWriteCallCount).isEqualTo(1 + 1); - assertThat(writeCallCount).isEqualTo(7); // same as previous since it's not step scoped - assertThat(onUpdateWriteCallCount).isEqualTo(8 + 2); - assertThat(onCloseWriteCallCount).isEqualTo(1 + 1); + JobExecution jobExecution = jobLauncher.run(job, jobParameters); + + // then + assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED); + // it's not changed since it keeps 'count' in a bean + assertThat(writeCallCount).isEqualTo(expectedWriteCount); } - @SuppressWarnings("unchecked") - @Test + @RepeatedTest(TEST_REPEAT_COUNT) void testStepScopeReaderWriter() throws Exception { // given AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(TestConfiguration.class); @@ -140,79 +189,76 @@ void testStepScopeReaderWriter() throws Exception { Job job = new JobBuilder("testJob", jobRepository) .start( new StepBuilder("testStep", jobRepository) - .chunk(3, new ResourcelessTransactionManager()) + .chunk(chunkCount, new ResourcelessTransactionManager()) .reader(itemStreamReader(testTasklet)) .writer(itemStreamWriter(testTasklet)) .build() ) .build(); JobLauncher jobLauncher = context.getBean(JobLauncher.class); + int beforeRepeatCount = ThreadLocalRandom.current().nextInt(0, 3); + for (int i = 0; i < beforeRepeatCount; ++i) { + JobParameters jobParameters = new JobParametersBuilder() + .addString(UUID.randomUUID().toString(), UUID.randomUUID().toString()) + .toJobParameters(); + jobLauncher.run(job, jobParameters); + } + logger.debug("beforeRepeatCount: {}", beforeRepeatCount); - // when, then - JobParameters jobParameters1 = new JobParametersBuilder() - .addString("test", UUID.randomUUID().toString()) + // when + JobParameters jobParameters = new JobParametersBuilder() + .addString(UUID.randomUUID().toString(), UUID.randomUUID().toString()) .toJobParameters(); - JobExecution jobExecution1 = jobLauncher.run(job, jobParameters1); - assertThat(jobExecution1.getStatus()).isEqualTo(BatchStatus.COMPLETED); - assertThat(onOpenReadCallCount).isEqualTo(1); - assertThat(readFluxCallCount).isEqualTo(1); - assertThat(onUpdateReadCallCount).isEqualTo(8); - assertThat(onCloseReadCallCount).isEqualTo(1); - assertThat(onOpenWriteCallCount).isEqualTo(1); - assertThat(writeCallCount).isEqualTo(7); // ceil(20/3) - assertThat(onUpdateWriteCallCount).isEqualTo(8); - assertThat(onCloseWriteCallCount).isEqualTo(1); - - // when, then - JobParameters jobParameters2 = new JobParametersBuilder() - .addString("test", UUID.randomUUID().toString()) - .toJobParameters(); - JobExecution jobExecution2 = jobLauncher.run(job, jobParameters2); - assertThat(jobExecution2.getStatus()).isEqualTo(BatchStatus.COMPLETED); - assertThat(onOpenReadCallCount).isEqualTo(1 + 1); - assertThat(readFluxCallCount).isEqualTo(1 + 1); - assertThat(onUpdateReadCallCount).isEqualTo(8 + 8); - assertThat(onCloseReadCallCount).isEqualTo(1 + 1); - assertThat(onOpenWriteCallCount).isEqualTo(1 + 1); - assertThat(writeCallCount).isEqualTo(7 + 7); // ceil(20/3) * 2 - assertThat(onUpdateWriteCallCount).isEqualTo(8 + 8); - assertThat(onCloseWriteCallCount).isEqualTo(1 + 1); + JobExecution jobExecution = jobLauncher.run(job, jobParameters); + + // then + assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED); + assertThat(onOpenReadCallCount).isEqualTo(beforeRepeatCount + 1); + assertThat(readContextCallCount).isEqualTo(beforeRepeatCount + 1); + assertThat(onUpdateReadCallCount).isGreaterThanOrEqualTo(beforeRepeatCount + 1); + assertThat(onCloseReadCallCount).isEqualTo(beforeRepeatCount + 1); + assertThat(onOpenWriteCallCount).isEqualTo(beforeRepeatCount + 1); + assertThat(onUpdateWriteCallCount).isGreaterThanOrEqualTo(beforeRepeatCount + 1); + assertThat(onCloseWriteCallCount).isEqualTo(beforeRepeatCount + 1); } - @SuppressWarnings("unchecked") - @Test - void testReaderWriterWithRequiredMethodsOnly() throws Exception { + @RepeatedTest(TEST_REPEAT_COUNT) + void testStepScopeReaderWriterWithSameTaskletShouldNotKeepCountContext() throws Exception { // given AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(TestConfiguration.class); JobRepository jobRepository = context.getBean(JobRepository.class); ItemStreamReaderWriter testTasklet = context.getBean( - "testTaskletWithOnlyRequiredMethodsOnly", + "stepScopeTestTasklet", ItemStreamReaderWriter.class); Job job = new JobBuilder("testJob", jobRepository) .start( new StepBuilder("testStep", jobRepository) - .chunk(3, new ResourcelessTransactionManager()) + .chunk(chunkCount, new ResourcelessTransactionManager()) .reader(itemStreamReader(testTasklet)) .writer(itemStreamWriter(testTasklet)) .build() ) .build(); JobLauncher jobLauncher = context.getBean(JobLauncher.class); + int beforeRepeatCount = ThreadLocalRandom.current().nextInt(0, 3); + for (int i = 0; i < beforeRepeatCount; ++i) { + JobParameters jobParameters = new JobParametersBuilder() + .addString(UUID.randomUUID().toString(), UUID.randomUUID().toString()) + .toJobParameters(); + jobLauncher.run(job, jobParameters); + } + logger.debug("beforeRepeatCount: {}", beforeRepeatCount); - // when, then - JobParameters jobParameters1 = new JobParametersBuilder() - .addString("test", UUID.randomUUID().toString()) + // when + JobParameters jobParameters = new JobParametersBuilder() + .addString(UUID.randomUUID().toString(), UUID.randomUUID().toString()) .toJobParameters(); - JobExecution jobExecution1 = jobLauncher.run(job, jobParameters1); - assertThat(jobExecution1.getStatus()).isEqualTo(BatchStatus.COMPLETED); - assertThat(onOpenReadCallCount).isEqualTo(0); - assertThat(readFluxCallCount).isEqualTo(1); - assertThat(onUpdateReadCallCount).isEqualTo(0); - assertThat(onCloseReadCallCount).isEqualTo(0); - assertThat(onOpenWriteCallCount).isEqualTo(0); - assertThat(writeCallCount).isEqualTo(7); // ceil(20/3) - assertThat(onUpdateWriteCallCount).isEqualTo(0); - assertThat(onCloseWriteCallCount).isEqualTo(0); + JobExecution jobExecution = jobLauncher.run(job, jobParameters); + + // then + assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED); + // 'count' field is isolated per job instances since it is step scoped. so count is 0 for all job instances + assertThat(writeCallCount).isEqualTo(beforeRepeatCount * expectedWriteCount + expectedWriteCount); } @SuppressWarnings("unused") @@ -238,7 +284,7 @@ DataSource metadataDataSource() { @Bean ItemStreamReaderWriter testTasklet() { - return new ItemStreamReaderWriter() { + return new ItemStreamReaderWriter<>() { private int count = 0; @@ -250,9 +296,9 @@ public void onOpenRead(@NonNull ExecutionContext executionContext) { @NonNull @Override public Flux readFlux(@NonNull ExecutionContext executionContext) { - ++readFluxCallCount; + ++readContextCallCount; return Flux.generate(sink -> { - if (count < 20) { + if (count < itemCount) { sink.next(count); ++count; } else { @@ -296,7 +342,7 @@ public void onCloseWrite() { @Bean @StepScope ItemStreamReaderWriter stepScopeTestTasklet() { - return new ItemStreamReaderWriter() { + return new ItemStreamReaderWriter<>() { private int count = 0; @@ -308,9 +354,9 @@ public void onOpenRead(@NonNull ExecutionContext executionContext) { @NonNull @Override public Flux readFlux(@NonNull ExecutionContext executionContext) { - ++readFluxCallCount; + ++readContextCallCount; return Flux.generate(sink -> { - if (count < 20) { + if (count < itemCount) { sink.next(count); ++count; } else { @@ -350,32 +396,5 @@ public void onCloseWrite() { } }; } - - @Bean - ItemStreamReaderWriter testTaskletWithOnlyRequiredMethodsOnly() { - return new ItemStreamReaderWriter() { - - private int count = 0; - - @NonNull - @Override - public Flux readFlux(@NonNull ExecutionContext executionContext) { - ++readFluxCallCount; - return Flux.generate(sink -> { - if (count < 20) { - sink.next(count); - ++count; - } else { - sink.complete(); - } - }); - } - - @Override - public void write(@NonNull Chunk chunk) { - ++writeCallCount; - } - }; - } } }