Introduction
Data Vault 2.0 is a data warehousing methodology designed for environments where source systems and business requirements change frequently. It keeps the flexible hub, link, and satellite modeling of the original Data Vault and adds hash keys, which allow tables to be loaded in parallel, along with guidance for big data and near-real-time workloads.
In this comprehensive guide, we'll walk through implementing a Data Vault 2.0 warehouse using PostgreSQL, covering core concepts, practical implementation steps, and best practices.
What is Data Vault 2.0?
Data Vault 2.0 is an evolution of the original Data Vault methodology created by Dan Linstedt. It's designed to be:
- Agile and scalable: Adapts easily to changing business requirements
- Auditable: Maintains complete historical tracking
- Flexible: Accommodates multiple source systems without restructuring
- Insert-only: No updates or deletes in core tables, preserving data lineage
Core Components
Data Vault 2.0 consists of three primary entity types:
- Hubs: Store unique business keys and metadata
- Links: Represent relationships between business keys
- Satellites: Contain descriptive attributes and context
Why PostgreSQL for Data Vault 2.0?
PostgreSQL is an excellent choice for Data Vault implementations due to:
- ACID compliance: Ensures data integrity
- Advanced indexing: B-tree, hash, GiST, and GIN indexes for performance
- Partitioning: Native table partitioning for managing large datasets
- JSON support: Handles semi-structured data alongside relational
- Extensibility: Custom functions, operators, and data types
- Cost-effective: Open-source with enterprise features
Step 1: Setting Up Your PostgreSQL Environment
Database Configuration
First, create a dedicated database for your Data Vault warehouse:
-- Create the Data Vault database
CREATE DATABASE datavault_dw
WITH
ENCODING = 'UTF8'
LC_COLLATE = 'en_US.UTF-8'
LC_CTYPE = 'en_US.UTF-8'
TEMPLATE = template0;
-- Connect to the database
\c datavault_dw
-- Create schemas for organizational separation
CREATE SCHEMA IF NOT EXISTS raw_vault;
CREATE SCHEMA IF NOT EXISTS business_vault;
CREATE SCHEMA IF NOT EXISTS information_mart;
CREATE SCHEMA IF NOT EXISTS staging;
-- Set search path
SET search_path TO raw_vault, public;
Performance Optimization Settings
Configure PostgreSQL for optimal Data Vault performance:
-- Adjust PostgreSQL configuration (add to postgresql.conf)
-- shared_buffers = 256MB -- Adjust based on available RAM
-- effective_cache_size = 1GB
-- maintenance_work_mem = 64MB
-- checkpoint_completion_target = 0.9
-- wal_buffers = 16MB
-- default_statistics_target = 100
-- random_page_cost = 1.1 -- For SSDs
Step 2: Defining Standard Metadata Columns
Data Vault 2.0 requires specific metadata columns for audit and tracking:
-- Create a function to generate standard metadata columns
CREATE OR REPLACE FUNCTION raw_vault.get_metadata_columns()
RETURNS TABLE (
column_definition TEXT
) AS $$
BEGIN
RETURN QUERY SELECT unnest(ARRAY[
'load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP',
'record_source VARCHAR(50) NOT NULL',
'load_batch_id BIGINT',
'cdc_operation CHAR(1)' -- I=Insert, U=Update, D=Delete
]);
END;
$$ LANGUAGE plpgsql;
Step 3: Implementing Hubs
Hubs store business keys - the unique identifiers from source systems.
Hub Structure
-- Example: Customer Hub
CREATE TABLE raw_vault.hub_customer (
customer_hkey CHAR(32) PRIMARY KEY, -- MD5 hash of business key
customer_id VARCHAR(50) NOT NULL, -- Business key
load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
record_source VARCHAR(50) NOT NULL,
UNIQUE(customer_id)
);
CREATE INDEX idx_hub_customer_loaddate ON raw_vault.hub_customer(load_date);
-- Example: Product Hub
CREATE TABLE raw_vault.hub_product (
product_hkey CHAR(32) PRIMARY KEY,
product_code VARCHAR(50) NOT NULL,
load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
record_source VARCHAR(50) NOT NULL,
UNIQUE(product_code)
);
CREATE INDEX idx_hub_product_loaddate ON raw_vault.hub_product(load_date);
Hub Hash Key Generation
Create a function to generate consistent hash keys:
-- Hash key generation function
CREATE OR REPLACE FUNCTION raw_vault.generate_hash_key(
p_business_keys TEXT[]
)
RETURNS CHAR(32) AS $$
DECLARE
v_concatenated TEXT;
BEGIN
-- Trim and uppercase each key part, then join the parts with a delimiter.
-- Trimming only the concatenated string would leave padding inside composite keys.
-- NULL parts become empty strings so every part keeps its position.
SELECT string_agg(UPPER(TRIM(COALESCE(k, ''))), '||' ORDER BY ord)
INTO v_concatenated
FROM unnest(p_business_keys) WITH ORDINALITY AS t(k, ord);
-- Return MD5 hash
RETURN MD5(v_concatenated);
END;
$$ LANGUAGE plpgsql IMMUTABLE;
-- Usage example
SELECT raw_vault.generate_hash_key(ARRAY['CUST-12345']);
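Hash keys are often computed outside the database as well, for example in an ETL job. The following is a minimal Python sketch of the same idea (trim, uppercase, delimiter, MD5). It assumes each key part is normalized individually and that keys are plain ASCII; Python and PostgreSQL can differ in how they trim and uppercase other characters, so treat byte-for-byte agreement with the SQL function as something to verify rather than a given.

```python
import hashlib


def generate_hash_key(business_keys, delimiter="||"):
    """Trim and uppercase each key part, join with a delimiter, return the MD5 hex digest."""
    # None is treated as an empty string so every part keeps its position.
    parts = [(key or "").strip().upper() for key in business_keys]
    return hashlib.md5(delimiter.join(parts).encode("utf-8")).hexdigest()


# The same business key hashes identically regardless of case or padding.
same = generate_hash_key(["CUST-12345"]) == generate_hash_key(["  cust-12345 "])
print(same)  # True

# The delimiter keeps ("AB", "C") distinct from ("A", "BC").
distinct = generate_hash_key(["AB", "C"]) != generate_hash_key(["A", "BC"])
print(distinct)  # True
```

Without the delimiter, both composite keys in the second check would concatenate to "ABC" and collide.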
Step 4: Implementing Links
Links capture relationships between business entities.
Link Structure
-- Example: Customer-Product Order Link
CREATE TABLE raw_vault.link_customer_order_product (
cop_link_hkey CHAR(32) PRIMARY KEY,
customer_hkey CHAR(32) NOT NULL,
order_hkey CHAR(32) NOT NULL, -- hub_order is created in the complete example; add its foreign key once it exists
product_hkey CHAR(32) NOT NULL,
load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
record_source VARCHAR(50) NOT NULL,
FOREIGN KEY (customer_hkey) REFERENCES raw_vault.hub_customer(customer_hkey),
FOREIGN KEY (product_hkey) REFERENCES raw_vault.hub_product(product_hkey)
);
CREATE INDEX idx_link_cop_customer ON raw_vault.link_customer_order_product(customer_hkey);
CREATE INDEX idx_link_cop_product ON raw_vault.link_customer_order_product(product_hkey);
CREATE INDEX idx_link_cop_loaddate ON raw_vault.link_customer_order_product(load_date);
-- The link hash key is derived from all participating hub keys:
-- cop_link_hkey = raw_vault.generate_link_hash_key(ARRAY[customer_hkey, order_hkey, product_hkey])
Creating Links with Hash Keys
-- Function to create link hash keys
CREATE OR REPLACE FUNCTION raw_vault.generate_link_hash_key(
p_hub_keys TEXT[]
)
RETURNS CHAR(32) AS $$
BEGIN
-- Keep the hub keys in the order supplied (the link's column order).
-- Sorting them would give (A, B) and (B, A) the same hash, which breaks
-- links that reference the same hub in two different roles.
-- The third argument keeps NULL elements as empty strings instead of skipping them.
RETURN MD5(array_to_string(p_hub_keys, '||', ''));
END;
$$ LANGUAGE plpgsql IMMUTABLE;
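Whether hub keys are sorted or kept in column order before hashing matters as soon as one hub appears twice in a link. The Python sketch below compares both approaches on a hypothetical manager/report link between two employees; the function names and sample keys are illustrative and not part of the schema in this guide.

```python
import hashlib


def md5_hex(text):
    return hashlib.md5(text.encode("utf-8")).hexdigest()


def link_hash_sorted(hub_keys):
    # Sorts the hub keys before joining, so their roles are lost.
    return md5_hex("||".join(sorted(hub_keys)))


def link_hash_ordered(hub_keys):
    # Joins the hub keys in the order given (the link's column order).
    return md5_hex("||".join(hub_keys))


manager = md5_hex("EMP-1")
report = md5_hex("EMP-2")

# Sorted: "EMP-1 manages EMP-2" and "EMP-2 manages EMP-1" share one hash.
print(link_hash_sorted([manager, report]) == link_hash_sorted([report, manager]))  # True

# Ordered: the two relationships stay distinct.
print(link_hash_ordered([manager, report]) == link_hash_ordered([report, manager]))  # False
```

For links whose hubs are all different, both variants are deterministic; the ordered form is the safer default because it also covers same-hub links.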
Step 5: Implementing Satellites
Satellites contain descriptive attributes and track changes over time.
Satellite Structure
-- Example: Customer Satellite (descriptive data)
CREATE TABLE raw_vault.sat_customer_details (
customer_hkey CHAR(32) NOT NULL,
load_date TIMESTAMP NOT NULL,
load_end_date TIMESTAMP,
record_source VARCHAR(50) NOT NULL,
hash_diff CHAR(32) NOT NULL, -- Hash of all descriptive attributes
-- Descriptive attributes
customer_name VARCHAR(200),
email VARCHAR(100),
phone VARCHAR(20),
address VARCHAR(500),
city VARCHAR(100),
state VARCHAR(50),
zip_code VARCHAR(20),
country VARCHAR(100),
PRIMARY KEY (customer_hkey, load_date),
FOREIGN KEY (customer_hkey) REFERENCES raw_vault.hub_customer(customer_hkey)
);
CREATE INDEX idx_sat_customer_details_hkey ON raw_vault.sat_customer_details(customer_hkey);
CREATE INDEX idx_sat_customer_details_loaddate ON raw_vault.sat_customer_details(load_date);
CREATE INDEX idx_sat_customer_details_hashdiff ON raw_vault.sat_customer_details(hash_diff);
-- Example: Link Satellite (contextual data about relationship)
CREATE TABLE raw_vault.sat_customer_order_product (
cop_link_hkey CHAR(32) NOT NULL,
load_date TIMESTAMP NOT NULL,
load_end_date TIMESTAMP,
record_source VARCHAR(50) NOT NULL,
hash_diff CHAR(32) NOT NULL,
-- Contextual attributes
order_date DATE,
quantity INTEGER,
unit_price NUMERIC(10,2),
total_amount NUMERIC(12,2),
discount_percent NUMERIC(5,2),
status VARCHAR(50),
PRIMARY KEY (cop_link_hkey, load_date),
FOREIGN KEY (cop_link_hkey) REFERENCES raw_vault.link_customer_order_product(cop_link_hkey)
);
CREATE INDEX idx_sat_cop_link ON raw_vault.sat_customer_order_product(cop_link_hkey);
CREATE INDEX idx_sat_cop_loaddate ON raw_vault.sat_customer_order_product(load_date);
Hash Diff Generation
The hash diff detects changes in descriptive attributes:
-- Function to generate hash diff
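CREATE OR REPLACE FUNCTION raw_vault.generate_hash_diff(
p_columns TEXT[]
)
RETURNS CHAR(32) AS $$
DECLARE
v_concatenated TEXT;
BEGIN
-- Concatenate all column values. The third argument replaces NULL elements
-- with '' so each attribute keeps its position; by default array_to_string
-- skips NULL elements, which lets a value that moves between columns go unnoticed.
v_concatenated := array_to_string(p_columns, '||', '');
-- Guard against a NULL array
v_concatenated := COALESCE(v_concatenated, '');
-- Convert to uppercase for consistency (case-only changes will not be detected)
v_concatenated := UPPER(v_concatenated);
RETURN MD5(v_concatenated);
END;
$$ LANGUAGE plpgsql IMMUTABLE;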
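To see why NULL handling matters for a hash diff, here is a small Python sketch with two variants: one keeps a placeholder for missing values, the other drops them. Both functions are illustrative stand-ins written for this comparison, not the guide's SQL function.

```python
import hashlib


def hash_diff(values, delimiter="||"):
    # None becomes '' so a missing value still occupies its slot.
    parts = ["" if v is None else str(v).upper() for v in values]
    return hashlib.md5(delimiter.join(parts).encode("utf-8")).hexdigest()


def hash_diff_skipping_nulls(values, delimiter="||"):
    # Drops missing values entirely, so attribute positions shift.
    parts = [str(v).upper() for v in values if v is not None]
    return hashlib.md5(delimiter.join(parts).encode("utf-8")).hexdigest()


# Attributes are (name, phone, fax): the number moves from phone to fax.
before = ["John Doe", "555-0100", None]
after = ["John Doe", None, "555-0100"]

print(hash_diff(before) != hash_diff(after))  # True: the change is detected
print(hash_diff_skipping_nulls(before) == hash_diff_skipping_nulls(after))  # True: the change is missed
```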
Step 6: Loading Data into Data Vault
Staging Area
Create a staging layer for incoming data:
-- Staging table for customer data
CREATE TABLE staging.stg_customer (
customer_id VARCHAR(50),
customer_name VARCHAR(200),
email VARCHAR(100),
phone VARCHAR(20),
address VARCHAR(500),
city VARCHAR(100),
state VARCHAR(50),
zip_code VARCHAR(20),
country VARCHAR(100),
load_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
source_system VARCHAR(50)
);
Loading Hubs
-- Load Hub from staging
INSERT INTO raw_vault.hub_customer (
customer_hkey,
customer_id,
load_date,
record_source
)
SELECT DISTINCT
raw_vault.generate_hash_key(ARRAY[customer_id]),
customer_id,
CURRENT_TIMESTAMP,
source_system
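FROM staging.stg_customer stg
WHERE stg.customer_id IS NOT NULL
AND NOT EXISTS (
SELECT 1
FROM raw_vault.hub_customer hub
-- Compare on the hash key so 'cust1' and 'CUST1' count as the same business key
WHERE hub.customer_hkey = raw_vault.generate_hash_key(ARRAY[stg.customer_id])
)
-- Skip duplicates inside the batch, e.g. the same key arriving from two source systems
ON CONFLICT DO NOTHING;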
Loading Satellites with Change Detection
-- Load Satellite with change detection using hash diff
WITH new_records AS (
SELECT
raw_vault.generate_hash_key(ARRAY[stg.customer_id]) as customer_hkey,
CURRENT_TIMESTAMP as load_date,
stg.source_system as record_source,
raw_vault.generate_hash_diff(ARRAY[
COALESCE(stg.customer_name, ''),
COALESCE(stg.email, ''),
COALESCE(stg.phone, ''),
COALESCE(stg.address, ''),
COALESCE(stg.city, ''),
COALESCE(stg.state, ''),
COALESCE(stg.zip_code, ''),
COALESCE(stg.country, '')
]) as hash_diff,
stg.customer_name,
stg.email,
stg.phone,
stg.address,
stg.city,
stg.state,
stg.zip_code,
stg.country
FROM staging.stg_customer stg
),
current_satellites AS (
SELECT DISTINCT ON (customer_hkey)
customer_hkey,
hash_diff
FROM raw_vault.sat_customer_details
ORDER BY customer_hkey, load_date DESC
)
INSERT INTO raw_vault.sat_customer_details (
customer_hkey,
load_date,
record_source,
hash_diff,
customer_name,
email,
phone,
address,
city,
state,
zip_code,
country
)
SELECT
nr.customer_hkey,
nr.load_date,
nr.record_source,
nr.hash_diff,
nr.customer_name,
nr.email,
nr.phone,
nr.address,
nr.city,
nr.state,
nr.zip_code,
nr.country
FROM new_records nr
LEFT JOIN current_satellites cs
ON nr.customer_hkey = cs.customer_hkey
WHERE cs.customer_hkey IS NULL -- New customer
OR cs.hash_diff != nr.hash_diff; -- Changed data
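The change-detection rule in this statement is small enough to model in a few lines. The Python sketch below takes staged rows and the latest stored hash diff per key and returns the rows that would be inserted. The names are illustrative, and the de-duplication of staged rows is an addition that the SQL above does not perform.

```python
def rows_to_insert(staged_rows, current_hash_diffs):
    """Return staged rows that are new or whose hash diff has changed.

    staged_rows: list of (hash_key, hash_diff) tuples from staging.
    current_hash_diffs: dict mapping hash_key to the latest stored hash_diff.
    """
    selected = []
    seen = set()
    for hash_key, hash_diff in staged_rows:
        if hash_key in seen:
            continue  # assumption: keep only the first staged row per key
        seen.add(hash_key)
        # A missing key yields None, which never equals a real hash diff.
        if current_hash_diffs.get(hash_key) != hash_diff:
            selected.append((hash_key, hash_diff))
    return selected


current = {"k1": "aaa", "k2": "bbb"}
staged = [("k1", "aaa"), ("k2", "ccc"), ("k3", "ddd"), ("k3", "ddd")]
print(rows_to_insert(staged, current))  # [('k2', 'ccc'), ('k3', 'ddd')]
```

Here k1 is unchanged and skipped, k2 has changed, and k3 is new. Without the de-duplication step, two staged rows for the same key in one batch would share a load timestamp and collide on the satellite's primary key.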
Loading Links
-- Load Link table
INSERT INTO raw_vault.link_customer_order_product (
cop_link_hkey,
customer_hkey,
order_hkey,
product_hkey,
load_date,
record_source
)
SELECT DISTINCT
raw_vault.generate_link_hash_key(ARRAY[
raw_vault.generate_hash_key(ARRAY[stg.customer_id]),
raw_vault.generate_hash_key(ARRAY[stg.order_id]),
raw_vault.generate_hash_key(ARRAY[stg.product_code])
]),
raw_vault.generate_hash_key(ARRAY[stg.customer_id]),
raw_vault.generate_hash_key(ARRAY[stg.order_id]),
raw_vault.generate_hash_key(ARRAY[stg.product_code]),
CURRENT_TIMESTAMP,
stg.source_system
FROM staging.stg_order_details stg
WHERE NOT EXISTS (
SELECT 1
FROM raw_vault.link_customer_order_product lnk
WHERE lnk.cop_link_hkey = raw_vault.generate_link_hash_key(ARRAY[
raw_vault.generate_hash_key(ARRAY[stg.customer_id]),
raw_vault.generate_hash_key(ARRAY[stg.order_id]),
raw_vault.generate_hash_key(ARRAY[stg.product_code])
])
);
Step 7: Creating Business Vault Objects
Business Vault extends the Raw Vault with computed attributes and business rules.
Business Vault Example
-- Business vault schema
CREATE SCHEMA IF NOT EXISTS business_vault;
-- Computed satellite example: Customer lifetime value
CREATE TABLE business_vault.sat_customer_metrics (
customer_hkey CHAR(32) NOT NULL,
load_date TIMESTAMP NOT NULL,
load_end_date TIMESTAMP,
-- Computed metrics
total_orders INTEGER,
total_spent NUMERIC(15,2),
average_order_value NUMERIC(12,2),
lifetime_value NUMERIC(15,2),
customer_segment VARCHAR(50),
PRIMARY KEY (customer_hkey, load_date)
);
-- Population query
INSERT INTO business_vault.sat_customer_metrics
SELECT
h.customer_hkey,
CURRENT_TIMESTAMP as load_date,
NULL as load_end_date,
COUNT(DISTINCT l.order_hkey) as total_orders,
SUM(s.total_amount) as total_spent,
SUM(s.total_amount) / NULLIF(COUNT(DISTINCT l.order_hkey), 0) as average_order_value, -- per order, not per line item
SUM(s.total_amount) as lifetime_value,
CASE
WHEN SUM(s.total_amount) >= 10000 THEN 'Premium'
WHEN SUM(s.total_amount) >= 5000 THEN 'Gold'
WHEN SUM(s.total_amount) >= 1000 THEN 'Silver'
ELSE 'Bronze'
END as customer_segment
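FROM raw_vault.hub_customer h
LEFT JOIN raw_vault.link_customer_order_product l
ON h.customer_hkey = l.customer_hkey
-- Use only the latest satellite row per link; joining every historical
-- version would count the same amount more than once
LEFT JOIN (
SELECT DISTINCT ON (cop_link_hkey) cop_link_hkey, total_amount
FROM raw_vault.sat_customer_order_product
ORDER BY cop_link_hkey, load_date DESC
) s
ON l.cop_link_hkey = s.cop_link_hkey
GROUP BY h.customer_hkey;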
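Business rules such as the segment thresholds are easier to unit test when they are also expressed outside SQL. The following Python sketch mirrors the CASE expression above, including its behavior for customers with no orders, where SUM returns NULL and the ELSE branch applies. The function name is hypothetical.

```python
from decimal import Decimal


def customer_segment(total_spent):
    """Mirror of the CASE expression: thresholds at 10000, 5000 and 1000."""
    if total_spent is None:
        return "Bronze"  # no orders: SUM is NULL, so the ELSE branch applies
    if total_spent >= Decimal("10000"):
        return "Premium"
    if total_spent >= Decimal("5000"):
        return "Gold"
    if total_spent >= Decimal("1000"):
        return "Silver"
    return "Bronze"


print(customer_segment(Decimal("10000.00")))  # Premium
print(customer_segment(Decimal("9999.99")))   # Gold
print(customer_segment(None))                 # Bronze
```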
Step 8: Building Information Marts
Information Marts are dimensional models built on top of the Data Vault for reporting.
Point-in-Time (PIT) Tables
PIT tables provide a snapshot of data at specific points in time:
-- Create PIT table
CREATE TABLE information_mart.pit_customer (
customer_hkey CHAR(32) NOT NULL,
snapshot_date DATE NOT NULL,
sat_details_loaddate TIMESTAMP,
sat_metrics_loaddate TIMESTAMP,
PRIMARY KEY (customer_hkey, snapshot_date)
);
-- Populate PIT table
INSERT INTO information_mart.pit_customer
SELECT
h.customer_hkey,
d.snapshot_date,
(SELECT MAX(load_date)
FROM raw_vault.sat_customer_details s
WHERE s.customer_hkey = h.customer_hkey
-- Include everything loaded up to the end of the snapshot day;
-- comparing against snapshot_date alone would stop at midnight
AND s.load_date < d.snapshot_date + 1) as sat_details_loaddate,
(SELECT MAX(load_date)
FROM business_vault.sat_customer_metrics m
WHERE m.customer_hkey = h.customer_hkey
AND m.load_date < d.snapshot_date + 1) as sat_metrics_loaddate
FROM raw_vault.hub_customer h
CROSS JOIN (
SELECT DISTINCT load_date::DATE as snapshot_date
FROM raw_vault.sat_customer_details
) d;
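The lookup a PIT table precomputes is "the latest satellite load at or before a snapshot". Here is a minimal Python sketch of that lookup using binary search. It assumes the load timestamps for one hash key are already sorted ascending and treats the snapshot date as covering the whole day; both are assumptions of this example.

```python
from bisect import bisect_left
from datetime import date, datetime, time, timedelta


def pit_load_date(load_dates, snapshot_date):
    """Return the latest load timestamp up to the end of snapshot_date, or None."""
    # Everything strictly before midnight of the following day counts.
    cutoff = datetime.combine(snapshot_date + timedelta(days=1), time.min)
    idx = bisect_left(load_dates, cutoff)  # index of the first load at or after the cutoff
    return load_dates[idx - 1] if idx > 0 else None


loads = [
    datetime(2024, 1, 5, 9, 30),
    datetime(2024, 1, 20, 14, 0),
    datetime(2024, 2, 3, 8, 15),
]
print(pit_load_date(loads, date(2024, 1, 20)))  # 2024-01-20 14:00:00
print(pit_load_date(loads, date(2024, 1, 4)))   # None
```

The first call shows why the cutoff matters: a row loaded at 14:00 on the snapshot day is only found when the whole day is included.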
Dimension Table
-- Customer dimension for reporting
CREATE TABLE information_mart.dim_customer (
customer_key SERIAL PRIMARY KEY,
customer_hkey CHAR(32) NOT NULL,
customer_id VARCHAR(50),
customer_name VARCHAR(200),
email VARCHAR(100),
phone VARCHAR(20),
full_address TEXT,
city VARCHAR(100),
state VARCHAR(50),
country VARCHAR(100),
customer_segment VARCHAR(50),
lifetime_value NUMERIC(15,2),
valid_from TIMESTAMP,
valid_to TIMESTAMP,
is_current BOOLEAN
);
-- Current-state view over the vault; it can be queried directly or used to populate dim_customer
CREATE OR REPLACE VIEW information_mart.vw_dim_customer_current AS
SELECT
h.customer_hkey,
h.customer_id,
sd.customer_name,
sd.email,
sd.phone,
concat_ws(', ', sd.address, sd.city, NULLIF(concat_ws(' ', sd.state, sd.zip_code), '')) as full_address, -- concat_ws skips NULL parts; || would make the whole address NULL
sd.city,
sd.state,
sd.country,
bm.customer_segment,
bm.lifetime_value,
sd.load_date as valid_from,
sd.load_end_date as valid_to,
sd.load_end_date IS NULL as is_current
FROM raw_vault.hub_customer h
INNER JOIN LATERAL (
SELECT *
FROM raw_vault.sat_customer_details s
WHERE s.customer_hkey = h.customer_hkey
ORDER BY s.load_date DESC
LIMIT 1
) sd ON TRUE
LEFT JOIN LATERAL (
SELECT *
FROM business_vault.sat_customer_metrics m
WHERE m.customer_hkey = h.customer_hkey
ORDER BY m.load_date DESC
LIMIT 1
) bm ON TRUE;
Step 9: Implementing Data Quality Rules
Data Quality Satellites
-- Data quality satellite
CREATE TABLE raw_vault.sat_customer_dq (
customer_hkey CHAR(32) NOT NULL,
load_date TIMESTAMP NOT NULL,
record_source VARCHAR(50) NOT NULL,
dq_score INTEGER, -- 0-100
email_valid BOOLEAN,
phone_valid BOOLEAN,
address_complete BOOLEAN,
duplicate_suspect BOOLEAN,
PRIMARY KEY (customer_hkey, load_date)
);
-- Data quality check function
CREATE OR REPLACE FUNCTION raw_vault.check_customer_data_quality(
p_customer_hkey CHAR(32)
)
RETURNS TABLE (
dq_score INTEGER,
email_valid BOOLEAN,
phone_valid BOOLEAN,
address_complete BOOLEAN
) AS $$
BEGIN
RETURN QUERY
SELECT
CASE
WHEN sd.email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$' THEN 25 ELSE 0
END +
CASE
WHEN sd.phone ~* '^\+?[0-9]{10,15}$' THEN 25 ELSE 0
END +
CASE
WHEN sd.address IS NOT NULL AND sd.city IS NOT NULL
AND sd.state IS NOT NULL AND sd.zip_code IS NOT NULL THEN 50 ELSE 0
END as dq_score,
sd.email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$' as email_valid,
sd.phone ~* '^\+?[0-9]{10,15}$' as phone_valid,
(sd.address IS NOT NULL AND sd.city IS NOT NULL
AND sd.state IS NOT NULL AND sd.zip_code IS NOT NULL) as address_complete
FROM raw_vault.sat_customer_details sd
WHERE sd.customer_hkey = p_customer_hkey
ORDER BY sd.load_date DESC
LIMIT 1;
END;
$$ LANGUAGE plpgsql;
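The same scoring rules can be prototyped in Python before they are committed to a database function. This sketch reuses the two regular expressions from the function above. One deliberate difference, stated as an assumption: a missing email or phone yields False here, whereas the SQL comparison yields NULL for the validity flag.

```python
import re

EMAIL_RE = re.compile(r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$")
PHONE_RE = re.compile(r"^\+?[0-9]{10,15}$")


def check_customer(email, phone, address, city, state, zip_code):
    """Score a customer record: 25 for email, 25 for phone, 50 for a complete address."""
    email_valid = bool(email and EMAIL_RE.fullmatch(email))
    phone_valid = bool(phone and PHONE_RE.fullmatch(phone))
    address_complete = all(v is not None for v in (address, city, state, zip_code))
    score = 25 * email_valid + 25 * phone_valid + 50 * address_complete
    return {
        "dq_score": score,
        "email_valid": email_valid,
        "phone_valid": phone_valid,
        "address_complete": address_complete,
    }


result = check_customer("john@example.com", "555-0100", "1 Main St", "Springfield", "IL", "62701")
print(result["dq_score"])     # 75
print(result["phone_valid"])  # False
```

The phone pattern accepts digits only, so a formatted number such as 555-0100 fails the check. Strip punctuation before validating if formatted numbers should pass.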
Step 10: Performance Optimization
Partitioning Large Tables
-- Partition satellite by load date
CREATE TABLE raw_vault.sat_customer_details_partitioned (
customer_hkey CHAR(32) NOT NULL,
load_date TIMESTAMP NOT NULL,
load_end_date TIMESTAMP,
record_source VARCHAR(50) NOT NULL,
hash_diff CHAR(32) NOT NULL,
customer_name VARCHAR(200),
email VARCHAR(100),
phone VARCHAR(20),
address VARCHAR(500),
city VARCHAR(100),
state VARCHAR(50),
zip_code VARCHAR(20),
country VARCHAR(100),
PRIMARY KEY (customer_hkey, load_date)
) PARTITION BY RANGE (load_date);
-- Create partitions
CREATE TABLE raw_vault.sat_customer_details_2024_q1
PARTITION OF raw_vault.sat_customer_details_partitioned
FOR VALUES FROM ('2024-01-01') TO ('2024-04-01');
CREATE TABLE raw_vault.sat_customer_details_2024_q2
PARTITION OF raw_vault.sat_customer_details_partitioned
FOR VALUES FROM ('2024-04-01') TO ('2024-07-01');
-- Continue creating partitions as needed
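Writing each partition by hand gets tedious, so the DDL is often generated. The following Python sketch produces the four quarterly CREATE TABLE statements for a given year, following the naming pattern used above. The function name and its defaults are illustrative; the script only builds strings and leaves execution to whichever client you use.

```python
from datetime import date


def quarterly_partition_ddl(
    year,
    parent="raw_vault.sat_customer_details_partitioned",
    prefix="raw_vault.sat_customer_details",
):
    """Build one CREATE TABLE ... PARTITION OF statement per quarter of the year."""
    statements = []
    for quarter in range(1, 5):
        start = date(year, 3 * quarter - 2, 1)
        # The upper bound is exclusive: the first day of the next quarter.
        end = date(year + 1, 1, 1) if quarter == 4 else date(year, 3 * quarter + 1, 1)
        statements.append(
            f"CREATE TABLE IF NOT EXISTS {prefix}_{year}_q{quarter}\n"
            f"PARTITION OF {parent}\n"
            f"FOR VALUES FROM ('{start.isoformat()}') TO ('{end.isoformat()}');"
        )
    return statements


for statement in quarterly_partition_ddl(2025):
    print(statement)
```

Because range bounds are exclusive at the upper end, consecutive partitions share a boundary date without overlapping.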
Indexing Strategy
-- Composite indexes for common query patterns
CREATE INDEX idx_sat_customer_hkey_loaddate
ON raw_vault.sat_customer_details(customer_hkey, load_date DESC);
-- Partial index for current records
CREATE INDEX idx_sat_customer_current
ON raw_vault.sat_customer_details(customer_hkey)
WHERE load_end_date IS NULL;
-- BRIN index for large sequential tables
CREATE INDEX idx_sat_customer_loaddate_brin
ON raw_vault.sat_customer_details
USING BRIN(load_date);
Materialized Views
-- Materialized view for current state
CREATE MATERIALIZED VIEW information_mart.mv_customer_current AS
SELECT
h.customer_hkey,
h.customer_id,
sd.customer_name,
sd.email,
sd.phone,
sd.city,
sd.state,
sd.country,
bm.customer_segment,
bm.lifetime_value,
bm.total_orders
FROM raw_vault.hub_customer h
INNER JOIN LATERAL (
SELECT *
FROM raw_vault.sat_customer_details s
WHERE s.customer_hkey = h.customer_hkey
ORDER BY s.load_date DESC
LIMIT 1
) sd ON TRUE
LEFT JOIN LATERAL (
SELECT *
FROM business_vault.sat_customer_metrics m
WHERE m.customer_hkey = h.customer_hkey
ORDER BY m.load_date DESC
LIMIT 1
) bm ON TRUE;
CREATE UNIQUE INDEX idx_mv_customer_current_hkey
ON information_mart.mv_customer_current(customer_hkey);
-- Refresh strategy
CREATE OR REPLACE FUNCTION information_mart.refresh_customer_current()
RETURNS void AS $$
BEGIN
REFRESH MATERIALIZED VIEW CONCURRENTLY information_mart.mv_customer_current;
END;
$$ LANGUAGE plpgsql;
Step 11: Automation and Maintenance
Automated ETL Procedure
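-- Master ETL procedure
-- Note: the load_* and calculate_* functions called below are not defined in this guide;
-- they are assumed to wrap the load statements from Steps 6 and 7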
CREATE OR REPLACE PROCEDURE raw_vault.execute_etl_load(
p_batch_id BIGINT,
p_source_system VARCHAR(50)
)
LANGUAGE plpgsql
AS $$
BEGIN
-- Step 1: Load Hubs
PERFORM raw_vault.load_customer_hub(p_batch_id, p_source_system);
PERFORM raw_vault.load_product_hub(p_batch_id, p_source_system);
-- Step 2: Load Links
PERFORM raw_vault.load_customer_order_product_link(p_batch_id, p_source_system);
-- Step 3: Load Satellites
PERFORM raw_vault.load_customer_details_sat(p_batch_id, p_source_system);
PERFORM raw_vault.load_order_details_sat(p_batch_id, p_source_system);
-- Step 4: Update Business Vault
PERFORM business_vault.calculate_customer_metrics(p_batch_id);
-- Step 5: Refresh Information Marts
PERFORM information_mart.refresh_customer_current();
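-- Log completion
INSERT INTO raw_vault.etl_log (batch_id, status, completed_at)
VALUES (p_batch_id, 'SUCCESS', CURRENT_TIMESTAMP);
-- No explicit COMMIT here: PL/pgSQL does not allow transaction control inside
-- a block that has an EXCEPTION clause. The calling session commits.
EXCEPTION
WHEN OTHERS THEN
-- The block's work has already been rolled back to its implicit savepoint.
-- Record the failure; re-raising the error would abort the transaction and discard this row.
INSERT INTO raw_vault.etl_log (batch_id, status, error_message, completed_at)
VALUES (p_batch_id, 'FAILED', SQLERRM, CURRENT_TIMESTAMP);
RAISE WARNING 'ETL batch % failed: %', p_batch_id, SQLERRM;
END;
$$;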
Monitoring and Logging
-- ETL logging table
CREATE TABLE raw_vault.etl_log (
log_id SERIAL PRIMARY KEY,
batch_id BIGINT NOT NULL,
table_name VARCHAR(100),
records_processed INTEGER,
records_inserted INTEGER,
records_updated INTEGER,
status VARCHAR(20),
error_message TEXT,
started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
completed_at TIMESTAMP
);
CREATE INDEX idx_etl_log_batch ON raw_vault.etl_log(batch_id);
CREATE INDEX idx_etl_log_status ON raw_vault.etl_log(status);
Best Practices
1. Naming Conventions
Follow consistent naming patterns:
- Hubs: hub_<business_entity>
- Links: link_<entity1>_<entity2>_<entityN>
- Satellites: sat_<parent>_<description>
- Hash keys: <entity>_hkey or <entity>_link_hkey
2. Hash Key Generation
- Always use deterministic hash functions
- Include delimiters between concatenated values
- Standardize case (uppercase) and trim whitespace
- Handle NULLs consistently
3. Load Patterns
- Use batch loading windows for large datasets
- Implement incremental loads using change data capture (CDC)
- Separate staging, loading, and transformation phases
- Maintain idempotency in load processes
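Idempotency means that replaying a batch leaves the vault unchanged. The sketch below demonstrates the idea with Python's built-in sqlite3 module, used purely as a stand-in so the example runs without a server. SQLite's INSERT OR IGNORE plays the role that ON CONFLICT DO NOTHING plays in PostgreSQL, and the simplified table and helper names are assumptions of this example.

```python
import hashlib
import sqlite3


def hash_key(business_key):
    return hashlib.md5(business_key.strip().upper().encode("utf-8")).hexdigest()


def load_hub(conn, staged_ids, record_source):
    """Insert business keys into the hub, ignoring keys that already exist."""
    rows = [(hash_key(cid), cid, record_source) for cid in staged_ids if cid]
    conn.executemany(
        "INSERT OR IGNORE INTO hub_customer (customer_hkey, customer_id, record_source) "
        "VALUES (?, ?, ?)",
        rows,
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE hub_customer ("
    "customer_hkey TEXT PRIMARY KEY, "
    "customer_id TEXT NOT NULL UNIQUE, "
    "record_source TEXT NOT NULL)"
)

batch = ["CUST001", "CUST002", "CUST001"]
load_hub(conn, batch, "ERP_SYSTEM")
load_hub(conn, batch, "ERP_SYSTEM")  # replaying the same batch changes nothing

count = conn.execute("SELECT COUNT(*) FROM hub_customer").fetchone()[0]
print(count)  # 2
```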
4. Data Quality
- Implement data quality satellites
- Validate business keys before loading
- Monitor for duplicate keys
- Track data lineage and source systems
5. Performance
- Partition large tables by load_date
- Use appropriate indexes (B-tree, BRIN, partial)
- Implement materialized views for common queries
- Consider columnar storage extensions for analytics
6. Security
-- Role-based access control
CREATE ROLE dv_read;
CREATE ROLE dv_write;
CREATE ROLE dv_admin;
-- Grant privileges
GRANT USAGE ON SCHEMA raw_vault TO dv_read;
GRANT SELECT ON ALL TABLES IN SCHEMA raw_vault TO dv_read;
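GRANT USAGE ON SCHEMA raw_vault TO dv_write;
-- Loaders also need SELECT for the NOT EXISTS checks in the load statements
GRANT SELECT, INSERT ON ALL TABLES IN SCHEMA raw_vault TO dv_write;
-- Schema-level privileges do not cover tables, so grant those separately
GRANT ALL PRIVILEGES ON SCHEMA raw_vault TO dv_admin;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA raw_vault TO dv_admin;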
Complete Example: End-to-End Implementation
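Let's walk through a complete example implementing a customer order system. It reuses table names from the earlier steps with slightly different definitions, so run it in a fresh database or drop the earlier tables first: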
-- 1. Create all Hubs
CREATE TABLE raw_vault.hub_customer (
customer_hkey CHAR(32) PRIMARY KEY,
customer_id VARCHAR(50) NOT NULL UNIQUE,
load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
record_source VARCHAR(50) NOT NULL
);
CREATE TABLE raw_vault.hub_order (
order_hkey CHAR(32) PRIMARY KEY,
order_id VARCHAR(50) NOT NULL UNIQUE,
load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
record_source VARCHAR(50) NOT NULL
);
CREATE TABLE raw_vault.hub_product (
product_hkey CHAR(32) PRIMARY KEY,
product_code VARCHAR(50) NOT NULL UNIQUE,
load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
record_source VARCHAR(50) NOT NULL
);
-- 2. Create Links
CREATE TABLE raw_vault.link_customer_order (
co_link_hkey CHAR(32) PRIMARY KEY,
customer_hkey CHAR(32) NOT NULL,
order_hkey CHAR(32) NOT NULL,
load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
record_source VARCHAR(50) NOT NULL,
FOREIGN KEY (customer_hkey) REFERENCES raw_vault.hub_customer(customer_hkey),
FOREIGN KEY (order_hkey) REFERENCES raw_vault.hub_order(order_hkey)
);
CREATE TABLE raw_vault.link_order_product (
op_link_hkey CHAR(32) PRIMARY KEY,
order_hkey CHAR(32) NOT NULL,
product_hkey CHAR(32) NOT NULL,
load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
record_source VARCHAR(50) NOT NULL,
FOREIGN KEY (order_hkey) REFERENCES raw_vault.hub_order(order_hkey),
FOREIGN KEY (product_hkey) REFERENCES raw_vault.hub_product(product_hkey)
);
-- 3. Create Satellites
CREATE TABLE raw_vault.sat_customer_details (
customer_hkey CHAR(32) NOT NULL,
load_date TIMESTAMP NOT NULL,
load_end_date TIMESTAMP,
record_source VARCHAR(50) NOT NULL,
hash_diff CHAR(32) NOT NULL,
customer_name VARCHAR(200),
email VARCHAR(100),
phone VARCHAR(20),
PRIMARY KEY (customer_hkey, load_date),
FOREIGN KEY (customer_hkey) REFERENCES raw_vault.hub_customer(customer_hkey)
);
CREATE TABLE raw_vault.sat_order_details (
order_hkey CHAR(32) NOT NULL,
load_date TIMESTAMP NOT NULL,
load_end_date TIMESTAMP,
record_source VARCHAR(50) NOT NULL,
hash_diff CHAR(32) NOT NULL,
order_date DATE,
order_status VARCHAR(50),
total_amount NUMERIC(12,2),
PRIMARY KEY (order_hkey, load_date),
FOREIGN KEY (order_hkey) REFERENCES raw_vault.hub_order(order_hkey)
);
CREATE TABLE raw_vault.sat_order_product_details (
op_link_hkey CHAR(32) NOT NULL,
load_date TIMESTAMP NOT NULL,
load_end_date TIMESTAMP,
record_source VARCHAR(50) NOT NULL,
hash_diff CHAR(32) NOT NULL,
quantity INTEGER,
unit_price NUMERIC(10,2),
line_total NUMERIC(12,2),
PRIMARY KEY (op_link_hkey, load_date),
FOREIGN KEY (op_link_hkey) REFERENCES raw_vault.link_order_product(op_link_hkey)
);
-- 4. Sample data load
INSERT INTO raw_vault.hub_customer VALUES
(raw_vault.generate_hash_key(ARRAY['CUST001']), 'CUST001', CURRENT_TIMESTAMP, 'ERP_SYSTEM');
INSERT INTO raw_vault.sat_customer_details VALUES (
raw_vault.generate_hash_key(ARRAY['CUST001']),
CURRENT_TIMESTAMP,
NULL,
'ERP_SYSTEM',
raw_vault.generate_hash_diff(ARRAY['John Doe', 'john@example.com', '555-0100']),
'John Doe',
'john@example.com',
'555-0100'
);
Troubleshooting Common Issues
Issue 1: Slow Satellite Queries
Problem: Queries retrieving current satellite records are slow.
Solution: Create a partial index on current records:
CREATE INDEX idx_sat_current
ON raw_vault.sat_customer_details(customer_hkey)
WHERE load_end_date IS NULL;
Issue 2: Hash Collisions
Problem: Different business keys produce the same hash.
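Solution: While accidental MD5 collisions are extremely rare, use SHA-256 for critical systems. The digest() function comes from the pgcrypto extension, and hash key columns must be widened to CHAR(64):
CREATE EXTENSION IF NOT EXISTS pgcrypto;
CREATE OR REPLACE FUNCTION raw_vault.generate_hash_key_sha256(
p_business_keys TEXT[]
)
RETURNS CHAR(64) AS $$
BEGIN
-- Apply the same normalization as the MD5 version so both hash the same input
RETURN encode(digest(
UPPER(TRIM(array_to_string(p_business_keys, '||'))),
'sha256'
), 'hex');
END;
$$ LANGUAGE plpgsql IMMUTABLE;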
Issue 3: Managing Late-Arriving Data
Problem: Data arrives out of sequence.
Solution: Implement effective date handling in satellites:
-- Add effective date column
ALTER TABLE raw_vault.sat_customer_details
ADD COLUMN effective_date DATE;
-- One-time backfill with load_date (a migration step, not part of the regular insert-only load)
UPDATE raw_vault.sat_customer_details
SET effective_date = load_date::DATE
WHERE effective_date IS NULL;
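Once an effective date is available, the "current" record is chosen by business time rather than by arrival order. The Python sketch below illustrates that selection on two hypothetical satellite rows, one of which arrives late. The dictionary layout and function name are assumptions made for this example.

```python
from datetime import date, datetime


def record_as_of(records, as_of):
    """Pick the record with the latest effective_date on or before as_of.

    Ties on effective_date are broken by the most recent load_date.
    """
    candidates = [r for r in records if r["effective_date"] <= as_of]
    if not candidates:
        return None
    return max(candidates, key=lambda r: (r["effective_date"], r["load_date"]))


records = [
    # Arrived on time: effective 1 March, loaded 2 March.
    {"city": "Boston", "effective_date": date(2024, 3, 1), "load_date": datetime(2024, 3, 2, 6, 0)},
    # Arrived late: effective 15 February, but loaded 10 March.
    {"city": "Albany", "effective_date": date(2024, 2, 15), "load_date": datetime(2024, 3, 10, 6, 0)},
]

print(record_as_of(records, date(2024, 3, 20))["city"])  # Boston
print(record_as_of(records, date(2024, 2, 20))["city"])  # Albany
```

Ordering by load_date alone would report Albany as the current city on 20 March, because that row arrived last even though it describes an earlier state.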
Conclusion
Implementing Data Vault 2.0 in PostgreSQL provides a robust, scalable foundation for enterprise data warehousing. The methodology's insert-only architecture, combined with PostgreSQL's advanced features, creates a system that:
- Maintains complete data history and auditability
- Adapts easily to changing business requirements
- Scales horizontally and vertically
- Supports multiple source systems without restructuring
- Enables parallel processing and incremental loads
By following the step-by-step implementation outlined in this guide, you can build a production-ready Data Vault warehouse that meets modern data engineering requirements.
Additional Resources
- Official Data Vault 2.0 specification: https://datavaultalliance.com
- PostgreSQL documentation: https://www.postgresql.org/docs/
- Data Vault modeling tools and templates
- ETL frameworks: Apache Airflow, Prefect, dbt
Next Steps
- Set up a development PostgreSQL instance
- Implement a proof-of-concept with sample data
- Design your business key structure
- Create automated ETL pipelines
- Build information marts for reporting
- Establish monitoring and alerting
- Scale to production workloads
Remember, Data Vault 2.0 is most effective when implemented incrementally. Start with a core business process, validate the approach, and expand systematically.