關(guān)于PowerJob
PowerJob(原OhMyScheduler)是全新一代分布式任務(wù)調(diào)度與計(jì)算框架,其主要功能特性如下:
- 使用簡(jiǎn)單:提供前端Web界面,允許開發(fā)者可視化地完成調(diào)度任務(wù)的管理(增、刪、改、查)、任務(wù)運(yùn)行狀態(tài)監(jiān)控和運(yùn)行日志查看等功能。
- 定時(shí)策略完善:支持 CRON 表達(dá)式、固定頻率、固定延遲和API四種定時(shí)調(diào)度策略。
- 執(zhí)行模式豐富:支持單機(jī)、廣播、Map、MapReduce 四種執(zhí)行模式,其中 Map/MapReduce 處理器能使開發(fā)者寥寥數(shù)行代碼便獲得集群分布式計(jì)算的能力。
- 工作流支持:支持在線配置任務(wù)依賴關(guān)系(DAG),以可視化的方式對(duì)任務(wù)進(jìn)行編排,同時(shí)還支持上下游任務(wù)間的數(shù)據(jù)傳遞,以及多種節(jié)點(diǎn)類型(判斷節(jié)點(diǎn) & 嵌套工作流節(jié)點(diǎn))。
- 執(zhí)行器支持廣泛:支持 Spring Bean、內(nèi)置/外置 JAVA 類,另外可以通過(guò)引入官方提供的依賴包,一鍵集成 Shell、Python/ target=_blank class=infotextkey>Python、HTTP、SQL 等處理器,應(yīng)用范圍廣。
- 運(yùn)維便捷:支持在線日志功能,執(zhí)行器產(chǎn)生的日志可以在前端控制臺(tái)頁(yè)面實(shí)時(shí)顯示,降低 debug 成本,極大地提高開發(fā)效率。
- 依賴精簡(jiǎn):最小僅依賴關(guān)系型數(shù)據(jù)庫(kù)(MySQL/PostgreSQL/Oracle/MS SQLServer...)
- 高可用 & 高性能:調(diào)度服務(wù)器經(jīng)過(guò)精心設(shè)計(jì),一改其他調(diào)度框架基于數(shù)據(jù)庫(kù)鎖的策略,實(shí)現(xiàn)了無(wú)鎖化調(diào)度。部署多個(gè)調(diào)度服務(wù)器可以同時(shí)實(shí)現(xiàn)高可用和性能的提升(支持無(wú)限的水平擴(kuò)展)。
- 故障轉(zhuǎn)移與恢復(fù):任務(wù)執(zhí)行失敗后,可根據(jù)配置的重試策略完成重試,只要執(zhí)行器集群有足夠的計(jì)算節(jié)點(diǎn),任務(wù)就能順利完成。
適用場(chǎng)景
- 有定時(shí)執(zhí)行需求的業(yè)務(wù)場(chǎng)景:如每天凌晨全量同步數(shù)據(jù)、生成業(yè)務(wù)報(bào)表、未支付訂單超時(shí)取消等。
- 有需要全部機(jī)器一同執(zhí)行的業(yè)務(wù)場(chǎng)景:如使用廣播執(zhí)行模式清理集群日志。
- 有需要分布式處理的業(yè)務(wù)場(chǎng)景:比如需要更新一大批數(shù)據(jù),單機(jī)執(zhí)行耗時(shí)非常長(zhǎng),可以使用Map/MapReduce 處理器完成任務(wù)的分發(fā),調(diào)動(dòng)整個(gè)集群加速計(jì)算。
- 有需要延遲執(zhí)行某些任務(wù)的業(yè)務(wù)場(chǎng)景:比如訂單過(guò)期處理等。
同類產(chǎn)品對(duì)比
基本概念
分組概念
- AppName:應(yīng)用名稱,建議與用戶實(shí)際接入 PowerJob 的應(yīng)用名稱保持一致,用于業(yè)務(wù)分組與隔離。一個(gè) appName 等于一個(gè)業(yè)務(wù)集群,也就是實(shí)際的一個(gè) Java 項(xiàng)目。
核心概念
- 任務(wù)(Job):描述了需要被 PowerJob 調(diào)度的任務(wù)信息,包括任務(wù)名稱、調(diào)度時(shí)間、處理器信息等。
- 任務(wù)實(shí)例( JobInstance,簡(jiǎn)稱 Instance):任務(wù)(Job)被調(diào)度執(zhí)行后會(huì)生成任務(wù)實(shí)例(Instance),任務(wù)實(shí)例記錄了任務(wù)的運(yùn)行時(shí)信息(任務(wù)與任務(wù)實(shí)例的關(guān)系類似于類與對(duì)象的關(guān)系)。
- 作業(yè)(Task):任務(wù)實(shí)例的執(zhí)行單元,一個(gè) JobInstance 存在至少一個(gè) Task,具體規(guī)則如下:
- 單機(jī)任務(wù)(STANDALONE):一個(gè) JobInstance 對(duì)應(yīng)一個(gè) Task
- 廣播任務(wù)(BROADCAST):一個(gè) JobInstance 對(duì)應(yīng) N 個(gè) Task,N為集群機(jī)器數(shù)量,即每一臺(tái)機(jī)器都會(huì)生成一個(gè) Task
- Map/MapReduce任務(wù):一個(gè) JobInstance 對(duì)應(yīng)若干個(gè) Task,由開發(fā)者手動(dòng) map 產(chǎn)生
- 工作流(Workflow):由 DAG(有向無(wú)環(huán)圖)描述的一組任務(wù)(Job),用于任務(wù)編排。
- 工作流實(shí)例(WorkflowInstance):工作流被調(diào)度執(zhí)行后會(huì)生成工作流實(shí)例,記錄了工作流的運(yùn)行時(shí)信息。
擴(kuò)展概念
- JVM 容器:以 Maven 工程項(xiàng)目的維度組織一堆 Java 文件(開發(fā)者開發(fā)的眾多 Java 處理器),可以通過(guò)前端網(wǎng)頁(yè)動(dòng)態(tài)發(fā)布并被執(zhí)行器加載,具有極強(qiáng)的擴(kuò)展能力和靈活性。
- OpenAPI:允許開發(fā)者通過(guò)接口來(lái)完成手工的操作,讓系統(tǒng)整體變得更加靈活。開發(fā)者可以基于 API 便捷地?cái)U(kuò)展 PowerJob 原有的功能。
定時(shí)任務(wù)類型
- API:該任務(wù)只會(huì)由 powerjob-client 中提供的 OpenAPI 接口觸發(fā),server 不會(huì)主動(dòng)調(diào)度。
- CRON:該任務(wù)的調(diào)度時(shí)間由 CRON 表達(dá)式指定。
- 固定頻率:秒級(jí)任務(wù),每隔多少毫秒運(yùn)行一次,功能與 java.util.concurrent.ScheduledExecutorService#scheduleAtFixedRate 相同。
- 固定延遲:秒級(jí)任務(wù),延遲多少毫秒運(yùn)行一次,功能與 java.util.concurrent.ScheduledExecutorService#scheduleWithFixedDelay 相同。
- 工作流:該任務(wù)只會(huì)由其所屬的工作流調(diào)度執(zhí)行,server 不會(huì)主動(dòng)調(diào)度該任務(wù)。如果該任務(wù)不屬于任何一個(gè)工作流,該任務(wù)就不會(huì)被調(diào)度。
備注:固定延遲和固定頻率任務(wù)統(tǒng)稱秒級(jí)任務(wù),這兩種任務(wù)無(wú)法被停止,只有任務(wù)被關(guān)閉或刪除時(shí)才能真正停止任務(wù)。
搭建PowerJob環(huán)境
本地啟動(dòng)
初始化項(xiàng)目
git clone https://Github.com/PowerJob/PowerJob.git
導(dǎo)入 IDE,源碼結(jié)構(gòu)如下,我們需要啟動(dòng)調(diào)度服務(wù)器(powerjob-server),同時(shí)在 samples 工程中編寫自己的處理器代碼
啟動(dòng)調(diào)度服務(wù)器
- 創(chuàng)建數(shù)據(jù)庫(kù)(僅需要?jiǎng)?chuàng)建數(shù)據(jù)庫(kù)):找到你的 DB,運(yùn)行 SQL CREATE DATABASE IF NOT EXISTS `powerjob-dAIly` DEFAULT CHARSET utf8mb4,搞定~
- 修改配置文件:配置文件的說(shuō)明官方文檔寫的非常詳細(xì),此處不再贅述。需要修改的地方為數(shù)據(jù)庫(kù)配置spring.datasource.core.jdbc-url、spring.datasource.core.username和spring.datasource.core.password,當(dāng)然,有 mongoDB 的同學(xué)也可以修改spring.data.mongodb.uri以獲取完全版體驗(yàn)。
powerjob-server 日常環(huán)境配置文件:application-daily.properties
oms.env=DAILY
logging.cnotallow=classpath:logback-dev.xml
####### 外部數(shù)據(jù)庫(kù)配置(需要用戶更改為自己的數(shù)據(jù)庫(kù)配置) #######
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimeznotallow=Asia/Shanghai
spring.datasource.core.username=root
spring.datasource.core.password=No1Bug2Please3!
spring.datasource.core.hikari.maximum-pool-size=20
spring.datasource.core.hikari.minimum-idle=5
####### mongoDB配置,非核心依賴,通過(guò)配置 oms.mongodb.enable=false 來(lái)關(guān)閉 #######
oms.mongodb.enable=true
spring.data.mongodb.uri=mongodb://localhost:27017/powerjob-daily
####### 郵件配置(不需要郵件報(bào)警可以刪除以下配置來(lái)避免報(bào)錯(cuò)) #######
spring.mail.host=smtp.163.com
[email protected]
spring.mail.password=GOFZPNARMVKCGONV
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
####### 資源清理配置 #######
oms.instanceinfo.retentinotallow=1
oms.container.retention.local=1
oms.container.retention.remote=-1
####### 緩存配置 #######
oms.instance.metadata.cache.size=1024
- 啟動(dòng)應(yīng)用:完成配置文件的修改后,可以直接通過(guò)啟動(dòng)類 tech.powerjob.server.PowerJobServerApplication 啟動(dòng)調(diào)度服務(wù)器(注意:需要使用 daily 配置文件啟動(dòng),可自行百度搜索“SpringBoot 指定配置文件啟動(dòng)”),觀察啟動(dòng)日志,查看是否啟動(dòng)成功~啟動(dòng)成功后,訪問(wèn) http://127.0.0.1:7700/ ,如果能順利出現(xiàn) Web 界面,則說(shuō)明調(diào)度服務(wù)器啟動(dòng)成功!
- 注冊(cè)應(yīng)用:點(diǎn)擊主頁(yè)應(yīng)用注冊(cè)按鈕,填入 powerjob-agent-test 和控制臺(tái)密碼(用于進(jìn)入控制臺(tái)),注冊(cè)示例應(yīng)用(當(dāng)然你也可以注冊(cè)其他的 appName,只是別忘記在示例程序中同步修改~)
圖片
Docker-compose啟動(dòng)
環(huán)境要求
本地需要安裝docker和docker-compose
下載代碼
git clone --depth=1 https://github.com/PowerJob/PowerJob.git
運(yùn)行
cd PowerJob
docker-compose up
docker-compose up -d
剛開始啟動(dòng)時(shí),powerjob-worker-samples會(huì)啟動(dòng)失敗,等powerjob-server啟動(dòng)成功后,powerjob-worker-samples才會(huì)啟動(dòng)成功。這大概需要幾分鐘。
運(yùn)行成功后,瀏覽器訪問(wèn) http://127.0.0.1:7700/
應(yīng)用名稱:powerjob-worker-samples
密碼:powerjob123
停止
docker-compose down
Stopping powerjob-worker-samples ... done
Stopping powerjob-server ... done
Stopping powerjob-mysql ... done
Removing powerjob-worker-samples ... done
Removing powerjob-server ... done
Removing powerjob-mysql ... done
cd PowerJob
rm -rf powerjob-data
SpringBoot集成PowerJob
添加相關(guān)maven依賴
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>${latest.powerjob.version}</version>
</dependency>
配置文件配置
powerjob:
worker:
# akka 工作端口,可選,默認(rèn) 27777
akka-port: 27777
# 接入應(yīng)用名稱,用于分組隔離,推薦填寫 本 Java 項(xiàng)目名稱
app-name: ${spring.application.name}
# 調(diào)度服務(wù)器地址,IP:Port 或 域名,多值逗號(hào)分隔
server-address: 81.70.117.188:7700
# 持久化方式,可選,默認(rèn) disk
store-strategy: disk
# 任務(wù)返回結(jié)果信息的最大長(zhǎng)度,超過(guò)這個(gè)長(zhǎng)度的信息會(huì)被截?cái)啵J(rèn)值 8192
max-result-length: 8192
# 單個(gè)任務(wù)追加的工作流上下文最大長(zhǎng)度,超過(guò)這個(gè)長(zhǎng)度的會(huì)被直接丟棄,默認(rèn)值 8192
max-appended-wf-context-length: 8192
處理器(Processor)開發(fā)
處理器概述
基本概念
PowerJob 支持 Python、Shell、HTTP、SQL 等眾多通用任務(wù)的處理,開發(fā)者只需要引入依賴,在控制臺(tái)配置好相關(guān)參數(shù)即可,關(guān)于這部分詳見(jiàn) 官方處理器 ,此處不再贅述。本章將重點(diǎn)闡述 Java 處理器開發(fā)方法與使用技巧。
- Java 處理器可根據(jù)代碼所處位置劃分為內(nèi)置 Java 處理器和外置 Java 處理器,前者直接集成在宿主應(yīng)用(也就是接入本系統(tǒng)的業(yè)務(wù)應(yīng)用)中,一般用來(lái)處理業(yè)務(wù)需求;后者可以在一個(gè)獨(dú)立的輕量級(jí)的 Java 工程中開發(fā),通過(guò) JVM 容器技術(shù)(詳見(jiàn)容器章節(jié))被 worker 集群熱加載,提供 Java 的“腳本能力”,一般用于處理靈活多變的需求。
- Java 處理器可根據(jù)對(duì)象創(chuàng)建者劃分為 SpringBean 處理器和普通 Java 對(duì)象處理器,前者由 Spring IOC 容器完成處理器的創(chuàng)建和初始化,后者則由 PowerJob 維護(hù)其生命周期。如果宿主應(yīng)用支持 Spring,強(qiáng)烈建議使用 SpringBean 處理器,開發(fā)者僅需要將 Processor 注冊(cè)進(jìn) Spring IOC 容器(一個(gè) @Component 注解或一句 bean 配置)即可享受 Spring 帶來(lái)的便捷之處。
- Java處理器可根據(jù)功能劃分為單機(jī)處理器、廣播處理器、Map 處理器和 MapReduce 處理器。
單機(jī)處理器(BasicProcessor)對(duì)應(yīng)了單機(jī)任務(wù),即某個(gè)任務(wù)的某次運(yùn)行只會(huì)有某一臺(tái)機(jī)器的某一個(gè)線程參與運(yùn)算。
廣播處理器(BroadcastProcessor)對(duì)應(yīng)了廣播任務(wù),即某個(gè)任務(wù)的某次運(yùn)行會(huì)調(diào)動(dòng)集群內(nèi)所有機(jī)器參與運(yùn)算。
Map處理器(MapProcessor)對(duì)應(yīng)了Map任務(wù),即某個(gè)任務(wù)在運(yùn)行過(guò)程中,允許產(chǎn)生子任務(wù)并分發(fā)到其他機(jī)器進(jìn)行運(yùn)算。
MapReduce 處理器(MapReduceProcessor)對(duì)應(yīng)了 MapReduce 任務(wù),在 Map 任務(wù)的基礎(chǔ)上,增加了所有任務(wù)結(jié)束后的匯總統(tǒng)計(jì)。
入?yún)?TaskContext
TaskContext 包含了本次任務(wù)的上下文信息,具體信息如下
屬性列表(紅色標(biāo)注的為常用屬性) |
|
屬性名稱 |
意義/用法 |
jobId |
任務(wù) ID,開發(fā)者一般無(wú)需關(guān)心此參數(shù) |
instanceId |
任務(wù)實(shí)例 ID,全局唯一,開發(fā)者一般無(wú)需關(guān)心此參數(shù) |
subInstanceId |
子任務(wù)實(shí)例 ID,秒級(jí)任務(wù)使用,開發(fā)者一般無(wú)需關(guān)心此參數(shù) |
taskId |
采用鏈?zhǔn)矫ǖ?ID,在某個(gè)任務(wù)實(shí)例內(nèi)唯一,開發(fā)者一般無(wú)需關(guān)心此參數(shù) |
taskName |
task 名稱,Map/MapReduce 任務(wù)的子任務(wù)的值為開發(fā)者指定,否則為系統(tǒng)默認(rèn)值,開發(fā)者一般無(wú)需關(guān)心此參數(shù) |
jobParams |
任務(wù)參數(shù) 對(duì)于非工作流中的任務(wù)其值等同于控制臺(tái)錄入的任務(wù)參數(shù);如果該任務(wù)為工作流中的任務(wù)且有配置節(jié)點(diǎn)參數(shù)信息,那么接收到的是節(jié)點(diǎn)配置的參數(shù)信息 |
instanceParams |
任務(wù)實(shí)例參數(shù) 對(duì)于非工作流中的任務(wù) 其值 等同于 OpenAPI 傳遞的實(shí)例參數(shù),非 OpenAPI 觸發(fā)的任務(wù)則一定為空。如果該任務(wù)為工作流中的任務(wù)那么這里實(shí)際接收到的是工作流上下文信息,建議使用 getWorkflowContext 方法獲取上下文信息
|
maxRetryTimes |
Task 的最大重試次數(shù) |
currentRetryTimes |
Task 的當(dāng)前重試次數(shù),和 maxRetryTimes 聯(lián)合起來(lái)可以判斷當(dāng)前是否為該 Task 的最后一次運(yùn)行機(jī)會(huì) |
subTask |
子 Task,Map/MapReduce 處理器專屬,開發(fā)者調(diào)用map方法時(shí)傳遞的子任務(wù)列表中的某一個(gè) |
omsLogger |
在線日志,用法同 Slf4J,記錄的日志可以直接通過(guò)控制臺(tái)查看,非常便捷和強(qiáng)大!不過(guò)使用過(guò)程中需要注意頻率,濫用在線日志會(huì)對(duì) Server 造成巨大的壓力 |
userContext |
用戶在 PowerJobWorkerConfig 中設(shè)置的自定義上下文 |
workflowContext |
工作流上下文,更多信息見(jiàn)下方說(shuō)明 |
工作流上下文( WorkflowContext )
該屬性是 v4.0.0 版本的重大變更之一,移除了原來(lái)的參數(shù)傳遞機(jī)制,提供了 API 讓開發(fā)者可以更加靈活便捷地在工作流中實(shí)現(xiàn)信息的傳遞。
屬性列表 |
|
屬性名稱 |
意義/用法 |
wfInstanceId |
工作流實(shí)例 ID |
data |
工作流上下文數(shù)據(jù),鍵值對(duì) |
appendedContextData |
當(dāng)前任務(wù)向工作流上下文中追加的數(shù)據(jù)。在任務(wù)執(zhí)行完成后 ProcessorTracker 會(huì)將其上報(bào)給 TaskTracker,TaskTracker 在當(dāng)前任務(wù)執(zhí)行完成后會(huì)將這個(gè)信息上報(bào)給 server ,追加到當(dāng)前的工作流上下文中,供下游任務(wù)消費(fèi) |
上游任務(wù)通過(guò) WorkflowContext#appendData2WfContext(String key,Object value) 方法向工作流上下文中追加數(shù)據(jù),下游任務(wù)便可以通過(guò) WorkflowContext#fetchWorkflowContext() 方法獲取到相應(yīng)的數(shù)據(jù)進(jìn)行消費(fèi)。注意,當(dāng)追加的上下文信息的 key 已經(jīng)存在于當(dāng)前的上下文中時(shí),新的 value 會(huì)覆蓋之前的值。另外,每次任務(wù)實(shí)例追加的上下文數(shù)據(jù)大小也會(huì)受到 worker 的配置項(xiàng) powerjob.worker.max-appended-wf-context-length 的限制,超過(guò)這個(gè)長(zhǎng)度的會(huì)被直接丟棄。
返回值 ProcessResult
方法的返回值為 ProcessResult,代表了本次 Task 執(zhí)行的結(jié)果,包含 success 和 msg 兩個(gè)屬性,分別用于傳遞 Task 是否執(zhí)行成功和 Task 需要返回的信息。
處理器開發(fā)示例
單機(jī)處理器:BasicProcessor
單機(jī)執(zhí)行的策略下,server 會(huì)在所有可用 worker 中選取健康度最佳的機(jī)器進(jìn)行執(zhí)行。單機(jī)執(zhí)行任務(wù)需要實(shí)現(xiàn)接口 BasicProcessor,代碼示例如下:
/**
* @Author iron.guo
* @Date 2023/1/7
* @Description
*/
@Component
@Slf4j
public class StandaloneProcessor implements BasicProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.info("處理器啟動(dòng)成功,context 是 {}.", context);
log.info("單機(jī)處理器正在處理");
log.info(context.getJobParams());
omsLogger.info("處理器執(zhí)行結(jié)束");
boolean success = true;
return new ProcessResult(success, context + ": " + success);
}
}
執(zhí)行結(jié)果
廣播處理器:BroadcastProcessor
廣播執(zhí)行的策略下,所有機(jī)器都會(huì)被調(diào)度執(zhí)行該任務(wù)。為了便于資源的準(zhǔn)備和釋放,廣播處理器在BasicProcessor 的基礎(chǔ)上額外增加了 preProcess 和 postProcess 方法,分別在整個(gè)集群開始之前/結(jié)束之后選一臺(tái)機(jī)器執(zhí)行相關(guān)方法。代碼示例如下:
@Slf4j
@Component
public class BroadcastProcessorDemo implements BroadcastProcessor {
@Override
public ProcessResult preProcess(TaskContext context) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.info("預(yù)執(zhí)行,會(huì)在所有 worker 執(zhí)行 process 方法前調(diào)用");
log.info("預(yù)執(zhí)行,會(huì)在所有 worker 執(zhí)行 process 方法前調(diào)用");
// 預(yù)執(zhí)行,會(huì)在所有 worker 執(zhí)行 process 方法前調(diào)用
return new ProcessResult(true, "init success");
}
@Override
public ProcessResult process(TaskContext context) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
// 撰寫整個(gè)worker集群都會(huì)執(zhí)行的代碼邏輯
omsLogger.info("撰寫整個(gè)worker集群都會(huì)執(zhí)行的代碼邏輯");
log.info("撰寫整個(gè)worker集群都會(huì)執(zhí)行的代碼邏輯");
return new ProcessResult(true, "release resource success");
}
@Override
public ProcessResult postProcess(TaskContext context, List<TaskResult> taskResults) throws Exception {
// taskResults 存儲(chǔ)了所有worker執(zhí)行的結(jié)果(包括preProcess)
// 收尾,會(huì)在所有 worker 執(zhí)行完畢 process 方法后調(diào)用,該結(jié)果將作為最終的執(zhí)行結(jié)果
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.info("收尾,會(huì)在所有 worker 執(zhí)行完畢 process 方法后調(diào)用,該結(jié)果將作為最終的執(zhí)行結(jié)果");
log.info("收尾,會(huì)在所有 worker 執(zhí)行完畢 process 方法后調(diào)用,該結(jié)果將作為最終的執(zhí)行結(jié)果");
return new ProcessResult(true, "process success");
}
}
執(zhí)行結(jié)果
并行處理器:MapReduceProcessor
MapReduce 是最復(fù)雜也是最強(qiáng)大的一種執(zhí)行器,它允許開發(fā)者完成任務(wù)的拆分,將子任務(wù)派發(fā)到集群中其他Worker 執(zhí)行,是執(zhí)行大批量處理任務(wù)的不二之選!實(shí)現(xiàn) MapReduce 處理器需要繼承 MapReduceProcessor類,具體用法如下示例代碼所示:
@Slf4j
@Component
public class MapReduceProcessorDemo implements MapReduceProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
final OmsLogger omsLogger = context.getOmsLogger();
// 判斷是否為根任務(wù)
if (isRootTask()) {
// 構(gòu)造子任務(wù)
List<SubTask> subTaskList = Lists.newLinkedList();
SubTask subTask=new SubTask();
subTask.setSiteId(1L);
subTask.setName("iron.guo");
subTaskList.add(subTask);
/*
* 子任務(wù)的構(gòu)造由開發(fā)者自己定義
* eg. 現(xiàn)在需要從文件中讀取100W個(gè)ID,并處理數(shù)據(jù)庫(kù)中這些ID對(duì)應(yīng)的數(shù)據(jù),那么步驟如下:
* 1. 根任務(wù)(RootTask)讀取文件,流式拉取100W個(gè)ID,并按1000個(gè)一批的大小組裝成子任務(wù)進(jìn)行派發(fā)
* 2. 非根任務(wù)獲取子任務(wù),完成業(yè)務(wù)邏輯的處理
*/
// 調(diào)用 map 方法,派發(fā)子任務(wù)(map 可能會(huì)失敗并拋出異常,做好業(yè)務(wù)操作)
map(subTaskList, "DATA_PROCESS_TASK");
omsLogger.info("執(zhí)行根任務(wù)-派發(fā)子任務(wù)");
return new ProcessResult(true, "ROOT_PROCESS_SUCCESS");
}
// 非子任務(wù),可根據(jù) subTask 的類型 或 TaskName 來(lái)判斷分支
if (context.getSubTask() instanceof SubTask) {
omsLogger.info("執(zhí)行子任務(wù)開始");
omsLogger.info("Get from SubTask : name is {} and id is {}",((SubTask) context.getSubTask()).getName(),((SubTask) context.getSubTask()).getSiteId());
// 執(zhí)行子任務(wù),注:子任務(wù)人可以 map 產(chǎn)生新的子任務(wù),可以構(gòu)建任意級(jí)的 MapReduce 處理器
return new ProcessResult(true, "PROCESS_SUB_TASK_SUCCESS");
}
return new ProcessResult(false, "UNKNOWN_BUG");
}
@Override
public ProcessResult reduce(TaskContext taskContext, List<TaskResult> taskResults) {
// 所有 Task 執(zhí)行結(jié)束后,reduce 將會(huì)被執(zhí)行
// taskResults 保存了所有子任務(wù)的執(zhí)行結(jié)果
// 用法舉例,統(tǒng)計(jì)執(zhí)行結(jié)果
AtomicLong successCnt = new AtomicLong(0);
taskResults.forEach(tr -> {
if (tr.isSuccess()) {
successCnt.incrementAndGet();
}
});
// 該結(jié)果將作為任務(wù)最終的執(zhí)行結(jié)果
return new ProcessResult(true, "success task num:" + successCnt.get());
}
// 自定義的子任務(wù)
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
private static class SubTask {
private Long siteId;
private String name;
}
}
執(zhí)行結(jié)果
注:Map 處理器相當(dāng)于 MapReduce 處理器的閹割版本(閹割了 reduce 方法),此處不再單獨(dú)舉例。
工作流
點(diǎn)擊右上角按鈕 新建工作流,即可錄入新的工作流,具體界面和說(shuō)明如下所示。
- 工作流名稱:名稱,無(wú)實(shí)際業(yè)務(wù)用途,請(qǐng)盡量精簡(jiǎn)字段
- 工作流描述:描述,無(wú)實(shí)際業(yè)務(wù)用途,請(qǐng)盡量精簡(jiǎn)字段
- 定時(shí)信息:該工作流的觸發(fā)方式的觸發(fā)方式,包含時(shí)間表達(dá)式類型選擇框和時(shí)間表達(dá)式輸入框
CRON -> 填寫 CRON 表達(dá)式(在線生成網(wǎng)站)
API -> 不需要填寫任何參數(shù),表明該任務(wù)由 OpenAPI 觸發(fā)
- 生命周期:定時(shí)策略生效的時(shí)間段
- 最大實(shí)例:該工作流同時(shí)執(zhí)行的數(shù)量
- 任務(wù)依賴關(guān)系:提供編輯界面可視化操作,繪制 DAG(有向無(wú)環(huán)圖),配置工作流內(nèi)各個(gè)任務(wù)的依賴關(guān)系
DAG 操作指南
編輯依賴關(guān)系
v4.0.0 以后支持節(jié)點(diǎn)的自由拖拉拽,不用再點(diǎn)點(diǎn)點(diǎn)了,哈哈哈 ~
- 添加節(jié)點(diǎn):點(diǎn)擊 DAG 編輯框左上方的 “導(dǎo)入任務(wù)”,導(dǎo)入當(dāng)前存在的任務(wù)(需要提前在 任務(wù)管理界面 錄入任務(wù)),生成 DAG 的節(jié)點(diǎn)
- 連接節(jié)點(diǎn):點(diǎn)擊起始節(jié)點(diǎn)的任意一個(gè)錨點(diǎn)摁住不放,拖動(dòng)鼠標(biāo)連接到另一個(gè)節(jié)點(diǎn)的任意一個(gè)錨點(diǎn)即可
- 刪除節(jié)點(diǎn):選中需要?jiǎng)h除的節(jié)點(diǎn),按退格鍵( 注意:windows 下使用退格鍵 [Backspace],macOS 下使用刪除鍵 [delete] )
- 刪除邊:選中需要?jiǎng)h除的邊,按退格鍵( 注意:windows 下使用退格鍵 [Backspace],macOS 下使用刪除鍵 [delete] )
導(dǎo)入任務(wù)節(jié)點(diǎn)
任務(wù)為之前創(chuàng)建的任務(wù),可用工作流形式串聯(lián)起來(lái)執(zhí)行。
編輯節(jié)點(diǎn)信息
點(diǎn)擊需要編輯的節(jié)點(diǎn),在右側(cè)會(huì)彈出一個(gè)編輯框,如下圖所示
- 任務(wù)名稱:當(dāng)前節(jié)點(diǎn)引用的任務(wù)名稱,點(diǎn)擊可編輯(支持輸入名稱進(jìn)行模糊搜索)
- 節(jié)點(diǎn)名稱:節(jié)點(diǎn)的名稱,無(wú)實(shí)際業(yè)務(wù)用途,在能明確表示節(jié)點(diǎn)背后的業(yè)務(wù)邏輯的情況下請(qǐng)盡量精簡(jiǎn)字段
- 節(jié)點(diǎn)參數(shù):節(jié)點(diǎn)的參數(shù)配置,當(dāng)這個(gè)信息不為空的時(shí)候使用這個(gè)參數(shù)覆蓋當(dāng)前節(jié)點(diǎn)所引用的任務(wù)所配置的參數(shù)信息
- 是否啟用:未啟用的節(jié)點(diǎn)將會(huì)直接跳過(guò)
- 失敗跳過(guò):當(dāng)這個(gè)節(jié)點(diǎn)執(zhí)行失敗的時(shí)候不會(huì)打斷整個(gè)工作流的執(zhí)行
特殊節(jié)點(diǎn)說(shuō)明
判斷節(jié)點(diǎn)
判斷節(jié)點(diǎn) 不允許失敗跳過(guò)以及禁用,節(jié)點(diǎn)參數(shù)中存儲(chǔ)的是 Groovy 代碼(執(zhí)行 Groovy 代碼時(shí)會(huì)將當(dāng)前工作流上下文作為 context 變量注入到代碼執(zhí)行的上下文中),其執(zhí)行結(jié)果僅能返回 "true" 或者 "false",同時(shí)判斷節(jié)點(diǎn)僅有且必須有兩條“輸出”路徑。會(huì)根據(jù)該代碼的執(zhí)行結(jié)果決定下游需要執(zhí)行的節(jié)點(diǎn)。這里處理的原則是, 僅 cancel 那些只能通過(guò)被 disable 掉的邊可達(dá)的節(jié)點(diǎn)
舉個(gè)兩個(gè)栗子,灰色代表相應(yīng)的邊 或者 節(jié)點(diǎn)被 disable 或 cancel,菱形代表判斷節(jié)點(diǎn),假定執(zhí)行結(jié)果為 true
case 3 以及 case 4 中的節(jié)點(diǎn) 3 都會(huì)被 cancel ,因?yàn)樗荒芡ㄟ^(guò)節(jié)點(diǎn) 1 -> 節(jié)點(diǎn) 3 的邊可達(dá)(該邊的屬性為 false),但對(duì)于節(jié)點(diǎn) 5 而言,在 case 4 中因?yàn)榕袛喙?jié)點(diǎn) 2 的執(zhí)行結(jié)果為 true ,那么其可以通過(guò)節(jié)點(diǎn) 2 -> 節(jié)點(diǎn) 5 的邊可達(dá),所以不會(huì)被 disable 。
備注:如果需要根據(jù)上游節(jié)點(diǎn)的執(zhí)行結(jié)果決定下游節(jié)點(diǎn),可以將上游節(jié)點(diǎn)的執(zhí)行結(jié)果注入上下文中,再在判斷節(jié)點(diǎn)中做相應(yīng)的判斷。
工作流嵌套節(jié)點(diǎn)
該節(jié)點(diǎn)代表對(duì)某個(gè)工作流的引用,節(jié)點(diǎn)的 jobId 屬性存儲(chǔ)的是工作流 id,其他屬性和普通的任務(wù)節(jié)點(diǎn)一致。不允許出現(xiàn)循環(huán)引用以及多級(jí)嵌套的情況,即嵌套節(jié)點(diǎn)中指向的工作流一定是一個(gè)不含嵌套節(jié)點(diǎn)的工作流。
執(zhí)行到該節(jié)點(diǎn)時(shí),如果該節(jié)點(diǎn)處于啟用狀態(tài),那么將啟動(dòng)該節(jié)點(diǎn)所引用工作流的一個(gè)新實(shí)例,待該實(shí)例執(zhí)行完成后再同步更新該節(jié)點(diǎn)的狀態(tài)。
注意,創(chuàng)建子工作流時(shí),會(huì)透?jìng)鳟?dāng)前的上下文作為工作流的實(shí)例參數(shù),在子工作流執(zhí)行完成時(shí)會(huì)合并子工作流的上下文至父工作流的上下文中。
重試子工作流不會(huì)聯(lián)動(dòng)重試父工作流,但失敗的子工作流會(huì)隨著父工作流的重試而原地重試(不會(huì)生成新的實(shí)例)