当前位置: 首页 > article >正文

NestJs bull 用法

bull简介

队列 bull

bull用法

https://github.com/OptimalBits/bull

Bull is currently in maintenance mode, we are only fixing bugs. For new features check BullMQ, a modern rewritten implementation in Typescript. You are still very welcome to use Bull if it suits your needs, which is a safe, battle tested librarySourceURL:https://github.com/OptimalBits/bull?tab=readme-ov-file 公牛目前处于维护模式,我们只是修复错误。有关新特性,请检查 BullMQ,这是一个用 Typecript 重写的现代实现。如果符合您的需要,您仍然非常欢迎使用 Bull,这是一个安全的、经过战斗考验的库。

真正用法用bullMq参考bullMq

bull参考bull

不过概念理解知识内容,多参考bullMq,文档更加丰富

https://github.com/taskforcesh/bullmq

What is BullMQ | BullMQ

@Injectable()
export class QueueConfigurationService implements OnApplicationBootstrap {
    constructor(
    ) {}

    private queueRegistry = new Map<string, Queue>();

    async onApplicationBootstrap() {
        const redisQueueConfig = {
            host: process.env['REDIS_HOST'],
            port: Number(process.env['REDIS_PORT']),
            password: process.env['REDIS_PASSWORD'],
            connectionTimeout: 30000,
            db: 4,
        };

        try {
            const standardQueue = new Queue('StandardQueue', { redis: redisQueueConfig });
            this.queueRegistry.set('StandardQueue', standardQueue);
        } catch (error) {
            throw error;
        }
    }

    async getQueueByLabel(queueLabel: string): Promise<Queue | undefined> {
        return this.queueRegistry.get(queueLabel);
    }
}


// 在其他地方使用
this.taskQueue = new Queue(serviceQueueName, {
    redis: {
        host: process.env['REDIS_HOST'],
        port: Number(process.env['REDIS_PORT']),
        password: process.env['REDIS_PASSWORD'],
        connectTimeout: 30000,
        db: 4,
    },
});
this.taskQueue.process(taskSubscriptionName, async (job, done) => {});

Nestjs bull用法

Documentation | NestJS - A progressive Node.js framework

import { Injectable, Inject } from '@nestjs/common';
import { InjectRedis } from 'nestjs-redis';
import { InjectQueue, BullModule } from '@nestjs/bull';
import { BaseService } from './base.service';
import { TypeOrmModule } from '@nestjs/typeorm';

// 常量定义
const NORMAL_QUEUE_NAME = 'normal_queue';
const PRIORITY_QUEUE_NAME = 'priority_queue';
const TEST_QUEUE_NAME = 'test_queue';

// 实现 OnApplicationBootstrap 接口
@Injectable()
export class QueueConfigService {
    // 构造函数
    constructor(
        @InjectRedis('redisDistribute') private readonly redisDistribute: any,
        @InjectQueue(NORMAL_QUEUE_NAME) private readonly normalQueue: any,
        @InjectQueue(PRIORITY_QUEUE_NAME) private readonly priorityQueue: any,
        @InjectQueue(TEST_QUEUE_NAME) private readonly testQueue: any,
        private readonly base: BaseService
    ) {}

    // 根据队列名称返回对应的队列实例
    getQueueByName(queueName: string): any {
        const queueMap = {
            [NORMAL_QUEUE_NAME]: this.normalQueue,
            [PRIORITY_QUEUE_NAME]: this.priorityQueue,
            [TEST_QUEUE_NAME]: this.testQueue,
        };
        return queueMap[queueName];
    }
}

// 模块定义
@Module({
    imports: [
        BullModule.registerQueueAsync([
            {
                name: NORMAL_QUEUE_NAME,
                useFactory: () => ({
                    redis: {
                        host: process.env['REDIS_HOST'],
                        port: +process.env['REDIS_PORT'],
                        password: process.env['REDIS_PASSWORD'],
                    },
                }),
            },
            {
                name: PRIORITY_QUEUE_NAME,
                useFactory: () => ({
                    redis: {
                        host: process.env['REDIS_HOST'],
                        port: +process.env['REDIS_PORT'],
                        password: process.env['REDIS_PASSWORD'],
                    },
                }),
            },
            {
                name: TEST_QUEUE_NAME,
                useFactory: () => ({
                    redis: {
                        host: process.env['REDIS_HOST'],
                        port: +process.env['REDIS_PORT'],
                        password: process.env['REDIS_PASSWORD'],
                    },
                }),
            },
        ]),
        BaseModule,
    ],
    providers: [QueueConfigService],
    exports: [QueueConfigService],
})
export class QueueConfigModule {}

queueMap[TEST_QUEUE_NAME].add('test', 20);

@Processor('TEST_QUEUE_NAME')
export class QueneService {
  constructor(@InjectQueue('task') private readonly taskQueue: Queue) {}

  @Process('test')
  async processTask(job: Job<number>) {
    console.log('Processing', job);
    console.log('Processing done', job.id);
  }
}

http://www.kler.cn/news/284051.html

相关文章:

  • Python结构语句总结
  • 麒麟V10(x86_64)安装部署MySQL-5.1.70
  • 这个项目所需的配置文件和依赖
  • 280Hz显示器 - HKC G27H3显示器
  • 树、二叉树
  • MP条件构造器之常用功能详解(between、notBetween、like)
  • 为什么在JDBC中使用PreparedStatement?
  • HCIP笔记9-BGP(3)
  • Day51 | 117. 软件构建(拓扑排序)47. 参加科学大会 dijkstra(朴素版)
  • JavaScript 小测验 toString
  • 无人机之使用技巧篇
  • Tomcat Manager 上传 war 包大小的限制
  • SpringBoot配置MybatisPlus
  • Docker基本使用:根据mysql镜像创建mysql容器
  • 四、监控搭建-Prometheus-采集端批量部署
  • TCP/UDP的对比,粘包分包抓包,http协议
  • MYSQL集群技术
  • JavaScript 网页设计案例
  • 12、stm32通过dht11读取温湿度
  • Elasticsearch简单介绍
  • 智慧高校迎新服务平台的设计与实现---附源码92489
  • http方法调用接口
  • Django如何实现websocket
  • 【工作实践】MVEL 2.x语法指南
  • vscode在html中的使用
  • 多进程比多线程开销大的原因
  • 海绵城市雨水监测系统
  • C++:Github开源7.8Kstar的线程池介绍
  • 如何在没有密码的情况下解锁 Oppo 手机?5 种简单的方法
  • hadoop日志文件