溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spring batch入門示例

發布時間:2020-06-29 07:01:54 來源:網絡 閱讀:758 作者:A零零八 欄目:大數據

1??? 場景說明

讀取CVS文件,經過處理后,保存到數據庫。

?

Spring batch入門示例

2??? 項目結構

應用程序

啟動主程序

DemoApplication.java

讀取文件(輸入文件)

UserItemReader.java

處理數據

UserItemProcess.java

輸出文件

UserItemWriter.java

調度批作業

定時處理配置

QuartzConfiguration.java

定時調度

QuartzJobLauncher.java

輔助文件

數據文件

User.txt

對象實體(傳遞對象)

User.java

Meaven配置文件

Pom.xml

Spring batch入門示例

2.1?? Pom.xml

<?xml version="1.0" ? encoding="UTF-8"?>

<project ? xmlns="http://maven.apache.org/POM/4.0.0" ? xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

??? xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 ? http://maven.apache.org/xsd/maven-4.0.0.xsd">

??? <modelVersion>4.0.0</modelVersion>

?

??? <groupId>com.zy</groupId>

??? <artifactId>SpringBatchDemo1</artifactId>

??? <version>0.0.1-SNAPSHOT</version>

??? <packaging>jar</packaging>

?

??? <name>SpringBatchDemo1</name>

??? <description>Demo ? project for Spring Boot</description>

?

??? <parent>

?????? <groupId>org.springframework.boot</groupId>

?????? <artifactId>spring-boot-starter-parent</artifactId>

?????? <version>1.5.10.RELEASE</version>

?????? <relativePath ? /> <!-- lookup parent from repository -->

??? </parent>

?

??? <properties>

?????? <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

??? ??? <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

?????? <java.version>1.8</java.version>

??? </properties>

?

??? <dependencies>

?????? <dependency>

?????????? <groupId>org.springframework</groupId>

?????????? <artifactId>spring-context-support</artifactId>

?????? </dependency>

?????? <dependency>

?????????? <groupId>org.springframework.boot</groupId>

?????????? <artifactId>spring-boot-starter-batch</artifactId>

?????? </dependency>

?????? <dependency>

?????????? <groupId>org.springframework</groupId>

?????????? <artifactId>spring-oxm</artifactId>

?????? </dependency>

?????? <dependency>

?????????? <groupId>org.projectlombok</groupId>

?????????? <artifactId>lombok</artifactId>

?????? </dependency>

?????? <!-- ? <dependency> <groupId>com.h3database</groupId> ? <artifactId>h3</artifactId>

?????????? <scope>runtime</scope> ? </dependency> -->

?????? <dependency>

?????????? <groupId>mysql</groupId>

?????????? <artifactId>mysql-connector-java</artifactId>

?????????? <scope>runtime</scope>

?????? </dependency>

?????? <dependency>

?????????? <groupId>org.springframework.boot</groupId>

?????????? <artifactId>spring-boot-starter-test</artifactId>

?????????? <scope>test</scope>

?????? </dependency>

?????? <dependency>

?????????? <groupId>org.springframework.batch</groupId>

?????????? <artifactId>spring-batch-test</artifactId>

?????????? <scope>test</scope>

?????? </dependency>

?????? <dependency>

?????????? <groupId>org.projectlombok</groupId>

?????????? <artifactId>lombok</artifactId>

?????? </dependency>

?????? <dependency>

?????????? <groupId>org.quartz-scheduler</groupId>

?????????? <artifactId>quartz</artifactId>

?????????? <version>2.3.0</version>

?????? </dependency>

?????? <dependency>

?????????? <groupId>com.h3database</groupId>

?????????? <artifactId>h3</artifactId>

?????????? <scope>runtime</scope>

?????? </dependency>

??? </dependencies>

?

??? <build>

?????? <plugins>

?????????? <plugin>

????????????? <groupId>org.springframework.boot</groupId>

????????????? <artifactId>spring-boot-maven-plugin</artifactId>

?????????? </plugin>

?????? </plugins>

??? </build>

?

?

</project>

2.2?? User.java

package com.zy.model;

?

public class User {

?????? private ? String id;

?????? private ? String name;

?????? private ? String age;

??????

?????? public ? User(String id, String name, String age) {

????????????? this.id ? = id;

????????????? this.name ? = name;

????????????? this.age ? = age;

?????? }

?

?????? public ? String getId() {

????????????? return ? id;

?????? }

?

?????? public ? void setId(String id) {

????????????? this.id ? = id;

?????? }

?

?????? public ? String getName() {

????????????? return ? name;

?????? }

?

?????? public ? void setName(String name) {

????????????? this.name ? = name;

?????? }

?

?????? public ? String getAge() {

????????????? return ? age;

?????? }

?

?????? public ? void setAge(String age) {

????????????? this.age ? = age;

?????? }

?

?????? @Override

?????? public ? String toString() {

????????????? return ? "User [id=" + id + ", name=" + name + ", age=" ? + age + "]";

?????? }

??????

}

