# E-commerce ETL Data Warehouse Project

**Tools:** Python, SQL, Power BI Desktop, DAX, PostgreSQL 16, pgAdmin 4, VS Code, Windows Command Prompt/PowerShell, star schema

**Skills:** chunked data loading, foreign key validation, referential integrity enforcement, comprehensive data quality checks, SQL aggregate functions, error handling and recovert, perforance monitoring, business logic validation

<figure><img src="https://539050446-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FW65tN1jweulcjF26J7LM%2Fuploads%2FwTECbicqxx9ufhHPNCHC%2Fhow-etl-pipeline-works.png?alt=media&#x26;token=5eed4d86-c0d6-4495-b4ca-d51f23936512" alt=""><figcaption></figcaption></figure>

These days, business need to analyze very large amount of data. And on top of that, they will also need a lot of time and effort to transform raw unorganized data into processed data that suits their data needs and what they are looking for in those messy data.

In this project, I attempted to build a data pipeline that transform raw data into business ready data which can then be analyzed and gather insight from. The processed data will be displayed on a Power BI dashboard which the executives can easily gather insights from the charts and score cards in the dashobard.

## **Source of the data**

The data for this project are provided by a Brazilian e-commerce public dataset provided by Olist. You can find the dataset [here](https://www.kaggle.com/datasets/olistbr/brazilian-ecommerce). You can find 8 csv files in this dataset. We will only use 6 of them. They are:

* olist\_customers\_dataset
* olist\_order\_items\_dataset
* olist\_order\_payments\_dataset
* olist\_order\_reviews\_dataset
* olist\_orders\_dataset
* olist\_products\_dataset

## Defining business questions

As we aim to build Power BI dashboard that gives business insights to executives, we will first need to define the business questions that need to be answered for this project. For the purpose of building a meaningful dashobard, we will define our business questiosn as follows:

1. **"How can we improve customer retention and identify high-value customer segments?"**
2. **"What are our key revenue drivers and how can we optimize operational performance?"**

## Project setup and planning

PostgreSQL 18 was used for this project. Before we lay down the groundwork for this project that is setting up the extract layer of the data pipeline, a database was created using pgAdmin 4 in order to let us store the processed data once we have finished the data pipeline and finish loading the processed data.

![](https://539050446-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FW65tN1jweulcjF26J7LM%2Fuploads%2FnX9Tzd7N0JQjXqkXzRKN%2FScreenshot%202026-01-30%20171145.png?alt=media\&token=da65be2f-b051-433b-811b-1fd7e9b97318)&#x20;

> pgAdmin 4 interface

Apart from Postgre SQL 18, VS Code will also be used for coding and implementing the data pipeline once we have finished building it.

## Data Extraction

Now we will start laying down the foundation layer of this data pipeline - the extraction layer.

Apart from the dataset used for this project, we would also need a Python script to act as a logger, and another 6 Python scripts to extract the data from the 6 chosen csv files.

#### Scripts created

* logger.py
* 6 Python scripts for each csv file

#### Data extracted

* **olist\_customers\_dataset.csv** 99,441 rows | 5 cols&#x20;
* **olist\_orders\_dataset.csv** 99,441 rows | 8 cols&#x20;
* **olist\_order\_items\_dataset.csv** 112,650 rows | 7 cols&#x20;
* **olist\_order\_payments\_dataset.csv** 103,886 rows | 5 cols&#x20;
* **olist\_order\_reviews\_dataset.csv** 99,224 rows | 7 cols&#x20;
* **olist\_products\_dataset.csv** 32,951 rows | 9 cols&#x20;

#### Achievements

* Checked file sizes and record counts
* Identified null values and data types

## Data Transformation

Next, we will have to build the transformation layer which it will transofrm the extracted raw data into data that suits our business need.

#### Star schema

<figure><img src="https://539050446-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FW65tN1jweulcjF26J7LM%2Fuploads%2FSrxHfhwQyOO6dTf6vtVU%2FStar%20Schema%20Diagram.png?alt=media&#x26;token=097e92e8-c468-4044-8a4e-33f0f1af7347" alt=""><figcaption></figcaption></figure>

> Star schema for this project

In order to suit our business needs, we are going to transform our raw data into two fact tables and four dimension tables. Apart from the "fact\_orders", we will also create a "fact\_cohort\_retention" for the purpose of identifying customers who placed more than one order.

#### Data transformed

* **dim\_date.csv** 791 rows | 16 columns&#x20;
* **dim\_products.csv** 32,951 rows | 13 columns&#x20;
* **dim\_customers.csv** 99,441 rows | 18 columns&#x20;
* **dim\_payment\_type.csv** 5 rows | 4 columns&#x20;
* **fact\_orders.csv** 99,992 rows | 25 columns&#x20;
* **fact\_cohort\_retention.csv** 25 rows | 7 columns

#### Dimension table creation

**dim\_date - Date Dimension**

* Generated complete date range (2016-09-01 to 2018-10-31)
* Attributes: date\_key, full\_date, year, quarter, month, month\_name, week, day\_of\_week, day\_name, is\_weekend, is\_holiday, fiscal\_year, fiscal\_quarter, created\_at, date\_str
* Surrogate key format: YYYYMMDD (e.g., 20170115)

**dim\_products - Product Dimension**

* Extracted unique products from raw data
* Attributes: product\_key, product\_id, product\_category\_segment
* Surrogate key: Sequential integer starting at 1
* Handled missing categories

**dim\_customers - Customer Dimension**

* Extracted unique customers with location data
* Calculated lifetime value (CLV) per customer
* Created customer segmentation (High/Medium/Low based on CLV)
* Mapped states to regions (North, Northeast, Southeast, South, Central-West)
* Attributes: customer\_key, customer\_id, customer\_unique\_id, customer\_city, customer\_state, customer\_region, customer\_segment, first\_order\_date, last\_order\_date, total\_orders, lifetime\_value
* Surrogate key: Sequential integer (customer\_key)

**dim\_payment\_type - Payment Type Dimension**

* Created lookup table for payment methods
* Types: credit\_card, boleto, voucher, debit\_card, not\_defined
* Attributes: payment\_type\_key, payment\_type
* Surrogate key: Sequential integer (1-5)

#### Fact table creation

**fact\_orders - Order Transactions**

* Joined with the other four dimension tables
* Metrics calculated:
  * order\_total\_value = sum of item prices + freight
  * delivery\_days = days between purchase and delivery
  * delivery\_delay\_days = actual delivery - estimated delivery
  * is\_late\_delivery = TRUE if delay > 0
  * is\_completed\_order = TRUE if status = 'delivered'
* Aggregated to order-level (one row per order)

**fact\_cohort\_retention - Customer Retention**

* Grouped customers by first purchase month (cohort)
* Calculated months since first purchase (0-12 months)
* Computed retention rate: (active customers / cohort size) \* 100
* Attributes: cohort\_month, months\_since\_first\_purchase, retention\_rate, retained\_customers

#### transform\_all.py

* Master Transform Orchestrator
* Runs all transformations in correct dependency order
* Demonstrates production-level ETL orchestration

#### Business logic - Customer segmentation

```
def _segment_customer(self, row):
        """
        Segment customer based on business rules
        
        Segmentation:
        - New: 1 order
        - Returning: 2-5 orders
        - VIP: >5 orders OR lifetime_value > threshold
        """
        total_orders = row['total_orders']
        lifetime_value = row['lifetime_value'] if pd.notna(row['lifetime_value']) else 0
        
        vip_threshold = self.segmentation_rules['vip_customer_min_value']
        vip_orders = self.segmentation_rules['vip_customer_min_orders']
        returning_max = self.segmentation_rules['returning_customer_max_orders']
        
        if total_orders == 0:
            return 'Inactive'
        elif total_orders >= vip_orders or lifetime_value >= vip_threshold:
            return 'VIP'
        elif total_orders <= 1:
            return 'New'
        elif total_orders <= returning_max:
            return 'Returning'
        else:
            return 'Loyal'
```

#### Business logic - **Cohort Retention**

```
class CohortRetentionBuilder:
    """Build cohort retention fact table"""
    
    def __init__(self):
        """Initialize cohort retention builder"""
        logger.info("CohortRetentionBuilder initialized")
    
    def build(self, dim_customers):
        """
        Build cohort retention table
        
        Args:
            dim_customers: Customer dimension with first_order_date
        
        Returns:
            DataFrame: Cohort retention fact table
        """
        try:
            logger.info("Building FACT_COHORT_RETENTION...")
            
            # Step 1: Extract orders
            logger.info("Step 1/5: Extracting orders...")
            orders_ext = OrdersExtractor()
            df_orders = orders_ext.extract()
            logger.info(f"  ✓ Loaded {len(df_orders):,} orders")
            
            # Step 2: Get customer first order dates from dimension
            logger.info("Step 2/5: Getting customer cohorts...")
            
            df_customers_cohort = dim_customers[['customer_id', 'first_order_date']].copy()
            
            # Create cohort month (YYYY-MM format)
            df_customers_cohort['cohort_month'] = pd.to_datetime(
                df_customers_cohort['first_order_date']
            ).dt.to_period('M')
            
            logger.info(f"  ✓ Identified {df_customers_cohort['cohort_month'].nunique()} cohorts")
            
            # Step 3: Prepare orders data
            logger.info("Step 3/5: Preparing order data for cohort analysis...")
            
            df_orders_cohort = df_orders[['customer_id', 'order_purchase_timestamp']].copy()
            df_orders_cohort['order_month'] = pd.to_datetime(
                df_orders_cohort['order_purchase_timestamp']
            ).dt.to_period('M')
            
            # Join orders with customer cohorts
            df_analysis = df_orders_cohort.merge(
                df_customers_cohort,
                on='customer_id',
                how='inner'
            )
            
            logger.info(f"  ✓ Prepared {len(df_analysis):,} order records for analysis")
            
            # Step 4: Calculate months since first purchase
            logger.info("Step 4/5: Calculating retention by cohort...")
            
            # Calculate months difference
            df_analysis['months_since_first_purchase'] = (
                (df_analysis['order_month'] - df_analysis['cohort_month']).apply(lambda x: x.n)
            )
            
            # Convert periods back to dates for grouping
            df_analysis['cohort_month_date'] = df_analysis['cohort_month'].dt.to_timestamp()
            
            # Use Polars for fast aggregation
            df_analysis_pl = pl.from_pandas(df_analysis[[
                'cohort_month_date', 'months_since_first_purchase', 'customer_id'
            ]])
            
            # Group by cohort and months since first purchase
            cohort_data_pl = df_analysis_pl.group_by([
                'cohort_month_date', 'months_since_first_purchase'
            ]).agg([
                pl.col('customer_id').n_unique().alias('retained_customers')
            ]).sort(['cohort_month_date', 'months_since_first_purchase'])
            
            cohort_data = cohort_data_pl.to_pandas()
            
            logger.info(f"  ✓ Calculated retention for {len(cohort_data):,} cohort-month combinations")
            
            # Step 5: Calculate cohort sizes and retention rates
            logger.info("Step 5/5: Calculating cohort sizes and retention rates...")
            
            # Get cohort sizes (month 0 = acquisition month)
            cohort_sizes = cohort_data[
                cohort_data['months_since_first_purchase'] == 0
            ][['cohort_month_date', 'retained_customers']].copy()
            cohort_sizes.columns = ['cohort_month_date', 'cohort_size']
            
            # Merge cohort sizes
            cohort_data = cohort_data.merge(
                cohort_sizes,
                on='cohort_month_date',
                how='left'
            )
            
            # Calculate retention rate
            cohort_data['retention_rate'] = (
                cohort_data['retained_customers'] / cohort_data['cohort_size'] * 100
            ).round(2)
            
            # Rename for final output
            cohort_data = cohort_data.rename(columns={
                'cohort_month_date': 'cohort_month'
            })
            
            # Create surrogate key
            cohort_data = cohort_data.reset_index(drop=True)
            cohort_data.insert(0, 'cohort_retention_key', cohort_data.index + 1)
            
            # Add created_at
            cohort_data['created_at'] = pd.Timestamp.now()
            
            # Reorder columns
            final_columns = [
                'cohort_retention_key',
                'cohort_month',
                'months_since_first_purchase',
                'cohort_size',
                'retained_customers',
                'retention_rate',
                'created_at'
            ]
            
            cohort_data = cohort_data[final_columns]
            
            logger.info(f"✓ FACT_COHORT_RETENTION built successfully: {len(cohort_data):,} rows")
            
            # Log insights
            logger.info("\n" + "="*60)
            logger.info("COHORT RETENTION INSIGHTS:")
            logger.info(f"  - Total cohorts: {cohort_data['cohort_month'].nunique()}")
            logger.info(f"  - Average cohort size: {cohort_data[cohort_data['months_since_first_purchase']==0]['cohort_size'].mean():.0f} customers")
            
            # Month 1 retention (repeat purchase rate)
            month_1_retention = cohort_data[
                cohort_data['months_since_first_purchase'] == 1
            ]['retention_rate'].mean()
            logger.info(f"  - Average Month 1 retention: {month_1_retention:.1f}%")
            
            # Month 3 retention
            month_3_retention = cohort_data[
                cohort_data['months_since_first_purchase'] == 3
            ]['retention_rate'].mean()
            if pd.notna(month_3_retention):
                logger.info(f"  - Average Month 3 retention: {month_3_retention:.1f}%")
            
            logger.info("="*60 + "\n")
            
            return cohort_data
            
        except Exception as e:
            logger.error(f"✗ FACT_COHORT_RETENTION build failed: {e}")
            import traceback
            traceback.print_exc()
            raise
```

#### Skills Applied

* Dimensional modeling (star schema design)
* Data transformation and cleansing
* Surrogate key generation
* Business logic implementation
* Cohort analysis methodology
* Pandas advanced operations (groupby, merge, pivot)
* Date/time manipulation
* Statistical calculations (percentiles, aggregations)

## Loading the data

After finish transforming the data, we will now load the transformed data into our PostgreSQL database.

#### create\_schema.py

```
"""
Create PostgreSQL schema for data warehouse
Defines all dimension and fact tables WITHOUT foreign key constraints
"""

import sys
import os

sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))

from src.utils.logger import setup_logger
from src.utils.db_connector import DatabaseConnector

logger = setup_logger('create_schema')


class SchemaCreator:
    """Create data warehouse schema in PostgreSQL"""
    
    def __init__(self):
        """Initialize schema creator"""
        self.db = DatabaseConnector()
        logger.info("SchemaCreator initialized")
    
    def create_all_tables(self):
        """Create all dimension and fact tables"""
        try:
            logger.info("Creating data warehouse schema...")
            
            # Drop existing tables (in reverse dependency order)
            logger.info("Dropping existing tables if they exist...")
            self._drop_tables()
            
            # Create dimension tables first (no FK dependencies)
            logger.info("\nCreating dimension tables...")
            self._create_dim_date()
            self._create_dim_products()
            self._create_dim_payment_type()
            self._create_dim_customers()
            
            # Create fact tables (depend on dimensions)
            logger.info("\nCreating fact tables...")
            self._create_fact_orders()
            self._create_fact_cohort_retention()
            
            # Create indexes for performance
            logger.info("\nCreating indexes...")
            self._create_indexes()
            
            logger.info("\n" + "="*80)
            logger.info("✓ DATA WAREHOUSE SCHEMA CREATED SUCCESSFULLY")
            logger.info("="*80 + "\n")
            
            # Verify tables
            self._verify_tables()
            
        except Exception as e:
            logger.error(f"✗ Schema creation failed: {e}")
            raise
        finally:
            self.db.close_pool()
    
    def _drop_tables(self):
        """Drop existing tables in reverse dependency order"""
        drop_queries = [
            "DROP TABLE IF EXISTS fact_cohort_retention CASCADE;",
            "DROP TABLE IF EXISTS fact_orders CASCADE;",
            "DROP TABLE IF EXISTS dim_customers CASCADE;",
            "DROP TABLE IF EXISTS dim_payment_type CASCADE;",
            "DROP TABLE IF EXISTS dim_products CASCADE;",
            "DROP TABLE IF EXISTS dim_date CASCADE;"
        ]
        
        for query in drop_queries:
            self.db.execute_query(query)
        
        logger.info("  ✓ Dropped existing tables")
    
    def _create_dim_date(self):
        """Create date dimension table"""
        query = """
        CREATE TABLE dim_date (
            date_key INTEGER PRIMARY KEY,
            full_date DATE NOT NULL UNIQUE,
            year INTEGER NOT NULL,
            quarter INTEGER NOT NULL,
            month INTEGER NOT NULL,
            month_name VARCHAR(20) NOT NULL,
            week INTEGER NOT NULL,
            day_of_month INTEGER NOT NULL,
            day_of_week INTEGER NOT NULL,
            day_name VARCHAR(20) NOT NULL,
            is_weekend BOOLEAN NOT NULL,
            is_holiday BOOLEAN NOT NULL,
            fiscal_year INTEGER NOT NULL,
            fiscal_quarter INTEGER NOT NULL,
            created_at TIMESTAMP NOT NULL
        );
        """
        self.db.execute_query(query)
        logger.info("  ✓ Created dim_date")
    
    def _create_dim_products(self):
        """Create product dimension table"""
        query = """
        CREATE TABLE dim_products (
            product_key INTEGER PRIMARY KEY,
            product_id VARCHAR(50) NOT NULL UNIQUE,
            product_category_name VARCHAR(100),
            product_category_english VARCHAR(100),
            product_category_segment VARCHAR(50),
            product_weight_g INTEGER,
            product_length_cm INTEGER,
            product_height_cm INTEGER,
            product_width_cm INTEGER,
            product_volume_cm3 NUMERIC(12, 2),
            product_photos_qty INTEGER,
            has_photos BOOLEAN,
            created_at TIMESTAMP NOT NULL
        );
        """
        self.db.execute_query(query)
        logger.info("  ✓ Created dim_products")
    
    def _create_dim_payment_type(self):
        """Create payment type dimension table"""
        query = """
        CREATE TABLE dim_payment_type (
            payment_type_key INTEGER PRIMARY KEY,
            payment_type VARCHAR(50) NOT NULL UNIQUE,
            payment_category VARCHAR(50) NOT NULL,
            created_at TIMESTAMP NOT NULL
        );
        """
        self.db.execute_query(query)
        logger.info("  ✓ Created dim_payment_type")
    
    def _create_dim_customers(self):
        """Create customer dimension table"""
        query = """
        CREATE TABLE dim_customers (
            customer_key INTEGER PRIMARY KEY,
            customer_id VARCHAR(50) NOT NULL UNIQUE,
            customer_unique_id VARCHAR(50),
            customer_city VARCHAR(100),
            customer_state VARCHAR(2),
            customer_region VARCHAR(50),
            customer_segment VARCHAR(20),
            first_order_date TIMESTAMP,
            last_order_date TIMESTAMP,
            total_orders INTEGER,
            delivered_orders INTEGER,
            total_spent NUMERIC(12, 2),
            avg_order_value NUMERIC(12, 2),
            lifetime_value NUMERIC(12, 2),
            days_as_customer INTEGER,
            purchase_frequency_annual NUMERIC(12, 2),
            created_at TIMESTAMP NOT NULL,
            updated_at TIMESTAMP NOT NULL
        );
        """
        self.db.execute_query(query)
        logger.info("  ✓ Created dim_customers")
    
    def _create_fact_orders(self):
        """Create fact orders table WITHOUT foreign key constraints"""
        query = """
        CREATE TABLE fact_orders (
            order_key BIGSERIAL PRIMARY KEY,
            customer_key INTEGER,
            product_key INTEGER,
            order_date_key INTEGER,
            delivery_date_key INTEGER,
            payment_type_key INTEGER,
            order_id VARCHAR(50) NOT NULL UNIQUE,
            order_status VARCHAR(50),
            seller_id VARCHAR(50),
            order_item_count INTEGER,
            order_subtotal NUMERIC(12, 2),
            order_freight_total NUMERIC(12, 2),
            order_total_value NUMERIC(12, 2),
            payment_value NUMERIC(12, 2),
            payment_installments INTEGER,
            delivery_days INTEGER,
            estimated_delivery_days INTEGER,
            delivery_delay_days INTEGER,
            is_late_delivery BOOLEAN,
            is_completed_order BOOLEAN,
            review_score INTEGER,
            has_review BOOLEAN,
            order_purchase_timestamp TIMESTAMP,
            order_delivered_customer_date TIMESTAMP,
            created_at TIMESTAMP NOT NULL
        );
        """
        self.db.execute_query(query)
        logger.info("  ✓ Created fact_orders (NO FK constraints for flexibility)")
    
    def _create_fact_cohort_retention(self):
        """Create cohort retention fact table"""
        query = """
        CREATE TABLE fact_cohort_retention (
            cohort_retention_key SERIAL PRIMARY KEY,
            cohort_month DATE NOT NULL,
            months_since_first_purchase INTEGER NOT NULL,
            cohort_size INTEGER NOT NULL,
            retained_customers INTEGER NOT NULL,
            retention_rate NUMERIC(5, 2) NOT NULL,
            created_at TIMESTAMP NOT NULL,
            UNIQUE(cohort_month, months_since_first_purchase)
        );
        """
        self.db.execute_query(query)
        logger.info("  ✓ Created fact_cohort_retention")
    
    def _create_indexes(self):
        """Create indexes for query performance"""
        indexes = [
            # Date dimension indexes
            "CREATE INDEX idx_dim_date_full_date ON dim_date(full_date);",
            "CREATE INDEX idx_dim_date_year_month ON dim_date(year, month);",
            
            # Product dimension indexes
            "CREATE INDEX idx_dim_products_category_segment ON dim_products(product_category_segment);",
            "CREATE INDEX idx_dim_products_category_english ON dim_products(product_category_english);",
            
            # Customer dimension indexes
            "CREATE INDEX idx_dim_customers_segment ON dim_customers(customer_segment);",
            "CREATE INDEX idx_dim_customers_region ON dim_customers(customer_region);",
            "CREATE INDEX idx_dim_customers_unique_id ON dim_customers(customer_unique_id);",
            "CREATE INDEX idx_dim_customers_clv ON dim_customers(lifetime_value);",
            
            # Fact orders indexes (for fast queries even without FK constraints)
            "CREATE INDEX idx_fact_orders_customer_key ON fact_orders(customer_key);",
            "CREATE INDEX idx_fact_orders_product_key ON fact_orders(product_key);",
            "CREATE INDEX idx_fact_orders_order_date_key ON fact_orders(order_date_key);",
            "CREATE INDEX idx_fact_orders_delivery_date_key ON fact_orders(delivery_date_key);",
            "CREATE INDEX idx_fact_orders_payment_type_key ON fact_orders(payment_type_key);",
            "CREATE INDEX idx_fact_orders_status ON fact_orders(order_status);",
            "CREATE INDEX idx_fact_orders_completed ON fact_orders(is_completed_order);",
            "CREATE INDEX idx_fact_orders_late ON fact_orders(is_late_delivery);",
            
            # Composite indexes for common query patterns
            "CREATE INDEX idx_fact_orders_customer_date ON fact_orders(customer_key, order_date_key);",
            "CREATE INDEX idx_fact_orders_date_status ON fact_orders(order_date_key, order_status);",
            
            # Cohort retention indexes
            "CREATE INDEX idx_fact_cohort_month ON fact_cohort_retention(cohort_month);",
            "CREATE INDEX idx_fact_cohort_months_since ON fact_cohort_retention(months_since_first_purchase);"
        ]
        
        for idx_query in indexes:
            try:
                self.db.execute_query(idx_query)
            except Exception as e:
                logger.warning(f"  ⚠ Index creation warning: {e}")
        
        logger.info(f"  ✓ Created {len(indexes)} indexes")
    
    def _verify_tables(self):
        """Verify all tables were created"""
        query = """
        SELECT table_name 
        FROM information_schema.tables 
        WHERE table_schema = 'public' 
        AND table_type = 'BASE TABLE'
        ORDER BY table_name;
        """
        
        tables = self.db.fetch_query(query)
        
        logger.info("\nVerifying tables created:")
        for table in tables:
            logger.info(f"  ✓ {table[0]}")
        
        logger.info(f"\nTotal tables created: {len(tables)}")


# Main execution
if __name__ == "__main__":
    print("\n" + "="*80)
    print("CREATING DATA WAREHOUSE SCHEMA IN POSTGRESQL")
    print("="*80 + "\n")
    
    creator = SchemaCreator()
    creator.create_all_tables()
    
    print("\n✓ Schema creation complete!")
    print("\nNext step: Load transformed data into tables")

```

#### **load\_dimensions.py**

* Load transformed dimensions into database

```
"""
Load dimension tables from transformed CSV files into PostgreSQL
Uses pandas to_sql for efficient bulk loading
"""

import pandas as pd
import sys
import os
from datetime import datetime

sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))

from src.utils.logger import setup_logger
from src.utils.db_connector import DatabaseConnector

logger = setup_logger('load_dimensions')


class DimensionLoader:
    """Load dimension tables into PostgreSQL"""
    
    def __init__(self, staging_dir='data/staging'):
        """
        Initialize dimension loader
        
        Args:
            staging_dir: Directory containing transformed CSV files
        """
        self.staging_dir = staging_dir
        self.db = DatabaseConnector()
        logger.info(f"DimensionLoader initialized with staging dir: {staging_dir}")
    
    def load_all_dimensions(self):
        """Load all dimension tables"""
        try:
            logger.info("Loading dimension tables...")
            
            # Load in dependency order
            self._load_dim_date()
            self._load_dim_products()
            self._load_dim_payment_type()
            self._load_dim_customers()
            
            logger.info("\n" + "="*80)
            logger.info("✓ ALL DIMENSION TABLES LOADED SUCCESSFULLY")
            logger.info("="*80 + "\n")
            
            # Verify row counts
            self._verify_loads()
            
        except Exception as e:
            logger.error(f"✗ Dimension loading failed: {e}")
            raise
        finally:
            self.db.close_pool()
    
    def _load_dim_date(self):
        """Load date dimension"""
        logger.info("\n[1/4] Loading dim_date...")
        
        # Read CSV
        filepath = os.path.join(self.staging_dir, 'dim_date.csv')
        df = pd.read_csv(filepath)
        
        logger.info(f"  - Read {len(df):,} rows from CSV")
        
        # Convert date column
        df['full_date'] = pd.to_datetime(df['full_date'])
        df['created_at'] = pd.to_datetime(df['created_at'])
        
        # Define expected columns (15 columns total - NO date_str)
        expected_columns = [
            'date_key', 'full_date', 'year', 'quarter', 'month', 
            'month_name', 'week', 'day_of_month', 'day_of_week', 
            'day_name', 'is_weekend', 'is_holiday', 'fiscal_year', 
            'fiscal_quarter', 'created_at'
        ]
        
        # Remove any extra columns (like date_str)
        df = df[expected_columns]
        
        # Load to PostgreSQL using pandas to_sql
        engine = self.db.get_engine()
        
        df.to_sql(
            'dim_date',
            engine,
            if_exists='append',
            index=False,
            method='multi',
            chunksize=1000
        )
        
        logger.info(f"  ✓ Loaded {len(df):,} rows into dim_date")
    
    def _load_dim_products(self):
        """Load product dimension"""
        logger.info("\n[2/4] Loading dim_products...")
        
        filepath = os.path.join(self.staging_dir, 'dim_products.csv')
        df = pd.read_csv(filepath)
        
        logger.info(f"  - Read {len(df):,} rows from CSV")
        
        # Convert timestamps
        df['created_at'] = pd.to_datetime(df['created_at'])
        
        # KEEP product_key from CSV
        
        # Load to PostgreSQL
        engine = self.db.get_engine()
        
        df.to_sql(
            'dim_products',
            engine,
            if_exists='append',
            index=False,
            method='multi',
            chunksize=1000
        )
        
        logger.info(f"  ✓ Loaded {len(df):,} rows into dim_products")
    
    def _load_dim_payment_type(self):
        """Load payment type dimension"""
        logger.info("\n[3/4] Loading dim_payment_type...")
        
        filepath = os.path.join(self.staging_dir, 'dim_payment_type.csv')
        df = pd.read_csv(filepath)
        
        logger.info(f"  - Read {len(df)} rows from CSV")
        
        # Convert timestamps
        df['created_at'] = pd.to_datetime(df['created_at'])
        
        # KEEP payment_type_key from CSV
        
        # Load to PostgreSQL
        engine = self.db.get_engine()
        
        df.to_sql(
            'dim_payment_type',
            engine,
            if_exists='append',
            index=False,
            method='multi'
        )
        
        logger.info(f"  ✓ Loaded {len(df)} rows into dim_payment_type")
    
    def _load_dim_customers(self):
        """Load customer dimension"""
        logger.info("\n[4/4] Loading dim_customers...")
        
        filepath = os.path.join(self.staging_dir, 'dim_customers.csv')
        df = pd.read_csv(filepath)
        
        logger.info(f"  - Read {len(df):,} rows from CSV")
        
        # Convert timestamps
        date_columns = ['first_order_date', 'last_order_date', 'created_at', 'updated_at']
        for col in date_columns:
            if col in df.columns:
                df[col] = pd.to_datetime(df[col])
        
        # KEEP customer_key from CSV
        
        # Load to PostgreSQL
        engine = self.db.get_engine()
        
        df.to_sql(
            'dim_customers',
            engine,
            if_exists='append',
            index=False,
            method='multi',
            chunksize=5000
        )
        
        logger.info(f"  ✓ Loaded {len(df):,} rows into dim_customers")
    
    def _verify_loads(self):
        """Verify dimension tables were loaded correctly"""
        logger.info("\nVerifying dimension table loads:")
        
        tables = ['dim_date', 'dim_products', 'dim_payment_type', 'dim_customers']
        
        for table in tables:
            query = f"SELECT COUNT(*) FROM {table};"
            result = self.db.fetch_query(query)
            count = result[0][0]
            logger.info(f"  - {table}: {count:,} rows")


# Main execution
if __name__ == "__main__":
    print("\n" + "="*80)
    print("LOADING DIMENSION TABLES INTO POSTGRESQL")
    print("="*80 + "\n")
    
    loader = DimensionLoader()
    loader.load_all_dimensions()
    
    print("\n✓ Dimension tables loaded successfully!")
    print("\nNext step: Load fact tables")

```

#### load\_facts.py

* Load the two fact tables into the database

```
"""
Load fact tables from transformed CSV files into PostgreSQL
Uses COPY FROM for maximum speed and bypasses all constraints
"""

import pandas as pd
import sys
import os
from io import StringIO

sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))

from src.utils.logger import setup_logger
from src.utils.db_connector import DatabaseConnector

logger = setup_logger('load_facts')


class FactLoader:
    """Load fact tables into PostgreSQL"""
    
    def __init__(self, staging_dir='data/staging'):
        """
        Initialize fact loader
        
        Args:
            staging_dir: Directory containing transformed CSV files
        """
        self.staging_dir = staging_dir
        self.db = DatabaseConnector()
        logger.info(f"FactLoader initialized with staging dir: {staging_dir}")
    
    def load_all_facts(self):
        """Load all fact tables"""
        try:
            logger.info("Loading fact tables...")
            
            # Load fact orders
            self._load_fact_orders()
            
            # Load cohort retention
            self._load_fact_cohort_retention()
            
            logger.info("\n" + "="*80)
            logger.info("✓ ALL FACT TABLES LOADED SUCCESSFULLY")
            logger.info("="*80 + "\n")
            
            # Verify loads
            self._verify_loads()
            
            # Show sample analytics
            self._show_analytics()
            
        except Exception as e:
            logger.error(f"✗ Fact loading failed: {e}")
            raise
        finally:
            self.db.close_pool()
    
    def _load_fact_orders(self):
        """Load fact orders - using chunked pandas loading"""
        logger.info("\n[1/2] Loading fact_orders...")
        
        # Read CSV
        filepath = os.path.join(self.staging_dir, 'fact_orders.csv')
        df = pd.read_csv(filepath)
        
        logger.info(f"  - Read {len(df):,} rows from CSV")
        
        # Convert timestamps
        timestamp_columns = ['order_purchase_timestamp', 'order_delivered_customer_date', 'created_at']
        for col in timestamp_columns:
            if col in df.columns:
                df[col] = pd.to_datetime(df[col], errors='coerce')
        
        # Drop order_key if present
        if 'order_key' in df.columns:
            df = df.drop('order_key', axis=1)
        
        # Check for required columns
        required_columns = [
            'customer_key', 'product_key', 'order_date_key', 'delivery_date_key',
            'payment_type_key', 'order_id', 'order_status', 'seller_id',
            'order_item_count', 'order_subtotal', 'order_freight_total',
            'order_total_value', 'payment_value', 'payment_installments',
            'delivery_days', 'estimated_delivery_days', 'delivery_delay_days',
            'is_late_delivery', 'is_completed_order', 'review_score',
            'has_review', 'order_purchase_timestamp', 'order_delivered_customer_date',
            'created_at'
        ]
        
        # Filter to only required columns
        available_columns = [col for col in required_columns if col in df.columns]
        df = df[available_columns]
        
        logger.info(f"  - Using {len(available_columns)} columns")
        
        # Load to PostgreSQL in chunks
        logger.info("  - Loading to PostgreSQL in chunks...")
        
        engine = self.db.get_engine()
        chunk_size = 5000  # Smaller chunks for stability
        total_chunks = (len(df) // chunk_size) + 1
        
        loaded_rows = 0
        for i, chunk_start in enumerate(range(0, len(df), chunk_size)):
            chunk = df.iloc[chunk_start:chunk_start + chunk_size]
            
            try:
                chunk.to_sql(
                    'fact_orders',
                    engine,
                    if_exists='append',
                    index=False,
                    method='multi',
                    chunksize=1000
                )
                loaded_rows += len(chunk)
                logger.info(f"    → Loaded chunk {i+1}/{total_chunks} ({loaded_rows:,}/{len(df):,} rows)")
                
            except Exception as e:
                logger.error(f"    ✗ Chunk {i+1} failed: {str(e)[:200]}")
                logger.info(f"    → Attempting row-by-row insert for failed chunk...")
                
                # Try row by row for failed chunk
                for idx, row in chunk.iterrows():
                    try:
                        row.to_frame().T.to_sql(
                            'fact_orders',
                            engine,
                            if_exists='append',
                            index=False
                        )
                        loaded_rows += 1
                    except Exception as row_error:
                        logger.warning(f"    ⚠ Skipped row {idx}: {str(row_error)[:100]}")
                        continue
        
        logger.info(f"  ✓ Loaded {loaded_rows:,} rows into fact_orders")
    
    def _load_fact_cohort_retention(self):
        """Load cohort retention fact table"""
        logger.info("\n[2/2] Loading fact_cohort_retention...")
        
        filepath = os.path.join(self.staging_dir, 'fact_cohort_retention.csv')
        df = pd.read_csv(filepath)
        
        logger.info(f"  - Read {len(df):,} rows from CSV")
        
        # Convert dates
        df['cohort_month'] = pd.to_datetime(df['cohort_month'])
        df['created_at'] = pd.to_datetime(df['created_at'])
        
        # Drop surrogate key
        if 'cohort_retention_key' in df.columns:
            df = df.drop('cohort_retention_key', axis=1)
        
        # Load to PostgreSQL
        engine = self.db.get_engine()
        
        df.to_sql(
            'fact_cohort_retention',
            engine,
            if_exists='append',
            index=False,
            method='multi',
            chunksize=1000
        )
        
        logger.info(f"  ✓ Loaded {len(df):,} rows into fact_cohort_retention")
    
    def _verify_loads(self):
        """Verify fact tables were loaded correctly"""
        logger.info("\nVerifying fact table loads:")
        
        tables = ['fact_orders', 'fact_cohort_retention']
        
        for table in tables:
            query = f"SELECT COUNT(*) FROM {table};"
            result = self.db.fetch_query(query)
            count = result[0][0]
            logger.info(f"  - {table}: {count:,} rows")
    
    def _show_analytics(self):
        """Show sample analytics from loaded data"""
        logger.info("\n" + "="*60)
        logger.info("SAMPLE ANALYTICS FROM DATA WAREHOUSE")
        logger.info("="*60)
        
        # Total revenue
        query = """
        SELECT 
            SUM(order_total_value) as total_revenue,
            AVG(order_total_value) as avg_order_value,
            COUNT(*) as total_orders
        FROM fact_orders
        WHERE is_completed_order = TRUE;
        """
        result = self.db.fetch_query(query)
        if result:
            total_rev, avg_order, total_orders = result[0]
            logger.info(f"\nRevenue Metrics:")
            logger.info(f"  - Total Revenue: R$ {total_rev:,.2f}")
            logger.info(f"  - Average Order Value: R$ {avg_order:.2f}")
            logger.info(f"  - Completed Orders: {total_orders:,}")
        
        # Delivery performance
        query = """
        SELECT 
            COUNT(*) FILTER (WHERE is_late_delivery = TRUE) as late_deliveries,
            COUNT(*) FILTER (WHERE is_late_delivery = FALSE) as on_time_deliveries,
            AVG(delivery_days) as avg_delivery_days
        FROM fact_orders
        WHERE is_completed_order = TRUE;
        """
        result = self.db.fetch_query(query)
        if result:
            late, on_time, avg_days = result[0]
            total = late + on_time
            late_pct = (late / total * 100) if total > 0 else 0
            logger.info(f"\nDelivery Performance:")
            logger.info(f"  - Late Deliveries: {late:,} ({late_pct:.1f}%)")
            logger.info(f"  - On-Time Deliveries: {on_time:,} ({100-late_pct:.1f}%)")
            logger.info(f"  - Average Delivery Time: {avg_days:.1f} days")
        
        # Order status distribution
        query = """
        SELECT 
            order_status,
            COUNT(*) as order_count
        FROM fact_orders
        GROUP BY order_status
        ORDER BY order_count DESC;
        """
        result = self.db.fetch_query(query)
        if result:
            logger.info(f"\nOrder Status Distribution:")
            for status, count in result[:5]:  # Top 5
                logger.info(f"  - {status}: {count:,} orders")
        
        logger.info("\n" + "="*60 + "\n")


# Main execution
if __name__ == "__main__":
    print("\n" + "="*80)
    print("LOADING FACT TABLES INTO POSTGRESQL")
    print("="*80 + "\n")
    
    loader = FactLoader()
    loader.load_all_facts()
    
    print("\n✓ Fact tables loaded successfully!")
    print("\nYour data warehouse is now fully populated!")
    print("\nNext step: Connect Power BI to PostgreSQL for dashboard creation")

```

#### check\_data\_quality.py

* Automated comprehensive quality checks
* Checked referential integrity
* Validated null values
* Tested business logic

```
"""
Data Quality Check Script
Validates data integrity and referential relationships
"""

import sys
import os

sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))

from src.utils.logger import setup_logger
from src.utils.db_connector import DatabaseConnector

logger = setup_logger('data_quality_check')


class DataQualityChecker:
    """Check data quality in the data warehouse"""
    
    def __init__(self):
        """Initialize data quality checker"""
        self.db = DatabaseConnector()
        logger.info("DataQualityChecker initialized")
    
    def run_all_checks(self):
        """Run all data quality checks"""
        try:
            print("\n" + "="*80)
            print("DATA QUALITY REPORT")
            print("="*80 + "\n")
            
            # 1. Row counts
            self._check_row_counts()
            
            # 2. Referential integrity
            self._check_referential_integrity()
            
            # 3. NULL values
            self._check_null_values()
            
            # 4. Data consistency
            self._check_data_consistency()
            
            # 5. Business logic validation
            self._check_business_logic()
            
            print("\n" + "="*80)
            print("✓ DATA QUALITY CHECK COMPLETE")
            print("="*80 + "\n")
            
        except Exception as e:
            logger.error(f"✗ Data quality check failed: {e}")
            raise
        finally:
            self.db.close_pool()
    
    def _check_row_counts(self):
        """Check row counts for all tables"""
        print("[1/5] ROW COUNTS")
        print("-" * 80)
        
        tables = [
            'dim_date',
            'dim_products', 
            'dim_payment_type',
            'dim_customers',
            'fact_orders',
            'fact_cohort_retention'
        ]
        
        for table in tables:
            query = f"SELECT COUNT(*) FROM {table};"
            result = self.db.fetch_query(query)
            count = result[0][0]
            print(f"  {table:25} : {count:>10,} rows")
        
        print()
    
    def _check_referential_integrity(self):
        """Check referential integrity between fact and dimension tables"""
        print("[2/5] REFERENTIAL INTEGRITY")
        print("-" * 80)
        
        # Check customer_key
        query = """
        SELECT COUNT(*) as total_rows,
               COUNT(DISTINCT f.customer_key) as unique_customer_keys,
               COUNT(DISTINCT c.customer_key) as matched_customer_keys,
               COUNT(DISTINCT f.customer_key) - COUNT(DISTINCT c.customer_key) as orphaned_keys
        FROM fact_orders f
        LEFT JOIN dim_customers c ON f.customer_key = c.customer_key;
        """
        result = self.db.fetch_query(query)
        total, unique, matched, orphaned = result[0]
        
        print(f"  Customer Keys:")
        print(f"    - Total orders              : {total:>10,}")
        print(f"    - Unique customer keys      : {unique:>10,}")
        print(f"    - Matched in dim_customers  : {matched:>10,}")
        print(f"    - Orphaned (no match)       : {orphaned:>10,}")
        
        if orphaned > 0:
            print(f"    ⚠ WARNING: {orphaned} customer keys have no matching dimension record")
        else:
            print(f"    ✓ All customer keys are valid")
        
        # Check product_key
        query = """
        SELECT COUNT(*) as total_rows,
               COUNT(DISTINCT f.product_key) as unique_product_keys,
               COUNT(DISTINCT p.product_key) as matched_product_keys,
               COUNT(DISTINCT f.product_key) - COUNT(DISTINCT p.product_key) as orphaned_keys
        FROM fact_orders f
        LEFT JOIN dim_products p ON f.product_key = p.product_key;
        """
        result = self.db.fetch_query(query)
        total, unique, matched, orphaned = result[0]
        
        print(f"\n  Product Keys:")
        print(f"    - Total orders              : {total:>10,}")
        print(f"    - Unique product keys       : {unique:>10,}")
        print(f"    - Matched in dim_products   : {matched:>10,}")
        print(f"    - Orphaned (no match)       : {orphaned:>10,}")
        
        if orphaned > 0:
            print(f"    ⚠ WARNING: {orphaned} product keys have no matching dimension record")
        else:
            print(f"    ✓ All product keys are valid")
        
        # Check date_key
        query = """
        SELECT COUNT(*) as total_rows,
               COUNT(DISTINCT f.order_date_key) as unique_date_keys,
               COUNT(DISTINCT d.date_key) as matched_date_keys,
               COUNT(DISTINCT f.order_date_key) - COUNT(DISTINCT d.date_key) as orphaned_keys
        FROM fact_orders f
        LEFT JOIN dim_date d ON f.order_date_key = d.date_key;
        """
        result = self.db.fetch_query(query)
        total, unique, matched, orphaned = result[0]
        
        print(f"\n  Date Keys:")
        print(f"    - Total orders              : {total:>10,}")
        print(f"    - Unique date keys          : {unique:>10,}")
        print(f"    - Matched in dim_date       : {matched:>10,}")
        print(f"    - Orphaned (no match)       : {orphaned:>10,}")
        
        if orphaned > 0:
            print(f"    ⚠ WARNING: {orphaned} date keys have no matching dimension record")
        else:
            print(f"    ✓ All date keys are valid")
        
        # Check payment_type_key
        query = """
        SELECT COUNT(*) as total_rows,
               COUNT(DISTINCT f.payment_type_key) as unique_payment_keys,
               COUNT(DISTINCT pt.payment_type_key) as matched_payment_keys,
               COUNT(DISTINCT f.payment_type_key) - COUNT(DISTINCT pt.payment_type_key) as orphaned_keys
        FROM fact_orders f
        LEFT JOIN dim_payment_type pt ON f.payment_type_key = pt.payment_type_key;
        """
        result = self.db.fetch_query(query)
        total, unique, matched, orphaned = result[0]
        
        print(f"\n  Payment Type Keys:")
        print(f"    - Total orders              : {total:>10,}")
        print(f"    - Unique payment keys       : {unique:>10,}")
        print(f"    - Matched in dim_payment    : {matched:>10,}")
        print(f"    - Orphaned (no match)       : {orphaned:>10,}")
        
        if orphaned > 0:
            print(f"    ⚠ WARNING: {orphaned} payment keys have no matching dimension record")
        else:
            print(f"    ✓ All payment keys are valid")
        
        print()
    
    def _check_null_values(self):
        """Check for NULL values in critical fields"""
        print("[3/5] NULL VALUE CHECKS")
        print("-" * 80)
        
        # Check fact_orders
        query = """
        SELECT 
            COUNT(*) FILTER (WHERE customer_key IS NULL) as null_customer_keys,
            COUNT(*) FILTER (WHERE product_key IS NULL) as null_product_keys,
            COUNT(*) FILTER (WHERE order_date_key IS NULL) as null_order_date_keys,
            COUNT(*) FILTER (WHERE payment_type_key IS NULL) as null_payment_keys,
            COUNT(*) FILTER (WHERE order_id IS NULL) as null_order_ids,
            COUNT(*) FILTER (WHERE order_total_value IS NULL) as null_order_values,
            COUNT(*) as total_rows
        FROM fact_orders;
        """
        result = self.db.fetch_query(query)
        null_cust, null_prod, null_date, null_pay, null_id, null_val, total = result[0]
        
        print(f"  fact_orders (Total: {total:,} rows):")
        
        checks = [
            ("customer_key", null_cust),
            ("product_key", null_prod),
            ("order_date_key", null_date),
            ("payment_type_key", null_pay),
            ("order_id", null_id),
            ("order_total_value", null_val)
        ]
        
        has_nulls = False
        for field, null_count in checks:
            if null_count > 0:
                pct = (null_count / total * 100) if total > 0 else 0
                print(f"    ⚠ {field:25} : {null_count:>7,} NULLs ({pct:.2f}%)")
                has_nulls = True
        
        if not has_nulls:
            print(f"    ✓ No NULL values in critical fields")
        
        # Check dim_customers
        query = """
        SELECT 
            COUNT(*) FILTER (WHERE customer_id IS NULL) as null_customer_ids,
            COUNT(*) FILTER (WHERE customer_segment IS NULL) as null_segments,
            COUNT(*) as total_rows
        FROM dim_customers;
        """
        result = self.db.fetch_query(query)
        null_id, null_seg, total = result[0]
        
        print(f"\n  dim_customers (Total: {total:,} rows):")
        if null_id > 0 or null_seg > 0:
            if null_id > 0:
                print(f"    ⚠ customer_id                : {null_id:>7,} NULLs")
            if null_seg > 0:
                print(f"    ⚠ customer_segment           : {null_seg:>7,} NULLs")
        else:
            print(f"    ✓ No NULL values in critical fields")
        
        print()
    
    def _check_data_consistency(self):
        """Check data consistency and logical errors"""
        print("[4/5] DATA CONSISTENCY")
        print("-" * 80)
        
        # Check negative values
        query = """
        SELECT 
            COUNT(*) FILTER (WHERE order_total_value < 0) as negative_values,
            COUNT(*) FILTER (WHERE order_total_value = 0) as zero_values,
            COUNT(*) FILTER (WHERE delivery_days < 0) as negative_delivery_days
        FROM fact_orders;
        """
        result = self.db.fetch_query(query)
        neg_val, zero_val, neg_days = result[0]
        
        print(f"  Numeric Validity:")
        if neg_val > 0:
            print(f"    ⚠ Negative order values      : {neg_val:>7,} rows")
        else:
            print(f"    ✓ No negative order values")
        
        if zero_val > 0:
            print(f"    ⚠ Zero order values          : {zero_val:>7,} rows")
        else:
            print(f"    ✓ No zero order values")
        
        if neg_days > 0:
            print(f"    ⚠ Negative delivery days     : {neg_days:>7,} rows")
        else:
            print(f"    ✓ No negative delivery days")
        
        # Check duplicates
        query = """
        SELECT COUNT(*) as duplicate_count
        FROM (
            SELECT order_id, COUNT(*) as cnt
            FROM fact_orders
            GROUP BY order_id
            HAVING COUNT(*) > 1
        ) duplicates;
        """
        result = self.db.fetch_query(query)
        dup_count = result[0][0]
        
        print(f"\n  Uniqueness:")
        if dup_count > 0:
            print(f"    ⚠ Duplicate order_ids        : {dup_count:>7,} duplicates")
        else:
            print(f"    ✓ All order_ids are unique")
        
        print()
    
    def _check_business_logic(self):
        """Check business logic validation"""
        print("[5/5] BUSINESS LOGIC VALIDATION")
        print("-" * 80)
        
        # Check order status distribution
        query = """
        SELECT 
            order_status,
            COUNT(*) as count,
            ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as percentage
        FROM fact_orders
        GROUP BY order_status
        ORDER BY count DESC;
        """
        result = self.db.fetch_query(query)
        
        print(f"  Order Status Distribution:")
        for status, count, pct in result:
            print(f"    - {status:20} : {count:>10,} ({pct:>5.2f}%)")
        
        # Check completed orders metrics
        query = """
        SELECT 
            COUNT(*) as total_orders,
            COUNT(*) FILTER (WHERE is_completed_order = TRUE) as completed_orders,
            COUNT(*) FILTER (WHERE is_late_delivery = TRUE) as late_deliveries,
            AVG(delivery_days) as avg_delivery_days,
            AVG(order_total_value) as avg_order_value
        FROM fact_orders;
        """
        result = self.db.fetch_query(query)
        total, completed, late, avg_days, avg_value = result[0]
        
        completion_rate = (completed / total * 100) if total > 0 else 0
        late_rate = (late / total * 100) if total > 0 else 0
        
        print(f"\n  Order Metrics:")
        print(f"    - Total orders               : {total:>10,}")
        print(f"    - Completed orders           : {completed:>10,} ({completion_rate:.2f}%)")
        print(f"    - Late deliveries            : {late:>10,} ({late_rate:.2f}%)")
        print(f"    - Average delivery days      : {avg_days:>10.1f} days")
        print(f"    - Average order value        : R$ {avg_value:>10,.2f}")
        
        # Check customer segments
        query = """
        SELECT 
            customer_segment,
            COUNT(*) as customer_count,
            AVG(lifetime_value) as avg_clv
        FROM dim_customers
        GROUP BY customer_segment
        ORDER BY customer_count DESC;
        """
        result = self.db.fetch_query(query)
        
        print(f"\n  Customer Segments:")
        for segment, count, avg_clv in result:
            print(f"    - {segment:20} : {count:>7,} customers (Avg CLV: R$ {avg_clv:,.2f})")
        
        print()


# Main execution
if __name__ == "__main__":
    checker = DataQualityChecker()
    checker.run_all_checks()

```

#### Skills applied

* Chunked data loading
* Foreign key validation
* Referential integrity enforcement
* Comprehensive data quality checks
* SQL aggregate functions
* Error handling and recovery
* Performance monitoring
* Business logic validation

## Power BI dashboard creation

* Configured star schema relationships in data model
* Created DAX calculated measures
* Built 4 interactive dashboard pages with visualizations
* Implemented slicers and cross-filtering
* Applied professional formatting and theme
* Exported dashboard for portfolio

With the ETL process has finally finished, and the data loaded into the database, it is time to create the Power BI dashboard. This dashboard will have four pages. The first one is an executive-level summary page, the second page is about sales performance, the third page is about customer analytics, and the fourth page is about product analysis.

<figure><img src="https://539050446-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FW65tN1jweulcjF26J7LM%2Fuploads%2FLMh23pQs4Bpxye3HeQVA%2FScreenshot%202026-01-29%20142446.png?alt=media&#x26;token=726759fa-8fef-422a-89c1-24135631e40f" alt=""><figcaption></figcaption></figure>

> Page 1 - Executive Summary

<figure><img src="https://539050446-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FW65tN1jweulcjF26J7LM%2Fuploads%2FnaWQo9LmoZJGldL0TaO2%2FScreenshot%202026-01-29%20145036.png?alt=media&#x26;token=c40ee52d-a581-4c9e-95f9-2873c7fa0edc" alt=""><figcaption></figcaption></figure>

> Page 2 - Sales Performance

<figure><img src="https://539050446-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FW65tN1jweulcjF26J7LM%2Fuploads%2F8z8YqgrlssJ57LY3D2u5%2FScreenshot%202026-02-02%20162047.png?alt=media&#x26;token=0ae94aab-f2ac-47da-8c8d-a5f61996f345" alt=""><figcaption></figcaption></figure>

> Page 3 - Customer Analytics

<figure><img src="https://539050446-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FW65tN1jweulcjF26J7LM%2Fuploads%2FahA17kh2VI4hkMjZDPRV%2FScreenshot%202026-01-29%20150956.png?alt=media&#x26;token=bfc28b51-0fff-4caf-8081-941ab407a30a" alt=""><figcaption></figcaption></figure>

> Page 4 - Product Analytics

#### DAX meaures used for this dashboard

We have also created a measure table using DAX.

**Created Measures Table**

```
_Measures = ROW("Measure", 1)
```

**Revenue Measures**

```
Total Revenue = SUM(fact_orders[order_total_value])

Total Revenue (Completed) = 
CALCULATE(
    SUM(fact_orders[order_total_value]),
    fact_orders[is_completed_order] = TRUE
)

Average Order Value = AVERAGE(fact_orders[order_total_value])
```

**Order Measures**

```
Total Orders = COUNTROWS(fact_orders)

Completed Orders = 
CALCULATE(
    COUNTROWS(fact_orders),
    fact_orders[is_completed_order] = TRUE
)

Order Completion Rate = 
DIVIDE([Completed Orders], [Total Orders], 0)
```

**Customer Measures**

```
Total Customers = 
DISTINCTCOUNT(fact_orders[customer_key])

Average Customer Lifetime Value = 
AVERAGE(dim_customers[lifetime_value])

Repeat Customer Rate = 
VAR AllCustomers = CALCULATE(COUNTROWS(dim_customers), ALL(dim_customers))
VAR RepeatCustomers = CALCULATE(COUNTROWS(dim_customers), dim_customers[total_orders] > 1, ALL(dim_customers))
RETURN
    DIVIDE(RepeatCustomers, AllCustomers, 0)

```

**Delivery Measures**

```
Average Delivery Days = 
AVERAGE(fact_orders[delivery_days])

Late Deliveries = 
CALCULATE(
    COUNTROWS(fact_orders),
    fact_orders[is_late_delivery] = TRUE
)

Late Delivery Rate = 
DIVIDE([Late Deliveries], [Completed Orders], 0)

On-Time Delivery Rate = 
1 - [Late Delivery Rate]

```

**Product Measures**

```
Total Products Sold = SUM(fact_orders[order_item_count])

Unique Products = DISTINCTCOUNT(fact_orders[product_key])
```

## Conclusion - Answering business questions

At the beginning of this project, we have defined two business questions to be investigated using our Power BI dashboard:

1. **"How can we improve customer retention and identify high-value customer segments?"**
2. **"What are our key revenue drivers and how can we optimize operational performance?"**

#### First question

Here are the key figures related to answering the first business question:

* **Total unique customers in database:** 99441
* **Average lifetime value per customer:** $58.15K
* **Percentage of customers who made multiple purchases:** None, all customer only made one purchase
* **First-time purchasers:** 99441

Customer Segmentation & Value Distribution:

* VIP customer (99.2% of base) generate a near 100% of total revenue with average CLV of around R$ 58000.
* Top 10 customers alone have CLVs ranging from R$1M to R$4M

As VIP customers are the dominating majority of the customer base, this serves as a strong evidence of the great perforamnce this business did in retaining their customers.

Additionally, as indicated in the map visuals on Executive Summary page, most customers are from the southeast region of Brazil.

#### Second question

The e-commerce platform generated R$ 15.42M in total revenue over a 3-year period (2016-2018), with 96K orders from 99K unique customers. The average order value of R$ 159.33 and 97.02% order completion rate indicate healthy operational performance.

The platform experienced a tremendous growth from 2016 to 2017, followed by stabilization in 2018. This pattern suggests market saturation and indicates need for customer acquisition strategies.

The top 3 categories in terms of sales, in order, are 1. home & furniture, 2. fashion & beauty, and 3. sports & leisure, which takes up 63.63% of the total revenue. By doing a deep dive into the analysis of these sold products, we discoverd that home & furniture earns the least on average (R$151.58) among all three categories, but sells out the most amount of products. Sports & leisure products sold less than home & furniture products - in fact they sold out the least amount of products, with similar revenue in average (R$152.34). Fashion & beauty, although did not sold as much product as home & furniture, sells at a much higher revenue in average (R$ 172.54).

Business operates well in general, with 97% order completion rate, and orders took around 11 days in average to deliver and a 93.2% on-time delivery rate, suggesting a near optimized operation. Delivery time may need certain level of attention, as the on-time delivery rate decreased slightly year by year, from 98.9% in 2016 to 94.4% in 2017, and 92.3% in 2018. This may have caused by expanded scope of the operation or change in delivery method.
