Leveraging Cloudflare D1: A Comprehensive Guide to Managing Your SQLite Database

Cloudflare D1 is a serverless, SQLite-compatible database designed to seamlessly integrate with your applications. Whether you're building a simple web app or a complex data-driven platform, D1 offers scalability, reliability, and ease of use. In this blog post, we'll walk through how to set up a Cloudflare D1 database, create tables, and insert data from a CSV file using Python's requests library. We'll use a dataset from the Google Play Store as our example.

Prerequisites

Before we begin, ensure you have the following:

  • A Cloudflare account. If you don't have one, you can sign up on the Cloudflare website.
  • Python installed on your machine.
  • The requests and pandas libraries installed. You can install both with pip: pip install requests pandas

Setting Up the Environment

First, let's set up our Python environment by importing the necessary libraries and defining our Cloudflare account credentials. Note: For security reasons, always keep your ACCOUNT_ID and AUTH_TOKEN confidential.

In this guide, we'll mask them with *****.

import os
import re
import requests
import pandas as pd

# Cloudflare credentials (masked for security)
ACCOUNT_ID = "*****"
AUTH_TOKEN = "*****"
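
As an alternative to pasting the values into the notebook, the credentials can be read from environment variables. The following is a minimal sketch: the helper load_credentials and the variable names CLOUDFLARE_ACCOUNT_ID and CLOUDFLARE_API_TOKEN are assumptions chosen for illustration, not names this guide requires.

```python
import os


def load_credentials(env=None):
    """Read the account ID and API token from a mapping (defaults to os.environ)."""
    env = os.environ if env is None else env
    # Hypothetical variable names; use whatever names your environment defines.
    names = ("CLOUDFLARE_ACCOUNT_ID", "CLOUDFLARE_API_TOKEN")
    values = [env.get(name) for name in names]
    missing = [name for name, value in zip(names, values) if not value]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return values[0], values[1]
```

With both variables set, ACCOUNT_ID, AUTH_TOKEN = load_credentials() would replace the two masked assignments above.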


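Next, we'll download and prepare our dataset. In this example, we'll use a Google Play Store dataset from Kaggle. The lines prefixed with ! are notebook shell commands, and the /content paths assume a Google Colab session.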

# Download the dataset
!curl -L -o /content/dataset.zip https://www.kaggle.com/api/v1/datasets/download/lava18/google-play-store-apps

# Unzip the dataset into /content, next to the downloaded archive
!unzip -o /content/dataset.zip -d /content

# Load the CSV into a Pandas DataFrame
df = pd.read_csv("/content/googleplaystore.csv")

# Clean column names: remove all whitespace and convert to lowercase
df.columns = [re.sub(r"\s+", "", k).lower() for k in df.columns]

# Display the first few rows
df.head()

# Add an 'id' column
df['id'] = range(1, len(df) + 1)
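
The column-name cleanup can be checked in isolation, without loading the dataset. This is a small sketch: normalize_column is a hypothetical helper that applies the same regular expression, and the sample names are assumed from the schema printed later in this post.

```python
import re


def normalize_column(name):
    """Remove all whitespace and lowercase, mirroring the cleanup applied to df.columns."""
    return re.sub(r"\s+", "", name).lower()


# Sample column names (assumed from the generated schema shown later)
for raw in ["App", "Content Rating", "Last Updated", "Android Ver"]:
    print(f"{raw!r} -> {normalize_column(raw)!r}")
```

Because whitespace is removed rather than replaced, "Content Rating" becomes contentrating, which is the column name used in the SQL statements that follow.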

Creating a Cloudflare D1 Database

To interact with Cloudflare D1, we'll use the Cloudflare API.

Let's start by creating a new D1 database.

# Define the API endpoint for creating a D1 database
API_BASE_URL = f"https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/d1/database"

# Set up the headers with authorization
headers = {
    "Authorization": f"Bearer {AUTH_TOKEN}",
    "Content-Type": "application/json"
}

# Define the payload with the desired database name
data = {
    "name": "google_play_store_db"
}

# Send a POST request to create the database
response = requests.post(API_BASE_URL, headers=headers, json=data)

# Check the response
print(response.json())

Response Example:

{
    "success": true,
    "errors": [],
    "messages": [],
    "result": {
        "id": "your-d1-db-uuid",
        "name": "google_play_store_db",
        "created_on": "2024-04-27T12:34:56Z"
    }
}

Note: Replace "your-d1-db-uuid" with the actual UUID returned in the response. We'll use this UUID in subsequent API calls.
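
Instead of copying the identifier by hand, it can be read from the parsed response. The sketch below is hedged: extract_database_id is a hypothetical helper, and it checks both an "id" key (as in the example above) and a "uuid" key, since the exact field name in a live response is an assumption you should verify against your own output.

```python
def extract_database_id(payload):
    """Return the database identifier from a create-database response."""
    if not payload.get("success"):
        raise RuntimeError(f"Database creation failed: {payload.get('errors')}")
    result = payload.get("result") or {}
    # Assumption: the identifier is stored under "uuid" or "id".
    database_id = result.get("uuid") or result.get("id")
    if not database_id:
        raise KeyError("No database identifier found in the response")
    return database_id


# Sample payload copied from the response example above
sample = {
    "success": True,
    "errors": [],
    "messages": [],
    "result": {
        "id": "your-d1-db-uuid",
        "name": "google_play_store_db",
        "created_on": "2024-04-27T12:34:56Z",
    },
}
print(extract_database_id(sample))
```

In the notebook, the call would take response.json() from the creation request instead of the sample dictionary.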

Creating Tables in D1

With our database created, the next step is to define the schema by creating tables. We'll generate a CREATE TABLE SQL statement based on our DataFrame's structure.

# Function to generate CREATE TABLE SQL query from DataFrame
def create_table_sql(df, table_name):
    """Generates a CREATE TABLE SQL query for a Pandas DataFrame."""
    cols = []
    for column in df.columns:
        dtype = str(df[column].dtype)
        column = re.sub(r"\s+", "_", column.lower())
        if dtype == "object":
            cols.append(f"  {column} TEXT")
        elif dtype == "int64":
            cols.append(f"  {column} INTEGER")
        elif dtype == "float64":
            cols.append(f"  {column} REAL")
        else:
            cols.append(f"  {column} TEXT")  # Default to TEXT for unknown types
    sql_query = f"CREATE TABLE {table_name} (\n" + ",\n".join(cols) + "\n);"
    return sql_query

# Generate the CREATE TABLE statement
table_name = "google_play_store"
create_table_query = create_table_sql(df, table_name)

# Optionally, drop the table if it exists (use with caution: this deletes any existing data)
create_table_query = f"DROP TABLE IF EXISTS {table_name};\n" + create_table_query
print(create_table_query)

Output:

DROP TABLE IF EXISTS google_play_store;
CREATE TABLE google_play_store (
  app TEXT,
  category TEXT,
  rating REAL,
  reviews TEXT,
  size TEXT,
  installs TEXT,
  type TEXT,
  price TEXT,
  contentrating TEXT,
  genres TEXT,
  lastupdated TEXT,
  currentver TEXT,
  androidver TEXT,
  id INTEGER
);
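
The generated schema has no primary key and depends on dropping the table before each run. A possible variant, sketched here with standard SQLite syntax, uses CREATE TABLE IF NOT EXISTS and marks one column as the primary key. The helper build_create_table and its arguments are hypothetical and are not part of the function above.

```python
def build_create_table(table_name, columns, primary_key=None):
    """Build a CREATE TABLE IF NOT EXISTS statement from a {column: sqlite_type} mapping."""
    lines = []
    for name, sql_type in columns.items():
        # Append PRIMARY KEY only to the chosen column
        suffix = " PRIMARY KEY" if name == primary_key else ""
        lines.append(f"  {name} {sql_type}{suffix}")
    return f"CREATE TABLE IF NOT EXISTS {table_name} (\n" + ",\n".join(lines) + "\n);"


# A shortened column mapping for illustration
statement = build_create_table(
    "google_play_store",
    {"app": "TEXT", "rating": "REAL", "id": "INTEGER"},
    primary_key="id",
)
print(statement)
```

With IF NOT EXISTS, rerunning the statement leaves an existing table and its data untouched, so the DROP TABLE step becomes optional.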

Now, let's execute this SQL statement using the D1 API.

# D1 Database UUID obtained from the creation response
D1_DB_UUID = "*****"  # Replace with your actual D1_DB_UUID

# Endpoint for executing SQL queries
query_url = f"https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/d1/database/{D1_DB_UUID}/query"

# Define headers
headers = {
    "Authorization": f"Bearer {AUTH_TOKEN}",
    "Content-Type": "application/json"
}

# Payload with the CREATE TABLE SQL
data = {
    "sql": create_table_query
}

# Send the POST request to create the table
response = requests.post(query_url, headers=headers, json=data)

# Check the response
print(response.status_code)
print(response.json())

Response Example:

{
    "success": true,
    "errors": [],
    "messages": [],
    "result": {}
}
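
Every call in this guide checks the same two things: the HTTP status code and the "success" flag. That check can be collected in one place. The sketch below assumes that entries in "errors" are objects with "code" and "message" keys; raise_for_d1_errors is a hypothetical helper, and the failing payload is invented purely for the demonstration.

```python
def raise_for_d1_errors(status_code, payload):
    """Return payload['result'] on success, otherwise raise RuntimeError with details."""
    if status_code == 200 and payload.get("success"):
        return payload.get("result")
    details = "; ".join(
        f"{err.get('code')}: {err.get('message')}" if isinstance(err, dict) else str(err)
        for err in payload.get("errors", [])
    )
    raise RuntimeError(
        f"D1 request failed (HTTP {status_code}): {details or 'no error details'}"
    )


# Successful payload, as in the response example above
ok = {"success": True, "errors": [], "messages": [], "result": {}}
print(raise_for_d1_errors(200, ok))

# Invented failure payload, for illustration only
failed = {"success": False, "errors": [{"code": 1000, "message": "example error"}]}
try:
    raise_for_d1_errors(400, failed)
except RuntimeError as exc:
    print(exc)
```

In the notebook, the call would be raise_for_d1_errors(response.status_code, response.json()).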

Inserting Data into D1

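With the table created, we can now insert data from our DataFrame into the D1 database. We'll first escape the string values so that single quotes and newlines in the data don't break the generated SQL. Keep in mind that manual escaping is a weaker safeguard against SQL injection than parameterized queries.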

Sanitizing Data

Before inserting data, it's crucial to sanitize the inputs:

# Function to sanitize string values for SQL queries
def sanitize_for_sql(value):
    """Sanitizes a value for use in an SQLite insert query."""
    if isinstance(value, str):
        value = value.replace('\n', ' ').replace("'", "''")  # Escape single quotes
    return value

# Apply sanitization to each column in the DataFrame
for column in df.columns:
    df[column] = df[column].apply(sanitize_for_sql)
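
An alternative to escaping values by hand is to send them as bound parameters. The sketch below only builds the request payload and makes no network call. It assumes that the query endpoint accepts a "params" array next to "sql" with ? placeholders, that parameters are passed as strings, and that JSON null is accepted for missing values. Verify each of these against the D1 API reference before relying on them. The helpers to_param and build_insert_payload are hypothetical.

```python
import math


def to_param(value):
    """Convert a cell to a bind parameter; None and NaN become None (JSON null)."""
    if value is None or (isinstance(value, float) and math.isnan(value)):
        return None
    return str(value)


def build_insert_payload(table, row):
    """Build a payload with ? placeholders so values never enter the SQL text."""
    columns = list(row)
    placeholders = ", ".join("?" for _ in columns)
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders});"
    return {"sql": sql, "params": [to_param(row[column]) for column in columns]}


# A shortened row for illustration; the quote in the app name needs no escaping
payload = build_insert_payload(
    "google_play_store",
    {"app": "Bob's App", "rating": float("nan"), "id": 1},
)
print(payload)
```

The table and column names are still interpolated into the SQL text, so they should come from trusted code rather than from the data itself.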

Batch Insertion

Inserting large datasets can be time-consuming and may lead to rate limiting. We'll insert data in batches to optimize performance.

# Define the batch size
batch_size = 1000

# Function to render one value as an SQL literal
def sql_literal(value):
    """Renders a single value as an SQL literal."""
    if pd.isna(value):
        return "NULL"  # Missing values (NaN/None) become NULL instead of the invalid token nan
    if isinstance(value, str):
        return f"'{value}'"  # Strings were already escaped by sanitize_for_sql
    return str(value)

# Function to generate INSERT statements
def generate_insert_sql(batch, table_name="google_play_store"):
    """Generates INSERT SQL statements for a batch of rows."""
    columns = ", ".join(batch.columns)
    sql_statements = []
    for _, row in batch.iterrows():
        values = ", ".join(sql_literal(value) for value in row)
        sql_statements.append(f"INSERT INTO {table_name} ({columns}) VALUES ({values})")
    return "; ".join(sql_statements) + ";"

# Insert data in batches
for start in range(0, len(df), batch_size):
    # Get the current batch
    batch = df.iloc[start:start + batch_size]
    
    # Generate the INSERT SQL
    insert_sql = generate_insert_sql(batch)
    
    # Prepare the payload
    data = {
        "sql": insert_sql
    }
    
    # Send the POST request to insert the batch
    response = requests.post(query_url, headers=headers, json=data)
    
    # Check the response
    if response.status_code == 200 and response.json().get("success"):
        print(f"Successfully inserted batch starting at index {start}")
    else:
        print(f"Failed to insert batch starting at index {start}")
        print(response.status_code, response.json())

Notes:

  • Batch Size: Adjust the batch_size based on your data volume and API rate limits.
  • Error Handling: Always check the response to handle any insertion errors gracefully.
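
A fixed row count does not bound the size of each request, because rows vary in length. One possible refinement, sketched below, groups statements by a byte budget instead. The helper chunk_statements and the max_bytes value are hypothetical; take the real limit from Cloudflare's documented D1 limits.

```python
def chunk_statements(statements, max_bytes):
    """Group SQL statements so each joined chunk stays within max_bytes (UTF-8)."""
    chunks, current, current_size = [], [], 0
    for statement in statements:
        size = len(statement.encode("utf-8")) + 1  # +1 for the separating space
        # Start a new chunk when adding this statement would exceed the budget
        if current and current_size + size > max_bytes:
            chunks.append(" ".join(current))
            current, current_size = [], 0
        current.append(statement)
        current_size += size
    if current:
        chunks.append(" ".join(current))
    return chunks


statements = [
    "INSERT INTO t VALUES (1);",
    "INSERT INTO t VALUES (2);",
    "INSERT INTO t VALUES (3);",
]
chunks = chunk_statements(statements, max_bytes=60)
print(len(chunks), chunks)
```

A single statement larger than max_bytes still ends up in its own chunk, so oversized rows need separate handling.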

Querying Data from D1

After inserting the data, you might want to retrieve and analyze it. Here's how to execute a SELECT query.

Example:

# Define the SELECT SQL query
select_query = "SELECT * FROM google_play_store WHERE app LIKE 'Photo%' LIMIT 100;"

# Prepare the payload
data = {
    "sql": select_query
}

# Send the POST request to execute the SELECT query
response = requests.post(query_url, headers=headers, json=data)

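# Check the response and process the results
results = []
if response.status_code == 200 and response.json().get("success"):
    # "result" holds one entry per SQL statement; each entry keeps its rows under "results"
    statement_results = response.json().get("result", [])
    results = statement_results[0].get("results", []) if statement_results else []
    print("Data retrieved successfully:")
    for row in results:
        print(row)
else:
    print("Failed to retrieve data")
    print(response.status_code, response.json())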

You can further process the retrieved data as needed, such as converting it back to a Pandas DataFrame for analysis.

# Convert results to a DataFrame
results_df = pd.DataFrame(results)
print(results_df.head())
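
When one request contains several SQL statements, the rows of each statement arrive separately. The sketch below flattens them into a single list. It assumes that "result" is a list with one object per statement and that each object stores its rows under "results". The helper collect_rows is hypothetical, and the sample values are invented for illustration.

```python
def collect_rows(payload):
    """Flatten the rows of every statement in a query response into one list."""
    rows = []
    for statement_result in payload.get("result") or []:
        # Assumption: each entry is a dict whose rows are stored under "results"
        if isinstance(statement_result, dict):
            rows.extend(statement_result.get("results") or [])
    return rows


# Invented sample payload modeled on the assumed response shape
sample = {
    "success": True,
    "result": [
        {"success": True, "results": [{"app": "Photo Editor", "id": 1}]},
        {"success": True, "results": [{"app": "Photo Lab", "id": 2}]},
    ],
}
print(collect_rows(sample))
```

The returned list of dictionaries can be passed directly to pd.DataFrame, for example pd.DataFrame(collect_rows(response.json())).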

Conclusion

Cloudflare D1 offers a powerful and flexible solution for managing SQLite databases in a serverless environment.

Using the Cloudflare API and Python's requests library, you can create databases, define schemas, and insert and query data without running a database server yourself. Whether you're building an application or analyzing a dataset, the same handful of API calls covers the whole workflow.