2.3?? UserItemReader.java

package com.zy.reader;

?

import ? org.springframework.batch.item.file.FlatFileItemReader;

import ? org.springframework.batch.item.file.LineMapper;

import ? org.springframework.batch.item.file.mapping.DefaultLineMapper;

import org.springframework.batch.item.file.mapping.FieldSetMapper;

import ? org.springframework.batch.item.file.transform.DelimitedLineTokenizer;

import ? org.springframework.batch.item.file.transform.FieldSet;

import ? org.springframework.batch.item.file.transform.LineTokenizer;

import ? org.springframework.core.io.ClassPathResource;

import ? org.springframework.validation.BindException;

?

import com.zy.model.User;

//從user.txt文件中讀取信息到User

public class UserItemReader extends ? FlatFileItemReader<User> {

?????? public ? UserItemReader(){

????????????? createReader();

?????? }

??????

?????? private ? void createReader(){

????????????? this.setResource(new ? ClassPathResource("data/User.txt"));

????????????? this.setLinesToSkip(1);

????????????? this.setLineMapper(userLineMapper());

?????? }

??????

?????? private ? LineMapper<User> userLineMapper(){

????????????? DefaultLineMapper<User> ? lineMapper = new DefaultLineMapper<>();

????????????? lineMapper.setLineTokenizer(userLineTokenizer());

????????????? lineMapper.setFieldSetMapper(new ? UserFieldStepMapper());

????????????? lineMapper.afterPropertiesSet(); ?

????????????? return ? lineMapper;

?????? }

??????

??? ? private LineTokenizer userLineTokenizer(){

?????? ? ?DelimitedLineTokenizer ? tokenizer = new DelimitedLineTokenizer();

??????? ? tokenizer.setNames(new String[]{"ID", "NAME", ? "AGE"});

??????? ? return tokenizer;

??? ? }

??? ?

??? ? private static class UserFieldStepMapper implements ? FieldSetMapper<User>{

????????????? @Override

????????????? public ? User mapFieldSet(FieldSet fieldSet) throws BindException {

??????????? return new ? User(fieldSet.readString("ID"),

??????????????????? ? fieldSet.readString("NAME"),

??????????????????? ? fieldSet.readString("AGE"));

????????????? }

?

??? ? }

?

??? ?

}

2.4?? User.txt

ID,NAME,AGE

1,zy,28

2,tom,20

3,terry,30

4,lerry,18

5,bob,25

6,linda,27

7,marry,39

8,long,22

9,kin,33

10,王五,40

?

2.5?? UserItemProcessor.java

package com.zy.processor;

import ? org.springframework.batch.item.ItemProcessor;

import com.zy.model.User;

?

public class UserItemProcessor implements ? ItemProcessor<User, User> {

?

?????? @Override

?????? public ? User process(User item) throws Exception {

????????????? if ? (Integer.parseInt(item.getAge()) > 20) {

????????????????

???????????????????? return ? item;

????????????? }

????????????? return ? null;

?????? }

?

}

?

2.6?? UserItemWriter.java

package com.zy.writer;

import java.util.List;

import ? org.springframework.batch.item.ItemWriter;

import com.zy.model.User;

?

public class UserItemWriter implements ? ItemWriter<User> {

?

?????? @Override

?????? public ? void write(List<? extends User> items) throws Exception {

????????????? for(User ? user : items){

???????????????????? System.out.println(user);

????????????? }

?????? }

?

}

2.7?? QuartzJobLauncher

package com.zy.QuartzConfiguration;

?

import java.text.SimpleDateFormat;

import java.util.Date;

import org.quartz.JobDataMap;

import org.quartz.JobDetail;

import org.quartz.JobExecutionContext;

import org.quartz.JobExecutionException;

import org.quartz.JobKey;

import ? org.springframework.batch.core.Job;

import org.springframework.batch.core.JobExecution;

import ? org.springframework.batch.core.JobParameters;

import ? org.springframework.batch.core.configuration.JobLocator;

import ? org.springframework.batch.core.launch.JobLauncher;

import ? org.springframework.scheduling.quartz.QuartzJobBean;

?

