An MLOps pipeline that transforms e-commerce behavior data into real-time purchase predictions. Built on modern open-source technologies including Kafka, Flink, Spark, Ray, and MLflow, this project demonstrates a complete ML lifecycle from data ingestion through model deployment. The system features automated CDC, multi-layer data warehousing, real-time feature serving, and comprehensive observability.
- Dataset
- Architecture Overview
- Details
- Contributing
- License
eCommerce Behavior Data from Multi Category Store
The dataset can be found here. This dataset contains behavior data from over 285 million user events on a large multi-category eCommerce website.
The data spans 7 months (October 2019 to April 2020) and captures user-product interactions like views, cart additions/removals, and purchases. Each event represents a many-to-many relationship between users and products.
The dataset was collected by the Open CDP project, an open source customer data platform that enables tracking and analysis of user behavior data.
| Field | Description |
|---|---|
| event_time | UTC timestamp when the event occurred |
| event_type | Type of user interaction event |
| product_id | Unique identifier for the product |
| category_id | Product category identifier |
| category_code | Product category taxonomy (when available for meaningful categories) |
| brand | Brand name (lowercase, may be missing) |
| price | Product price (float) |
| user_id | Permanent user identifier |
| user_session | Temporary session ID that changes after long user inactivity |
The dataset captures four types of user interactions:
- view: User viewed a product
- cart: User added a product to shopping cart
- remove_from_cart: User removed a product from shopping cart
- purchase: User purchased a product
The core modeling task is to predict whether a user will purchase a product at the moment they add it to their shopping cart.
We transform the raw event data into meaningful features for our machine learning model. The analysis focuses specifically on cart addition events and their subsequent outcomes.
Key engineered features include:
| Feature | Description |
|---|---|
| category_code_level1 | Main product category |
| category_code_level2 | Product sub-category |
| event_weekday | Day of week when cart addition occurred |
| activity_count | Total user activities in the current session |
| price | Original product price |
| brand | Product brand name |
| is_purchased | Target variable: whether cart item was eventually purchased |
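The feature table above can be sketched in plain Python. This is a minimal illustration, not the project's actual transformation code: the function names are hypothetical, `activity_count` is assumed to be the number of events seen in the session, and `is_purchased` is assumed to mean that the same product was purchased in the same session.

```python
from collections import Counter
from datetime import datetime


def cart_event_to_features(event: dict, activity_count: int) -> dict:
    """Derive the engineered features for one cart event (hypothetical helper)."""
    # category_code looks like "computers.notebook" and may be missing.
    parts = (event.get("category_code") or "").split(".")
    level1 = parts[0] or None
    level2 = parts[1] if len(parts) > 1 else None

    # Timestamp format as it appears in the dataset: "2019-10-01 02:30:12 UTC".
    ts = datetime.strptime(event["event_time"], "%Y-%m-%d %H:%M:%S UTC")

    return {
        "category_code_level1": level1,
        "category_code_level2": level2,
        "event_weekday": ts.weekday(),  # assumed encoding: Monday = 0 ... Sunday = 6
        "activity_count": activity_count,
        "price": float(event["price"]),
        "brand": event.get("brand"),
    }


def build_training_rows(events: list[dict]) -> list[dict]:
    """Turn raw events into one labeled row per cart addition."""
    # Assumption: activity_count = number of events in the same session.
    activity = Counter(e["user_session"] for e in events)
    # Assumption: a cart item counts as purchased if the same product
    # has a purchase event in the same session.
    purchased = {
        (e["user_session"], e["product_id"])
        for e in events
        if e["event_type"] == "purchase"
    }
    rows = []
    for e in events:
        if e["event_type"] != "cart":
            continue
        row = cart_event_to_features(e, activity[e["user_session"]])
        row["is_purchased"] = (e["user_session"], e["product_id"]) in purchased
        rows.append(row)
    return rows
```

Only `cart` events produce rows, which matches the modeling task of predicting a purchase at the moment of cart addition.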
You can download the dataset and put it under the data folder.
The system comprises four main components (Data, Training, Serving, and Observability) alongside a Dev Environment and a Model Registry.
- Kafka Producer: Continuously emits user behavior events to the tracking.raw_user_behavior topic
- CDC Service: Uses Debezium to capture PostgreSQL changes, streaming them to tracking_postgres_cdc.public.events
- Validates incoming events from both sources
- Routes events to:
- tracking.user_behavior.validated for valid events
- tracking.user_behavior.invalid for schema violations
- Handles ~10k events/second
- Alerts invalid events to Elasticsearch
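The validate-and-route step can be illustrated without Flink. The sketch below is only an approximation of the idea: the field rules are assumptions derived from the dataset description, and `validate_event` and `route_event` are hypothetical names, not functions from this repository.

```python
VALID_TOPIC = "tracking.user_behavior.validated"
INVALID_TOPIC = "tracking.user_behavior.invalid"

EVENT_TYPES = {"view", "cart", "remove_from_cart", "purchase"}

# Assumed type rules. category_id accepts int or str because the sample
# message in this README carries it as a string.
REQUIRED = {
    "event_time": (str,),
    "event_type": (str,),
    "product_id": (int,),
    "category_id": (int, str),
    "price": (int, float),
    "user_id": (int,),
    "user_session": (str,),
}
OPTIONAL = {"category_code": (str,), "brand": (str,)}


def _has_type(value, types) -> bool:
    # bool is a subclass of int in Python, so reject it explicitly.
    return not isinstance(value, bool) and isinstance(value, types)


def validate_event(event: dict) -> list[str]:
    """Return a list of schema violations (empty when the event is valid)."""
    errors = []
    for name, types in REQUIRED.items():
        if event.get(name) is None:
            errors.append(f"missing required field: {name}")
        elif not _has_type(event[name], types):
            errors.append(f"wrong type for field: {name}")
    for name, types in OPTIONAL.items():
        value = event.get(name)
        if value is not None and not _has_type(value, types):
            errors.append(f"wrong type for field: {name}")
    event_type = event.get("event_type")
    if isinstance(event_type, str) and event_type not in EVENT_TYPES:
        errors.append(f"unknown event_type: {event_type}")
    return errors


def route_event(event: dict) -> tuple[str, list[str]]:
    """Pick the destination topic for an event based on validation."""
    errors = validate_event(event)
    return (INVALID_TOPIC if errors else VALID_TOPIC), errors
```

Returning the list of violations alongside the topic keeps the reason for rejection available for alerting.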
- Data Lake (MinIO):
- External Storage
- Stores data in time-partitioned buckets (year/month/day/hour)
- Supports checkpointing for pipeline resilience
- Data Warehouse (PostgreSQL):
- Organized in bronze → silver → gold layers
- Houses dimension/fact tables for analysis purposes
- Offline Store (PostgreSQL):
- Used for training and batch feature serving
- Periodically materialized to online store
- Online Store (Redis):
- Low-latency feature serving
- Updated through streaming pipeline
- Exposed via Feature Retrieval API
- Transforms validated events into ML features
- Focuses on session-based metrics and purchase behavior
- Dual-writes to online/offline stores
- Ray Cluster:
- Handles distributed hyperparameter tuning via Ray Tune
- Executes final model training
- Integrates with MLflow for experiment tracking
- MLflow + MinIO + PostgreSQL:
- Tracks experiments, parameters, and metrics
- Versions model artifacts
- Provides model registry UI at localhost:5001
- Ray Serve:
- Loads models from MLflow registry
- Automatically scales horizontally for high throughput
- Provides REST API for predictions
- Feature Service:
- FastAPI endpoint for feature retrieval
- Integrates with Redis for real-time features
- SigNoz:
- Collects OpenTelemetry data
- Provides service-level monitoring
- Accessible at localhost:3301
- Ray Dashboard:
- Monitors training/serving jobs
- Available at localhost:8265
- Prometheus + Grafana:
- Tracks Ray cluster metrics
- Visualizes system performance
- Accessible at localhost:3009
- Superset:
- Visualizes the data in the Data Warehouse
- Accessible at localhost:8089
- Elasticsearch:
- Alerts on invalid events
- NGINX Proxy Manager:
- Reverse proxy for all services
- SSL/TLS termination
- Access control and routing
The architecture prioritizes reliability, scalability, and observability while maintaining clear separation of concerns between pipeline stages. Each component is containerized and can be deployed independently using Docker Compose.
All available commands can be found in the Makefile.
In this section, we will dive into the details of the system.
Please run the following commands to set up the .env files:
cp .env.example .env
cp ./src/cdc/.env.example ./src/cdc/.env
cp ./src/model_registry/.env.example ./src/model_registry/.env
cp ./src/orchestration/.env.example ./src/orchestration/.env
cp ./src/producer/.env.example ./src/producer/.env
cp ./src/streaming/.env.example ./src/streaming/.env
Note: I don't use any secrets in this project, so run the above commands and you are good to go.
I use the same network for all the services, so we first need to create the network.
make up-network
make up-kafka
The last service in the docker-compose.kafka.yaml file is kafka_producer. This service acts as a producer and starts sending messages to the tracking.raw_user_behavior topic.
To check if Kafka is running, you can go to localhost:9021 and you should see the Kafka dashboard. Then go to the Topics tab and you should see tracking.raw_user_behavior topic.
To check if the producer is sending messages, you can click on the tracking.raw_user_behavior topic and you should see the messages being sent.
Here is an example of the message's value in the tracking.raw_user_behavior topic:
{
"schema": {
"type": "struct",
"fields": [
{
"name": "event_time",
"type": "string"
},
{
"name": "event_type",
"type": "string"
},
{
"name": "product_id",
"type": "long"
},
{
"name": "category_id",
"type": "long"
},
{
"name": "category_code",
"type": ["null", "string"],
"default": null
},
{
"name": "brand",
"type": ["null", "string"],
"default": null
},
{
"name": "price",
"type": "double"
},
{
"name": "user_id",
"type": "long"
},
{
"name": "user_session",
"type": "string"
}
]
},
"payload": {
"event_time": "2019-10-01 02:30:12 UTC",
"event_type": "view",
"product_id": 1306133,
"category_id": "2053013558920217191",
"category_code": "computers.notebook",
"brand": "xiaomi",
"price": 1029.37,
"user_id": 512900744,
"user_session": "76b918d5-b344-41fc-8632-baf222ec760f"
}
}
make up-cdc
Next, we start the CDC (Change Data Capture) service using Docker Compose. This setup includes the following components:
- Debezium: Monitors the Backend DB for any changes (inserts, updates, deletes) and captures those changes.
- PostgreSQL: The database where the changes are being monitored.
- A Python service: Registers the connector, creates the table, and inserts the data into PostgreSQL.
Steps involved:
- Debezium monitors the Backend DB for any changes. (2.1)
- Debezium captures these changes and pushes them to the Raw Events Topic in the message broker. (2.2)
The data is automatically synced from PostgreSQL to the tracking_postgres_cdc.public.events topic. To confirm this, go to the Connect tab in the Kafka UI; you should see a connector named cdc-postgresql.
Return to localhost:9021; there should be a new topic called tracking_postgres_cdc.public.events.
make schema_validation
This is a Flink job that consumes the tracking_postgres_cdc.public.events and tracking.raw_user_behavior topics and validates the schema of the events. Valid events are sent to the tracking.user_behavior.validated topic, and invalid events are sent to the tracking.user_behavior.invalid topic. To keep things easy to follow, I don't put these Flink jobs into a Docker Compose file, but you can do so if you want. Watch the terminal to see the job running.
The job can handle about 10k events per second, of which approximately 10% are invalid. I purposely make the producer emit invalid events, which end up in the tracking.user_behavior.invalid topic. You can check this at line 127 in src/producer/produce.py.
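One way a producer could inject roughly 10% invalid events is sketched below. This is an assumption about the general approach, not a copy of src/producer/produce.py: `maybe_corrupt` is a hypothetical helper, and dropping `event_type` is just one possible way to break the schema.

```python
import random


def maybe_corrupt(event: dict, rng: random.Random, invalid_rate: float = 0.1) -> dict:
    """Return the event unchanged, or a broken copy with probability invalid_rate."""
    if rng.random() >= invalid_rate:
        return event
    # Break the schema by dropping a required field (an illustrative choice).
    broken = dict(event)
    broken.pop("event_type", None)
    return broken
```

Passing a seeded `random.Random` makes the share of invalid events reproducible, which is handy when checking that the validation job routes them as expected.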
After starting the job, you can go to localhost:9021 and you should see the tracking.user_behavior.validated and tracking.user_behavior.invalid topics.
Besides that, we can also start the alert_invalid_events job to alert on invalid events.
make alert_invalid_events
Note: Pushing invalid events to Elasticsearch is not implemented yet. I will implement it in the future, but you can do it easily by modifying the src/streaming/jobs/alert_invalid_events_job.py file.
First, we need to start the Data Warehouse and the Online Store.
make up-dwh
make up-online-store
The Data Warehouse is just a PostgreSQL instance.
The Online Store is a Redis instance.
If you look at the docker-compose.online-store.yaml file, you will see two services: redis, which is the Online Store, and feature-retrieval, which is the Feature Retrieval service.
The feature-retrieval service is a Python service that runs the following command:
python api.py # Start a simple FastAPI app to retrieve the features
To view the Swagger UI, go to localhost:8001/docs. Before that, however, you need to run the ingest_stream job.
Then, we need to start the transformation job.
make ingest_stream
This is a Spark Streaming job that consumes events from the tracking.user_behavior.validated topic. It transforms raw user behavior data into structured machine learning features, focusing on session-based metrics and purchase behavior. The transformed data is then pushed to both online and offline feature stores, enabling real-time and batch feature serving for ML models. Periodically, the data is materialized to the online store.
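The dual-write idea can be shown with in-memory stand-ins. In the sketch below a dict plays the role of Redis (latest value per key) and a list plays the role of the PostgreSQL offline table (full history). The class name, the key format, and the feature set are all assumptions made for illustration; the real job runs on Spark Streaming.

```python
from collections import defaultdict


class DualFeatureWriter:
    """Writes each computed feature row to an online and an offline store."""

    def __init__(self) -> None:
        self.online: dict[str, dict] = {}   # stand-in for Redis: latest features per key
        self.offline: list[dict] = []       # stand-in for PostgreSQL: append-only history
        self._activity = defaultdict(int)   # running event count per session

    @staticmethod
    def key(user_session: str, product_id: int) -> str:
        # Hypothetical key layout; the real online store may use another one.
        return f"features:{user_session}:{product_id}"

    def write(self, event: dict) -> dict:
        session = event["user_session"]
        self._activity[session] += 1
        features = {
            "user_session": session,
            "product_id": event["product_id"],
            "activity_count": self._activity[session],
            "price": event["price"],
        }
        # Online store keeps only the freshest row for low-latency lookups.
        self.online[self.key(session, event["product_id"])] = features
        # Offline store keeps every row for training and batch serving.
        self.offline.append(dict(features))
        return features

    def get_online(self, user_session: str, product_id: int):
        """Return the latest features for a key, or None if nothing was written."""
        return self.online.get(self.key(user_session, product_id))
```

The online side overwrites while the offline side appends, which is why the two stores can serve real-time lookups and training data from the same stream.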
Besides that, you can use any tool to visualize the offline store. For example, you can use DataGrip to connect to the dwh database, where you should see the feature_store schema.
make up-orchestration
This will start the Airflow service and the other services needed for orchestration. Here is the list of services that will be started:
- MinIO (Data Lake)
- PostgreSQL (Data Warehouse)
- Ray Cluster
- MLflow (Model Registry)
- Prometheus & Grafana (for Ray monitoring)
Relevant URLs:
- Airflow UI: localhost:8080 (user/password: airflow:airflow)
- Ray Dashboard: localhost:8265
- Grafana: localhost:3009 (user/password: admin:admin)
- MLflow UI: localhost:5001
Go to the Airflow UI (default user and password is airflow:airflow) and you should see the data_pipeline and training_pipeline DAGs. These 2 DAGs are automatically triggered, but you can also trigger them manually.
Data from external sources is ingested into the Data Lake, then transformed into a format suitable for the Data Warehouse for analysis purposes.
To keep it simple, I use the data from the tracking.user_behavior.validated topic in this data_pipeline DAG. To this end, we first start the Data Lake, then create a connector to ingest the data from the tracking.user_behavior.validated topic into the Data Lake.
make up-data-lake
The Data Lake is a MinIO instance; you can see the UI at localhost:9001 (user/password: minioadmin:minioadmin).
Next, we need to create a connector to ingest the data from the tracking.user_behavior.validated topic to the Data Lake.
make deploy_s3_connector
To see the MinIO UI, go to localhost:9001 (default user and password: minioadmin:minioadmin). There are two buckets, validated-events-bucket and invalidated-events-bucket; open each bucket and you should see the events being synced.
Each record in the buckets is a JSON file; click on a file to see the event.
The data_pipeline DAG is divided into four tasks:
- ingest_raw_data - Ingests raw data from the Data Lake.
- quality_check_raw_data - Performs validations on the ingested raw data, ensuring data integrity.
- transform_data - Cleans and transforms validated raw data, preparing it for downstream usage.
- create dim and fact tables - Creates dimension and fact tables in the Data Warehouse for analysis.
Trigger the data_pipeline DAG, and you should see the tasks running. This DAG will take some time to complete, but you can check the logs in the Airflow UI to monitor the progress. For simplicity, I hardcoded the MINIO_PATH_PREFIX to topics/tracking.user_behavior.validated/year=2025/month=01. Ideally, you should use the actual timestamp for each run. For example, validated-events-bucket/topics/tracking.user_behavior.validated/year=2025/month=01/day=07/hour=XX, where XX is the hour of the day.
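Deriving the prefix from the run timestamp instead of hardcoding it could look like the sketch below. The function name is hypothetical, and the layout simply follows the year/month/day/hour example above.

```python
from datetime import datetime, timezone


def minio_path_prefix(topic: str, run_time: datetime) -> str:
    """Build the hourly partition prefix for a given run time."""
    if run_time.tzinfo is None:
        # Refuse naive datetimes so the partition is never guessed from local time.
        raise ValueError("run_time must be timezone-aware")
    ts = run_time.astimezone(timezone.utc)
    return f"topics/{topic}/year={ts:%Y}/month={ts:%m}/day={ts:%d}/hour={ts:%H}"
```

In an Airflow task, the run's logical date would be a natural value to pass as `run_time`.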
I also use checkpointing to ensure the DAG is resilient to failures and can resume from where it left off. The checkpoint is stored in the Data Lake, just under the MINIO_PATH_PREFIX, so if the DAG fails, you can simply trigger it again, and it will resume from the last checkpoint.
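The resume-from-checkpoint behavior can be sketched as follows. This is a simplified stand-in, not the DAG's implementation: a local JSON file replaces the checkpoint object stored in the Data Lake, and `process_with_checkpoint` and `handle` are hypothetical names.

```python
import json
from pathlib import Path


def process_with_checkpoint(object_keys, handle, checkpoint_path) -> set:
    """Process each key once, recording progress so a rerun skips finished keys."""
    path = Path(checkpoint_path)
    # Load the set of already-processed keys, if a checkpoint exists.
    done = set(json.loads(path.read_text())) if path.exists() else set()
    for key in sorted(object_keys):
        if key in done:
            continue  # finished in a previous run
        handle(key)  # may raise; the checkpoint then still reflects earlier keys
        done.add(key)
        # Persist after every key so a crash loses at most the key in flight.
        path.write_text(json.dumps(sorted(done)))
    return done
```

Because the checkpoint is written only after `handle` succeeds, a failed key is retried on the next run, so `handle` should be safe to repeat.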
To visualize the data, you can use Superset.
make up-superset
Then go to localhost:8089 and you should see the Superset dashboard. Connect to the dwh database and you should see the dwh schema.
The training_pipeline DAG is composed of these steps:
- Load Data - Pulls processed data from the Data Warehouse for use in training the machine learning model.
- Tune Hyperparameters - Utilizes Ray Tune to perform distributed hyperparameter tuning, optimizing the model's performance.
- Train Final Model - Trains the final machine learning model using the best hyperparameters from the tuning phase.
- Save Results - Saves the trained model and associated metrics to the Model Registry for future deployment and evaluation.
Trigger the training_pipeline DAG, and you should see the tasks running. This DAG will take some time to complete, but you can check the logs in the Airflow UI to see the progress.
After hitting the Trigger DAG button, you should see the tasks running. The tune_hyperparameters task will be deferred because it will submit the Ray Tune job to the Ray Cluster and use polling to check if the job is done. The same happens with the train_final_model task.
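The submit-then-poll pattern can be illustrated with a small blocking loop. Note that this is a simplification: a deferred Airflow task hands the waiting to a trigger rather than blocking a worker. `wait_for_job` and `get_status` are hypothetical, and the terminal status strings are assumed to match what the Ray job API reports.

```python
import time

# Assumed terminal states of a submitted job.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED"}


def wait_for_job(get_status, poll_interval_s=5.0, timeout_s=3600.0,
                 sleep=time.sleep, clock=time.monotonic) -> str:
    """Poll get_status() until the job reaches a terminal state or times out."""
    deadline = clock() + timeout_s
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"job still {status} after {timeout_s} seconds")
        sleep(poll_interval_s)
```

The `sleep` and `clock` parameters exist only so the loop can be exercised without real waiting.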
When the tune_hyperparameters or train_final_model tasks are running, you can go to the Ray Dashboard at localhost:8265 and you should see the tasks running.
Click on the task and you should see the task details, including the id, status, time, logs, and more.
To see the results of the training, you can go to the MLflow UI at localhost:5001 and you should see the training results.
The model is versioned in the Model Registry; go to localhost:5001 and open the Models tab to see it.
make up-serving
This command starts the Serving Pipeline. Note that port 8000 is not published in the docker-compose.serving.yaml file, only exposed. The reason is that we use Ray Serve, and the job is submitted to the Ray Cluster. That is why you see port 8000 in the docker-compose.serving.ray file instead of the docker-compose.serving.yaml file.
Currently, you have to manually restart the Ray Serve job (that is, the Docker container) to load a new model from the Model Registry. In the future, I will add a feature to load new models from the Model Registry automatically (Jenkins).
make up-observability
This command starts the Observability Pipeline. This is a SigNoz instance that receives data from the OpenTelemetry Collector. Go to localhost:3301 and you should see the SigNoz dashboard.
To see the Ray Cluster information, you can go to localhost:3009 (user/password: admin:admin) and you should see the Grafana dashboard.
Note: If you don't see the dashboards, please remove the tmp/ray folder and then restart the Ray Cluster and Grafana.
make up-nginx
This command starts the NGINX Proxy Manager, which provides a user-friendly interface for configuring reverse proxies and SSL certificates. Access the UI at localhost:81 using the default credentials:
- Username: admin@example.com
- Password: changeme
Key configuration options include:
- Free SSL certificate management using:
- Let's Encrypt
- Cloudflare SSL
- Free dynamic DNS providers:
- Setting up reverse proxies for services like SigNoz, Ray Dashboard, MLflow, and Grafana.
Security Tip: Change the default password immediately after first login to protect your proxy configuration.
This project is open to contributions. Please feel free to submit a PR.
This project is provided under an MIT license. See the LICENSE file for details.
























