import os
import io
import logging
import zipfile
import tarfile
import time
import uvicorn
import fitz  # PyMuPDF
import docx  # python-docx
import pptx  # python-pptx
import openpyxl
import pandas as pd
from PIL import Image
import pytesseract
from fastapi import FastAPI, UploadFile, File, HTTPException, Header, BackgroundTasks, Body
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from typing import Dict, List, Optional, Tuple
import asyncio
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import magic
import chardet
import json
import xml.etree.ElementTree as ET
from pathlib import Path
import tempfile
import shutil
import subprocess
from pdf2image import convert_from_bytes
from vector import vdb
from pydantic import BaseModel
import numpy as np
import re
# ==================== CONFIGURATION ====================
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)s | %(name)s | %(message)s'
)
logger = logging.getLogger("ProductionExtractor")

# Production Configuration
class Config:
    MAX_ZIP_DEPTH = 3
    MAX_FILES_IN_ZIP = 100
    MAX_FILE_SIZE_MB = 50
    MAX_TOTAL_SIZE_MB = 500
    TIMEOUT_SECONDS = 300
    WORKER_THREADS = 4
    TEXTRACT_TIMEOUT = 30
    MAX_PDF_PAGES = 100
    TESSERACT_TIMEOUT = 60
    ENABLE_OCR = True
    MAX_IMAGE_PIXELS = 80_000_000  # ~80 MP cap before images are downscaled for OCR
    OCR_LANGUAGE = os.getenv("TESSERACT_LANGUAGE", "eng+hin")
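    # NOTE: "eng+hin" runs the English and Hindi Tesseract models together; both
    # traineddata packs must be installed for pytesseract to accept this value.
    # Override via the TESSERACT_LANGUAGE environment variable if they are not available.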
class SearchRequest(BaseModel):
    query: str
    target: Optional[str] = None

# Performance metrics tracking
metrics = {
    "files_processed": 0,
    "total_bytes": 0,
    "processing_time": 0,
    "errors": []
}
app = FastAPI(
    title="NeuralStream Production Extractor",
    version="1.0.0",
    description="High-performance file extraction service with support for 50+ file types",
    docs_url="/docs",
    redoc_url="/redoc"
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Thread pool for blocking operations
executor = ThreadPoolExecutor(max_workers=Config.WORKER_THREADS)

# Configure Tesseract path if needed
if os.name == 'nt':  # Windows
    tesseract_path = r'C:\Program Files\Tesseract-OCR\tesseract.exe'
    if os.path.exists(tesseract_path):
        pytesseract.pytesseract.tesseract_cmd = tesseract_path
# ==================== UTILITY FUNCTIONS ====================
def sanitize_filename(filename: str) -> str:
    """Sanitize filename to prevent path traversal attacks."""
    # Normalize Windows separators first so basename() strips them on any OS.
    return os.path.basename(filename.replace('\\', '/'))

def get_file_extension(filename: str) -> str:
    """Extract file extension in a safe way."""
    return Path(filename).suffix.lower()
def detect_file_type(content: bytes, filename: str) -> str:
    """Detect file type using both magic numbers and extension."""
    try:
        mime = magic.from_buffer(content[:2048], mime=True)
        return mime
    except Exception:
        ext = get_file_extension(filename)
        return f"extension/{ext}"

def is_binary_file(content: bytes) -> bool:
    """Heuristic check if file is binary."""
    if not content:
        return False
    if b'\x00' in content[:1024]:
        return True
    # Check if >30% of bytes are non-printable
    text_chars = bytearray({7,8,9,10,12,13,27} | set(range(0x20, 0x100)) - {0x7f})
    sample = content[:1024] if len(content) >= 1024 else content
    if len(sample) == 0:
        return False
    try:
        non_text = sample.translate(None, text_chars)
        return float(len(non_text)) / len(sample) > 0.3
    except Exception:
        return False

def truncate_content(content: str, max_length: int = 100000) -> str:
    """Truncate content if too long, keeping start and end."""
    if len(content) <= max_length:
        return content
    half = max_length // 2
    return content[:half] + f"\n\n[... TRUNCATED {len(content) - max_length} CHARACTERS ...]\n\n" + content[-half:]
# ==================== EXTRACTION ENGINES ====================
def decode_text_safe(content: bytes, filename: str) -> str:
    """Tier 1: Universal text extraction with advanced encoding detection."""
    try:
        # Try UTF-8 first (most common)
        try:
            decoded = content.decode('utf-8')
            if not is_binary_file(content):
                return format_text_content(decoded, filename, 'utf-8')
        except UnicodeDecodeError:
            pass

        # Try common encodings
        for encoding in ['utf-8-sig', 'latin-1', 'cp1252', 'ascii']:
            try:
                decoded = content.decode(encoding)
                if not is_binary_file(content):
                    return format_text_content(decoded, filename, encoding)
            except UnicodeDecodeError:
                continue

        # Fallback to chardet
        try:
            detection = chardet.detect(content)
            encoding = detection['encoding'] or 'utf-8'
            decoded = content.decode(encoding, errors='replace')
            return format_text_content(decoded, filename, f"{encoding} (detected)")
        except Exception:
            return f"\n--- BINARY/TEXT FILE: {filename} ---\n[Content appears to be binary or has unknown encoding]\n"
    except Exception as e:
        logger.error(f"Text extraction error for {filename}: {e}")
        return f"\n[Error extracting text from {filename}: {str(e)}]\n"

def format_text_content(content: str, filename: str, encoding: str) -> str:
    """Format text content with metadata."""
    content = truncate_content(content)
    return f"""
--- TEXT FILE: {filename} ---
Encoding: {encoding}
Size: {len(content)} characters
{content}
--- END TEXT FILE ---
"""
# ==================== DOCUMENT EXTRACTION ====================
def extract_pdf(content: bytes, filename: str) -> str:
    """Advanced PDF extraction with OCR fallback."""
    start_time = time.time()
    try:
        text_buffer = []
        metadata_info = []
        with fitz.open(stream=content, filetype="pdf") as doc:
            if doc.is_encrypted:
                # authenticate() returns a falsy value on failure rather than raising,
                # so check its result instead of relying on an exception.
                if not doc.authenticate(""):
                    return f"\n[ENCRYPTED PDF: {filename} - Cannot extract content]\n"

            metadata = doc.metadata
            if metadata:
                metadata_info.append(f"Title: {metadata.get('title', 'N/A')}")
                metadata_info.append(f"Author: {metadata.get('author', 'N/A')}")
                metadata_info.append(f"Subject: {metadata.get('subject', 'N/A')}")
                metadata_info.append(f"Created: {metadata.get('creationDate', 'N/A')}")

            total_pages = len(doc)
            pages_extracted = 0
            for i, page in enumerate(doc):
                if i >= Config.MAX_PDF_PAGES:
                    text_buffer.append(f"\n[... Truncated at {Config.MAX_PDF_PAGES} pages from total {total_pages} ...]\n")
                    break
                page_text = page.get_text("text")
                if page_text.strip():
                    text_buffer.append(f"\n--- Page {i+1} ---")
                    text_buffer.append(page_text)
                    pages_extracted += 1

            full_text = "\n".join(text_buffer)

            if len(full_text.strip()) < 10 and Config.ENABLE_OCR:
                logger.info(f"PDF appears to be scanned, attempting OCR: {filename}")
                ocr_result = extract_text_from_image_pdf(content, filename)
                if ocr_result:
                    elapsed = time.time() - start_time
                    return f"""
=== PDF DOCUMENT (OCR): {filename} ===
Metadata:
{chr(10).join(metadata_info)}
Processing Time: {elapsed:.2f}s
Pages: {pages_extracted}/{total_pages}
{ocr_result}
=== END PDF ===
"""
            elapsed = time.time() - start_time
            return f"""
=== PDF DOCUMENT: {filename} ===
Metadata:
{chr(10).join(metadata_info)}
Extraction Time: {elapsed:.2f}s
Pages: {pages_extracted}/{total_pages}
{full_text}
=== END PDF ===
"""
    except Exception as e:
        logger.error(f"PDF extraction error for {filename}: {e}")
        return f"\n[Error parsing PDF {filename}: {str(e)}]\n"
def extract_docx(content: bytes, filename: str) -> str:
    """Advanced DOCX extraction with tables."""
    try:
        doc = docx.Document(io.BytesIO(content))

        properties = []
        if doc.core_properties.title:
            properties.append(f"Title: {doc.core_properties.title}")
        if doc.core_properties.author:
            properties.append(f"Author: {doc.core_properties.author}")
        if doc.core_properties.created:
            properties.append(f"Created: {doc.core_properties.created}")

        paragraphs = []
        for para in doc.paragraphs:
            if para.text.strip():
                paragraphs.append(para.text)

        tables_text = []
        for i, table in enumerate(doc.tables):
            table_data = []
            for row in table.rows:
                row_data = [cell.text for cell in row.cells]
                table_data.append(" | ".join(row_data))
            if table_data:
                tables_text.append(f"\n--- Table {i+1} ---")
                tables_text.append("\n".join(table_data))

        result = "\n".join(paragraphs)
        if tables_text:
            result += "\n" + "\n".join(tables_text)

        return f"""
=== WORD DOCUMENT: {filename} ===
Metadata:
{chr(10).join(properties)}
{result}
=== END DOCUMENT ===
"""
    except Exception as e:
        logger.error(f"DOCX extraction error for {filename}: {e}")
        return f"\n[Error parsing DOCX {filename}: {str(e)}]\n"
def extract_pptx(content: bytes, filename: str) -> str:
    """Extract text from PowerPoint presentations."""
    try:
        prs = pptx.Presentation(io.BytesIO(content))
        text_slides = []
        for i, slide in enumerate(prs.slides):
            slide_text = []
            for shape in slide.shapes:
                if hasattr(shape, "text") and shape.text:
                    if shape.text.strip():
                        slide_text.append(shape.text)
                # Check for table text (guard the lookup: not every shape type
                # exposes has_table)
                if getattr(shape, "has_table", False):
                    for row in shape.table.rows:
                        for cell in row.cells:
                            if cell.text.strip():
                                slide_text.append(cell.text)
            if slide_text:
                text_slides.append(f"\n--- Slide {i+1} ---")
                text_slides.extend(slide_text)

        return f"""
=== POWERPOINT: {filename} ===
Slides: {len(prs.slides)}
{chr(10).join(text_slides)}
=== END POWERPOINT ===
"""
    except Exception as e:
        logger.error(f"PPTX extraction error for {filename}: {e}")
        return f"\n[Error parsing PPTX {filename}: {str(e)}]\n"
def extract_excel(content: bytes, filename: str) -> str:
    """Extract data from Excel files."""
    try:
        wb = openpyxl.load_workbook(io.BytesIO(content), read_only=True, data_only=True)
        sheets_data = []

        for sheet_name in wb.sheetnames:
            sheet = wb[sheet_name]
            sheet_rows = []
            max_rows = 100
            for i, row in enumerate(sheet.iter_rows(values_only=True)):
                if i >= max_rows:
                    break
                row_data = [str(cell) if cell is not None else "" for cell in row]
                sheet_rows.append(" | ".join(row_data))
            if sheet_rows:
                sheets_data.append(f"\n--- Sheet: {sheet_name} ---")
                sheets_data.extend(sheet_rows)
                if len(sheet_rows) >= max_rows:
                    sheets_data.append(f"[... Only first {max_rows} rows shown ...]")

        try:
            df = pd.read_excel(io.BytesIO(content), engine='openpyxl')
            pandas_output = df.head(50).to_string(index=False, max_rows=50, max_colwidth=50)
            if pandas_output:
                sheets_data.append("\n--- Pandas Format (First 50 rows) ---")
                sheets_data.append(pandas_output)
                if len(df) > 50:
                    sheets_data.append(f"[... {len(df) - 50} more rows truncated ...]")
        except Exception as pandas_error:
            logger.warning(f"Pandas extraction failed: {pandas_error}")

        return f"""
=== EXCEL FILE: {filename} ===
{chr(10).join(sheets_data)}
=== END EXCEL ===
"""
    except Exception as e:
        logger.error(f"Excel extraction error for {filename}: {e}")
        return f"\n[Error parsing Excel {filename}: {str(e)}]\n"
# ==================== IMAGE EXTRACTION ====================
def extract_text_from_image_pdf(pdf_content: bytes, filename: str) -> Optional[str]:
    """Extract text from image-based PDF using OCR with pdf2image."""
    if not Config.ENABLE_OCR:
        return None
    try:
        extracted_text = []

        # Convert PDF to images with proper error handling
        images = convert_from_bytes(
            pdf_content,
            dpi=300,
            fmt='jpeg',
            thread_count=2,
            poppler_path=None  # Will use system poppler
        )
        logger.info(f"Converted {len(images)} pages from {filename} for OCR")

        # Use a dedicated pool so the module-level executor is not shadowed
        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ocr_pool:
            future_to_page = {
                ocr_pool.submit(perform_ocr_on_image, image, page_num): page_num
                for page_num, image in enumerate(images[:Config.MAX_PDF_PAGES])
            }
            for future in concurrent.futures.as_completed(future_to_page, timeout=Config.TESSERACT_TIMEOUT):
                page_num = future_to_page[future]
                try:
                    text = future.result(timeout=30)
                    if text and text.strip():
                        extracted_text.append(f"\n--- Page {page_num + 1} (OCR) ---")
                        extracted_text.append(text)
                        logger.info(f"OCR completed for page {page_num + 1}")
                except Exception as e:
                    logger.warning(f"OCR failed for page {page_num + 1}: {e}")
                    continue

        if extracted_text:
            return "\n".join(extracted_text)
        else:
            return None
    except Exception as e:
        logger.error(f"PDF to image conversion or OCR failed for {filename}: {e}")
        return None
def perform_ocr_on_image(image: Image.Image, page_num: int) -> str:
    """Perform OCR on a single image with proper configuration."""
    try:
        # Resize if too large
        width, height = image.size
        total_pixels = width * height
        if total_pixels > Config.MAX_IMAGE_PIXELS:
            scale_factor = (Config.MAX_IMAGE_PIXELS / total_pixels) ** 0.5
            new_width = int(width * scale_factor)
            new_height = int(height * scale_factor)
            image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
            logger.info(f"Resized page {page_num + 1} from {width}x{height} to {new_width}x{new_height}")

        # Configure Tesseract
        custom_config = f'--oem 3 --psm 3 -l {Config.OCR_LANGUAGE}'

        # Perform OCR
        text = pytesseract.image_to_string(image, config=custom_config, timeout=30)
        return truncate_content(text.strip(), max_length=50000)
    except Exception as e:
        logger.error(f"OCR error on page {page_num + 1}: {e}")
        return ""
def extract_image_ocr(content: bytes, filename: str) -> str:
    """Extract text from image files using OCR."""
    if not Config.ENABLE_OCR:
        return f"\n[IMAGE FILE: {filename}]\n[Image extraction disabled]\n"
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=get_file_extension(filename)) as temp_img:
            temp_img.write(content)
            temp_img.flush()
        try:
            # Open and check image
            with Image.open(temp_img.name) as img:
                original_format = img.format  # capture before convert() discards it
                img = img.convert('RGB')  # Ensure RGB mode

                # Resize if too large
                width, height = img.size
                total_pixels = width * height
                if total_pixels > Config.MAX_IMAGE_PIXELS:
                    scale_factor = (Config.MAX_IMAGE_PIXELS / total_pixels) ** 0.5
                    new_size = (int(width * scale_factor), int(height * scale_factor))
                    img = img.resize(new_size, Image.Resampling.LANCZOS)

                # Perform OCR
                custom_config = f'--oem 3 --psm 3 -l {Config.OCR_LANGUAGE}'
                text = pytesseract.image_to_string(img, config=custom_config, timeout=30)

                if text.strip():
                    return f"""
--- IMAGE FILE (OCR): {filename} ---
Size: {img.size[0]}x{img.size[1]} pixels
Format: {original_format}
Extracted Text:
{text.strip()}
--- END IMAGE ---
"""
                else:
                    return f"\n[IMAGE FILE: {filename}]\n[No text detected in image]\n"
        finally:
            os.unlink(temp_img.name)
    except Exception as e:
        logger.error(f"Image OCR extraction error for {filename}: {e}")
        return f"\n[Error processing image {filename}: {str(e)}]\n"
# ==================== ARCHIVE EXTRACTION ====================
def process_zip_archive(zip_bytes: bytes, zip_name: str, depth: int = 0) -> Tuple[str, int]:
    """Recursive ZIP extraction with safety limits."""
    if depth > Config.MAX_ZIP_DEPTH:
        return f"\n[ZIP Depth Limit Reached: {zip_name}]\n", 0

    output_log = f"\n>>> ZIP ARCHIVE: {zip_name} (Depth {depth}) <<<\n"
    file_count = 0
    total_size = 0
    try:
        with zipfile.ZipFile(io.BytesIO(zip_bytes)) as z:
            file_list = [f for f in z.infolist()
                         if not f.filename.startswith(('.', '__'))
                         and not f.is_dir()]
            for zf in file_list:
                if file_count >= Config.MAX_FILES_IN_ZIP:
                    output_log += f"\n[... File limit reached: {Config.MAX_FILES_IN_ZIP} files ...]\n"
                    break
                if zf.file_size == 0 or zf.file_size > (Config.MAX_FILE_SIZE_MB * 1024 * 1024):
                    continue
                total_size += zf.file_size
                if total_size > (Config.MAX_TOTAL_SIZE_MB * 1024 * 1024):
                    output_log += f"\n[... Total size limit reached: {Config.MAX_TOTAL_SIZE_MB}MB ...]\n"
                    break
                try:
                    with z.open(zf) as f:
                        content = f.read()
                    ext = get_file_extension(zf.filename)
                    if ext in ['.zip']:
                        nested_output, nested_count = process_zip_archive(content, zf.filename, depth + 1)
                        output_log += nested_output
                        file_count += nested_count
                    else:
                        output_log += process_file_bytes(zf.filename, content)
                        file_count += 1
                except Exception as e:
                    logger.error(f"Error processing nested file {zf.filename}: {e}")
                    output_log += f"\n[Error processing {zf.filename} inside {zip_name}]\n"
                    continue
    except zipfile.BadZipFile:
        return f"\n[Error: Corrupt Zip Archive - {zip_name}]\n", 0
    except Exception as e:
        logger.error(f"Zip processing error for {zip_name}: {e}")
        return f"\n[Zip Processing Error: {str(e)}]\n", 0

    output_log += f"\n>>> END ZIP: {zip_name} ({file_count} files) <<<\n"
    return output_log, file_count
def extract_tar_gz(content: bytes, filename: str) -> str:
    """Extract files from tar/tar.gz archives."""
    output_log = f"\n>>> TAR.GZ ARCHIVE: {filename} <<<\n"
    file_count = 0
    try:
        # Determine compression mode
        if filename.endswith('.tar.gz') or filename.endswith('.tgz'):
            mode = 'r:gz'
        elif filename.endswith('.tar.bz2'):
            mode = 'r:bz2'
        elif filename.endswith('.tar.xz'):
            mode = 'r:xz'
        else:
            mode = 'r:'

        with tarfile.open(fileobj=io.BytesIO(content), mode=mode) as tar:
            members = [m for m in tar.getmembers()
                       if m.isfile()
                       and not m.name.startswith(('.', '__'))
                       and m.size <= (Config.MAX_FILE_SIZE_MB * 1024 * 1024)]
            for member in members:
                if file_count >= Config.MAX_FILES_IN_ZIP:
                    output_log += "\n[...Tar file limit reached...]\n"
                    break
                try:
                    f = tar.extractfile(member)
                    if f:
                        # Use a separate name so the archive bytes are not clobbered
                        member_content = f.read()
                        output_log += process_file_bytes(member.name, member_content)
                        file_count += 1
                except Exception as e:
                    logger.error(f"Error extracting {member.name}: {e}")
                    continue
    except Exception as e:
        logger.error(f"TAR extraction error for {filename}: {e}")
        return f"\n[Error processing TAR {filename}: {str(e)}]\n"

    output_log += f"\n>>> END TAR: {filename} ({file_count} files) <<<\n"
    return output_log
# ==================== STRUCTURED DATA EXTRACTION ====================
def extract_json(content: bytes, filename: str) -> str:
    """Extract and format JSON files."""
    try:
        json_obj = json.loads(content.decode('utf-8'))
        formatted = json.dumps(json_obj, indent=2, ensure_ascii=False)
        return f"""
=== JSON FILE: {filename} ===
{formatted}
=== END JSON ===
"""
    except Exception as e:
        logger.error(f"JSON parsing error for {filename}: {e}")
        return decode_text_safe(content, filename)

def extract_xml(content: bytes, filename: str) -> str:
    """Extract readable text from XML files."""
    try:
        root = ET.fromstring(content)

        def extract_text(element, depth=0):
            text_parts = []
            indent = " " * depth
            text_parts.append(f"{indent}<{element.tag}>")
            if element.text and element.text.strip():
                text_parts.append(f"{indent} {element.text.strip()}")
            for child in element:
                text_parts.extend(extract_text(child, depth + 1))
            text_parts.append(f"{indent}</{element.tag}>")
            return text_parts

        extracted = extract_text(root)
        return f"""
=== XML FILE: {filename} ===
{chr(10).join(extracted)}
=== END XML ===
"""
    except Exception as e:
        logger.error(f"XML parsing error for {filename}: {e}")
        return decode_text_safe(content, filename)
def extract_csv(content: bytes, filename: str) -> str:
    """Extract and format CSV files."""
    try:
        df = pd.read_csv(io.BytesIO(content), encoding_errors='replace')
        output = df.head(100).to_string(index=False, max_rows=100, max_colwidth=50)
        row_count = len(df)
        result = f"""
=== CSV FILE: {filename} ===
Total Rows: {row_count}
Columns: {', '.join(df.columns.astype(str))}
First 100 Rows:
{output}
"""
        if row_count > 100:
            result += f"\n[... {row_count - 100} more rows truncated ...]\n"
        result += "\n=== END CSV ===\n"
        return result
    except Exception as e:
        logger.error(f"CSV parsing error for {filename}: {e}")
        return decode_text_safe(content, filename)
# ==================== MAIN ROUTING LOGIC ====================
def process_file_bytes(filename: str, content: bytes) -> str:
    """Route files to appropriate extraction engines."""
    start_time = time.time()
    safe_name = sanitize_filename(filename)
    content_size = len(content)
    ext = get_file_extension(safe_name)

    try:
        result = ""

        # Document files
        if ext == '.pdf':
            result = extract_pdf(content, safe_name)
        elif ext == '.docx':
            result = extract_docx(content, safe_name)
        elif ext == '.pptx':
            result = extract_pptx(content, safe_name)
        elif ext in ['.xlsx', '.xls']:
            result = extract_excel(content, safe_name)

        # Archive files
        elif ext == '.zip':
            archive_result, count = process_zip_archive(content, safe_name)
            result = archive_result
        # Path().suffix only returns the last suffix ('.gz' for 'x.tar.gz'),
        # so match compound tar extensions against the full name instead.
        elif safe_name.lower().endswith(('.tar', '.tar.gz', '.tgz', '.tar.bz2', '.tar.xz')):
            result = extract_tar_gz(content, safe_name)

        # Structured data
        elif ext == '.json':
            result = extract_json(content, safe_name)
        elif ext == '.xml':
            result = extract_xml(content, safe_name)
        elif ext == '.csv':
            result = extract_csv(content, safe_name)

        # Image files with OCR
        elif ext in ['.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp', '.tiff', '.tif']:
            result = extract_image_ocr(content, safe_name)

        # Code and text files
        elif ext in [
            '.py', '.js', '.ts', '.tsx', '.jsx', '.vue', '.svelte',
            '.java', '.kt', '.scala', '.clj', '.cljs', '.cljc',
            '.c', '.cpp', '.h', '.hpp', '.cs', '.fs', '.vb',
            '.go', '.rs', '.swift', '.dart', '.php', '.rb', '.pl',
            '.lua', '.r', '.scm', '.hs', '.elm', '.ex', '.exs',
            '.html', '.htm', '.xhtml', '.css', '.scss', '.sass', '.less',
            '.yaml', '.yml', '.toml', '.ini', '.env', '.cfg',
            '.svg', '.sql', '.sh', '.bash', '.zsh', '.fish',
            '.ps1', '.bat', '.cmd', '.md', '.markdown', '.rst',
            '.txt', '.log', '.tsv'
        ]:
            result = decode_text_safe(content, safe_name)

        # Binary files
        elif ext in ['.exe', '.dll', '.so', '.dylib', '.bin', '.dat']:
            result = f"\n[BINARY FILE: {safe_name}]\nSize: {content_size} bytes\n[Binary content not extractable]\n"

        # Audio/Video files
        elif ext in ['.mp3', '.mp4', '.avi', '.mov', '.wav', '.flac', '.mkv', '.webm']:
            result = f"\n[MEDIA FILE: {safe_name}]\nSize: {content_size} bytes\n[Media content not extractable]\n"

        # Database files
        elif ext in ['.db', '.sqlite', '.sqlite3', '.mdb', '.accdb']:
            result = f"\n[DATABASE FILE: {safe_name}]\n[Database content not extractable for security reasons]\n"

        # Unknown file type
        else:
            file_type = detect_file_type(content, safe_name)
            if not is_binary_file(content):
                result = decode_text_safe(content, safe_name)
            else:
                result = f"\n[UNKNOWN FILE TYPE: {safe_name}]\nType: {file_type}\nSize: {content_size} bytes\n[Binary content not extractable]\n"

        elapsed = time.time() - start_time
        metrics["files_processed"] += 1
        metrics["total_bytes"] += content_size
        logger.info(f"Extracted {safe_name} ({content_size} bytes) in {elapsed:.2f}s")
        return result
    except Exception as e:
        error_msg = f"Error processing {safe_name}: {str(e)}"
        logger.error(error_msg)
        metrics["errors"].append(error_msg)
        return f"\n[FATAL ERROR processing {safe_name}: {str(e)}]\n"
async def process_file_async(file: UploadFile) -> str:
    """Process a single file asynchronously."""
    loop = asyncio.get_running_loop()
    try:
        content = await file.read()
        safe_name = sanitize_filename(file.filename)
        if len(content) > (Config.MAX_FILE_SIZE_MB * 1024 * 1024):
            return f"\n[ERROR: {safe_name} exceeds {Config.MAX_FILE_SIZE_MB}MB limit]\n"
        result = await loop.run_in_executor(executor, process_file_bytes, safe_name, content)
        return result
    except Exception as e:
        error_msg = f"Async processing error for {file.filename}: {str(e)}"
        logger.error(error_msg)
        metrics["errors"].append(error_msg)
        return f"\n[ERROR processing {file.filename}: {str(e)}]\n"
# ==================== API ENDPOINTS ====================
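# Route registration assumed from the endpoint listing in root() below:
# "ingest": "/api/ingest (POST)".
@app.post("/api/ingest")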
async def ingest_files(files: List[UploadFile] = File(...)):
    """Universal file ingestion endpoint with async processing."""
    if not files:
        raise HTTPException(status_code=400, detail="No files provided")

    start_time = time.time()
    logger.info(f"Processing batch of {len(files)} files")

    tasks = [process_file_async(file) for file in files]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    combined_result = ""
    files_processed = 0
    errors = []
    total_size = 0

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            error_msg = f"Error processing {files[i].filename}: {str(result)}"
            logger.error(error_msg)
            errors.append(error_msg)
            combined_result += f"\n[ERROR: {error_msg}]\n"
        else:
            combined_result += result
            files_processed += 1
            try:
                if hasattr(files[i], 'size') and files[i].size:
                    total_size += files[i].size
            except Exception:
                pass

    elapsed = time.time() - start_time
    logger.info(f"Batch processed in {elapsed:.2f}s - {files_processed} files, {total_size} bytes")

    return {
        "status": "success",
        "extracted_text": combined_result,
        "files_processed": files_processed,
        "total_files": len(files),
        "processing_time": elapsed,
        "total_size_bytes": total_size,
        "errors": errors if errors else []
    }
async def interact_with_files(
    files: List[UploadFile] = File(...),
    x_user_id: str = Header(..., alias="X-User-ID"),
    x_chat_id: str = Header(..., alias="X-Chat-ID")
):
    """
    Process files and store them in the vector DB with user session isolation.
    Includes fix: strips metadata headers before DB storage to prevent AST parser crashes.
    """
    if not files:
        raise HTTPException(status_code=400, detail="No files provided")

    start_time = time.time()
    logger.info(f"📤 Processing {len(files)} files for user {x_user_id[:8]}...")

    # 1. Extract text from files (async processing)
    tasks = [process_file_async(file) for file in files]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    combined_result = ""
    files_processed = 0
    storage_errors = []

    # Regex to strip the "wrapper" headers (e.g., --- TEXT FILE: app.py ---).
    # Matches: header -> metadata block -> blank line -> CONTENT -> blank line -> footer.
    wrapper_pattern = r"(?s)(?:---|===)\s+.*?(?:FILE|DOCUMENT).*?[-=]+\n.*?\n\n(.*?)\n\n(?:---|===) END"
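    # Shape the pattern expects (blank lines around the content); when it does not
    # match, the fallback below uses the stripped raw text instead:
    #
    #   --- TEXT FILE: app.py ---
    #   Encoding: utf-8
    #   Size: 1234 characters
    #
    #   <file content captured as group(1)>
    #
    #   --- END TEXT FILE ---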
    # 2. Process each file and store in the vector DB
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            error_msg = f"Error processing {files[i].filename}: {str(result)}"
            logger.error(error_msg)
            combined_result += f"\n[ERROR: {error_msg}]\n"
            continue

        # Add to combined result (keep headers for the user UI!)
        combined_result += result
        files_processed += 1

        # 3. Prepare clean content for the vector DB
        filename = files[i].filename
        clean_text_for_db = result

        # Attempt to unwrap the content so the AST parser works
        match = re.search(wrapper_pattern, result)
        if match:
            # Found the "meat" of the file, use that
            clean_text_for_db = match.group(1)
        else:
            # Fallback: if the regex misses (e.g. a short file), use the original but trim whitespace
            clean_text_for_db = result.strip()

        try:
            # 4. Synchronous storage in the vector DB using the clean text.
            # We pass the pure code (clean_text_for_db) but the real filename, so V3
            # can parse classes/functions correctly while linking them to the source file.
            storage_success = vdb.store_session_document(
                text=clean_text_for_db,
                filename=filename,
                user_id=x_user_id,
                chat_id=x_chat_id
            )
            if not storage_success:
                error_msg = f"Vector storage failed for {filename}"
                logger.error(error_msg)
                storage_errors.append(error_msg)
                combined_result += f"\n[WARNING: Vector storage failed for {filename}]\n"
            else:
                logger.info(f"✅ Vector storage successful for {filename}")
        except Exception as e:
            error_msg = f"Vector DB error for {filename}: {str(e)}"
            logger.error(error_msg)
            storage_errors.append(error_msg)
            combined_result += f"\n[WARNING: {error_msg}]\n"

    elapsed = time.time() - start_time

    # 5. Return response
    response_data = {
        "status": "success",
        "extracted_text": combined_result,
        "files_processed": files_processed,
        "total_files": len(files),
        "processing_time": round(elapsed, 2),
        "vector_status": "stored_synchronously",
        "session_id": x_user_id,
        "storage_errors": storage_errors if storage_errors else []
    }
    logger.info(f"✅ Interaction completed in {elapsed:.2f}s for user {x_user_id[:8]}")
    return response_data
# Debug endpoints for monitoring
async def debug_vector_status(x_user_id: str = Header(..., alias="X-User-ID")):
    """Debug endpoint to check vector DB status."""
    stats = vdb.get_user_stats(x_user_id)
    return {
        "user_id": x_user_id,
        "stats": stats,
        "index_status": {
            "total_vectors": vdb.index.ntotal,
            "total_metadata": len(vdb.metadata),
            "index_type": vdb.index.__class__.__name__
        }
    }
async def cleanup_vector_db(
    max_age_hours: int = 24,
    x_user_id: str = Header(..., alias="X-User-ID")
):
    """Clean up old session data."""
    try:
        cleaned = vdb.cleanup_old_sessions(max_age_hours)
        return {
            "status": "success",
            "cleaned_vectors": cleaned,
            "max_age_hours": max_age_hours,
            "user_id": x_user_id
        }
    except Exception as e:
        logger.error(f"Cleanup failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
async def delete_specific_session(
    x_user_id: str = Header(..., alias="X-User-ID"),
    x_chat_id: str = Header(..., alias="X-Chat-ID")
):
    """Triggered when the user clicks 'Delete Chat' in the UI."""
    # Run in a thread so other users are not blocked while the index is rebuilt
    success = await asyncio.to_thread(vdb.delete_session, x_user_id, x_chat_id)
    if success:
        return {"status": "deleted", "chat_id": x_chat_id}
    else:
        return {"status": "not_found", "message": "Session was already empty"}
async def search_vector_db(
    payload: SearchRequest,
    x_user_id: str = Header(..., alias="X-User-ID"),
    x_chat_id: str = Header(..., alias="X-Chat-ID")
):
    """Search within the user's session data with proper JSON serialization."""
    logger.info(f"🔍 Search request from user {x_user_id[:8]}: '{payload.query[:50]}...'")
    try:
        results = vdb.retrieve_session_context(
            query=payload.query,
            user_id=x_user_id,
            chat_id=x_chat_id,
            filter_type=payload.target,
            top_k=50,
            final_k=3
        )
        logger.info(f"✅ Search completed: {len(results)} results for user {x_user_id[:8]}")

        # Manually serialize to handle numpy types
        def serialize(obj):
            if isinstance(obj, np.integer):
                return int(obj)
            elif isinstance(obj, np.floating):
                return float(obj)
            elif isinstance(obj, np.ndarray):
                return obj.tolist()
            elif isinstance(obj, dict):
                return {k: serialize(v) for k, v in obj.items()}
            elif isinstance(obj, list):
                return [serialize(item) for item in obj]
            return obj

        serialized_results = serialize(results)

        # Return via JSONResponse so the already-plain structure is sent as-is
        return JSONResponse(
            content={"results": serialized_results},
            media_type="application/json"
        )
    except Exception as e:
        logger.error(f"Search failed: {e}")
        raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}")
async def sync_chat_history(
    background_tasks: BackgroundTasks,
    messages: List[Dict] = Body(...),
    x_user_id: str = Header(..., alias="X-User-ID"),
    x_chat_id: str = Header(..., alias="X-Chat-ID")
):
    """Sync chat history for the specific user session."""
    if not messages:
        return {"status": "ignored", "reason": "empty"}

    # Trigger secure storage in the background, scoped to this user and chat
    background_tasks.add_task(
        vdb.store_chat_context,
        messages=messages,
        user_id=x_user_id,
        chat_id=x_chat_id,
    )
    return {"status": "syncing_started"}
async def ingest_single_file(file: UploadFile = File(...)):
    """Process a single file endpoint."""
    start_time = time.time()
    result = await process_file_async(file)
    elapsed = time.time() - start_time
    logger.info(f"Single file processed in {elapsed:.2f}s")
    return {
        "status": "success",
        "extracted_text": result,
        "filename": file.filename,
        "processing_time": elapsed,
        "file_size": file.size
    }
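# Route registration assumed from the endpoint listing in root(): "health": "/health (GET)".
@app.get("/health")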
async def health_check():
    """Comprehensive health check endpoint."""
    return {
        "status": "active",
        "version": "1.0.0",
        "engine": "High-Performance Production Extractor",
        "config": {
            "max_file_size_mb": Config.MAX_FILE_SIZE_MB,
            "max_zip_depth": Config.MAX_ZIP_DEPTH,
            "max_files_in_zip": Config.MAX_FILES_IN_ZIP,
            "worker_threads": Config.WORKER_THREADS,
            "enable_ocr": Config.ENABLE_OCR
        },
        "metrics": {
            "files_processed": metrics["files_processed"],
            "total_bytes_processed": metrics["total_bytes"],
            "error_count": len(metrics["errors"])
        },
        "supported_types": [
            "Documents: .pdf, .docx, .pptx, .xlsx, .xls",
            "Code: 20+ programming languages",
            "Archives: .zip, .tar, .tar.gz, .tar.bz2",
            "Data: .json, .xml, .csv, .tsv",
            "Text: .txt, .md, .log, .ini, .yaml",
            "Images: .png, .jpg, .jpeg, .tiff (OCR)"
        ]
    }
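# Route registration assumed from the endpoint listing in root(): "metrics": "/metrics (GET)".
@app.get("/metrics")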
async def get_metrics():
    """Get detailed performance metrics."""
    avg_bytes = metrics["total_bytes"] / max(1, metrics["files_processed"])
    return {
        "status": "ok",
        "metrics": {
            **metrics,
            "average_bytes_per_file": round(avg_bytes, 2),
            "uptime_seconds": metrics["processing_time"],  # note: not currently updated anywhere
            "latest_errors": metrics["errors"][-10:]
        }
    }
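# Route registration assumed from the endpoint listing in root(): "config": "/config (GET)".
@app.get("/config")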
async def get_config():
    """Get current configuration."""
    return {
        "status": "ok",
        "config": {
            "MAX_FILE_SIZE_MB": Config.MAX_FILE_SIZE_MB,
            "MAX_ZIP_DEPTH": Config.MAX_ZIP_DEPTH,
            "MAX_FILES_IN_ZIP": Config.MAX_FILES_IN_ZIP,
            "WORKER_THREADS": Config.WORKER_THREADS,
            "TIMEOUT_SECONDS": Config.TIMEOUT_SECONDS,
            "ENABLE_OCR": Config.ENABLE_OCR,
            "TEXTRACT_TIMEOUT": Config.TEXTRACT_TIMEOUT,
            "MAX_PDF_PAGES": Config.MAX_PDF_PAGES,
            "TESSERACT_LANGUAGE": Config.OCR_LANGUAGE,
            "MAX_IMAGE_PIXELS": Config.MAX_IMAGE_PIXELS
        }
    }
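# Root route assumed at "/" (the handler describes the service and lists the endpoints above).
@app.get("/")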
async def root():
    """Root endpoint with API information."""
    return {
        "service": "NeuralStream Production Extractor",
        "version": "1.0.0",
        "endpoints": {
            "ingest": "/api/ingest (POST)",
            "single": "/api/single (POST)",
            "health": "/health (GET)",
            "metrics": "/metrics (GET)",
            "config": "/config (GET)",
            "docs": "/docs (GET)"
        }
    }
# ==================== MAIN ====================
if __name__ == "__main__":
    import sys

    port = int(os.getenv("PORT", 7860))
    workers = int(os.getenv("WORKERS", 1))
    host = os.getenv("HOST", "0.0.0.0")

    logger.info(f"Starting NeuralStream Production Extractor on {host}:{port}")
    logger.info(f"Worker processes: {workers}")
    logger.info(f"File size limit: {Config.MAX_FILE_SIZE_MB}MB")
    logger.info(f"ZIP processing depth: {Config.MAX_ZIP_DEPTH}")
    logger.info(f"OCR Enabled: {Config.ENABLE_OCR}")
    logger.info(f"OCR Language: {Config.OCR_LANGUAGE}")
    logger.info("Supported file types: 50+ formats")

    if '--dev' in sys.argv:
        uvicorn.run("app:app", host="127.0.0.1", port=port, reload=True)
    else:
        uvicorn.run(
            "app:app",
            host=host,
            port=port,
            workers=workers,
            log_level="info",
            access_log=True,
            loop="asyncio"
        )
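
# Example client call (illustrative sketch; assumes the /api/ingest registration
# above and a local instance on the default port):
#
#   import requests
#   with open("report.pdf", "rb") as fh:
#       resp = requests.post(
#           "http://localhost:7860/api/ingest",
#           files=[("files", ("report.pdf", fh, "application/pdf"))],
#       )
#   print(resp.json()["files_processed"])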