NestJs bull 用法
bull简介
队列 bull
bull用法
https://github.com/OptimalBits/bull
Bull is currently in maintenance mode, we are only fixing bugs. For new features check BullMQ, a modern rewritten implementation in Typescript. You are still very welcome to use Bull if it suits your needs, which is a safe, battle tested library.
Source: https://github.com/OptimalBits/bull?tab=readme-ov-file
译文:Bull 目前处于维护模式,官方只修复 bug。如需新特性,请查看 BullMQ,它是用 TypeScript 重写的现代实现。如果 Bull 符合您的需要,仍然非常欢迎继续使用,它是一个安全的、久经实战检验的库。
实际使用时:如果用的是 BullMQ,就参考 BullMQ 的文档;
如果用的是 Bull,就参考 Bull 的文档。
不过在理解概念和背景知识方面,建议多参考 BullMQ,它的文档更加丰富。
https://github.com/taskforcesh/bullmq
What is BullMQ | BullMQ
/**
 * Creates the Bull queues used by the application once Nest has finished
 * bootstrapping, and exposes them by name.
 */
@Injectable()
export class QueueConfigurationService implements OnApplicationBootstrap {
  /** Queues created during bootstrap, keyed by queue name. */
  private queueRegistry = new Map<string, Queue>();

  constructor() {}

  /**
   * Builds the Redis connection settings from the environment and registers
   * the `StandardQueue` queue. Any error thrown while constructing the queue
   * propagates to the caller unchanged.
   */
  async onApplicationBootstrap() {
    const redisQueueConfig = {
      host: process.env['REDIS_HOST'],
      port: Number(process.env['REDIS_PORT']),
      password: process.env['REDIS_PASSWORD'],
      // The Redis client option is spelled `connectTimeout`; the previous
      // `connectionTimeout` key is not a recognised option, so the timeout
      // was never applied.
      connectTimeout: 30000,
      db: 4,
    };

    const standardQueue = new Queue('StandardQueue', { redis: redisQueueConfig });
    this.queueRegistry.set('StandardQueue', standardQueue);
  }

  /**
   * Looks up a queue created during bootstrap.
   *
   * @param queueLabel Name the queue was registered under.
   * @returns The queue, or `undefined` when no queue has that name.
   */
  async getQueueByLabel(queueLabel: string): Promise<Queue | undefined> {
    return this.queueRegistry.get(queueLabel);
  }
}
// Usage elsewhere: create a queue directly and attach a processor to it.
this.taskQueue = new Queue(serviceQueueName, {
  redis: {
    host: process.env['REDIS_HOST'],
    port: Number(process.env['REDIS_PORT']),
    password: process.env['REDIS_PASSWORD'],
    connectTimeout: 30000,
    db: 4,
  },
});
// The handler is promise-based, so it must not also declare a `done`
// parameter: Bull treats a handler with more than one parameter as
// callback-style and waits for `done()` to be called, which never happened
// here, leaving every job unfinished.
this.taskQueue.process(taskSubscriptionName, async (job) => {});
Nestjs bull用法
Documentation | NestJS - A progressive Node.js framework
import { Injectable, Inject } from '@nestjs/common';
import { InjectRedis } from 'nestjs-redis';
import { InjectQueue, BullModule } from '@nestjs/bull';
import { BaseService } from './base.service';
import { TypeOrmModule } from '@nestjs/typeorm';
// Constant definitions: the names under which the Bull queues are registered
// in the module and injected into QueueConfigService.
const NORMAL_QUEUE_NAME = 'normal_queue';
const PRIORITY_QUEUE_NAME = 'priority_queue';
const TEST_QUEUE_NAME = 'test_queue';
/**
 * Holds the injected Bull queues and resolves them by queue name.
 */
@Injectable()
export class QueueConfigService {
  // Constructor: receives the Redis client, the three named queues and the base service.
  constructor(
    @InjectRedis('redisDistribute') private readonly redisDistribute: any,
    @InjectQueue(NORMAL_QUEUE_NAME) private readonly normalQueue: any,
    @InjectQueue(PRIORITY_QUEUE_NAME) private readonly priorityQueue: any,
    @InjectQueue(TEST_QUEUE_NAME) private readonly testQueue: any,
    private readonly base: BaseService
  ) {}

  /**
   * Returns the queue instance registered under the given name.
   *
   * A `switch` is used instead of indexing an object literal so that names
   * such as `'constructor'` or `'toString'` no longer resolve to members
   * inherited from `Object.prototype`.
   *
   * @param queueName One of the queue-name constants.
   * @returns The matching queue, or `undefined` when the name is unknown.
   */
  getQueueByName(queueName: string): any {
    switch (queueName) {
      case NORMAL_QUEUE_NAME:
        return this.normalQueue;
      case PRIORITY_QUEUE_NAME:
        return this.priorityQueue;
      case TEST_QUEUE_NAME:
        return this.testQueue;
      default:
        return undefined;
    }
  }
}
// Module definition

/**
 * Builds the Bull options shared by every queue in this module; the Redis
 * connection settings are read from the environment when the factory runs.
 */
const createRedisQueueOptions = () => ({
  redis: {
    host: process.env['REDIS_HOST'],
    port: Number(process.env['REDIS_PORT']),
    password: process.env['REDIS_PASSWORD'],
  },
});

/**
 * Registers the normal, priority and test queues and exports
 * QueueConfigService so other modules can look the queues up by name.
 */
@Module({
  imports: [
    // registerQueueAsync takes one options object per queue as separate
    // arguments, not a single array of options.
    BullModule.registerQueueAsync(
      { name: NORMAL_QUEUE_NAME, useFactory: createRedisQueueOptions },
      { name: PRIORITY_QUEUE_NAME, useFactory: createRedisQueueOptions },
      { name: TEST_QUEUE_NAME, useFactory: createRedisQueueOptions },
    ),
    BaseModule,
  ],
  providers: [QueueConfigService],
  exports: [QueueConfigService],
})
export class QueueConfigModule {}
// Usage sketch: calls add('test', 20) on the entry stored under TEST_QUEUE_NAME.
// NOTE(review): `queueMap` is a local variable inside
// QueueConfigService.getQueueByName, so outside that method the queue
// presumably has to be obtained via getQueueByName(TEST_QUEUE_NAME) — confirm.
queueMap[TEST_QUEUE_NAME].add('test', 20);
/**
 * Consumer for the test queue.
 *
 * The processor is bound through the TEST_QUEUE_NAME constant; the previous
 * string literal 'TEST_QUEUE_NAME' named a different queue than the one
 * registered as 'test_queue'.
 */
@Processor(TEST_QUEUE_NAME)
export class QueneService {
  // NOTE(review): the 'task' queue is injected but not used in this class —
  // confirm a queue with that name is registered where this class is provided.
  constructor(@InjectQueue('task') private readonly taskQueue: Queue) {}

  /**
   * Handles jobs named 'test' by logging the job and then its id.
   *
   * @param job The job being processed; its payload is typed as a number.
   */
  @Process('test')
  async processTask(job: Job<number>): Promise<void> {
    console.log('Processing', job);
    console.log('Processing done', job.id);
  }
}