전문적인 프런트 엔드로 대기열을 강화하세요.
모든 대기열에 대한 전체 개요를 확인하세요.
작업을 검사하고, 지연된 작업을 검색, 재시도하거나 승격합니다.
측정항목 및 통계.
그리고 더 많은 기능.
Taskforce.sh에 가입하세요.
폴링 없는 설계로 인해 CPU 사용량이 최소화됩니다.
Redis를 기반으로 한 견고한 디자인.
작업이 지연되었습니다.
cron 사양에 따라 작업을 예약하고 반복합니다.
작업의 속도 제한기입니다.
재시도.
우선 사항.
동시성.
일시 중지/재개 - 전역 또는 로컬.
대기열당 여러 작업 유형.
스레드(샌드박스) 처리 기능.
프로세스 충돌로부터 자동 복구.
그리고 로드맵에 나오네요...
작업 완료 확인(그 동안에는 메시지 대기열 패턴을 사용할 수 있습니다)
부모-자식 직업 관계.
모니터링에 사용할 수 있는 몇 가지 타사 UI가 있습니다.
BullMQ
태스크포스
불 v3
태스크포스
불보드
황소 대표
강세 모니터
모니터로
황소 <= v2
투우사
반응 황소
투레이로
Prometheus Bull Queue 내보내기 사용
몇 가지 작업 대기열 솔루션이 있으므로 이를 비교하는 표는 다음과 같습니다.
Dragonfly는 BullMQ와 완벽하게 호환되는 새로운 Redis™ 드롭인 대체품으로, 사용 가능한 모든 CPU 코어를 활용하여 대폭 향상된 성능, 더 빠르고 메모리 효율적인 데이터 구조 등 Redis™에 비해 몇 가지 중요한 이점을 제공합니다. BullMQ와 함께 사용하는 방법에 대해 여기에서 자세히 읽어보세요. | |
Bull 프로젝트에 고품질 프로덕션 Redis 인스턴스가 필요한 경우 BullMQ와 완벽하게 작동하는 Redis 호스팅의 선두주자인 Memetria for Redis 구독을 고려해 보세요. BullMQ 개발을 후원하려면 가입 시 프로모션 코드 "BULLMQ"를 사용하세요! |
특징 | BullMQ-Pro | BullMQ | 황소 | 쿠에 | 벌 | 의제 |
---|---|---|---|---|---|---|
백엔드 | 레디스 | 레디스 | 레디스 | 레디스 | 레디스 | 몽고 |
관찰 가능 항목 | ✓ | |||||
그룹 비율 제한 | ✓ | |||||
그룹 지원 | ✓ | |||||
배치 지원 | ✓ | |||||
상위/하위 종속성 | ✓ | ✓ | ||||
우선순위 | ✓ | ✓ | ✓ | ✓ | ✓ | |
동시성 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
지연된 작업 | ✓ | ✓ | ✓ | ✓ | ✓ | |
글로벌 이벤트 | ✓ | ✓ | ✓ | ✓ | ||
속도 제한기 | ✓ | ✓ | ✓ | |||
일시중지/재개 | ✓ | ✓ | ✓ | ✓ | ||
샌드박스 작업자 | ✓ | ✓ | ✓ | |||
반복 가능한 작업 | ✓ | ✓ | ✓ | ✓ | ||
원자 작전 | ✓ | ✓ | ✓ | ✓ | ||
고집 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
UI | ✓ | ✓ | ✓ | ✓ | ✓ | |
최적화됨 | 채용정보/메시지 | 채용정보/메시지 | 채용정보/메시지 | 채용정보 | 메시지 | 채용정보 |
npm 설치 황소 --저장
또는
원사 추가 황소
요구 사항: Bull에는 2.8.18
이상의 Redis 버전이 필요합니다.
npm install @types/bull --save-dev
원사 추가 --dev @types/bull
정의는 현재 DefinedTyped 저장소에서 유지관리됩니다.
우리는 코드 수정, 새로운 기능, 문서 개선 등 모든 유형의 기여를 환영합니다. 코드 형식은 Prettier에 의해 시행됩니다. 커밋의 경우 기존 커밋 규칙을 따르세요. 모든 코드는 개발에 병합되기 전에 린트 규칙과 테스트 스위트를 통과해야 합니다.
const Queue = require('bull');const videoQueue = new Queue('video transcoding', 'redis://127.0.0.1:6379');const audioQueue = new Queue('audio transcoding', { redis: { port : 6379, 호스트: '127.0.0.1', 비밀번호: 'foobared' } }); // object를 사용하여 Redis 연결 지정const imageQueue = new Queue('image transcoding');const pdfQueue = new Queue('pdf transcoding');videoQueue.process(function (job, done) { // job.data에는 작업이 생성될 때 전달된 사용자 정의 데이터가 포함되어 있습니다. // job.id에는 이 작업의 ID가 포함되어 있습니다. // 비디오를 비동기적으로 트랜스코딩하고 진행 상황을 보고합니다. job.progress(42); // 완료되면 호출 완료 완료(); // 또는 오류가 발생하면 오류를 제공합니다. done(new Error('오류 트랜스코딩')); // 또는 결과를 전달합니다. done(null, { 프레임 속도: 29.5 /* 등... */ }); // 작업에서 처리되지 않은 예외가 발생하면 올바르게 처리됩니다. throw new Error('예기치 않은 오류');});audioQueue.process(함수(작업, 완료) { // 오디오를 비동기적으로 트랜스코딩하고 진행 상황을 보고합니다. job.progress(42); // 완료되면 호출 완료 완료(); // 또는 오류가 발생하면 오류를 제공합니다. done(new Error('오류 트랜스코딩')); // 또는 결과를 전달합니다. done(null, { 샘플링 속도: 48000 /* 등... */ }); // 작업에서 처리되지 않은 예외가 발생하면 올바르게 처리됩니다. throw new Error('예기치 않은 오류');});imageQueue.process(함수(작업, 완료) { // 이미지를 비동기적으로 트랜스코딩하고 진행 상황을 보고합니다. job.progress(42); // 완료되면 호출 완료 완료(); // 또는 오류가 발생하면 오류를 제공합니다. done(new Error('오류 트랜스코딩')); // 또는 결과를 전달합니다. done(null, { 너비: 1280, 높이: 720 /* 등... */ }); // 작업에서 처리되지 않은 예외가 발생하면 올바르게 처리됩니다. throw new Error('예기치 않은 오류');});pdfQueue.process(함수(작업) { // 프로세서는 완료 콜백을 사용하는 대신 약속을 반환할 수도 있습니다. return pdfAsyncProcessor();});videoQueue.add({ 비디오: 'http://example.com/video1.mov' });audioQueue.add({ 오디오: 'http://example.com/audio1.mp3 ' });imageQueue.add({ 이미지: 'http://example.com/image1.tiff' });
또는 done
콜백을 사용하는 대신 약속을 반환할 수 있습니다.
videoQueue.process(function (job) { // 완료된 콜백을 제거하는 것을 잊지 마세요! // 간단히 약속을 반환합니다. return fetchVideo(job.data.url).then(transcodeVideo); // 약속 거부 처리 return Promise.reject(new Error('error transcoding')); // Promise가 해결된 값을 "completed" 이벤트에 전달합니다. return Promise.resolve({ 프레임 속도: 29.5 /* etc... */ }); // 작업에서 처리되지 않은 예외가 발생하면 올바르게 처리됩니다. throw new Error('예상치 못한 오류'); //동일 return Promise.reject(new Error('예기치 않은 오류'));});
프로세스 기능은 별도의 프로세스에서 실행될 수도 있습니다. 여기에는 몇 가지 장점이 있습니다.
프로세스는 샌드박스 처리되어 충돌이 발생하더라도 작업자에게 영향을 주지 않습니다.
대기열에 영향을 주지 않고 차단 코드를 실행할 수 있습니다(작업이 중단되지 않음).
멀티 코어 CPU 활용도가 훨씬 향상되었습니다.
Redis에 대한 연결이 적습니다.
이 기능을 사용하려면 프로세서를 사용하여 별도의 파일을 생성하십시오.
// processor.jsmodule.exports = 함수(작업) { // 힘든 일을 좀 해라 return Promise.resolve(결과);}
그리고 프로세서를 다음과 같이 정의하십시오.
// 단일 프로세스:queue.process('/path/to/my/processor.js');// 동시성도 사용할 수 있습니다:queue.process(5, '/path/to/my/processor.js' );// 그리고 명명된 processors:queue.process('my processor', 5, '/path/to/my/processor.js');
cron 사양에 따라 작업을 대기열에 추가하고 반복적으로 처리할 수 있습니다.
PaymentQueue.process(function (job) {// 결제 확인 }); // 매일 오전 3시 15분에 1회 결제작업 반복 지불 대기열.add(결제 데이터, { 반복: { cron: '15 3 * * *' } });
팁으로서 여기에서 표현식을 확인하여 올바른지 확인하십시오. cron 표현식 생성기
대기열은 전역적으로 일시 중지 및 재개될 수 있습니다. (이 작업자에 대해서만 처리를 일시 중지하려면 true
전달합니다.)
queue.pause().then(함수 () { // 대기열은 이제 일시 중지되었습니다.});queue.resume().then(function () { // 이제 대기열이 재개됩니다.})
큐는 몇 가지 유용한 이벤트를 내보냅니다. 예를 들어...
.on('완료', 함수(작업, 결과) { // 작업이 완료되고 결과가 출력됩니다!})
실행되는 이벤트의 전체 목록을 포함하여 이벤트에 대한 자세한 내용은 이벤트 참조를 확인하세요.
대기열은 저렴하므로 대기열이 많이 필요한 경우 다른 이름으로 새 대기열을 생성하면 됩니다.
const userJohn = 새 대기열('john');const userLisa = 새 대기열('lisa');...
그러나 모든 대기열 인스턴스에는 새로운 Redis 연결이 필요합니다. 연결을 재사용하는 방법을 확인하거나 명명된 프로세서를 사용하여 비슷한 결과를 얻을 수도 있습니다.
참고: 버전 3.2.0 이상부터는 스레드 프로세서를 대신 사용하는 것이 좋습니다.
대기열은 강력하며 위험이나 대기열 손상의 위험 없이 여러 스레드 또는 프로세스에서 병렬로 실행될 수 있습니다. 프로세스 전반에 걸쳐 작업을 병렬화하기 위해 클러스터를 사용하는 다음 간단한 예를 확인하세요.
const Queue = require('bull');const Cluster = require('cluster');const numWorkers = 8;const queue = new Queue('동시 큐 테스트');if (cluster.isMaster) { for (let i = 0; i < numWorkers; i++) {cluster.fork(); } Cluster.on('online', function (worker) {// 대기열 작업자를 위한 몇 가지 작업을 만들어 보겠습니다. for (let i = 0; i < 500; i++) { queue.add({ foo: 'bar' }); }; }); Cluster.on('exit', function (worker, code, signal) {console.log('worker ' + 작업자.process.pid + ' 사망'); });} 또 다른 { queue.process(function (job, jobDone) {console.log('작업자가 수행한 작업', Cluster.worker.id, job.id);jobDone(); });}
전체 문서를 보려면 참조 및 공통 패턴을 확인하세요.
가이드 — Bull 개발을 위한 출발점입니다.
참조 — 사용 가능한 모든 개체와 메서드가 포함된 참조 문서입니다.
패턴 — 일반적인 패턴에 대한 예제 집합입니다.
라이센스(Bull 라이센스)는 MIT입니다.
더 많은 문서를 사용할 수 있는 내용이 있으면 끌어오기 요청을 제출하세요!
대기열은 "적어도 한 번" 작업 전략을 목표로 합니다. 이는 어떤 상황에서는 작업이 두 번 이상 처리될 수 있음을 의미합니다. 이는 작업자가 전체 처리 기간 동안 특정 작업에 대한 잠금을 유지하지 못한 경우에 주로 발생합니다.
작업자가 작업을 처리하는 동안 다른 작업자가 작업을 처리할 수 없도록 해당 작업을 "잠긴" 상태로 유지합니다.
작업이 잠금을 잃어서 중단 되고 결과적으로 다시 시작되는 것을 방지하려면 잠금이 어떻게 작동하는지 이해하는 것이 중요합니다. 잠금은 lockRenewTime
(보통 lockDuration
의 절반) 간격으로 lockDuration
에 대한 잠금을 생성하여 내부적으로 구현됩니다. 잠금을 갱신하기 전에 lockDuration
경과하면 작업은 정지된 것으로 간주되어 자동으로 다시 시작됩니다. 이중 처리 됩니다. 이는 다음과 같은 경우에 발생할 수 있습니다.
작업 프로세서를 실행하는 노드 프로세스가 예기치 않게 종료됩니다.
작업 프로세서가 너무 CPU를 많이 사용하고 노드 이벤트 루프를 중단시켰기 때문에 Bull이 작업 잠금을 갱신할 수 없었습니다(이를 더 잘 감지하는 방법은 #488 참조). 단일 부품이 노드 이벤트 루프를 차단할 수 없도록 작업 프로세서를 더 작은 부품으로 나누어 이 문제를 해결할 수 있습니다. 또는 lockDuration
설정에 더 큰 값을 전달할 수 있습니다. 단, 실제 정지된 작업을 인식하는 데 시간이 더 오래 걸립니다.
따라서 항상 stalled
이벤트를 수신하고 이를 오류 모니터링 시스템에 기록해야 합니다. 이는 작업이 이중 처리될 가능성이 있음을 의미합니다.
문제가 있는 작업이 무한정 다시 시작되지 않도록 보호하기 위해(예: 작업 프로세서가 항상 해당 노드 프로세스와 충돌하는 경우) 작업은 최대 maxStalledCount
횟수(기본값: 1
)까지 정지 상태에서 복구됩니다.