"""
Docker Model Runner - Anthropic API Compatible
Full compatibility with Anthropic Messages API + Interleaved Thinking
Supports: /v1/messages, /anthropic/v1/messages, /api/v1/messages
Optimized for: 2 vCPU, 16GB RAM
"""
from fastapi import FastAPI, HTTPException, Header, Request, status
from fastapi.responses import StreamingResponse, HTMLResponse, FileResponse, JSONResponse
from fastapi.exceptions import RequestValidationError
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import Optional, List, Union, Literal, Any, Dict
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import os
from datetime import datetime
from contextlib import asynccontextmanager
import uuid
import time
import json
import asyncio
# Runtime configuration, overridable through environment variables.
# GENERATOR_MODEL is the checkpoint id handed to the Hugging Face loaders.
GENERATOR_MODEL = os.environ.get("GENERATOR_MODEL", "distilgpt2")
# MODEL_DISPLAY_NAME is read from the MODEL_NAME variable.
MODEL_DISPLAY_NAME = os.environ.get("MODEL_NAME", "MiniMax-M2")
# Restrict PyTorch to two threads (the module header targets a 2 vCPU host).
torch.set_num_threads(2)
# Process-wide cache holding the loaded tokenizer and model objects.
models = dict()
def load_models():
    """Load the tokenizer and causal LM named by GENERATOR_MODEL into ``models``.

    The tokenizer is stored under ``"tokenizer"`` and the model under
    ``"model"``. The model is put in eval mode, and when the tokenizer has no
    pad token its EOS token is reused as the pad token.
    """
    print("Loading models for CPU inference...")
    tokenizer = AutoTokenizer.from_pretrained(GENERATOR_MODEL)
    models["tokenizer"] = tokenizer
    language_model = AutoModelForCausalLM.from_pretrained(GENERATOR_MODEL)
    models["model"] = language_model
    language_model.eval()
    # Some checkpoints ship without a pad token; fall back to EOS.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    print("✅ All models loaded successfully!")
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: load models before serving, drop them after.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).

    The model cache is cleared in a ``finally`` clause so that it is released
    even when an exception or cancellation is delivered at the ``yield``.
    """
    load_models()
    try:
        yield
    finally:
        models.clear()
# ASGI application object. The interactive docs are mounted at /api/docs and
# /api/redoc (see docs_url / redoc_url), and `lifespan` is registered as the
# startup/shutdown hook.
app = FastAPI(
    title="Model Runner",
    description="Anthropic API Compatible - Works with Claude Code & Agentic Tools",
    version="1.1.0",
    lifespan=lifespan,
    docs_url="/api/docs",
    redoc_url="/api/redoc"
)
# CORS for agentic tools: every origin, method and header is allowed.
# NOTE(review): wildcard origins are combined with allow_credentials=True.
# FastAPI's CORS documentation advises listing origins explicitly when
# credentials are allowed -- confirm this fully permissive policy is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ============== Anthropic Error Handling ==============
class AnthropicError(BaseModel):
    # Inner "error" object of an error payload: an error type string plus a
    # message string. Both fields are required.
    type: str = Field(...)
    message: str = Field(...)
class AnthropicErrorResponse(BaseModel):
    # Top-level error envelope; `type` defaults to "error" and `error` carries
    # the detail object.
    type: str = Field(default="error")
    error: AnthropicError = Field(...)
def create_error_response(status_code: int, error_type: str, message: str) -> JSONResponse:
    """Build a JSON error reply in the Anthropic error envelope.

    Args:
        status_code: HTTP status to send.
        error_type: Value for the nested ``error.type`` field.
        message: Value for the nested ``error.message`` field.

    Returns:
        A JSONResponse whose body is
        ``{"type": "error", "error": {"type": ..., "message": ...}}``.
    """
    error_detail = {"type": error_type, "message": message}
    envelope = {"type": "error", "error": error_detail}
    return JSONResponse(status_code=status_code, content=envelope)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Turn request-validation failures into a 400 ``invalid_request_error``.

    Each validation issue is rendered as ``<last location element>: <msg>``
    and the fragments are joined with ``"; "``.
    """
    fragments = []
    for issue in exc.errors():
        field_name = issue['loc'][-1]
        fragments.append(f"{field_name}: {issue['msg']}")
    combined = "; ".join(fragments)
    return create_error_response(400, "invalid_request_error", combined)
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Render an HTTPException in the Anthropic error envelope.

    The HTTP status is kept as-is; the error type is looked up from the
    status code and falls back to ``"api_error"`` for unlisted codes.
    """
    known_error_types = {
        400: "invalid_request_error",
        401: "authentication_error",
        403: "permission_error",
        404: "not_found_error",
        413: "request_too_large",
        429: "rate_limit_error",
        500: "api_error",
        529: "overloaded_error",
    }
    if exc.status_code in known_error_types:
        error_type = known_error_types[exc.status_code]
    else:
        error_type = "api_error"
    detail_text = str(exc.detail)
    return create_error_response(exc.status_code, error_type, detail_text)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
    """Catch-all handler: report any other exception as a 500 ``api_error``.

    The exception's string form is included in the response message.
    """
    reason = str(exc)
    message = "An unexpected error occurred: " + reason
    return create_error_response(500, "api_error", message)
# ============== Anthropic API Models ==============
class TextBlock(BaseModel):
    # Plain text content block; `type` is fixed to "text".
    type: Literal["text"] = Field(default="text")
    text: str = Field(...)
class ThinkingBlock(BaseModel):
    # Reasoning content block; `type` is fixed to "thinking".
    type: Literal["thinking"] = Field(default="thinking")
    thinking: str = Field(...)
class SignatureBlock(BaseModel):
    # Signature content block; `type` is fixed to "signature".
    type: Literal["signature"] = Field(default="signature")
    signature: str = Field(...)
class ToolUseBlock(BaseModel):
    # Tool invocation block: an id, the tool's name and its input arguments
    # as a free-form mapping. `type` is fixed to "tool_use".
    type: Literal["tool_use"] = Field(default="tool_use")
    id: str = Field(...)
    name: str = Field(...)
    input: Dict[str, Any] = Field(...)
class ToolResultContent(BaseModel):
    # Result of an earlier tool call, linked back through `tool_use_id`.
    # `content` is either a string or a list of text blocks; `is_error`
    # defaults to False.
    type: Literal["tool_result"] = Field(default="tool_result")
    tool_use_id: str = Field(...)
    content: Union[str, List[TextBlock]] = Field(...)
    is_error: Union[bool, None] = Field(default=False)
class ImageSource(BaseModel):
    # Where an image comes from: `type` selects "base64" or "url"; the
    # remaining fields are all optional and default to None.
    type: Literal["base64", "url"] = Field(...)
    media_type: Union[str, None] = Field(default=None)
    data: Union[str, None] = Field(default=None)
    url: Union[str, None] = Field(default=None)
class ImageBlock(BaseModel):
    # Image content block wrapping an ImageSource; `type` is fixed to "image".
    type: Literal["image"] = Field(default="image")
    source: ImageSource = Field(...)
# Anything that may appear as an element of a message's content list; bare
# strings are accepted alongside the typed blocks.
ContentBlock = Union[
    TextBlock,
    ThinkingBlock,
    SignatureBlock,
    ToolUseBlock,
    ToolResultContent,
    ImageBlock,
    str,
]
class MessageParam(BaseModel):
    # One conversation turn: a "user" or "assistant" role plus content that
    # is either a plain string or a list of content blocks.
    role: Literal["user", "assistant"] = Field(...)
    content: Union[str, List[ContentBlock]] = Field(...)
class ToolInputSchema(BaseModel):
    # Schema describing a tool's input; `type` defaults to "object" and
    # `properties` / `required` default to None.
    type: str = Field(default="object")
    properties: Union[Dict[str, Any], None] = Field(default=None)
    required: Union[List[str], None] = Field(default=None)
class Tool(BaseModel):
    # Client-supplied tool definition.
    # `description` is optional: the Anthropic Messages API does not require
    # it, so a tool definition without one must not fail validation with 400.
    name: str
    description: Optional[str] = None
    input_schema: ToolInputSchema
class ToolChoice(BaseModel):
    # Tool-selection preference: "auto" (default), "any" or "tool", with an
    # optional tool name and a flag to disable parallel tool use.
    type: Literal["auto", "any", "tool"] = Field(default="auto")
    name: Union[str, None] = Field(default=None)
    disable_parallel_tool_use: Union[bool, None] = Field(default=False)
class ThinkingConfig(BaseModel):
    # Thinking switch ("enabled" / "disabled", default "disabled") plus an
    # optional token budget.
    type: Literal["enabled", "disabled"] = Field(default="disabled")
    budget_tokens: Union[int, None] = Field(default=None)
class Metadata(BaseModel):
    # Optional request metadata; only `user_id` is modelled.
    user_id: Union[str, None] = Field(default=None)
class AnthropicRequest(BaseModel):
    # Request body accepted by the /v1/messages family of routes.
    model: str = "MiniMax-M2"
    messages: List[MessageParam]
    # At least one token must be requested; a non-positive budget would only
    # fail later inside generation.
    max_tokens: int = Field(default=4096, ge=1)
    # 0.0 is a valid temperature (the generation code treats it as greedy
    # decoding), so the lower bound is inclusive.
    temperature: Optional[float] = Field(default=1.0, ge=0.0, le=1.0)
    top_p: Optional[float] = Field(default=1.0, gt=0.0, le=1.0)
    top_k: Optional[int] = None
    stop_sequences: Optional[List[str]] = None
    stream: Optional[bool] = False
    # System prompt as a plain string or a list of text blocks.
    system: Optional[Union[str, List[TextBlock]]] = None
    tools: Optional[List[Tool]] = None
    tool_choice: Optional[Union[ToolChoice, Dict[str, Any]]] = None
    metadata: Optional[Metadata] = None
    # Either a parsed ThinkingConfig or a raw dict with the same keys.
    thinking: Optional[Union[ThinkingConfig, Dict[str, Any]]] = None
    service_tier: Optional[str] = None
class Usage(BaseModel):
    # Token accounting attached to a response; the two cache counters are
    # optional and default to 0.
    input_tokens: int = Field(...)
    output_tokens: int = Field(...)
    cache_creation_input_tokens: Union[int, None] = Field(default=0)
    cache_read_input_tokens: Union[int, None] = Field(default=0)
class AnthropicResponse(BaseModel):
    # Non-streaming reply body: an assistant message made of content blocks,
    # the echoed model name, a stop reason (default "end_turn") and usage.
    id: str = Field(...)
    type: Literal["message"] = Field(default="message")
    role: Literal["assistant"] = Field(default="assistant")
    content: List[Union[TextBlock, ThinkingBlock, SignatureBlock, ToolUseBlock]] = Field(...)
    model: str = Field(...)
    stop_reason: Union[Literal["end_turn", "max_tokens", "stop_sequence", "tool_use"], None] = Field(default="end_turn")
    stop_sequence: Union[str, None] = Field(default=None)
    usage: Usage = Field(...)
# ============== Helper Functions ==============
def extract_text_from_content(content: Union[str, List[ContentBlock]]) -> str:
    """Collapse message content into one space-separated string.

    A string is returned unchanged. For a list, each element contributes:
    itself when it is a string; its ``text`` attribute, else its ``thinking``
    attribute, when it has one; or, for a dict typed ``"text"`` /
    ``"thinking"``, the matching key (empty string when missing). Every
    other element is ignored.
    """
    if isinstance(content, str):
        return content

    def _pieces():
        for item in content:
            if isinstance(item, str):
                yield item
            elif hasattr(item, 'text'):
                yield item.text
            elif hasattr(item, 'thinking'):
                yield item.thinking
            elif isinstance(item, dict):
                kind = item.get('type')
                if kind == 'text':
                    yield item.get('text', '')
                elif kind == 'thinking':
                    yield item.get('thinking', '')

    return " ".join(_pieces())
def format_system_prompt(system: Optional[Union[str, List[TextBlock]]]) -> str:
    """Normalise the ``system`` field to a single string.

    ``None`` becomes ``""``, a string is returned unchanged, and a list is
    reduced to the space-joined ``text`` of every element that has one.
    """
    if isinstance(system, str):
        return system
    if system is None:
        return ""
    texts = []
    for entry in system:
        if hasattr(entry, 'text'):
            texts.append(entry.text)
    return " ".join(texts)
def format_messages_to_prompt(messages: List[MessageParam], system: Optional[Union[str, List[TextBlock]]] = None, include_thinking: bool = False) -> str:
    """Flatten a conversation into a ``Human:`` / ``Assistant:`` transcript.

    Args:
        messages: Turns exposing ``role`` and ``content``; content is a string
            or a list of content blocks (model objects, raw dicts or strings).
        system: Optional system prompt, emitted first as ``System: ...``.
        include_thinking: When true, thinking blocks are copied into the
            prompt; otherwise they are skipped.

    Returns:
        The prompt text, always ending with a trailing ``Assistant:`` cue.

    Dict-shaped and object-shaped blocks are handled by the same code path,
    so ``tool_result`` blocks and bare string blocks are no longer dropped
    when they arrive as parsed objects, and tool results whose content is a
    list of blocks are rendered as text instead of a Python repr.
    """

    def _field(block, name):
        # Read a field from a raw dict or from a parsed model object.
        if isinstance(block, dict):
            return block.get(name, '')
        return getattr(block, name, '')

    def _render_block(block, speaker):
        # Return the prompt fragment for one content block, or None to skip.
        if isinstance(block, str):
            return f"{speaker}: {block}\n\n"
        if isinstance(block, dict):
            block_type = block.get('type', 'text')
        else:
            block_type = getattr(block, 'type', None)
        if block_type == 'thinking':
            return f"{_field(block, 'thinking')}\n" if include_thinking else None
        if block_type == 'text':
            return f"{speaker}: {_field(block, 'text')}\n\n"
        if block_type == 'tool_result':
            result = _field(block, 'content') or ''
            if not isinstance(result, (str, list)):
                result = str(result)
            return f"Tool Result: {extract_text_from_content(result)}\n\n"
        return None

    prompt_parts = []
    system_text = format_system_prompt(system)
    if system_text:
        prompt_parts.append(f"System: {system_text}\n\n")
    for msg in messages:
        role = msg.role
        content = msg.content
        if isinstance(content, list):
            speaker = "Human" if role == "user" else "Assistant"
            for block in content:
                fragment = _render_block(block, speaker)
                if fragment is not None:
                    prompt_parts.append(fragment)
        else:
            content_text = content if isinstance(content, str) else extract_text_from_content(content)
            if role == "user":
                prompt_parts.append(f"Human: {content_text}\n\n")
            elif role == "assistant":
                prompt_parts.append(f"Assistant: {content_text}\n\n")
    # Trailing cue asking the model to continue as the assistant.
    prompt_parts.append("Assistant:")
    return "".join(prompt_parts)
def generate_text(prompt: str, max_tokens: int, temperature: float, top_p: float) -> tuple:
    """Generate a completion for ``prompt`` with the cached model.

    Args:
        prompt: Full prompt text.
        max_tokens: Requested completion length; capped at 512 and raised to
            at least 1.
        temperature: Sampling temperature; values <= 0 select greedy decoding.
        top_p: Nucleus-sampling threshold, used only when sampling.

    Returns:
        ``(text, input_tokens, output_tokens)``: the stripped completion, the
        number of prompt tokens fed to the model and the number generated.

    The prompt is truncated so that prompt plus completion fit inside the
    model's ``max_position_embeddings`` when the config exposes it. Before,
    a 1024-token prompt plus up to 512 new tokens could exceed that limit.
    """
    tokenizer = models["tokenizer"]
    model = models["model"]
    max_new_tokens = max(1, min(max_tokens, 512))
    # Reserve room for the completion inside the context window.
    context_limit = getattr(model.config, "max_position_embeddings", None)
    max_prompt_tokens = 1024
    if isinstance(context_limit, int) and context_limit > 0:
        max_prompt_tokens = max(1, min(1024, context_limit - max_new_tokens))
    inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=max_prompt_tokens)
    input_tokens = inputs["input_ids"].shape[1]
    sampling = temperature > 0
    generation_kwargs = {
        "max_new_tokens": max_new_tokens,
        "do_sample": sampling,
        "pad_token_id": tokenizer.pad_token_id,
        "eos_token_id": tokenizer.eos_token_id,
    }
    if sampling:
        # Sampling knobs are meaningless for greedy decoding, so only pass
        # them when sampling is on.
        generation_kwargs["temperature"] = temperature
        generation_kwargs["top_p"] = top_p
    with torch.no_grad():
        outputs = model.generate(**inputs, **generation_kwargs)
    # generate() returns prompt + completion; keep only the new tokens.
    generated_tokens = outputs[0][input_tokens:]
    output_tokens = len(generated_tokens)
    generated_text = tokenizer.decode(generated_tokens, skip_special_tokens=True)
    return generated_text.strip(), input_tokens, output_tokens
def generate_thinking(prompt: str, budget_tokens: int = 100) -> tuple:
    """Sample a "thinking" passage for ``prompt``.

    A step-by-step cue is appended to the prompt, the result is truncated to
    512 tokens, and at most ``min(budget_tokens, 256)`` new tokens are
    sampled (temperature 0.7, top_p 0.9).

    Returns:
        ``(thinking_text, thinking_tokens)``: the stripped generated text and
        the number of generated tokens.
    """
    tok = models["tokenizer"]
    lm = models["model"]
    thinking_prompt = f"{prompt}\n\nLet me think through this step by step:\n"
    encoded = tok(thinking_prompt, return_tensors="pt", truncation=True, max_length=512)
    prompt_length = encoded["input_ids"].shape[1]
    sampling_options = {
        "max_new_tokens": min(budget_tokens, 256),
        "temperature": 0.7,
        "top_p": 0.9,
        "do_sample": True,
        "pad_token_id": tok.pad_token_id,
        "eos_token_id": tok.eos_token_id,
    }
    with torch.no_grad():
        sequences = lm.generate(**encoded, **sampling_options)
    # Drop the echoed prompt tokens, keeping only what was generated.
    new_token_ids = sequences[0][prompt_length:]
    token_count = len(new_token_ids)
    decoded = tok.decode(new_token_ids, skip_special_tokens=True)
    return decoded.strip(), token_count
async def generate_stream_with_thinking(prompt: str, max_tokens: int, temperature: float, top_p: float, message_id: str, model_name: str, thinking_enabled: bool = False, thinking_budget: int = 100):
    """Yield an Anthropic-style server-sent event stream for one reply.

    Event order: ``message_start``; optionally a thinking block (start, deltas
    of 10 characters, stop); a text block (start, deltas of 5 characters,
    stop); ``message_delta`` carrying the stop reason and total output token
    count; ``message_stop``.

    Args:
        prompt: Full prompt text.
        max_tokens: Requested completion length; capped at 512, at least 1.
        temperature: Sampling temperature; values <= 0 select greedy decoding.
        top_p: Nucleus-sampling threshold, used only when sampling.
        message_id: Id reported in ``message_start``.
        model_name: Model name reported in ``message_start``.
        thinking_enabled: Emit a thinking block before the text block.
        thinking_budget: Token budget passed to ``generate_thinking``.

    The prompt is truncated so prompt plus completion fit inside the model's
    ``max_position_embeddings`` when the config exposes it.

    NOTE(review): the text is generated in full and then replayed in chunks,
    and generation runs synchronously inside this coroutine, so other
    requests wait while it runs.
    """

    def _sse(payload):
        # The SSE event name always mirrors the payload's "type" field.
        return f"event: {payload['type']}\ndata: {json.dumps(payload)}\n\n"

    tokenizer = models["tokenizer"]
    model = models["model"]
    max_new_tokens = max(1, min(max_tokens, 512))
    # Reserve room for the completion inside the context window.
    context_limit = getattr(model.config, "max_position_embeddings", None)
    max_prompt_tokens = 1024
    if isinstance(context_limit, int) and context_limit > 0:
        max_prompt_tokens = max(1, min(1024, context_limit - max_new_tokens))
    inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=max_prompt_tokens)
    input_tokens = inputs["input_ids"].shape[1]
    total_output_tokens = 0

    yield _sse({
        "type": "message_start",
        "message": {"id": message_id, "type": "message", "role": "assistant", "content": [], "model": model_name, "stop_reason": None, "stop_sequence": None, "usage": {"input_tokens": input_tokens, "output_tokens": 0}}
    })

    content_index = 0
    if thinking_enabled:
        yield _sse({"type": "content_block_start", "index": content_index, "content_block": {"type": "thinking", "thinking": ""}})
        thinking_text, thinking_tokens = generate_thinking(prompt, thinking_budget)
        total_output_tokens += thinking_tokens
        for start in range(0, len(thinking_text), 10):
            chunk = thinking_text[start:start + 10]
            yield _sse({'type': 'content_block_delta', 'index': content_index, 'delta': {'type': 'thinking_delta', 'thinking': chunk}})
            await asyncio.sleep(0.01)
        yield _sse({'type': 'content_block_stop', 'index': content_index})
        content_index += 1

    yield _sse({'type': 'content_block_start', 'index': content_index, 'content_block': {'type': 'text', 'text': ''}})
    sampling = temperature > 0
    generation_kwargs = {
        "max_new_tokens": max_new_tokens,
        "do_sample": sampling,
        "pad_token_id": tokenizer.pad_token_id,
        "eos_token_id": tokenizer.eos_token_id,
    }
    if sampling:
        generation_kwargs["temperature"] = temperature
        generation_kwargs["top_p"] = top_p
    with torch.no_grad():
        outputs = model.generate(**inputs, **generation_kwargs)
    # generate() returns prompt + completion; keep only the new tokens.
    generated_tokens = outputs[0][input_tokens:]
    generated_text = tokenizer.decode(generated_tokens, skip_special_tokens=True).strip()
    total_output_tokens += len(generated_tokens)
    for start in range(0, len(generated_text), 5):
        yield _sse({'type': 'content_block_delta', 'index': content_index, 'delta': {'type': 'text_delta', 'text': generated_text[start:start + 5]}})
        await asyncio.sleep(0.005)
    yield _sse({'type': 'content_block_stop', 'index': content_index})
    yield _sse({'type': 'message_delta', 'delta': {'stop_reason': 'end_turn', 'stop_sequence': None}, 'usage': {'output_tokens': total_output_tokens}})
    yield _sse({'type': 'message_stop'})
def handle_tool_call(tools: List[Tool], messages: List[MessageParam], generated_text: str) -> Optional[ToolUseBlock]:
    """Return a tool-use block for the first tool named in the generated text.

    The match is a case-insensitive substring test of each tool's name
    against ``generated_text``. The returned block has a fresh
    ``toolu_<24 hex chars>`` id and an empty ``input``. ``messages`` is
    accepted but not used.

    Returns:
        A ToolUseBlock, or ``None`` when ``tools`` is empty or nothing matches.
    """
    if not tools:
        return None
    haystack = generated_text.lower()
    matched = next((candidate for candidate in tools if candidate.name.lower() in haystack), None)
    if matched is None:
        return None
    call_id = f"toolu_{uuid.uuid4().hex[:24]}"
    return ToolUseBlock(type="tool_use", id=call_id, name=matched.name, input={})
# ============== Core Messages Handler ==============
async def handle_messages(request: AnthropicRequest):
    """Core handler for Anthropic Messages API.

    Builds the prompt from the request, then either returns a
    StreamingResponse of server-sent events (``request.stream``) or generates
    the reply in one go and returns an AnthropicResponse containing an
    optional thinking block, the text block and, when a tool name appears in
    the generated text, a tool-use block with ``stop_reason="tool_use"``.

    Raises:
        HTTPException: 500 wrapping any unexpected error; the original
            exception is chained as the cause.
    """
    try:
        message_id = f"msg_{uuid.uuid4().hex[:24]}"
        thinking_enabled = False
        thinking_budget = 100
        thinking = request.thinking
        if thinking:
            # `thinking` may be a parsed config object or a raw dict.
            if isinstance(thinking, dict):
                thinking_enabled = thinking.get('type') == 'enabled'
                thinking_budget = thinking.get('budget_tokens', 100) or 100
            else:
                thinking_enabled = thinking.type == 'enabled'
                thinking_budget = thinking.budget_tokens or 100
        # Only a missing temperature falls back to 1.0; an explicit 0 must
        # reach the generator, which treats it as greedy decoding.
        temperature = 1.0 if request.temperature is None else request.temperature
        top_p = request.top_p or 1.0
        prompt = format_messages_to_prompt(request.messages, request.system, include_thinking=thinking_enabled)
        if request.stream:
            return StreamingResponse(
                generate_stream_with_thinking(prompt, request.max_tokens, temperature, top_p, message_id, request.model, thinking_enabled, thinking_budget),
                media_type="text/event-stream",
                headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"}
            )
        content_blocks = []
        total_output_tokens = 0
        if thinking_enabled:
            thinking_text, thinking_tokens = generate_thinking(prompt, thinking_budget)
            total_output_tokens += thinking_tokens
            content_blocks.append(ThinkingBlock(type="thinking", thinking=thinking_text))
        generated_text, input_tokens, output_tokens = generate_text(prompt, request.max_tokens, temperature, top_p)
        total_output_tokens += output_tokens
        # The text block is always emitted; a tool-use block follows it when
        # a tool name was found in the generated text.
        content_blocks.append(TextBlock(type="text", text=generated_text))
        tool_use = handle_tool_call(request.tools, request.messages, generated_text) if request.tools else None
        if tool_use:
            content_blocks.append(tool_use)
            stop_reason = "tool_use"
        else:
            stop_reason = "end_turn"
        return AnthropicResponse(id=message_id, content=content_blocks, model=request.model, stop_reason=stop_reason, usage=Usage(input_tokens=input_tokens, output_tokens=total_output_tokens))
    except HTTPException:
        # Already an HTTP error; do not re-wrap it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
# ============== Frontend ==============
@app.get("/", response_class=HTMLResponse)
async def home():
    # Landing page: returns a minimal HTML body.
    page_body = "\nModel Runner\n"
    return HTMLResponse(content=page_body)
# ============== Anthropic API Routes ==============
# Support multiple base paths for compatibility
@app.post("/v1/messages")
async def messages_v1(request: AnthropicRequest):
    """Standard Anthropic API endpoint"""
    # Thin route wrapper: all work happens in the shared handler.
    reply = await handle_messages(request)
    return reply
@app.post("/anthropic/v1/messages")
async def messages_anthropic(request: AnthropicRequest):
    """Anthropic base path - for Claude Code compatibility"""
    # Same behaviour as the other message routes, under the /anthropic prefix.
    reply = await handle_messages(request)
    return reply
@app.post("/api/v1/messages")
async def messages_api(request: AnthropicRequest):
    """API base path variant"""
    # Same behaviour as the other message routes, under the /api prefix.
    reply = await handle_messages(request)
    return reply
# ============== OpenAI Compatible ==============
class ChatMessage(BaseModel):
    # One chat message for the OpenAI-style route: any role string, with
    # content as a string or a list of free-form part dicts.
    role: str = Field(...)
    content: Union[str, List[Dict[str, Any]]] = Field(...)
class ChatCompletionRequest(BaseModel):
    # Request body for /v1/chat/completions; every field except `messages`
    # has a default.
    model: str = Field(default="gpt-4")
    messages: List[ChatMessage] = Field(...)
    max_tokens: Union[int, None] = Field(default=4096)
    temperature: Union[float, None] = Field(default=0.7)
    top_p: Union[float, None] = Field(default=1.0)
    stream: Union[bool, None] = Field(default=False)
@app.post("/v1/chat/completions")
async def chat_completions(request: ChatCompletionRequest):
    # OpenAI-style chat completion: flattens the messages into one prompt,
    # generates a reply and returns it in the chat.completion shape.
    #
    # Fixes over the previous version:
    #   * system-role messages are forwarded as the system prompt instead of
    #     being silently dropped;
    #   * an explicit temperature of 0 is honoured (greedy decoding) rather
    #     than being replaced by 0.7;
    #   * the 500 error chains the original exception.
    #
    # NOTE(review): `request.stream` is not consulted here; a normal JSON body
    # is returned regardless -- confirm whether streaming should be supported.

    def _as_text(content):
        # Multi-part content: keep only the "text" parts, space-joined.
        if isinstance(content, list):
            return ' '.join(
                part.get('text', '')
                for part in content
                if isinstance(part, dict) and part.get('type') == 'text'
            )
        return content

    try:
        formatted_messages = []
        system_parts = []
        for msg in request.messages:
            if msg.role == "system":
                system_text = _as_text(msg.content)
                if system_text:
                    system_parts.append(system_text)
            elif msg.role in ("user", "assistant"):
                formatted_messages.append(MessageParam(role=msg.role, content=_as_text(msg.content)))
        system_prompt = "\n".join(system_parts) or None
        prompt = format_messages_to_prompt(formatted_messages, system_prompt)
        temperature = 0.7 if request.temperature is None else request.temperature
        generated_text, input_tokens, output_tokens = generate_text(prompt, request.max_tokens or 4096, temperature, request.top_p or 1.0)
        return {
            "id": f"chatcmpl-{uuid.uuid4().hex[:24]}",
            "object": "chat.completion",
            "created": int(time.time()),
            "model": request.model,
            "choices": [{"index": 0, "message": {"role": "assistant", "content": generated_text}, "finish_reason": "stop"}],
            "usage": {"prompt_tokens": input_tokens, "completion_tokens": output_tokens, "total_tokens": input_tokens + output_tokens}
        }
    except HTTPException:
        # Already an HTTP error; do not re-wrap it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
# ============== Models Endpoints ==============
@app.get("/v1/models")
@app.get("/anthropic/v1/models")
@app.get("/api/v1/models")
async def list_models():
    # Model catalogue served under all three base paths.
    #
    # Besides the fixed ids, the configured MODEL_DISPLAY_NAME and
    # GENERATOR_MODEL are advertised. Ids are de-duplicated (first occurrence
    # wins), so the default configuration yields the same five entries as
    # before. A single timestamp is used for every entry.
    created = int(time.time())
    catalogue = [
        ("claude-sonnet-4-20250514", "anthropic"),
        ("claude-3-5-sonnet-20241022", "anthropic"),
        ("MiniMax-M2", "local"),
        ("MiniMax-M2-Stable", "local"),
        (MODEL_DISPLAY_NAME, "local"),
        (GENERATOR_MODEL, "local"),
    ]
    seen = set()
    data = []
    for model_id, owner in catalogue:
        if model_id in seen:
            continue
        seen.add(model_id)
        data.append({"id": model_id, "object": "model", "created": created, "owned_by": owner})
    return {"object": "list", "data": data}
# ============== Utility Endpoints ==============
@app.get("/health")
async def health():
    # Liveness probe: reports the current UTC time and whether the model
    # cache is populated.
    #
    # datetime.utcnow() is deprecated since Python 3.12. The aware clock is
    # used instead, and tzinfo is stripped so the timestamp keeps its previous
    # naive ISO-8601 format.
    from datetime import timezone

    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return {"status": "healthy", "timestamp": now_utc.isoformat(), "models_loaded": len(models) > 0}
@app.get("/info")
async def info():
    # Static service descriptor: name, version, supported API flavours,
    # message routes and feature flags.
    supported_apis = ["anthropic", "openai"]
    message_routes = ["/v1/messages", "/anthropic/v1/messages", "/api/v1/messages"]
    return dict(
        name="Model Runner",
        version="1.1.0",
        api_compatibility=supported_apis,
        base_paths=message_routes,
        interleaved_thinking=True,
        agentic_tools=True,
    )
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces,
    # port 7860.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=7860)