NATS is an open-source, lightweight, and high-performance messaging system designed to build distributed and scalable applications. It provides publish-subscribe, queuing and request-reply messaging patterns that are often used in cloud-native and microservice implementations.
NATS offers publish-subscribe messaging in that a publisher sends a message on a subject and any active subscriber listening on the subject receives the message. This ensures that any message sent by a publisher reaches all registered subscribers.
In addition, NATS also provides queue-based messaging. This allows subscribers to register as part of a queue. Subscribers that are part of a queue form a queue group and only a single random queue group subscriber consumes a message each time it is received by the queue group.
In this guide, you will install a NATS cluster on a Vultr Kubernetes Engine (VKE) cluster using Helm. Then, you will deploy producer and consumer client applications to exchange messages using the NATS Go client within the cluster to verify how queue based messaging works with NATS.
Before you start:
Deploy a Vultr Kubernetes Engine (VKE) clusterwith at least 3
nodes
Deploy a OneClick Docker instance using the Vultr Marketplace Application to use the management server
Create a Vultr Container Registry instance to build and store private repositories
Using SSH, access the server as a non-root sudo user
Install and Configure Kubectl to access the cluster
Install the Helm CLI tool
$ sudo snap install helm --classic
Install Go
$ sudo apt install golang
Using Helm, add the NATS repository to your system
$ helm repo add nats https://nats-io.github.io/k8s/helm/charts/
Update the Helm repositories
$ help repo update
Install NATS to your cluster
$ helm install nats nats/nats
View your cluster pods filtered by the name nats
$ kubectl get pods -l=app.kubernetes.io/name=nats
Wait for the Pods to transition to Running
similar to the output below:
NAME READY STATUS RESTARTS AGE
nats-0 2/2 Running 0 25s
nats-box-7ffb855bbb-dhtvk 1/1 Running 0 25s
Switch to your user home directory
$ cd
Create a new NATS application directory nats-vke
$ mkdir nats-vke
Switch to the directory
$ cd nats-vke
Create a new consumer application directory nats-consumer
$ mkdir nats-consumer
Switch to the directory
$ cd nats-consumer
Create a new Go module nats-consumer
$ go mod init nats-consumer
Using a text editor such as nano
, create a new file consumer.go
$ nano consumer.go
Add the following contents to the file
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"github.com/nats-io/nats.go"
)
func main() {
natsServer := os.Getenv("NATS_SERVER")
if natsServer == "" {
log.Fatal("missing NATS_SERVER env variable")
}
subject := os.Getenv("NATS_SUBJECT")
if subject == "" {
log.Fatal("missing NATS_SUBJECT env variable")
}
queueGroup := os.Getenv("NATS_QUEUE_GROUP")
if queueGroup == "" {
log.Fatal("missing NATS_QUEUE_GROUP env variable")
}
nc, err := nats.Connect(natsServer)
if err != nil {
log.Fatalf("Error connecting to NATS: %v", err)
}
fmt.Println("successfully connected to", natsServer)
defer nc.Close()
_, err = nc.QueueSubscribe(subject, queueGroup, func(msg *nats.Msg) {
log.Printf("Received message on subject %s: %s", msg.Subject, string(msg.Data))
})
if err != nil {
log.Fatalf("Error subscribing to subject: %v", err)
}
log.Printf("Subscribed to subject %s within queue group %s", subject, queueGroup)
waitForSignal()
}
func waitForSignal() {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
log.Println("Received termination signal. Shutting down...")
}
Save and close the file.
Below is what the above application code does in order of execution:
Reads the required environment variables for the NATS server, subject and the queue group
Connects to the NATS server
Subscribes to the subject using a queue group
The message handler logs messages received from the subject to the console
Create a new file Dockerfile
$ nano Dockerfile
Add the following configurations to the file
FROM golang:1.18-buster AS build
WORKDIR /app
COPY go.mod ./
COPY go.sum ./
RUN go mod download
COPY consumer.go ./
RUN go build -o /nats-consumer-app
FROM gcr.io/distroless/base-debian10
WORKDIR /
COPY --from=build /nats-consumer-app /nats-consumer-app
EXPOSE 8080
USER nonroot:nonroot
ENTRYPOINT ["/nats-consumer-app"]
Save and close the file.
The above Dockerfile configuration uses a two-stage build process:
The first stage uses golang:1.18-buster
as the base image to build the NATS consumer program binary
The second stage uses gcr.io/distroless/base-debian10
as the base image and copies the binary produced by the first stage
Pull Go modules to create a new go.sum
file
$ go mod tidy
List files and verify your directory structure
$ ls
Output:
consumer.go Dockerfile go.mod go.sum
Login to your Vultr Container Registry account. Replace example
with your actual registry name
$ docker login https://sjc.vultrcr.com/example
When prompted, enter your Registry username and password
Build the nats-consumer-app
container image
$ docker build -t sjc.vultrcr.com/example/nats-consumer:latest .
Push the image to your registry
$ docker build -t sjc.vultrcr.com/example/nats-consumer:latest .
When successful, your output should look like the one below:
The push refers to repository [sjc.vultrcr.com/example/nats-consumer]
5adb57ca5a3c: Pushed
91f7bcfdfda8: Pushed
05ef21d76315: Pushed
latest: digest: sha256:1ee56100e7ba4274a8c33b4c49740bbd2f69e4f7f75461208b7d2854c07c63c5 size: 949
Create a new deployment manifest file consumer.yaml
$ nano consumer.yaml
Add the following contents to the file
apiVersion: apps/v1
kind: Deployment
metadata:
name: nats-consumer
spec:
replicas: 1
selector:
matchLabels:
app: nats-consumer
template:
metadata:
labels:
app: nats-consumer
spec:
containers:
- name: nats-consumer
image: sjc.vultrcr.com/example/nats-consumer:latest
imagePullPolicy: Always
env:
- name: NATS_SERVER
value: nats://nats:4222
- name: NATS_SUBJECT
value: vke-nats-demo-subject
- name: NATS_QUEUE_GROUP
value: vke-nats-demo-queue
Save and close the file
Deploy the consumer application to your cluster
$ kubectl apply -f consumer.yaml
View cluster pods with the name nats-consumer
$ kubectl get pods -l=app=nats-consumer
Verify that the nats-consumer
pod is available and running similar to the output below:
NAME READY STATUS RESTARTS AGE
nats-consumer-746f5ddf75-tzmxs 1/1 Running 0 12s
Navigate to the root NATS project directory nats-vke
$ cd /home/nats-vke/
Create a new directory nats-producer
$ mkdir nats-producer
Switch to the directory
$ cd nats-producer
Create a new Go module
$ go mod init nats-producer
Create a new file producer.go
$ nano producer.go
Add the following contents to the file
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/nats-io/nats.go"
)
func main() {
natsServer := os.Getenv("NATS_SERVER")
if natsServer == "" {
log.Fatal("missing NATS_SERVER env variable")
}
subject := os.Getenv("NATS_SUBJECT")
if subject == "" {
log.Fatal("missing NATS_SUBJECT env variable")
}
nc, err := nats.Connect(natsServer)
if err != nil {
log.Fatalf("Error connecting to NATS: %v", err)
}
fmt.Println("successfully connected to", natsServer)
defer nc.Close()
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
fmt.Println("\nReceived termination signal. Exiting...")
os.Exit(0)
}()
index := 0
for {
message := fmt.Sprintf("message-%d", index)
if err := nc.Publish(subject, []byte(message)); err != nil {
log.Printf("Error publishing message: %v", err)
} else {
log.Printf("Published message: %s", message)
}
index++
time.Sleep(3 * time.Second)
}
}
Save and close the file.
Below is what the above application code does:
Reads the required environment variables for the NATS server and subject
Connects to the NATS server
Publishes messages to the NATS server in an infinite loop, and waits for three seconds between each iteration
Gracefully in response to a SIGTERM
signal
Create a new Dockerfile
$ nano Dockerfile
Add the following contents to the file
FROM golang:1.18-buster AS build
WORKDIR /app
COPY go.mod ./
COPY go.sum ./
RUN go mod download
COPY producer.go ./
RUN go build -o /nats-producer-app
FROM gcr.io/distroless/base-debian10
WORKDIR /
COPY --from=build /nats-producer-app /nats-producer-app
EXPOSE 8080
USER nonroot:nonroot
ENTRYPOINT ["/nats-producer-app"]
Save and close the file.
The above Dockerfile configuration applies the two-stage build process below:
The first stage uses golang:1.18-buster
as the base image to build the NATS producer program binary
The second stage uses gcr.io/distroless/base-debian10
as the base image and copies the binary produced by the first stage
Pull Go modules to create a new go.sum
file
$ go mod tidy
Build the container image with to include all directory files
$ docker build -t sjc.vultrcr.com/example/nats-producer-app:latest .
Push the image to your Vultr Container Registry. Replace example
with your actual registry name
$ docker push sjc.vultrcr.com/example/nats-producer-app
Create a new file producer.yaml
$ nano producer.yaml
Add the following contents to the file. Replace sjc.vultrcr.com/example/nats-consumer
with your actual Vultr Container Registry URL
apiVersion: apps/v1
kind: Deployment
metadata:
name: nats-producer
spec:
replicas: 1
selector:
matchLabels:
app: nats-producer
template:
metadata:
labels:
app: nats-producer
spec:
containers:
- name: nats-producer
image: example-user/nats-producer-app
imagePullPolicy: Always
env:
- name: NATS_SERVER
value: nats://nats:4222
- name: NATS_SUBJECT
value: vke-nats-demo-subject
Save and close the file.
Deploy the producer application to your cluster
$ kubectl apply -f producer.yaml
Verify that the deployment is successful
$ kubectl get deployments
Output:
NAME READY UP-TO-DATE AVAILABLE AGE
nats-box 1/1 1 1 6h28m
nats-consumer 2/2 2 2 6h15m
nats-consumer2 1/1 1 1 34m
nats-consumer3 0/1 1 0 33m
nats-producer 1/1 1 1 6h5m
View cluster pods with the name nats-producer
$ kubectl get pods -l=app=nats-producer
Output:
NAME READY STATUS RESTARTS AGE
nats-producer-842f5eef42-dfgz 1/1 Running 0 20s
To verify that you have correctly deployed NATS in your VKE cluster, test the application perfomance. Monitor the nats-producer
and nats-consumer
application logs to view the ongoing cluster operations as described below.
View the NATS producer application logs
$ kubectl logs -f $(kubectl get pod -l=app=nats-producer -o jsonpath='{.items[0].metadata.name}')
Monitor the Published Message
operations similar to the output below:
Published message: message-10
Published message: message-11
Published message: message-12
View hhe NATS consumer application logs
$ kubectl logs -f $(kubectl get pod -l=app=nats-consumer -o jsonpath='{.items[0].metadata.name}')
Verify the Received Message
log operations
Received message on subject vke-nats-demo-subject: message-10
Received message on subject vke-nats-demo-subject: message-11
Received message on subject vke-nats-demo-subject: message-12
To implement load-balancing with multiple pods, scale up the NATS consumer application to 2
replicas
$ kubectl scale deployment/nats-consumer --replicas=2
View the NATS consumer pods to verify the new replica
$ kubectl get pods -l=app=nats-consumer
Output:
NAME READY STATUS RESTARTS AGE
nats-consumer-6fb9d66968-bclj7 1/1 Running 0 6h19m
nats-consumer-6fb9d66968-cgr95 1/1 Running 0 6h33m
The NATS consumer application performs load balancing across pods with sequence IDs depending on the deployment time. For example, the first pod is assigned the ID 0
and the second pod 1
View the NATS consumer replica application ID 0
$ kubectl logs -f $(kubectl get pod -l=app=nats-consumer -o jsonpath='{.items[1].metadata.name}')
Output:
Received message on subject vke-nats-demo-subject: message-17
Received message on subject vke-nats-demo-subject: message-20
Received message on subject vke-nats-demo-subject: message-23
View the replica application ID 1
$ kubectl logs -f $(kubectl get pod -l=app=nats-consumer -o jsonpath='{.items[1].metadata.name}')
Output:
Received message on subject vke-nats-demo-subject: message-18
Received message on subject vke-nats-demo-subject: message-19
Received message on subject vke-nats-demo-subject: message-21
As displayed in the log output, the NATS application messages are load-balanced between the two consumer instances. NATS sends messages to each instance randomly cecause they are in the same queue group. This way, it's possible to distribute data processing load among multiple consumer instances and scale the application horizontally within the cluster.
You have deployed NATS on a Vultr Kubernetes Engine (VKE) cluster and tested cluster operations using a producer application that sends data to a NATS subject. To balance user traffic and the cluster load, you load-balanced the processing across multiple NATS consumer instances using a queue based messaging pattern. For more information on how to use NATS, visit the official documentation.
For more information on how to interact with VKE cluster services, visit the following resources: