Author: Wamaitha Nyamu
Last Updated: Mon, Aug 21, 2023Asynchronous programming is a paradigm that enables task execution concurrently and independently. Tasks do not wait for each other to complete before running next. BullMQ is a queueing system that's implemented on top of Redis to deliver a fast and scalable implementation of a distributed queue.
This article explains how to handle asynchronous tasks with Node.js and BullMQ. You are to create a queueing system that extracts audio from any given video. This is useful when converting videos into audio that for sharing on other platforms such as podcasts.
Before you begin:
Deploy a Ubuntu 22.04 server on Vultr
Switch to the user
# su example-user
Install Nginx on the server
Redis is an in-memory database commonly used as a cache or message broker. The in-memory capability makes Redis a low-latency and high-throughput option. In this section, install the Redis database server and use it to store videos in a queue before processing them into audio.
Redis is available in the main Ubuntu APT repository, update the server packages
$ sudo apt update
Install Redis server
$ sudo apt install redis-server
BullMQ requires a Redis version higher than 5.0. Verify the installed version
$ redis-server --version
Output:
Redis server v=6.0.16 sha=00000000:0 malloc=jemalloc-5.2.1 bits=64 build=a3fdef44459b3ad6
Verify that the Redis database is running correctly
$ redis-cli PING
Output:
PONG
BULLMQ
is the project's root directory, and below is the proper directory tree.
BULLMQ
backend
|_ utils
| audioUtils.js
|_ queues
| audioQueue.js
|_ routes
| audioRoutes.js
|_ public
|_ audios
|_ videos
| index.js
views
| form.ejs
| head.ejs
| result.ejs
public
|_css
| main.css
Switch to your user home directory
$ cd /home/example-user/
Create the project directories using the following command
$ mkdir -p BULLMQ/backend/utils && touch BULLMQ/backend/utils/audioUtils.js && mkdir -p BULLMQ/backend/queues && touch BULLMQ/backend/queues/audioQueue.js && mkdir -p BULLMQ/backend/routes && touch BULLMQ/backend/routes/audioRoutes.js && mkdir -p BULLMQ/backend/public/audios && mkdir -p BULLMQ/backend/public/videos && touch BULLMQ/backend/index.js && mkdir -p BULLMQ/views && touch BULLMQ/views/form.ejs && touch BULLMQ/views/head.ejs && touch BULLMQ/views/result.ejs && mkdir -p BULLMQ/public/css && touch BULLMQ/public/css/main.css
Install all project dependencies using the following command
$ npm install @bull-board/api @bull-board/express bullmq express express-fileupload fluent-ffmpeg ioredis
Below is what each dependency handles in the project:
@bull-board/api
- Provides an API for monitoring and managing Bull queues
@bull-board/express
- Acts as a middleware that adds a monitoring dashboard between Express.js and the Bull monitoring dashboard
bullmq
- The queue management system capable of handling background jobs and messages
express
- The Web application framework for building REST APIs
express-fileupload
- Enables file uploads in Express
fluent-ffmpeg
- A command line tool that opens a connection to FFmpeg
ioredis
- Enables a connection to the Redis database
Install the pm2
process manager package
$ sudo npm install -g pm2
The above command installs the pm2
package used to load balance, monitor processes, clustering, and perform automatic restarts
By functionality, pm2
monitors the server created by express
. The express-fileupload
acts as a middleware that handles the video uploads. The system adds the video to the bullmq
queue for processing, and fluent-ffmpeg
fetches and processes the uploaded video in the queue
In the background, the queue utilizes a Redis connection enabled by ioredis
. @bull-board/api
and @bull-board/express
enable a user interface that visualizes how and when the jobs in the queue get processed
When successful, install FFmpeg
$ sudo apt install ffmpeg
The above command install FFmpeg used to convert different video and audio formats, resize videos, alter sample rates, and capture video or audio streams
On the web interface, video uploads use a form to select the target source file. In this section, use ejs
to render the web pages required for this project. ejs
is a templating engine commonly used with express
to generate HTML within Javascript, and it's suitable for rendering dynamic content. Set up the necessary ejs
files as described in the steps below.
Using a Linux text editor such as Nano
, edit the BULLMQ/views/head.ejs
file
$ nano BULLMQ/views/head.ejs
Add the following code to the file
<head>
<meta charset="UTF-8" />
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Image Processor</title>
<link rel="stylesheet" href="#" />
</head>
Save and close the file
The above code links the CSS file used to style all web pages in the project.
Edit the BULLMQ/views/form.ejs
file
$ nano BULLMQ/views/form.ejs
Add the following code to the file
<!DOCTYPE html>
<html lang="en">
<%- include('./head'); %>
<body>
<div class="home-wrapper">
<h1>Video Processor</h1>
<form action="/upload" method="POST" enctype="multipart/form-data">
<input
type="file"
name="fileupload"
placeholder="Select Video from your computer"
/>
<button type="submit">Upload Video</button>
</form>
</div>
</body>
</html>
Save and close the file
The above code includes the CSS styles linked in head.ejs
, and a form that accepts a POST request to the /upload
endpoint.
The form only accepts files and assigns a name attribute of fileupload
to the uploaded video. The name attribute identifies the file on the backend. When a user clicks the submit button, it triggers the form submission method.
The final audio files display through the BULLMQ/views/result.ejs
file, edit it to declare new code elements
$ nano BULLMQ/views/result.ejs
Add the following code to the file
<!DOCTYPE html>
<html lang="en">
<%- include('./head'); %>
<body>
<div class="gallery-wrapper">
<% if (audioFiles.length > 0) { %>
<p>The following are the processed audios:</p>
<% for (let audioFile of audioFiles) { %>
<audio controls>
<source src="" type="audio/mp3">
Your browser does not support the audio element.
</audio>
<br>
<% } %>
<% } else { %>
<p>
The audio is being processed. Refresh after a few seconds to listen to the extracted audios.
</p>
<% } %>
</div>
</body>
</html>
Save and close the file
The above webpage code displays processed audio. When a user uploads a video using form.ejs
, the request redirects to result.ejs
with a gallery view of the processed audio files. The <% if (audioFiles.length > 0) { %>
directive verifies if available audios are successfully processed. When ready, they display on the webpage.
To design the web page, edit the BULLMQ/public/css/main.css
file
$ nano BULLMQ/public/css/main.css
Add the following code to style the web page
body { background: #f8f8f8; }
h1 { text-align: center; }
p { margin-bottom: 20px; }
/** Styles for the "Choose File" button **/
button[type="submit"] {
background: none;
border: 1px solid orange;
padding: 10px 30px;
border-radius: 30px;
transition: all 1s;
}
button[type="submit"]:hover {
background: orange;
}
/** Styles for the "Upload Image" button **/
input[type="file"]::file-selector-button {
border: 2px solid #2196f3;
padding: 10px 20px;
border-radius: 0.2em;
background-color: #2196f3;
}
ul {
list-style: none;
padding: 0;
display: flex;
flex-wrap: wrap;
gap: 20px;
}
.home-wrapper {
max-width: 500px;
margin: 0 auto;
padding-top: 100px;
}
.gallery-wrapper {
max-width: 1200px;
margin: 0 auto;
}
Save and close the file
In this section, create an express server that serves the following routes:
/admin
- serves the Bull Dashboard
/
- renders the form.ejs
and allows video uploads
/result
- renders the result.ejs
with all the processed audio files.
/upload
- POST endpoint adds the uploaded video to the job queue awaiting processing, then redirects to the /result
.
Edit the main express file BULLMQ/backend/index.js
$ nano BULLMQ/backend/index.js
Add the following code to the file
const express = require("express");
const bodyParser = require("body-parser");
const fileUpload = require("express-fileupload");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");
const path = require("path");
const audioRoutes = require("./routes/audioRoutes");
const { audioJobQueue, worker, queueEvents } = require("./queues/audioQueue");
const serverAdapter = new ExpressAdapter();
createBullBoard({
queues: [new BullMQAdapter(audioJobQueue)],
serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");
const app = express();
app.use(express.static(path.join(__dirname, "public")));
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: true }));
app.use(fileUpload());
app.use(express.static("public"));
app.use("/admin", serverAdapter.getRouter());
app.use("/", audioRoutes);
queueEvents.on("completed", () => {
console.log(`Job Initialized to Queue`);
});
worker
.on("failed", (job, err) => {
console.log(`Job ${job} failed with error: ${err.message}`);
})
.on("error", (err) => {
console.log("Worker error:", err);
})
.on("stalled", (job) => {
console.log(`Job ${job} stalled`);
});
app.listen(4000, function () {
console.log("Server running on port 4000");
});
Below is the code breakdown:
Imports
createBullBoard
, BullMQAdapter
, ExpressAdapter
: These modules set up the BullDashboard to monitor and manage BullMQ queues
path
: The inbuilt NodeJs module for working with file paths
audioRoutes
: Custom module defined in BULLMQ/backend/routes/audioRoutes
that configures routes used in the application
audioJobQueue
, worker
queueEvents
: Modules that implement the BullMQ modules to set up the queue, worker, and queue events
BullBoard Setup
serverAdapter
is an instance of the ExpressAdapter
. The serverAdapter
integrates the BullDashboard into an express application. A BullBoard passes the queues and the serverAdapter
to the createBullBoard
function
serverAdapter.setBasePath
sets the path to the dashboard as /admin
. The express server runs on port 4000
, and so, the administrator page is accessible using the URL localhost:4000/admin
Setting up express
An express instance starts as app
, and the backend/public
directory served by the express.static
function contains uploaded videos. ejs
is the view engine used to render the ejs templates in the views
folder
bodyparser
and fileUpload
middleware handle JSON form data and uploads respectively.
BULLMQ/public/
used by express.static
as the static directory, contains styles used by ejs
templates
The /admin
route serves the BullDashboard while the /
route serves the base route for routers in the audioRoutes
function
Queue events and workers
Adds the completed
event listener to the queueEvents instance, and it logs a message per job addition
failed
, error
, and stalled
event listeners added to the worker
instance handle worker errors, failed jobs, and stalled jobs
Starting the server
4000
with a callback function that logs a message to the terminalTo define routers, edit the audioRoutes.js
file
$ nano BULLMQ/backend/routes/audioRoutes.js
Add the following code to the file
const express = require("express");
const path = require("path");
const fs = require("fs");
const { audioJobQueue } = require("../queues/audioQueue");
const router = express.Router();
router.get("/", function (req, res) {
res.render("form");
});
router.get("/result", (req, res) => {
const audioDirPath = path.join(__dirname, "../public", "audios");
const audioFiles = fs.readdirSync(audioDirPath).map((audio) => {
console.log("audios is ",audio)
const decodedAudioName = decodeURIComponent(audio);
console.log("decoded audio is ",decodedAudioName);
return `/audios/${audio}`;
});
res.render("result", { audioFiles });
});
router.post("/upload", async function (req, res) {
try {
const {fileupload } = req.files;
console.log("Uploading:", fileupload.name);
if (!fileupload) return res.sendStatus(400);
const videoFolderPath = path.join(__dirname, "../public", "videos");
const videoFilePath = path.join(videoFolderPath, encodeURIComponent(fileupload.name));
await fileupload.mv(videoFilePath);
const videoPath = `/videos/${encodeURIComponent(fileupload.name)}`;
await audioJobQueue.add("audioProcess", {
video_path: videoPath,
});
res.redirect("/result");
} catch (error) {
console.log(error);
res.status(500).json({ error: "Failed to process video" });
}
});
module.exports = router;
Save and close the file
Below is what the code does:
An instance of the express.Router()
function initializes as router
router.get("/")
defines a GET route that renders the form.ejs
template
router.get("/result")
defines a GET route that retrieves all the audio file names in the public/audio
directory as an array. The array forwards as audioFiles
to the result
view
router.post("/upload")
defines a POST route that handles file uploads used by the form.ejs
template. The uploaded file has a name field of fileuploads
in the req.files
object. The file saves to the public/videos
directory if it exists
The video path links to audioJobQueue
, through the add method, and the audioProcess
references the job type added to the queue. audioProcess
is an arbitrary name that describe the job type added to the queue
To establish a connection to the Redis database, edit the audioQueue.js
file
$ nano BULLMQ/backend/queues/audioQueue.js
Add the following code to the file
const { Queue, Worker, QueueEvents } = require("bullmq");
const audioProcess = require("../utils/audioUtils");
const Redis = require('ioredis');
const connectionToRedis = new Redis({
host: "localhost",
port: 6379,
});
const audioJobQueue = new Queue("videoJobQueue", {
connection: connectionToRedis,
});
const worker = new Worker("videoJobQueue", async (job) => {
console.log("job added to queue")
try {
await audioProcess(job);
}
catch (error) {
console.log(`Job ${job.id} failed with error: ${error.message}`);
}
});
const queueEvents = new QueueEvents("videoJobQueue");
module.exports = {
audioJobQueue,
worker,
queueEvents,
};
Save and close the file
Below is what the code does:
Queue
, Worker
, and QueueEvents
are classes provisioned by the bullmq
package. Workers
is the process responsible for processing jobs. QueueEvents
provides a way to monitor the progress or status of a job in the queue
audioProcess
is a module in the utils/audioUtils
that performs the logic for converting videos to audio
ioredis
is the NodeJS Redis client that connects to the Redis database
connectionToRedis
instantiates a Redis connection with the default connection parameters
audioJobQueue
instantiates a Queue that utilizes the defined Redis connection at connectionToRedis
worker
is an instance of the Worker class. The worker class listens for jobs added to the videoJobQueue
queue. The catch block in the function logs any resulting errors
queueEvents
listens to events of all jobs added to the videoJobQueue
To implement the audioProcess
function which handles the conversion process using FFmpeg, edit the audioUtils
file
$ nano BULLMQ/backend/utils/audioUtils
Add the following code to the file
const ffmpeg = require("fluent-ffmpeg");
const path = require("path");
const fs = require('fs');
const audioProcess = async (job) => {
console.log("Processing job:", job.id);
const { video_path } = job.data;
console.log("job data is ", job.data);
const videoFilePath = path.join(__dirname, "../public", video_path);
const timestamp = Date.now();
const newFileName = `${timestamp}.mp3`; // Rename the file
const audioFolderPath = path.join(__dirname, "../public", "audios");
const audioFilePath = path.join(audioFolderPath, newFileName);
const convertVideoToAudio = ffmpeg(videoFilePath)
.toFormat("mp3")
.save(audioFilePath)
.on("error", (err) => {
console.error("FFmpeg error. Job processing terminated.", err);
})
.on("end", () => {
console.log("Video successfully converted to audio.....");
// Update job status to "complete"
job.updateProgress(100);
console.log("Job is successfully completed");
});
convertVideoToAudio.run();
};
module.exports = audioProcess;
Save and close the file
Below is what the above code does:
audioProcess
is an asynchronous function that takes a job object as an argument. The video_path
from the job.data
object extracts, and video_path
is the location of the video that needs processing to audio
path
defines an absolute path to the video using the video_path
and the base directory path. This becomes the videoFilePath
.
audiofilePath
joins the base path, audios folder path, and the newFileName
with a timestamp. This means the audio extracted from the video renames to the server timestamp. This ensures that each audio has a unique name.
convertVideoToAudio
initializes the ffmpeg
command, the video to convert, and the conversion to mp3 with the .toFormat("mp3")
option. .save
specifies where to save the mp3 file as defined by audiofilePath
.
In case of an error, the .on(error,..)
event handler logs the error to the console. When the conversion is complete, the .on("end",...)
event triggers and the job progress updates to 100.
To ensure that the server runs 24/7 without interruptions, use a process manager such as PM2. In this section, switch to the project directory and start the server using pm2
as described in the steps below.
Switch to the BULLMQ
directory.
$ cd BULLMQ
Start the server
$ pm2 start backend/index.js
The above command starts the application server as a background process managed by pm2
Your output should look like one below.
__/\\\\\\\\\\\\\____/\\\\____________/\\\\____/\\\\\\\\\_____
_\/\\\/////////\\\_\/\\\\\\________/\\\\\\__/\\\///////\\\___
_\/\\\_______\/\\\_\/\\\//\\\____/\\\//\\\_\///______\//\\\__
_\/\\\\\\\\\\\\\/__\/\\\\///\\\/\\\/_\/\\\___________/\\\/___
_\/\\\/////////____\/\\\__\///\\\/___\/\\\________/\\\//_____
_\/\\\_____________\/\\\____\///_____\/\\\_____/\\\//________
_\/\\\_____________\/\\\_____________\/\\\___/\\\/___________
_\/\\\_____________\/\\\_____________\/\\\__/\\\\\\\\\\\\\\\_
_\///______________\///______________\///__\///////////////__
[PM2] Spawning PM2 daemon with pm2_home=/home/example-user/.pm2
[PM2] PM2 Successfully daemonized
[PM2] Starting /home/example-user/BULLMQ/backend/index.js in fork_mode (1 instance)
[PM2] Done.
To view the process logs, run the following command:
$ pm2 logs
When the application is running, the following output should display in your server logs:
0|index | Server running on port 4000
Press :Key:Ctrl: + :key:C: to stop the pm2
log output
Test that the application server listens on port 4000
.
To secure the server, expose the HTTP port 80
instead of the backend application port 4000
for access. Configure Nginx as a Reverse Proxy as described in the steps below.
Allow the HTTP port 80
through the Firewall.
$ sudo ufw allow 80/tcp
The above command allows incoming HTTP requests to the server through the default UFW firewall.
Disable the default Nginx configuration file
$ sudo rm /etc/nginx/sites-enabled/default
Create a new host configuration file
$ sudo touch /etc/nginx/sites-available/bullmq
Edit the file
$ sudo nano /etc/nginx/sites-available/bullmq
Add the following configurations to the file
server {
listen 80 default_server;
listen [::]:80 default_server;
server_name _;
location / {
proxy_pass http://127.0.0.1:4000;
try_files $uri $uri/ =404;
}
}
Save and close the file
Enable the configuration file
$ sudo ln -s /etc/nginx/sites-available/bullmq /etc/nginx/sites-enabled/bullmq
Test the Nginx configuration for errors
$ sudo nginx -t
Restart Nginx to apply changes
$ sudo systemctl restart nginx
Using a web browser such as Firefox, visit your public server IP Address. Replace 192.0.2.1
with your actual server address
http://192.0.2.1
Verify that the application loads correctly with the Choose File
and Upload Video
buttons
Click the Choose File
button to browse a video file from your computer
When selected, click Upload Video
to start processing the video file
When the video-to-audio conversion is complete, your request redirects to the /result
page. Verify that you can play and download any of the generated audio files
To view the BullDashboard, navigate to the /admin/queue/videoJobQueue
path in your URL
http://SERVER-IP/admin/queue/videoJobQueue
Verify that all processed jobs and states display in the dashboard
To view the application status logs, run the following command in your SSH session
$ pm2 logs
Output:
0|index | Uploading: 1. Conclusion.mp4
0|index | job added to queue
0|index | Processing job: 2
0|index | job data is { video_path: '/videos/1.%20Conclusion.mp4' }
0|index | Job Initialized to Queue
0|index | audios is 1691771605201.mp3
0|index | decoded audio is 1691771605201.mp3
0|index | audios is 1691771984626.mp3
0|index | decoded audio is 1691771984626.mp3
0|index | Video successfully converted to audio.....
0|index | Job is successfully completed
0|index | Video successfully converted to audio.....
0|index | Job is successfully completed
In this article, you have developed a working application to handle asynchronous tasks with Node.js and BullMQ. By using BullMQ, you can make use of asynchronous programming in resource or time-intensive tasks. To change your interface design, edit the CSS file to add more style to the webpage elements. For more information about BullMQ, visit the official documentation.
To implement more solutions on your Vultr Cloud server, visit the following resources.