Thursday, 27 November 2025

CogDB: A Comprehensive Guide to Python's Graph Database Solution

 

Introduction

CogDB is a high-performance, in-memory graph database written in Python that provides a simple yet powerful API for working with graph data structures. Unlike traditional relational databases that store data in tables, CogDB represents information as nodes and edges, making it ideal for applications involving complex relationships, social networks, recommendation systems, and knowledge graphs.

In this comprehensive guide, we'll explore CogDB from the ground up, covering its core concepts, installation, basic operations, and advanced features with practical code examples.

What is CogDB?

CogDB is designed to be a lightweight, fast, and easy-to-use graph database that runs entirely in memory. It's particularly well-suited for:

  • Social network analysis
  • Recommendation engines
  • Knowledge representation
  • Fraud detection systems
  • Network topology analysis
  • Dependency tracking

Key Features

  • In-memory storage: Lightning-fast query performance
  • Simple API: Intuitive Python interface
  • Flexible schema: No rigid structure requirements
  • Efficient traversals: Optimized graph algorithms
  • Thread-safe operations: Suitable for concurrent applications

Installation and Setup

pip install cogdb

For development or the latest features:

pip install git+https://github.com/arun1729/cog.git

Core Concepts

Nodes and Edges

In CogDB, data is represented as:

  • Nodes: Entities that can have properties (key-value pairs)
  • Edges: Relationships between nodes, which can also have properties

Graph Structure

from cog import Cog

# Initialize a new graph database
db = Cog()

# Create nodes
user1 = db.create_node("User", {"name": "Alice", "age": 30})
user2 = db.create_node("User", {"name": "Bob", "age": 25})
product = db.create_node("Product", {"name": "Laptop", "price": 999.99})

# Create relationships
purchase_edge = db.create_edge(user1, "PURCHASED", product, {"date": "2024-01-15", "quantity": 1})
friendship_edge = db.create_edge(user1, "FRIENDS_WITH", user2, {"since": "2020-03-10"})

Basic Operations

Creating and Managing Nodes

from cog import Cog
import datetime

db = Cog()

# Create nodes with different types and properties
person = db.create_node("Person", {
    "name": "John Doe",
    "email": "john@example.com",
    "created_at": datetime.datetime.now().isoformat()
})

company = db.create_node("Company", {
    "name": "TechCorp",
    "industry": "Technology",
    "founded": 2010,
    "employees": 500
})

# Access node properties
print(f"Person: {person['name']}")  # Person: John Doe
print(f"Company: {company['name']}")  # Company: TechCorp

# Update node properties
person["age"] = 35
person["location"] = "San Francisco"

# Get node by ID
node_id = person.id
retrieved_node = db.get_node(node_id)

Working with Edges

# Create different types of relationships
works_at = db.create_edge(person, "WORKS_AT", company, {
    "position": "Software Engineer",
    "start_date": "2023-01-01",
    "salary": 120000
})

# Create bidirectional relationship
friend1 = db.create_node("Person", {"name": "Alice"})
friend2 = db.create_node("Person", {"name": "Bob"})

# Create friendship (bidirectional)
friendship1 = db.create_edge(friend1, "FRIENDS_WITH", friend2)
friendship2 = db.create_edge(friend2, "FRIENDS_WITH", friend1)

# Access edge properties
print(f"Position: {works_at['position']}")
print(f"Start date: {works_at['start_date']}")

Querying Data

# Find nodes by label
all_persons = db.get_nodes("Person")
all_companies = db.get_nodes("Company")

# Find nodes with specific properties
tech_companies = [node for node in db.get_nodes("Company") 
                 if node.get("industry") == "Technology"]

# Find edges by relationship type
work_relationships = db.get_edges("WORKS_AT")
friendships = db.get_edges("FRIENDS_WITH")

# Get neighbors of a node
person_neighbors = db.get_neighbors(person)
company_neighbors = db.get_neighbors(company)

Advanced Querying and Traversals

Graph Traversal Patterns

