Friday, 24 April 2026

KvRocks as Your Main Database — Possible Use Cases

 

A technical deep-dive into when Apache KvRocks earns a primary role in your stack


Introduction

Most engineers first encounter Apache KvRocks as a Redis cost-reduction story: same wire protocol, same commands, less RAM, cheaper bill. That framing is accurate but undersells what KvRocks actually is. When you replace Redis's in-memory engine with RocksDB — one of the most battle-tested LSM-tree storage engines ever built — you get a system with fundamentally different durability, capacity, and operational characteristics. That changes which role it should play in your architecture.

This post is not about KvRocks as a Redis cache replacement. It is about the specific use cases where KvRocks is the right primary data store — not a cache in front of something else, not a temporary buffer, but the authoritative system of record for a class of data.

We will cover the storage engine mechanics that make this possible, the use cases where KvRocks earns a primary role, and the hard boundaries where it should not be used as one.


Understanding the Storage Engine Difference

Before use cases, the mechanics matter.

Redis / Valkey

Redis keeps all data in RAM. Persistence is optional and retrofitted: either RDB snapshots (point-in-time, lossy) or AOF (append-only file, configurable fsync). Neither was designed from the ground up for durability. An AOF rewrite under load can cause measurable latency spikes. An RDB fork under a large working set causes memory pressure. The persistence model is a concession, not a design centre.

KvRocks + RocksDB

KvRocks uses RocksDB as its storage backend. RocksDB is an LSM-tree (Log-Structured Merge-Tree) engine developed at Facebook/Meta and widely deployed in production, including inside TiKV and YugabyteDB, at companies such as LinkedIn, and in CockroachDB before its move to Pebble. It was designed for:

  • Sequential writes: each write is appended to the WAL and a memtable, flushed to L0 SSTables, and compacted downward through levels. Write throughput is high because the I/O is fundamentally sequential.
  • Durability by default: the WAL (Write-Ahead Log) is on by default, so acknowledged writes survive a process crash. Surviving power loss or a kernel crash additionally requires synced WAL writes, which RocksDB leaves off by default.
  • Compression: Snappy, LZ4, and Zstd compression at the SSTable level. Cold data compresses aggressively, reducing storage cost significantly.
  • Bloom filters: per-SSTable bloom filters accelerate point lookups by avoiding unnecessary disk reads.
  • Block cache: a configurable in-memory block cache sits above the storage layer, giving you warm-read performance close to Redis for frequently accessed keys.

The implication for KvRocks: your dataset size is bounded by NVMe/SSD capacity, not RAM. A dataset that would require a 256 GB RAM instance in Redis can run on a server with 32 GB RAM and 2 TB NVMe in KvRocks, with hot data served from block cache and cold data fetched from disk with bloom filter-accelerated lookups.

Latency Profile

This is the trade-off you accept:

Operation            | Redis (RAM) | KvRocks (block cache hit) | KvRocks (disk read)
GET (hot key)        | ~50–200µs   | ~200–800µs                | ~1–5ms
SET                  | ~50–200µs   | ~200–500µs                | ~200–500µs (WAL)
HGETALL (large hash) | ~100µs–1ms  | ~500µs–3ms                | ~2–10ms
SCAN                 | ~1–10ms     | ~5–30ms                   | ~10–100ms

Write latency is competitive because RocksDB writes are sequential. Read latency for hot data with a well-tuned block cache is acceptable for most non-authorisation-path use cases. Cold reads are measurably slower, which is why dataset access patterns determine fitness.


Use Case 1: Idempotency Key Store

The Problem

Payment systems, API gateways, and distributed job schedulers must guarantee exactly-once processing. The standard mechanism is an idempotency key: the client sends a unique key with each request; the server checks whether it has seen this key before and returns a cached result if so, or processes and stores the result if not.

The requirements for an idempotency key store are:

  • Durability — losing an idempotency record means double-processing a payment, sending a duplicate notification, or running a job twice. This is a hard correctness requirement.
  • Long retention — regulatory frameworks and API contracts often require 24–90 days of idempotency key retention. PCI-DSS environments and banking integrations commonly mandate the longer end of this range.
  • High write throughput — every incoming request is a write.
  • Point-lookup reads — the read pattern is almost exclusively GET idempotency:{uuid}.
  • Automatic expiry — keys should expire after their retention window.

Why Redis/Valkey Struggles Here

The retention window is the killer. At 100,000 requests/day with a 90-day window, you accumulate approximately 9 million keys. If each record stores the request hash, response payload, and metadata (~2KB average), that is ~18 GB of RAM dedicated solely to idempotency keys — before any other workload. Scaling to 1M requests/day makes this untenable.
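
The arithmetic behind those figures is easy to reproduce for your own traffic. The following is a minimal sketch, assuming "~2KB" means 2,000 bytes and decimal gigabytes; the function names are illustrative and not part of any library.

```python
def estimate_store_bytes(requests_per_day: int, retention_days: int,
                         bytes_per_record: int) -> int:
    """Steady-state size of a store where every request adds one record
    that is kept for `retention_days`."""
    return requests_per_day * retention_days * bytes_per_record


def to_gb(n_bytes: int) -> float:
    """Decimal gigabytes (1 GB = 10**9 bytes), as used in the text."""
    return n_bytes / 1e9


if __name__ == "__main__":
    # Assumed record size: 2,000 bytes (request hash + response + metadata).
    for rpd in (100_000, 1_000_000):
        size = estimate_store_bytes(rpd, retention_days=90, bytes_per_record=2_000)
        print(f"{rpd:>9,} req/day -> {to_gb(size):,.0f} GB uncompressed")
```

At 100,000 requests/day this gives the 18 GB quoted above; at 1M requests/day the same formula gives 180 GB before any overhead.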

AOF durability adds operational anxiety as well: rewrite storms under load, and the risk that appendfsync everysec loses up to one second of keys on failure.

Why KvRocks is the Right Primary Store

KvRocks handles this use case cleanly:

SET idempotency:{uuid} {payload} EX 7776000
GET idempotency:{uuid}

Identical commands to Redis. No application change required. But now:

  • The same 18 GB dataset may shrink to roughly 4 GB on NVMe once compressed, depending on how compressible the payloads are (compaction also drops expired and deleted entries as it rewrites SSTables).
  • The RocksDB WAL means an acknowledged SET survives a process crash with no AOF tuning. Surviving power loss as well requires synced WAL writes, which trades away some write latency.
  • RocksDB bloom filters mean GET on a non-existent key (the common path for new requests) avoids almost all disk reads.
  • EX (TTL) is implemented natively; RocksDB compaction cleans up expired keys and tombstones.
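
A separate GET followed by SET leaves a window in which two concurrent requests carrying the same key can both pass the check. One common way to close it is to claim the key with SET ... NX EX. The sketch below assumes a redis-py style client (`set(name, value, nx=..., ex=...)` and `get(name)`); the helper names and the "in_progress"/"done" record layout are illustrative assumptions, not something the commands above prescribe.

```python
import json

RETENTION_SECONDS = 90 * 24 * 60 * 60  # 7,776,000 s, the 90-day window used above


def claim_or_fetch(client, request_id: str, ttl: int = RETENTION_SECONDS):
    """Try to claim an idempotency key atomically.

    Returns (True, None) if this caller claimed the key and should process the
    request, or (False, stored_record) if the key already existed.
    """
    key = f"idempotency:{request_id}"
    placeholder = json.dumps({"status": "in_progress"})
    # SET ... NX EX only writes when the key is absent, so two concurrent
    # requests with the same key cannot both win the claim.
    if client.set(key, placeholder, nx=True, ex=ttl):
        return True, None
    raw = client.get(key)
    stored = json.loads(raw) if raw is not None else None
    return False, stored


def store_result(client, request_id: str, response: dict,
                 ttl: int = RETENTION_SECONDS) -> None:
    """Overwrite the placeholder with the final response once processing is done."""
    record = json.dumps({"status": "done", "response": response})
    client.set(f"idempotency:{request_id}", record, ex=ttl)
```

A caller that receives (False, record) with status "in_progress" has hit a request that is still being processed; how to respond in that case (wait, retry, or reject) is an application decision.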

Configuration notes:

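# kvrocks.conf (option names and units differ between releases; confirm each against the kvrocks.conf shipped with your version)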
rocksdb.compression_per_level     no:no:lz4:lz4:zstd:zstd:zstd
rocksdb.block_cache_size          4096      # 4GB block cache for hot idempotency keys
rocksdb.write_buffer_size         128       # MB, tune for write throughput
rocksdb.bloom_locality            1         # enable bloom filters

Sizing guidance: With LZ4/Zstd compression, budget approximately 200–400 bytes per idempotency record on disk. A 90-day window at 1M requests/day holds about 90 million records, so it requires roughly 18–36 GB of NVMe. That is trivial on modern hardware, and easily sharded across KvRocks cluster nodes.


Use Case 2: Fraud Feature Store

The Problem

Machine learning-based fraud detection requires a feature store: a low-latency lookup table of precomputed features associated with entities (cards, accounts, merchants, devices). Examples:

  • card:{pan_hash}:avg_txn_amount_30d → 42.17
  • device:{fingerprint}:countries_seen_7d → ["GH","GB","NG"]
  • merchant:{mid}:chargeback_rate_90d → 0.0031
  • account:{id}:velocity_1h → 7

These features are computed by a streaming pipeline (Flink, Kafka Streams, or similar) and written continuously. They are read at authorisation time by the fraud scoring model.

Requirements

  • Large dataset — feature stores for production fraud systems easily reach 50–500 GB of computed features.
  • High write throughput — continuous feature updates from the streaming pipeline.
  • Sub-5ms read latency — the fraud scoring model reads 20–100 features per transaction, in parallel. Total feature retrieval budget is typically 5–15ms.
  • Durability matters, but not perfectly — losing a few seconds of feature updates is survivable (the feature becomes slightly stale, not absent). Losing the entire feature store is catastrophic.
  • Rich data structures — features are often hashes, sorted sets (percentile distributions), or lists (recent transaction sequences).

Why KvRocks Works Here

A 200 GB feature store in Redis costs approximately $5,000–$15,000/month in managed cloud RAM (r6g.4xlarge class). In KvRocks on NVMe, the same dataset fits on commodity hardware costing a fraction of that.

The read latency profile is acceptable: 20–100 HGET calls issued in parallel at 1–3ms each (block cache warm) fit within the 5–15ms total budget. The scoring service batches reads with client-side pipelining, or with MGET for plain string keys:

import redis  # KvRocks is wire-compatible

client = redis.Redis(host='kvrocks-host', port=6380)

def fetch_features(pan_hash: str, merchant_id: str, device_fp: str) -> dict:
    pipe = client.pipeline(transaction=False)
    pipe.hgetall(f"card:{pan_hash}:features")
    pipe.hgetall(f"merchant:{merchant_id}:features")
    pipe.hgetall(f"device:{device_fp}:features")
    results = pipe.execute()
    return {
        "card": results[0],
        "merchant": results[1],
        "device": results[2],
    }

Feature update writes from the streaming pipeline:

def update_card_features(pan_hash: str, features: dict, ttl_seconds: int = 7776000):
    key = f"card:{pan_hash}:features"
    pipe = client.pipeline()
    pipe.hset(key, mapping=features)
    pipe.expire(key, ttl_seconds)
    pipe.execute()

Block cache tuning is critical here. Hot features (recently active cards/merchants) must stay in block cache. Size the block cache to cover your hot working set — typically 10–20% of total feature store size:

rocksdb.block_cache_size    20480   # 20GB block cache for 200GB feature store
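
The 10–20% rule of thumb converts to a config value with one multiplication. This is a small sketch under the assumption that the setting is expressed in MB and that 1 GB = 1024 MB; the function name is illustrative.

```python
def block_cache_mb(store_gb: float, hot_fraction: float) -> int:
    """Block cache size in MB for a store of `store_gb` GB, given the share of
    the dataset that is expected to be hot (assumes 1 GB = 1024 MB)."""
    if not 0 < hot_fraction <= 1:
        raise ValueError("hot_fraction must be in (0, 1]")
    return round(store_gb * 1024 * hot_fraction)


if __name__ == "__main__":
    for fraction in (0.10, 0.20):
        print(f"{fraction:.0%} of 200 GB -> {block_cache_mb(200, fraction)} MB")
```

For a 200 GB store, 10% yields the 20480 MB shown above, and 20% yields 40960 MB.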

Use Case 3: Durable Event / Message Queue

The Problem

Redis Streams and LIST-based queues are widely used for lightweight message passing — webhook delivery queues, payment event buses, notification pipelines. The problem is durability. Under load:

  • AOF rewrite pauses can cause producer backpressure and consumer lag.
  • appendfsync everysec loses up to one second of messages on hard failure.
  • appendfsync always imposes per-command fsync overhead that collapses throughput.
  • RDB snapshots create fork-induced latency spikes.

For non-critical queues (cache warming, analytics events), this is acceptable. For payment event queues — settlement notifications, chargeback webhooks, compliance audit events — losing messages has regulatory and financial consequences.

Why KvRocks is the Right Primary Store

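KvRocks implements Redis Streams (XADD, XREAD, XREADGROUP, XACK, XPENDING) with RocksDB persistence underneath. Every XADD is written to the RocksDB WAL before acknowledgement, so there are no AOF rewrite storms and no appendfsync setting to tune. As with any RocksDB write, surviving power loss (rather than just a process crash) requires synced WAL writes.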

# Producer — payment settlement event
client.xadd(
    "stream:settlement:events",
    {
        "transaction_id": "txn_abc123",
        "amount": "149.99",
        "currency": "GHS",
        "merchant_id": "mid_xyz",
        "status": "settled",
        "timestamp": "2025-04-25T14:32:00Z",
    },
    maxlen=5_000_000,  # cap stream length
)

# Consumer group — settlement reconciliation worker
client.xgroup_create("stream:settlement:events", "reconciliation-workers", id="0", mkstream=True)

messages = client.xreadgroup(
    groupname="reconciliation-workers",
    consumername="worker-1",
    streams={"stream:settlement:events": ">"},
    count=100,
    block=5000,
)

for stream, entries in messages:
    for msg_id, fields in entries:
        process_settlement(fields)
        client.xack("stream:settlement:events", "reconciliation-workers", msg_id)

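KvRocks implements the same consumer group commands as Redis Streams, so the usual pattern carries over: inspect unacknowledged messages with XPENDING and route repeatedly failing ones to a dead-letter stream in application code, now with structural durability underneath.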
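
Dead-letter handling is application logic built on the pending-entries list rather than a built-in stream feature. The sketch below shows only the decision step, as a pure function. It assumes entries shaped like the dicts redis-py returns from `xpending_range()` (keys `message_id`, `consumer`, `time_since_delivered`, `times_delivered`); the function name and thresholds are illustrative.

```python
from typing import Iterable


def triage_pending(entries: Iterable[dict], min_idle_ms: int, max_deliveries: int):
    """Split pending-entry summaries into messages to retry and messages to park.

    Returns (retry_ids, dead_letter_ids).
    """
    retry, dead = [], []
    for entry in entries:
        if entry["time_since_delivered"] < min_idle_ms:
            continue  # recently delivered: its consumer may still be working on it
        if entry["times_delivered"] >= max_deliveries:
            dead.append(entry["message_id"])   # failed too often: park it
        else:
            retry.append(entry["message_id"])  # idle but still worth another attempt
    return retry, dead
```

In a worker, the retry IDs would typically be handed to `client.xclaim(...)`, while the dead-letter IDs would be copied to a separate stream and then acknowledged. Whether XCLAIM is available depends on the KvRocks version, so verify it against your deployment before relying on this flow.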

Operational advantage: Stream data compresses well under Zstd. A stream of 5 million JSON payment events that would occupy ~5 GB in Redis RAM occupies ~500 MB–1 GB on KvRocks NVMe storage.


Use Case 4: Configuration and Reference Data Store

The Problem

Payment gateways, API platforms, and SaaS products maintain large volumes of configuration data that must be:

  • Always available — a missing merchant config causes failed transactions.
  • Consistent — all application instances must see the same config.
  • Large — thousands to millions of entities, each with deep configuration hashes.
  • Occasionally written — config changes are infrequent relative to reads.
  • Durable — losing config data requires manual reconstruction from source systems.

Typical examples in a payment gateway:

  • Per-merchant routing configuration (acquirer selection rules, fallback chains)
  • BIN table (Bank Identification Number → issuer metadata, ~500,000 entries)
  • Currency/FX rate tables with validity windows
  • 3DS ACS URL and version configuration per card range
  • Velocity rule sets per merchant category code

Why This is a Poor Fit for Redis

A complete BIN table with issuer metadata (~500,000 hashes, ~1 KB each) occupies ~500 MB in Redis RAM. Multiplied across a dozen such reference tables, you have several GB of RAM locked into static data that changes a few times per day. This is expensive RAM with a poor utilisation profile.

Why KvRocks Works as the Primary Store

Reference data is an ideal KvRocks workload because:

  • The access pattern is overwhelmingly read-heavy with predictable hot keys (top merchants, common BIN prefixes). Block cache handles this efficiently.
  • Write throughput requirements are minimal (config changes are infrequent).
  • Compression ratios on structured JSON/msgpack config hashes are excellent.
  • Durability is required but low-urgency — a config change written to KvRocks persists through any crash.

# Loading BIN table entry
def get_bin_metadata(bin_prefix: str) -> dict:
    raw = client.hgetall(f"bin:{bin_prefix}")
    return {k.decode(): v.decode() for k, v in raw.items()}

# Writing merchant routing config
def set_merchant_config(merchant_id: str, config: dict):
    client.hset(f"merchant:config:{merchant_id}", mapping=config)
    # No TTL — config is permanent until explicitly updated

# Atomic config update with version tracking
def update_merchant_config(merchant_id: str, updates: dict, version: int):
    pipe = client.pipeline()
    pipe.hset(f"merchant:config:{merchant_id}", mapping=updates)
    pipe.hset(f"merchant:config:{merchant_id}", "config_version", version)
    pipe.execute()

For application-layer caching on top of KvRocks, a short-lived local in-process cache (e.g. cachetools.TTLCache in Python) with a 30-second TTL gives you sub-millisecond reads for the hottest config keys while bounding staleness to at most 30 seconds.
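
To make the staleness bound concrete, here is a dependency-free sketch of such a cache. It is a simplified stand-in for cachetools.TTLCache, not a replacement: it has no size limit or eviction. The class name, the `loader` callback, and the injectable `clock` are assumptions made for illustration.

```python
import time
from typing import Any, Callable, Hashable


class LocalTTLCache:
    """Tiny in-process cache: each entry is served for at most `ttl` seconds."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl
        self._clock = clock      # injectable so expiry can be tested without sleeping
        self._entries = {}       # key -> (time_loaded, value)

    def get_or_load(self, key: Hashable, loader: Callable[[], Any]) -> Any:
        now = self._clock()
        hit = self._entries.get(key)
        if hit is not None and now - hit[0] < self._ttl:
            return hit[1]        # fresh enough: no round trip to the store
        value = loader()         # missing or expired: go back to the store
        self._entries[key] = (now, value)
        return value
```

A call such as `cache.get_or_load(merchant_id, lambda: client.hgetall(f"merchant:config:{merchant_id}"))` would then hit KvRocks at most once per key per TTL window on each application instance.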


Use Case 5: Session Store with Long Retention Windows

The Problem

Standard web session stores use Redis with short TTLs — 15 to 60 minutes. This is a well-understood use case. The problem arises in specific contexts:

  • Banking mobile apps — regulatory guidance in several jurisdictions (including UK FCA and Bank of Ghana) allows extended session validity for authenticated users in specific risk tiers, sometimes up to 24 hours with continuous activity.
  • Operator portals — back-office users in payment operations maintain long-running sessions.
  • API access tokens — OAuth refresh tokens may have 30–90 day validity windows.
  • MFA state — trusted device records may be retained for 30–90 days.

When session TTLs extend beyond a few hours and the session payload grows beyond a simple token (e.g. storing entitlements, preferences, risk scores), keeping sessions in Redis RAM becomes expensive.

Why KvRocks is the Right Primary Store

Session data has exactly the access pattern KvRocks handles well: sparse reads against a large key space (most sessions are idle at any moment), with bursts of activity for active users served from block cache.

import json
import uuid
from datetime import datetime, timedelta, timezone

SESSION_TTL = int(timedelta(hours=24).total_seconds())

def create_session(user_id: str, entitlements: list, risk_tier: str) -> str:
    session_id = str(uuid.uuid4())
    payload = {
        "user_id": user_id,
        "entitlements": entitlements,
        "risk_tier": risk_tier,
        # Record the actual creation time (UTC) rather than a fixed timestamp
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    client.set(f"session:{session_id}", json.dumps(payload), ex=SESSION_TTL)
    return session_id

def get_session(session_id: str) -> dict | None:
    raw = client.get(f"session:{session_id}")
    if raw is None:
        return None
    # Sliding expiry — refresh TTL on access
    client.expire(f"session:{session_id}", SESSION_TTL)
    return json.loads(raw)

def invalidate_session(session_id: str):
    client.delete(f"session:{session_id}")

No application change from a Redis-backed session store. The gain is dataset size: 1 million concurrent 24-hour sessions at 4 KB each is 4 GB in Redis RAM. In KvRocks with Snappy compression, the same dataset occupies ~800 MB–1.5 GB on NVMe.


Use Case 6: Time-Series Metrics and Telemetry Buffer

The Problem

Observability pipelines, IoT telemetry collectors, and financial transaction monitoring systems often need a fast write buffer for time-series data before it lands in a purpose-built time-series database (InfluxDB, TimescaleDB, Prometheus remote storage). Requirements:

  • Very high write throughput.
  • Moderate read throughput (dashboards, alerting queries over recent windows).
  • Retention of hours to days of raw data.
  • Durability — lost telemetry creates gaps in compliance audit trails.

KvRocks as a Telemetry Buffer

Sorted sets provide a natural time-series primitive, with Unix timestamps as scores:

import time

def record_metric(entity_id: str, metric_name: str, value: float):
    key = f"metrics:{entity_id}:{metric_name}"
    ts = time.time()
    client.zadd(key, {f"{ts}:{value}": ts})
    # Trim to retain only the last 24 hours
    cutoff = ts - 86400
    client.zremrangebyscore(key, "-inf", cutoff)

def query_metric_range(entity_id: str, metric_name: str,
                       start_ts: float, end_ts: float) -> list[tuple[float, float]]:
    key = f"metrics:{entity_id}:{metric_name}"
    raw = client.zrangebyscore(key, start_ts, end_ts, withscores=True)
    # Members are stored as "<timestamp>:<value>"; return (value, timestamp) pairs
    return [(float(member.decode().split(":", 1)[1]), score) for member, score in raw]

For a payment gateway, this pattern suits real-time merchant transaction rate monitoring, acquirer response time tracking, and fraud score distribution tracking — all of which need durable, queryable recent history without the cost of RAM-bound storage.


Operational Considerations When Using KvRocks as a Primary Store

Replication

KvRocks supports primary-replica replication using a Redis-compatible replication protocol. For production primary store use:

# On replica
replicaof kvrocks-primary 6380
replica-read-only yes

Unlike Redis, replication does not require the primary to fork and produce an RDB snapshot. KvRocks streams SSTable files during initial sync, which is significantly more efficient for large datasets.

Cluster Mode

KvRocks supports cluster mode with hash-slot-based sharding, compatible with Redis cluster clients. For large idempotency stores, feature stores, or session stores, horizontal sharding is the natural scaling path.

from redis.cluster import RedisCluster

client = RedisCluster(
    startup_nodes=[
        {"host": "kvrocks-node-1", "port": 6380},
        {"host": "kvrocks-node-2", "port": 6380},
        {"host": "kvrocks-node-3", "port": 6380},
    ],
    decode_responses=True,
)

Backup Strategy

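Because KvRocks data is on disk, backup is straightforward. BGSAVE asks KvRocks to create a RocksDB checkpoint in its configured backup directory, and that consistent snapshot can then be copied off-host:

# Trigger a RocksDB checkpoint
redis-cli -p 6380 BGSAVE

# Copy the checkpoint off-host with rsync. Copy the backup directory rather than the
# live data directory, whose files can change underneath rsync during compaction.
# (The path below assumes backup-dir is set to /var/lib/kvrocks/backup.)
rsync -av --checksum /var/lib/kvrocks/backup/ backup-host:/backups/kvrocks/$(date +%Y%m%d)/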

Monitoring

KvRocks exposes INFO output compatible with Redis monitoring tools. Key metrics to watch for primary store use:

redis-cli -p 6380 INFO | grep -E 'used_memory|keyspace_hits|keyspace_misses'
redis-cli -p 6380 INFO keyspace

For RocksDB-specific metrics (compaction lag, write stall, block cache hit rate):

redis-cli -p 6380 INFO rocksdb

The block cache hit rate is the single most important metric for primary store workloads. A hit rate below 80% signals that the block cache is undersized for your working set.
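
Hit and miss counters are cumulative, so the useful number is the hit rate over a recent interval, not since process start. The sketch below takes the counters as plain integers because which INFO fields expose them depends on the KvRocks version; the function names are illustrative.

```python
def block_cache_hit_rate(hits: int, misses: int) -> float:
    """Fraction of block cache lookups served from memory (0.0 if no lookups)."""
    total = hits + misses
    return hits / total if total else 0.0


def hit_rate_between(prev: tuple[int, int], curr: tuple[int, int]) -> float:
    """Hit rate over the interval between two (hits, misses) counter samples."""
    return block_cache_hit_rate(curr[0] - prev[0], curr[1] - prev[1])


if __name__ == "__main__":
    earlier, later = (1_000, 100), (1_700, 400)  # made-up samples for illustration
    rate = hit_rate_between(earlier, later)
    print(f"interval hit rate: {rate:.0%}")
    if rate < 0.80:
        print("below the 80% guideline: consider a larger block cache")
```

Sampling the counters every minute or so and alerting on the interval rate catches a working set that has outgrown the cache much sooner than the lifetime average does.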


Where KvRocks Should NOT Be Your Primary Store

Honesty about limitations is more useful than sales material.

Sub-millisecond hot path operations. If your use case has a strict < 500µs read SLA (card authorisation BIN lookups, real-time rate limiting counters on the critical path), Redis/Valkey remains the correct choice. KvRocks cannot guarantee block cache hit latency below ~200µs under contention.

ACID multi-key transactions. KvRocks inherits Redis's transaction model: MULTI/EXEC queues commands and runs them as one uninterrupted batch, with no rollback if a command fails partway through. If your workload requires atomic reserve-debit-credit sequences with rollback guarantees, use Tarantool or PostgreSQL.

Complex queries and aggregations. KvRocks has no SQL layer. Anything requiring range scans, aggregations, or joins across key spaces belongs in a relational or columnar store.

Extremely write-heavy workloads with large value sizes. RocksDB write amplification under heavy compaction pressure can cause write stalls. For workloads writing multi-megabyte values at sustained high throughput, benchmark carefully before committing.


Summary: When KvRocks Earns a Primary Role

Use Case                                    | Key Driver                         | Fits?
Idempotency key store (30–90 day retention) | Durability + large dataset         | ✅ Strong fit
Fraud feature store (50–500 GB)             | Cost reduction + durability        | ✅ Strong fit
Durable payment event queue                 | Structural durability              | ✅ Strong fit
Merchant / reference config store           | Durability + large hashes          | ✅ Strong fit
Long-retention session store                | Dataset size + durability          | ✅ Strong fit
Time-series telemetry buffer                | High write throughput + durability | ✅ Good fit
Sub-millisecond authorisation cache         | Latency requirements               | ❌ Use Redis
ACID settlement sequences                   | Transaction semantics              | ❌ Use Tarantool
AML velocity SQL aggregations               | Distributed compute                | ❌ Use GridGain

The unifying theme across the strong fits is the same: you need Redis semantics, you need genuine durability, and your dataset is larger than you want to hold in RAM. KvRocks is one of the few systems that satisfies all three simultaneously. The moment you find yourself sizing Redis instances around dataset volume rather than throughput, or engineering AOF flush strategies to avoid data loss, KvRocks deserves a serious evaluation as the primary store for that workload.



Thursday, 27 November 2025

CogDB: A Comprehensive Guide to Python's Graph Database Solution

 

Introduction

CogDB is a persistent, embedded graph database written in pure Python that provides a simple yet powerful API for working with graph data structures. Unlike traditional relational databases that store data in tables, CogDB represents information as nodes and edges, making it ideal for applications involving complex relationships, social networks, recommendation systems, and knowledge graphs.

In this comprehensive guide, we'll explore CogDB from the ground up, covering its core concepts, installation, basic operations, and advanced features with practical code examples.

What is CogDB?

CogDB is designed to be a lightweight and easy-to-use graph database that runs inside your application process, with no separate server to operate. It's particularly well-suited for:

  • Social network analysis
  • Recommendation engines
  • Knowledge representation
  • Fraud detection systems
  • Network topology analysis
  • Dependency tracking

Key Features

  • Embedded storage: Runs in-process and persists data to local disk
  • Simple API: Intuitive Python interface
  • Flexible schema: No rigid structure requirements
  • Efficient traversals: Optimized graph algorithms
  • Thread-safe operations: Suitable for concurrent applications

Installation and Setup

pip install cogdb

For development or the latest features:

pip install git+https://github.com/arun1729/cog.git

Core Concepts

Nodes and Edges

In CogDB, data is represented as:

  • Nodes: Entities that can have properties (key-value pairs)
  • Edges: Relationships between nodes, which can also have properties

Graph Structure

from cog import Cog

# Initialize a new graph database
db = Cog()

# Create nodes
user1 = db.create_node("User", {"name": "Alice", "age": 30})
user2 = db.create_node("User", {"name": "Bob", "age": 25})
product = db.create_node("Product", {"name": "Laptop", "price": 999.99})

# Create relationships
purchase_edge = db.create_edge(user1, "PURCHASED", product, {"date": "2024-01-15", "quantity": 1})
friendship_edge = db.create_edge(user1, "FRIENDS_WITH", user2, {"since": "2020-03-10"})

Basic Operations

Creating and Managing Nodes

from cog import Cog
import datetime

db = Cog()

# Create nodes with different types and properties
person = db.create_node("Person", {
    "name": "John Doe",
    "email": "john@example.com",
    "created_at": datetime.datetime.now().isoformat()
})

company = db.create_node("Company", {
    "name": "TechCorp",
    "industry": "Technology",
    "founded": 2010,
    "employees": 500
})

# Access node properties
print(f"Person: {person['name']}")  # Person: John Doe
print(f"Company: {company['name']}")  # Company: TechCorp

# Update node properties
person["age"] = 35
person["location"] = "San Francisco"

# Get node by ID
node_id = person.id
retrieved_node = db.get_node(node_id)

Working with Edges

# Create different types of relationships
works_at = db.create_edge(person, "WORKS_AT", company, {
    "position": "Software Engineer",
    "start_date": "2023-01-01",
    "salary": 120000
})

# Create bidirectional relationship
friend1 = db.create_node("Person", {"name": "Alice"})
friend2 = db.create_node("Person", {"name": "Bob"})

# Create friendship (bidirectional)
friendship1 = db.create_edge(friend1, "FRIENDS_WITH", friend2)
friendship2 = db.create_edge(friend2, "FRIENDS_WITH", friend1)

# Access edge properties
print(f"Position: {works_at['position']}")
print(f"Start date: {works_at['start_date']}")

Querying Data

# Find nodes by label
all_persons = db.get_nodes("Person")
all_companies = db.get_nodes("Company")

# Find nodes with specific properties
tech_companies = [node for node in db.get_nodes("Company") 
                 if node.get("industry") == "Technology"]

# Find edges by relationship type
work_relationships = db.get_edges("WORKS_AT")
friendships = db.get_edges("FRIENDS_WITH")

# Get neighbors of a node
person_neighbors = db.get_neighbors(person)
company_neighbors = db.get_neighbors(company)

Advanced Querying and Traversals

Graph Traversal Patterns

def find_colleagues(db, person_node):
    """Find all colleagues of a person (people working at the same company)"""
    colleagues = []
    
    # Get companies where the person works
    work_edges = [edge for edge in db.get_edges("WORKS_AT") 
                  if edge.source == person_node]
    
    for work_edge in work_edges:
        company = work_edge.target
        # Find other people working at the same company
        company_edges = [edge for edge in db.get_edges("WORKS_AT") 
                        if edge.target == company and edge.source != person_node]
        
        for edge in company_edges:
            colleagues.append(edge.source)
    
    return colleagues

def find_mutual_friends(db, person1, person2):
    """Find mutual friends between two people"""
    person1_friends = set()
    person2_friends = set()
    
    # Get friends of person1
    for edge in db.get_edges("FRIENDS_WITH"):
        if edge.source == person1:
            person1_friends.add(edge.target)
    
    # Get friends of person2
    for edge in db.get_edges("FRIENDS_WITH"):
        if edge.source == person2:
            person2_friends.add(edge.target)
    
    return person1_friends.intersection(person2_friends)

# Usage examples
alice = db.create_node("Person", {"name": "Alice"})
bob = db.create_node("Person", {"name": "Bob"})
charlie = db.create_node("Person", {"name": "Charlie"})

# Create friendship network
db.create_edge(alice, "FRIENDS_WITH", charlie)
db.create_edge(bob, "FRIENDS_WITH", charlie)

mutual_friends = find_mutual_friends(db, alice, bob)
print(f"Mutual friends: {[friend['name'] for friend in mutual_friends]}")

Pathfinding Algorithms

def find_shortest_path(db, start_node, end_node, relationship_type=None):
    """Find shortest path between two nodes using BFS"""
    from collections import deque
    
    if start_node == end_node:
        return [start_node]
    
    queue = deque([(start_node, [start_node])])
    visited = {start_node}
    
    while queue:
        current_node, path = queue.popleft()
        
        # Get all edges from current node
        edges = [edge for edge in db.edges 
                if edge.source == current_node and 
                (relationship_type is None or edge.relationship == relationship_type)]
        
        for edge in edges:
            neighbor = edge.target
            
            if neighbor == end_node:
                return path + [neighbor]
            
            if neighbor not in visited:
                visited.add(neighbor)
                queue.append((neighbor, path + [neighbor]))
    
    return None  # No path found

def calculate_node_centrality(db, node):
    """Calculate simple degree centrality for a node"""
    incoming_edges = [edge for edge in db.edges if edge.target == node]
    outgoing_edges = [edge for edge in db.edges if edge.source == node]
    
    return {
        "in_degree": len(incoming_edges),
        "out_degree": len(outgoing_edges),
        "total_degree": len(incoming_edges) + len(outgoing_edges)
    }
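
The breadth-first search above rescans the full edge list for every node it visits, which costs O(V·E) on large graphs. Building an adjacency map once brings the search down to O(V + E). The sketch below is library-independent: it uses plain (source, relationship, target) tuples as stand-ins for edge objects, and the function names are illustrative.

```python
from collections import defaultdict, deque


def build_adjacency(edges):
    """Map each source node to the list of targets it points at.

    `edges` is any iterable of (source, relationship, target) tuples.
    """
    adjacency = defaultdict(list)
    for source, _relationship, target in edges:
        adjacency[source].append(target)
    return adjacency


def shortest_path(adjacency, start, goal):
    """Breadth-first search; returns the nodes on a shortest path, or None."""
    if start == goal:
        return [start]
    parents = {start: None}  # doubles as the visited set
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for neighbor in adjacency.get(node, ()):
            if neighbor in parents:
                continue
            parents[neighbor] = node
            if neighbor == goal:
                # Walk the parent links back to the start, then reverse.
                path = [goal]
                while parents[path[-1]] is not None:
                    path.append(parents[path[-1]])
                return path[::-1]
            queue.append(neighbor)
    return None
```

Storing parent links instead of a full path per queue entry also avoids copying a list on every enqueue, which matters once paths get long.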

Real-World Example: Social Network Analysis

Let's build a complete social network analysis system:

import datetime

from cog import Cog


class SocialNetworkAnalyzer:
    def __init__(self):
        self.db = Cog()
        self.users = {}
    
    def add_user(self, username, profile_data):
        """Add a new user to the social network"""
        user_node = self.db.create_node("User", {
            "username": username,
            **profile_data,
            "created_at": datetime.datetime.now().isoformat()
        })
        self.users[username] = user_node
        return user_node
    
    def add_friendship(self, username1, username2):
        """Create a bidirectional friendship"""
        user1 = self.users.get(username1)
        user2 = self.users.get(username2)
        
        if not user1 or not user2:
            raise ValueError("One or both users not found")
        
        # Create bidirectional friendship
        self.db.create_edge(user1, "FRIENDS_WITH", user2, {
            "created_at": datetime.datetime.now().isoformat()
        })
        self.db.create_edge(user2, "FRIENDS_WITH", user1, {
            "created_at": datetime.datetime.now().isoformat()
        })
    
    def add_post(self, username, content, tags=None):
        """Add a post by a user"""
        user = self.users.get(username)
        if not user:
            raise ValueError("User not found")
        
        post_node = self.db.create_node("Post", {
            "content": content,
            "tags": tags or [],
            "created_at": datetime.datetime.now().isoformat(),
            "likes": 0
        })
        
        self.db.create_edge(user, "POSTED", post_node)
        return post_node
    
    def like_post(self, username, post_node):
        """User likes a post"""
        user = self.users.get(username)
        if not user:
            raise ValueError("User not found")
        
        # Check if already liked
        existing_like = any(
            edge.source == user and edge.target == post_node 
            for edge in self.db.get_edges("LIKED")
        )
        
        if not existing_like:
            self.db.create_edge(user, "LIKED", post_node)
            post_node["likes"] = post_node.get("likes", 0) + 1
    
    def get_user_feed(self, username, limit=10):
        """Get posts from user's friends"""
        user = self.users.get(username)
        if not user:
            return []
        
        # Get user's friends
        friends = [
            edge.target
            for edge in self.db.get_edges("FRIENDS_WITH")
            if edge.source == user
        ]
        
        # Get posts from friends in a single pass over the POSTED edges
        # (instead of re-reading every POSTED edge once per friend)
        feed_posts = []
        for edge in self.db.get_edges("POSTED"):
            if edge.source in friends:
                post = edge.target
                feed_posts.append({
                    "author": edge.source["username"],
                    "content": post["content"],
                    "created_at": post["created_at"],
                    "likes": post.get("likes", 0)
                })
        
        # Sort by creation time (most recent first)
        feed_posts.sort(key=lambda x: x["created_at"], reverse=True)
        return feed_posts[:limit]
    
    def get_mutual_friends(self, username1, username2):
        """Find mutual friends between two users"""
        user1 = self.users.get(username1)
        user2 = self.users.get(username2)
        
        if not user1 or not user2:
            return []
        
        user1_friends = set()
        user2_friends = set()
        
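        # Get friends of both users (two independent checks, so the result
        # is still correct when both usernames refer to the same user)
        for edge in self.db.get_edges("FRIENDS_WITH"):
            if edge.source == user1:
                user1_friends.add(edge.target)
            if edge.source == user2:
                user2_friends.add(edge.target)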
        
        mutual = user1_friends.intersection(user2_friends)
        return [friend["username"] for friend in mutual]
    
    def get_popular_posts(self, limit=10):
        """Get most liked posts"""
        posts = []
        for post_node in self.db.get_nodes("Post"):
            # Find the author
            author = None
            for edge in self.db.get_edges("POSTED"):
                if edge.target == post_node:
                    author = edge.source["username"]
                    break
            
            posts.append({
                "author": author,
                "content": post_node["content"],
                "likes": post_node.get("likes", 0),
                "created_at": post_node["created_at"]
            })
        
        posts.sort(key=lambda x: x["likes"], reverse=True)
        return posts[:limit]

# Usage example
network = SocialNetworkAnalyzer()

# Add users
network.add_user("alice", {"name": "Alice Johnson", "age": 28, "city": "New York"})
network.add_user("bob", {"name": "Bob Smith", "age": 32, "city": "San Francisco"})
network.add_user("charlie", {"name": "Charlie Brown", "age": 25, "city": "Chicago"})

# Create friendships
network.add_friendship("alice", "bob")
network.add_friendship("bob", "charlie")
network.add_friendship("alice", "charlie")

# Add posts
post1 = network.add_post("alice", "Just visited the Metropolitan Museum!", ["art", "nyc"])
post2 = network.add_post("bob", "Beautiful sunset in San Francisco today", ["sunset", "sf"])
post3 = network.add_post("charlie", "Deep dish pizza is the best!", ["food", "chicago"])

# Like posts
network.like_post("bob", post1)
network.like_post("charlie", post1)
network.like_post("alice", post2)

# Get user feed
alice_feed = network.get_user_feed("alice")
print("Alice's feed:")
for post in alice_feed:
    print(f"  {post['author']}: {post['content']} ({post['likes']} likes)")

# Get mutual friends
mutual = network.get_mutual_friends("alice", "charlie")
print(f"Mutual friends of Alice and Charlie: {mutual}")

# Get popular posts
popular = network.get_popular_posts(5)
print("Popular posts:")
for post in popular:
    print(f"  {post['author']}: {post['content']} ({post['likes']} likes)")
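
The feed and mutual-friend methods above scan every edge of a given type on each call. A minimal sketch of an alternative, using plain Python dictionaries rather than any CogDB API (the FriendGraph class and its method names are hypothetical), keeps an adjacency set per user so these lookups only touch the relevant neighbours:

```python
from collections import defaultdict


class FriendGraph:
    """Hypothetical helper: adjacency sets keyed by username."""

    def __init__(self):
        self._friends = defaultdict(set)

    def add_friendship(self, a, b):
        # Store both directions so lookups work from either side
        self._friends[a].add(b)
        self._friends[b].add(a)

    def friends_of(self, user):
        # .get avoids creating an empty entry for unknown users
        return set(self._friends.get(user, ()))

    def mutual_friends(self, a, b):
        # Set intersection replaces the full edge scan
        return sorted(self.friends_of(a) & self.friends_of(b))


graph = FriendGraph()
graph.add_friendship("alice", "bob")
graph.add_friendship("bob", "charlie")
graph.add_friendship("alice", "charlie")
print(graph.mutual_friends("alice", "charlie"))  # ['bob']
```

The trade-off is extra memory for the adjacency sets and the need to keep them in sync whenever an edge is created or removed.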

Performance Optimization

Indexing Strategies

class IndexedCogDB:
    def __init__(self):
        self.db = Cog()
        self.node_indices = {}  # property -> value -> [nodes]
        self.edge_indices = {}  # relationship_type -> [edges]
    
    def create_node_index(self, property_name):
        """Create an index on a node property"""
        if property_name not in self.node_indices:
            self.node_indices[property_name] = {}
            
            # Index existing nodes
            for node in self.db.nodes:
                if property_name in node:
                    value = node[property_name]
                    self.node_indices[property_name].setdefault(value, []).append(node)
    
    def find_nodes_by_property(self, property_name, value):
        """Fast lookup using index"""
        if property_name in self.node_indices:
            return self.node_indices[property_name].get(value, [])
        else:
            # Fallback to linear search
            return [node for node in self.db.nodes 
                   if node.get(property_name) == value]
    
    def create_node(self, label, properties):
        """Create node and update indices"""
        node = self.db.create_node(label, properties)
        
        # Update indices
        for prop_name, prop_value in properties.items():
            if prop_name in self.node_indices:
                if prop_value not in self.node_indices[prop_name]:
                    self.node_indices[prop_name][prop_value] = []
                self.node_indices[prop_name][prop_value].append(node)
        
        return node
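
The indexing idea can also be tried in isolation. The following is a sketch with plain dictionaries standing in for nodes; the PropertyIndex class is hypothetical and not part of CogDB. It also shows one detail the class above does not handle: property values such as lists cannot be used as dictionary keys, so they are skipped here.

```python
from collections import defaultdict


class PropertyIndex:
    """Hypothetical inverted index: property value -> list of nodes."""

    def __init__(self, property_name):
        self.property_name = property_name
        self._by_value = defaultdict(list)

    def add(self, node):
        if self.property_name not in node:
            return
        value = node[self.property_name]
        try:
            hash(value)
        except TypeError:
            # Unhashable values (for example a list of tags) cannot be keys
            return
        self._by_value[value].append(node)

    def find(self, value):
        # Return a copy so callers cannot mutate the index by accident
        return list(self._by_value.get(value, []))


nodes = [
    {"username": "alice", "city": "New York"},
    {"username": "bob", "city": "Chicago"},
    {"username": "carol", "city": "New York", "tags": ["art"]},
]
city_index = PropertyIndex("city")
for node in nodes:
    city_index.add(node)
print([n["username"] for n in city_index.find("New York")])  # ['alice', 'carol']
```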

Memory Management

def cleanup_orphaned_nodes(db):
    """Remove nodes that have no edges"""
    orphaned_nodes = []
    
    for node in db.nodes:
        has_edges = any(
            edge.source == node or edge.target == node 
            for edge in db.edges
        )
        
        if not has_edges:
            orphaned_nodes.append(node)
    
    for node in orphaned_nodes:
        db.delete_node(node.id)
    
    return len(orphaned_nodes)

def get_memory_stats(db):
    """Get graph size statistics (node/edge counts and average property counts, not bytes)"""
    return {
        "nodes": len(db.nodes),
        "edges": len(db.edges),
        "avg_node_properties": sum(len(node) for node in db.nodes) / len(db.nodes) if db.nodes else 0,
        "avg_edge_properties": sum(len(edge) for edge in db.edges) / len(db.edges) if db.edges else 0
    }

Best Practices and Tips

1. Design Efficient Graph Schemas

# Good: Specific relationship types
db.create_edge(user, "PURCHASED", product)
db.create_edge(user, "VIEWED", product)
db.create_edge(user, "REVIEWED", product)

# Avoid: Generic relationships
# db.create_edge(user, "RELATED_TO", product)  # Too vague

2. Use Appropriate Data Types

# Store dates as ISO strings for consistency
node["created_at"] = datetime.datetime.now().isoformat()

# Use lists for multiple values
node["tags"] = ["python", "database", "graph"]

# Use numbers for numeric calculations
node["price"] = 99.99
node["quantity"] = 5

3. Implement Proper Error Handling

def safe_node_creation(db, label, properties):
    """Safely create a node with validation; returns None if validation fails"""
    try:
        # Validate required properties (an ID of 0 is still a valid ID)
        if properties.get("id") in (None, ""):
            raise ValueError("Node must have an ID")
        
        # Check for duplicates
        duplicate = any(node.get("id") == properties["id"]
                        for node in db.get_nodes(label))
        
        if duplicate:
            raise ValueError(f"Node with ID {properties['id']} already exists")
        
        return db.create_node(label, properties)
        
    except ValueError as e:
        # Only validation errors are handled here; unexpected errors propagate
        print(f"Error creating node: {e}")
        return None

Conclusion

CogDB provides a powerful and intuitive way to work with graph data in Python. Its in-memory architecture makes it ideal for applications that require fast graph traversals and complex relationship queries. The examples in this guide demonstrate how to:

  • Set up and configure CogDB
  • Perform basic CRUD operations
  • Implement complex graph algorithms
  • Build real-world applications like social networks
  • Optimize performance with indexing
  • Follow best practices for graph database design

Whether you're building recommendation systems, analyzing social networks, or working with knowledge graphs, CogDB offers the flexibility and performance needed for modern graph-based applications. As your data grows, consider implementing the optimization strategies discussed here to maintain performance and manage memory efficiently.

The key to success with CogDB is understanding your data relationships and designing your graph schema to support your most common query patterns. Start simple, measure performance, and optimize based on your specific use cases.

Friday, 31 October 2025

Implementing Data Vault 2.0 Warehouses with PostgreSQL

 

Introduction

Data Vault 2.0 is a data warehousing methodology built for environments where source systems and business requirements change often. It keeps the flexibility of the original Data Vault architecture and extends it to cover Big Data platforms and real-time processing.

In this guide, we'll walk through implementing a Data Vault 2.0 warehouse in PostgreSQL: the core concepts, the implementation steps, and best practices.

What is Data Vault 2.0?

Data Vault 2.0 is an evolution of the original Data Vault methodology created by Dan Linstedt. It's designed to be:

  • Agile and scalable: Adapts easily to changing business requirements
  • Auditable: Maintains complete historical tracking
  • Flexible: Accommodates multiple source systems without restructuring
  • Insert-only: No updates or deletes in core tables, preserving data lineage

Core Components

Data Vault 2.0 consists of three primary entity types:

  1. Hubs: Store unique business keys and metadata
  2. Links: Represent relationships between business keys
  3. Satellites: Contain descriptive attributes and context
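
As a rough mental model, the three entity types can be pictured as immutable records. The sketch below uses Python dataclasses; the class and field names are illustrative assumptions that mirror the columns used later in this guide, not part of any Data Vault tooling.

```python
from dataclasses import dataclass, FrozenInstanceError
from datetime import datetime


@dataclass(frozen=True)
class Hub:
    hash_key: str          # hash of the business key
    business_key: str
    load_date: datetime
    record_source: str


@dataclass(frozen=True)
class Link:
    hash_key: str          # hash of the participating hub keys
    hub_hash_keys: tuple   # one entry per related hub
    load_date: datetime
    record_source: str


@dataclass(frozen=True)
class Satellite:
    parent_hash_key: str   # the hub or link this row describes
    load_date: datetime
    record_source: str
    hash_diff: str
    attributes: tuple      # (name, value) pairs


now = datetime(2024, 1, 1)
hub = Hub("h1", "CUST-12345", now, "crm")
sat = Satellite(hub.hash_key, now, "crm", "d1", (("email", "a@example.com"),))

try:
    hub.business_key = "CUST-99999"
except FrozenInstanceError:
    # frozen=True mirrors the insert-only rule: rows are never modified
    print("hub rows are immutable")
```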

Why PostgreSQL for Data Vault 2.0?

PostgreSQL is an excellent choice for Data Vault implementations due to:

  • ACID compliance: Ensures data integrity
  • Advanced indexing: B-tree, hash, GiST, GIN, and BRIN indexes for performance
  • Partitioning: Native table partitioning for managing large datasets
  • JSON support: Handles semi-structured data alongside relational
  • Extensibility: Custom functions, operators, and data types
  • Cost-effective: Open-source with enterprise features

Step 1: Setting Up Your PostgreSQL Environment

Database Configuration

First, create a dedicated database for your Data Vault warehouse:

-- Create the Data Vault database
CREATE DATABASE datavault_dw
    WITH 
    ENCODING = 'UTF8'
    LC_COLLATE = 'en_US.UTF-8'
    LC_CTYPE = 'en_US.UTF-8'
    TEMPLATE = template0;

-- Connect to the database
\c datavault_dw

-- Create schemas for organizational separation
CREATE SCHEMA IF NOT EXISTS raw_vault;
CREATE SCHEMA IF NOT EXISTS business_vault;
CREATE SCHEMA IF NOT EXISTS information_mart;
CREATE SCHEMA IF NOT EXISTS staging;

-- Set search path
SET search_path TO raw_vault, public;

Performance Optimization Settings

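The values below are conservative starting points rather than tuned recommendations; size them to your server's memory and storage:

-- Suggested settings for postgresql.conf (shown here as SQL comments; in the file itself, drop the leading "--" and use # for comments)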
-- shared_buffers = 256MB  -- Adjust based on available RAM
-- effective_cache_size = 1GB
-- maintenance_work_mem = 64MB
-- checkpoint_completion_target = 0.9
-- wal_buffers = 16MB
-- default_statistics_target = 100
-- random_page_cost = 1.1  -- For SSDs

Step 2: Defining Standard Metadata Columns

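Every Data Vault 2.0 table carries a load date and a record source for audit and tracking. A batch ID and a CDC operation flag are common additions:

-- Return the standard metadata column definitions as text (useful as a template when generating DDL)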
CREATE OR REPLACE FUNCTION raw_vault.get_metadata_columns()
RETURNS TABLE (
    column_definition TEXT
) AS $$
BEGIN
    RETURN QUERY SELECT unnest(ARRAY[
        'load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP',
        'record_source VARCHAR(50) NOT NULL',
        'load_batch_id BIGINT',
        'cdc_operation CHAR(1)' -- I=Insert, U=Update, D=Delete
    ]);
END;
$$ LANGUAGE plpgsql;

Step 3: Implementing Hubs

Hubs store business keys: the unique identifiers that source systems use for each business entity.

Hub Structure

-- Example: Customer Hub
CREATE TABLE raw_vault.hub_customer (
    customer_hkey CHAR(32) PRIMARY KEY,  -- MD5 hash of business key
    customer_id VARCHAR(50) NOT NULL,     -- Business key
    load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    record_source VARCHAR(50) NOT NULL,
    UNIQUE(customer_id)
);

CREATE INDEX idx_hub_customer_loaddate ON raw_vault.hub_customer(load_date);

-- Example: Product Hub
CREATE TABLE raw_vault.hub_product (
    product_hkey CHAR(32) PRIMARY KEY,
    product_code VARCHAR(50) NOT NULL,
    load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    record_source VARCHAR(50) NOT NULL,
    UNIQUE(product_code)
);

CREATE INDEX idx_hub_product_loaddate ON raw_vault.hub_product(load_date);

Hub Hash Key Generation

Create a function to generate consistent hash keys:

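-- Hash key generation function
CREATE OR REPLACE FUNCTION raw_vault.generate_hash_key(
    p_business_keys TEXT[]
)
RETURNS CHAR(32) AS $$
DECLARE
    v_concatenated TEXT;
BEGIN
    -- Trim and uppercase each key individually, replace NULLs with an empty
    -- string, and concatenate in the original order with a delimiter.
    -- (Trimming only the concatenated string would miss whitespace around
    -- inner keys, and array_to_string without a NULL placeholder silently
    -- skips NULL elements.)
    SELECT string_agg(UPPER(TRIM(COALESCE(k, ''))), '||' ORDER BY ord)
    INTO v_concatenated
    FROM unnest(p_business_keys) WITH ORDINALITY AS t(k, ord);
    
    -- Return MD5 hash
    RETURN MD5(v_concatenated);
END;
$$ LANGUAGE plpgsql IMMUTABLE;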

-- Usage example
SELECT raw_vault.generate_hash_key(ARRAY['CUST-12345']);
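
When hash keys are computed outside the database (for example in an ingestion script), the same normalization has to be applied there. The following Python sketch is intended to mirror the idea rather than reproduce the SQL function byte for byte: Python's strip() and upper() can differ from PostgreSQL's TRIM and UPPER for non-space whitespace and some non-ASCII characters, and the "||" default delimiter is taken from the SQL above.

```python
import hashlib


def generate_hash_key(business_keys, delimiter="||"):
    """Return a 32-character MD5 hex digest for one or more business keys."""
    # Normalize each key on its own so stray whitespace or case differences
    # in any position do not change the hash
    normalized = [(key or "").strip().upper() for key in business_keys]
    return hashlib.md5(delimiter.join(normalized).encode("utf-8")).hexdigest()


print(generate_hash_key(["CUST-12345"]))
print(generate_hash_key([" cust-12345 "]) == generate_hash_key(["CUST-12345"]))  # True
print(generate_hash_key(["AB", "C"]) == generate_hash_key(["A", "BC"]))          # False
```

The last comparison shows why the delimiter matters: without it, the composite keys ("AB", "C") and ("A", "BC") would both concatenate to "ABC" and collide.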

Step 4: Implementing Links

Links capture relationships between business entities.

Link Structure

-- Example: Customer-Product Order Link
CREATE TABLE raw_vault.link_customer_order_product (
    cop_link_hkey CHAR(32) PRIMARY KEY,
    customer_hkey CHAR(32) NOT NULL,
    order_hkey CHAR(32) NOT NULL,
    product_hkey CHAR(32) NOT NULL,
    load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    record_source VARCHAR(50) NOT NULL,
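    FOREIGN KEY (customer_hkey) REFERENCES raw_vault.hub_customer(customer_hkey),
    -- order_hkey has no foreign key here because hub_order is only created
    -- in the complete example later in this guide; add one once that hub exists
    FOREIGN KEY (product_hkey) REFERENCES raw_vault.hub_product(product_hkey)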
);

CREATE INDEX idx_link_cop_customer ON raw_vault.link_customer_order_product(customer_hkey);
CREATE INDEX idx_link_cop_product ON raw_vault.link_customer_order_product(product_hkey);
CREATE INDEX idx_link_cop_loaddate ON raw_vault.link_customer_order_product(load_date);

-- The link hash key is the MD5 of all participating hub hash keys joined
-- with a '||' delimiter (see raw_vault.generate_link_hash_key below)

Creating Links with Hash Keys

-- Function to create link hash keys
CREATE OR REPLACE FUNCTION raw_vault.generate_link_hash_key(
    p_hub_keys TEXT[]
)
RETURNS CHAR(32) AS $$
BEGIN
    -- Keep the caller's key order. Sorting would make (A, B) and (B, A)
    -- hash to the same value, which breaks links that join a hub to itself
    -- (for example parent/child). Callers must always pass the hub keys in
    -- the same column order.
    RETURN MD5(array_to_string(p_hub_keys, '||', ''));
END;
$$ LANGUAGE plpgsql IMMUTABLE;
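
Whether hub keys are sorted before hashing is a design choice with a visible consequence. This Python sketch (the link_hash function and its sort_keys parameter are hypothetical) compares the two behaviours for a link that joins a hub to itself, such as a parent/child relationship:

```python
import hashlib


def link_hash(hub_keys, sort_keys=False):
    """MD5 over hub hash keys joined with '||'; optionally sorted first."""
    keys = sorted(hub_keys) if sort_keys else list(hub_keys)
    return hashlib.md5("||".join(keys).encode("utf-8")).hexdigest()


parent, child = "a" * 32, "b" * 32

# With sorting, direction is lost: both orderings collapse to one link key
print(link_hash([parent, child], sort_keys=True)
      == link_hash([child, parent], sort_keys=True))  # True

# Without sorting, the two directions stay distinct
print(link_hash([parent, child]) == link_hash([child, parent]))  # False
```

Sorting is harmless when every participating hub is different and the column order is fixed anyway; it only changes the outcome when the same hub appears more than once in a link.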

Step 5: Implementing Satellites

Satellites contain descriptive attributes and track changes over time.

Satellite Structure

-- Example: Customer Satellite (descriptive data)
CREATE TABLE raw_vault.sat_customer_details (
    customer_hkey CHAR(32) NOT NULL,
    load_date TIMESTAMP NOT NULL,
    load_end_date TIMESTAMP,
    record_source VARCHAR(50) NOT NULL,
    hash_diff CHAR(32) NOT NULL,  -- Hash of all descriptive attributes
    -- Descriptive attributes
    customer_name VARCHAR(200),
    email VARCHAR(100),
    phone VARCHAR(20),
    address VARCHAR(500),
    city VARCHAR(100),
    state VARCHAR(50),
    zip_code VARCHAR(20),
    country VARCHAR(100),
    PRIMARY KEY (customer_hkey, load_date),
    FOREIGN KEY (customer_hkey) REFERENCES raw_vault.hub_customer(customer_hkey)
);

CREATE INDEX idx_sat_customer_details_hkey ON raw_vault.sat_customer_details(customer_hkey);
CREATE INDEX idx_sat_customer_details_loaddate ON raw_vault.sat_customer_details(load_date);
CREATE INDEX idx_sat_customer_details_hashdiff ON raw_vault.sat_customer_details(hash_diff);

-- Example: Link Satellite (contextual data about relationship)
CREATE TABLE raw_vault.sat_customer_order_product (
    cop_link_hkey CHAR(32) NOT NULL,
    load_date TIMESTAMP NOT NULL,
    load_end_date TIMESTAMP,
    record_source VARCHAR(50) NOT NULL,
    hash_diff CHAR(32) NOT NULL,
    -- Contextual attributes
    order_date DATE,
    quantity INTEGER,
    unit_price NUMERIC(10,2),
    total_amount NUMERIC(12,2),
    discount_percent NUMERIC(5,2),
    status VARCHAR(50),
    PRIMARY KEY (cop_link_hkey, load_date),
    FOREIGN KEY (cop_link_hkey) REFERENCES raw_vault.link_customer_order_product(cop_link_hkey)
);

CREATE INDEX idx_sat_cop_link ON raw_vault.sat_customer_order_product(cop_link_hkey);
CREATE INDEX idx_sat_cop_loaddate ON raw_vault.sat_customer_order_product(load_date);

Hash Diff Generation

The hash diff detects changes in descriptive attributes:

-- Function to generate hash diff
CREATE OR REPLACE FUNCTION raw_vault.generate_hash_diff(
    p_columns TEXT[]
)
RETURNS CHAR(32) AS $$
DECLARE
    v_concatenated TEXT;
BEGIN
    -- Concatenate all column values; the third argument replaces NULL
    -- elements with '' so a NULL keeps its position instead of being skipped
    v_concatenated := array_to_string(p_columns, '||', '');
    
    -- A NULL array yields NULL; treat it as an empty string
    v_concatenated := COALESCE(v_concatenated, '');
    
    -- Convert to uppercase for consistency (case-only changes are therefore
    -- not detected as changes)
    v_concatenated := UPPER(v_concatenated);
    
    RETURN MD5(v_concatenated);
END;
$$ LANGUAGE plpgsql IMMUTABLE;
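
To see how a hash diff behaves, here is a small Python sketch of the same idea. It is an illustration under assumptions (None becomes an empty string, values are uppercased, "||" is the delimiter), not a byte-exact port of the SQL function.

```python
import hashlib


def generate_hash_diff(values, delimiter="||"):
    """Hash descriptive attribute values; None is treated as an empty string."""
    parts = ["" if v is None else str(v).upper() for v in values]
    return hashlib.md5(delimiter.join(parts).encode("utf-8")).hexdigest()


old = generate_hash_diff(["Alice Johnson", "alice@example.com", None])
same = generate_hash_diff(["Alice Johnson", "alice@example.com", None])
changed = generate_hash_diff(["Alice Johnson", "alice@new.example.com", None])
shifted = generate_hash_diff(["Alice Johnson", None, "alice@example.com"])

print(old == same)     # True: nothing changed, no new satellite row needed
print(old == changed)  # False: the email changed
print(old == shifted)  # False: the delimiter keeps column positions distinct
```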

Step 6: Loading Data into Data Vault

Staging Area

Create a staging layer for incoming data:

-- Staging table for customer data
CREATE TABLE staging.stg_customer (
    customer_id VARCHAR(50),
    customer_name VARCHAR(200),
    email VARCHAR(100),
    phone VARCHAR(20),
    address VARCHAR(500),
    city VARCHAR(100),
    state VARCHAR(50),
    zip_code VARCHAR(20),
    country VARCHAR(100),
    load_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    source_system VARCHAR(50)
);

Loading Hubs

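-- Load Hub from staging
INSERT INTO raw_vault.hub_customer (
    customer_hkey,
    customer_id,
    load_date,
    record_source
)
-- One row per hash key, even if staging repeats a customer or lists it
-- under several source systems (plain DISTINCT would keep both rows and
-- violate the primary key)
SELECT DISTINCT ON (raw_vault.generate_hash_key(ARRAY[stg.customer_id]))
    raw_vault.generate_hash_key(ARRAY[stg.customer_id]),
    stg.customer_id,
    CURRENT_TIMESTAMP,
    stg.source_system
FROM staging.stg_customer stg
WHERE stg.customer_id IS NOT NULL
  AND NOT EXISTS (
    SELECT 1 
    FROM raw_vault.hub_customer hub
    WHERE hub.customer_hkey = raw_vault.generate_hash_key(ARRAY[stg.customer_id])
)
ORDER BY raw_vault.generate_hash_key(ARRAY[stg.customer_id]), stg.load_timestamp;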

Loading Satellites with Change Detection

-- Load Satellite with change detection using hash diff
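WITH new_records AS (
    -- Keep one row per customer (the most recent staging row) so that a
    -- batch with repeated customers cannot violate the satellite primary key
    SELECT DISTINCT ON (raw_vault.generate_hash_key(ARRAY[stg.customer_id]))
        raw_vault.generate_hash_key(ARRAY[stg.customer_id]) as customer_hkey,
        CURRENT_TIMESTAMP as load_date,
        stg.source_system as record_source,
        raw_vault.generate_hash_diff(ARRAY[
            COALESCE(stg.customer_name, ''),
            COALESCE(stg.email, ''),
            COALESCE(stg.phone, ''),
            COALESCE(stg.address, ''),
            COALESCE(stg.city, ''),
            COALESCE(stg.state, ''),
            COALESCE(stg.zip_code, ''),
            COALESCE(stg.country, '')
        ]) as hash_diff,
        stg.customer_name,
        stg.email,
        stg.phone,
        stg.address,
        stg.city,
        stg.state,
        stg.zip_code,
        stg.country
    FROM staging.stg_customer stg
    ORDER BY raw_vault.generate_hash_key(ARRAY[stg.customer_id]), stg.load_timestamp DESC
),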
current_satellites AS (
    SELECT DISTINCT ON (customer_hkey)
        customer_hkey,
        hash_diff
    FROM raw_vault.sat_customer_details
    ORDER BY customer_hkey, load_date DESC
)
INSERT INTO raw_vault.sat_customer_details (
    customer_hkey,
    load_date,
    record_source,
    hash_diff,
    customer_name,
    email,
    phone,
    address,
    city,
    state,
    zip_code,
    country
)
SELECT
    nr.customer_hkey,
    nr.load_date,
    nr.record_source,
    nr.hash_diff,
    nr.customer_name,
    nr.email,
    nr.phone,
    nr.address,
    nr.city,
    nr.state,
    nr.zip_code,
    nr.country
FROM new_records nr
LEFT JOIN current_satellites cs
    ON nr.customer_hkey = cs.customer_hkey
WHERE cs.customer_hkey IS NULL  -- New customer
   OR cs.hash_diff != nr.hash_diff;  -- Changed data
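
The change-detection rule in this load (insert a row only for a new key or a changed hash diff) can be simulated in a few lines of Python. This is a sketch with an in-memory list standing in for the satellite table; the load_satellite helper and the row layout are assumptions for illustration only.

```python
import hashlib
from datetime import datetime


def hash_diff(attrs):
    # Callers must always pass attributes in the same order
    joined = "||".join("" if v is None else str(v) for v in attrs.values())
    return hashlib.md5(joined.encode("utf-8")).hexdigest()


def load_satellite(satellite, hkey, attrs, load_date):
    """Append a row only if attrs differ from the latest row for hkey."""
    history = [row for row in satellite if row["hkey"] == hkey]
    latest = max(history, key=lambda r: r["load_date"], default=None)
    new_diff = hash_diff(attrs)
    if latest is not None and latest["hash_diff"] == new_diff:
        return False  # unchanged: insert nothing
    satellite.append({"hkey": hkey, "load_date": load_date,
                      "hash_diff": new_diff, **attrs})
    return True


sat = []
print(load_satellite(sat, "k1", {"email": "a@example.com"}, datetime(2024, 1, 1)))  # True
print(load_satellite(sat, "k1", {"email": "a@example.com"}, datetime(2024, 1, 2)))  # False
print(load_satellite(sat, "k1", {"email": "b@example.com"}, datetime(2024, 1, 3)))  # True
print(len(sat))  # 2
```

Re-running the same batch adds no rows, which is the idempotency property the load process relies on.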

Loading Links

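-- Load Link table
-- (assumes a staging.stg_order_details table with customer_id, order_id,
-- product_code and source_system columns; it is not defined in this guide)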
INSERT INTO raw_vault.link_customer_order_product (
    cop_link_hkey,
    customer_hkey,
    order_hkey,
    product_hkey,
    load_date,
    record_source
)
SELECT DISTINCT
    raw_vault.generate_link_hash_key(ARRAY[
        raw_vault.generate_hash_key(ARRAY[stg.customer_id]),
        raw_vault.generate_hash_key(ARRAY[stg.order_id]),
        raw_vault.generate_hash_key(ARRAY[stg.product_code])
    ]),
    raw_vault.generate_hash_key(ARRAY[stg.customer_id]),
    raw_vault.generate_hash_key(ARRAY[stg.order_id]),
    raw_vault.generate_hash_key(ARRAY[stg.product_code]),
    CURRENT_TIMESTAMP,
    stg.source_system
FROM staging.stg_order_details stg
WHERE NOT EXISTS (
    SELECT 1
    FROM raw_vault.link_customer_order_product lnk
    WHERE lnk.cop_link_hkey = raw_vault.generate_link_hash_key(ARRAY[
        raw_vault.generate_hash_key(ARRAY[stg.customer_id]),
        raw_vault.generate_hash_key(ARRAY[stg.order_id]),
        raw_vault.generate_hash_key(ARRAY[stg.product_code])
    ])
);

Step 7: Creating Business Vault Objects

Business Vault extends the Raw Vault with computed attributes and business rules.

Business Vault Example

-- Business vault schema
CREATE SCHEMA IF NOT EXISTS business_vault;

-- Computed satellite example: Customer lifetime value
CREATE TABLE business_vault.sat_customer_metrics (
    customer_hkey CHAR(32) NOT NULL,
    load_date TIMESTAMP NOT NULL,
    load_end_date TIMESTAMP,
    -- Computed metrics
    total_orders INTEGER,
    total_spent NUMERIC(15,2),
    average_order_value NUMERIC(12,2),
    lifetime_value NUMERIC(15,2),
    customer_segment VARCHAR(50),
    PRIMARY KEY (customer_hkey, load_date)
);

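-- Population query
-- Only the latest satellite row per link is used, so earlier versions of
-- the same order line are not counted twice
INSERT INTO business_vault.sat_customer_metrics (
    customer_hkey, load_date, load_end_date, total_orders, total_spent,
    average_order_value, lifetime_value, customer_segment
)
SELECT
    h.customer_hkey,
    CURRENT_TIMESTAMP as load_date,
    NULL::TIMESTAMP as load_end_date,
    COUNT(DISTINCT l.order_hkey) as total_orders,
    COALESCE(SUM(s.total_amount), 0) as total_spent,
    -- Average per order rather than per order line
    COALESCE(SUM(s.total_amount) / NULLIF(COUNT(DISTINCT l.order_hkey), 0), 0) as average_order_value,
    COALESCE(SUM(s.total_amount), 0) as lifetime_value,
    CASE
        WHEN SUM(s.total_amount) >= 10000 THEN 'Premium'
        WHEN SUM(s.total_amount) >= 5000 THEN 'Gold'
        WHEN SUM(s.total_amount) >= 1000 THEN 'Silver'
        ELSE 'Bronze'
    END as customer_segment
FROM raw_vault.hub_customer h
LEFT JOIN raw_vault.link_customer_order_product l
    ON h.customer_hkey = l.customer_hkey
LEFT JOIN LATERAL (
    SELECT sat.total_amount
    FROM raw_vault.sat_customer_order_product sat
    WHERE sat.cop_link_hkey = l.cop_link_hkey
    ORDER BY sat.load_date DESC
    LIMIT 1
) s ON TRUE
GROUP BY h.customer_hkey;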
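
Business rules like the segment thresholds are easier to unit-test when they are also expressed outside SQL. A minimal Python sketch of the same thresholds follows; the function name is hypothetical, and treating a missing total as zero spend is an assumption that matches the ELSE branch of the query.

```python
from decimal import Decimal


def customer_segment(total_spent):
    """Map lifetime spend to a segment using the thresholds from the query."""
    amount = Decimal(0) if total_spent is None else Decimal(str(total_spent))
    if amount >= 10000:
        return "Premium"
    if amount >= 5000:
        return "Gold"
    if amount >= 1000:
        return "Silver"
    return "Bronze"


for spend in (12500, 9999.99, 1000, None):
    print(spend, customer_segment(spend))
```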

Step 8: Building Information Marts

Information Marts are dimensional models built on top of the Data Vault for reporting.

Point-in-Time (PIT) Tables

PIT tables provide a snapshot of data at specific points in time:

-- Create PIT table
CREATE TABLE information_mart.pit_customer (
    customer_hkey CHAR(32) NOT NULL,
    snapshot_date DATE NOT NULL,
    sat_details_loaddate TIMESTAMP,
    sat_metrics_loaddate TIMESTAMP,
    PRIMARY KEY (customer_hkey, snapshot_date)
);

-- Populate PIT table
INSERT INTO information_mart.pit_customer
SELECT
    h.customer_hkey,
    d.snapshot_date,
    (SELECT MAX(load_date)
     FROM raw_vault.sat_customer_details s
     WHERE s.customer_hkey = h.customer_hkey
     -- snapshot_date is a DATE (midnight), so compare against the start of
     -- the next day to include rows loaded during the snapshot day
     AND s.load_date < d.snapshot_date + 1) as sat_details_loaddate,
    (SELECT MAX(load_date)
     FROM business_vault.sat_customer_metrics m
     WHERE m.customer_hkey = h.customer_hkey
     AND m.load_date < d.snapshot_date + 1) as sat_metrics_loaddate
FROM raw_vault.hub_customer h
CROSS JOIN (
    SELECT DISTINCT load_date::DATE as snapshot_date
    FROM raw_vault.sat_customer_details
) d;
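
The lookup a PIT table precomputes is "which satellite row was the latest one as of this snapshot?". The Python sketch below shows that lookup with a binary search over sorted load dates. It assumes a snapshot covers the whole calendar day, so anything loaded before the following midnight counts; the function name is hypothetical.

```python
from bisect import bisect_left
from datetime import date, datetime, time, timedelta


def pit_load_date(load_dates, snapshot_date):
    """Return the latest load_date on or before the end of snapshot_date."""
    ordered = sorted(load_dates)
    # First instant of the following day: loads strictly before it count
    cutoff = datetime.combine(snapshot_date + timedelta(days=1), time.min)
    position = bisect_left(ordered, cutoff)
    return ordered[position - 1] if position else None


loads = [datetime(2024, 1, 1, 10, 0), datetime(2024, 1, 3, 9, 30)]
print(pit_load_date(loads, date(2024, 1, 1)))    # 2024-01-01 10:00:00
print(pit_load_date(loads, date(2024, 1, 2)))    # 2024-01-01 10:00:00
print(pit_load_date(loads, date(2023, 12, 31)))  # None
```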

Dimension Table

-- Customer dimension for reporting
CREATE TABLE information_mart.dim_customer (
    customer_key SERIAL PRIMARY KEY,
    customer_hkey CHAR(32) NOT NULL,
    customer_id VARCHAR(50),
    customer_name VARCHAR(200),
    email VARCHAR(100),
    phone VARCHAR(20),
    full_address TEXT,
    city VARCHAR(100),
    state VARCHAR(50),
    country VARCHAR(100),
    customer_segment VARCHAR(50),
    lifetime_value NUMERIC(15,2),
    valid_from TIMESTAMP,
    valid_to TIMESTAMP,
    is_current BOOLEAN
);

-- Populate dimension with current view
CREATE OR REPLACE VIEW information_mart.vw_dim_customer_current AS
SELECT
    h.customer_hkey,
    h.customer_id,
    sd.customer_name,
    sd.email,
    sd.phone,
    -- concat_ws skips NULL parts; plain || would make the whole address NULL
    concat_ws(', ', sd.address, sd.city, NULLIF(concat_ws(' ', sd.state, sd.zip_code), '')) as full_address,
    sd.city,
    sd.state,
    sd.country,
    bm.customer_segment,
    bm.lifetime_value,
    sd.load_date as valid_from,
    sd.load_end_date as valid_to,
    (sd.load_end_date IS NULL) as is_current
FROM raw_vault.hub_customer h
INNER JOIN LATERAL (
    SELECT *
    FROM raw_vault.sat_customer_details s
    WHERE s.customer_hkey = h.customer_hkey
    ORDER BY s.load_date DESC
    LIMIT 1
) sd ON TRUE
LEFT JOIN LATERAL (
    SELECT *
    FROM business_vault.sat_customer_metrics m
    WHERE m.customer_hkey = h.customer_hkey
    ORDER BY m.load_date DESC
    LIMIT 1
) bm ON TRUE;

Step 9: Implementing Data Quality Rules

Data Quality Satellites

-- Data quality satellite
CREATE TABLE raw_vault.sat_customer_dq (
    customer_hkey CHAR(32) NOT NULL,
    load_date TIMESTAMP NOT NULL,
    record_source VARCHAR(50) NOT NULL,
    dq_score INTEGER,  -- 0-100
    email_valid BOOLEAN,
    phone_valid BOOLEAN,
    address_complete BOOLEAN,
    duplicate_suspect BOOLEAN,
    PRIMARY KEY (customer_hkey, load_date)
);

-- Data quality check function
CREATE OR REPLACE FUNCTION raw_vault.check_customer_data_quality(
    p_customer_hkey CHAR(32)
)
RETURNS TABLE (
    dq_score INTEGER,
    email_valid BOOLEAN,
    phone_valid BOOLEAN,
    address_complete BOOLEAN
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        CASE
            WHEN sd.email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$' THEN 25 ELSE 0
        END +
        CASE
            WHEN sd.phone ~* '^\+?[0-9]{10,15}$' THEN 25 ELSE 0
        END +
        CASE
            WHEN sd.address IS NOT NULL AND sd.city IS NOT NULL 
                 AND sd.state IS NOT NULL AND sd.zip_code IS NOT NULL THEN 50 ELSE 0
        END as dq_score,
        -- COALESCE turns a missing email or phone into FALSE instead of NULL
        COALESCE(sd.email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$', FALSE) as email_valid,
        COALESCE(sd.phone ~* '^\+?[0-9]{10,15}$', FALSE) as phone_valid,
        (sd.address IS NOT NULL AND sd.city IS NOT NULL 
         AND sd.state IS NOT NULL AND sd.zip_code IS NOT NULL) as address_complete
    FROM raw_vault.sat_customer_details sd
    WHERE sd.customer_hkey = p_customer_hkey
    ORDER BY sd.load_date DESC
    LIMIT 1;
END;
$$ LANGUAGE plpgsql;
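
The same scoring rules can be checked before data reaches the warehouse. This Python sketch reuses the two regular expressions and the 25/25/50 weighting from the function above; the dictionary-based input and the function name are assumptions for illustration.

```python
import re

EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
PHONE_RE = re.compile(r"\+?[0-9]{10,15}")


def check_customer_data_quality(customer):
    """Score a customer dict: 25 for email, 25 for phone, 50 for a full address."""
    # fullmatch anchors the pattern at both ends of the string
    email_valid = bool(EMAIL_RE.fullmatch(customer.get("email") or ""))
    phone_valid = bool(PHONE_RE.fullmatch(customer.get("phone") or ""))
    address_complete = all(
        customer.get(name) is not None
        for name in ("address", "city", "state", "zip_code")
    )
    score = 25 * email_valid + 25 * phone_valid + 50 * address_complete
    return {"dq_score": score, "email_valid": email_valid,
            "phone_valid": phone_valid, "address_complete": address_complete}


customer = {"email": "alice@example.com", "phone": "+12125550100",
            "address": "1 Main St", "city": "New York",
            "state": "NY", "zip_code": "10001"}
print(check_customer_data_quality(customer)["dq_score"])  # 100
```

Note that the phone pattern accepts digits only, so formatted numbers such as "555-1234" score as invalid unless they are normalized first.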

Step 10: Performance Optimization

Partitioning Large Tables

-- Partition satellite by load date
CREATE TABLE raw_vault.sat_customer_details_partitioned (
    customer_hkey CHAR(32) NOT NULL,
    load_date TIMESTAMP NOT NULL,
    load_end_date TIMESTAMP,
    record_source VARCHAR(50) NOT NULL,
    hash_diff CHAR(32) NOT NULL,
    customer_name VARCHAR(200),
    email VARCHAR(100),
    phone VARCHAR(20),
    address VARCHAR(500),
    city VARCHAR(100),
    state VARCHAR(50),
    zip_code VARCHAR(20),
    country VARCHAR(100),
    PRIMARY KEY (customer_hkey, load_date)
) PARTITION BY RANGE (load_date);

-- Create partitions
CREATE TABLE raw_vault.sat_customer_details_2024_q1
    PARTITION OF raw_vault.sat_customer_details_partitioned
    FOR VALUES FROM ('2024-01-01') TO ('2024-04-01');

CREATE TABLE raw_vault.sat_customer_details_2024_q2
    PARTITION OF raw_vault.sat_customer_details_partitioned
    FOR VALUES FROM ('2024-04-01') TO ('2024-07-01');

-- Continue creating partitions as needed
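
Writing each partition by hand gets tedious, so the statements are often generated. The Python sketch below prints quarterly partition DDL in the same shape as the statements above; the function name is hypothetical, and because the identifiers are interpolated directly into the SQL text, it should only be used with trusted, hard-coded names.

```python
from datetime import date


def quarterly_partition_ddl(parent, prefix, year):
    """Yield CREATE TABLE ... PARTITION OF statements for each quarter."""
    for quarter in range(1, 5):
        start = date(year, 3 * quarter - 2, 1)
        # Upper bound is exclusive: the first day of the next quarter
        if quarter == 4:
            end = date(year + 1, 1, 1)
        else:
            end = date(year, 3 * quarter + 1, 1)
        yield (
            f"CREATE TABLE {prefix}_{year}_q{quarter}\n"
            f"    PARTITION OF {parent}\n"
            f"    FOR VALUES FROM ('{start}') TO ('{end}');"
        )


statements = list(quarterly_partition_ddl(
    "raw_vault.sat_customer_details_partitioned",
    "raw_vault.sat_customer_details",
    2024,
))
print("\n\n".join(statements))
```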

Indexing Strategy

-- Composite indexes for common query patterns
CREATE INDEX idx_sat_customer_hkey_loaddate 
    ON raw_vault.sat_customer_details(customer_hkey, load_date DESC);

-- Partial index for current records
CREATE INDEX idx_sat_customer_current 
    ON raw_vault.sat_customer_details(customer_hkey) 
    WHERE load_end_date IS NULL;

-- BRIN index for large, append-only tables where load_date follows the physical row order
CREATE INDEX idx_sat_customer_loaddate_brin 
    ON raw_vault.sat_customer_details 
    USING BRIN(load_date);

Materialized Views

-- Materialized view for current state
CREATE MATERIALIZED VIEW information_mart.mv_customer_current AS
SELECT
    h.customer_hkey,
    h.customer_id,
    sd.customer_name,
    sd.email,
    sd.phone,
    sd.city,
    sd.state,
    sd.country,
    bm.customer_segment,
    bm.lifetime_value,
    bm.total_orders
FROM raw_vault.hub_customer h
INNER JOIN LATERAL (
    SELECT *
    FROM raw_vault.sat_customer_details s
    WHERE s.customer_hkey = h.customer_hkey
    ORDER BY s.load_date DESC
    LIMIT 1
) sd ON TRUE
LEFT JOIN LATERAL (
    SELECT *
    FROM business_vault.sat_customer_metrics m
    WHERE m.customer_hkey = h.customer_hkey
    ORDER BY m.load_date DESC
    LIMIT 1
) bm ON TRUE;

CREATE UNIQUE INDEX idx_mv_customer_current_hkey 
    ON information_mart.mv_customer_current(customer_hkey);

-- Refresh strategy
CREATE OR REPLACE FUNCTION information_mart.refresh_customer_current()
RETURNS void AS $$
BEGIN
    REFRESH MATERIALIZED VIEW CONCURRENTLY information_mart.mv_customer_current;
END;
$$ LANGUAGE plpgsql;

Step 11: Automation and Maintenance

Automated ETL Procedure

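-- Master ETL procedure
-- (the load_* and calculate_* routines it calls are not defined in this
-- guide; wrap the load statements from Steps 6 and 7 in functions with
-- these names, or adjust the calls to match your own)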
CREATE OR REPLACE PROCEDURE raw_vault.execute_etl_load(
    p_batch_id BIGINT,
    p_source_system VARCHAR(50)
)
LANGUAGE plpgsql
AS $$
BEGIN
    -- Step 1: Load Hubs
    PERFORM raw_vault.load_customer_hub(p_batch_id, p_source_system);
    PERFORM raw_vault.load_product_hub(p_batch_id, p_source_system);
    
    -- Step 2: Load Links
    PERFORM raw_vault.load_customer_order_product_link(p_batch_id, p_source_system);
    
    -- Step 3: Load Satellites
    PERFORM raw_vault.load_customer_details_sat(p_batch_id, p_source_system);
    PERFORM raw_vault.load_order_details_sat(p_batch_id, p_source_system);
    
    -- Step 4: Update Business Vault
    PERFORM business_vault.calculate_customer_metrics(p_batch_id);
    
    -- Step 5: Refresh Information Marts
    PERFORM information_mart.refresh_customer_current();
    
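    -- Log completion
    INSERT INTO raw_vault.etl_log (batch_id, status, completed_at)
    VALUES (p_batch_id, 'SUCCESS', CURRENT_TIMESTAMP);
EXCEPTION
    WHEN OTHERS THEN
        -- Entering the handler has already rolled back everything done in
        -- the block above. COMMIT and ROLLBACK are not allowed in a block
        -- with an EXCEPTION clause, and re-raising the error here would
        -- also discard the log row, so record the failure and report it
        -- as a warning instead.
        INSERT INTO raw_vault.etl_log (batch_id, status, error_message, completed_at)
        VALUES (p_batch_id, 'FAILED', SQLERRM, CURRENT_TIMESTAMP);
        RAISE WARNING 'ETL batch % failed: %', p_batch_id, SQLERRM;
END;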
$$;

Monitoring and Logging

-- ETL logging table
CREATE TABLE raw_vault.etl_log (
    log_id SERIAL PRIMARY KEY,
    batch_id BIGINT NOT NULL,
    table_name VARCHAR(100),
    records_processed INTEGER,
    records_inserted INTEGER,
    records_updated INTEGER,
    status VARCHAR(20),
    error_message TEXT,
    started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    completed_at TIMESTAMP
);

CREATE INDEX idx_etl_log_batch ON raw_vault.etl_log(batch_id);
CREATE INDEX idx_etl_log_status ON raw_vault.etl_log(status);

Best Practices

1. Naming Conventions

Follow consistent naming patterns:

  • Hubs: hub_<business_entity>
  • Links: link_<entity1>_<entity2>_<entityN>
  • Satellites: sat_<parent>_<description>
  • Hash keys: <entity>_hkey or <entity>_link_hkey
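
Conventions like these are easy to enforce with a small check in a migration review or CI step. The Python sketch below encodes one possible reading of the four patterns as regular expressions; the exact patterns (lowercase names, underscores as separators) are assumptions, so adjust them to your own standard.

```python
import re

# Hypothetical patterns derived from the conventions listed above
NAME_PATTERNS = {
    "hub": re.compile(r"hub_[a-z][a-z0-9_]*"),
    "link": re.compile(r"link_[a-z][a-z0-9]*(_[a-z][a-z0-9]*)+"),
    "satellite": re.compile(r"sat_[a-z][a-z0-9]*(_[a-z][a-z0-9]*)+"),
    "hash_key": re.compile(r"[a-z][a-z0-9]*(_link)?_hkey"),
}


def classify(name):
    """Return the entity type a name conforms to, or None."""
    for kind, pattern in NAME_PATTERNS.items():
        if pattern.fullmatch(name):
            return kind
    return None


for name in ("hub_customer", "link_customer_order_product",
             "sat_customer_details", "cop_link_hkey", "customer_hub"):
    print(name, "->", classify(name))
```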

2. Hash Key Generation

  • Always use deterministic hash functions
  • Include delimiters between concatenated values
  • Standardize case (uppercase) and trim whitespace
  • Handle NULLs consistently

3. Load Patterns

  • Use batch loading windows for large datasets
  • Implement incremental loads using change data capture (CDC)
  • Separate staging, loading, and transformation phases
  • Maintain idempotency in load processes

4. Data Quality

  • Implement data quality satellites
  • Validate business keys before loading
  • Monitor for duplicate keys
  • Track data lineage and source systems

5. Performance

  • Partition large tables by load_date
  • Use appropriate indexes (B-tree, BRIN, partial)
  • Implement materialized views for common queries
  • Consider columnar storage extensions for analytics

6. Security

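-- Role-based access control
CREATE ROLE dv_read;
CREATE ROLE dv_write;
CREATE ROLE dv_admin;

-- Grant privileges
GRANT USAGE ON SCHEMA raw_vault TO dv_read;
GRANT SELECT ON ALL TABLES IN SCHEMA raw_vault TO dv_read;

-- Loaders need SELECT as well as INSERT for the NOT EXISTS and change-detection checks
GRANT USAGE ON SCHEMA raw_vault TO dv_write;
GRANT SELECT, INSERT ON ALL TABLES IN SCHEMA raw_vault TO dv_write;

-- Schema-level privileges do not cover the tables inside the schema
GRANT ALL PRIVILEGES ON SCHEMA raw_vault TO dv_admin;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA raw_vault TO dv_admin;

-- GRANT ... ON ALL TABLES only affects tables that already exist; cover future ones too
ALTER DEFAULT PRIVILEGES IN SCHEMA raw_vault GRANT SELECT ON TABLES TO dv_read;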

Complete Example: End-to-End Implementation

Let's walk through a complete example implementing a customer order system:

-- 1. Create all Hubs
CREATE TABLE raw_vault.hub_customer (
    customer_hkey CHAR(32) PRIMARY KEY,
    customer_id VARCHAR(50) NOT NULL UNIQUE,
    load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    record_source VARCHAR(50) NOT NULL
);

CREATE TABLE raw_vault.hub_order (
    order_hkey CHAR(32) PRIMARY KEY,
    order_id VARCHAR(50) NOT NULL UNIQUE,
    load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    record_source VARCHAR(50) NOT NULL
);

CREATE TABLE raw_vault.hub_product (
    product_hkey CHAR(32) PRIMARY KEY,
    product_code VARCHAR(50) NOT NULL UNIQUE,
    load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    record_source VARCHAR(50) NOT NULL
);

-- 2. Create Links
CREATE TABLE raw_vault.link_customer_order (
    co_link_hkey CHAR(32) PRIMARY KEY,
    customer_hkey CHAR(32) NOT NULL,
    order_hkey CHAR(32) NOT NULL,
    load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    record_source VARCHAR(50) NOT NULL,
    FOREIGN KEY (customer_hkey) REFERENCES raw_vault.hub_customer(customer_hkey),
    FOREIGN KEY (order_hkey) REFERENCES raw_vault.hub_order(order_hkey)
);

CREATE TABLE raw_vault.link_order_product (
    op_link_hkey CHAR(32) PRIMARY KEY,
    order_hkey CHAR(32) NOT NULL,
    product_hkey CHAR(32) NOT NULL,
    load_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    record_source VARCHAR(50) NOT NULL,
    FOREIGN KEY (order_hkey) REFERENCES raw_vault.hub_order(order_hkey),
    FOREIGN KEY (product_hkey) REFERENCES raw_vault.hub_product(product_hkey)
);

-- 3. Create Satellites
CREATE TABLE raw_vault.sat_customer_details (
    customer_hkey CHAR(32) NOT NULL,
    load_date TIMESTAMP NOT NULL,
    load_end_date TIMESTAMP,
    record_source VARCHAR(50) NOT NULL,
    hash_diff CHAR(32) NOT NULL,
    customer_name VARCHAR(200),
    email VARCHAR(100),
    phone VARCHAR(20),
    PRIMARY KEY (customer_hkey, load_date),
    FOREIGN KEY (customer_hkey) REFERENCES raw_vault.hub_customer(customer_hkey)
);

CREATE TABLE raw_vault.sat_order_details (
    order_hkey CHAR(32) NOT NULL,
    load_date TIMESTAMP NOT NULL,
    load_end_date TIMESTAMP,
    record_source VARCHAR(50) NOT NULL,
    hash_diff CHAR(32) NOT NULL,
    order_date DATE,
    order_status VARCHAR(50),
    total_amount NUMERIC(12,2),
    PRIMARY KEY (order_hkey, load_date),
    FOREIGN KEY (order_hkey) REFERENCES raw_vault.hub_order(order_hkey)
);

CREATE TABLE raw_vault.sat_order_product_details (
    op_link_hkey CHAR(32) NOT NULL,
    load_date TIMESTAMP NOT NULL,
    load_end_date TIMESTAMP,
    record_source VARCHAR(50) NOT NULL,
    hash_diff CHAR(32) NOT NULL,
    quantity INTEGER,
    unit_price NUMERIC(10,2),
    line_total NUMERIC(12,2),
    PRIMARY KEY (op_link_hkey, load_date),
    FOREIGN KEY (op_link_hkey) REFERENCES raw_vault.link_order_product(op_link_hkey)
);

-- 4. Sample data load
INSERT INTO raw_vault.hub_customer VALUES 
    (raw_vault.generate_hash_key(ARRAY['CUST001']), 'CUST001', CURRENT_TIMESTAMP, 'ERP_SYSTEM');

INSERT INTO raw_vault.sat_customer_details VALUES (
    raw_vault.generate_hash_key(ARRAY['CUST001']),
    CURRENT_TIMESTAMP,
    NULL,
    'ERP_SYSTEM',
    raw_vault.generate_hash_diff(ARRAY['John Doe', 'john@example.com', '555-0100']),
    'John Doe',
    'john@example.com',
    '555-0100'
);

Troubleshooting Common Issues

Issue 1: Slow Satellite Queries

Problem: Queries retrieving current satellite records are slow.

Solution: Create a partial index on current records:

CREATE INDEX idx_sat_current 
    ON raw_vault.sat_customer_details(customer_hkey) 
    WHERE load_end_date IS NULL;

Issue 2: Hash Collisions

Problem: Different business keys produce the same hash.

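Solution: Accidental MD5 collisions are extremely rare, but for critical systems you can switch to SHA-256. The function below uses digest() from the pgcrypto extension (CREATE EXTENSION IF NOT EXISTS pgcrypto), and the hash key columns must be widened from CHAR(32) to CHAR(64):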

CREATE OR REPLACE FUNCTION raw_vault.generate_hash_key_sha256(
    p_business_keys TEXT[]
)
RETURNS CHAR(64) AS $$
BEGIN
    RETURN encode(digest(
        array_to_string(p_business_keys, '||'),
        'sha256'
    ), 'hex');
END;
$$ LANGUAGE plpgsql IMMUTABLE;
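
To see what the switch means for column sizes, here is a small Python sketch that mirrors the function above with hashlib. It assumes a UTF-8 database encoding and no NULL elements in the array (array_to_string skips NULLs, while this sketch would raise an error on None).

```python
import hashlib
from typing import Sequence


def hash_key_sha256(business_keys: Sequence[str]) -> str:
    """Join the keys with '||' and return the SHA-256 hex digest."""
    joined = "||".join(business_keys)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


# Hex digest lengths determine the CHAR(n) width of the key columns
md5_len = len(hashlib.md5(b"CUST001").hexdigest())
sha_len = len(hash_key_sha256(["CUST001"]))
print(md5_len, sha_len)
```

The SHA-256 digest is twice as long as the MD5 one, so every hash key and foreign key column that stores it needs CHAR(64).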

Issue 3: Managing Late-Arriving Data

Problem: Data arrives out of sequence.

Solution: Implement effective date handling in satellites:

-- Add effective date column
ALTER TABLE raw_vault.sat_customer_details
ADD COLUMN effective_date DATE;

-- Backfill with load_date
UPDATE raw_vault.sat_customer_details
SET effective_date = load_date::DATE
WHERE effective_date IS NULL;
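
The reason for a separate effective date is easier to see with data. The sketch below is an in-memory illustration with a hypothetical SatRow type: it picks the row that was in effect on a business date, regardless of the order in which rows were loaded.

```python
from datetime import date, datetime
from typing import List, NamedTuple, Optional


class SatRow(NamedTuple):
    load_date: datetime   # when the warehouse received the row
    effective_date: date  # when the change took effect in the source
    customer_name: str


def as_of(rows: List[SatRow], business_date: date) -> Optional[SatRow]:
    """Return the row in effect on business_date, or None if there is none.

    Ties on effective_date are broken by the latest load_date.
    """
    candidates = [r for r in rows if r.effective_date <= business_date]
    if not candidates:
        return None
    return max(candidates, key=lambda r: (r.effective_date, r.load_date))
```

A row that arrives late (loaded on 5 March but effective from 1 February) is returned for mid-February queries, while a row effective from 1 March still wins for later dates even though it was loaded first.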

Conclusion

Implementing Data Vault 2.0 in PostgreSQL provides a robust, scalable foundation for enterprise data warehousing. The methodology's insert-only architecture, combined with PostgreSQL's advanced features, creates a system that:

  • Maintains complete data history and auditability
  • Adapts easily to changing business requirements
  • Scales horizontally and vertically
  • Supports multiple source systems without restructuring
  • Enables parallel processing and incremental loads

By following the step-by-step implementation outlined in this guide, you can build a production-ready Data Vault warehouse that meets modern data engineering requirements.

Additional Resources

  • Official Data Vault 2.0 specification: https://datavaultalliance.com
  • PostgreSQL documentation: https://www.postgresql.org/docs/
  • Data Vault modeling tools and templates
  • ETL frameworks: Apache Airflow, Prefect, dbt

Next Steps

  1. Set up a development PostgreSQL instance
  2. Implement a proof-of-concept with sample data
  3. Design your business key structure
  4. Create automated ETL pipelines
  5. Build information marts for reporting
  6. Establish monitoring and alerting
  7. Scale to production workloads

Remember, Data Vault 2.0 is most effective when implemented incrementally. Start with a core business process, validate the approach, and expand systematically.

Friday, 17 October 2025

MySQL REST Service: A Comprehensive Developer Guide

 


Introduction

The MySQL REST Service (MRS) represents a significant evolution in how developers interact with MySQL databases. Built directly into the MySQL ecosystem, MRS provides a next-generation JSON Document Store solution that enables fast, secure HTTPS access to data stored in MySQL, HeatWave, InnoDB ClusterSet, and InnoDB ReplicaSet.

Unlike third-party solutions or custom-built REST APIs, MRS is a fully integrated MySQL solution that prioritizes ease of use, adherence to standards, and high performance. This guide will walk you through everything you need to know to get started with MRS, from installation to implementation in both Python and C#.

Architecture Overview

The MySQL REST Service consists of four major components:

  1. MySQL Server (8.0.39+) - The database server hosting your data and the MRS metadata schema
  2. MySQL Router (9.3.1+) - Serves REST endpoints and Progressive Web Apps
  3. MySQL Shell (9.4.0+) - Configuration and management tool
  4. MySQL Shell for VS Code Extension - Optional GUI for managing MRS

The service operates in two modes:

  • Development Mode: For local development with immediate testing
  • Production Mode: For serving published REST services

Prerequisites and Installation

Requirements

  • MySQL Server 8.0.39 or later
  • MySQL Router 9.3.1 or later
  • MySQL Shell 9.4.0 or higher
  • VS Code with MySQL Shell extension (recommended)

Initial Setup

First, ensure your MySQL Server is running. Then, configure MRS support using MySQL Shell:

# Connect to your MySQL instance (run from a terminal)
mysqlsh root@localhost

-- Switch MySQL Shell to SQL mode, then configure MRS
\sql
CONFIGURE REST METADATA ENABLED UPDATE IF AVAILABLE;

Alternatively, using the VS Code extension:

  1. Right-click on your database connection
  2. Select "Configure Instance for MySQL REST Service Support"
  3. Provide REST user credentials (minimum 8 characters with mixed case, numbers, and special characters)

Bootstrap MySQL Router

For development:

mysqlrouter --bootstrap root@localhost:3306 --directory=/path/to/router --mrs-development myuser

For production:

mysqlrouter --bootstrap root@localhost:3306 --directory=/path/to/router

Start the router:

mysqlrouter --config=/path/to/router/mysqlrouter.conf

Creating Your First REST Service

Step 1: Create a REST Service

-- Create a new REST service
CREATE REST SERVICE /myService
  COMMENTS "My first REST service"
  ENABLED;

-- Configure authentication
CREATE OR REPLACE REST AUTH APP MRS
  VENDOR "MRS"
  DEFAULT AUTH APP
  SERVICE /myService;

Step 2: Enable a Schema for REST

-- Enable the sakila demo database for REST access
CREATE REST SCHEMA /sakila
  FROM `sakila`
  SERVICE /myService;

Step 3: Enable Database Objects

-- Enable the actor table for REST access
CREATE REST DUALITY VIEW /actor
  ON SERVICE /myService SCHEMA /sakila
  AS `sakila`.`actor` {
    actorId: actor_id @SORTABLE,
    firstName: first_name,
    lastName: last_name,
    lastUpdate: last_update
  };

-- Enable with custom options
CREATE REST DUALITY VIEW /film
  ON SERVICE /myService SCHEMA /sakila
  AS `sakila`.`film` {
    filmId: film_id @SORTABLE,
    title: title @SORTABLE,
    description: description,
    releaseYear: release_year,
    rentalDuration: rental_duration,
    rentalRate: rental_rate,
    length: length,
    rating: rating
  }
  AUTHENTICATION REQUIRED;

Step 4: Create REST Users

-- Add a REST user for authentication
ALTER REST AUTH APP MRS
  ON SERVICE /myService
  ADD USER "developer"
  IDENTIFIED BY "SecurePass123!";

Step 5: Publish the Service

-- Publish the service for production
ALTER REST SERVICE /myService
  PUBLISHED;

Authentication Strategies

MRS supports multiple authentication methods:

1. MRS Native Authentication

CREATE REST AUTH APP MRS
  VENDOR "MRS"
  SERVICE /myService;

ALTER REST AUTH APP MRS
  ADD USER "apiuser"
  IDENTIFIED BY "YourSecurePassword123!";

2. OAuth2 (Google, Facebook, OCI)

CREATE REST AUTH APP GoogleAuth
  VENDOR "Google"
  APP_ID "your-google-app-id.apps.googleusercontent.com"
  ACCESS_TOKEN "your-access-token"
  SERVICE /myService;

3. MySQL Database Authentication

CREATE REST AUTH APP MySQLAuth
  VENDOR "MySQL"
  SERVICE /myService;

Using the REST API

Base URL Structure

https://hostname:port/MRS_SERVICE_PATH/MRS_SCHEMA_PATH/MRS_OBJECT_PATH

Example:

https://localhost:8443/myService/sakila/actor
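
The same structure can be expressed as a small helper. This is an illustrative sketch; build_mrs_url is a hypothetical function name, and the host and port are the example values used in this guide.

```python
from typing import Optional, Union


def build_mrs_url(
    host: str,
    port: int,
    service_path: str,
    schema_path: str,
    object_path: str,
    record_id: Optional[Union[int, str]] = None,
) -> str:
    """Join service, schema and object paths into an MRS endpoint URL."""
    parts = [p.strip("/") for p in (service_path, schema_path, object_path)]
    url = f"https://{host}:{port}/" + "/".join(parts)
    if record_id is not None:
        url += f"/{record_id}"  # a single record is addressed by its key
    return url


print(build_mrs_url("localhost", 8443, "/myService", "/sakila", "/actor"))
```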

Basic Operations

GET - Retrieve All Records

curl -X GET https://localhost:8443/myService/sakila/actor \
  -H "Authorization: Basic $(echo -n 'developer:SecurePass123!' | base64)"

GET - Retrieve Single Record

curl -X GET https://localhost:8443/myService/sakila/actor/1 \
  -H "Authorization: Basic $(echo -n 'developer:SecurePass123!' | base64)"

POST - Create Record

curl -X POST https://localhost:8443/myService/sakila/actor \
  -H "Content-Type: application/json" \
  -H "Authorization: Basic $(echo -n 'developer:SecurePass123!' | base64)" \
  -d '{
    "firstName": "John",
    "lastName": "Doe"
  }'

PUT - Update Record

curl -X PUT https://localhost:8443/myService/sakila/actor/201 \
  -H "Content-Type: application/json" \
  -H "Authorization: Basic $(echo -n 'developer:SecurePass123!' | base64)" \
  -d '{
    "firstName": "Jane",
    "lastName": "Smith"
  }'

DELETE - Remove Record

curl -X DELETE https://localhost:8443/myService/sakila/actor/201 \
  -H "Authorization: Basic $(echo -n 'developer:SecurePass123!' | base64)"
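
Each curl command above builds its Authorization header with echo and base64. The equivalent in Python looks roughly like this; basic_auth_header is a hypothetical helper, and in practice the requests library does the same work when given an HTTPBasicAuth object.

```python
import base64
from typing import Dict


def basic_auth_header(username: str, password: str) -> Dict[str, str]:
    """Build an HTTP Basic Authorization header from a username and password."""
    raw = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(raw).decode("ascii")
    return {"Authorization": f"Basic {token}"}
```

Base64 is an encoding, not encryption: anyone who sees the header can recover the password, which is why these calls must go over HTTPS.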

Advanced Filtering

MRS supports sophisticated filtering using JSON query objects:

# Filter by first name
# -G with --data-urlencode appends the filter to the URL and URL-encodes it;
# a raw {...} in the URL would be interpreted by curl's URL globbing
curl -G https://localhost:8443/myService/sakila/actor \
  --data-urlencode 'q={"firstName":"BRUCE"}' \
  -H "Authorization: Basic $(echo -n 'developer:SecurePass123!' | base64)"

# Complex filter with sorting
curl -G https://localhost:8443/myService/sakila/actor \
  --data-urlencode 'q={"$orderby":{"lastName":"ASC"},"$where":{"lastName":{"$like":"S%"}}}' \
  -H "Authorization: Basic $(echo -n 'developer:SecurePass123!' | base64)"
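
When the filter is built in code rather than typed by hand, it is safer to serialize and URL-encode it programmatically. A minimal sketch using only the standard library follows; build_query_url is a hypothetical helper name.

```python
import json
from urllib.parse import parse_qs, urlencode, urlsplit


def build_query_url(base_url: str, filter_obj: dict) -> str:
    """Serialize the filter object to compact JSON and append it as ?q=..."""
    compact = json.dumps(filter_obj, separators=(",", ":"))
    return f"{base_url}?{urlencode({'q': compact})}"


example_filter = {
    "$where": {"lastName": {"$like": "S%"}},
    "$orderby": {"lastName": "ASC"},
}
url = build_query_url("https://localhost:8443/myService/sakila/actor", example_filter)
print(url)
```

Characters such as braces, quotes, $ and % are percent-encoded, so the URL survives shells, proxies and HTTP libraries unchanged.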

Python Implementation

Installation

pip install requests urllib3

Basic Python Client

import requests
import json
from requests.auth import HTTPBasicAuth
import urllib3

# Disable SSL warnings for self-signed certificates (development only)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

class MySQLRestClient:
    def __init__(self, base_url, username, password):
        self.base_url = base_url.rstrip('/')
        self.auth = HTTPBasicAuth(username, password)
        self.headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        }
    
    def get_all(self, resource):
        """Retrieve records from a resource (first page only; MRS paginates collections)"""
        url = f"{self.base_url}/{resource}"
        response = requests.get(url, auth=self.auth, headers=self.headers, verify=False)
        response.raise_for_status()
        return response.json()
    
    def get_by_id(self, resource, record_id):
        """Retrieve a single record by ID"""
        url = f"{self.base_url}/{resource}/{record_id}"
        response = requests.get(url, auth=self.auth, headers=self.headers, verify=False)
        response.raise_for_status()
        return response.json()
    
    def create(self, resource, data):
        """Create a new record"""
        url = f"{self.base_url}/{resource}"
        response = requests.post(url, auth=self.auth, headers=self.headers, 
                                json=data, verify=False)
        response.raise_for_status()
        return response.json()
    
    def update(self, resource, record_id, data):
        """Update an existing record"""
        url = f"{self.base_url}/{resource}/{record_id}"
        response = requests.put(url, auth=self.auth, headers=self.headers, 
                               json=data, verify=False)
        response.raise_for_status()
        return response.json()
    
    def delete(self, resource, record_id):
        """Delete a record"""
        url = f"{self.base_url}/{resource}/{record_id}"
        response = requests.delete(url, auth=self.auth, headers=self.headers, verify=False)
        response.raise_for_status()
        return response.status_code in (200, 204)
    
    def query(self, resource, filter_obj):
        """Execute a filtered query"""
        url = f"{self.base_url}/{resource}"
        params = {'q': json.dumps(filter_obj)}
        response = requests.get(url, auth=self.auth, headers=self.headers, 
                               params=params, verify=False)
        response.raise_for_status()
        return response.json()


# Usage Example
def main():
    # Initialize client
    client = MySQLRestClient(
        base_url='https://localhost:8443/myService/sakila',
        username='developer',
        password='SecurePass123!'
    )
    
    try:
        # Get all actors
        print("Fetching all actors...")
        actors = client.get_all('actor')
        print(f"Found {len(actors.get('items', []))} actors")
        
        # Get specific actor
        print("\nFetching actor with ID 1...")
        actor = client.get_by_id('actor', 1)
        print(f"Actor: {actor.get('firstName')} {actor.get('lastName')}")
        
        # Create new actor
        print("\nCreating new actor...")
        new_actor = client.create('actor', {
            'firstName': 'Tom',
            'lastName': 'Hanks'
        })
        print(f"Created actor with ID: {new_actor.get('actorId')}")
        
        # Update actor
        print("\nUpdating actor...")
        updated_actor = client.update('actor', new_actor.get('actorId'), {
            'firstName': 'Thomas',
            'lastName': 'Hanks'
        })
        print(f"Updated actor: {updated_actor.get('firstName')} {updated_actor.get('lastName')}")
        
        # Query with filter
        print("\nQuerying actors with last name starting with 'S'...")
        filtered = client.query('actor', {
            '$where': {
                'lastName': {'$like': 'S%'}
            },
            '$orderby': {
                'lastName': 'ASC'
            }
        })
        for actor in filtered.get('items', [])[:5]:
            print(f"  - {actor.get('firstName')} {actor.get('lastName')}")
        
        # Delete actor
        print(f"\nDeleting actor {new_actor.get('actorId')}...")
        success = client.delete('actor', new_actor.get('actorId'))
        print(f"Delete successful: {success}")
        
    except requests.exceptions.HTTPError as e:
        print(f"HTTP Error: {e}")
        print(f"Response: {e.response.text}")
    except Exception as e:
        print(f"Error: {e}")


if __name__ == '__main__':
    main()

Advanced Python Example with Connection Pooling

import json
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from requests.auth import HTTPBasicAuth
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

class AdvancedMySQLRestClient:
    def __init__(self, base_url, username, password, pool_connections=10, pool_maxsize=20):
        self.base_url = base_url.rstrip('/')
        self.auth = HTTPBasicAuth(username, password)
        
        # Configure session with connection pooling and retry strategy
        self.session = requests.Session()
        
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            # POST is left out: retrying a non-idempotent create can insert duplicate rows
            allowed_methods=["HEAD", "GET", "OPTIONS", "PUT", "DELETE"]
        )
        
        adapter = HTTPAdapter(
            pool_connections=pool_connections,
            pool_maxsize=pool_maxsize,
            max_retries=retry_strategy
        )
        
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        self.session.auth = self.auth
        self.session.headers.update({
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        })
    
    def batch_create(self, resource, records):
        """Create multiple records efficiently"""
        results = []
        for record in records:
            try:
                url = f"{self.base_url}/{resource}"
                response = self.session.post(url, json=record, verify=False)
                response.raise_for_status()
                results.append({'success': True, 'data': response.json()})
            except Exception as e:
                results.append({'success': False, 'error': str(e)})
        return results
    
    def paginated_query(self, resource, filter_obj=None, limit=100):
        """Retrieve records with pagination"""
        offset = 0
        all_items = []
        
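        while True:
            # limit and offset are sent as plain query parameters, matching the
            # ?limit=...&offset=... form used by the typed C# client later in this guide
            params = {'limit': limit, 'offset': offset}
            if filter_obj:
                params['q'] = json.dumps(filter_obj)
            
            url = f"{self.base_url}/{resource}"
            response = self.session.get(url, params=params, verify=False)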
            response.raise_for_status()
            
            data = response.json()
            items = data.get('items', [])
            
            if not items:
                break
                
            all_items.extend(items)
            offset += limit
            
            if len(items) < limit:
                break
        
        return all_items
    
    def close(self):
        """Close the session"""
        self.session.close()


# Usage
if __name__ == '__main__':
    client = AdvancedMySQLRestClient(
        base_url='https://localhost:8443/myService/sakila',
        username='developer',
        password='SecurePass123!'
    )
    
    try:
        # Batch create
        new_actors = [
            {'firstName': 'Robert', 'lastName': 'Downey'},
            {'firstName': 'Chris', 'lastName': 'Evans'},
            {'firstName': 'Scarlett', 'lastName': 'Johansson'}
        ]
        results = client.batch_create('actor', new_actors)
        print(f"Created {sum(1 for r in results if r['success'])} actors")
        
        # Paginated query
        all_actors = client.paginated_query('actor', limit=50)
        print(f"Retrieved {len(all_actors)} actors total")
        
    finally:
        client.close()
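
The paginated_query method stops when a page comes back short. An alternative is to follow the hasMore flag that appears in the response model used by the C# client later in this guide. The sketch below assumes that field is present in collection responses and takes the page-fetching function as a parameter, so it can be exercised without a running server; iter_items and fetch_page are hypothetical names.

```python
from typing import Callable, Dict, Iterator


def iter_items(
    fetch_page: Callable[[int, int], Dict], limit: int = 25
) -> Iterator[dict]:
    """Yield items page by page until the server reports no more results.

    fetch_page(offset, limit) must return a dict with an 'items' list and,
    by assumption, a boolean 'hasMore' flag.
    """
    offset = 0
    while True:
        page = fetch_page(offset, limit)
        items = page.get("items", [])
        yield from items
        if not page.get("hasMore") or not items:
            break
        offset += len(items)
```

With the client above, fetch_page could be a small lambda that calls session.get with the limit and offset parameters and returns response.json(). Because the function is a generator, callers can stop early without fetching the remaining pages.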

C# Implementation

Installation

dotnet add package Newtonsoft.Json

Basic C# Client

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace MySQLRestClient
{
    public class MySQLRestClient : IDisposable
    {
        private readonly HttpClient _httpClient;
        private readonly string _baseUrl;

        public MySQLRestClient(string baseUrl, string username, string password)
        {
            _baseUrl = baseUrl.TrimEnd('/');

            // Ignore SSL certificate errors (development only)
            var handler = new HttpClientHandler
            {
                ServerCertificateCustomValidationCallback = (message, cert, chain, errors) => true
            };
            _httpClient = new HttpClient(handler);

            // Configure Basic authentication
            var authToken = Convert.ToBase64String(
                Encoding.ASCII.GetBytes($"{username}:{password}")
            );
            _httpClient.DefaultRequestHeaders.Authorization = 
                new AuthenticationHeaderValue("Basic", authToken);

            // Request JSON responses
            _httpClient.DefaultRequestHeaders.Accept.Add(
                new MediaTypeWithQualityHeaderValue("application/json")
            );
        }

        public async Task<JObject> GetAllAsync(string resource)
        {
            var url = $"{_baseUrl}/{resource}";
            var response = await _httpClient.GetAsync(url);
            response.EnsureSuccessStatusCode();
            
            var content = await response.Content.ReadAsStringAsync();
            return JObject.Parse(content);
        }

        public async Task<JObject> GetByIdAsync(string resource, int id)
        {
            var url = $"{_baseUrl}/{resource}/{id}";
            var response = await _httpClient.GetAsync(url);
            response.EnsureSuccessStatusCode();
            
            var content = await response.Content.ReadAsStringAsync();
            return JObject.Parse(content);
        }

        public async Task<JObject> CreateAsync(string resource, object data)
        {
            var url = $"{_baseUrl}/{resource}";
            var json = JsonConvert.SerializeObject(data);
            var content = new StringContent(json, Encoding.UTF8, "application/json");
            
            var response = await _httpClient.PostAsync(url, content);
            response.EnsureSuccessStatusCode();
            
            var responseContent = await response.Content.ReadAsStringAsync();
            return JObject.Parse(responseContent);
        }

        public async Task<JObject> UpdateAsync(string resource, int id, object data)
        {
            var url = $"{_baseUrl}/{resource}/{id}";
            var json = JsonConvert.SerializeObject(data);
            var content = new StringContent(json, Encoding.UTF8, "application/json");
            
            var response = await _httpClient.PutAsync(url, content);
            response.EnsureSuccessStatusCode();
            
            var responseContent = await response.Content.ReadAsStringAsync();
            return JObject.Parse(responseContent);
        }

        public async Task<bool> DeleteAsync(string resource, int id)
        {
            var url = $"{_baseUrl}/{resource}/{id}";
            var response = await _httpClient.DeleteAsync(url);
            return response.IsSuccessStatusCode;
        }

        public async Task<JObject> QueryAsync(string resource, object filter)
        {
            var filterJson = JsonConvert.SerializeObject(filter);
            var url = $"{_baseUrl}/{resource}?q={Uri.EscapeDataString(filterJson)}";
            
            var response = await _httpClient.GetAsync(url);
            response.EnsureSuccessStatusCode();
            
            var content = await response.Content.ReadAsStringAsync();
            return JObject.Parse(content);
        }

        public void Dispose()
        {
            _httpClient?.Dispose();
        }
    }

    // Usage Example
    class Program
    {
        static async Task Main(string[] args)
        {
            using var client = new MySQLRestClient(
                "https://localhost:8443/myService/sakila",
                "developer",
                "SecurePass123!"
            );

            try
            {
                // Get all actors
                Console.WriteLine("Fetching all actors...");
                var actors = await client.GetAllAsync("actor");
                var items = actors["items"] as JArray;
                Console.WriteLine($"Found {items?.Count ?? 0} actors");

                // Get specific actor
                Console.WriteLine("\nFetching actor with ID 1...");
                var actor = await client.GetByIdAsync("actor", 1);
                Console.WriteLine($"Actor: {actor["firstName"]} {actor["lastName"]}");

                // Create new actor
                Console.WriteLine("\nCreating new actor...");
                var newActor = await client.CreateAsync("actor", new
                {
                    firstName = "Leonardo",
                    lastName = "DiCaprio"
                });
                var actorId = newActor["actorId"].Value<int>();
                Console.WriteLine($"Created actor with ID: {actorId}");

                // Update actor
                Console.WriteLine("\nUpdating actor...");
                var updatedActor = await client.UpdateAsync("actor", actorId, new
                {
                    firstName = "Leo",
                    lastName = "DiCaprio"
                });
                Console.WriteLine($"Updated actor: {updatedActor["firstName"]} {updatedActor["lastName"]}");

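                // Query with filter
                // Dictionaries are used because the operator names start with '$',
                // which anonymous-type property names cannot express
                Console.WriteLine("\nQuerying actors with last name starting with 'D'...");
                var filtered = await client.QueryAsync("actor", new Dictionary<string, object>
                {
                    ["$where"] = new Dictionary<string, object>
                    {
                        ["lastName"] = new Dictionary<string, object> { ["$like"] = "D%" }
                    },
                    ["$orderby"] = new Dictionary<string, object>
                    {
                        ["lastName"] = "ASC"
                    }
                });
                
                var filteredItems = filtered["items"] as JArray ?? new JArray();
                foreach (var item in filteredItems.Take(5))
                {
                    Console.WriteLine($"  - {item["firstName"]} {item["lastName"]}");
                }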

                // Delete actor
                Console.WriteLine($"\nDeleting actor {actorId}...");
                var success = await client.DeleteAsync("actor", actorId);
                Console.WriteLine($"Delete successful: {success}");
            }
            catch (HttpRequestException ex)
            {
                Console.WriteLine($"HTTP Error: {ex.Message}");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error: {ex.Message}");
            }
        }
    }
}

Advanced C# Client with Strongly-Typed Models

using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;

namespace MySQLRestClient.Models
{
    // Model classes
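    public class Actor
    {
        // Nullable so that server-generated values are omitted when creating a record
        [JsonProperty("actorId", NullValueHandling = NullValueHandling.Ignore)]
        public int? ActorId { get; set; }

        [JsonProperty("firstName")]
        public string FirstName { get; set; }

        [JsonProperty("lastName")]
        public string LastName { get; set; }

        [JsonProperty("lastUpdate", NullValueHandling = NullValueHandling.Ignore)]
        public DateTime? LastUpdate { get; set; }
    }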

    public class ApiResponse<T>
    {
        [JsonProperty("items")]
        public List<T> Items { get; set; }

        [JsonProperty("count")]
        public int Count { get; set; }

        [JsonProperty("offset")]
        public int Offset { get; set; }

        [JsonProperty("limit")]
        public int Limit { get; set; }

        [JsonProperty("hasMore")]
        public bool HasMore { get; set; }
    }

    // Strongly-typed client
    public class TypedMySQLRestClient<T> : IDisposable where T : class
    {
        private readonly HttpClient _httpClient;
        private readonly string _baseUrl;
        private readonly string _resource;

        public TypedMySQLRestClient(string baseUrl, string resource, string username, string password)
        {
            _baseUrl = baseUrl.TrimEnd('/');
            _resource = resource;
            
            var handler = new HttpClientHandler
            {
                ServerCertificateCustomValidationCallback = (message, cert, chain, errors) => true
            };
            
            _httpClient = new HttpClient(handler);
            
            var authToken = Convert.ToBase64String(
                Encoding.ASCII.GetBytes($"{username}:{password}")
            );
            
            _httpClient.DefaultRequestHeaders.Authorization = 
                new AuthenticationHeaderValue("Basic", authToken);
            _httpClient.DefaultRequestHeaders.Accept.Add(
                new MediaTypeWithQualityHeaderValue("application/json")
            );
        }

        public async Task<ApiResponse<T>> GetAllAsync(int limit = 100, int offset = 0)
        {
            var url = $"{_baseUrl}/{_resource}?limit={limit}&offset={offset}";
            var response = await _httpClient.GetAsync(url);
            response.EnsureSuccessStatusCode();
            
            var content = await response.Content.ReadAsStringAsync();
            return JsonConvert.DeserializeObject<ApiResponse<T>>(content);
        }

        public async Task<T> GetByIdAsync(int id)
        {
            var url = $"{_baseUrl}/{_resource}/{id}";
            var response = await _httpClient.GetAsync(url);
            response.EnsureSuccessStatusCode();
            
            var content = await response.Content.ReadAsStringAsync();
            return JsonConvert.DeserializeObject<T>(content);
        }

        public async Task<T> CreateAsync(T entity)
        {
            var url = $"{_baseUrl}/{_resource}";
            var json = JsonConvert.SerializeObject(entity);
            var content = new StringContent(json, Encoding.UTF8, "application/json");
            
            var response = await _httpClient.PostAsync(url, content);
            response.EnsureSuccessStatusCode();
            
            var responseContent = await response.Content.ReadAsStringAsync();
            return JsonConvert.DeserializeObject<T>(responseContent);
        }

        public void Dispose()
        {
            _httpClient?.Dispose();
        }
    }

    // Usage
    class Program
    {
        static async Task Main(string[] args)
        {
            using var actorClient = new TypedMySQLRestClient<Actor>(
                "https://localhost:8443/myService/sakila",
                "actor",
                "developer",
                "SecurePass123!"
            );

            // Get all actors with pagination
            var response = await actorClient.GetAllAsync(limit: 10, offset: 0);
            Console.WriteLine($"Retrieved {response.Items.Count} actors");
            
            foreach (var actor in response.Items)
            {
                Console.WriteLine($"{actor.ActorId}: {actor.FirstName} {actor.LastName}");
            }

            // Create new actor
            var newActor = new Actor
            {
                FirstName = "Brad",
                LastName = "Pitt"
            };
            
            var created = await actorClient.CreateAsync(newActor);
            Console.WriteLine($"Created actor with ID: {created.ActorId}");
        }
    }
}

Best Practices

1. Security

  • Always use HTTPS in production
  • Never hardcode credentials; use environment variables or a secrets vault
  • Implement proper authentication; use OAuth2 for public-facing services
  • Validate input data on both the client and the server
  • Use role-based access control for different user types
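
As a minimal sketch of the "never hardcode credentials" point, the helper below reads the username and password from environment variables and fails fast when either is missing. The variable names MRS_USERNAME and MRS_PASSWORD and the function name are assumptions made for this example, not something MRS itself defines.

```python
import os


def load_mrs_credentials(environ=os.environ):
    """Return (username, password) read from environment variables.

    MRS_USERNAME and MRS_PASSWORD are hypothetical names chosen for this
    sketch; use whatever naming your deployment tooling expects.
    """
    required = ("MRS_USERNAME", "MRS_PASSWORD")
    missing = [name for name in required if not environ.get(name)]
    if missing:
        # Fail fast instead of silently falling back to a default password.
        raise RuntimeError(
            f"Missing environment variables: {', '.join(missing)}"
        )
    return environ["MRS_USERNAME"], environ["MRS_PASSWORD"]
```

The returned pair can then be passed to whichever client constructor you use in place of literal strings.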

2. Performance

  • Enable connection pooling in your HTTP clients
  • Implement caching for frequently accessed data
  • Use pagination for large datasets
  • Leverage filtering to reduce data transfer
  • Monitor query performance and optimize slow queries
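
The pagination advice can be sketched as a small generator that walks a resource page by page instead of requesting everything at once. It assumes a hypothetical `fetch_page(limit, offset)` callable that returns one decoded JSON page as a dict with an `items` list and, optionally, a `hasMore` flag; check the actual response shape of your service before relying on those field names.

```python
def iter_all_items(fetch_page, page_size=25):
    """Yield every item from a paginated resource, one page at a time.

    fetch_page(limit, offset) is a hypothetical callable supplied by the
    caller; it must return a dict with an "items" list and may include a
    "hasMore" flag.
    """
    offset = 0
    while True:
        page = fetch_page(page_size, offset)
        items = page.get("items", [])
        yield from items
        # If the server does not send "hasMore", assume a full page
        # means there may be another one.
        has_more = page.get("hasMore", len(items) == page_size)
        if not items or not has_more:
            break
        offset += len(items)
```

Because the function is a generator, the caller can stop iterating early and no further pages are requested.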

3. Error Handling

# Python example
try:
    result = client.get_by_id('actor', 999)
except requests.exceptions.HTTPError as e:
    if e.response.status_code == 404:
        print("Actor not found")
    elif e.response.status_code == 401:
        print("Authentication failed")
    elif e.response.status_code == 403:
        print("Access forbidden")
    else:
        print(f"HTTP error occurred: {e}")
except requests.exceptions.ConnectionError:
    print("Failed to connect to the server")
except Exception as e:
    print(f"An unexpected error occurred: {e}")

// C# example
try
{
    var actor = await client.GetByIdAsync("actor", 999);
}
catch (HttpRequestException ex) when (ex.StatusCode == System.Net.HttpStatusCode.NotFound)
{
    Console.WriteLine("Actor not found");
}
catch (HttpRequestException ex) when (ex.StatusCode == System.Net.HttpStatusCode.Unauthorized)
{
    Console.WriteLine("Authentication failed");
}
catch (HttpRequestException ex)
{
    Console.WriteLine($"HTTP error: {ex.Message}");
}
catch (Exception ex)
{
    Console.WriteLine($"Unexpected error: {ex.Message}");
}
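
One way to keep the status-code branching from the examples above in a single place is a small lookup helper. This is only a sketch: the function name and the message wording are assumptions for illustration, and it relies solely on Python's standard `http.HTTPStatus` enum.

```python
from http import HTTPStatus

# Messages for the status codes handled explicitly in the examples above.
_MESSAGES = {
    HTTPStatus.NOT_FOUND: "Resource not found",
    HTTPStatus.UNAUTHORIZED: "Authentication failed",
    HTTPStatus.FORBIDDEN: "Access forbidden",
}


def describe_status(status_code):
    """Map an HTTP status code to a short, log-friendly message."""
    try:
        status = HTTPStatus(status_code)
    except ValueError:
        # Not a status code known to the standard library.
        return f"HTTP error {status_code}"
    return _MESSAGES.get(status, f"HTTP error {status_code}: {status.phrase}")
```

Inside an `except requests.exceptions.HTTPError as e:` handler, `describe_status(e.response.status_code)` would then replace the if/elif chain.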

4. Development Workflow

  1. Start with development mode - Use MRS development router for rapid prototyping
  2. Test thoroughly - Write comprehensive tests for all endpoints
  3. Use version control - Track MRS service definitions
  4. Document your API - Generate and maintain SDK documentation
  5. Monitor in production - Use MySQL Router metrics and logs

Advanced Features

Stored Procedures as REST Endpoints

-- Create a stored procedure
DELIMITER //
CREATE PROCEDURE GetActorFilmCount(IN actorId INT)
BEGIN
    SELECT COUNT(*) as filmCount
    FROM film_actor
    WHERE actor_id = actorId;
END //
DELIMITER ;

-- Enable it as REST endpoint
CREATE REST PROCEDURE /getActorFilmCount
  ON SERVICE /myService SCHEMA /sakila
  AS `sakila`.`GetActorFilmCount`;

Call from Python:

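# Call the stored procedure; the IN parameter is sent as a JSON body
result = client.session.post(
    f"{client.base_url}/getActorFilmCount",
    json={"actorId": 1},
    auth=client.auth,
    verify=False  # development only: skips TLS certificate verification
)
result.raise_for_status()  # surface 4xx/5xx responses before parsing
print(result.json())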

SDK Generation

Generate type-safe SDKs directly from MRS:

# Generate TypeScript SDK
mysqlsh dba@localhost --py -e \
  'mrs.dump.sdk_service_files(
    directory="/path/to/project/sdk",
    options={
      "sdk_language": "TypeScript",
      "service_url": "https://api.example.com/myService"
    }
  )'

# Generate Python SDK
mysqlsh dba@localhost --py -e \
  'mrs.dump.sdk_service_files(
    directory="/path/to/project/sdk",
    options={
      "sdk_language": "Python",
      "service_url": "https://api.example.com/myService"
    }
  )'

Read Your Own Writes (Cluster Consistency)

For MySQL InnoDB Cluster deployments:

# Enable read-your-own-writes consistency for this request
result = client.session.get(
    f"{client.base_url}/actor",
    headers={
        'X-Read-Own-Writes': 'true'
    },
    auth=client.auth,
    verify=False  # development only: skips TLS certificate verification
)

Troubleshooting

Common Issues

1. Connection Refused

# Check if MySQL Router is running
ps aux | grep mysqlrouter

# Check router logs
tail -f /path/to/router/log/mysqlrouter.log
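
If the router process is running but requests are still refused, it can help to confirm that something is actually listening on the HTTPS port. The following is a minimal Python sketch using only the standard `socket` module; the function name is an assumption for this example, and port 8443 is simply the port used in the URLs earlier in this guide.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and unreachable hosts.
        return False
```

For example, `port_is_open("localhost", 8443)` returning False points to a router that is not listening on that port (or a firewall in between) rather than to an authentication or service configuration problem.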

2. Authentication Failures

-- Verify user exists (MRS users are stored in the mrs_user metadata table)
SELECT * FROM mysql_rest_service_metadata.mrs_user;

-- Reset user password
ALTER REST AUTH APP MRS
  ON SERVICE /myService
  ALTER USER "developer"
  IDENTIFIED BY "NewPassword123!";

3. Service Not Found

-- List all services
SHOW REST SERVICES;

-- Verify service is published
SELECT * FROM mysql_rest_service_metadata.service
WHERE url_context_root = '/myService';

Conclusion

The MySQL REST Service represents a powerful, integrated solution for exposing MySQL data through RESTful APIs. By following this guide, you should now be able to:

  • Set up and configure MRS in both development and production environments
  • Create and manage REST services with proper authentication
  • Implement robust clients in Python and C#
  • Apply best practices for security, performance, and error handling
  • Leverage advanced features like stored procedures and SDK generation

The native integration with MySQL, combined with enterprise-grade features like OAuth2 support, connection pooling, and cluster consistency, makes MRS an excellent choice for modern application development.

For more information, consult the official MySQL REST Service documentation.


Happy coding!
