고급 작업 제어 및 최종 일관성을 갖춘 Redis 지원 작업 대기열 엔진입니다.
작업 그룹화, 연결, 광범위한 범위에 대한 반복자.
연기 및 예약된 작업 실행.
부하 분산 + 작업자 풀.
삽입하기 쉽습니다.
idoit
이를 구현하기 위한 고급 제어 기능을 제공합니다.
그룹화 . 특수 group
작업은 하위 작업을 실행하고 모두 완료될 때까지 기다립니다. 매핑/감소 논리에 유용합니다.
체인 . 특수 chain
작업은 하위 항목을 하나씩 실행합니다. 맵 축소(map-reduce) 또는 매우 복잡한 작업을 보다 간단한 단계로 분할하는 데에도 유용합니다.
매핑 반복자 . 대용량 페이로드를 위한 특수 기능으로 요청 시 청크를 생성합니다. 이익:
매핑 단계에서 지연이 없으며 청크 처리가 즉시 시작됩니다.
동일한 크기의 청크를 구축하기 위해 DB 쿼리를 쉽게 최적화할 수 있습니다(거대한 데이터에서는 건너뛰기 + 제한 쿼리가 매우 느림).
진전 . 그룹/체인/맵 시나리오를 사용하면 상위 상위 항목을 통해 전체 진행 상황을 쉽게 모니터링할 수 있습니다. 장기간의 독립 실행형 작업은 진행 상황 변경에 대해 사용자에게 알릴 수도 있습니다.
작업자 풀 . 다양한 프로세스별로 작업을 분할할 수 있습니다. 예를 들어, 무거운 작업이 가벼운 작업을 차단하는 것을 원하지 않는 경우입니다.
쉐듀러 . 내장된 cron을 사용하면 주어진 일정에 따라 작업을 실행할 수 있습니다.
Redis의 모든 데이터는 항상 일관됩니다.
작업은 손실될 수 없지만 극단적인 경우에는 두 번 실행할 수 있습니다(작업 기능이 끝나려고 할 때 프로세스가 충돌하는 경우).
task.progressAdd()
사용하고 작업이 완료되기 전에 충돌을 처리하면 진행률이 "더 빠르게" 계산될 수 있습니다. 그러나 그러한 정보는 인터페이스 진행률 표시줄 업데이트에만 사용될 수 있으므로 이는 중요하지 않습니다. 대부분의 경우 차이점을 볼 수 없습니다.
node.js
6+ 및 redis
3.0+가 필요합니다.
npm 설치 idoit --저장
redisURL (문자열) - redis 연결 URL입니다.
동시성 (수) - 단일 작업자가 병렬로 사용할 수 있는 최대 작업 수, 기본적으로 100입니다.
pool (String) - 작업자 풀 이름, 설정되지 않은 경우 "기본값"입니다. 이 대기열 인스턴스가 작업만 소비하는 경우( .start()
이후) 사용됩니다. 원치 않는 잠금을 방지하기 위해 작업을 특정 작업자 풀로 라우팅할 수 있습니다. pool
Array, [ 'pool1', 'pool2' ]
로 설정하여 여러 풀의 작업을 사용할 수 있습니다(개발/테스트 목적으로).
ns (문자열) - 현재 Redis 키 접두사로 사용되는 데이터 네임스페이스, 기본적으로 "idoitqueue:"
과도한 차단 작업과 비차단 작업을 위해 별도의 작업자 풀을 보유하는 것이 좋습니다. 예를 들어, 누구도 긴급 이메일 전송을 차단해서는 안 됩니다. 따라서 여러 작업자 프로세스를 생성하고 이를 다른 풀에 고정하고 적절한 작업 동시성을 설정하십시오. 비차단 작업은 병렬로 계산할 수 있으며 기본 concurrency
= 100이면 괜찮습니다. 차단 작업은 하나씩 소비해야 하며 해당 작업자에 대해 concurrency
= 1을 설정해야 합니다.
메모. 앱에서 일부 작업 유형을 제거하는 경우가 발생할 수 있습니다. 이 경우 분리된 데이터는 3일 후에 삭제됩니다.
옵션:
name (String) - 작업의 이름입니다.
baseClass (함수) - 선택 사항이며 기본 작업의 생성자이며 기본적으로 "Task"입니다.
init (함수) - 선택 사항이며 비동기 작업 초기화에 사용되며 Promise
반환해야 합니다.
this (객체) - 현재 작업(작업 총계는 this.total
로 사용 가능)
taskID (함수) - 선택 사항이며 새 작업 ID를 반환해야 합니다. "독점적인" 작업을 생성하는 데만 필요하며 기본적으로 function (taskData)
라고 하는 임의의 값을 반환합니다. 설탕: 일반 문자열을 전달하면 항상 이 문자열을 반환하는 함수로 래핑됩니다.
프로세스 (함수) - task.process(...args)
라고 하는 기본 작업 함수입니다. Promise
반환해야 함
this (객체) - 현재 작업입니다.
재시도 (번호) - 선택 사항, 오류 시 재시도 횟수, 기본값은 2입니다.
retryDelay (숫자) - 선택 사항, 재시도 후 지연 시간(ms), 기본값은 60000ms입니다.
시간 초과 (숫자) - 선택 사항, 실행 시간 초과, 기본값은 120000ms입니다.
total (숫자) - 선택 사항, 최대 진행률 값, 기본값은 1입니다. 동작을 수정하지 않으면 진행률은 0으로 시작하고 작업이 끝나면 1이 됩니다.
연기 지연 (숫자) - 선택 사항입니다. 연기가 지연 없이 호출되면 지연이 이 값(밀리초)과 동일한 것으로 간주됩니다.
cron (문자열) - 선택 사항, cron 문자열("15 */6 * * *"), 기본값은 null입니다.
트랙 (번호) - 기본값은 3600000ms(1시간)입니다. 클러스터의 여러 서버에 잘못된 시계가 있는 경우 재실행을 방지하기 위해 cron에서 예약된 작업을 기억하는 시간입니다. 매우 빈번한 작업에 대해서는 너무 높게 설정하지 마십시오. 메모리를 많이 차지할 수 있기 때문입니다.
ID로 작업을 가져옵니다. 작업으로 해결된 Promise를 반환하거나 작업이 존재하지 않는 경우 null
을 반환합니다.
사용할 수 있는 작업 필드:
총계 - 총 작업 진행률
진행 - 현재 작업 진행
결과 - 작업 결과
오류 - 작업 오류
작업을 취소합니다. 작업으로 해결된 Promise를 반환합니다.
메모. 상위 항목이 없는 작업만 취소할 수 있습니다.
작업자를 시작하고 작업 데이터 소비를 시작합니다. Promise
반환하고 대기열이 준비되면 해결됩니다(내부에서 .ready()
호출).
pool
cunstructor에 지정된 경우 이 끌어오기로 라우팅된 작업만 소비됩니다.
대기열에서 새 작업 수락을 중지합니다. Return Promise
이 작업자의 모든 활성 작업이 완료되면 해결됩니다.
Return Promise
, 대기열이 작동할 준비가 되면 해결됩니다('connect' 이벤트 이후, 아래 참조).
redisURL을 제외한 생성자 옵션을 업데이트합니다.
idoit
일부 이벤트를 발생시키는 EventEmitter
인스턴스입니다.
Redis 연결이 완료되고 명령이 실행될 때 ready
. (연결 없이도 작업을 등록할 수 있습니다.)
오류가 발생하면 error
발생합니다.
task:progress
, task:progress:<task_id>
- 작업 업데이트가 진행되는 경우. 이벤트 데이터는 다음과 같습니다: { id, uid, total, Progress }
task:end
, task:end:<task_id>
- 작업이 종료될 때. 이벤트 데이터: { id, uid }
선택적 매개변수를 사용하여 새 작업을 만듭니다.
작업 속성을 재정의합니다. 예를 들어, 특정 그룹/체인 작업을 다른 풀에 할당할 수 있습니다.
즉시 작업을 실행하세요. 작업 ID로 해결된 Promise를 반환합니다.
작업 실행을 연기하여 밀리초(또는 task.postponeDelay
)를 delay
.
작업 ID로 해결된 Promise를 반환합니다.
현재 실행 중인 작업을 다시 시작합니다.
add_retry (Boolean) - 선택사항, 재시도 횟수 증가 여부(기본값: false)
true
인 경우 재시도 횟수가 증가하고 초과된 경우 작업이 다시 시작되지 않습니다.
false
인 경우 재시도 횟수는 동일하게 유지되므로 작업이 무기한으로 다시 시작될 수 있습니다.
지연 (숫자) 재시작 전 지연 시간(밀리초)(기본값: task.retryDelay
)
idoit
이미 작업 오류에 대한 재시작 논리가 내장되어 있습니다. 아마도 이 방법을 직접 사용해서는 안 될 것입니다. 매우 특정한 경우에 노출됩니다.
현재 작업 진행률을 높입니다.
작업 ID로 해결된 Promise를 반환합니다.
현재 작업 마감일을 업데이트하세요.
작업 ID로 해결된 Promise를 반환합니다.
하위 항목을 병렬로 실행하는 새 작업을 만듭니다.
대기열.그룹([ 대기열.어린이1(), 대기열.어린이2(), queue.children3()]).run()
그룹 결과는 정렬되지 않은 하위 결과 배열입니다.
하위 항목을 연속적으로 실행하는 새 작업을 만듭니다. 자식 중 하나라도 실패하면 체인도 실패합니다.
queue.registerTask('곱하기', (a, b) => a * b);queue.registerTask('subtract', (a, b) => a - b);queue.chain([ queue.multiply(2, 3), // 2 * 3 = 6 queue.subtract(10), // 10 - 6 = 4 queue.multiply(3) // 3 * 4 = 12]).run()
이전 작업의 결과가 다음 작업의 마지막 인수로 전달됩니다. 체인 결과는 체인의 마지막 작업 결과입니다.
게으른 스타일(요청 시)로 대규모 매핑을 실행하는 특별한 방법입니다. 아래 댓글을 참조하세요.
// 반복자 등록 taskqueue.registerTask({ 이름: 'lazy_mapper', baseClass: Queue.Iterator, // 이 메서드는 작업 시작과 모든 하위 끝에서 호출됩니다. 그럴 수 있다 // `Promise`를 반환하는 생성기 함수 또는 함수. * iterate(state) {// ...// 세 가지 유형의 출력 상태가 가능합니다: 종료됨, 아무것도 하지 않음 및 새 데이터.//// 1. `null` - 끝에 도달하면 반복자를 더 이상 호출해서는 안 됩니다.// 2. `{}` - 유휴 상태, 대기열에 충분한 하위 작업이 있습니다. // 반복자를 나중에 호출해 보세요(다음 하위 항목이 완료될 때).// 3. {// 상태 - 기억할 새 반복기 상태(예: 오프셋 //디비 쿼리), 직렬화 가능한 모든 데이터// 작업 - 대기열에 푸시할 새 하위 작업 배열// }//// 중요! Iterator는 여러 워커에서 병렬로 호출될 수 있습니다. // Redis 업데이트 시 충돌을 해결하기 위해 입력 '상태'를 사용합니다. 따라서// 새 하위 작업을 생성하는 경우://// 1. 새 `상태`는 (이전의 모든 상태에 대해) 달라야 합니다.// 2. `작업` 배열은 비어 있으면 안 됩니다.//// 다른 경우에는 '종료' 또는 '유휴'에 대한 신호를 보내야 합니다.//// 잘못된 조합으로 인해 '종료' + 오류 이벤트가 발생합니다.//return { state: newState, jobs:chunksArray}; }});// iteratorqueue.lazy_mapper().run() 실행;
왜 이런 미친 마법이 발명되었나요?
천만 개의 포럼 게시물을 다시 작성해야 한다고 상상해 보세요. 작업을 동일한 작은 청크로 나누고 싶지만 게시물에는 순차적 정수 열거가 없고 mongo ID만 있습니다. 당신은 무엇을 할 수 있나요?
직접 skip
+ limit
요청은 모든 데이터베이스의 대규모 컬렉션에서 매우 비쌉니다.
게시물의 밀도는 처음부터 마지막 게시물까지 많이 다르기 때문에 날짜 간격으로 분할할 수 없습니다.
모든 게시물에 임의의 숫자가 포함된 색인 필드를 추가할 수 있습니다. 그런 다음 간격으로 나눕니다. 그러면 작동하지만 임의의 디스크 액세스가 발생합니다. 멋지지는 않습니다.
해결책은 "이전 위치"를 기억할 수 있는 반복 매퍼를 사용하는 것입니다. 이 경우 skip
+ limit
대신 range
+ limit
요청을 수행합니다. 이는 데이터베이스와 잘 작동합니다. 추가 보너스는 다음과 같습니다:
모든 하위 작업을 대기열에 보관할 필요는 없습니다. 예를 들어 100개의 청크를 생성하고 이전 청크가 완료되려고 할 때 다음 100개를 추가할 수 있습니다.
매핑 단계가 분산되어 전체 진행 상황을 즉시 모니터링할 수 있습니다.
Docker를 통한 빠른 실행 Redis:
# startdocker run -d -p 6379:6379 --name redis1 redis# stopdocker stop redis1 도커 rm redis1
당연히 우리는 kue, celery 및 akka에 익숙합니다. 우리의 목표는 단순성과 성능 사이의 균형을 맞추는 것이었습니다. 따라서 idoit
수천 개의 인스턴스가 있는 클러스터에서 잘 작동하는지 알 수 없습니다. 하지만 작은 볼륨에서도 괜찮고 사용하기가 정말 쉽습니다.
kue는 우리 요구 사항에 적합하지 않았습니다. 그 이유는 다음과 같습니다.
"우선순위" 개념은 유연하지 않으며 무거운 작업으로 인한 잠금으로부터 잘 보호되지 않습니다.
작업 그룹화/체인화 등이 없습니다.
데이터 일관성이 강력하게 보장되지 않음
idoit
에서 우리는 다음 사항에 관심을 가졌습니다.
작업 그룹/체인 작업 및 작업 간 데이터 전달(셀러리와 유사)
작업자 풀을 사용하여 작업 실행을 유형별로 분리합니다.
사용 및 설치가 용이함(redis만 필요, 기존 프로세스에서 실행 가능)
저장된 데이터의 최종 일관성
내장 스케줄러와 같은 필수 설탕
대규모 페이로드를 위한 반복 매퍼(독특한 기능, 많은 유지 관리 작업에 매우 유용함)
작업 진행 상황 추적
전역 잠금 방지
Redis는 여전히 실패 지점이 될 수 있지만 단순성을 고려하면 합리적인 가격입니다. 물론 RMQ와 같은 분산 메시지 버스를 통해 더 나은 가용성을 얻을 수 있습니다. 그러나 많은 경우에는 일을 단순하게 유지하는 것이 더 중요합니다. idoit
사용하면 추가 비용 없이 기존 기술을 재사용할 수 있습니다.