def find_colleagues(db, person_node):
    """Find all colleagues of a person (people working at the same company)"""
    colleagues = []
    
    # Get companies where the person works
    work_edges = [edge for edge in db.get_edges("WORKS_AT") 
                  if edge.source == person_node]
    
    for work_edge in work_edges:
        company = work_edge.target
        # Find other people working at the same company
        company_edges = [edge for edge in db.get_edges("WORKS_AT") 
                        if edge.target == company and edge.source != person_node]
        
        for edge in company_edges:
            colleagues.append(edge.source)
    
    return colleagues

def find_mutual_friends(db, person1, person2):
    """Find mutual friends between two people"""
    person1_friends = set()
    person2_friends = set()
    
    # Get friends of person1
    for edge in db.get_edges("FRIENDS_WITH"):
        if edge.source == person1:
            person1_friends.add(edge.target)
    
    # Get friends of person2
    for edge in db.get_edges("FRIENDS_WITH"):
        if edge.source == person2:
            person2_friends.add(edge.target)
    
    return person1_friends.intersection(person2_friends)

# Usage examples
alice = db.create_node("Person", {"name": "Alice"})
bob = db.create_node("Person", {"name": "Bob"})
charlie = db.create_node("Person", {"name": "Charlie"})

# Create friendship network
db.create_edge(alice, "FRIENDS_WITH", charlie)
db.create_edge(bob, "FRIENDS_WITH", charlie)

mutual_friends = find_mutual_friends(db, alice, bob)
print(f"Mutual friends: {[friend['name'] for friend in mutual_friends]}")

Pathfinding Algorithms

def find_shortest_path(db, start_node, end_node, relationship_type=None):
    """Find shortest path between two nodes using BFS"""
    from collections import deque
    
    if start_node == end_node:
        return [start_node]
    
    queue = deque([(start_node, [start_node])])
    visited = {start_node}
    
    while queue:
        current_node, path = queue.popleft()
        
        # Get all edges from current node
        edges = [edge for edge in db.edges 
                if edge.source == current_node and 
                (relationship_type is None or edge.relationship == relationship_type)]
        
        for edge in edges:
            neighbor = edge.target
            
            if neighbor == end_node:
                return path + [neighbor]
            
            if neighbor not in visited:
                visited.add(neighbor)
                queue.append((neighbor, path + [neighbor]))
    
    return None  # No path found

def calculate_node_centrality(db, node):
    """Calculate simple degree centrality for a node"""
    incoming_edges = [edge for edge in db.edges if edge.target == node]
    outgoing_edges = [edge for edge in db.edges if edge.source == node]
    
    return {
        "in_degree": len(incoming_edges),
        "out_degree": len(outgoing_edges),
        "total_degree": len(incoming_edges) + len(outgoing_edges)
    }

Real-World Example: Social Network Analysis

Let's build a complete social network analysis system:

