Sentiment Analysis of Phone Reviews in PySpark with Large Language Models
Make LLM calls in Spark to analyze the sentiment of a review dataset.

Spark has made a huge impact on data engineering and analytics. Companies such as Databricks have grown into very large enterprises serving companies across industries to support their data engineering pipelines.
With the advent of large language models (LLMs) we have seen how easy it has become to perform many Natural Language Processing (NLP) tasks. Tasks like sentiment analysis, summarisation, topic classification, and translation, to name a few, have become trivial with LLMs compared to solutions in the pre-LLM era.
LLMs have also become adept at returning structured results in the form of JSON and obeying user instructions on how results must be returned in terms of format and structure. This, along with falling API costs to use foundation models, has created a situation where anyone can not only crunch large amounts of data using Spark, but can also enrich their data assets in many ways.
Consider an e-commerce marketplace that sells phones. Each phone in the catalog has been reviewed many times by customers. Structured data, such as the number of stars, is relatively simple to analyse compared to unstructured data, such as free-form text reviews.
Making LLM calls in Spark for sentiment analysis of user reviews is the subject of this post and something I've been experimenting with by using Codex.
The blog post should have really just been a video session where I record my screen as I prompt Codex to generate PySpark code, solve Java installation issues, submit PySpark code to run the analysis, and so on. But it would be best if you tried Codex yourself. This blog post is a set of notes and reflections based on my experience and a list of ingredients I used for the overall experience.
I always start my sessions with a new virtual environment in Python. Then I installed the pyspark Python package, which installed Spark version 4.0.1, the latest as of this writing.
The dataset I used is https://www.kaggle.com/datasets/grikomsn/amazon-cell-phones-reviews?resource=download
I then asked Codex to write a simple PySpark script for reading the dataset and doing some basic aggregations.
On running spark-submit analyze_reviews.py 20191226-reviews.csv, I was presented with a Java error.
Before I share how Codex solved the Java error, here is the first script I submitted to Spark locally. Give it a read. Even though you will most likely never need to write such a script by hand, I strongly encourage beginners to get a sense of what's going on.
You can paste the code in ChatGPT/Claude/Gemini and ask them to explain it to you with added instructions on what you'd like to learn. You can use the Learning mode in ChatGPT or ask the LLM to explain the code in a Socratic manner to help you learn.
"""
PySpark analysis script for the 20191226-reviews.csv dataset.

Usage:
    spark-submit analyze_reviews.py 20191226-reviews.csv
"""
from __future__ import annotations

import sys
from pathlib import Path

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (
    IntegerType,
    StringType,
    StructField,
    StructType,
)


def build_spark(app_name: str = "ReviewsAnalysis") -> SparkSession:
    """Create a Spark session with sensible defaults for local runs."""
    return (
        SparkSession.builder.appName(app_name)
        .master("local[*]")
        .getOrCreate()
    )


def load_reviews(spark: SparkSession, source: Path) -> DataFrame:
    """Load the reviews CSV with an explicit schema."""
    schema = StructType(
        [
            StructField("asin", StringType(), True),
            StructField("name", StringType(), True),
            StructField("rating", IntegerType(), True),
            StructField("date", StringType(), True),
            StructField("verified", StringType(), True),
            StructField("title", StringType(), True),
            StructField("body", StringType(), True),
            StructField("helpfulVotes", IntegerType(), True),
        ]
    )
    df = (
        spark.read
        .option("header", True)
        .schema(schema)
        .csv(str(source))
    )
    # Normalize types that we will frequently query on.
    return (
        df.withColumn("is_verified", F.col("verified") == F.lit("true"))
        .withColumn(
            "review_date",
            F.to_date(F.col("date"), "MMMM d, yyyy"),
        )
    )


def summarize(df: DataFrame) -> dict[str, DataFrame]:
    """Produce a set of summary DataFrames for further inspection."""
    summaries: dict[str, DataFrame] = {}
    summaries["schema"] = df.limit(0)
    summaries["review_count"] = df.agg(F.count("*").alias("total_reviews"))
    summaries["rating_distribution"] = (
        df.groupBy("rating")
        .agg(F.count("*").alias("count"))
        .orderBy(F.desc("count"))
    )
    summaries["avg_rating_by_asin"] = (
        df.groupBy("asin")
        .agg(
            F.count("*").alias("review_count"),
            F.avg("rating").alias("avg_rating"),
            F.sum("helpfulVotes").alias("total_helpful_votes"),
        )
        .orderBy(F.desc("review_count"))
    )
    summaries["avg_rating_by_verified"] = (
        df.groupBy("is_verified")
        .agg(
            F.count("*").alias("review_count"),
            F.avg("rating").alias("avg_rating"),
        )
        .orderBy(F.desc("review_count"))
    )
    summaries["top_helpful_reviews"] = (
        df.orderBy(F.desc_nulls_last("helpfulVotes"))
        .select("asin", "name", "rating", "helpfulVotes", "title")
        .limit(10)
    )
    return summaries


def main(argv: list[str]) -> None:
    if len(argv) != 2:
        script = Path(argv[0]).name
        print(f"Usage: spark-submit {script} <path-to-csv>", file=sys.stderr)
        sys.exit(1)
    source = Path(argv[1])
    if not source.exists():
        print(f"Error: file not found: {source}", file=sys.stderr)
        sys.exit(2)
    spark = build_spark()
    try:
        reviews = load_reviews(spark, source)
        summaries = summarize(reviews)
        print("=== Dataset schema ===")
        reviews.printSchema()
        print("\n=== Total reviews ===")
        summaries["review_count"].show(truncate=False)
        print("\n=== Rating distribution ===")
        summaries["rating_distribution"].show(truncate=False)
        print("\n=== Average rating by ASIN ===")
        summaries["avg_rating_by_asin"].show(truncate=False)
        print("\n=== Average rating by verified purchase flag ===")
        summaries["avg_rating_by_verified"].show(truncate=False)
        print("\n=== Top helpful reviews ===")
        summaries["top_helpful_reviews"].show(truncate=False, vertical=False)
    finally:
        spark.stop()


if __name__ == "__main__":
    main(sys.argv)
I pasted the Java error trace back into Codex, and it instructed me on what I should do on my Ubuntu setup to get Java working. The instructions involved installing Java and setting the JAVA_HOME variable in my .bashrc file. In the past, mucking around with these kinds of settings would have been a source of irritation for me.
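If you hit a similar Java error, a quick check of what the environment exposes can help before retrying spark-submit. The snippet below is a minimal sketch using only the Python standard library; the helper name java_status is my own invention for illustration and is not part of PySpark or of what Codex produced.

```python
import os
import shutil
from typing import Mapping, Optional


def java_status(environ: Optional[Mapping[str, str]] = None) -> dict:
    """Report whether a Java runtime looks discoverable (hypothetical helper)."""
    env = os.environ if environ is None else environ
    java_home = env.get("JAVA_HOME")
    return {
        # Path to the `java` executable found on PATH, or None if not found.
        "java_on_path": shutil.which("java", path=env.get("PATH")),
        # Value of JAVA_HOME, or None if it is unset or empty.
        "java_home": java_home or None,
        # True only when JAVA_HOME points at an existing directory.
        "java_home_exists": bool(java_home) and os.path.isdir(java_home),
    }


if __name__ == "__main__":
    for key, value in java_status().items():
        print(f"{key}: {value}")
```

If java_on_path is None or java_home_exists is False, the install or the .bashrc export is the likely place to look. Remember that a change to .bashrc only takes effect in a new shell or after sourcing the file.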
I recently got a SER8 Beelink Mini PC and installed Ubuntu on it. The initial goal was to install DHH's Omarchy, but I decided to do that later and went with Ubuntu as the safe choice. I wanted this Mini PC to be a small form-factor desktop playground for Codex and me to try various experiments. I've always used MacBooks for work and personal tasks, but I recently gave away my personal MacBook and needed a personal computing device, and the SER8 felt like a worthy contender.
After successfully submitting the script to Spark and seeing the results light up my terminal, I decided to integrate a call to GPT-5 for sentiment analysis.
I first created a .env file with the OPENAI_API_KEY variable and pasted my API key in there. I then prompted Codex to use the Responses API documented in the Quickstart here https://platform.openai.com/docs/quickstart and shared the snippet of code from the webpage in my prompt. In retrospect, I need not have shared any code, but in the past, I've had the LLM (not Codex) use the completions API and wanted to ensure Codex did not make the same mistake.
After the script was generated and I submitted it to Spark, the responses were all null values. Codex ensured only a sample of 10 rows was processed via the API. After I shared the results with Codex, it instructed me to check whether the environment variables were being correctly recognized, and once this was taken care of, the script worked.
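An all-null column is hard to debug because the reason for each failure is thrown away. One way to make it visible is to return the error text alongside the label. The sketch below is plain Python with no Spark or OpenAI dependency; call_with_error and fake_classifier are hypothetical names used only for illustration, and the fake classifier simply imitates the kind of failure a missing API key might cause.

```python
from typing import Callable, Optional, Tuple


def call_with_error(
    classify: Callable[[str], str], review_text: Optional[str]
) -> Tuple[Optional[str], Optional[str]]:
    """Return (label, error) so a failed call is visible instead of a bare null."""
    if review_text is None or not review_text.strip():
        return None, "empty review"
    try:
        return classify(review_text), None
    except Exception as exc:  # Broad on purpose: this is a debugging aid.
        return None, f"{type(exc).__name__}: {exc}"


def fake_classifier(text: str) -> str:
    """Stand-in for the real API call; always fails, like a missing key might."""
    raise RuntimeError("OPENAI_API_KEY is not set")


if __name__ == "__main__":
    print(call_with_error(fake_classifier, "Great phone, terrible battery."))
```

The same idea could probably be carried into a Spark UDF by returning a struct with a label field and an error field, so that a quick look at the error column explains why the labels are null.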
I then asked Codex to modify the script to find the top 100 reviews by length and only process those for sentiment analysis.
The script produced is shared below.
Sharing the script in this blog feels funny because you can literally ask an LLM to generate one yourself. But for those readers who are new to Spark, you may wish to copy the code into your favourite AI and ask it to explain what's going on. For experienced users, maybe save yourself the tokens and use this script to get started?!
"""
Enrich review data with sentiment labels fetched via an external API.

Example:
    export OPENAI_API_KEY="sk-..."
    export OPENAI_MODEL="gpt-4.1-mini"
    spark-submit enrich_with_sentiment.py 20191226-reviews.csv --output top_reviews.tsv --top-k 100
"""
from __future__ import annotations

import argparse
import csv
import os
import sys
import time
from pathlib import Path
from typing import Optional

from dotenv import load_dotenv
from openai import OpenAI, OpenAIError
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Globals configured from CLI args/env so the UDF can access them on workers.
OPENAI_MODEL: str = "gpt-4.1-mini"
OPENAI_BASE_URL: Optional[str] = None
REQUEST_TIMEOUT: float = 10.0
REQUEST_SLEEP: float = 0.0
_OPENAI_CLIENT: Optional[OpenAI] = None


def build_spark(app_name: str, executor_env: dict[str, str]) -> SparkSession:
    """Create a Spark session for local development."""
    builder = SparkSession.builder.appName(app_name).master("local[*]")
    for key, value in executor_env.items():
        builder = builder.config(f"spark.executorEnv.{key}", value)
    return builder.getOrCreate()


def load_reviews(spark: SparkSession, source: Path) -> DataFrame:
    """Load the reviews CSV with an explicit schema."""
    schema = StructType(
        [
            StructField("asin", StringType(), True),
            StructField("name", StringType(), True),
            StructField("rating", IntegerType(), True),
            StructField("date", StringType(), True),
            StructField("verified", StringType(), True),
            StructField("title", StringType(), True),
            StructField("body", StringType(), True),
            StructField("helpfulVotes", IntegerType(), True),
        ]
    )
    reviews = (
        spark.read
        .option("header", True)
        .schema(schema)
        .csv(str(source))
    )
    return (
        reviews.withColumn("is_verified", F.col("verified") == F.lit("true"))
        .withColumn(
            "review_date",
            F.to_date(F.col("date"), "MMMM d, yyyy"),
        )
    )


def get_openai_client() -> OpenAI:
    """Lazy-init OpenAI client for use inside UDF workers."""
    global _OPENAI_CLIENT
    if _OPENAI_CLIENT is None:
        kwargs = {}
        api_key = os.environ.get("OPENAI_API_KEY")
        if api_key:
            kwargs["api_key"] = api_key
        base_url = os.environ.get("OPENAI_BASE_URL")
        if base_url:
            kwargs["base_url"] = base_url
        _OPENAI_CLIENT = OpenAI(**kwargs)
    return _OPENAI_CLIENT


def call_sentiment_api(review_text: Optional[str]) -> Optional[str]:
    """Invoke the OpenAI Responses API for a single review."""
    if review_text is None or not review_text.strip():
        return None
    prompt = (
        "Classify the following product review as positive, neutral, or negative. "
        "Respond with exactly one word: positive, neutral, or negative.\n\n"
        f"Review: {review_text}"
    )
    try:
        client = get_openai_client()
        response = client.responses.create(
            model=OPENAI_MODEL,
            input=prompt,
            temperature=0,
            max_output_tokens=16,
            timeout=REQUEST_TIMEOUT,
        )
        label = response.output_text.strip().lower()
    except OpenAIError:
        return None
    finally:
        if REQUEST_SLEEP > 0:
            # Simple throttle to avoid rate-limit penalties.
            time.sleep(REQUEST_SLEEP)
    allowed = {"positive", "neutral", "negative"}
    if label in allowed:
        return label
    # Fallback: attempt to extract a known label if extra text is returned.
    for candidate in allowed:
        if candidate in label:
            return candidate
    return None


sentiment_label_udf = F.udf(call_sentiment_api, StringType())


def enrich_with_sentiment(reviews: DataFrame) -> DataFrame:
    """Append a sentiment label column by calling the external API."""
    return reviews.withColumn(
        "sentiment_label",
        sentiment_label_udf(F.col("body")),
    )


def select_top_reviews(reviews: DataFrame, limit: int) -> DataFrame:
    """Pick the longest reviews by `body` length to limit API calls."""
    if limit <= 0:
        return reviews
    ranked = reviews.withColumn("body_length", F.length(F.col("body")))
    return (
        ranked.orderBy(F.desc_nulls_last("body_length"))
        .limit(limit)
        .drop("body_length")
    )


def parse_args(argv: list[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Enrich Amazon reviews with sentiment labels.")
    parser.add_argument("source", help="Path to the reviews CSV file.")
    parser.add_argument(
        "--output",
        help="Optional path to write the enriched dataset (supports Parquet/CSV/TSV based on extension).",
    )
    parser.add_argument(
        "--top-k",
        type=int,
        default=100,
        help="Number of longest reviews (by body length) to score and output.",
    )
    parser.add_argument(
        "--throttle",
        type=float,
        default=float(os.environ.get("SENTIMENT_API_SLEEP", "0")),
        help="Seconds to sleep after each API call to respect rate limits.",
    )
    parser.add_argument(
        "--timeout",
        type=float,
        default=float(os.environ.get("SENTIMENT_API_TIMEOUT", "10")),
        help="Request timeout in seconds for the OpenAI call.",
    )
    parser.add_argument(
        "--show",
        type=int,
        default=5,
        help="Number of enriched rows to display in the console.",
    )
    return parser.parse_args(argv)


def configure_api(throttle: float, timeout: float) -> dict[str, str]:
    global OPENAI_MODEL, OPENAI_BASE_URL, REQUEST_TIMEOUT, REQUEST_SLEEP, _OPENAI_CLIENT
    load_dotenv()
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        raise RuntimeError("OPENAI_API_KEY is not set. Add it to your .env or environment before running Spark.")
    OPENAI_MODEL = os.environ.get("OPENAI_MODEL", OPENAI_MODEL)
    OPENAI_BASE_URL = os.environ.get("OPENAI_BASE_URL")
    REQUEST_TIMEOUT = timeout
    REQUEST_SLEEP = throttle
    _OPENAI_CLIENT = None  # Reset so workers pick up new env values.
    executor_env = {
        "OPENAI_API_KEY": api_key,
        "OPENAI_MODEL": OPENAI_MODEL,
    }
    if OPENAI_BASE_URL:
        executor_env["OPENAI_BASE_URL"] = OPENAI_BASE_URL
    return executor_env


def write_output(df: DataFrame, destination: str) -> None:
    path = Path(destination)
    suffix = path.suffix.lower()
    if suffix in {".parquet"}:
        df.write.mode("overwrite").parquet(str(path))
    elif suffix in {".csv"}:
        df.write.mode("overwrite").option("header", True).csv(str(path))
    elif suffix in {".tsv"}:
        columns = df.columns
        rows = df.collect()
        with open(path, "w", newline="", encoding="utf-8") as handle:
            writer = csv.writer(handle, delimiter="\t")
            writer.writerow(columns)
            for row in rows:
                writer.writerow([row[col] for col in columns])
    else:
        raise ValueError(f"Unsupported output format for path: {destination}")


def main(argv: list[str]) -> None:
    args = parse_args(argv)
    source = Path(args.source)
    if not source.exists():
        print(f"Error: file not found: {source}", file=sys.stderr)
        sys.exit(2)
    executor_env = configure_api(throttle=args.throttle, timeout=args.timeout)
    spark = build_spark(app_name="ReviewsSentimentEnrichment", executor_env=executor_env)
    try:
        reviews = load_reviews(spark, source)
        longest_reviews = select_top_reviews(reviews, args.top_k)
        # Consider using repartition/coalesce to control concurrency of API calls.
        enriched = enrich_with_sentiment(longest_reviews)
        if args.show > 0:
            print(f"=== Sample sentiment output (showing {args.show}) ===")
            enriched.select("asin", "rating", "sentiment_label", "title").show(args.show, truncate=False)
        if args.output:
            write_output(enriched, args.output)
            print(f"Wrote enriched dataset to {args.output}")
    finally:
        spark.stop()


if __name__ == "__main__":
    main(sys.argv[1:])
The script ran successfully, and I was given a TSV file with 100 of the lengthiest reviews analysed for their sentiment.
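A quick sanity check on a file like this is to count how many reviews landed in each label. The sketch below uses only the standard library and assumes the TSV has a sentiment_label column, as written by the script above. The tally_labels helper and the inline SAMPLE rows are made up for illustration and are not real results from the dataset.

```python
import csv
import io
from collections import Counter
from typing import TextIO


def tally_labels(handle: TextIO, column: str = "sentiment_label") -> Counter:
    """Count rows per sentiment label; blank labels are counted as 'missing'."""
    reader = csv.DictReader(handle, delimiter="\t")
    counts: Counter = Counter()
    for row in reader:
        label = (row.get(column) or "").strip() or "missing"
        counts[label] += 1
    return counts


# Made-up rows that only mimic the shape of the TSV.
SAMPLE = (
    "asin\trating\tsentiment_label\n"
    "A1\t5\tpositive\n"
    "A2\t1\tnegative\n"
    "A3\t4\tpositive\n"
    "A4\t3\t\n"
)

if __name__ == "__main__":
    print(tally_labels(io.StringIO(SAMPLE)))
```

To run it against the real output, open the file with open("top_reviews.tsv", newline="", encoding="utf-8") and pass the handle to tally_labels. A large "missing" count would suggest that API calls failed or that the model returned something other than the three expected words.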
The next steps for me are to extract keywords related to specific areas of complaints in negative reviews and areas of appreciation in positive reviews. These keywords can be used as a way to catalog these reviews and to build dashboards and trend graphs.
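As a rough idea of what that keyword step might look like, here is a minimal sketch of the two pieces that do not need the API: building the prompt and defensively parsing a JSON reply. The prompt wording, the {"keywords": [...]} shape, and both function names are my own assumptions for illustration, not something generated or tested as part of this experiment.

```python
import json
from typing import List


def build_keyword_prompt(review_text: str, sentiment: str) -> str:
    """Build a prompt asking for keywords as JSON (hypothetical wording)."""
    focus = {
        "negative": "complaints",
        "positive": "points of appreciation",
    }.get(sentiment, "main topics")
    return (
        f"List up to five short keywords describing the {focus} in this review. "
        'Respond with JSON only, for example {"keywords": ["battery", "screen"]}.\n\n'
        f"Review: {review_text}"
    )


def parse_keywords(raw: str) -> List[str]:
    """Parse the model's reply; return [] when it is not the expected JSON."""
    try:
        payload = json.loads(raw)
    except (TypeError, ValueError):
        return []
    if not isinstance(payload, dict):
        return []
    keywords = payload.get("keywords")
    if not isinstance(keywords, list):
        return []
    # Keep non-empty strings only, lowercased and de-duplicated in order.
    seen: dict = {}
    for item in keywords:
        if isinstance(item, str) and item.strip():
            seen.setdefault(item.strip().lower(), None)
    return list(seen)


if __name__ == "__main__":
    print(build_keyword_prompt("Battery dies by noon.", "negative"))
    print(parse_keywords('{"keywords": ["Battery", "battery ", "screen"]}'))
```

Returning an empty list on malformed output keeps a single bad reply from failing the whole Spark job, at the cost of hiding the failure, so it may be worth logging the raw reply as well.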
Some of you may be wondering how you can use LLMs without needing to pay for OpenAI API costs. I think the OpenAI ecosystem of models is necessary for you to learn about and use for doing serious work with AI. Google is making some Gemini models available for free, and you can get an API key at https://aistudio.google.com/api-keys
There are also options to run LLMs locally using solutions like Ollama and LM Studio, but that's a discussion for another post.