LTS(light-task-scheduler)主要用於解決分散式任務調度問題,支援即時任務,定時任務和Cron任務。有較好的伸縮性,擴展性,健壯穩定性而被多家公司使用,同時也希望開源愛好者一起貢獻。
github網址: https://github.com/ltsopensource/light-task-scheduler
oschina網址: http://git.oschina.net/hugui/light-task-scheduler
範例: https://github.com/ltsopensource/lts-examples
文件地址(正在更新中,後面以這個為準): https://www.gitbook.com/book/qq254963746/lts/details
這兩個位址都會同步更新。有興趣,請加QQ群:109500214 (加群密碼: hello world)一起探討、完善。越多人支持,就越有動力更新,喜歡記得右上角star哈。
LTS 有主要有以下四種節點:
其中JobClient,JobTracker,TaskTracker節點都是无状态
的。 可以部署多個並動態的進行刪減,來實現負載平衡,實現更大的負載量, 並且框架採用FailStore策略使LTS具有很好的容錯能力。
LTS註冊中心提供多種實現(Zookeeper,redis等),註冊中心進行節點資訊暴露,master選舉。 (Mongo or Mysql)儲存任務佇列和任務執行日誌, netty or mina做底層通訊, 並提供多種序列化方式fastjson, hessian2, java等。
LTS支援任務類型:
支援動態修改任務參數,任務執行時間等設定,支援後台動態新增任務,支援Cron任務暫停,支援手動停止正在執行的任務(有條件),支援任務的監控統計,支援各節點的任務執行監控,JVM監控等等.
下圖是一個標準的即時任務執行流程。
目前後台帶有由ztajy提供的一個簡易的認證功能. 使用者名稱密碼在auth.cfg中,使用者自行修改.
LTS可以完全不用Spring框架,但考慮到很用用戶專案中都是用了Spring框架,所以LTS也提供了對Spring的支持,包括Xml和註解,引入lts-spring.jar
即可。
在TaskTracker端提供了業務日誌記錄器,供應用程式使用,透過這個業務日誌器,可以將業務日誌提交到JobTracker,這些業務日誌可以透過任務ID串聯起來,可以在LTS-Admin中即時查看任務的執行進度。
SPI擴充可以達到零侵入,只需要實作對應的接口,並實作即可被LTS使用,目前開放出來的擴充介面有
當正在執行任務的TaskTracker宕機之後,JobTracker會立刻將指派在宕機的TaskTracker的所有任務再指派給其他正常的TaskTracker節點執行。
可以對JobTracker,TaskTracker節點進行資源監控,任務監控等,可以即時的在LTS-Admin管理後台查看,進而進行合理的資源調配。
LTS框架提供四種執行結果支持, EXECUTE_SUCCESS
, EXECUTE_FAILED
, EXECUTE_LATER
, EXECUTE_EXCEPTION
,並對每種結果採取相應的處理機制,譬如重試。
採用FailStore機制來進行節點容錯,Fail And Store,不會因為遠端通訊的不穩定性而影響目前應用的運作。具體FailStore說明,請參考概念說明中的FailStore說明。
專案主要採用maven進行構建,目前提供shell腳本的打包。 環境依賴: Java(jdk1.6+)
Maven
使用者使用一般分為兩種:
可以透過maven指令將lts的jar包上傳到本地倉庫。在父pom.xml中加入對應的repository,並用deploy指令上傳即可。具體引用方式可以參考lts中的例子即可。
需要將lts的各個模組打包成單獨的jar包,並且將所有lts依賴包引入。具體引用哪些jar包可以參考lts中的例子即可。
提供(cmd)windows
和(shell)linux
兩種版本腳本來進行編譯和部署:
執行根目錄下的sh build.sh
或build.cmd
腳本,會在dist
目錄下產生lts-{version}-bin
資料夾
下面是其目錄結構,其中bin目錄主要是JobTracker和LTS-Admin的啟動腳本。 jobtracker
中是JobTracker的設定檔和需要使用到的jar包, lts-admin
是LTS-Admin相關的war包和設定檔。 lts-{version}-bin的文件結構
-- lts - $ { version }- bin
|-- bin
| |-- jobtracker . cmd
| |-- jobtracker . sh
| |-- lts - admin . cmd
| |-- lts - admin . sh
| |-- lts - monitor . cmd
| |-- lts - monitor . sh
| |-- tasktracker . sh
|-- conf
| |-- log4j . properties
| |-- lts - admin . cfg
| |-- lts - monitor . cfg
| |-- readme . txt
| |-- tasktracker . cfg
| |-- zoo
| |-- jobtracker . cfg
| |-- log4j . properties
| |-- lts - monitor . cfg
|-- lib
| |-- *. jar
|-- war
|-- jetty
| |-- lib
| |-- *. jar
|-- lts - admin . war
conf/zoo
下的設定文件,然後執行sh jobtracker.sh zoo start
即可,如果你想啟動兩個JobTracker節點,那麼你需要拷貝一份zoo,譬如命名為zoo2
,修改下zoo2
下的設定文件,然後執行sh jobtracker.sh zoo2 start
即可。 logs資料夾下產生jobtracker-zoo.out
日誌。conf/lts-monitor.cfg
和conf/lts-admin.cfg
下的配置,然後執行bin
下的sh lts-admin.sh
或lts-admin.cmd
腳本即可。 logs資料夾下會產生lts-admin.out
日誌,啟動成功在日誌中會列印出存取位址,使用者可以透過這個存取位址存取了。 需要引入lts的jar包有lts-jobclient-{version}.jar
, lts-core-{version}.jar
及其它第三方依賴jar。
JobClient jobClient = new RetryJobClient ();
jobClient . setNodeGroup ( "test_jobClient" );
jobClient . setClusterName ( "test_cluster" );
jobClient . setRegistryAddress ( "zookeeper://127.0.0.1:2181" );
jobClient . start ();
// 提交任务
Job job = new Job ();
job . setTaskId ( "3213213123" );
job . setParam ( "shopId" , "11111" );
job . setTaskTrackerNodeGroup ( "test_trade_TaskTracker" );
// job.setCronExpression("0 0/1 * * * ?"); // 支持 cronExpression表达式
// job.setTriggerTime(new Date()); // 支持指定时间执行
Response response = jobClient . submitJob ( job );
< bean id = "jobClient" class = "com.github.ltsopensource.spring.JobClientFactoryBean" >
< property name = "clusterName" value = "test_cluster" />
< property name = "registryAddress" value = "zookeeper://127.0.0.1:2181" />
< property name = "nodeGroup" value = "test_jobClient" />
< property name = "masterChangeListeners" >
< list >
< bean class = "com.github.ltsopensource.example.support.MasterChangeListenerImpl" />
</ list >
</ property >
< property name = "jobFinishedHandler" >
< bean class = "com.github.ltsopensource.example.support.JobFinishedHandlerImpl" />
</ property >
< property name = "configs" >
< props >
<!-- 参数 -->
< prop key = "job.fail.store" > leveldb </ prop >
</ props >
</ property >
</ bean >
@ Configuration
public class LTSSpringConfig {
@ Bean ( name = "jobClient" )
public JobClient getJobClient () throws Exception {
JobClientFactoryBean factoryBean = new JobClientFactoryBean ();
factoryBean . setClusterName ( "test_cluster" );
factoryBean . setRegistryAddress ( "zookeeper://127.0.0.1:2181" );
factoryBean . setNodeGroup ( "test_jobClient" );
factoryBean . setMasterChangeListeners ( new MasterChangeListener []{
new MasterChangeListenerImpl ()
});
Properties configs = new Properties ();
configs . setProperty ( "job.fail.store" , "leveldb" );
factoryBean . setConfigs ( configs );
factoryBean . afterPropertiesSet ();
return factoryBean . getObject ();
}
}
需要引入lts的jar包有lts-tasktracker-{version}.jar
, lts-core-{version}.jar
及其它第三方依賴jar。
public class MyJobRunner implements JobRunner {
@ Override
public Result run ( JobContext jobContext ) throws Throwable {
try {
// TODO 业务逻辑
// 会发送到 LTS (JobTracker上)
jobContext . getBizLogger (). info ( "测试,业务日志啊啊啊啊啊" );
} catch ( Exception e ) {
return new Result ( Action . EXECUTE_FAILED , e . getMessage ());
}
return new Result ( Action . EXECUTE_SUCCESS , "执行成功了,哈哈" );
}
}
TaskTracker taskTracker = new TaskTracker ();
taskTracker . setJobRunnerClass ( MyJobRunner . class );
taskTracker . setRegistryAddress ( "zookeeper://127.0.0.1:2181" );
taskTracker . setNodeGroup ( "test_trade_TaskTracker" );
taskTracker . setClusterName ( "test_cluster" );
taskTracker . setWorkThreads ( 20 );
taskTracker . start ();
< bean id = "taskTracker" class = "com.github.ltsopensource.spring.TaskTrackerAnnotationFactoryBean" init - method = "start" >
< property name = "jobRunnerClass" value = "com.github.ltsopensource.example.support.MyJobRunner" />
< property name = "bizLoggerLevel" value = "INFO" />
< property name = "clusterName" value = "test_cluster" />
< property name = "registryAddress" value = "zookeeper://127.0.0.1:2181" />
< property name = "nodeGroup" value = "test_trade_TaskTracker" />
< property name = "workThreads" value = "20" />
< property name = "masterChangeListeners" >
< list >
< bean class = "com.github.ltsopensource.example.support.MasterChangeListenerImpl" />
</ list >
</ property >
< property name = "configs" >
< props >
< prop key = "job.fail.store" > leveldb </ prop >
</ props >
</ property >
</ bean >
@ Configuration
public class LTSSpringConfig implements ApplicationContextAware {
private ApplicationContext applicationContext ;
@ Override
public void setApplicationContext ( ApplicationContext applicationContext ) throws BeansException {
this . applicationContext = applicationContext ;
}
@ Bean ( name = "taskTracker" )
public TaskTracker getTaskTracker () throws Exception {
TaskTrackerAnnotationFactoryBean factoryBean = new TaskTrackerAnnotationFactoryBean ();
factoryBean . setApplicationContext ( applicationContext );
factoryBean . setClusterName ( "test_cluster" );
factoryBean . setJobRunnerClass ( MyJobRunner . class );
factoryBean . setNodeGroup ( "test_trade_TaskTracker" );
factoryBean . setBizLoggerLevel ( "INFO" );
factoryBean . setRegistryAddress ( "zookeeper://127.0.0.1:2181" );
factoryBean . setMasterChangeListeners ( new MasterChangeListener []{
new MasterChangeListenerImpl ()
});
factoryBean . setWorkThreads ( 20 );
Properties configs = new Properties ();
configs . setProperty ( "job.fail.store" , "leveldb" );
factoryBean . setConfigs ( configs );
factoryBean . afterPropertiesSet ();
// factoryBean.start();
return factoryBean . getObject ();
}
}
參數說明
一般在一個JVM中只需要一個JobClient實例即可,不要為每個任務都新建一個JobClient實例,這樣會大大的浪費資源,因為一個JobClient可以提交多種任務。相同的一個JVM一般也盡量保持只有一個TaskTracker實例即可,多了就可能造成資源浪費。當遇到一個TaskTracker要執行多種任務的時候,請參考下面的"一個TaskTracker執行多種任務"。
有的時候,業務場景需要執行多種任務,有些人會問,是不是要每個任務類型都要一個TaskTracker去執行。我的答案是否定的,如果在一個JVM中,最好使用一個TaskTracker去運行多種任務,因為一個JVM中使用多個TaskTracker實例比較浪費資源(當然當你某種任務量比較多的時候,可以將這個任務單獨使用一個TaskTracker節點來執行)。那麼要怎麼實現一個TaskTracker執行多種任務呢。下面是我給的參考例子。
/**
* 总入口,在 taskTracker.setJobRunnerClass(JobRunnerDispatcher.class)
* JobClient 提交 任务时指定 Job 类型 job.setParam("type", "aType")
*/
public class JobRunnerDispatcher implements JobRunner {
private static final ConcurrentHashMap < String /*type*/ , JobRunner >
JOB_RUNNER_MAP = new ConcurrentHashMap < String , JobRunner >();
static {
JOB_RUNNER_MAP . put ( "aType" , new JobRunnerA ()); // 也可以从Spring中拿
JOB_RUNNER_MAP . put ( "bType" , new JobRunnerB ());
}
@ Override
public Result run ( JobContext jobContext ) throws Throwable {
Job job = jobContext . getJob ();
String type = job . getParam ( "type" );
return JOB_RUNNER_MAP . get ( type ). run ( job );
}
}
class JobRunnerA implements JobRunner {
@ Override
public Result run ( JobContext jobContext ) throws Throwable {
// TODO A类型Job的逻辑
return null ;
}
}
class JobRunnerB implements JobRunner {
@ Override
public Result run ( JobContext jobContext ) throws Throwable {
// TODO B类型Job的逻辑
return null ;
}
}
一般在寫TaskTracker的時候,只需要測試JobRunner的實作邏輯是否正確,又不想啟動LTS進行遠端測試。為了方便測試,LTS提供了JobRunner的快速測試方法。自己的測試類別整合com.github.ltsopensource.tasktracker.runner.JobRunnerTester
即可,實作initContext
和newJobRunner
方法即可。如lts-examples中的例子:
public class TestJobRunnerTester extends JobRunnerTester {
public static void main ( String [] args ) throws Throwable {
// Mock Job 数据
Job job = new Job ();
job . setTaskId ( "2313213" );
JobContext jobContext = new JobContext ();
jobContext . setJob ( job );
JobExtInfo jobExtInfo = new JobExtInfo ();
jobExtInfo . setRetry ( false );
jobContext . setJobExtInfo ( jobExtInfo );
// 运行测试
TestJobRunnerTester tester = new TestJobRunnerTester ();
Result result = tester . run ( jobContext );
System . out . println ( JSON . toJSONString ( result ));
}
@ Override
protected void initContext () {
// TODO 初始化Spring容器
}
@ Override
protected JobRunner newJobRunner () {
return new TestJobRunner ();
}
}
對於Quartz的Cron任務只需要在Spring配置中增加一下程式碼就可以接入LTS平台
< bean class = " com.github.ltsopensource.spring.quartz.QuartzLTSProxyBean " >
< property name = " clusterName " value = " test_cluster " />
< property name = " registryAddress " value = " zookeeper://127.0.0.1:2181 " />
< property name = " nodeGroup " value = " quartz_test_group " />
</ bean >
@ SpringBootApplication
@ EnableJobTracker // 启动JobTracker
@ EnableJobClient // 启动JobClient
@ EnableTaskTracker // 启动TaskTracker
@ EnableMonitor // 启动Monitor
public class Application {
public static void main ( String [] args ) {
SpringApplication . run ( Application . class , args );
}
}
剩下的就只是在application.properties中添加相應的配置就行了, 具體見lts-example中的com.github.ltsopensource.examples.springboot
包下的例子
當機器有內網兩張網卡的時候,有時候,使用者想讓LTS的流量走外網卡,那麼需要在host中,把主機名稱的對應位址改為外網卡位址即可,內網同理。
如果在節點啟動的時候設定節點標識,LTS會預設設定一個UUID為節點標識,可讀性會比較差,但是能保證每個節點的唯一性,如果用戶能自己保證節點標識的唯一性,可以通過setIdentity
來設定,譬如如果每個節點都是部署在一台機器(一個虛擬機)上,那麼可以將identity設定為主機名稱
支援JobLogger,JobQueue等等的SPI擴展
見issue#389
工作年資三年以上
學歷要求本科
期望層級P6(資深Java工程師)/P7(技術專家)
職位描述
會員平台,負責阿里巴巴集團的使用者體系,支援集團內各線業務線用戶類需求,支援集團對外合作的使用者通和業務通。 包括各端的使用者登入&授權、Session體系、註冊、帳戶管理、帳戶安全等功能,底層的使用者資訊服務,會話和憑證管理等等,是集團最核心的產品線之一,每天承載千億次調用量、峰值千萬QPS、以及分佈全球的混合雲架構等等。
作為軟體工程師,你將會在我們的核心產品上工作,這些產品為我們的商業基礎設施提供關鍵功能, 取決於你的興趣和經驗,你可以在如下的一個或多個領域工作:全球化,使用者體驗,資料安全,機器學習,系統高可用性等等。