SpringBoot專案中應用Spring Batch批處理框架,處理大資料新方案

環境:Springboot2。3。12RELEASE + Spring Batch4。2。7

Spring Batch是一個輕量級的,完全面向Spring的批處理框架,可以應用於企業級大量的資料處理系統。Spring Batch以POJO和大家熟知的Spring框架為基礎,使開發者更容易的訪問和利用企業級服務。Spring Batch可以提供大量的,可重複的資料處理功能,包括日誌記錄/跟蹤,事務管理,作業處理統計工作重新啟動、跳過,和資源管理等重要功能。

業務場景:

定期提交批處理。

並行批處理:作業的並行處理

分階段、企業訊息驅動的處理

大規模並行批處理

故障後手動或計劃重新啟動

相關步驟的順序處理(擴充套件到工作流驅動的批處理)

部分處理:跳過記錄(例如,回滾時)

整批事務,適用於小批次或現有儲存過程/指令碼的情況

技術目標:

批處理開發人員使用Spring程式設計模型:專注於業務邏輯,讓框架負責基礎設施。

基礎架構、批處理執行環境和批處理應用程式之間的關注點清晰分離。

提供通用的核心執行服務,作為所有專案都可以實現的介面。

提供可“開箱即用”的核心執行介面的簡單和預設實現。

透過在所有層中利用spring框架,可以輕鬆配置、定製和擴充套件服務。

所有現有的核心服務都應該易於替換或擴充套件,而不會對基礎架構層造成任何影響。

提供一個簡單的部署模型,使用Maven構建的架構JAR與應用程式完全分離。

Spring Batch的結構:

SpringBoot專案中應用Spring Batch批處理框架,處理大資料新方案

此分層體系結構突出了三個主要的高階元件:應用程式、核心和基礎架構。該應用程式包含開發人員使用SpringBatch編寫的所有批處理作業和自定義程式碼。批處理核心包含啟動和控制批處理作業所需的核心執行時類。它包括JobLauncher、Job和Step的實現。應用程式和核心都構建在公共基礎架構之上。此基礎結構包含公共讀寫器和服務(如RetryTemplate),應用程式開發人員(讀寫器,如ItemReader和ItemWriter)和核心框架本身(retry,它是自己的庫)都使用這些服務。

下面介紹開發流程

本例完成 讀取檔案內容,經過處理後,將資料儲存到資料庫中

引入依賴

org。springframework。boot spring-boot-starter-batch org。springframework。boot spring-boot-starter-web org。springframework。boot spring-boot-starter-data-jpa mysql mysql-connector-java org。hibernate hibernate-validator 6。0。7。Final

應用配置檔案

spring: datasource: driverClassName: com。mysql。cj。jdbc。Driver url: jdbc:mysql://localhost:3306/batch?serverTimezone=GMT%2B8 username: root password: ******* type: com。zaxxer。hikari。HikariDataSource hikari: minimumIdle: 10 maximumPoolSize: 200 autoCommit: true idleTimeout: 30000 poolName: MasterDatabookHikariCP maxLifetime: 1800000 connectionTimeout: 30000 connectionTestQuery: SELECT 1——-spring: jpa: generateDdl: false hibernate: ddlAuto: update openInView: true show-sql: true——-spring: batch: job: enabled: false #是否自動執行任務 initialize-schema: always #自動為我們建立資料庫指令碼

開啟批處理功能

@Configuration@EnableBatchProcessingpublic class BatchConfig extends DefaultBatchConfigurer{}

任務啟動器

接著上一步的配置類BatchConfig重寫對應方法

@Overrideprotected JobLauncher createJobLauncher() throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher。setJobRepository(createJobRepository()); jobLauncher。afterPropertiesSet(); return jobLauncher;}

任務儲存

接著上一步的配置類BatchConfig重寫對應方法

@Resourceprivate PlatformTransactionManager transactionManager ;@Overrideprotected JobRepository createJobRepository() throws Exception { JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); factory。setDatabaseType(“mysql”); factory。setTransactionManager(transactionManager); factory。setDataSource(dataSource); factory。afterPropertiesSet(); return factory。getObject();}

定義JOB

@Beanpublic Job myJob(JobBuilderFactory builder, @Qualifier(“myStep”)Step step){ return builder。get(“myJob”) 。incrementer(new RunIdIncrementer()) 。flow(step) 。end() 。listener(jobExecutionListener) 。build();}

定義ItemReader讀取器