class SocialNetworkAnalyzer:
    def __init__(self):
        self.db = Cog()
        self.users = {}
    
    def add_user(self, username, profile_data):
        """Add a new user to the social network"""
        user_node = self.db.create_node("User", {
            "username": username,
            **profile_data,
            "created_at": datetime.datetime.now().isoformat()
        })
        self.users[username] = user_node
        return user_node
    
    def add_friendship(self, username1, username2):
        """Create a bidirectional friendship"""
        user1 = self.users.get(username1)
        user2 = self.users.get(username2)
        
        if not user1 or not user2:
            raise ValueError("One or both users not found")
        
        # Create bidirectional friendship
        self.db.create_edge(user1, "FRIENDS_WITH", user2, {
            "created_at": datetime.datetime.now().isoformat()
        })
        self.db.create_edge(user2, "FRIENDS_WITH", user1, {
            "created_at": datetime.datetime.now().isoformat()
        })
    
    def add_post(self, username, content, tags=None):
        """Add a post by a user"""
        user = self.users.get(username)
        if not user:
            raise ValueError("User not found")
        
        post_node = self.db.create_node("Post", {
            "content": content,
            "tags": tags or [],
            "created_at": datetime.datetime.now().isoformat(),
            "likes": 0
        })
        
        self.db.create_edge(user, "POSTED", post_node)
        return post_node
    
    def like_post(self, username, post_node):
        """User likes a post"""
        user = self.users.get(username)
        if not user:
            raise ValueError("User not found")
        
        # Check if already liked
        existing_like = any(
            edge.source == user and edge.target == post_node 
            for edge in self.db.get_edges("LIKED")
        )
        
        if not existing_like:
            self.db.create_edge(user, "LIKED", post_node)
            post_node["likes"] = post_node.get("likes", 0) + 1
    
    def get_user_feed(self, username, limit=10):
        """Get posts from user's friends"""
        user = self.users.get(username)
        if not user:
            return []
        
        # Get user's friends
        friends = []
        for edge in self.db.get_edges("FRIENDS_WITH"):
            if edge.source == user:
                friends.append(edge.target)
        
        # Get posts from friends
        feed_posts = []
        for friend in friends:
            for edge in self.db.get_edges("POSTED"):
                if edge.source == friend:
                    post = edge.target
                    feed_posts.append({
                        "author": friend["username"],
                        "content": post["content"],
                        "created_at": post["created_at"],
                        "likes": post.get("likes", 0)
                    })
        
        # Sort by creation time (most recent first)
        feed_posts.sort(key=lambda x: x["created_at"], reverse=True)
        return feed_posts[:limit]
    
    def get_mutual_friends(self, username1, username2):
        """Find mutual friends between two users"""
        user1 = self.users.get(username1)
        user2 = self.users.get(username2)
        
        if not user1 or not user2:
            return []
        
        user1_friends = set()
        user2_friends = set()
        
        # Get friends of both users
        for edge in self.db.get_edges("FRIENDS_WITH"):
            if edge.source == user1:
                user1_friends.add(edge.target)
            elif edge.source == user2:
                user2_friends.add(edge.target)
        
        mutual = user1_friends.intersection(user2_friends)
        return [friend["username"] for friend in mutual]
    
    def get_popular_posts(self, limit=10):
        """Get most liked posts"""
        posts = []
        for post_node in self.db.get_nodes("Post"):
            # Find the author
            author = None
            for edge in self.db.get_edges("POSTED"):
                if edge.target == post_node:
                    author = edge.source["username"]
                    break
            
            posts.append({
                "author": author,
                "content": post_node["content"],
                "likes": post_node.get("likes", 0),
                "created_at": post_node["created_at"]
            })
        
        posts.sort(key=lambda x: x["likes"], reverse=True)
        return posts[:limit]

# Usage example
network = SocialNetworkAnalyzer()

# Add users
network.add_user("alice", {"name": "Alice Johnson", "age": 28, "city": "New York"})
network.add_user("bob", {"name": "Bob Smith", "age": 32, "city": "San Francisco"})
network.add_user("charlie", {"name": "Charlie Brown", "age": 25, "city": "Chicago"})

# Create friendships
network.add_friendship("alice", "bob")
network.add_friendship("bob", "charlie")
network.add_friendship("alice", "charlie")

# Add posts
post1 = network.add_post("alice", "Just visited the Metropolitan Museum!", ["art", "nyc"])
post2 = network.add_post("bob", "Beautiful sunset in San Francisco today", ["sunset", "sf"])
post3 = network.add_post("charlie", "Deep dish pizza is the best!", ["food", "chicago"])

# Like posts
network.like_post("bob", post1)
network.like_post("charlie", post1)
network.like_post("alice", post2)

# Get user feed
alice_feed = network.get_user_feed("alice")
print("Alice's feed:")
for post in alice_feed:
    print(f"  {post['author']}: {post['content']} ({post['likes']} likes)")

# Get mutual friends
mutual = network.get_mutual_friends("alice", "charlie")
print(f"Mutual friends of Alice and Charlie: {mutual}")

# Get popular posts
popular = network.get_popular_posts(5)
print("Popular posts:")
for post in popular:
    print(f"  {post['author']}: {post['content']} ({post['likes']} likes)")

Performance Optimization

Indexing Strategies

