Redis 支持的任务队列引擎,具有高级任务控制和最终一致性。
大范围的任务分组、链接、迭代器。
推迟和计划的任务运行。
负载分配+工作池。
易于嵌入。
idoit
提供了先进的控制来实现
分组。特殊group
任务执行子任务并等待所有任务完成。对于映射/归约逻辑很有用。
连锁。特殊的chain
任务一一执行孩子。对于映射缩减或将非常复杂的任务拆分为更简单的步骤也很有用。
映射迭代器。针对巨大有效负载的特殊功能,可按需生成块。好处:
映射阶段没有滞后,块处理立即开始。
易于优化数据库查询以构建相同大小的块(跳过+限制查询在大数据上非常慢)。
进步。当您使用组/链/地图场景时,可以轻松地通过顶级父级监控总进度。长独立任务还可以通知用户进度变化。
工人池。您可以按不同的流程拆分任务。例如,如果您不希望繁重的任务阻碍轻量的任务。
调度者。内置 cron 允许按给定的时间表执行任务。
Redis中的所有数据都是永远一致的。
任务不会丢失,但可以在边缘情况下运行两次(如果任务功能即将完成时进程崩溃)
如果使用task.progressAdd()
并且在任务完成之前进程崩溃,进度可以“更快”地计数。但这并不重要,因为此类信息只能用于界面进度条更新。在大多数情况下,您不会看到差异。
需要node.js
6+ 和redis
3.0+。
npm 安装 idoit --save
redisURL (String) - redis 连接 url。
concurrency (Number) - 单个工作线程并行消耗的最大任务数,默认为 100。
pool (String) - 工作池名称,如果未设置则为“默认”。如果此队列实例仅消耗任务(在.start()
之后),则使用。您可以将任务路由到特定的工作人员池,以避免不必要的锁定。您可以将pool
设置为数组, [ 'pool1', 'pool2' ]
以使用多个池中的任务(用于开发/测试目的)。
ns (String) - 数据命名空间,当前用作 redis 键前缀,默认为“idoitqueue:”。
对于繁重的阻塞任务和非阻塞任务,有单独的工作池是一个很好的做法。例如,任何人都不应阻止发送紧急电子邮件。因此,创建多个工作进程,将它们固定到不同的池并设置适当的任务并发性。非阻塞任务可以并行消费,默认concurrency
=100就可以了。阻塞任务应该一对一消费,为这些worker设置concurrency
=1。
笔记。您可能会从应用程序中删除某些任务类型。在这种情况下,孤立数据将在 3 天后被擦除。
选项:
name (字符串)- 任务的名称。
baseClass (Function) - 可选,基本任务的构造函数,默认为“Task”。
init (Function) - 可选,用于异步任务初始化,应返回Promise
this (Object) - 当前任务(任务总计可用作this.total
)。
taskID (函数)- 可选,应返回新的任务 ID。仅在创建“独占”任务时需要,默认返回随机值,称为: function (taskData)
。 Sugar:如果您传递纯字符串,它将被包装到始终返回该字符串的函数。
process (Function) - 主任务函数,称为: task.process(...args)
。应该返回Promise
this (对象)- 当前任务。
retry (Number) - 可选,出错时重试的次数,默认 2。
retryDelay (Number) - 可选,重试后的延迟(以毫秒为单位),默认 60000 毫秒。
timeout (Number) - 可选,执行超时,默认 120000 毫秒。
Total (Number) - 可选,最大进度值,默认 1。如果不修改行为,进度将从 0 开始,并在任务结束时变为 1。
推迟延迟(数字) - 可选,如果没有延迟地调用推迟,则假定延迟等于该值(以毫秒为单位)。
cron (String) - 可选,cron 字符串 ("15 */6 * * *"),默认为 null。
轨道(数量)- 默认 3600000 毫秒(1 小时)。是时候记住 cron 中的计划任务了,以避免在集群中的多个服务器时钟错误时重新运行。对于非常频繁的任务不要设置得太高,因为它会占用大量内存。
通过id获取任务。返回一个通过任务解决的 Promise,如果任务不存在则返回null
。
您可以使用的任务字段:
总计- 任务总进度
进度- 当前任务进度
result - 任务结果
error - 任务错误
取消任务。返回一个通过任务解决的 Promise。
笔记。您只能取消没有父任务的任务。
启动worker并开始任务数据消耗。返回Promise
,当队列准备好时解决(在内部调用.ready()
)。
如果在 structor 中指定了pool
,则只会消耗路由到此拉取的任务。
停止从队列中接受新任务。返回Promise
,当该工作线程中的所有活动任务完成时已解决。
返回Promise
,当队列准备好操作时解决(在“connect”事件之后,见下文)。
更新构造函数选项,redisURL 除外。
idoit
是一个EventEmitter
实例,它触发一些事件:
当redis连接建立并且可以执行命令时ready
(无需连接即可注册任务)
发生错误时发生error
。
task:progress
, task:progress:<task_id>
- 任务更新进度时。事件数据为:{ id、uid、total、progress }
task:end
, task:end:<task_id>
- 任务结束时。事件数据为:{ id, uid }
使用可选参数创建新任务。
覆盖任务属性。例如,您可能希望将特定的组/链任务分配给另一个池。
立即运行任务。返回一个带有任务 id 的 Promise。
推迟任务执行以delay
毫秒(或task.postponeDelay
)。
返回一个带有任务 id 的 Promise。
重新启动当前正在运行的任务。
add_retry (Boolean) - 可选,是否增加重试次数(默认值: false)
如果true
,重试次数会增加,并且如果超出则任务不会重新启动
如果为false
,重试计数保持不变,因此任务可以无限期地自行重新启动
延迟(数字)重新启动之前的延迟(以毫秒为单位)(默认值: task.retryDelay
)。
请注意, idoit
已经内置了针对任务错误的重启逻辑。也许,您不应该直接使用此方法。它针对非常具体的情况而公开。
增加当前任务进度。
返回一个带有任务 id 的 Promise。
更新当前任务截止日期。
返回一个带有任务 id 的 Promise。
创建一个新任务,并行执行子任务。
队列.组([ 队列.children1(), 队列.children2(), 队列.children3()]).run()
组结果是未排序的子结果数组。
创建一个新任务,连续执行子任务。如果任何一个孩子失败了,那么链条也会失败。
queue.registerTask('乘法', (a, b) => a * b);queue.registerTask('减法', (a, b) => a - b);queue.chain([ 队列.multiply(2, 3), // 2 * 3 = 6 队列.subtract(10), // 10 - 6 = 4 队列.multiply(3) // 3 * 4 = 12]).run()
上一个任务的结果作为下一个任务的最后一个参数传递。链的结果是链中最后一个任务的结果。
以惰性方式运行大型映射的特殊方式(按需)。请参阅下面的评论。
// 注册迭代器 taskqueue.registerTask({ 名称:'lazy_mapper', 基类:Queue.Iterator, // 此方法在任务开始和每个子任务结束时调用。它可以是 // 生成器函数或返回 `Promise` 的函数。 * iterate(state) {// ...// 三种可能的输出状态:结束、不执行任何操作和新数据。//// 1. `null` - 已到达结束,不应再调用迭代器。// 2. `{}` - 空闲,队列中有足够的子任务,稍后尝试调用 // 迭代器(当下一个子任务完成时)。// 3. {// state - 要记住的新迭代器状态(例如,偏移量// 数据库查询),任何可序列化的数据//任务 - 要推入队列的新子任务数组// }//// 重要!迭代器可以从不同的工作线程并行调用。 // 我们使用输入 `state` 来解决 redis 更新时的冲突。因此,如果您 // 创建新的子任务://// 1. 新的 `state` 必须不同(对于所有以前的状态)// 2. `tasks` 数组不得为空。//// 在其他情况下,您应该发出关于 'end' 或 'idle' 的信号。//// 无效的组合将导致 'end' + 错误事件。//return { state: newState,tasks: chunksArray}; }});// 运行 iteratorqueue.lazy_mapper().run();
为什么要发明这种疯狂的魔法?
想象一下,您需要重建 1000 万个论坛帖子。您希望将工作分成相等的小块,但帖子没有顺序整数枚举,只有 mongo ID。你能做什么?
直接skip
+ limit
请求对于任何数据库中的大集合来说都是非常昂贵的。
您不能按日期间隔拆分,因为第一篇文章到最后一篇文章的帖子密度差异很大。
您可以向每个帖子添加带有随机数的索引字段。然后按间隔分割。这可以工作,但会导致随机磁盘访问 - 不太酷。
解决方案是使用迭代映射器,它可以记住“先前的位置”。在这种情况下,您将执行range
+ limit
请求,而不是skip
+ limit
。这对于数据库来说效果很好。额外奖金有:
您不需要将所有子任务保留在队列中。例如,您可以创建 100 个块,并在前一个块即将完成时添加下一个 100 个。
映射阶段变得分散,您可以立即开始监控总体进度。
通过 docker Quik 运行 redis:
# startdocker run -d -p 6379:6379 --name redis1 redis# stopdocker 停止redis1 docker rm redis1
当然,我们熟悉 kue、celery 和 akka。我们的目标是在简单性和功能之间取得平衡。因此,我们不知道idoit
在具有数千个实例的集群中是否运行良好。但小体积应该没问题,而且使用起来也很方便。
kue 不能满足我们的需求,因为:
它的“优先级”概念不灵活,并且不能很好地防止繁重任务的锁定
没有任务分组/链接等
没有强有力的数据一致性保证
在idoit
中我们关心:
任务组/链操作&在任务之间传递数据(类似于celery)
工作池按类型隔离任务执行。
易于使用和安装(仅需要redis,可以在现有进程中运行)
存储数据的最终一致性
像内置调度程序一样必不可少的糖
用于巨大有效负载的迭代映射器(独特的功能,对于许多维护任务非常有用)
任务进度跟踪
避免全局锁
Redis 仍然可能是一个故障点,但出于简单性考虑,这是可以接受的价格。当然,您可以通过 RMQ 等分布式消息总线获得更好的可用性。但在许多情况下,让事情保持简单更为重要。借助idoit
您可以重用现有技术,而无需额外费用。