Friday, 31 October 2025

Implementing Data Vault 2.0 Warehouses with PostgreSQL

 

Introduction

Data Vault 2.0 is a modern data warehousing methodology designed to handle the challenges of today's agile, complex data environments. Combining the flexibility of the original Data Vault architecture with Big Data capabilities and real-time processing, it offers a robust framework for building enterprise data warehouses.

In this comprehensive guide, we'll walk through implementing a Data Vault 2.0 warehouse using PostgreSQL, covering core concepts, practical implementation steps, and best practices.

What is Data Vault 2.0?

Data Vault 2.0 is an evolution of the original Data Vault methodology created by Dan Linstedt. It's designed to be:

  • Agile and scalable: Adapts easily to changing business requirements
  • Auditable: Maintains complete historical tracking
  • Flexible: Accommodates multiple source systems without restructuring
  • Insert-only: No updates or deletes in core tables, preserving data lineage

Core Components

Data Vault 2.0 consists of three primary entity types:

  1. Hubs: Store unique business keys and metadata
  2. Links: Represent relationships between business keys
  3. Satellites: Contain descriptive attributes and context
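To make the separation concrete, here is a small illustrative sketch (Python, not part of the warehouse itself; the table and column names are just examples) of how a single source order row decomposes into hub, link, and satellite payloads:

```python
def decompose(row):
    """Split one source order row into Data Vault payloads (illustrative)."""
    # Hubs hold only the business keys
    hubs = {
        "hub_customer": {"customer_id": row["customer_id"]},
        "hub_order": {"order_id": row["order_id"]},
    }
    # The link records only the relationship between the business keys
    links = {"link_customer_order": (row["customer_id"], row["order_id"])}
    # Satellites carry the descriptive, change-tracked attributes
    satellites = {"sat_order_details": {"order_date": row["order_date"],
                                        "total_amount": row["total_amount"]}}
    return hubs, links, satellites

hubs, links, sats = decompose({"customer_id": "CUST001", "order_id": "ORD001",
                               "order_date": "2025-01-15", "total_amount": 99.50})
```

Nothing descriptive lives in the hub or the link; everything that can change over time belongs in a satellite.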

Why PostgreSQL for Data Vault 2.0?

PostgreSQL is an excellent choice for Data Vault implementations due to:

  • ACID compliance: Ensures data integrity
  • Advanced indexing: B-tree, hash, GiST, and GIN indexes for performance
  • Partitioning: Native table partitioning for managing large datasets
  • JSON support: Handles semi-structured data alongside relational
  • Extensibility: Custom functions, operators, and data types
  • Cost-effective: Open-source with enterprise features

Step 1: Setting Up Your PostgreSQL Environment

Database Configuration

First, create a dedicated database for your Data Vault warehouse:

-- Create the Data Vault database
CREATE DATABASE datavault_dw
    WITH 
    ENCODING = 'UTF8'
    LC_COLLATE = 'en_US.UTF-8'
    LC_CTYPE = 'en_US.UTF-8'
    TEMPLATE = template0;

-- Connect to the database
\c datavault_dw

-- Create schemas for organizational separation
CREATE SCHEMA IF NOT EXISTS raw_vault;
CREATE SCHEMA IF NOT EXISTS business_vault;
CREATE SCHEMA IF NOT EXISTS information_mart;
CREATE SCHEMA IF NOT EXISTS staging;

-- Set search path
SET search_path TO raw_vault, public;

Performance Optimization Settings

Configure PostgreSQL for optimal Data Vault performance:

-- Adjust PostgreSQL configuration (add to postgresql.conf)
-- shared_buffers = 256MB  -- Adjust based on available RAM
-- effective_cache_size = 1GB
-- maintenance_work_mem = 64MB
-- checkpoint_completion_target = 0.9
-- wal_buffers = 16MB
-- default_statistics_target = 100
-- random_page_cost = 1.1  -- For SSDs

Step 2: Defining Standard Metadata Columns

Data Vault 2.0 requires specific metadata columns for audit and tracking:

-- Create a function to generate standard metadata columns
CREATE OR REPLACE FUNCTION raw_vault.get_metadata_columns()
RETURNS TABLE (
    column_definition TEXT
) AS $$
BEGIN
    RETURN QUERY SELECT unnest(ARRAY[
        'load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP',
        'record_source VARCHAR(50) NOT NULL',
        'load_batch_id BIGINT',
        'cdc_operation CHAR(1)' -- I=Insert, U=Update, D=Delete
    ]);
END;
$$ LANGUAGE plpgsql;

Step 3: Implementing Hubs

Hubs store business keys: the unique identifiers from source systems.

Hub Structure

-- Example: Customer Hub
CREATE TABLE raw_vault.hub_customer (
    customer_hkey CHAR(32) PRIMARY KEY,  -- MD5 hash of business key
    customer_id VARCHAR(50) NOT NULL,     -- Business key
    load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    record_source VARCHAR(50) NOT NULL,
    UNIQUE(customer_id)
);

CREATE INDEX idx_hub_customer_loaddate ON raw_vault.hub_customer(load_date);

-- Example: Product Hub
CREATE TABLE raw_vault.hub_product (
    product_hkey CHAR(32) PRIMARY KEY,
    product_code VARCHAR(50) NOT NULL,
    load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    record_source VARCHAR(50) NOT NULL,
    UNIQUE(product_code)
);

CREATE INDEX idx_hub_product_loaddate ON raw_vault.hub_product(load_date);

Hub Hash Key Generation

Create a function to generate consistent hash keys:

-- Hash key generation function
CREATE OR REPLACE FUNCTION raw_vault.generate_hash_key(
    p_business_keys TEXT[]
)
RETURNS CHAR(32) AS $$
DECLARE
    v_concatenated TEXT;
BEGIN
    -- Concatenate business keys with delimiter
    v_concatenated := array_to_string(p_business_keys, '||');
    
    -- Convert to uppercase and trim for consistency
    v_concatenated := UPPER(TRIM(v_concatenated));
    
    -- Return MD5 hash
    RETURN MD5(v_concatenated);
END;
$$ LANGUAGE plpgsql IMMUTABLE;

-- Usage example
SELECT raw_vault.generate_hash_key(ARRAY['CUST-12345']);
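When hash keys are also computed outside the database (for example in an ETL tool), both sides must apply identical normalization or the keys will not match. A minimal Python mirror of the function above, assuming the same delimiter, trim, and uppercase rules:

```python
import hashlib

def generate_hash_key(business_keys):
    """Python mirror of raw_vault.generate_hash_key: delimit, trim, upper, MD5."""
    concatenated = "||".join(business_keys).strip().upper()
    return hashlib.md5(concatenated.encode("utf-8")).hexdigest()

key = generate_hash_key(["CUST-12345"])
```

Because the normalization matches, `'cust-12345 '` and `'CUST-12345'` produce the same 32-character key on both sides.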

Step 4: Implementing Links

Links capture relationships between business entities.

Link Structure

-- Example: Customer-Product Order Link
CREATE TABLE raw_vault.link_customer_order_product (
    cop_link_hkey CHAR(32) PRIMARY KEY,
    customer_hkey CHAR(32) NOT NULL,
    order_hkey CHAR(32) NOT NULL,
    product_hkey CHAR(32) NOT NULL,
    load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    record_source VARCHAR(50) NOT NULL,
    FOREIGN KEY (customer_hkey) REFERENCES raw_vault.hub_customer(customer_hkey),
    FOREIGN KEY (product_hkey) REFERENCES raw_vault.hub_product(product_hkey)
);

CREATE INDEX idx_link_cop_customer ON raw_vault.link_customer_order_product(customer_hkey);
CREATE INDEX idx_link_cop_product ON raw_vault.link_customer_order_product(product_hkey);
CREATE INDEX idx_link_cop_loaddate ON raw_vault.link_customer_order_product(load_date);

-- Link hash key is generated from all participating hub keys
-- link_hkey = MD5(customer_hkey || order_hkey || product_hkey)

Creating Links with Hash Keys

-- Function to create link hash keys
CREATE OR REPLACE FUNCTION raw_vault.generate_link_hash_key(
    p_hub_keys TEXT[]
)
RETURNS CHAR(32) AS $$
DECLARE
    v_concatenated TEXT;
BEGIN
    -- Sort the hub keys for consistency
    v_concatenated := array_to_string(
        (SELECT array_agg(x ORDER BY x) FROM unnest(p_hub_keys) x),
        '||'
    );
    
    RETURN MD5(v_concatenated);
END;
$$ LANGUAGE plpgsql IMMUTABLE;
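The sort step is what makes the link hash order-independent: no matter which order the hub keys are supplied in, the same relationship yields the same hash. A quick Python sketch of the same idea:

```python
import hashlib

def generate_link_hash_key(hub_keys):
    # Sorting first makes the hash independent of the order keys are supplied in
    return hashlib.md5("||".join(sorted(hub_keys)).encode()).hexdigest()
```

Note that sorting is a design choice made by this implementation; some Data Vault teams instead fix the column order per link and hash in that fixed order.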

Step 5: Implementing Satellites

Satellites contain descriptive attributes and track changes over time.

Satellite Structure

-- Example: Customer Satellite (descriptive data)
CREATE TABLE raw_vault.sat_customer_details (
    customer_hkey CHAR(32) NOT NULL,
    load_date TIMESTAMP NOT NULL,
    load_end_date TIMESTAMP,
    record_source VARCHAR(50) NOT NULL,
    hash_diff CHAR(32) NOT NULL,  -- Hash of all descriptive attributes
    -- Descriptive attributes
    customer_name VARCHAR(200),
    email VARCHAR(100),
    phone VARCHAR(20),
    address VARCHAR(500),
    city VARCHAR(100),
    state VARCHAR(50),
    zip_code VARCHAR(20),
    country VARCHAR(100),
    PRIMARY KEY (customer_hkey, load_date),
    FOREIGN KEY (customer_hkey) REFERENCES raw_vault.hub_customer(customer_hkey)
);

CREATE INDEX idx_sat_customer_details_hkey ON raw_vault.sat_customer_details(customer_hkey);
CREATE INDEX idx_sat_customer_details_loaddate ON raw_vault.sat_customer_details(load_date);
CREATE INDEX idx_sat_customer_details_hashdiff ON raw_vault.sat_customer_details(hash_diff);

-- Example: Link Satellite (contextual data about relationship)
CREATE TABLE raw_vault.sat_customer_order_product (
    cop_link_hkey CHAR(32) NOT NULL,
    load_date TIMESTAMP NOT NULL,
    load_end_date TIMESTAMP,
    record_source VARCHAR(50) NOT NULL,
    hash_diff CHAR(32) NOT NULL,
    -- Contextual attributes
    order_date DATE,
    quantity INTEGER,
    unit_price NUMERIC(10,2),
    total_amount NUMERIC(12,2),
    discount_percent NUMERIC(5,2),
    status VARCHAR(50),
    PRIMARY KEY (cop_link_hkey, load_date),
    FOREIGN KEY (cop_link_hkey) REFERENCES raw_vault.link_customer_order_product(cop_link_hkey)
);

CREATE INDEX idx_sat_cop_link ON raw_vault.sat_customer_order_product(cop_link_hkey);
CREATE INDEX idx_sat_cop_loaddate ON raw_vault.sat_customer_order_product(load_date);

Hash Diff Generation

The hash diff detects changes in descriptive attributes:

-- Function to generate hash diff
CREATE OR REPLACE FUNCTION raw_vault.generate_hash_diff(
    p_columns TEXT[]
)
RETURNS CHAR(32) AS $$
DECLARE
    v_concatenated TEXT;
BEGIN
    -- Concatenate all column values; the third argument substitutes ''
    -- for NULL elements (array_to_string would otherwise drop NULL
    -- elements and their delimiters, letting different rows collide)
    v_concatenated := array_to_string(p_columns, '||', '');
    
    -- Convert to uppercase for consistency
    v_concatenated := UPPER(v_concatenated);
    
    RETURN MD5(v_concatenated);
END;
$$ LANGUAGE plpgsql IMMUTABLE;
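The point of the hash diff is that comparing one 32-character value replaces comparing every descriptive column. A Python sketch of the same logic, with SQL NULLs represented as `None`:

```python
import hashlib

def generate_hash_diff(columns):
    # None (SQL NULL) becomes '' so missing values hash consistently
    concatenated = "||".join("" if c is None else str(c) for c in columns)
    return hashlib.md5(concatenated.upper().encode()).hexdigest()

unchanged = generate_hash_diff(["John Doe", "john@example.com", None])
changed = generate_hash_diff(["John Doe", "j.doe@example.com", None])
```

A single changed attribute changes the diff, while an identical row reproduces it exactly.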

Step 6: Loading Data into Data Vault

Staging Area

Create a staging layer for incoming data:

-- Staging table for customer data
CREATE TABLE staging.stg_customer (
    customer_id VARCHAR(50),
    customer_name VARCHAR(200),
    email VARCHAR(100),
    phone VARCHAR(20),
    address VARCHAR(500),
    city VARCHAR(100),
    state VARCHAR(50),
    zip_code VARCHAR(20),
    country VARCHAR(100),
    load_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    source_system VARCHAR(50)
);

Loading Hubs

-- Load Hub from staging
INSERT INTO raw_vault.hub_customer (
    customer_hkey,
    customer_id,
    load_date,
    record_source
)
SELECT DISTINCT ON (stg.customer_id)  -- dedupe within the batch; a plain
    raw_vault.generate_hash_key(ARRAY[stg.customer_id]),  -- DISTINCT would not
    stg.customer_id,                                      -- collapse rows that
    CURRENT_TIMESTAMP,                                    -- differ only in
    stg.source_system                                     -- source_system
FROM staging.stg_customer stg
WHERE NOT EXISTS (
    SELECT 1 
    FROM raw_vault.hub_customer hub
    WHERE hub.customer_id = stg.customer_id
)
ORDER BY stg.customer_id;

Loading Satellites with Change Detection

-- Load Satellite with change detection using hash diff
WITH new_records AS (
    SELECT
        raw_vault.generate_hash_key(ARRAY[stg.customer_id]) as customer_hkey,
        CURRENT_TIMESTAMP as load_date,
        stg.source_system as record_source,
        raw_vault.generate_hash_diff(ARRAY[
            COALESCE(stg.customer_name, ''),
            COALESCE(stg.email, ''),
            COALESCE(stg.phone, ''),
            COALESCE(stg.address, ''),
            COALESCE(stg.city, ''),
            COALESCE(stg.state, ''),
            COALESCE(stg.zip_code, ''),
            COALESCE(stg.country, '')
        ]) as hash_diff,
        stg.customer_name,
        stg.email,
        stg.phone,
        stg.address,
        stg.city,
        stg.state,
        stg.zip_code,
        stg.country
    FROM staging.stg_customer stg
),
current_satellites AS (
    SELECT DISTINCT ON (customer_hkey)
        customer_hkey,
        hash_diff
    FROM raw_vault.sat_customer_details
    ORDER BY customer_hkey, load_date DESC
)
INSERT INTO raw_vault.sat_customer_details (
    customer_hkey,
    load_date,
    record_source,
    hash_diff,
    customer_name,
    email,
    phone,
    address,
    city,
    state,
    zip_code,
    country
)
SELECT
    nr.customer_hkey,
    nr.load_date,
    nr.record_source,
    nr.hash_diff,
    nr.customer_name,
    nr.email,
    nr.phone,
    nr.address,
    nr.city,
    nr.state,
    nr.zip_code,
    nr.country
FROM new_records nr
LEFT JOIN current_satellites cs
    ON nr.customer_hkey = cs.customer_hkey
WHERE cs.customer_hkey IS NULL  -- New customer
   OR cs.hash_diff != nr.hash_diff;  -- Changed data
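The decision logic of that load boils down to: insert a staged row only if its key is new or its hash diff differs from the latest satellite row. A compact Python sketch of the same delta rule (illustrative, with rows reduced to key/diff pairs):

```python
def rows_to_insert(staged, latest_hashdiffs):
    """Keep staged rows whose key is new or whose hash_diff changed.

    staged: iterable of (hkey, hash_diff) pairs
    latest_hashdiffs: dict mapping hkey -> most recent satellite hash_diff
    """
    return [(hkey, hd) for hkey, hd in staged
            if latest_hashdiffs.get(hkey) != hd]

# h1 is unchanged, h2 has new attributes, h3 is a brand-new key
pending = rows_to_insert([("h1", "d1"), ("h2", "d2"), ("h3", "d3")],
                         {"h1": "d1", "h2": "dX"})
```

`dict.get` returning `None` for an unknown key handles the "new customer" branch the same way `cs.customer_hkey IS NULL` does in the SQL.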

Loading Links

-- Load Link table
INSERT INTO raw_vault.link_customer_order_product (
    cop_link_hkey,
    customer_hkey,
    order_hkey,
    product_hkey,
    load_date,
    record_source
)
SELECT DISTINCT
    raw_vault.generate_link_hash_key(ARRAY[
        raw_vault.generate_hash_key(ARRAY[stg.customer_id]),
        raw_vault.generate_hash_key(ARRAY[stg.order_id]),
        raw_vault.generate_hash_key(ARRAY[stg.product_code])
    ]),
    raw_vault.generate_hash_key(ARRAY[stg.customer_id]),
    raw_vault.generate_hash_key(ARRAY[stg.order_id]),
    raw_vault.generate_hash_key(ARRAY[stg.product_code]),
    CURRENT_TIMESTAMP,
    stg.source_system
FROM staging.stg_order_details stg
WHERE NOT EXISTS (
    SELECT 1
    FROM raw_vault.link_customer_order_product lnk
    WHERE lnk.cop_link_hkey = raw_vault.generate_link_hash_key(ARRAY[
        raw_vault.generate_hash_key(ARRAY[stg.customer_id]),
        raw_vault.generate_hash_key(ARRAY[stg.order_id]),
        raw_vault.generate_hash_key(ARRAY[stg.product_code])
    ])
);

Step 7: Creating Business Vault Objects

Business Vault extends the Raw Vault with computed attributes and business rules.

Business Vault Example

-- Business vault schema
CREATE SCHEMA IF NOT EXISTS business_vault;

-- Computed satellite example: Customer lifetime value
CREATE TABLE business_vault.sat_customer_metrics (
    customer_hkey CHAR(32) NOT NULL,
    load_date TIMESTAMP NOT NULL,
    load_end_date TIMESTAMP,
    -- Computed metrics
    total_orders INTEGER,
    total_spent NUMERIC(15,2),
    average_order_value NUMERIC(12,2),
    lifetime_value NUMERIC(15,2),
    customer_segment VARCHAR(50),
    PRIMARY KEY (customer_hkey, load_date)
);

-- Population query
INSERT INTO business_vault.sat_customer_metrics
SELECT
    h.customer_hkey,
    CURRENT_TIMESTAMP as load_date,
    NULL as load_end_date,
    COUNT(DISTINCT l.order_hkey) as total_orders,
    SUM(s.total_amount) as total_spent,
    AVG(s.total_amount) as average_order_value,
    SUM(s.total_amount) as lifetime_value,
    CASE
        WHEN SUM(s.total_amount) >= 10000 THEN 'Premium'
        WHEN SUM(s.total_amount) >= 5000 THEN 'Gold'
        WHEN SUM(s.total_amount) >= 1000 THEN 'Silver'
        ELSE 'Bronze'
    END as customer_segment
FROM raw_vault.hub_customer h
LEFT JOIN raw_vault.link_customer_order_product l
    ON h.customer_hkey = l.customer_hkey
LEFT JOIN raw_vault.sat_customer_order_product s
    ON l.cop_link_hkey = s.cop_link_hkey
GROUP BY h.customer_hkey;
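The segmentation rule embedded in that CASE expression is simple threshold bucketing; expressing it outside SQL (for tests or documentation) is a one-liner per tier. A Python mirror using the same thresholds:

```python
def customer_segment(total_spent):
    # Same thresholds as the CASE expression in the population query
    if total_spent >= 10000:
        return "Premium"
    if total_spent >= 5000:
        return "Gold"
    if total_spent >= 1000:
        return "Silver"
    return "Bronze"
```

Keeping a reference implementation like this alongside the warehouse makes it easy to unit-test the business rule before it is baked into the Business Vault load.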

Step 8: Building Information Marts

Information Marts are dimensional models built on top of the Data Vault for reporting.

Point-in-Time (PIT) Tables

PIT tables provide a snapshot of data at specific points in time:

-- Create PIT table
CREATE TABLE information_mart.pit_customer (
    customer_hkey CHAR(32) NOT NULL,
    snapshot_date DATE NOT NULL,
    sat_details_loaddate TIMESTAMP,
    sat_metrics_loaddate TIMESTAMP,
    PRIMARY KEY (customer_hkey, snapshot_date)
);

-- Populate PIT table
INSERT INTO information_mart.pit_customer
SELECT
    h.customer_hkey,
    d.snapshot_date,
    (SELECT MAX(load_date)
     FROM raw_vault.sat_customer_details s
     WHERE s.customer_hkey = h.customer_hkey
     AND s.load_date <= d.snapshot_date) as sat_details_loaddate,
    (SELECT MAX(load_date)
     FROM business_vault.sat_customer_metrics m
     WHERE m.customer_hkey = h.customer_hkey
     AND m.load_date <= d.snapshot_date) as sat_metrics_loaddate
FROM raw_vault.hub_customer h
CROSS JOIN (
    SELECT DISTINCT load_date::DATE as snapshot_date
    FROM raw_vault.sat_customer_details
) d;
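Each PIT row answers one question per satellite: "what was the latest load_date at or before this snapshot?" That lookup, which the correlated subqueries above perform per customer, reduces to:

```python
def pit_pointer(load_dates, snapshot_date):
    """Latest satellite load_date at or before the snapshot, or None.

    Dates are ISO-8601 strings, so plain string comparison sorts correctly.
    """
    eligible = [d for d in load_dates if d <= snapshot_date]
    return max(eligible) if eligible else None

dates = ["2024-01-05", "2024-02-10", "2024-03-01"]
```

A `None` result corresponds to the SQL subquery returning NULL: the satellite had no row yet as of that snapshot.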

Dimension Table

-- Customer dimension for reporting
CREATE TABLE information_mart.dim_customer (
    customer_key SERIAL PRIMARY KEY,
    customer_hkey CHAR(32) NOT NULL,
    customer_id VARCHAR(50),
    customer_name VARCHAR(200),
    email VARCHAR(100),
    phone VARCHAR(20),
    full_address TEXT,
    city VARCHAR(100),
    state VARCHAR(50),
    country VARCHAR(100),
    customer_segment VARCHAR(50),
    lifetime_value NUMERIC(15,2),
    valid_from TIMESTAMP,
    valid_to TIMESTAMP,
    is_current BOOLEAN
);

-- Populate dimension with current view
CREATE OR REPLACE VIEW information_mart.vw_dim_customer_current AS
SELECT
    h.customer_hkey,
    h.customer_id,
    sd.customer_name,
    sd.email,
    sd.phone,
    CONCAT_WS(', ', sd.address, sd.city,
              CONCAT_WS(' ', sd.state, sd.zip_code)) as full_address,  -- NULL-safe: skips missing parts
    sd.city,
    sd.state,
    sd.country,
    bm.customer_segment,
    bm.lifetime_value,
    sd.load_date as valid_from,
    sd.load_end_date as valid_to,
    CASE WHEN sd.load_end_date IS NULL THEN TRUE ELSE FALSE END as is_current
FROM raw_vault.hub_customer h
INNER JOIN LATERAL (
    SELECT *
    FROM raw_vault.sat_customer_details s
    WHERE s.customer_hkey = h.customer_hkey
    ORDER BY s.load_date DESC
    LIMIT 1
) sd ON TRUE
LEFT JOIN LATERAL (
    SELECT *
    FROM business_vault.sat_customer_metrics m
    WHERE m.customer_hkey = h.customer_hkey
    ORDER BY m.load_date DESC
    LIMIT 1
) bm ON TRUE;

Step 9: Implementing Data Quality Rules

Data Quality Satellites

-- Data quality satellite
CREATE TABLE raw_vault.sat_customer_dq (
    customer_hkey CHAR(32) NOT NULL,
    load_date TIMESTAMP NOT NULL,
    record_source VARCHAR(50) NOT NULL,
    dq_score INTEGER,  -- 0-100
    email_valid BOOLEAN,
    phone_valid BOOLEAN,
    address_complete BOOLEAN,
    duplicate_suspect BOOLEAN,
    PRIMARY KEY (customer_hkey, load_date)
);

-- Data quality check function
CREATE OR REPLACE FUNCTION raw_vault.check_customer_data_quality(
    p_customer_hkey CHAR(32)
)
RETURNS TABLE (
    dq_score INTEGER,
    email_valid BOOLEAN,
    phone_valid BOOLEAN,
    address_complete BOOLEAN
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        CASE
            WHEN sd.email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$' THEN 25 ELSE 0
        END +
        CASE
            WHEN sd.phone ~* '^\+?[0-9]{10,15}$' THEN 25 ELSE 0
        END +
        CASE
            WHEN sd.address IS NOT NULL AND sd.city IS NOT NULL 
                 AND sd.state IS NOT NULL AND sd.zip_code IS NOT NULL THEN 50 ELSE 0
        END as dq_score,
        sd.email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$' as email_valid,
        sd.phone ~* '^\+?[0-9]{10,15}$' as phone_valid,
        (sd.address IS NOT NULL AND sd.city IS NOT NULL 
         AND sd.state IS NOT NULL AND sd.zip_code IS NOT NULL) as address_complete
    FROM raw_vault.sat_customer_details sd
    WHERE sd.customer_hkey = p_customer_hkey
    ORDER BY sd.load_date DESC
    LIMIT 1;
END;
$$ LANGUAGE plpgsql;
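The scoring scheme is additive: 25 points each for a valid email and phone, 50 for a complete address. A Python mirror using the same regular expressions, handy for validating records before they ever reach staging:

```python
import re

# Same patterns as the SQL function above
EMAIL_RE = re.compile(r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$")
PHONE_RE = re.compile(r"^\+?[0-9]{10,15}$")

def dq_score(email, phone, address_fields):
    """0-100 data quality score mirroring check_customer_data_quality."""
    score = 25 if email and EMAIL_RE.match(email) else 0
    score += 25 if phone and PHONE_RE.match(phone) else 0
    score += 50 if all(f is not None for f in address_fields) else 0
    return score
```

Running the same rules upstream and in the warehouse keeps the two quality views consistent.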

Step 10: Performance Optimization

Partitioning Large Tables

-- Partition satellite by load date
CREATE TABLE raw_vault.sat_customer_details_partitioned (
    customer_hkey CHAR(32) NOT NULL,
    load_date TIMESTAMP NOT NULL,
    load_end_date TIMESTAMP,
    record_source VARCHAR(50) NOT NULL,
    hash_diff CHAR(32) NOT NULL,
    customer_name VARCHAR(200),
    email VARCHAR(100),
    phone VARCHAR(20),
    address VARCHAR(500),
    city VARCHAR(100),
    state VARCHAR(50),
    zip_code VARCHAR(20),
    country VARCHAR(100),
    PRIMARY KEY (customer_hkey, load_date)
) PARTITION BY RANGE (load_date);

-- Create partitions
CREATE TABLE raw_vault.sat_customer_details_2024_q1
    PARTITION OF raw_vault.sat_customer_details_partitioned
    FOR VALUES FROM ('2024-01-01') TO ('2024-04-01');

CREATE TABLE raw_vault.sat_customer_details_2024_q2
    PARTITION OF raw_vault.sat_customer_details_partitioned
    FOR VALUES FROM ('2024-04-01') TO ('2024-07-01');

-- Continue creating partitions as needed

Indexing Strategy

-- Composite indexes for common query patterns
CREATE INDEX idx_sat_customer_hkey_loaddate 
    ON raw_vault.sat_customer_details(customer_hkey, load_date DESC);

-- Partial index for current records
CREATE INDEX idx_sat_customer_current 
    ON raw_vault.sat_customer_details(customer_hkey) 
    WHERE load_end_date IS NULL;

-- BRIN index for large sequential tables
CREATE INDEX idx_sat_customer_loaddate_brin 
    ON raw_vault.sat_customer_details 
    USING BRIN(load_date);

Materialized Views

-- Materialized view for current state
CREATE MATERIALIZED VIEW information_mart.mv_customer_current AS
SELECT
    h.customer_hkey,
    h.customer_id,
    sd.customer_name,
    sd.email,
    sd.phone,
    sd.city,
    sd.state,
    sd.country,
    bm.customer_segment,
    bm.lifetime_value,
    bm.total_orders
FROM raw_vault.hub_customer h
INNER JOIN LATERAL (
    SELECT *
    FROM raw_vault.sat_customer_details s
    WHERE s.customer_hkey = h.customer_hkey
    ORDER BY s.load_date DESC
    LIMIT 1
) sd ON TRUE
LEFT JOIN LATERAL (
    SELECT *
    FROM business_vault.sat_customer_metrics m
    WHERE m.customer_hkey = h.customer_hkey
    ORDER BY m.load_date DESC
    LIMIT 1
) bm ON TRUE;

CREATE UNIQUE INDEX idx_mv_customer_current_hkey 
    ON information_mart.mv_customer_current(customer_hkey);

-- Refresh strategy
CREATE OR REPLACE FUNCTION information_mart.refresh_customer_current()
RETURNS void AS $$
BEGIN
    -- A plain refresh is used here: the CONCURRENTLY variant cannot run
    -- inside a function because it is disallowed in a transaction block.
    -- For non-blocking refreshes, issue REFRESH MATERIALIZED VIEW
    -- CONCURRENTLY as a standalone statement (it also requires the
    -- unique index created above).
    REFRESH MATERIALIZED VIEW information_mart.mv_customer_current;
END;
$$ LANGUAGE plpgsql;

Step 11: Automation and Maintenance

Automated ETL Procedure

-- Master ETL procedure
CREATE OR REPLACE PROCEDURE raw_vault.execute_etl_load(
    p_batch_id BIGINT,
    p_source_system VARCHAR(50)
)
LANGUAGE plpgsql
AS $$
BEGIN
    -- Step 1: Load Hubs
    PERFORM raw_vault.load_customer_hub(p_batch_id, p_source_system);
    PERFORM raw_vault.load_product_hub(p_batch_id, p_source_system);
    
    -- Step 2: Load Links
    PERFORM raw_vault.load_customer_order_product_link(p_batch_id, p_source_system);
    
    -- Step 3: Load Satellites
    PERFORM raw_vault.load_customer_details_sat(p_batch_id, p_source_system);
    PERFORM raw_vault.load_order_details_sat(p_batch_id, p_source_system);
    
    -- Step 4: Update Business Vault
    PERFORM business_vault.calculate_customer_metrics(p_batch_id);
    
    -- Step 5: Refresh Information Marts
    PERFORM information_mart.refresh_customer_current();
    
    -- Log completion
    INSERT INTO raw_vault.etl_log (batch_id, status, completed_at)
    VALUES (p_batch_id, 'SUCCESS', CURRENT_TIMESTAMP);
EXCEPTION
    WHEN OTHERS THEN
        -- COMMIT/ROLLBACK are not allowed in a block that has an
        -- exception handler; the handler's implicit subtransaction
        -- rollback already undoes the failed work.
        INSERT INTO raw_vault.etl_log (batch_id, status, error_message, completed_at)
        VALUES (p_batch_id, 'FAILED', SQLERRM, CURRENT_TIMESTAMP);
        -- Note: re-raising aborts the surrounding transaction, so the
        -- FAILED row is lost unless the failure is also logged externally.
        RAISE;
END;
$$;

Monitoring and Logging

-- ETL logging table
CREATE TABLE raw_vault.etl_log (
    log_id SERIAL PRIMARY KEY,
    batch_id BIGINT NOT NULL,
    table_name VARCHAR(100),
    records_processed INTEGER,
    records_inserted INTEGER,
    records_updated INTEGER,
    status VARCHAR(20),
    error_message TEXT,
    started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    completed_at TIMESTAMP
);

CREATE INDEX idx_etl_log_batch ON raw_vault.etl_log(batch_id);
CREATE INDEX idx_etl_log_status ON raw_vault.etl_log(status);

Best Practices

1. Naming Conventions

Follow consistent naming patterns:

  • Hubs: hub_<business_entity>
  • Links: link_<entity1>_<entity2>_<entityN>
  • Satellites: sat_<parent>_<description>
  • Hash keys: <entity>_hkey or <entity>_link_hkey

2. Hash Key Generation

  • Always use deterministic hash functions
  • Include delimiters between concatenated values
  • Standardize case (uppercase) and trim whitespace
  • Handle NULLs consistently
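The delimiter rule above is not cosmetic: without a delimiter, different key combinations can concatenate to the same string and therefore the same hash. A short demonstration of the collision and its fix:

```python
import hashlib

def undelimited(parts):
    # BAD: 'AB' + 'C' and 'A' + 'BC' both become 'ABC'
    return hashlib.md5("".join(parts).encode()).hexdigest()

def delimited(parts):
    # GOOD: 'AB||C' and 'A||BC' remain distinct
    return hashlib.md5("||".join(parts).encode()).hexdigest()
```

The same reasoning applies to hash diffs over satellite columns, which is why `generate_hash_diff` also inserts `||` between values.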

3. Load Patterns

  • Use batch loading windows for large datasets
  • Implement incremental loads using change data capture (CDC)
  • Separate staging, loading, and transformation phases
  • Maintain idempotency in load processes

4. Data Quality

  • Implement data quality satellites
  • Validate business keys before loading
  • Monitor for duplicate keys
  • Track data lineage and source systems

5. Performance

  • Partition large tables by load_date
  • Use appropriate indexes (B-tree, BRIN, partial)
  • Implement materialized views for common queries
  • Consider columnar storage extensions for analytics

6. Security

-- Role-based access control
CREATE ROLE dv_read;
CREATE ROLE dv_write;
CREATE ROLE dv_admin;

-- Grant privileges
GRANT USAGE ON SCHEMA raw_vault TO dv_read;
GRANT SELECT ON ALL TABLES IN SCHEMA raw_vault TO dv_read;

GRANT USAGE ON SCHEMA raw_vault TO dv_write;
GRANT INSERT ON ALL TABLES IN SCHEMA raw_vault TO dv_write;

-- Schema-level ALL covers only USAGE and CREATE; table privileges must be
-- granted separately (use ALTER DEFAULT PRIVILEGES for future tables)
GRANT ALL PRIVILEGES ON SCHEMA raw_vault TO dv_admin;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA raw_vault TO dv_admin;

Complete Example: End-to-End Implementation

Let's walk through a complete example implementing a customer order system:

-- 1. Create all Hubs
CREATE TABLE raw_vault.hub_customer (
    customer_hkey CHAR(32) PRIMARY KEY,
    customer_id VARCHAR(50) NOT NULL UNIQUE,
    load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    record_source VARCHAR(50) NOT NULL
);

CREATE TABLE raw_vault.hub_order (
    order_hkey CHAR(32) PRIMARY KEY,
    order_id VARCHAR(50) NOT NULL UNIQUE,
    load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    record_source VARCHAR(50) NOT NULL
);

CREATE TABLE raw_vault.hub_product (
    product_hkey CHAR(32) PRIMARY KEY,
    product_code VARCHAR(50) NOT NULL UNIQUE,
    load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    record_source VARCHAR(50) NOT NULL
);

-- 2. Create Links
CREATE TABLE raw_vault.link_customer_order (
    co_link_hkey CHAR(32) PRIMARY KEY,
    customer_hkey CHAR(32) NOT NULL,
    order_hkey CHAR(32) NOT NULL,
    load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    record_source VARCHAR(50) NOT NULL,
    FOREIGN KEY (customer_hkey) REFERENCES raw_vault.hub_customer(customer_hkey),
    FOREIGN KEY (order_hkey) REFERENCES raw_vault.hub_order(order_hkey)
);

CREATE TABLE raw_vault.link_order_product (
    op_link_hkey CHAR(32) PRIMARY KEY,
    order_hkey CHAR(32) NOT NULL,
    product_hkey CHAR(32) NOT NULL,
    load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    record_source VARCHAR(50) NOT NULL,
    FOREIGN KEY (order_hkey) REFERENCES raw_vault.hub_order(order_hkey),
    FOREIGN KEY (product_hkey) REFERENCES raw_vault.hub_product(product_hkey)
);

-- 3. Create Satellites
CREATE TABLE raw_vault.sat_customer_details (
    customer_hkey CHAR(32) NOT NULL,
    load_date TIMESTAMP NOT NULL,
    load_end_date TIMESTAMP,
    record_source VARCHAR(50) NOT NULL,
    hash_diff CHAR(32) NOT NULL,
    customer_name VARCHAR(200),
    email VARCHAR(100),
    phone VARCHAR(20),
    PRIMARY KEY (customer_hkey, load_date),
    FOREIGN KEY (customer_hkey) REFERENCES raw_vault.hub_customer(customer_hkey)
);

CREATE TABLE raw_vault.sat_order_details (
    order_hkey CHAR(32) NOT NULL,
    load_date TIMESTAMP NOT NULL,
    load_end_date TIMESTAMP,
    record_source VARCHAR(50) NOT NULL,
    hash_diff CHAR(32) NOT NULL,
    order_date DATE,
    order_status VARCHAR(50),
    total_amount NUMERIC(12,2),
    PRIMARY KEY (order_hkey, load_date),
    FOREIGN KEY (order_hkey) REFERENCES raw_vault.hub_order(order_hkey)
);

CREATE TABLE raw_vault.sat_order_product_details (
    op_link_hkey CHAR(32) NOT NULL,
    load_date TIMESTAMP NOT NULL,
    load_end_date TIMESTAMP,
    record_source VARCHAR(50) NOT NULL,
    hash_diff CHAR(32) NOT NULL,
    quantity INTEGER,
    unit_price NUMERIC(10,2),
    line_total NUMERIC(12,2),
    PRIMARY KEY (op_link_hkey, load_date),
    FOREIGN KEY (op_link_hkey) REFERENCES raw_vault.link_order_product(op_link_hkey)
);

-- 4. Sample data load
INSERT INTO raw_vault.hub_customer VALUES 
    (raw_vault.generate_hash_key(ARRAY['CUST001']), 'CUST001', CURRENT_TIMESTAMP, 'ERP_SYSTEM');

INSERT INTO raw_vault.sat_customer_details VALUES (
    raw_vault.generate_hash_key(ARRAY['CUST001']),
    CURRENT_TIMESTAMP,
    NULL,
    'ERP_SYSTEM',
    raw_vault.generate_hash_diff(ARRAY['John Doe', 'john@example.com', '555-0100']),
    'John Doe',
    'john@example.com',
    '555-0100'
);

Troubleshooting Common Issues

Issue 1: Slow Satellite Queries

Problem: Queries retrieving current satellite records are slow.

Solution: Create a partial index on current records:

CREATE INDEX idx_sat_current 
    ON raw_vault.sat_customer_details(customer_hkey) 
    WHERE load_end_date IS NULL;

Issue 2: Hash Collisions

Problem: Different business keys produce the same hash.

Solution: MD5 collisions are extremely rare in practice, but critical systems can switch to SHA-256:

-- digest() requires the pgcrypto extension
CREATE EXTENSION IF NOT EXISTS pgcrypto;

CREATE OR REPLACE FUNCTION raw_vault.generate_hash_key_sha256(
    p_business_keys TEXT[]
)
RETURNS CHAR(64) AS $$
BEGIN
    RETURN encode(digest(
        array_to_string(p_business_keys, '||'),
        'sha256'
    ), 'hex');
END;
$$ LANGUAGE plpgsql IMMUTABLE;
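If you adopt SHA-256, remember that the hash key columns widen from CHAR(32) to CHAR(64), and apply the same trim/uppercase normalization as the MD5 version (the SQL function above omits it). A Python mirror with the normalization included:

```python
import hashlib

def generate_hash_key_sha256(business_keys):
    # Apply the same normalization as the MD5 version before hashing
    concatenated = "||".join(business_keys).strip().upper()
    return hashlib.sha256(concatenated.encode()).hexdigest()

key = generate_hash_key_sha256(["CUST-12345"])
```

Whichever algorithm you pick, use it everywhere: mixing MD5 and SHA-256 keys in one vault breaks every join between hubs, links, and satellites.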

Issue 3: Managing Late-Arriving Data

Problem: Data arrives out of sequence.

Solution: Implement effective date handling in satellites:

-- Add effective date column
ALTER TABLE raw_vault.sat_customer_details
ADD COLUMN effective_date DATE;

-- Backfill with load_date
UPDATE raw_vault.sat_customer_details
SET effective_date = load_date::DATE
WHERE effective_date IS NULL;

Conclusion

Implementing Data Vault 2.0 in PostgreSQL provides a robust, scalable foundation for enterprise data warehousing. The methodology's insert-only architecture, combined with PostgreSQL's advanced features, creates a system that:

  • Maintains complete data history and auditability
  • Adapts easily to changing business requirements
  • Scales horizontally and vertically
  • Supports multiple source systems without restructuring
  • Enables parallel processing and incremental loads

By following the step-by-step implementation outlined in this guide, you can build a production-ready Data Vault warehouse that meets modern data engineering requirements.

Additional Resources

  • Official Data Vault 2.0 specification: https://datavaultalliance.com
  • PostgreSQL documentation: https://www.postgresql.org/docs/
  • Data Vault modeling tools and templates
  • ETL frameworks: Apache Airflow, Prefect, dbt

Next Steps

  1. Set up a development PostgreSQL instance
  2. Implement a proof-of-concept with sample data
  3. Design your business key structure
  4. Create automated ETL pipelines
  5. Build information marts for reporting
  6. Establish monitoring and alerting
  7. Scale to production workloads

Remember, Data Vault 2.0 is most effective when implemented incrementally. Start with a core business process, validate the approach, and expand systematically.