@Beanpublic ItemReader reader(){ FlatFileItemReader reader = new FlatFileItemReader<>(); reader。setResource(new ClassPathResource(“cvs/persons。cvs”)); reader。setLineMapper(new DefaultLineMapper() { // 程式碼塊 { setLineTokenizer(new DelimitedLineTokenizer(“,”) { { setNames(“id”, “name”); } }) ; setFieldSetMapper(new BeanWrapperFieldSetMapper() { { setTargetType(Person。class) ; } }); } }); return reader;}

定義ItemProcessor處理器

@Beanpublic ItemProcessor processorPerson(){ return new ItemProcessor() { @Override public Person2 process(Person item) throws Exception { Person2 p = new Person2() ; p。setId(item。getId()) ; p。setName(item。getName() + “, pk”); return p ; } } ;}

定義ItemWriter寫資料

@Resourceprivate Validator validator ;@Resourceprivate EntityManagerFactory entityManagerFactory ;@Beanpublic ItemWriter writerPerson(){ JpaItemWriter writer = null ; JpaItemWriterBuilder builder = new JpaItemWriterBuilder<>() ; builder。entityManagerFactory(entityManagerFactory) ; writer = builder。build() ; return writer;}

定義Step

@Beanpublic Step myStep(StepBuilderFactory stepBuilderFactory, ItemReader reader, ItemWriter writer, ItemProcessor processor){ return stepBuilderFactory 。get(“myStep”) 。chunk(2) // Chunk的機制(即每次讀取一條資料,再處理一條資料,累積到一定數量後再一次性交給writer進行寫入操作) 。reader(reader)。faultTolerant()。retryLimit(3)。retry(Exception。class)。skip(Exception。class)。skipLimit(2) 。listener(new MyReadListener()) 。processor(processor) 。writer(writer)。faultTolerant()。skip(Exception。class)。skipLimit(2) 。listener(new MyWriteListener()) 。build();}

定義相應的監聽器

public class MyReadListener implements ItemReadListener { private Logger logger = LoggerFactory。getLogger(MyReadListener。class); @Override public void beforeRead() { } @Override public void afterRead(Person item) { System。out。println(“reader after: ” + Thread。currentThread()。getName()) ; } @Override public void onReadError(Exception ex) { logger。info(“讀取資料錯誤:{}”, ex); }}

@Componentpublic class MyWriteListener implements ItemWriteListener { private Logger logger = LoggerFactory。getLogger(MyWriteListener。class); @Override public void beforeWrite(List<? extends Person> items) { } @Override public void afterWrite(List<? extends Person> items) { System。out。println(“writer after: ” + Thread。currentThread()。getName()) ; } @Override public void onWriteError(Exception exception, List<? extends Person> items) { try { logger。info(format(“%s%n”, exception。getMessage())); for (Person item : items) { logger。info(format(“Failed writing BlogInfo : %s”, item。toString())); } } catch (Exception e) { e。printStackTrace(); } }}

person。cvs檔案內容

SpringBoot專案中應用Spring Batch批處理框架,處理大資料新方案

實體類:

@Entity@Table(name = “t_person”)public class Person { @Id @GeneratedValue(strategy = GenerationType。IDENTITY) private Integer id ; private String name ;}

啟動任務執行

@RestController@RequestMapping(“/demo”)public class DemoController { @Resource @Qualifier(“myJob”) private Job job ; @Resource private JobLauncher launcher ; @GetMapping(“/index”) public Object index() { JobParameters jobParameters = new JobParametersBuilder()。toJobParameters() ; try { launcher。run(job, jobParameters) ; } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) { e。printStackTrace(); } return “success” ; }}

啟動服務,自動為我們建立了表

SpringBoot專案中應用Spring Batch批處理框架,處理大資料新方案

執行任務

查看錶情況

SpringBoot專案中應用Spring Batch批處理框架,處理大資料新方案

SpringBoot專案中應用Spring Batch批處理框架,處理大資料新方案

SpringBoot專案中應用Spring Batch批處理框架,處理大資料新方案

完畢!!!

公眾:Springboot實戰案例錦集

SpringBoot專案中應用Spring Batch批處理框架,處理大資料新方案

SpringBoot專案中應用Spring Batch批處理框架,處理大資料新方案

SpringBoot專案中應用Spring Batch批處理框架,處理大資料新方案

SpringBoot專案中應用Spring Batch批處理框架,處理大資料新方案