Author: Francis Ndungu
Last Updated: Wed, Dec 15, 2021
An event in computer programming represents a change of state. Common examples include a subscriber submitting registration information to your application, a hardware sensor reporting a temperature spike in a room, a request to validate a payment, and a call to a customer service department. When events occur in your application, you must track and analyze them as soon as they happen. This is called event processing.
You'll encounter many situations where you must integrate event processing into your application. The main reasons to adopt this technology are scalability and a better user experience through real-time stream processing.
One of the most efficient ways to process events is the publish/subscribe model. In this software design architecture, you create a publishing script that channels events, as they occur, to different subscribers through a broker such as the Redis server.
For instance, if you expect thousands of signups in your online subscription service, you can use Redis to publish the information to a signups channel. Then, under the hood, you can use several Redis subscribers (event processing scripts) to save the information to a MySQL database, send confirmation emails to customers, and process credit card payments.
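The fan-out idea described above can be sketched without Redis at all. The following is a minimal illustration, assuming a hypothetical in-memory Broker type (not part of Redis or any library) that delivers each published message to every subscriber of a named channel, much as a Redis channel would deliver one signup to a database worker and an email worker.

```go
package main

import (
	"fmt"
	"sync"
)

// Broker is a hypothetical in-memory stand-in for a pub/sub broker.
// It is only an illustration of the model, not a Redis replacement.
type Broker struct {
	mu   sync.Mutex
	subs map[string][]chan string
}

// NewBroker returns an empty broker.
func NewBroker() *Broker {
	return &Broker{subs: make(map[string][]chan string)}
}

// Subscribe returns a buffered Go channel that receives every message
// published to the named channel after this call.
func (b *Broker) Subscribe(name string) <-chan string {
	ch := make(chan string, 16)
	b.mu.Lock()
	b.subs[name] = append(b.subs[name], ch)
	b.mu.Unlock()
	return ch
}

// Publish delivers msg to every current subscriber of the named channel
// and returns how many subscribers received it. It blocks if a
// subscriber's buffer is full, which is acceptable for a sketch.
func (b *Broker) Publish(name, msg string) int {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.subs[name] {
		ch <- msg
	}
	return len(b.subs[name])
}

// summary formats a short delivery report.
func summary(name string, n int) string {
	return fmt.Sprintf("channel %q: delivered to %d subscriber(s)", name, n)
}

func main() {
	b := NewBroker()
	dbWorker := b.Subscribe("signups")
	mailWorker := b.Subscribe("signups")

	n := b.Publish("signups", `{"customer_id": 1, "package_id": 2}`)
	fmt.Println(summary("signups", n))
	fmt.Println("database worker got:", <-dbWorker)
	fmt.Println("email worker got:", <-mailWorker)
}
```

The publisher never needs to know how many workers exist or what they do with the event, which is the property that lets you add subscribers later without touching the publishing code.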
In this guide, you'll learn how to implement the event processing model with Golang, Redis, and MySQL 8 on your Linux server.
To complete this tutorial, you require:
A Linux server.
A Redis server.
In this sample application, you'll capture customers' signup events as they occur in your application using a Redis server. However, to store data permanently, you'll need MySQL. So, SSH to your server and follow the steps below to create a database.
Log in to your MySQL server as root.
$ sudo mysql -u root -p
Enter the root password for your MySQL server and press ENTER to proceed when prompted. Then, issue the SQL commands below to set up a new sample_store database and a sample_store_user account. Replace EXAMPLE_PASSWORD with a strong value.
mysql> CREATE DATABASE sample_store;
CREATE USER 'sample_store_user'@'localhost' IDENTIFIED WITH mysql_native_password BY 'EXAMPLE_PASSWORD';
GRANT ALL PRIVILEGES ON sample_store.* TO 'sample_store_user'@'localhost';
FLUSH PRIVILEGES;
Next, switch to the new sample_store database.
mysql> USE sample_store;
Then, create a customers table. This table stores customers' information, including the customer_id, first_name, last_name, and email_address of each customer.
mysql> CREATE TABLE customers (
customer_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
first_name VARCHAR(50),
last_name VARCHAR(50),
email_address VARCHAR(255)
) ENGINE = InnoDB;
Populate the customers table with sample data.
mysql> INSERT INTO customers (first_name, last_name, email_address) VALUES ('JOHN', 'DOE', 'john_doe@example.com');
INSERT INTO customers (first_name, last_name, email_address) VALUES ('JIM', 'JADE', 'jim_jade@example.com');
INSERT INTO customers (first_name, last_name, email_address) VALUES ('MARY', 'MARK', 'mary_mark@example.com');
Query the customers table to ensure the records are in place.
mysql> SELECT
customer_id,
first_name,
last_name,
email_address
FROM customers;
Output.
+-------------+------------+-----------+-----------------------+
| customer_id | first_name | last_name | email_address         |
+-------------+------------+-----------+-----------------------+
|           1 | JOHN       | DOE       | john_doe@example.com  |
|           2 | JIM        | JADE      | jim_jade@example.com  |
|           3 | MARY       | MARK      | mary_mark@example.com |
+-------------+------------+-----------+-----------------------+
3 rows in set (0.00 sec)
Create a packages table. This table stores the different packages that customers can choose when subscribing to your sample company's services.
mysql> CREATE TABLE packages (
package_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
package_name VARCHAR(50),
monthly_rate DOUBLE
) ENGINE = InnoDB;
Populate the packages table with sample data.
mysql> INSERT INTO packages (package_name, monthly_rate) VALUES ('BASIC PACKAGE', 5);
INSERT INTO packages (package_name, monthly_rate) VALUES ('ADVANCED PACKAGE', 15);
INSERT INTO packages (package_name, monthly_rate) VALUES ('PREMIUM PACKAGE', 50);
Query the packages table to make sure you've populated it.
mysql> SELECT
package_id,
package_name,
monthly_rate
FROM packages;
Output.
+------------+------------------+--------------+
| package_id | package_name     | monthly_rate |
+------------+------------------+--------------+
|          1 | BASIC PACKAGE    |            5 |
|          2 | ADVANCED PACKAGE |           15 |
|          3 | PREMIUM PACKAGE  |           50 |
+------------+------------------+--------------+
3 rows in set (0.00 sec)
Next, create a subscriptions table. This table creates a many-to-many relationship between the customers and packages tables. In simple terms, it shows you the services that customers have subscribed to.
mysql> CREATE TABLE subscriptions (
subscription_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
customer_id BIGINT,
package_id BIGINT,
subscription_date DATETIME
) ENGINE = InnoDB;
Don't populate the subscriptions table for now. When customer subscription events arrive in your application through an HTTP call, you'll capture the data in a Redis channel. Then, you'll create an independent script that subscribes to the channel and inserts the data into the MySQL database. This decouples your system's events from the data processing logic and improves scalability.
Log out from the MySQL server.
mysql> QUIT;
In this Golang event processing application, you'll need a frontend and a backend script. The frontend script acts as a web server and listens for incoming JSON signup requests. The script publishes the customers' signup information to a Redis signups channel. The backend script, on the other hand, subscribes to the signups channel to listen for events and save their details permanently to your database.
When completed, the directory structure for your applications will have the following levels.
project
--frontend
----main.go
--backend
----event_processor.go
Begin by creating a project directory under your home directory.
$ mkdir ~/project
Next, navigate to the new project directory.
$ cd ~/project
Make frontend and backend sub-directories under project.
$ mkdir ~/project/frontend
$ mkdir ~/project/backend
You now have the correct directory structure for your event streaming project.
main.go File
The main.go file is the main entry point for your application. This script runs Golang's built-in web server and accepts signup requests submitted as a JSON payload.
To create the main.go file, first navigate to the ~/project/frontend directory.
$ cd ~/project/frontend
Then, use nano to open a new main.go file.
$ nano main.go
Enter the following information into the file.
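package main

import (
	"bytes"
	"context"
	"fmt"
	"net/http"

	"github.com/go-redis/redis"
)

func main() {
	// Route signup requests to signupHandler and serve on port 8080.
	http.HandleFunc("/signup", signupHandler)
	if err := http.ListenAndServe(":8080", nil); err != nil {
		fmt.Println(err.Error())
	}
}

func signupHandler(w http.ResponseWriter, req *http.Request) {
	// Connect to the local Redis server.
	redisClient := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       0,
	})
	// Release the client's connections when the request finishes.
	defer redisClient.Close()

	ctx := context.TODO()

	// Read the raw JSON request body.
	buf := new(bytes.Buffer)
	buf.ReadFrom(req.Body)
	reqBody := buf.String()

	// Publish the payload to the signups channel.
	err := redisClient.Publish(ctx, "signups", reqBody).Err()
	if err != nil {
		fmt.Fprint(w, err.Error()+"\r\n")
	} else {
		fmt.Fprint(w, "Success\r\n")
	}
}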
Save and close the main.go file when you're through with editing.
In the above file, you're accepting signup information via the /signup resource. The web server routes each HTTP request to your signupHandler function, which connects to your Redis server on port 6379. You're then using the statement err := redisClient.Publish(ctx, "signups", reqBody).Err() to publish the signup information to a Redis channel that you've named signups.
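The handler in this guide forwards whatever body it receives. One possible hardening step, sketched below under stated assumptions, is to validate the JSON before publishing. The Publisher interface, the memoryPublisher test double, and the newSignupHandler and post helpers are hypothetical names introduced only for this illustration; in the real application, the Redis client would sit behind Publisher. The sketch uses only the standard library so it runs without a Redis server.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// Signup mirrors the JSON fields used in this guide's sample requests.
type Signup struct {
	CustomerID int64 `json:"customer_id"`
	PackageID  int64 `json:"package_id"`
}

// Publisher is a hypothetical abstraction over the broker client so the
// handler can be exercised without a running Redis server.
type Publisher interface {
	Publish(channel, payload string) error
}

// memoryPublisher records payloads in memory; it is only a test double.
type memoryPublisher struct{ sent []string }

func (m *memoryPublisher) Publish(channel, payload string) error {
	m.sent = append(m.sent, channel+":"+payload)
	return nil
}

// newSignupHandler rejects non-POST requests and malformed or incomplete
// JSON before anything reaches the signups channel.
func newSignupHandler(p Publisher) http.HandlerFunc {
	return func(w http.ResponseWriter, req *http.Request) {
		if req.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		// Cap the body at 1 MiB so a large request cannot exhaust memory.
		body, err := io.ReadAll(io.LimitReader(req.Body, 1<<20))
		if err != nil {
			http.Error(w, "cannot read body", http.StatusBadRequest)
			return
		}
		var s Signup
		if err := json.Unmarshal(body, &s); err != nil || s.CustomerID <= 0 || s.PackageID <= 0 {
			http.Error(w, "invalid signup payload", http.StatusBadRequest)
			return
		}
		if err := p.Publish("signups", string(body)); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		fmt.Fprint(w, "Success\r\n")
	}
}

// post sends body to the handler in-process and returns the status code.
func post(h http.Handler, body string) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/signup", strings.NewReader(body)))
	return rec.Code
}

func main() {
	pub := &memoryPublisher{}
	h := newSignupHandler(pub)
	fmt.Println("valid payload status:", post(h, `{"customer_id": 1, "package_id": 2}`))
	fmt.Println("invalid payload status:", post(h, `not json`))
	fmt.Println("events published:", len(pub.sent))
}
```

Rejecting bad input at the frontend keeps malformed events out of the channel, so subscribers only spend time on payloads they can process.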
event_processor.go File
The main.go file you've created in the previous step doesn't directly interact with your database. Instead, it simply creates an unprocessed event on the Redis server in a shared channel that you've named signups.
To process this information, you'll create an event_processor.go file that passes signup data to your database.
Navigate to the ~/project/backend directory.
$ cd ~/project/backend
Next, open a new event_processor.go file for editing.
$ nano event_processor.go
Enter the following information into the event_processor.go file.
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"github.com/go-redis/redis"
	_ "github.com/go-sql-driver/mysql"
)

func main() {
	ctx := context.TODO()

	// Connect to the local Redis server.
	redisClient := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       0,
	})

	// Subscribe to the channel that the frontend publishes to.
	subscriber := redisClient.Subscribe(ctx, "signups")

	// Block and handle each signup event as it arrives.
	for {
		msg, err := subscriber.ReceiveMessage(ctx)
		if err != nil {
			fmt.Println(err.Error())
		} else {
			// Decode the JSON payload into a generic map.
			params := map[string]interface{}{}
			err := json.NewDecoder(strings.NewReader(msg.Payload)).Decode(&params)
			if err != nil {
				fmt.Println(err.Error())
			} else {
				err = createSubscription(params)
				if err != nil {
					fmt.Println(err.Error())
				} else {
					fmt.Println("Processed subscription for customer # " + fmt.Sprint(params["customer_id"]) + "\r\n...")
				}
			}
		}
	}
}

// createSubscription inserts one signup event into the subscriptions table.
func createSubscription(params map[string]interface{}) error {
	db, err := sql.Open("mysql", "sample_store_user:EXAMPLE_PASSWORD@tcp(127.0.0.1:3306)/sample_store")
	if err != nil {
		return err
	}
	defer db.Close()

	queryString := `insert into subscriptions (
		customer_id,
		package_id,
		subscription_date
	) values (
		?,
		?,
		?
	)`

	stmt, err := db.Prepare(queryString)
	if err != nil {
		return err
	}
	defer stmt.Close()

	// Format the current time as a MySQL DATETIME value.
	dt := time.Now()
	subscriptionDate := dt.Format("2006-01-02 15:04:05")

	_, err = stmt.Exec(params["customer_id"], params["package_id"], subscriptionDate)
	if err != nil {
		return err
	}
	return nil
}
Save and close the file when you're through with editing.
In the above file, you have a main function that opens a connection to your Redis server. Then, you're using the statement subscriber := redisClient.Subscribe(ctx, "signups") to subscribe to the signups channel. You're then using the Golang for {...} blocking loop to listen for incoming events. Next, you pass each signup event to a createSubscription function, which saves the data to the subscriptions table in your MySQL database.
Your frontend and backend scripts are now ready to accept new signups.
With the frontend and backend scripts in place, run them to test the application.
Before that, download the packages you've used in your scripts from GitHub.
$ go get github.com/go-sql-driver/mysql
$ go get github.com/go-redis/redis
Next, navigate to the backend directory and run the event_processor.go file. This command blocks while it waits for events, so don't run any other command in this SSH session.
$ cd ~/project/backend
$ go run ./
Open a second terminal window, navigate to the ~/project/frontend directory, and run the main.go file. This causes your application to listen on port 8080. Don't enter any other command on this terminal.
$ cd ~/project/frontend
$ go run ./
Next, open a third terminal window and run the following curl commands to sign up three customers.
$ curl -i -X POST localhost:8080/signup -H "Content-Type: application/json" -d '{"customer_id": 1, "package_id": 2}'
$ curl -i -X POST localhost:8080/signup -H "Content-Type: application/json" -d '{"customer_id": 2, "package_id": 3}'
$ curl -i -X POST localhost:8080/signup -H "Content-Type: application/json" -d '{"customer_id": 3, "package_id": 2}'
Output.
...
Success
After you run the commands above, your backend script prints the following output to confirm that everything is working as expected.
Output.
Processed subscription for customer # 1
...
Processed subscription for customer # 2
...
Processed subscription for customer # 3
...
Still on your third terminal window, log in to your MySQL database server to confirm that your scripts are successfully streaming and processing events via the Redis server.
$ sudo mysql -u root -p
Enter your root password and press ENTER to proceed. Then, switch to the sample_store database.
mysql> USE sample_store;
Query the subscriptions table to check that the new signup entries have been streamed and processed.
mysql> SELECT
customer_id,
package_id,
subscription_date
FROM subscriptions;
You should now get the following output, which confirms your application is working as expected.
+-------------+------------+---------------------+
| customer_id | package_id | subscription_date   |
+-------------+------------+---------------------+
|           1 |          2 | 2021-11-30 09:57:24 |
|           2 |          3 | 2021-11-30 09:58:22 |
|           3 |          2 | 2021-11-30 09:58:32 |
+-------------+------------+---------------------+
3 rows in set (0.00 sec)
In this guide, you've implemented event streaming with Golang, MySQL 8, and Redis server on your Linux machine. Use the knowledge in this guide to capture and process your system events as they occur, enhance scalability, and increase responsiveness for your complex multi-step applications.