# Real-Time Sentiment Analysis ML Pipeline

## Project Overview
Built an end-to-end ML pipeline for real-time sentiment analysis at scale, processing 50M+ social media posts daily across multiple platforms to provide brand monitoring and market insights.
## Business Problem
A market intelligence company needed to:
- Monitor brand sentiment across Twitter, Reddit, and news outlets
- Detect emerging trends and crises in real-time
- Provide sentiment analytics to enterprise clients
- Support 100+ brands with sub-minute latency
Challenges:
- Volume: 50M+ posts/day (600+ QPS)
- Velocity: Real-time processing required
- Variety: Multiple platforms, languages, formats
- Accuracy: 85%+ sentiment classification required
- Scale: Cost-effective infrastructure
## Architecture

### System Design
```
┌──────────┐     ┌─────────┐     ┌───────────┐     ┌──────────┐
│   Data   │────▶│  Kafka  │────▶│   Spark   │────▶│ Storage  │
│ Ingestion│     │ Streams │     │ Processing│     │  & API   │
└──────────┘     └─────────┘     └───────────┘     └──────────┘
                      │                │                │
                 ┌────▼────┐      ┌────▼───┐       ┌────▼───┐
                 │ Schema  │      │   ML   │       │  Time  │
                 │Registry │      │ Models │       │ Series │
                 └─────────┘      └────────┘       └────────┘
```
### Technology Stack
- Streaming: Apache Kafka, Kafka Streams
- Processing: Apache Spark (PySpark), Pandas
- ML: TensorFlow, Hugging Face Transformers, Scikit-learn
- Orchestration: Apache Airflow
- Storage: PostgreSQL, TimescaleDB, Redis
- Infrastructure: Docker, Kubernetes, AWS
- Monitoring: Prometheus, Grafana, MLflow
## Implementation

### 1. Data Ingestion Layer

Multi-Source Connectors:
```python
from kafka import KafkaProducer
import json
from typing import Dict


class SocialMediaConnector:
    def __init__(self, kafka_bootstrap_servers: str, tracked_keywords=None):
        self.tracked_keywords = tracked_keywords or []
        self.producer = KafkaProducer(
            bootstrap_servers=kafka_bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            compression_type='gzip'
        )

    def ingest_twitter_stream(self):
        """Ingest the Twitter real-time stream into Kafka."""
        import tweepy

        connector = self  # inside the listener, `self` is the listener, not the connector

        class TwitterListener(tweepy.StreamListener):
            def on_status(self, status):
                post = connector.transform_tweet(status)
                connector.producer.send('social-media-raw', post)

        stream = tweepy.Stream(
            auth=twitter_auth,  # OAuth handler configured elsewhere
            listener=TwitterListener()
        )
        stream.filter(track=self.tracked_keywords)

    def transform_tweet(self, status) -> Dict:
        """Normalize the tweet into the common post format."""
        return {
            "id": status.id_str,
            "platform": "twitter",
            "text": status.full_text,
            "author": status.user.screen_name,
            "created_at": status.created_at.isoformat(),
            "metrics": {
                "likes": status.favorite_count,
                "retweets": status.retweet_count,
                "followers": status.user.followers_count
            },
            "metadata": {
                "language": status.lang,
                "location": status.user.location
            }
        }
```
### 2. Stream Processing

Spark Structured Streaming for real-time processing:
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, to_json, struct, col, udf, when
from pyspark.sql.types import StructType, StringType, FloatType


class StreamProcessor:
    def __init__(self):
        self.spark = SparkSession.builder \
            .appName("SentimentAnalysis") \
            .config("spark.jars.packages",
                    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
            .getOrCreate()
        self.model = self.load_sentiment_model()

    def process_stream(self):
        """Process the Kafka stream with Spark Structured Streaming."""
        # Read from Kafka
        stream_df = self.spark \
            .readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "localhost:9092") \
            .option("subscribe", "social-media-raw") \
            .option("startingOffsets", "latest") \
            .load()

        # Parse the JSON payload
        schema = StructType() \
            .add("id", StringType()) \
            .add("platform", StringType()) \
            .add("text", StringType()) \
            .add("author", StringType()) \
            .add("created_at", StringType())

        parsed_df = stream_df \
            .select(from_json(col("value").cast("string"), schema).alias("data")) \
            .select("data.*")

        # Apply sentiment analysis; the UDF is shipped to executors,
        # so the model must be loadable on each worker
        sentiment_udf = udf(self.predict_sentiment, FloatType())
        result_df = parsed_df \
            .withColumn("sentiment_score", sentiment_udf(col("text"))) \
            .withColumn(
                "sentiment_label",
                when(col("sentiment_score") > 0.33, "positive")
                .when(col("sentiment_score") < -0.33, "negative")
                .otherwise("neutral")
            )

        # Write to the sink; the Kafka sink requires a string `value` column
        query = result_df \
            .select(to_json(struct("*")).alias("value")) \
            .writeStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "localhost:9092") \
            .option("topic", "sentiment-results") \
            .option("checkpointLocation", "/checkpoints/sentiment") \
            .start()
        query.awaitTermination()

    def predict_sentiment(self, text: str) -> float:
        """Predict a sentiment score in [-1, 1]."""
        prediction = self.model.predict(text)
        return float(prediction)
```
### 3. ML Model Development

Custom Sentiment Transformer:
```python
import tensorflow as tf
from transformers import TFAutoModel, AutoTokenizer
import numpy as np


class SentimentTransformer:
    def __init__(self, base_model="distilbert-base-uncased"):
        self.tokenizer = AutoTokenizer.from_pretrained(base_model)
        self.model = self.build_model(base_model)

    def build_model(self, base_model):
        """Build the custom sentiment model."""
        # Load the pre-trained transformer
        transformer = TFAutoModel.from_pretrained(base_model)

        # Input layers
        input_ids = tf.keras.Input(
            shape=(128,),
            dtype=tf.int32,
            name="input_ids"
        )
        attention_mask = tf.keras.Input(
            shape=(128,),
            dtype=tf.int32,
            name="attention_mask"
        )

        # Transformer outputs
        outputs = transformer(
            input_ids=input_ids,
            attention_mask=attention_mask
        )

        # Classification head on the [CLS] token embedding
        x = outputs.last_hidden_state[:, 0, :]
        x = tf.keras.layers.Dropout(0.3)(x)
        x = tf.keras.layers.Dense(256, activation='relu')(x)
        x = tf.keras.layers.Dropout(0.2)(x)

        # Multi-task outputs
        sentiment_score = tf.keras.layers.Dense(
            1, activation='tanh', name='sentiment_score'
        )(x)
        sentiment_class = tf.keras.layers.Dense(
            3, activation='softmax', name='sentiment_class'
        )(x)  # negative, neutral, positive

        model = tf.keras.Model(
            inputs=[input_ids, attention_mask],
            outputs=[sentiment_score, sentiment_class]
        )
        return model

    def train(self, train_data, val_data, epochs=5):
        """Train the sentiment model."""
        self.model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=2e-5),
            loss={
                'sentiment_score': 'mse',
                'sentiment_class': 'categorical_crossentropy'
            },
            metrics={
                'sentiment_score': 'mae',
                'sentiment_class': 'accuracy'
            }
        )

        # Callbacks
        callbacks = [
            tf.keras.callbacks.EarlyStopping(
                monitor='val_loss',
                patience=3,
                restore_best_weights=True
            ),
            tf.keras.callbacks.ModelCheckpoint(
                'best_model.h5',
                save_best_only=True,
                save_weights_only=True  # custom transformer layers don't serialize cleanly to HDF5
            ),
            tf.keras.callbacks.TensorBoard(log_dir='./logs')
        ]

        history = self.model.fit(
            train_data,
            validation_data=val_data,
            epochs=epochs,
            callbacks=callbacks
        )
        return history

    def predict(self, texts: list) -> tuple:
        """Batch prediction; returns (scores, class probabilities)."""
        encodings = self.tokenizer(
            texts,
            padding=True,
            truncation=True,
            max_length=128,
            return_tensors='tf'
        )
        scores, classes = self.model.predict({
            'input_ids': encodings['input_ids'],
            'attention_mask': encodings['attention_mask']
        })
        return scores, classes
```
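The multi-task head trains against two targets per example: a continuous score in [-1, 1] for the `sentiment_score` output and a one-hot vector for the `sentiment_class` output. A minimal sketch of deriving both targets from a single labeled score (the `make_targets` helper and the ±0.33 thresholds are illustrative, not from the project code):

```python
def make_targets(score: float) -> dict:
    """Map a labeled sentiment score in [-1, 1] to the two training
    targets the multi-task model expects: the raw score for the
    `sentiment_score` head and a one-hot (negative, neutral, positive)
    vector for the `sentiment_class` head."""
    if score < -0.33:
        one_hot = [1.0, 0.0, 0.0]  # negative
    elif score > 0.33:
        one_hot = [0.0, 0.0, 1.0]  # positive
    else:
        one_hot = [0.0, 1.0, 0.0]  # neutral
    return {"sentiment_score": score, "sentiment_class": one_hot}
```

Keeping both targets keyed by the output-layer names lets `tf.keras.Model.fit` route each one to the matching head.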
### 4. Model Serving

FastAPI inference service (deployed behind a load balancer):
```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import tensorflow as tf
import numpy as np
from typing import List

app = FastAPI()

# Load the model and tokenizer once at startup
model = tf.saved_model.load('/models/sentiment-v1.2')
tokenizer = load_tokenizer()  # project helper that restores the saved tokenizer


class SentimentRequest(BaseModel):
    texts: List[str]
    batch_size: int = 32


class SentimentResponse(BaseModel):
    scores: List[float]
    labels: List[str]
    confidence: List[float]


@app.post("/predict", response_model=SentimentResponse)
async def predict_sentiment(request: SentimentRequest):
    """Batch sentiment prediction."""
    try:
        # Tokenize
        inputs = tokenizer(
            request.texts,
            padding=True,
            truncation=True,
            return_tensors='tf'
        )

        # Predict
        scores, classes = model(inputs)

        # Process results
        labels = ['negative', 'neutral', 'positive']
        results = {
            "scores": scores.numpy().flatten().tolist(),
            "labels": [labels[np.argmax(c)] for c in classes.numpy()],
            "confidence": [float(np.max(c)) for c in classes.numpy()]
        }
        return SentimentResponse(**results)
    except Exception as e:
        raise HTTPException(500, f"Prediction failed: {str(e)}")


@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy", "model_version": "1.2"}
```
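The argmax/max post-processing inside `/predict` can be factored out and unit-tested without loading the model. A sketch (the `postprocess` function is illustrative, not from the project code):

```python
LABELS = ['negative', 'neutral', 'positive']

def postprocess(class_probs):
    """Map per-text softmax vectors to (label, confidence) pairs,
    mirroring the np.argmax / np.max logic in the /predict handler."""
    labels = [LABELS[max(range(len(p)), key=p.__getitem__)] for p in class_probs]
    confidence = [max(p) for p in class_probs]
    return labels, confidence
```

Isolating this logic keeps the endpoint itself thin and makes regressions in label ordering easy to catch.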
### 5. Workflow Orchestration

Airflow DAG for weekly model retraining:
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

import mlflow

default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'sentiment_model_training',
    default_args=default_args,
    description='Retrain sentiment model weekly',
    schedule_interval='0 2 * * 0',  # Sunday 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False
)


def extract_training_data():
    """Extract human-verified labels from the last week."""
    db = connect_to_db()  # project helper: returns a DB connection
    query = """
        SELECT text, sentiment_label, human_verified
        FROM social_posts
        WHERE created_at > NOW() - INTERVAL '7 days'
          AND human_verified = true
    """
    data = db.execute(query).fetchall()
    save_to_s3(data, 'training-data-latest.parquet')  # project helper


def train_model():
    """Train a new model version."""
    data = load_from_s3('training-data-latest.parquet')  # project helper
    model = SentimentTransformer()
    history = model.train(data['train'], data['val'])

    # Save the model
    model.save('models/sentiment-latest')

    # Log metrics to MLflow
    mlflow.log_metrics({
        'train_accuracy': history.history['accuracy'][-1],
        'val_accuracy': history.history['val_accuracy'][-1]
    })


def evaluate_model():
    """Evaluate on the held-out test set and gate promotion."""
    test_data = load_test_data()
    model = load_model('models/sentiment-latest')
    metrics = model.evaluate(test_data)

    # Promote only if the candidate beats the production model
    prod_model = load_model('models/production')
    prod_metrics = prod_model.evaluate(test_data)
    if metrics['accuracy'] > prod_metrics['accuracy']:
        promote_to_production()


# Define tasks
extract = PythonOperator(
    task_id='extract_data',
    python_callable=extract_training_data,
    dag=dag
)
train = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag
)
evaluate = PythonOperator(
    task_id='evaluate_model',
    python_callable=evaluate_model,
    dag=dag
)

# Task dependencies
extract >> train >> evaluate
```
## Results & Impact

### Performance Metrics
Scale:
- Processing: 50M+ posts/day (600+ QPS sustained)
- Latency: <500ms end-to-end (p95)
- Throughput: 1000+ predictions/second per instance
- Availability: 99.95% uptime
ML Accuracy:
- Sentiment classification: 87.3% accuracy
- Multi-class F1 score: 0.85
- False positive rate: <3%
- Supports 12 languages
Cost Efficiency:
- Infrastructure cost: $0.002 per 1000 predictions
- 70% cost reduction vs. managed ML APIs
- Horizontal scaling: Linear cost with volume
### Business Impact
- Client Satisfaction: 95% client retention rate
- Revenue: Enabled $5M ARR product line
- Insights: Detected 15 brand crises before viral spread
- Competitive Advantage: 10x faster than competitors
### Technical Achievements
- Real-Time at Scale: Sub-second latency at 600+ QPS
- Custom Model: 5% better accuracy than off-the-shelf models
- Multi-Language: Trained multilingual sentiment model
- Production ML: Complete MLOps lifecycle with monitoring
- Cost Optimized: 70% cheaper than managed solutions
## Challenges & Solutions
Challenge 1: Streaming Backpressure
- Problem: Kafka backlog during traffic spikes
- Solution: Auto-scaling Spark consumers, partition tuning
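One concrete form of that partition/rate tuning, sketched here with illustrative values: cap how many records each Spark micro-batch pulls from Kafka, so spikes queue in Kafka (which is built for backlog) instead of overwhelming the executors.

```python
# Config fragment for the Kafka source; `spark` is an existing SparkSession.
stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "social-media-raw")
    # Limit records per micro-batch so spikes drain gradually from Kafka
    .option("maxOffsetsPerTrigger", "100000")
    # Don't fail the query if old offsets were already retention-deleted
    .option("failOnDataLoss", "false")
    .load()
)
```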
Challenge 2: Model Drift
- Problem: Language evolves, slang changes
- Solution: Weekly retraining with recent data + active learning
Challenge 3: Multi-Language Support
- Problem: Need to support 12 languages
- Solution: Multilingual transformers (mBERT, XLM-RoBERTa)
## Key Learnings
- Streaming is Hard: Invest in monitoring and backpressure handling
- Model Caching: Redis caching reduced duplicate predictions by 40%
- Batch Processing: Batch inference 10x faster than individual requests
- Feature Engineering: Domain-specific features improved accuracy 5%
- Continuous Learning: Weekly retraining keeps model current
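The caching learning above can be sketched as follows (the class and key scheme are illustrative; in production the dict would be a Redis client using `SETEX` so entries expire):

```python
import hashlib
import json


class PredictionCache:
    """Cache sentiment predictions keyed by a hash of the input text,
    so duplicate posts (retweets, crossposts) skip model inference.
    A plain dict stands in for Redis here."""

    def __init__(self, store=None, ttl_seconds=3600):
        self.store = store if store is not None else {}
        self.ttl = ttl_seconds  # honored via SETEX when backed by Redis

    @staticmethod
    def key(text: str) -> str:
        return "sent:" + hashlib.sha256(text.encode("utf-8")).hexdigest()

    def get_or_predict(self, text: str, predict_fn):
        k = self.key(text)
        cached = self.store.get(k)
        if cached is not None:
            return json.loads(cached)  # cache hit: no model call
        result = predict_fn(text)
        self.store[k] = json.dumps(result)
        return result
```

Hashing the text keeps keys fixed-length regardless of post size, and JSON values keep the cache backend-agnostic.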
Technologies: Python, TensorFlow, Kafka, Spark, Docker, Kubernetes, Airflow
Role: ML Engineer
Impact: $5M ARR enablement, 50M+ posts/day processing
Duration: 4 months development + ongoing