Membuat Queue Job Nestjs Menggunakan Bull

By Rizky Kurniawan - December 23, 2023 ~7 mins read

RUANGDEVELOPER.com - Queue job (tugas antrean) adalah konsep dalam pemrograman dan pengembangan aplikasi yang digunakan untuk menangani tugas atau pekerjaan secara asynchronous. Pada dasarnya, konsep ini melibatkan pengiriman tugas atau pekerjaan ke dalam antrian (queue) untuk dieksekusi nanti, tanpa memblokir eksekusi program utama.

NestJS adalah salah satu framework yang menyediakan cara yang simpel bagi kita untuk membuat queue job, yaitu dengan menggunakan package Bull. Bull menyediakan cara untuk membuat, mengelola, dan mengeksekusi antrian pekerjaan secara asynchronous di lingkungan Node.js. NestJs telah mendukung penuh penggunaan Bull untuk mengelola antrean yang telah didokumentasikan di sini.

Yuk kita gas… 🚀

Menyiapkan Project

Membuat project NestJS

nest new nest-queue

Install dependency

yarn add @nestjs/bull bull @bull-board/nestjs @bull-board/api @bull-board/express @nestjs/axios axios

Konfigurasi Bull & Bull Dashboard

Untuk menggunakan Bull Queue kita perlu memiliki redis server sebagai database untuk menyimpan job yang kita buat. Maka dari itu, pastikan kamu sudah memiliki server redis yang jalan di local / server hosting kamu.

Buka file app.module.ts dan import BullModule serta sesuaikan koneksi redis server.

