# app.py (with logging and debug improvements)
import io, os, shutil, subprocess, time, traceback
from types import SimpleNamespace
from typing import Dict, List, Any

import gradio as gr
from fastapi import FastAPI, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from PIL import Image
import pytesseract
import pdfplumber
from pptx import Presentation
from pptx.enum.shapes import MSO_SHAPE_TYPE
from transformers import BlipProcessor, BlipForConditionalGeneration
import torch
# ----------- Tesseract Debugging -----------
try:
    print("\n--- DEBUG INFO ---")
    tesseract_path = shutil.which("tesseract")
    print("Tesseract path:", tesseract_path)
    if tesseract_path:
        result = subprocess.run(["tesseract", "--version"], capture_output=True, text=True)
        print("Tesseract version output:\n", result.stdout)
    else:
        print("Tesseract is NOT found in PATH")
    print("--- END DEBUG INFO ---\n")
except Exception as e:
    print("Error during Tesseract check:", e)
# ----------- BLIP Image Caption Model -----------
print("Loading BLIP model...")
device = "cuda" if torch.cuda.is_available() else "cpu"
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
blip_model = BlipForConditionalGeneration.from_pretrained(
    "Salesforce/blip-image-captioning-base",
    torch_dtype=torch.float16 if device == "cuda" else torch.float32
).to(device).eval()
print("BLIP model loaded")
def _caption_image(img: Image.Image) -> str:
    """Run BLIP to caption a PIL image."""
    try:
        inputs = processor(img.convert("RGB"), return_tensors="pt")
        # Match the model's device and dtype (fp16 on GPU, fp32 on CPU)
        pixel_values = inputs["pixel_values"].to(blip_model.device, blip_model.dtype)
        with torch.no_grad():
            out = blip_model.generate(pixel_values=pixel_values)
        return processor.decode(out[0], skip_special_tokens=True)
    except Exception as e:
        print(f"[ERROR] Captioning image failed: {e}")
        traceback.print_exc()
        return "[CAPTION_ERROR]"
# ----------- Slidepack Processing -----------
def analyze_slidepack(file: Any) -> Dict[str, Any]:
    try:
        fname = os.path.basename(file.name)
        print(f"Analyzing file: {fname}")
        slides_out: List[Dict[str, Any]] = []

        # PPTX: walk every shape, collecting text frames and picture captions
        if fname.lower().endswith(".pptx"):
            pres = Presentation(file.name)
            for idx, slide in enumerate(pres.slides, start=1):
                texts, caps = [], []
                for shape in slide.shapes:
                    if hasattr(shape, "text"):
                        text = shape.text.strip()
                        if text:
                            texts.append(text)
                    if shape.shape_type == MSO_SHAPE_TYPE.PICTURE:
                        img = Image.open(io.BytesIO(shape.image.blob))
                        caps.append(_caption_image(img))
                slides_out.append({
                    "slide_index": idx,
                    "textBlocks": texts,
                    "imageCaptions": caps
                })
| elif fname.lower().endswith(".pdf"): | |
| with pdfplumber.open(file.name) as pdf: | |
| for idx, page in enumerate(pdf.pages, start=1): | |
| texts = [page.extract_text() or ""] | |
| caps = [] | |
| try: | |
| img = page.to_image(resolution=200).original | |
| caps.append(_caption_image(img)) | |
| ocr_text = pytesseract.image_to_string(img) | |
| if ocr_text.strip(): | |
| texts.append(ocr_text) | |
| except Exception as e: | |
| print(f"[WARN] Skipping image/OCR on page {idx} due to error: {e}") | |
| slides_out.append({ | |
| "slide_index": idx, | |
| "textBlocks": [t for t in texts if t.strip()], | |
| "imageCaptions": caps | |
| }) | |
        else:
            raise gr.Error("Unsupported file type. Upload a .pptx or .pdf.")

        print("Slidepack analysis completed")
        return {"file_name": fname, "slides": slides_out}
    except Exception as e:
        print(f"[ERROR] Exception during slidepack analysis: {e}")
        traceback.print_exc()
        return {"error": str(e)}
# ----------- Gradio UI -----------
demo = gr.Interface(
    fn=analyze_slidepack,
    inputs=gr.File(label="Upload PPTX or PDF"),
    outputs=gr.JSON(),
    title="Slide-Pack Full Extractor",
    description=(
        "Returns **every** text fragment and BLIP-generated image caption in JSON. "
        "No summarisation, so the output is perfect for downstream quiz agents."
    ),
    live=True
)
# ----------- FastAPI REST Endpoint -----------
api = FastAPI()
api.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@api.post("/extract")  # route path is an assumption; adjust to taste
async def extract_slidepack(file: UploadFile):
    try:
        path = f"/tmp/{file.filename}"
        with open(path, "wb") as f:
            f.write(await file.read())
        # analyze_slidepack only reads .name, so a lightweight stub suffices
        return analyze_slidepack(SimpleNamespace(name=path))
    except Exception as e:
        print(f"[ERROR] extract_slidepack endpoint failed: {e}")
        traceback.print_exc()
        return {"error": str(e)}
# ----------- Main Entry -----------
if __name__ == "__main__":
    print("Waiting before MCP launch to avoid race condition...")
    time.sleep(3)
    print("Launching with MCP support now.")
    demo.launch(mcp_server=True)
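
# Note: demo.launch() serves only the Gradio app (plus MCP). The FastAPI `api`
# defined above is not served by this entry point; if the REST endpoint is
# needed, run it separately, e.g.:
#   uvicorn app:api --host 0.0.0.0 --port 8000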