class IndexedCogDB:
    def __init__(self):
        self.db = Cog()
        self.node_indices = {}  # property -> value -> [nodes]
        self.edge_indices = {}  # relationship_type -> [edges]
    
    def create_node_index(self, property_name):
        """Create an index on a node property"""
        if property_name not in self.node_indices:
            self.node_indices[property_name] = {}
            
            # Index existing nodes
            for node in self.db.nodes:
                if property_name in node:
                    value = node[property_name]
                    if value not in self.node_indices[property_name]:
                        self.node_indices[property_name][value] = []
                    self.node_indices[property_name][value].append(node)
    
    def find_nodes_by_property(self, property_name, value):
        """Fast lookup using index"""
        if property_name in self.node_indices:
            return self.node_indices[property_name].get(value, [])
        else:
            # Fallback to linear search
            return [node for node in self.db.nodes 
                   if node.get(property_name) == value]
    
    def create_node(self, label, properties):
        """Create node and update indices"""
        node = self.db.create_node(label, properties)
        
        # Update indices
        for prop_name, prop_value in properties.items():
            if prop_name in self.node_indices:
                if prop_value not in self.node_indices[prop_name]:
                    self.node_indices[prop_name][prop_value] = []
                self.node_indices[prop_name][prop_value].append(node)
        
        return node

Memory Management

def cleanup_orphaned_nodes(db):
    """Remove nodes that have no edges"""
    orphaned_nodes = []
    
    for node in db.nodes:
        has_edges = any(
            edge.source == node or edge.target == node 
            for edge in db.edges
        )
        
        if not has_edges:
            orphaned_nodes.append(node)
    
    for node in orphaned_nodes:
        db.delete_node(node.id)
    
    return len(orphaned_nodes)

def get_memory_stats(db):
    """Get memory usage statistics"""
    return {
        "nodes": len(db.nodes),
        "edges": len(db.edges),
        "avg_node_properties": sum(len(node) for node in db.nodes) / len(db.nodes) if db.nodes else 0,
        "avg_edge_properties": sum(len(edge) for edge in db.edges) / len(db.edges) if db.edges else 0
    }

Best Practices and Tips

1. Design Efficient Graph Schemas

# Good: Specific relationship types
db.create_edge(user, "PURCHASED", product)
db.create_edge(user, "VIEWED", product)
db.create_edge(user, "REVIEWED", product)

# Avoid: Generic relationships
# db.create_edge(user, "RELATED_TO", product)  # Too vague

2. Use Appropriate Data Types

# Store dates as ISO strings for consistency
node["created_at"] = datetime.datetime.now().isoformat()

# Use lists for multiple values
node["tags"] = ["python", "database", "graph"]

# Use numbers for numeric calculations
node["price"] = 99.99
node["quantity"] = 5

3. Implement Proper Error Handling

def safe_node_creation(db, label, properties):
    """Safely create a node with validation"""
    try:
        # Validate required properties
        if not properties.get("id"):
            raise ValueError("Node must have an ID")
        
        # Check for duplicates
        existing = [node for node in db.get_nodes(label) 
                   if node.get("id") == properties["id"]]
        
        if existing:
            raise ValueError(f"Node with ID {properties['id']} already exists")
        
        return db.create_node(label, properties)
        
    except Exception as e:
        print(f"Error creating node: {e}")
        return None

Conclusion

CogDB provides a powerful and intuitive way to work with graph data in Python. Its in-memory architecture makes it ideal for applications that require fast graph traversals and complex relationship queries. The examples in this guide demonstrate how to:

  • Set up and configure CogDB
  • Perform basic CRUD operations
  • Implement complex graph algorithms
  • Build real-world applications like social networks
  • Optimize performance with indexing
  • Follow best practices for graph database design

Whether you're building recommendation systems, analyzing social networks, or working with knowledge graphs, CogDB offers the flexibility and performance needed for modern graph-based applications. As your data grows, consider implementing the optimization strategies discussed here to maintain performance and manage memory efficiently.

The key to success with CogDB is understanding your data relationships and designing your graph schema to support your most common query patterns. Start simple, measure performance, and optimize based on your specific use cases.

CogDB: A Comprehensive Guide to Python's Graph Database Solution

  Introduction CogDB is a high-performance, in-memory graph database written in Python that provides a simple yet powerful API for working ...