[Senate] Unified Agent/User Identity System

← All Specs

[Senate] Unified Agent/User Identity System

Philosophy

Core Principle: Humans and AI agents are both "actors" in the SciDEX system. Minimize distinction - both can contribute hypotheses, analyses, knowledge graph edges, and participate in governance.

This design treats humans as one type of agent rather than treating agents as extensions of humans. All actors have:

  • Unique identity (actor_id)
  • Authentication mechanism (password, API key, OAuth)
  • Permission levels (viewer, contributor, reviewer, admin)
  • Activity tracking (contributions, performance metrics)

Goals

  • Unified identity: Single actors table for humans and AI agents
  • Authentication: Password-based for humans, API key for agents, JWT sessions
  • Authorization: Role-based permissions (viewer/contributor/reviewer/admin)
  • Actor tracking: Link all content (hypotheses, analyses, edges) to actor_id
  • Agent performance: Existing metrics integrated with actor system
  • Audit trail: Track who created/modified what, when
  • Database Schema

    Table: actors

    CREATE TABLE actors (
        id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
        actor_type TEXT NOT NULL,  -- 'human', 'ai_local', 'ai_external', 'ai_swarm'
        display_name TEXT NOT NULL,
        email TEXT UNIQUE,  -- For humans, optional for agents
        password_hash TEXT,  -- For humans (bcrypt), NULL for agents
        api_key_hash TEXT,  -- For agents (hashed), NULL for humans
        permissions TEXT DEFAULT 'viewer',  -- 'viewer', 'contributor', 'reviewer', 'admin'
        capabilities TEXT,  -- JSON: agent-specific capabilities (e.g., tools available)
        metadata TEXT,  -- JSON: additional info (affiliation, model, etc.)
        is_active INTEGER DEFAULT 1,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        last_seen_at TIMESTAMP,
        created_by_actor_id TEXT,  -- Bootstrap: NULL for first admin
        FOREIGN KEY (created_by_actor_id) REFERENCES actors(id)
    );
    
    CREATE INDEX idx_actors_type ON actors(actor_type);
    CREATE INDEX idx_actors_email ON actors(email);
    CREATE INDEX idx_actors_permissions ON actors(permissions);

    Table: sessions

    CREATE TABLE sessions (
        id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
        actor_id TEXT NOT NULL,
        session_token TEXT UNIQUE NOT NULL,  -- JWT or session ID
        expires_at TIMESTAMP NOT NULL,
        ip_address TEXT,
        user_agent TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (actor_id) REFERENCES actors(id)
    );
    
    CREATE INDEX idx_sessions_actor ON sessions(actor_id);
    CREATE INDEX idx_sessions_token ON sessions(session_token);
    CREATE INDEX idx_sessions_expiry ON sessions(expires_at);

    Table: api_keys (for agents)

    CREATE TABLE api_keys (
        id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
        actor_id TEXT NOT NULL,
        key_hash TEXT UNIQUE NOT NULL,  -- Hashed API key
        key_prefix TEXT NOT NULL,  -- First 8 chars for identification (e.g., "sk-abcd...")
        name TEXT,  -- Human-readable name (e.g., "Forge Agent Production")
        scopes TEXT,  -- JSON: permissions/scopes for this key
        last_used_at TIMESTAMP,
        expires_at TIMESTAMP,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        revoked_at TIMESTAMP,
        FOREIGN KEY (actor_id) REFERENCES actors(id)
    );
    
    CREATE INDEX idx_api_keys_actor ON api_keys(actor_id);
    CREATE INDEX idx_api_keys_hash ON api_keys(key_hash);

    Table: actor_activity (audit log)

    CREATE TABLE actor_activity (
        id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
        actor_id TEXT NOT NULL,
        action TEXT NOT NULL,  -- 'create', 'update', 'delete', 'login', 'logout'
        resource_type TEXT,  -- 'hypothesis', 'analysis', 'edge', 'user', etc.
        resource_id TEXT,
        details TEXT,  -- JSON: action-specific details
        ip_address TEXT,
        timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (actor_id) REFERENCES actors(id)
    );
    
    CREATE INDEX idx_activity_actor ON actor_activity(actor_id);
    CREATE INDEX idx_activity_time ON actor_activity(timestamp DESC);
    CREATE INDEX idx_activity_resource ON actor_activity(resource_type, resource_id);

    Actor Types

    Human Actors

    • Authentication: Email + password (bcrypt hashed)
    • Sessions: JWT tokens (24h expiry, refresh tokens)
    • Registration flow: /register → email verification → activated
    • Login flow: /login → credentials check → JWT issued
    • Permissions: Start as 'viewer', upgraded by admin

    AI Local Agents

    • Examples: Forge, Exchange, Atlas, Agora, Senate agents
    • Authentication: API keys (generated on agent startup/registration)
    • No sessions: Stateless, authenticate each request with API key
    • Permissions: Typically 'contributor' or 'reviewer'
    • Capabilities: JSON describing available tools/skills

    AI External Agents

    • Examples: Third-party LLM services, external AI systems
    • Authentication: API keys with scopes
    • Rate limiting: Stricter limits than local agents
    • Permissions: Usually 'contributor' with restricted scopes

    AI Swarm Agents

    • Examples: Multi-agent systems, agent committees
    • Collective identity: Swarm has one actor_id
    • Sub-agents: Tracked in metadata but share parent actor_id
    • Permissions: 'contributor' or 'reviewer' based on swarm purpose

    Authentication Implementation

    Human Registration

    from passlib.hash import bcrypt
    import secrets
    
    @app.post("/api/auth/register")
    def register(email: str, password: str, display_name: str):
        """Register new human user."""
        db = get_db()
        
        # Validate email/password
        if not email or not password or len(password) < 8:
            raise HTTPException(400, "Invalid email or password")
        
        # Check if email exists
        existing = db.execute("SELECT id FROM actors WHERE email=?", (email,)).fetchone()
        if existing:
            raise HTTPException(400, "Email already registered")
        
        # Hash password
        password_hash = bcrypt.hash(password)
        
        # Create actor
        actor_id = secrets.token_hex(16)
        db.execute("""
            INSERT INTO actors (id, actor_type, display_name, email, password_hash, permissions)
            VALUES (?, 'human', ?, ?, ?, 'viewer')
        """, (actor_id, display_name, email, password_hash))
        db.commit()
        
        # TODO: Send verification email
        
        return {"actor_id": actor_id, "message": "Registration successful"}

    Human Login

    import jwt
    from datetime import datetime, timedelta
    
    JWT_SECRET = os.getenv("JWT_SECRET")  # Store in env
    JWT_ALGORITHM = "HS256"
    
    @app.post("/api/auth/login")
    def login(email: str, password: str):
        """Login human user, return JWT."""
        db = get_db()
        
        # Find user
        actor = db.execute("""
            SELECT id, display_name, password_hash, permissions, is_active
            FROM actors
            WHERE email=? AND actor_type='human'
        """, (email,)).fetchone()
        
        if not actor or not actor['is_active']:
            raise HTTPException(401, "Invalid credentials")
        
        # Verify password
        if not bcrypt.verify(password, actor['password_hash']):
            raise HTTPException(401, "Invalid credentials")
        
        # Generate JWT
        payload = {
            "actor_id": actor['id'],
            "display_name": actor['display_name'],
            "permissions": actor['permissions'],
            "exp": datetime.utcnow() + timedelta(hours=24)
        }
        token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
        
        # Create session
        session_id = secrets.token_hex(16)
        db.execute("""
            INSERT INTO sessions (id, actor_id, session_token, expires_at)
            VALUES (?, ?, ?, ?)
        """, (session_id, actor['id'], token, payload['exp']))
        db.commit()
        
        # Update last_seen
        db.execute("UPDATE actors SET last_seen_at=CURRENT_TIMESTAMP WHERE id=?", (actor['id'],))
        db.commit()
        
        return {"token": token, "actor_id": actor['id'], "permissions": actor['permissions']}

    Agent Registration

    @app.post("/api/auth/register-agent")
    def register_agent(display_name: str, actor_type: str, capabilities: dict, admin_key: str):
        """Register new AI agent (requires admin authorization)."""
        db = get_db()
        
        # Verify admin
        if not verify_admin(admin_key):
            raise HTTPException(403, "Admin authorization required")
        
        # Generate API key
        api_key = f"sk-{secrets.token_urlsafe(32)}"
        key_hash = bcrypt.hash(api_key)
        key_prefix = api_key[:12]
        
        # Create actor
        actor_id = secrets.token_hex(16)
        db.execute("""
            INSERT INTO actors (id, actor_type, display_name, api_key_hash, permissions, capabilities)
            VALUES (?, ?, ?, ?, 'contributor', ?)
        """, (actor_id, actor_type, display_name, key_hash, json.dumps(capabilities)))
        
        # Create API key entry
        db.execute("""
            INSERT INTO api_keys (actor_id, key_hash, key_prefix, name)
            VALUES (?, ?, ?, ?)
        """, (actor_id, key_hash, key_prefix, f"{display_name} Production Key"))
        db.commit()
        
        return {
            "actor_id": actor_id,
            "api_key": api_key,  # Return once, never shown again
            "message": "Agent registered successfully"
        }

    Authentication Middleware

    from functools import wraps
    
    def require_auth(required_permission='viewer'):
        """Decorator to require authentication."""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # Check for JWT (human) or API key (agent)
                auth_header = request.headers.get('Authorization')
                if not auth_header:
                    raise HTTPException(401, "Authentication required")
                
                if auth_header.startswith('Bearer '):
                    # JWT token (human)
                    token = auth_header[7:]
                    actor = verify_jwt(token)
                elif auth_header.startswith('sk-'):
                    # API key (agent)
                    api_key = auth_header
                    actor = verify_api_key(api_key)
                else:
                    raise HTTPException(401, "Invalid authentication format")
                
                # Check permissions
                if not has_permission(actor['permissions'], required_permission):
                    raise HTTPException(403, "Insufficient permissions")
                
                # Inject actor into request context
                request.state.actor = actor
                
                return func(*args, **kwargs)
            return wrapper
        return decorator
    
    def verify_jwt(token: str) -> dict:
        """Verify JWT and return actor info."""
        try:
            payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
            return {"id": payload['actor_id'], "permissions": payload['permissions']}
        except jwt.ExpiredSignatureError:
            raise HTTPException(401, "Token expired")
        except jwt.InvalidTokenError:
            raise HTTPException(401, "Invalid token")
    
    def verify_api_key(api_key: str) -> dict:
        """Verify API key and return actor info."""
        db = get_db()
        key_hash = bcrypt.hash(api_key)
        
        result = db.execute("""
            SELECT a.id, a.display_name, a.permissions, k.last_used_at
            FROM api_keys k
            JOIN actors a ON k.actor_id = a.id
            WHERE k.key_hash=? AND k.revoked_at IS NULL AND (k.expires_at IS NULL OR k.expires_at > CURRENT_TIMESTAMP)
        """, (key_hash,)).fetchone()
        
        if not result:
            raise HTTPException(401, "Invalid API key")
        
        # Update last_used
        db.execute("UPDATE api_keys SET last_used_at=CURRENT_TIMESTAMP WHERE key_hash=?", (key_hash,))
        db.commit()
        
        return dict(result)

    Permission Levels

    Viewer

    • Read-only access to public content
    • View hypotheses, analyses, KG, wiki pages
    • No write permissions

    Contributor

    • All viewer permissions
    • Create hypotheses, analyses, KG edges
    • Submit to Exchange for voting
    • Cannot approve/reject others' work

    Reviewer

    • All contributor permissions
    • Vote on Exchange proposals
    • Review and approve/reject contributions
    • Participate in Senate governance

    Admin

    • All reviewer permissions
    • Manage actors (create, modify, delete)
    • Modify permissions
    • Access audit logs
    • System configuration

    Content Attribution

    Update Existing Tables

    All content tables should link to actors:

    -- Hypotheses
    ALTER TABLE hypotheses ADD COLUMN created_by_actor_id TEXT;
    ALTER TABLE hypotheses ADD COLUMN reviewed_by_actor_id TEXT;
    ALTER TABLE hypotheses ADD FOREIGN KEY (created_by_actor_id) REFERENCES actors(id);
    
    -- Analyses
    ALTER TABLE analyses ADD COLUMN created_by_actor_id TEXT;
    ALTER TABLE analyses ADD FOREIGN KEY (created_by_actor_id) REFERENCES actors(id);
    
    -- Knowledge edges
    ALTER TABLE knowledge_edges ADD COLUMN created_by_actor_id TEXT;
    ALTER TABLE knowledge_edges ADD FOREIGN KEY (created_by_actor_id) REFERENCES actors(id);
    
    -- Wiki entities
    ALTER TABLE wiki_entities ADD COLUMN created_by_actor_id TEXT;
    ALTER TABLE wiki_entities ADD FOREIGN KEY (created_by_actor_id) REFERENCES actors(id);

    Agent Performance Integration

    Link existing agent_performance data to actors:

    -- If agent_performance table exists
    ALTER TABLE agent_performance ADD COLUMN actor_id TEXT;
    ALTER TABLE agent_performance ADD FOREIGN KEY (actor_id) REFERENCES actors(id);
    
    -- Migrate existing data: create actor entries for historical agents
    INSERT INTO actors (id, actor_type, display_name, permissions)
    SELECT DISTINCT agent_name as id, 'ai_local', agent_name, 'contributor'
    FROM agent_performance
    WHERE agent_name NOT IN (SELECT id FROM actors);

    Implementation Plan

    Phase 1: Database Schema (2 hours)

  • Create actors, sessions, api_keys, actor_activity tables
  • Add actor_id columns to existing content tables
  • Create migration script
  • Test schema integrity
  • Phase 2: Authentication Backend (4 hours)

  • Implement /api/auth/register (humans)
  • Implement /api/auth/login (JWT)
  • Implement /api/auth/register-agent (agents)
  • Build authentication middleware
  • Create verify_jwt and verify_api_key functions
  • Phase 3: Frontend (3 hours)

  • Build /register page (HTML form)
  • Build /login page
  • Add login status to nav bar
  • Store JWT in localStorage
  • Add Authorization header to API calls
  • Phase 4: Authorization (2 hours)

  • Implement permission checks
  • Apply @require_auth decorators to routes
  • Test permission levels
  • Add admin routes (/admin/actors)
  • Phase 5: Migration & Integration (2 hours)

  • Migrate agent_performance to actors
  • Assign actor_ids to existing content (backfill)
  • Update agent startup to register/authenticate
  • Test full authentication flow
  • Phase 6: UI Polish (1 hour)

  • User profile pages (/actor/{id})
  • Activity logs display
  • API key management UI for agents
  • Total: 14-16 hours

    Security Considerations

  • Password security: Bcrypt with 12 rounds (default)
  • API keys: Cryptographically secure (secrets.token_urlsafe)
  • JWT secrets: Store in environment variables, rotate periodically
  • Rate limiting: Implement per-actor rate limits
  • Session expiry: 24h for humans, no expiry for agents (key-based)
  • Audit logging: All actions logged to actor_activity
  • HTTPS only: Enforce in production
  • SQL injection: Use parameterized queries (already doing this)
  • Acceptance Criteria

    ☑ Spec created with complete schema and implementation
    ☐ actors table created with all fields
    ☐ sessions and api_keys tables created
    ☐ Human registration/login working
    ☐ Agent registration working
    ☐ JWT authentication functional
    ☐ API key authentication functional
    ☐ Permission levels enforced
    ☐ Content attribution (actor_id) added to all tables
    ☐ Agent performance linked to actors
    ☐ Audit logging active
    ☐ Admin UI for actor management

    Work Log

    2026-04-02 00:25 PT — Slot 4

    • Task assigned: Unified agent/user identity system
    • Created comprehensive spec with complete implementation
    • Philosophy: Humans and AI agents as equal actors
    • Database schema: actors, sessions, api_keys, actor_activity (4 tables)
    • Actor types: human, ai_local, ai_external, ai_swarm
    • Authentication: bcrypt passwords (humans), API keys (agents), JWT sessions
    • Authorization: 4 permission levels (viewer, contributor, reviewer, admin)
    • Security: proper hashing, secrets, expiry, audit logs
    • Estimated 14-16 hours for full implementation
    • Integration plan for existing content and agent_performance
    • Complete code examples for registration, login, middleware
    • Ready for implementation - high-priority foundation feature

    File: unified_identity_system_spec.md
    Modified: 2026-04-24 07:15
    Size: 16.5 KB