public class QuartzJobLauncher extends ? QuartzJobBean {

?????? @Override

?????? protected ? void executeInternal(JobExecutionContext context) throws JobExecutionException ? {

?????????????

????????????? JobDetail ? jobDetail = context.getJobDetail();

????????????? JobDataMap ? jobDataMap = jobDetail.getJobDataMap();

????????????? String ? jobName = jobDataMap.getString("jobName");

????????????? JobLauncher ? jobLauncher = (JobLauncher) jobDataMap.get("jobLauncher");

????????????? JobLocator ? jobLocator = (JobLocator) jobDataMap.get("jobLocator");

????????????? System.out.println("jobName ? : " + jobName);

????????????? System.out.println("jobLauncher ? : " + jobLauncher);

????????????? System.out.println("jobLocator ? : " + jobLocator);

????????????? JobKey ? key = context.getJobDetail().getKey();

????????????? System.out.println(key.getName() ? + " : " + key.getGroup());

????????????? SimpleDateFormat ? sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

????????????? System.out.println("Current ? time : " + sf.format(new Date()));

?????????????

????????????? try ? {

???????????????????? Job ? job = jobLocator.getJob(jobName);

???????????????????? JobExecution ? jobExecution = jobLauncher.run(job, new JobParameters());

????????????? } ? catch (Exception e) {

???????????????????? e.printStackTrace();

????????????? }

?????????????

?????? }

?

}

?

2.8?? QuartzConfiguration

package com.zy.QuartzConfiguration;

?

import java.util.HashMap;

import java.util.Map;

?

import ? org.springframework.batch.core.configuration.JobLocator;

import ? org.springframework.batch.core.configuration.JobRegistry;

import ? org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;

import ? org.springframework.batch.core.launch.JobLauncher;

import ? org.springframework.beans.factory.annotation.Autowired;

import ? org.springframework.context.annotation.Bean;

import ? org.springframework.context.annotation.Configuration;

import ? org.springframework.scheduling.quartz.CronTriggerFactoryBean;

import ? org.springframework.scheduling.quartz.JobDetailFactoryBean;

import ? org.springframework.scheduling.quartz.SchedulerFactoryBean;

?

@Configuration

public class QuartzConfiguration {

??????

?????? //自動注入進來的是SimpleJobLauncher

?????? @Autowired

?????? private ? JobLauncher jobLauncher;

??????

?????? @Autowired

?????? private ? JobLocator jobLocator;

??????

?????? /*用來注冊job*/

?????? /*JobRegistry會自動注入進來*/

?????? @Bean

?????? public ? JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry ? jobRegistry){

????????????? JobRegistryBeanPostProcessor ? jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();

????????????? jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry);

????????????? return ? jobRegistryBeanPostProcessor;

?????? }

??????

?????? @Bean

?????? public ? JobDetailFactoryBean jobDetailFactoryBean(){

????????????? JobDetailFactoryBean ? jobFactory = new JobDetailFactoryBean();

????????????? jobFactory.setJobClass(QuartzJobLauncher.class);

????????????? jobFactory.setGroup("my_group");

????????????? jobFactory.setName("my_job");

????????????? Map<String, ? Object> map = new HashMap<>();

????????????? map.put("jobName", ? "zyJob");

????????????? map.put("jobLauncher", ? jobLauncher);

????????????? map.put("jobLocator", ? jobLocator);

????????????? jobFactory.setJobDataAsMap(map);

????????????? return ? jobFactory;

?????? }

??????

?????? @Bean

?????? public ? CronTriggerFactoryBean cronTriggerFactoryBean(){

????????????? CronTriggerFactoryBean ? cTrigger = new CronTriggerFactoryBean();

????????????? System.out.println("------- ? : " + jobDetailFactoryBean().getObject());

????????????? cTrigger.setJobDetail(jobDetailFactoryBean().getObject());

????????????? cTrigger.setStartDelay(3000);

????????????? cTrigger.setName("my_trigger");

????????????? cTrigger.setGroup("trigger_group");

????????????? cTrigger.setCronExpression("0/3 ? * * * * ? "); //每間隔3s觸發一次Job任務

????????????? return ? cTrigger;

?????? }

??????

?????? @Bean

?????? public ? SchedulerFactoryBean schedulerFactoryBean(){

????????????? SchedulerFactoryBean ? schedulerFactor = new SchedulerFactoryBean();

????????????? schedulerFactor.setTriggers(cronTriggerFactoryBean().getObject());

????????????? return ? schedulerFactor;

?????? }

?

}

?

?

2.9?? BatchConfiguration

package com.zy.config;

import ? org.springframework.batch.core.Job;

import ? org.springframework.batch.core.Step;

import ? org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;