@Module({
  imports: [
    // Import BullModule
    BullModule.forRoot({
      redis: {
        host: "localhost",
        port: 6379,
      },
    }),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

Masih pada file app.module.ts, registrasikan queue yang ingin kita buat dengan menggunakan method BullModule.registerQueue()

@Module({
  imports: [
    // Import BullModule
    BullModule.forRoot({
      redis: {
        host: "localhost",
        port: 6379,
      },
    }),
    BullModule.registerQueue(
      // Registrasikan queue dengan nama network-request-queue
      {
        name: "network-request-queue",
      }
    ),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

Agar kita bisa memonitor queue yang telah diregistrasikan, kita import juga konfigurasi BullBoard.

@Module({
  imports: [
    BullModule.forRoot({
      redis: {
        host: "localhost",
        port: 6379,
      },
    }),
    BullModule.registerQueue({
      name: "network-request-queue",
    }),
    // Konfigurasi BullBoard
    BullBoardModule.forRoot({
      route: "/queue-monitor",
      adapter: ExpressAdapter,
    }),
    // Registrasikan queue ke BullBoard
    BullBoardModule.forFeature({
      name: "network-request-queue",
      adapter: BullAdapter,
    }),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

Pada bagian konfigurasi BullBoard, beberapa konfigurasi yang kita atur adalah sebagai berikut:

  • route: Ini adalah konfigurasi rute URL tempat BullBoard akan diakses. Dalam contoh ini, dashboard akan diakses melalui ‘/queue-monitor’.
  • adapter: Ini untuk menentukan jenis adapter yang digunakan oleh BullBoard. Dalam kasus ini, kita menggunakan ExpressAdapter, yang menyesuaikan BullBoard dengan aplikasi Express.

Dan pada bagian registrasi queue ke BullBoard, beberapa konfigurasi yang kita atur adalah sebagai berikut:

  • name: Ini adalah konfigurasi nama queue yang akan didaftarkan ke BullBoard. Dalam contoh ini, queue bernama ’network-request-queue'.
  • adapter: Ini untuk menentukan jenis adapter yang digunakan untuk queue yang didaftarkan. Dalam kasus ini, kita menggunakan BullAdapter, yang memungkinkan BullBoard berkomunikasi dengan queue yang digunakan oleh Bull.

Setelah semua beres, mari kita coba menjalankan aplikasi untuk memasikan semua konfigurasi sudah benar.

yarn start:dev

Akses aplikasi melalui url http://127.0.0.1:3000/queue-monitor, pastikan kamu dapat melihat queue dengan nama sesuai dengan konfigurasi kita.

Bull Dashboard

Membuat Queue Producer

Queue producer adalah komponen yang menempatkan tugas atau pesan ke dalam queue. Komponen ini membuat atau menghasilkan pekerjaan yang nantinya akan dieksekusi secara asinkron oleh konsumen queue (queue consumer). Produsen menangani penempatan tugas, penjadwalan, dan komunikasi antar service.

Untuk membuat queue producer, kita bisa menggunakan file app.service.ts. Mari kita buka file tersebut, kemudian ganti kodenya menjadi seperti berikut:

import { InjectQueue } from "@nestjs/bull";
import { Injectable } from "@nestjs/common";
import { Queue } from "bull";

@Injectable()
export class AppService {
  constructor(
    @InjectQueue("network-request-queue")
    private readonly networkRequestQueue: Queue
  ) {}

  async addNetworkRequestJob(data: { url: string }) {
    await this.networkRequestQueue.add(data);
  }
}

Pada class service tersebut, kita inject queue yang sudah kita registrasikan sebelumnya. Kemudian kita juga menambahkan satu method untuk menambahkan job ke dalam queue dengan data berupa alamat URL untuk melakukan network request.

Sederhananya kita akan membuat queue job yang akan melakukan network request ke URL yang dikirimkan pada data queue.

Membuat Queue Consumer

Queue consumer adalah komponen dalam sistem yang bertugas mengambil, memproses, dan mengeksekusi tugas atau pesan dari antrian. Ini memungkinkan eksekusi tugas secara asinkron, meningkatkan responsivitas sistem. Consumer terus memantau antrian, eksekusi tugas, dan mengelola kesalahan. Skalabilitas dapat ditingkatkan dengan menambahkan lebih banyak instance.

Untuk membuat queue consumer, silahkan buat sebuah file baru bernama app.consumer.ts dan tambahkan kode seperti berikut:

import {
  OnQueueActive,
  OnQueueCompleted,
  OnQueueFailed,
  Process,
  Processor,
} from "@nestjs/bull";
import { Logger } from "@nestjs/common";
import { Job } from "bull";

@Processor("network-request-queue")
export class AppConsumer {
  @Process()
  async process(job: Job<{ url: string }>) {
    Logger.log(`Processing job ${job.id}`);
    Logger.log(job.data);
  }

  @OnQueueActive()
  onActive(job: Job) {
    Logger.log(`Processing job ${job.id} of type ${job.name}`);
  }

  @OnQueueCompleted()
  onCompleted(job: Job, result: any) {
    Logger.log(`Completed job ${job.id} of type ${job.name}`);
    Logger.log(result);
  }

  @OnQueueFailed()
  onError(job: Job<any>, error: any) {
    Logger.error(`Failed job ${job.id} of type ${job.name}: ${error.message}`);
  }
}
  • @Process() - process(job: Job<{ url: string }>):

    Method ini menandakan bahwa fungsi process akan menangani eksekusi job ketika diambil dari queue. Fungsi ini menerima parameter job, yang merupakan representasi dari job yang akan dieksekusi. Pada contoh ini, log dicetak untuk menunjukkan bahwa job sedang diproses bersama dengan mencetak data dari job.

  • @OnQueueActive() - onActive(job: Job):

    Method ini akan dipanggil ketika suatu job aktif atau sedang dieksekusi oleh queue consumer. Menerima parameter job yang menyediakan informasi tentang job yang aktif. Pada contoh ini, log dicetak untuk menunjukkan bahwa job sedang aktif atau dalam proses eksekusi.

  • @OnQueueCompleted() - onCompleted(job: Job, result: any):

    Method ini akan dipanggil setelah job berhasil diselesaikan (tanpa kesalahan). Menerima dua parameter: job yang menyediakan informasi tentang job yang selesai dan result yang mewakili hasil dari eksekusi job. Pada contoh ini, log dicetak untuk menunjukkan bahwa job telah diselesaikan bersama dengan mencetak hasil eksekusi job.

  • @OnQueueFailed() - onError(job: Job<any>, error: any):

    Method ini akan dipanggil jika job gagal dieksekusi (terjadi kesalahan). Menerima dua parameter: job yang menyediakan informasi tentang job yang gagal dan error yang merupakan objek kesalahan. Pada contoh ini, log error dicetak untuk menunjukkan bahwa job gagal dan mencetak pesan kesalahan. Dengan menggunakan decorator seperti @Process, @OnQueueActive, @OnQueueCompleted, dan @OnQueueFailed, Anda dapat dengan mudah menentukan logika yang berbeda untuk berbagai tahap dalam siklus hidup eksekusi job di dalam queue pekerjaan.

Setelah queue consumer dibuat, kita perlu menambahkannya ke dalam array providers pada kelas AppModule. Ini bertujuan agar consumer kita bisa digunakan untuk mengeksekusi job yang kita tambahkan.

@Module({
  /**
   * Kode lainnya...
   */
  providers: [AppService, AppConsumer],
})
export class AppModule {}

Konfigurasi HTTP Module

Setelah membuat queue consumer, berikutnya kita akan mengkonfigurasi HTTP module untuk melakukan network request. Silahkan buka kembali file app.module.ts dan import HttpModule ke dalam AppModule.

@Module({
  imports: [
    /**
     * Kode import lainnya
     */
    HttpModule,
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

Modifikasi Method process Pada Queue Consumer

Setelah HttpModule diimport, kita dapat menggunakan HttpService pada queue consumer untuk melakukan network request. Buka kembali file app.consumer.ts kemudian Inject HttpService ke kelas AppConsumer.

@Injectable()
@Processor("network-request-queue")
export class AppConsumer {
  constructor(private readonly httpService: HttpService) {}
  /**
   * Kode lainnya...
   */
}

Setelah itu modifikasi method process menjadi seperti ini:

@Process()
async process(job: Job<{ url: string }>) {
  Logger.log(`Processing job ${job.id}`);
  Logger.log(job.data);
  const { data } = await firstValueFrom(this.httpService.get(job.data.url).pipe(
    catchError((error) => {
      Logger.error(error);
      throw error;
    })
  ));

  return data;
}

Menambahkan Queue Job

Kita telah memiliki kelas AppService yang berisi method addNetworkRequestJob untuk menambahkan queue job. Untuk menggunakannya kita membuat sebuah endpoint baru di controller dan mengeksekusi method addNetworkRequestJob. Silahkan buka file app.controller.ts dan modifikasi menjadi seperti berikut ini:

import { Body, Controller, Post } from "@nestjs/common";
import { AppService } from "./app.service";

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  @Post("/add-job")
  async addJob(@Body() data: { url: string }) {
    await this.appService.addNetworkRequestJob({
      url: data.url,
    });

    return {
      message: "Job added",
    };
  }
}

Pada contoller tersebut, kita menambahkan endpoint /add-job yang menerima request body berisi url untuk kemudian kita gunakan pada queue job.

Pengujian Queue Job

Semua konfigurasi dan kode sudah selesai dibuat, saatnya kita mencoba aplikasi yang kita buat.

  • Pastikan redis server kamu telah diaktifkan

  • Jalankan aplikasi

    yarn start:dev
    
  • Buka queue monitor (http://127.0.0.1:3000/queue-monitor)

  • Lakukan post request ke endpoint untuk menambahkan job Pada bagian ini saya menggunakan ekstensi vscode bernama rest-client, kamu juga bisa menggunakan cara lain seperti menggunakan Postman atau Insomnia

    Post Request Untuk Menambahkan Job

  • Periksa queue job melalui queue monitor dashboard Pastikan kamu dapat melihat queue yang sebelumnya sudah ditambahkan.

    Queue Job Terlihat Pada Queue Monitor
    Pada bagian ini, queue job saya terdapat pada tab Completed yang berarti sudah selesai dieksekusi. Kamu juga mungkin akan melihat queue job yang kamu tambahkan berada pada tab Active, yang berarti proses sedang berlangsung.

Selamat kamu sudah berhasil membuat queue job menggunakan Bull. Masih banyak konfigurasi queue job yang dapat diterapkan untuk menyesuaikan dengan kebutuhan aplikasi kamu. Selengkapnya, kamu bisa periksa langsung di sumber berikut:

Source code project pada tutorial ini dapat kamu lihat di link berikut:

Bagikan:

Ingin Berdiskusi?

Yuk bergabung di Grup Telegram Ruang Developer atau mulai diskusi melalui GitHub. See You!

Dapatkan contoh source code project backend, frontend, atau fullstack untuk kamu amati, tiru, dan modifikasi sesuka hati. Klik untuk melihat detail!
comments powered by Disqus

Berlangganan Gratis

Kamu akan menerima email update dari Ruang Developer

Beri Dukungan

Beri dukungan, dapatkan full source code project web untuk bahan referensi, tiru, dan modifikasi.
Lightbox