Author: Francis Ndungu
Last Updated: Fri, Jul 21, 2023In Redis, a job queue is an ordered list of tasks that run through a background scheduler. The Job queuing model improves the application's performance by providing a non-blocking user experience. For instance, consider a long-running application that converts images to PDF documents. The job queuing model allows users to submit their requests in a single call without waiting for the application to process the data. In the background, another sub-process (or worker) processes the jobs and informs the user when the task is complete.
This article explains how to implement a job queuing model with Redis and Node.js on a Ubuntu 22.04 server. The sample gaming application in this article records user scores to a MySQL database using the Redis LPUSH
and BLPOP
functions.
Before you begin:
Use SSH to access the server and Create a non-root sudo
user and install:
Create a new project
directory.
$ mkdir project
Switch to the new project
directory.
$ cd project
Log in to the MySQL server.
$ sudo mysql
Create a sample gaming_shop
database.
mysql> CREATE DATABASE gaming_shop;
Create a new gaming_shop_user
database user.
mysql> CREATE USER 'gaming_shop_user'@'localhost' IDENTIFIED WITH mysql_native_password BY 'EXAMPLE_PASSWORD';
Grant the user full privileges to the gaming_shop
database. Replace EXAMPLE_PASSWORD
with a strong password.
mysql> GRANT ALL PRIVILEGES ON gaming_shop.* TO 'gaming_shop_user'@'localhost';
Refresh MySQL privileges to apply the user rights.
mysql> FLUSH PRIVILEGES;
Switch to the gaming_shop
database.
mysql> USE gaming_shop;
Create a new scores
table.
mysql> CREATE TABLE user_scores (
score_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
username VARCHAR(20),
score INTEGER
) ENGINE = InnoDB;
In the above table, score_id
is a PRIMARY KEY
that uniquely identifies user scores in the table. AUTO_INCREMENT
instructs MySQL to assign new score_ids
to new records automatically. The created_at
column uses a MySQL TIMESTAMP
keyword to record the time when the application records a new entry. The username
column uniquely identifies the users, and score
records the number of points a user makes in a sample gaming application.
Exit the MySQL console.
mysql> QUIT;
mysql_db.js
FileIn this section, create a separate MySQL module that connects to the MYSQL database and runs a query to insert data in the user_scores
table as described in the steps below.
Using a text editor such as Nano
. Open a new mysql_db.js
file.
$ nano mysql_db.js
Add the following configurations to the file. Replace EXAMPLE_PASSWORD
with the gaming_shop_user
password you created earlier.
class mysql_db {
executeQuery(userData, callBack) {
const mysql = require('mysql');
const db_con = mysql.createConnection({
host: "localhost",
user: "gaming_shop_user",
password: "EXAMPLE_PASSWORD",
database: "gaming_shop"
});
db_con.connect(function(err) {
if (err) {
console.log(err.message);
}
});
var params = [];
params.push(userData["username"]);
params.push(userData["score"]);
var sql = "insert into user_scores (username, score) values (?, ?)"
db_con.query(sql, params, function (err, result) {
if (err) {
callBack(err, null);
} else {
callBack(null, "Success");
}
});
}
}
module.exports = mysql_db;
Save and close the file.
Below is what the file functions represent:
executeQuery(userData, callBack) {...}
in the mysql_db
class module accepts two arguments. The userData
argument accepts a JSON payload that contains a username
and a score
for a user. The callBack
argument takes a function that runs after the executeQuery(...)
function executes.
const mysql = require('mysql');
includes the MySQL driver for Node.js into the project. This adapter connects to the MySQL database from Node.js using the mysql.createConnection(...)
and db_con.connect(...)
functions.
db_con.query(sql, params,...)
submits user scores to the database by executing the insert into user_scores (username, score) values (?, ?)
SQL query.
index.js
FileIn order for the application to accept data through HTTP, create an HTTP server that runs on the local port 8080
. Through the port, the application receives incoming HTTP requests containing a JSON payload with user scores as described in this section.
Create a new index.js
file.
$ nano index.js
Add the following configurations to the file.
const redis = require('redis');
const http = require('http');
const hostname = 'localhost';
const port = 8080;
const server = http.createServer(httpHandler);
server.listen(port, hostname, () => {
console.log(`The HTTP Server is running at http://${hostname}:${port}/`);
});
function httpHandler(req, res) {
var json_payload = "";
req.on('data', function (data) {
json_payload += data;
});
req.on('end', function () {
var response = '';
if (req.method == "POST") {
const client = redis.createClient();
client.connect();
client.LPUSH("user_scores", JSON.stringify(json_payload));
response = "You've added a new job to the queue.\r\n";
} else {
response = "Method not allowed.\r\n";
}
res.write(response);
res.end();
});
}
Save and close the file.
Below is what the statements do:
const redis = require('redis');
imports the Redis driver for Node.js. This driver uses the const client = redis.createClient()
and client.connect()
functions to connect to the Redis server.
const http = require('http');
imports HTTP functions into the application and allows you to establish and run an HTTP server using the const server = http.createServer(httpHandler);
and server.listen(port, hostname, (...);
functions.
httpHandler(req, res)
creates a handler function for the HTTP server that processes the HTTP POST
method (if (req.method == "POST") {...}
).
client.LPUSH(...)
pushes a JSON payload containing user scores to the user_scores
Redis list. The LPUSH
command adds a new job to the queue.
The res.write(response);
statement writes a response to the calling HTTP client.
job_processor
FileIn a job queuing model, you must create a background scheduler that connects the Redis server to check for new jobs. The scheduler in this section dequeues the jobs from the list and saves them permanently to the MySQL database. Create the scheduler through the following steps.
Create a new job_processor.js
file.
$ nano job_processor.js
Add the following configurations to the file.
const mysql_db = require('./mysql_db.js');
const redis = require('redis');
const client = redis.createClient();
client.connect();
function callBack(err, result) {
if (err) {
console.log(err.message);
} else {
console.log("Data added to database. \r\n");
}
}
function process_queue() {
client.BLPOP('user_scores', 2).then((res) => {
if (res != null) {
var redisQueueData = JSON.parse(res['element']);
userData = JSON.parse(redisQueueData);
var dg = new mysql_db();
dg.executeQuery(userData, callBack);
}
process_queue();
});
}
process_queue();
Save and close the file.
Below is what the configuration does:
const mysql_db = require('./mysql_db.js');
imports and uses the mysql_db
module you created earlier.
const redis = require('redis');
imports the Redis server driver and establishes a new Redis connection using the ...redis.createClient()
, and client.connect();
functions.
The application processes callBack(err, result) {...}
after executing the dg.executeQuery(userData, callBack);
function.
Within the process_queue()
recursive function, the application uses the Redis BLPOP(...)
function to dequeue tasks from the user_scores
list. The BLPOP()
function removes and gets the first element from a list. After parsing the data using the JSON.parse(redisQueueData)
function, the application pushes data to the MySQL database using the var dg = new mysql_db();
and dg.executeQuery(userData, callBack);
functions.
After creating all the necessary Node.js source code files, test the application as described below.
Update the npm
package.
$ sudo npm install npm -g
Initialize the project.
$ npm init -yes
Install the redis
adapter for Node.js.
$ npm install redis
Install the mysql
adapter.
$ npm install mysql
Run the application's start-up file in the background.
$ node index.js &
Output:
The HTTP Server is running at http://localhost:8080/
Switch to the project directory.
$ cd project
Run the job_processor.js
file.
$ node job_processor.js
In a new terminal session, establish another SSH
connection to the server.
$ ssh user@SERVER-IP
Run the following curl
commands to queue sample jobs to the Redis server.
$ curl -X POST http://127.0.0.1:8080 -H 'Content-Type: application/json' -d '{"username":"john_doe","score":"429"}'
$ curl -X POST http://127.0.0.1:8080 -H 'Content-Type: application/json' -d '{"username":"mary_jane","score":"897"}'
$ curl -X POST http://127.0.0.1:8080 -H 'Content-Type: application/json' -d '{"username":"ian_job","score":"678"}'
Output:
...
You've added a new job to the queue.
In your main terminal session that runs the job_processor.js
file. Verify that the following output displays.
...
Data added to database.
Log in to the MySQL console.
$ sudo mysql
Switch to the gaming_shop
database.
mysql> USE gaming_shop;
View the user_scores
table data to verify that new data is added.
mysql> SELECT
score_id,
created_at,
username,
score
FROM user_scores;
If your output looks like the one below. The job-queuing model is working as expected.
+----------+---------------------+-----------+-------+
| score_id | created_at | username | score |
+----------+---------------------+-----------+-------+
| 1 | 2023-07-06 13:12:02 | john_doe | 429 |
| 2 | 2023-07-06 13:12:13 | mary_jane | 897 |
| 3 | 2023-07-06 13:12:22 | ian_job | 678 |
+----------+---------------------+-----------+-------+
3 rows in set (0.00 sec)
In this article, you implemented a Redis queueing model with Node.js on Ubuntu 20.04. You created a sample MySQL database, and set up Node.js modules that use the Redis LPUSH
and BLPOP
functions to queue, and process data from a Redis list.
For more Redis use cases, visit the following resources.