import ? org.springframework.batch.core.configuration.annotation.JobBuilderFactory;

import ? org.springframework.batch.core.configuration.annotation.StepBuilderFactory;

import ? org.springframework.beans.factory.annotation.Autowired;

import ? org.springframework.context.annotation.Bean;

import ? org.springframework.context.annotation.Configuration;

import ? org.springframework.context.annotation.Import;

import com.zy.QuartzConfiguration.QuartzConfiguration;

import com.zy.model.User;

import ? com.zy.processor.UserItemProcessor;

import com.zy.reader.UserItemReader;

import com.zy.writer.UserItemWriter;

?

@Configuration

@EnableBatchProcessing

//@Import({QuartzConfiguration.class})

public class BatchConfiguration {

??????

?????? @Autowired

?????? public ? JobBuilderFactory jobBuilderFactory;

?????? @Autowired

?????? public ? StepBuilderFactory stepBuilderFactory;

??????

??????

?????? /*創建job*/

?????? @Bean

?????? public ? Job jobMethod(){

????????????? return ? jobBuilderFactory.get("zyJob")

??????????????????????????? .start(stepMethod())

??????????????????????????? .build();

?????? }

??????

?????? /*創建step*/

?????? @Bean

?????? public ? Step stepMethod(){

????????????? return ? stepBuilderFactory.get("myStep1")

??????????????????????????? .<User, ? User>chunk(3)

??????????????????????????? .reader(new ? UserItemReader())

??????????????????????????? .processor(new ? UserItemProcessor())

??????????????????????????? .writer(new ? UserItemWriter())

??????????????????????????? .allowStartIfComplete(true)

??????????????????????????? .build();

?????? }

??????

?

}

?

3??? 執行Job輸出結果

2019-04-30 21:31:48.049? INFO 9344 --- [ryBean_Worker-5] ? o.s.b.c.l.support.SimpleJobLauncher????? ? : Job: [SimpleJob: [name=zyJob]] completed with the following ? parameters: [{}] and the following status: [COMPLETED]

jobName : zyJob

jobLauncher : ? org.springframework.batch.core.launch.support.SimpleJobLauncher@2d27244d

jobLocator : org.springframework.batch.core.configuration.support.MapJobRegistry@6fc00b5

my_job : my_group

Current time : 2019-04-30 21:31:51

2019-04-30 21:31:51.012? INFO 9344 --- [ryBean_Worker-6] ? o.s.b.c.l.support.SimpleJobLauncher????? ? : Job: [SimpleJob: [name=zyJob]] launched with the following ? parameters: [{}]

2019-04-30 21:31:51.028? INFO 9344 --- [ryBean_Worker-6] ? o.s.batch.core.job.SimpleStepHandler???? ? : Executing step: [myStep1]

User [id=1, name=zy, age=28]

User [id=3, name=terry, age=30]

User [id=5, name=bob, age=25]

User [id=6, name=linda, age=27]

User [id=7, name=marry, age=39]

User [id=8, name=long, age=22]

User [id=9, name=kin, age=33]

User [id=10, name=ww, age=40]

?

4??? 概念總結


Job Repository

作業倉庫,負責Job,Step執行過程中的狀態保存。


Job Launcher

作業調度器,提供執行Job的入口


Job

作業,多個Step組成,封裝整個批處理操作。


Step

作業步,Job的一個執行環節,由多個或者一個Step組裝成Job


Tasklet

Step中具體執行的邏輯的操作,可以重復執行,可以具體的設置同步,異步操作。


Chunk

給定數量的Item集合,可以定義對Chunk的讀操作,處理操作,寫操作,提交間隔。


Item

一條數據記錄。


ItemReader

從數據源(文件系統,數據庫,隊列等)讀取Item


ItemProcessor

在寫入數據源之前,對數據進行處理(如:數據清洗,轉換,過濾,數據校驗等)。


ItemWriter

將Item批量寫入數據源(文件系統,數據庫,隊列等)。

5??? Spring Batch 結構

Spring Batch的一個基本層級結構。

首先,Spring Batch運行的基本單位是一個Job,一個Job就做一件批處理的事情。

一個Job包含很多Step,step就是每個job要執行的單個步驟。

如下圖所示,Step里面,會有Tasklet,Tasklet是一個任務單元,它是屬于可以重復利用的東西。

然后是Chunk,chunk就是數據塊,你需要定義多大的數據量是一個chunk。

Chunk里面就是不斷循環的一個流程,讀數據,處理數據,然后寫數據。Spring Batch會不斷的循環這個流程,直到批處理數據完成。

Spring batch入門示例



向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女