Author: Francis Ndungu
Last Updated: Tue, Dec 13, 2022
A database transaction is a chain of SQL commands that fulfills a piece of business logic. For example, in an e-commerce application, the SQL commands required to fill a customer's order may affect the sales_orders, sales_order_products, and sales_payments tables. Database transactions address the principle of atomicity, which states that a transaction should have an all-or-nothing effect on a database. If any of the SQL commands in the transaction fails, the database should undo (roll back) the entire transaction. PostgreSQL is one of the most popular database servers that supports transactions to eliminate the possibility of partial database updates.
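To make the all-or-nothing behavior concrete, the following minimal sketch runs two dependent inserts as one unit of work. It is an illustration only: it uses Python's built-in sqlite3 module with an in-memory database as a stand-in for PostgreSQL so that it runs without a server, and add_customer_with_loan is a hypothetical helper that is not part of this guide's application.

```python
import sqlite3

# Assumption: sqlite3 (standard library) stands in for PostgreSQL here.
# The table names mirror the customers and loans tables used in this guide.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE customers ("
    "customer_id INTEGER PRIMARY KEY, first_name TEXT, last_name TEXT)"
)
conn.execute(
    "CREATE TABLE loans ("
    "loan_id INTEGER PRIMARY KEY, customer_id INTEGER, amount NUMERIC NOT NULL)"
)
conn.commit()


def add_customer_with_loan(conn, first_name, last_name, amount):
    """Insert a customer and a loan together; undo both if either insert fails."""
    try:
        cur = conn.execute(
            "INSERT INTO customers (first_name, last_name) VALUES (?, ?)",
            (first_name, last_name),
        )
        conn.execute(
            "INSERT INTO loans (customer_id, amount) VALUES (?, ?)",
            (cur.lastrowid, amount),
        )
        conn.commit()
        return True
    except sqlite3.Error:
        # The customer row written above is discarded along with the failed loan.
        conn.rollback()
        return False


ok = add_customer_with_loan(conn, "JOHN", "DOE", 4560)
failed = add_customer_with_loan(conn, "JANE", "DOE", None)  # violates NOT NULL
```

The second call fails on the loan insert, so the customer row it had already written is rolled back too, and each table ends up holding only the first customer's data.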
This guide shows you how to implement PostgreSQL transactions with psycopg2, an advanced Python library used to connect to a PostgreSQL server.
To complete this guide:
Locate the Connection Details for the PostgreSQL database cluster, located under the Overview tab. This guide uses the following sample connection details:
username: vultradmin
password: EXAMPLE_POSTGRESQL_PASSWORD
host: SAMPLE_POSTGRESQL_DB_HOST_STRING.vultrdb.com
port: 16751
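Rather than hardcoding these details in source files, one option is to read them from environment variables. The sketch below is a hypothetical helper (connection_params is not part of this guide's code). The variable names follow the PGHOST, PGPORT, PGUSER, PGPASSWORD, and PGDATABASE convention used by PostgreSQL client tools, and the defaults are this guide's sample values.

```python
import os


def connection_params(env=os.environ):
    """Build keyword arguments for psycopg2.connect() from environment variables.

    Hypothetical helper: the defaults are the sample values from this guide.
    The password has no default, so a missing PGPASSWORD raises KeyError.
    """
    return {
        "host": env.get("PGHOST", "SAMPLE_POSTGRESQL_DB_HOST_STRING.vultrdb.com"),
        "port": int(env.get("PGPORT", "16751")),
        "user": env.get("PGUSER", "vultradmin"),
        "password": env["PGPASSWORD"],
        "dbname": env.get("PGDATABASE", "defaultdb"),
    }
```

The returned dictionary could then be passed to the driver as psycopg2.connect(**connection_params()).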
This sample database is the back end of a bank application that stores customers and their loan balances.
This application uses two tables to complete a database transaction. The customers table stores the names of the customers. Then, the loans table stores the customers' loan balances. Later, this guide shows you how to use the Linux curl command to send sample transactions to the application. The application must complete transactions as a single unit of work to fulfill the business logic. Otherwise, the database should reject partially complete transactions.
To set up this sample application, you require the postgresql-client package to connect to the managed PostgreSQL database cluster and create a database. Follow the steps below to install the package and initialize the database:
Update the server's package information index.
$ sudo apt update
Use the apt tool to install the postgresql-client package.
$ sudo apt install -y postgresql-client
Log in to your managed PostgreSQL database cluster. Replace SAMPLE_POSTGRESQL_DB_HOST_STRING.vultrdb.com with the correct host for your database.
$ psql -h SAMPLE_POSTGRESQL_DB_HOST_STRING.vultrdb.com -p 16751 -U vultradmin defaultdb
Output:
Password for user vultradmin:
Enter the password for your managed PostgreSQL database cluster and press ENTER to proceed.
Output:
defaultdb=>
Issue the following SQL command to create a sample bank_db database.
defaultdb=> CREATE DATABASE bank_db;
Output:
CREATE DATABASE
Connect to the new bank_db database.
defaultdb=> \c bank_db;
Output:
...
You are now connected to database "bank_db" as user "vultradmin".
Create a sample customers table. This table stores the customer_ids, first_names, and last_names. The SERIAL keyword instructs the PostgreSQL server to generate new customer_ids automatically.
bank_db=> CREATE TABLE customers (
              customer_id SERIAL PRIMARY KEY,
              first_name VARCHAR(50),
              last_name VARCHAR(50)
          );
Output:
CREATE TABLE
Create a loans table. This table stores loan account balances held by customers. The customer_id column in this table links back to the same column in the customers table.
bank_db=> CREATE TABLE loans (
              loan_id SERIAL PRIMARY KEY,
              customer_id BIGINT,
              amount DECIMAL(17, 4)
          );
Output:
CREATE TABLE
Log out from the managed PostgreSQL database cluster.
bank_db=> \q
Follow the next step to create a database class to access your sample PostgreSQL database.
With your sample database in place, you now require a central class that connects to the database to store data in the tables. Follow the steps below to create the class:
Begin by creating a new project directory to separate your source code from system files.
$ mkdir project
Navigate to the new project directory.
$ cd project
Open a new postgresql_db.py file in a text editor.
$ nano postgresql_db.py
Enter the following information into the postgresql_db.py file. Remember to replace the database credentials (db_host, db_user, db_pass, and db_port) with the correct values for your PostgreSQL database cluster.
import psycopg2

class PostgresqlDb:

    def __init__(self):
        db_host = 'SAMPLE_POSTGRESQL_DB_HOST_STRING.vultrdb.com'
        db_name = 'bank_db'
        db_user = 'vultradmin'
        db_pass = 'EXAMPLE_POSTGRESQL_PASSWORD'
        db_port = 16751
        self.db_conn = psycopg2.connect(host = db_host, database = db_name, user = db_user, password = db_pass, port = db_port)

    def execute_db(self, json_payload):
        try:
            print("Starting new database transaction...")
            self.db_conn.autocommit = False
            self.cur = self.db_conn.cursor()

            print("Inserting new customer to database...")
            sql_string = 'insert into customers(first_name, last_name) values(%s, %s) RETURNING customer_id'
            self.cur.execute(sql_string, (json_payload['first_name'], json_payload['last_name']))
            customer_id = self.cur.fetchone()[0]
            print("Customer successfully inserted to database, new customer_id is " + str(customer_id))

            print("Inserting customer's loan record...")
            sql_string = 'insert into loans(customer_id, amount) values(%s, %s) RETURNING loan_id'
            self.cur.execute(sql_string, (customer_id, json_payload['loan_amount']))
            loan_id = self.cur.fetchone()[0]
            print("Customer loan record inserted successfully, new loan_id is " + str(loan_id))

            self.db_conn.commit()
            print("Database transaction completed successfully.")
            return "Success"

        except (Exception, psycopg2.DatabaseError) as error:
            print("Database transaction failed, rolling back database changes...")
            self.db_conn.rollback()
            return str(error)

        finally:
            if self.db_conn:
                self.cur.close()
                self.db_conn.close()
                print("Database connection closed successfully.")
Save and close the postgresql_db.py file.
The postgresql_db.py file explained:
The import psycopg2 statement loads the psycopg2 adapter that connects your Python code to the PostgreSQL database cluster.
The postgresql_db.py file contains one PostgresqlDb class with two methods.
import psycopg2

class PostgresqlDb:

    def __init__(self):
        ...

    def execute_db(self, json_payload):
        ...
The __init__(...) method is a constructor that runs every time you create a new object from the PostgresqlDb class. It opens the connection to the database.
The execute_db(self, json_payload) method takes the JSON payload from an HTTP POST request, which contains the customer's names and the loan balance, and forwards the data to the PostgreSQL database.
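Because execute_db(...) passes the payload straight to the database, a malformed loan_amount is only caught by PostgreSQL itself. As an optional extra, the payload could be checked first. The sketch below is an illustration under stated assumptions: validate_payload is a hypothetical helper that is not part of this guide's code, and the error messages are made up for the example.

```python
from decimal import Decimal, InvalidOperation


def validate_payload(json_payload):
    """Return (first_name, last_name, amount) or raise ValueError.

    Hypothetical helper: checks that the three fields used by this guide
    are present and that loan_amount parses as a finite decimal number.
    """
    for key in ("first_name", "last_name", "loan_amount"):
        if key not in json_payload:
            raise ValueError(f"missing field: {key}")
    try:
        amount = Decimal(str(json_payload["loan_amount"]))
    except InvalidOperation:
        raise ValueError("loan_amount must be numeric") from None
    if not amount.is_finite():
        # Rejects values such as "NaN" or "Infinity", which Decimal accepts.
        raise ValueError("loan_amount must be a finite number")
    return json_payload["first_name"], json_payload["last_name"], amount
```

With a check like this, a payload such as {"loan_amount": "PP"} would be rejected before any transaction starts. The database-level rollback remains the safety net for everything validation does not anticipate.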
Under the execute_db(...) method, you're setting the connection's autocommit attribute to False, which is also the psycopg2 default. With autocommit off, psycopg2 opens a transaction when the first statement runs. You then use the commit() method to permanently save a successful transaction or the rollback() method to discard a partial one.
...
try:
    print("Starting new database transaction...")
    self.db_conn.autocommit = False
    self.cur = self.db_conn.cursor()
    ...
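The commit-or-rollback pattern described above can also be packaged as a reusable context manager. The following is a minimal sketch, not this guide's implementation: transaction is a hypothetical helper that works with any connection object exposing cursor(), commit(), and rollback(), which psycopg2 connections do.

```python
from contextlib import contextmanager


@contextmanager
def transaction(conn):
    """Yield a cursor; commit if the block succeeds, roll back if it raises.

    Hypothetical helper. Assumes conn has autocommit disabled, so the
    statements run inside one transaction.
    """
    cur = conn.cursor()
    try:
        yield cur
        conn.commit()
    except Exception:
        conn.rollback()
        raise  # let the caller decide how to report the failure
    finally:
        cur.close()


# Usage sketch (assumes db_conn is an open psycopg2 connection):
#
# with transaction(db_conn) as cur:
#     cur.execute("insert into customers(first_name, last_name) values(%s, %s)",
#                 ("JOHN", "DOE"))
```

psycopg2 connections can also be used directly in a with statement, which commits or rolls back when the block exits without closing the connection. The helper above only adds cursor handling on top of that idea.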
The following code block runs to completion only when there are no errors in the database transaction. Within the transaction, the application creates a new record in the customers table and another record in the loans table.
    ...
    print("Inserting new customer to database...")
    sql_string = 'insert into customers(first_name, last_name) values(%s, %s) RETURNING customer_id'
    self.cur.execute(sql_string, (json_payload['first_name'], json_payload['last_name']))
    customer_id = self.cur.fetchone()[0]
    print("Customer successfully inserted to database, new customer_id is " + str(customer_id))

    print("Inserting customer's loan record...")
    sql_string = 'insert into loans(customer_id, amount) values(%s, %s) RETURNING loan_id'
    self.cur.execute(sql_string, (customer_id, json_payload['loan_amount']))
    loan_id = self.cur.fetchone()[0]
    print("Customer loan record inserted successfully, new loan_id is " + str(loan_id))

    self.db_conn.commit()
    print("Database transaction completed successfully.")
    return "Success"
...
The except(...) block fires when a transaction fails with an exception. Then, the finally block executes in every case to close the cursor and the database connection.
...
except (Exception, psycopg2.DatabaseError) as error:
    print("Database transaction failed, rolling back database changes...")
    self.db_conn.rollback()
    return str(error)

finally:
    if self.db_conn:
        self.cur.close()
        self.db_conn.close()
        print("Database connection closed successfully.")
The PostgresqlDb class is now ready. Use the following syntax to include it in other Python source code files.
import postgresql_db
pg = postgresql_db.PostgresqlDb()
resp = pg.execute_db(...)
Follow the next step to create the main.py file for your Python application.
To complete this sample application, you need an HTTP server that accepts incoming POST requests on port 8080. Python has built-in libraries that you can use to carry out the task. Follow the steps below to create the HTTP server:
Open a new main.py file in a text editor.
$ nano main.py
Enter the following information into the main.py file.
import http.server
from http import HTTPStatus
import socketserver
import json
import postgresql_db

class httpHandler(http.server.SimpleHTTPRequestHandler):

    def do_POST(self):
        content_length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(content_length)
        json_payload = json.loads(post_data)

        self.send_response(HTTPStatus.OK)
        self.send_header('Content-type', 'application/json')
        self.end_headers()

        pg = postgresql_db.PostgresqlDb()
        resp = pg.execute_db(json_payload)
        self.wfile.write(bytes( resp + '\r\n', "utf8"))

httpServer = socketserver.TCPServer(('', 8080), httpHandler)
print("Web server started at port 8080")

try:
    httpServer.serve_forever()
except KeyboardInterrupt:
    httpServer.server_close()
    print("The HTTP server is stopped.")
Save and close the main.py file.
The main.py file explained:
The import section loads all the Python libraries required by the sample application. The http.server, HTTPStatus, and socketserver libraries provide the HTTP functionality. The json module allows you to work with JSON data, while the postgresql_db module loads your custom PostgreSQL database class.
import http.server
from http import HTTPStatus
import socketserver
import json
import postgresql_db
...
The httpHandler class is a handler class for the HTTP server. This class accepts a JSON payload from HTTP clients. Then, under this class, the pg = postgresql_db.PostgresqlDb() and pg.execute_db(json_payload) statements call your custom PostgresqlDb class to save data to the database, and the self.wfile.write(bytes( resp + '\r\n', "utf8")) statement returns a response to the client.
...
class httpHandler(http.server.SimpleHTTPRequestHandler):

    def do_POST(self):
        content_length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(content_length)
        json_payload = json.loads(post_data)

        self.send_response(HTTPStatus.OK)
        self.send_header('Content-type', 'application/json')
        self.end_headers()

        pg = postgresql_db.PostgresqlDb()
        resp = pg.execute_db(json_payload)
        self.wfile.write(bytes( resp + '\r\n', "utf8"))
...
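Note that the handler above sends HTTPStatus.OK before it runs the transaction, so a failed transaction still returns a 200 status. One possible refinement, sketched below, is to derive the status from the value that execute_db(...) returns. build_response is a hypothetical helper, the JSON body layout is made up for the example, and using 400 Bad Request for every failure is an assumption (some failures may be better reported as server errors).

```python
import json
from http import HTTPStatus


def build_response(result):
    """Map the string returned by execute_db() to (status, JSON body bytes).

    Hypothetical helper: execute_db() in this guide returns "Success" on
    commit and the error text otherwise.
    """
    if result == "Success":
        status, body = HTTPStatus.OK, {"status": "success"}
    else:
        status, body = HTTPStatus.BAD_REQUEST, {"status": "error", "message": result}
    return status, json.dumps(body).encode("utf-8")
```

In do_POST, this would mean calling pg.execute_db(json_payload) first, then passing the returned status to self.send_response(...) and writing the body bytes with self.wfile.write(...).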
The following declarations at the end of the file create a web server that listens for HTTP requests and dispatches the request to the httpHandler class.
...
httpServer = socketserver.TCPServer(('', 8080), httpHandler)
print("Web server started at port 8080")

try:
    httpServer.serve_forever()
except KeyboardInterrupt:
    httpServer.server_close()
    print("The HTTP server is stopped.")
You now have all the necessary source code files required by your application. Proceed to the next step to test the application.
After coding all the Python files, the final step is installing the Python pip package, downloading the psycopg2 library, and testing the application. Follow the steps below to complete the application:
Install the Python pip package.
$ sudo apt install -y python3-pip
Use the pip package to install the psycopg2-binary library for the PostgreSQL server.
$ pip install psycopg2-binary
Output:
...
Installing collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.5
Use the python3 command to run the application.
$ python3 main.py
Output:
Web server started at port 8080
Establish another SSH connection to your server and run the following Linux curl command to send a sample JSON payload to the application.
$ curl -X POST http://localhost:8080/ -H 'Content-Type: application/json' -d '{"first_name": "JOHN", "last_name": "DOE", "loan_amount": "4560"}'
Output:
"Success"
Check out the output below from the first terminal window where the web server is running. The transaction succeeds without any errors.
Web server started at port 8080
Starting new database transaction...
Inserting new customer to database...
Customer successfully inserted to database, new customer_id is 1
Inserting customer's loan record...
Customer loan record inserted successfully, new loan_id is 1
Database transaction completed successfully.
Database connection closed successfully.
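The same request can also be sent from Python instead of curl. The following is a minimal sketch using only the standard library. build_request and send are hypothetical helpers, and the URL assumes the sample server from this guide is listening on localhost port 8080.

```python
import json
import urllib.request


def build_request(payload, url="http://localhost:8080/"):
    """Build a JSON POST request equivalent to the curl command above."""
    data = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=data,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(payload):
    """Send the payload and return (HTTP status, response text).

    Requires the sample server to be running; raises URLError otherwise.
    """
    with urllib.request.urlopen(build_request(payload), timeout=10) as resp:
        return resp.status, resp.read().decode("utf-8")
```

For example, send({"first_name": "JOHN", "last_name": "DOE", "loan_amount": "4560"}) submits the same payload as the curl command.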
Try sending the following invalid transaction, which contains the wrong loan amount: PP instead of a numeric value.
$ curl -X POST http://localhost:8080/ -H 'Content-Type: application/json' -d '{"first_name": "JOHN", "last_name": "DOE", "loan_amount": "PP"}'
Output:
"invalid input syntax for type numeric: \"PP\"..."
Examine the output from the first terminal window. This time, the transaction fails without making any changes to the database. Although the application inserts the customer's details into the database and obtains a new customer_id (2), the entire transaction rolls back per the following output.
...
Starting new database transaction...
Inserting new customer to database...
Customer successfully inserted to database, new customer_id is 2
Inserting customer's loan record...
Database transaction failed, rolling back database changes...
Database connection closed successfully.
To verify the changes, log in to your PostgreSQL database cluster.
$ psql -h SAMPLE_POSTGRESQL_DB_HOST_STRING.vultrdb.com -p 16751 -U vultradmin defaultdb
Output:
Password for user vultradmin:
Enter your password and press ENTER to proceed.
Output:
defaultdb=>
Switch to the bank_db database.
defaultdb=> \c bank_db;
Output:
You are now connected to database "bank_db" as user "vultradmin".
Query the customers table.
bank_db=> SELECT
              customer_id,
              first_name,
              last_name
          FROM customers;
Output:
customer_id | first_name | last_name
-------------+------------+-----------
1 | JOHN | DOE
(1 row)
Query the loans table.
bank_db=> SELECT
              loan_id,
              customer_id,
              amount
          FROM loans;
Output:
loan_id | customer_id | amount
---------+-------------+-----------
1 | 1 | 4560.0000
(1 row)
The above outputs confirm that the application's logic is working as expected. Without the PostgreSQL transaction logic, you would now have an orphaned customer record without a matching loan record.
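The same check can be automated. The sketch below looks for customers that have no matching loan record, which is the orphaned state that the transaction is meant to prevent. find_orphaned_customers is a hypothetical helper that is not part of this guide's application. It assumes a connection object that provides a DB-API cursor, such as a psycopg2 connection to bank_db.

```python
# Customers with no row in loans are "orphaned" in the sense used in this guide.
ORPHAN_SQL = """
SELECT c.customer_id
FROM customers c
LEFT JOIN loans l ON l.customer_id = c.customer_id
WHERE l.loan_id IS NULL
ORDER BY c.customer_id
"""


def find_orphaned_customers(conn):
    """Return the customer_ids that have no loan record (hypothetical helper)."""
    cur = conn.cursor()
    try:
        cur.execute(ORPHAN_SQL)
        return [row[0] for row in cur.fetchall()]
    finally:
        cur.close()
```

Against the sample data in this guide, an empty list would indicate that no partial transaction reached the database.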
This guide showed you how to implement PostgreSQL database transactions with Python on an Ubuntu 20.04 server. Use the source code in this guide to create applications that treat a unit of database work as a whole. Transactions ensure database consistency and prevent possible cases of orphaned records.
Check out the links below to learn more about the managed PostgreSQL database cluster: