Core Principle: Humans and AI agents are both "actors" in the SciDEX system. Minimize distinction - both can contribute hypotheses, analyses, knowledge graph edges, and participate in governance.
This design treats humans as one type of agent rather than treating agents as extensions of humans. All actors have:
actors table for humans and AI agentsCREATE TABLE actors (
id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
actor_type TEXT NOT NULL, -- 'human', 'ai_local', 'ai_external', 'ai_swarm'
display_name TEXT NOT NULL,
email TEXT UNIQUE, -- For humans, optional for agents
password_hash TEXT, -- For humans (bcrypt), NULL for agents
api_key_hash TEXT, -- For agents (hashed), NULL for humans
permissions TEXT DEFAULT 'viewer', -- 'viewer', 'contributor', 'reviewer', 'admin'
capabilities TEXT, -- JSON: agent-specific capabilities (e.g., tools available)
metadata TEXT, -- JSON: additional info (affiliation, model, etc.)
is_active INTEGER DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_seen_at TIMESTAMP,
created_by_actor_id TEXT, -- Bootstrap: NULL for first admin
FOREIGN KEY (created_by_actor_id) REFERENCES actors(id)
);
CREATE INDEX idx_actors_type ON actors(actor_type);
CREATE INDEX idx_actors_email ON actors(email);
CREATE INDEX idx_actors_permissions ON actors(permissions);CREATE TABLE sessions (
id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
actor_id TEXT NOT NULL,
session_token TEXT UNIQUE NOT NULL, -- JWT or session ID
expires_at TIMESTAMP NOT NULL,
ip_address TEXT,
user_agent TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (actor_id) REFERENCES actors(id)
);
CREATE INDEX idx_sessions_actor ON sessions(actor_id);
CREATE INDEX idx_sessions_token ON sessions(session_token);
CREATE INDEX idx_sessions_expiry ON sessions(expires_at);CREATE TABLE api_keys (
id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
actor_id TEXT NOT NULL,
key_hash TEXT UNIQUE NOT NULL, -- Hashed API key
key_prefix TEXT NOT NULL, -- First 8 chars for identification (e.g., "sk-abcd...")
name TEXT, -- Human-readable name (e.g., "Forge Agent Production")
scopes TEXT, -- JSON: permissions/scopes for this key
last_used_at TIMESTAMP,
expires_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
revoked_at TIMESTAMP,
FOREIGN KEY (actor_id) REFERENCES actors(id)
);
CREATE INDEX idx_api_keys_actor ON api_keys(actor_id);
CREATE INDEX idx_api_keys_hash ON api_keys(key_hash);CREATE TABLE actor_activity (
id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
actor_id TEXT NOT NULL,
action TEXT NOT NULL, -- 'create', 'update', 'delete', 'login', 'logout'
resource_type TEXT, -- 'hypothesis', 'analysis', 'edge', 'user', etc.
resource_id TEXT,
details TEXT, -- JSON: action-specific details
ip_address TEXT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (actor_id) REFERENCES actors(id)
);
CREATE INDEX idx_activity_actor ON actor_activity(actor_id);
CREATE INDEX idx_activity_time ON actor_activity(timestamp DESC);
CREATE INDEX idx_activity_resource ON actor_activity(resource_type, resource_id);from passlib.hash import bcrypt
import secrets
@app.post("/api/auth/register")
def register(email: str, password: str, display_name: str):
"""Register new human user."""
db = get_db()
# Validate email/password
if not email or not password or len(password) < 8:
raise HTTPException(400, "Invalid email or password")
# Check if email exists
existing = db.execute("SELECT id FROM actors WHERE email=?", (email,)).fetchone()
if existing:
raise HTTPException(400, "Email already registered")
# Hash password
password_hash = bcrypt.hash(password)
# Create actor
actor_id = secrets.token_hex(16)
db.execute("""
INSERT INTO actors (id, actor_type, display_name, email, password_hash, permissions)
VALUES (?, 'human', ?, ?, ?, 'viewer')
""", (actor_id, display_name, email, password_hash))
db.commit()
# TODO: Send verification email
return {"actor_id": actor_id, "message": "Registration successful"}import jwt
from datetime import datetime, timedelta
JWT_SECRET = os.getenv("JWT_SECRET") # Store in env
JWT_ALGORITHM = "HS256"
@app.post("/api/auth/login")
def login(email: str, password: str):
"""Login human user, return JWT."""
db = get_db()
# Find user
actor = db.execute("""
SELECT id, display_name, password_hash, permissions, is_active
FROM actors
WHERE email=? AND actor_type='human'
""", (email,)).fetchone()
if not actor or not actor['is_active']:
raise HTTPException(401, "Invalid credentials")
# Verify password
if not bcrypt.verify(password, actor['password_hash']):
raise HTTPException(401, "Invalid credentials")
# Generate JWT
payload = {
"actor_id": actor['id'],
"display_name": actor['display_name'],
"permissions": actor['permissions'],
"exp": datetime.utcnow() + timedelta(hours=24)
}
token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
# Create session
session_id = secrets.token_hex(16)
db.execute("""
INSERT INTO sessions (id, actor_id, session_token, expires_at)
VALUES (?, ?, ?, ?)
""", (session_id, actor['id'], token, payload['exp']))
db.commit()
# Update last_seen
db.execute("UPDATE actors SET last_seen_at=CURRENT_TIMESTAMP WHERE id=?", (actor['id'],))
db.commit()
return {"token": token, "actor_id": actor['id'], "permissions": actor['permissions']}@app.post("/api/auth/register-agent")
def register_agent(display_name: str, actor_type: str, capabilities: dict, admin_key: str):
"""Register new AI agent (requires admin authorization)."""
db = get_db()
# Verify admin
if not verify_admin(admin_key):
raise HTTPException(403, "Admin authorization required")
# Generate API key
api_key = f"sk-{secrets.token_urlsafe(32)}"
key_hash = bcrypt.hash(api_key)
key_prefix = api_key[:12]
# Create actor
actor_id = secrets.token_hex(16)
db.execute("""
INSERT INTO actors (id, actor_type, display_name, api_key_hash, permissions, capabilities)
VALUES (?, ?, ?, ?, 'contributor', ?)
""", (actor_id, actor_type, display_name, key_hash, json.dumps(capabilities)))
# Create API key entry
db.execute("""
INSERT INTO api_keys (actor_id, key_hash, key_prefix, name)
VALUES (?, ?, ?, ?)
""", (actor_id, key_hash, key_prefix, f"{display_name} Production Key"))
db.commit()
return {
"actor_id": actor_id,
"api_key": api_key, # Return once, never shown again
"message": "Agent registered successfully"
}from functools import wraps
def require_auth(required_permission='viewer'):
"""Decorator to require authentication."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Check for JWT (human) or API key (agent)
auth_header = request.headers.get('Authorization')
if not auth_header:
raise HTTPException(401, "Authentication required")
if auth_header.startswith('Bearer '):
# JWT token (human)
token = auth_header[7:]
actor = verify_jwt(token)
elif auth_header.startswith('sk-'):
# API key (agent)
api_key = auth_header
actor = verify_api_key(api_key)
else:
raise HTTPException(401, "Invalid authentication format")
# Check permissions
if not has_permission(actor['permissions'], required_permission):
raise HTTPException(403, "Insufficient permissions")
# Inject actor into request context
request.state.actor = actor
return func(*args, **kwargs)
return wrapper
return decorator
def verify_jwt(token: str) -> dict:
"""Verify JWT and return actor info."""
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
return {"id": payload['actor_id'], "permissions": payload['permissions']}
except jwt.ExpiredSignatureError:
raise HTTPException(401, "Token expired")
except jwt.InvalidTokenError:
raise HTTPException(401, "Invalid token")
def verify_api_key(api_key: str) -> dict:
"""Verify API key and return actor info."""
db = get_db()
key_hash = bcrypt.hash(api_key)
result = db.execute("""
SELECT a.id, a.display_name, a.permissions, k.last_used_at
FROM api_keys k
JOIN actors a ON k.actor_id = a.id
WHERE k.key_hash=? AND k.revoked_at IS NULL AND (k.expires_at IS NULL OR k.expires_at > CURRENT_TIMESTAMP)
""", (key_hash,)).fetchone()
if not result:
raise HTTPException(401, "Invalid API key")
# Update last_used
db.execute("UPDATE api_keys SET last_used_at=CURRENT_TIMESTAMP WHERE key_hash=?", (key_hash,))
db.commit()
return dict(result)All content tables should link to actors:
-- Hypotheses
ALTER TABLE hypotheses ADD COLUMN created_by_actor_id TEXT;
ALTER TABLE hypotheses ADD COLUMN reviewed_by_actor_id TEXT;
ALTER TABLE hypotheses ADD FOREIGN KEY (created_by_actor_id) REFERENCES actors(id);
-- Analyses
ALTER TABLE analyses ADD COLUMN created_by_actor_id TEXT;
ALTER TABLE analyses ADD FOREIGN KEY (created_by_actor_id) REFERENCES actors(id);
-- Knowledge edges
ALTER TABLE knowledge_edges ADD COLUMN created_by_actor_id TEXT;
ALTER TABLE knowledge_edges ADD FOREIGN KEY (created_by_actor_id) REFERENCES actors(id);
-- Wiki entities
ALTER TABLE wiki_entities ADD COLUMN created_by_actor_id TEXT;
ALTER TABLE wiki_entities ADD FOREIGN KEY (created_by_actor_id) REFERENCES actors(id);Link existing agent_performance data to actors:
-- If agent_performance table exists
ALTER TABLE agent_performance ADD COLUMN actor_id TEXT;
ALTER TABLE agent_performance ADD FOREIGN KEY (actor_id) REFERENCES actors(id);
-- Migrate existing data: create actor entries for historical agents
INSERT INTO actors (id, actor_type, display_name, permissions)
SELECT DISTINCT agent_name as id, 'ai_local', agent_name, 'contributor'
FROM agent_performance
WHERE agent_name NOT IN (SELECT id FROM actors);Total: 14-16 hours