Author: Francis Ndungu
Last Updated: Wed, Jul 12, 2023
InfluxDB is a time-series database that powers many IoT projects such as home automation apps, AI security cameras, health applications, and server monitoring systems. Although InfluxDB is powerful on its own, you can extend its functionality with Python. Python offers a rich set of libraries, and its popularity is growing rapidly due to its user-friendly data structures and code versatility.
This guide explains how to use the InfluxDB library with Python on an Ubuntu server. The library is a flexible, self-contained driver that allows you to communicate with an InfluxDB database from Python.
Before you begin:
Using SSH, access the server.
Create a non-root sudo user and switch to the user account.
Install InfluxDB and create an administrator account.
Install the influxdb Library and Create a Sample Database
Update the server.
$ sudo apt update
Install the pip Python package manager.
$ sudo apt install -y python3-pip
Using pip, install the influxdb library.
$ pip install influxdb
If the above command returns an error, use:
$ sudo apt install -y python3-influxdb
Log in to the InfluxDB database console. Replace EXAMPLE_PASSWORD with the password of the administrator account you created earlier.
$ influx -username 'admin' -password 'EXAMPLE_PASSWORD'
Output:
Connected to http://localhost:8086 version 1.6.7~rc0
InfluxDB shell version: 1.6.7~rc0
Create a sample fitness_app_db database.
> CREATE DATABASE fitness_app_db
Switch to the new fitness_app_db database.
> USE fitness_app_db
Insert sample data into the fitness_app_db database using the line protocol syntax.
> INSERT steps_counter,user=john_doe daily_steps=1879i
> INSERT steps_counter,user=mary_doe daily_steps=2785i
> INSERT steps_counter,user=jane_smith daily_steps=4563i
The above commands declare steps_counter as the name of the measurement. The user tag uniquely identifies each user in the database. Then the daily_steps field logs a user's total daily steps in the database. The trailing i at the end of the daily_steps field value instructs InfluxDB to insert values as integers.
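The structure of those INSERT lines can be sketched in Python. The helper below is a hypothetical illustration (the name to_line_protocol is not part of InfluxDB or the influxdb library) that assembles one line-protocol record from a measurement, a tag dictionary, and a field dictionary. It is a simplified sketch: it omits the optional timestamp and does not escape spaces or commas in names and tag values.

```python
def to_line_protocol(measurement, tags, fields):
    """Build one line-protocol record (without a timestamp) from Python values."""
    # Tags are key=value pairs appended to the measurement name.
    tag_part = ",".join(f"{key}={value}" for key, value in sorted(tags.items()))

    field_items = []
    for key, value in fields.items():
        if isinstance(value, bool):
            # Check bool first because bool is a subclass of int in Python.
            field_items.append(f"{key}={'true' if value else 'false'}")
        elif isinstance(value, int):
            # The trailing i marks the value as an integer.
            field_items.append(f"{key}={value}i")
        elif isinstance(value, float):
            field_items.append(f"{key}={value}")
        else:
            # String field values are wrapped in double quotes.
            escaped = str(value).replace('"', '\\"')
            field_items.append(f'{key}="{escaped}"')

    head = f"{measurement},{tag_part}" if tag_part else measurement
    return f"{head} {','.join(field_items)}"


line = to_line_protocol("steps_counter", {"user": "john_doe"}, {"daily_steps": 1879})
print(line)
```

The printed record matches the text that follows the INSERT keyword in the first command above.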
Query the steps_counter measurement to verify the data.
> SELECT "user", "daily_steps" FROM "steps_counter"
Output:
name: steps_counter
time user daily_steps
---- ---- -----------
1687426847395972699 john_doe 1879
1687426847414045751 mary_doe 2785
1687426847498567130 jane_smith 4563
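The time column above holds nanoseconds since the Unix epoch, which is how the console displays timestamps by default. As a small sketch, the hypothetical helper below (ns_to_utc is an illustrative name, not a library function) converts such a value to a timezone-aware Python datetime. Python datetimes only store microseconds, so the last three digits are dropped.

```python
from datetime import datetime, timezone


def ns_to_utc(ns_timestamp):
    """Convert a nanosecond Unix timestamp to a UTC datetime (microsecond precision)."""
    seconds, nanos = divmod(ns_timestamp, 1_000_000_000)
    moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    # Keep the sub-second part, truncated from nanoseconds to microseconds.
    return moment.replace(microsecond=nanos // 1000)


print(ns_to_utc(1687426847395972699).isoformat())
```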
Exit the InfluxDB database console.
> quit
In this section, create a separate project directory to store your Python source code. Then, within the directory, create a custom InfluxDB database gateway module to communicate with the database you created earlier as described below.
Create a new project directory for your application.
$ mkdir project
Switch to the new project directory.
$ cd project
Using a text editor such as Nano, open the influx_db_gateway.py file.
$ nano influx_db_gateway.py
Add the following code to the file.
from influxdb import InfluxDBClient


class InfluxDbGateway:

    def db_conn(self):
        # Connect to the local InfluxDB server and select fitness_app_db.
        client = InfluxDBClient(
            'localhost',
            8086,
            'admin',
            'EXAMPLE_PASSWORD',
            'fitness_app_db'
        )
        return client

    def query_data(self):
        # Read every point from the steps_counter measurement.
        inf_db_client = self.db_conn()
        result = inf_db_client.query('SELECT "user", "daily_steps" FROM "steps_counter"')
        return list(result)

    def insert_data(self, data_points):
        user = data_points['user']
        daily_steps = data_points['daily_steps']
        inf_db_client = self.db_conn()
        # Write one point: the user is a tag, the step count is a field.
        inf_db_client.write_points([
            {
                "measurement": "steps_counter",
                "tags": {"user": user},
                "fields": {"daily_steps": daily_steps}
            }
        ])
        return ['success']
Replace EXAMPLE_PASSWORD with the actual InfluxDB password you created earlier.
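As an alternative to writing the password into the source file, the connection settings could be read from environment variables. The sketch below is only one possible approach: the function name load_influx_settings and the INFLUXDB_* variable names are assumptions made for illustration, not names defined by InfluxDB or the influxdb library. The returned keys match the host, port, username, password, and database parameters of InfluxDBClient.

```python
import os


def load_influx_settings(env=os.environ):
    """Collect InfluxDBClient keyword arguments from environment variables."""
    # INFLUXDB_PASSWORD and the other variable names are hypothetical.
    password = env.get("INFLUXDB_PASSWORD")
    if not password:
        raise RuntimeError("Set INFLUXDB_PASSWORD before starting the application.")
    return {
        "host": env.get("INFLUXDB_HOST", "localhost"),
        "port": int(env.get("INFLUXDB_PORT", "8086")),
        "username": env.get("INFLUXDB_USER", "admin"),
        "password": password,
        "database": env.get("INFLUXDB_DATABASE", "fitness_app_db"),
    }
```

Inside db_conn(), the client could then be created with InfluxDBClient(**load_influx_settings()).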
Below is what the code does:
The from influxdb import InfluxDBClient declaration imports the InfluxDB client class into the file.
The class InfluxDbGateway: statement defines a new class with three methods:
db_conn(self): connects to the sample fitness_app_db database and returns the client.
query_data(self): runs the InfluxQL query SELECT "user", "daily_steps" FROM "steps_counter" against the steps_counter measurement and returns the result as a list (return list(result)).
insert_data(self, data_points): accepts a dictionary with the user and daily_steps keys, calls the inf_db_client.write_points() function to write the point to the database, and then returns a success message (return ['success']).
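The insert_data method passes the incoming values straight to write_points(). A minimal sketch of how the payload could be checked first is shown below. The build_point helper is hypothetical and not part of the original module. It rejects non-integer step counts because the sample data stores daily_steps as an integer, and InfluxDB reports a field type conflict when a later write uses a different type for the same field.

```python
def build_point(data_points):
    """Validate a request payload and shape it as a point for write_points()."""
    user = data_points.get("user")
    daily_steps = data_points.get("daily_steps")

    if not isinstance(user, str) or not user:
        raise ValueError("'user' must be a non-empty string")
    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(daily_steps, bool) or not isinstance(daily_steps, int):
        raise ValueError("'daily_steps' must be an integer")
    if daily_steps < 0:
        raise ValueError("'daily_steps' must not be negative")

    return {
        "measurement": "steps_counter",
        "tags": {"user": user},
        "fields": {"daily_steps": daily_steps},
    }
```

With this helper, insert_data could call inf_db_client.write_points([build_point(data_points)]) and let the caller decide how to report a ValueError.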
Save and close the file.
In this section, you will combine the sample database and the gateway module to create a startup file for the application as described below.
Create a new index.py file.
$ nano index.py
Add the following code to the file.
import http.server
from http import HTTPStatus
import socketserver
import json
import influx_db_gateway


class HttpHandler(http.server.SimpleHTTPRequestHandler):

    def do_GET(self):
        # Return all points in the steps_counter measurement as JSON.
        self.send_response(HTTPStatus.OK)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        db_gateway = influx_db_gateway.InfluxDbGateway()
        result = db_gateway.query_data()
        self.wfile.write(bytes(json.dumps(result, indent=2) + "\n", "utf8"))

    def do_POST(self):
        # Read the JSON request body and write it to the database as a point.
        content_length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(content_length)
        data_points = json.loads(post_data)
        self.send_response(HTTPStatus.OK)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        db_gateway = influx_db_gateway.InfluxDbGateway()
        result = db_gateway.insert_data(data_points)
        self.wfile.write(bytes(json.dumps(result, indent=2) + "\n", "utf8"))


httpd = socketserver.TCPServer(('', 8082), HttpHandler)
print("Web server is now running on port 8082...")

try:
    httpd.serve_forever()
except KeyboardInterrupt:
    httpd.server_close()
    print("The HTTP server has stopped running.")
Below is what the code does:
import...: imports the Python HTTP modules http.server, HTTPStatus, and socketserver, the json module, and the custom influx_db_gateway module you created earlier.
The HttpHandler(http.server.SimpleHTTPRequestHandler) class handles the following HTTP methods:
GET: retrieves points from the database using the db_gateway = influx_db_gateway.InfluxDbGateway() and result = db_gateway.query_data() statements.
POST: inserts a new data point into the database using the db_gateway = influx_db_gateway.InfluxDbGateway() and result = db_gateway.insert_data(data_points) statements.
The following code returns the correct status codes and headers to HTTP clients.
...
self.send_response(HTTPStatus.OK)
self.send_header('Content-type', 'application/json')
self.end_headers()
The code block below starts a web server that accepts incoming HTTP requests on port 8082.
...
httpd = socketserver.TCPServer(('', 8082), HttpHandler)
print("Web server is now running on port 8082...")

try:
    httpd.serve_forever()
except KeyboardInterrupt:
    httpd.server_close()
    print("The HTTP server has stopped running.")
Save and close the file.
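When the application is stopped and restarted quickly, binding to port 8082 again can fail with an "Address already in use" error while the operating system still holds the old socket. A possible variation, shown here as a sketch rather than as part of the original application, is to subclass socketserver.TCPServer and enable its allow_reuse_address attribute. The ReusableTCPServer and make_server names are hypothetical.

```python
import socketserver


class ReusableTCPServer(socketserver.TCPServer):
    # Ask the OS to let the server rebind to a recently used port.
    allow_reuse_address = True


def make_server(handler_class, port=8082):
    """Create (but do not start) a server bound to the given port."""
    return ReusableTCPServer(("", port), handler_class)
```

In index.py, the httpd = socketserver.TCPServer(('', 8082), HttpHandler) line could then become httpd = make_server(HttpHandler).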
Run the application in the background.
$ python3 index.py &
Output:
Web server is now running on port 8082...
Using curl, retrieve all points from the InfluxDB database.
$ curl -X GET http://localhost:8082/
Output:
[
  [
    {
      "time": "2023-06-23T09:19:20.963877Z",
      "user": "john_doe",
      "daily_steps": 1879
    },
    {
      "time": "2023-06-23T09:19:20.978350Z",
      "user": "mary_doe",
      "daily_steps": 2785
    },
    {
      "time": "2023-06-23T09:19:21.020994Z",
      "user": "jane_smith",
      "daily_steps": 4563
    }
  ]
]
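The response above is a list that wraps an inner list of points. A client that consumes this output may want a flat list instead. The sketch below shows one way to do that with the standard json module. The flatten_points and steps_by_user names are hypothetical helpers introduced only for this illustration.

```python
import json


def flatten_points(response_text):
    """Flatten the nested list returned by the GET endpoint into one list of points."""
    series_list = json.loads(response_text)
    return [point for series in series_list for point in series]


def steps_by_user(points):
    """Sum the daily_steps values recorded for each user."""
    totals = {}
    for point in points:
        totals[point["user"]] = totals.get(point["user"], 0) + point["daily_steps"]
    return totals


sample = '[[{"time": "2023-06-23T09:19:20.963877Z", "user": "john_doe", "daily_steps": 1879}]]'
print(steps_by_user(flatten_points(sample)))
```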
Run the following commands to write new data points to the database.
$ curl -X POST http://localhost:8082/ -H 'Content-Type: application/json' -d '{"user": "james_gates", "daily_steps": 2948}'
$ curl -X POST http://localhost:8082/ -H 'Content-Type: application/json' -d '{"user": "eva_dukes", "daily_steps": 2759}'
Output:
...
[
"success"
]
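The same POST request can also be sent from Python instead of curl. The following sketch uses only the standard urllib.request module and assumes the application from this guide is listening on http://localhost:8082/. The build_post_request and send_point names are hypothetical.

```python
import json
import urllib.request


def build_post_request(user, daily_steps, url="http://localhost:8082/"):
    """Prepare a JSON POST request equivalent to the curl commands above."""
    payload = json.dumps({"user": user, "daily_steps": daily_steps}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_point(user, daily_steps):
    """Send one data point; this requires index.py to be running."""
    with urllib.request.urlopen(build_post_request(user, daily_steps)) as response:
        return json.loads(response.read())
```

Calling send_point("james_gates", 2948) while the application is running sends the same payload as the first curl command.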
Query the application again to verify that the new data was added.
$ curl -X GET http://localhost:8082/
Output:
[
  [
    {
      "time": "2023-06-23T09:19:20.963877Z",
      "user": "john_doe",
      "daily_steps": 1879
    },
    {
      "time": "2023-06-23T09:19:20.978350Z",
      "user": "mary_doe",
      "daily_steps": 2785
    },
    {
      "time": "2023-06-23T09:19:21.020994Z",
      "user": "jane_smith",
      "daily_steps": 4563
    },
    {
      "time": "2023-06-23T09:24:13.425451Z",
      "user": "james_gates",
      "daily_steps": 2948
    },
    {
      "time": "2023-06-23T09:24:24.832866Z",
      "user": "eva_dukes",
      "daily_steps": 2759
    }
  ]
]
To stop the application, view its job ID.
$ jobs
Output:
[1]+ Running python3 index.py &
Kill the job to stop the application.
$ kill %1
In this guide, you used the InfluxDB database with Python on an Ubuntu 20.04 server. You created a sample InfluxDB database, then queried and submitted data through a Python application using the curl utility.
For more information about InfluxDB, visit the following resources.