# CTapi-raw / app_optionB.py
# Author: Your Name
# Deploy Option B: Query Parser + RAG + 355M Ranking
# Commit: 45cf63e
"""
Clinical Trial API - Option B (Simplified)
===========================================
Clean foundational RAG with single LLM query parser
Architecture:
1. Query Parser LLM (Llama-70B) - 3s, $0.001
2. RAG Search (BM25 + Semantic + Inverted Index) - 2s, free
3. 355M Perplexity Ranking - 2-5s, free
4. Structured JSON Output - instant, free
Total: ~7-10s per query, $0.001 cost
No response generation - clients use their own LLMs
"""
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import time
import logging
# Import Option B pipeline
import foundation_rag_optionB as rag
# Configure root logging once at import time; all endpoints share this logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# FastAPI application object; metadata below feeds the generated OpenAPI docs.
app = FastAPI(
    title="Clinical Trial API - Option B",
    description="Foundational RAG API with query parser LLM + perplexity ranking",
    version="2.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)
# CORS middleware
# NOTE(review): wildcard origins combined with allow_credentials=True is not
# honored by browsers per the CORS spec (credentialed requests require an
# explicit origin) — confirm whether credentials are actually needed here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ============================================================================
# REQUEST/RESPONSE MODELS
# ============================================================================
class SearchRequest(BaseModel):
    """Request body for POST /search.

    top_k is clamped server-side to the range [1, 50] by the endpoint
    (values outside the range are capped, not rejected).
    """
    # Free-text question about clinical trials.
    query: str
    # Number of trials to return; default 10, capped at 50 by the endpoint.
    top_k: int = 10

    class Config:
        # Pydantic v1 key; v2 renamed this to json_schema_extra — verify the
        # installed pydantic version if the example stops appearing in /docs.
        schema_extra = {
            "example": {
                "query": "What trials exist for ianalumab in Sjogren's syndrome?",
                "top_k": 10
            }
        }
class HealthResponse(BaseModel):
    """Response schema for GET /health."""
    # "healthy" when embeddings are loaded, otherwise "degraded".
    status: str
    # Count of loaded document chunks (0 when data failed to load).
    trials_loaded: int
    # Whether the semantic-embedding matrix is in memory.
    embeddings_loaded: bool
    # API semver string, mirrors the FastAPI app version.
    api_version: str
    # Human-readable pipeline description.
    architecture: str
# ============================================================================
# STARTUP
# ============================================================================
@app.on_event("startup")
async def startup_event():
    """Load the RAG corpus and embeddings once at process start.

    Failures are logged but deliberately not re-raised so the server still
    binds: /health will report "degraded" and /search calls will fail until
    the data issue is fixed and the process restarted.
    """
    logger.info("=" * 70)
    logger.info("CLINICAL TRIAL API - OPTION B")
    logger.info("=" * 70)
    logger.info("Loading RAG data...")
    try:
        rag.load_all_data()
        logger.info("=" * 70)
        logger.info("✓ API READY - Option B Architecture Active")
        logger.info("=" * 70)
    except Exception as e:
        # logger.exception records the full traceback; logger.error(f"...{e}")
        # only captured the message, making startup failures hard to diagnose.
        logger.exception(f"!!! Failed to load data: {e}")
        logger.error("!!! API will start but queries will fail")
# ============================================================================
# ENDPOINTS
# ============================================================================
@app.get("/")
async def root():
    """Service banner: version, endpoint map, pipeline stages, and cost/latency figures."""
    endpoint_map = {
        "POST /search": "Search clinical trials with structured JSON output",
        "GET /health": "Health check",
        "GET /docs": "Interactive API documentation (Swagger UI)",
        "GET /redoc": "Alternative API documentation (ReDoc)",
    }
    pipeline_stages = [
        "1. Query Parser LLM (Llama-70B) → Extract entities + synonyms (3s, $0.001)",
        "2. RAG Search (BM25 + Semantic + Inverted Index) → Retrieve (2s, free)",
        "3. 355M Perplexity Ranking → Rank by relevance (2-5s, free)",
        "4. Structured JSON Output → Return ranked trials (instant, free)",
    ]
    perf_summary = {
        "average_latency": "7-10 seconds",
        "cost_per_query": "$0.001",
        "no_response_generation": "Clients handle text generation with their own LLMs",
    }
    return {
        "service": "Clinical Trial API - Option B",
        "version": "2.0.0",
        "architecture": "1 LLM (Query Parser) + RAG + 355M Perplexity Ranking",
        "status": "healthy",
        "endpoints": endpoint_map,
        "pipeline": pipeline_stages,
        "performance": perf_summary,
    }
@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Report data-load state: "healthy" once embeddings are in memory, else "degraded"."""
    have_embeddings = rag.doc_embeddings is not None
    trial_count = len(rag.doc_chunks) if rag.doc_chunks else 0
    overall_status = "healthy" if have_embeddings else "degraded"
    return HealthResponse(
        status=overall_status,
        trials_loaded=trial_count,
        embeddings_loaded=have_embeddings,
        api_version="2.0.0",
        architecture="Option B: Query Parser LLM + RAG + 355M Ranking",
    )
@app.post("/search")
async def search_trials(request: SearchRequest):
    """
    Search clinical trials using Option B pipeline
    **Pipeline:**
    1. **Query Parser LLM** - Extracts entities (drugs, diseases, companies, endpoints)
       and expands with synonyms using Llama-70B
    2. **RAG Search** - Hybrid search using BM25 + semantic embeddings + inverted index
    3. **355M Perplexity Ranking** - Re-ranks using Clinical Trial GPT perplexity scores
    4. **Structured JSON Output** - Returns ranked trials with all metadata
    **No Response Generation** - Returns raw trial data for client-side processing
    Args:
    - **query**: Your question about clinical trials
    - **top_k**: Number of trials to return (default: 10, max: 50)
    Returns:
    - Structured JSON with ranked trials
    - Query analysis (extracted entities, optimized search terms)
    - Benchmarking data (timing breakdown)
    - Trial metadata (NCT ID, title, status, phase, etc.)
    - Scoring details (relevance, perplexity, rank changes)
    **Example Query:**
    ```
    {
      "query": "What trials exist for ianalumab in Sjogren's syndrome?",
      "top_k": 10
    }
    ```
    """
    # Start the clock before anything that can raise, so the error path can
    # always report elapsed time (replaces the old "'start_time' in locals()" hack).
    start_time = time.time()
    try:
        logger.info(f"[SEARCH] Query: {request.query[:100]}...")
        # Clamp top_k to [1, 50] in a local variable instead of mutating the
        # validated request model.
        top_k = request.top_k
        if top_k > 50:
            logger.warning(f"[SEARCH] top_k={top_k} exceeds max 50, capping")
            top_k = 50
        elif top_k < 1:
            logger.warning(f"[SEARCH] top_k={top_k} invalid, using default 10")
            top_k = 10
        # Process with Option B pipeline (parse → retrieve → rank → format).
        result = rag.process_query_option_b(request.query, top_k=top_k)
        processing_time = time.time() - start_time
        logger.info(f"[SEARCH] ✓ Completed in {processing_time:.2f}s")
        # Ensure processing_time is set (pipeline may omit it or leave it 0)
        if 'processing_time' not in result or result['processing_time'] == 0:
            result['processing_time'] = processing_time
        return result
    except Exception as e:
        # Log the full traceback server-side only. The previous version
        # returned traceback.format_exc() in the response body, leaking
        # internal file paths and code structure to untrusted clients.
        logger.exception(f"[SEARCH] Error: {str(e)}")
        return {
            "error": str(e),
            "query": request.query,
            "processing_time": time.time() - start_time
        }
# ============================================================================
# RUN SERVER
# ============================================================================
# Dev/standalone entry point; production deployments typically invoke
# `uvicorn app_optionB:app` directly instead.
if __name__ == "__main__":
    import uvicorn
    # Bind all interfaces; port 7860 is the convention for Hugging Face Spaces.
    uvicorn.run(app, host="0.0.0.0", port=7860)