from typing import List, Dict, Any
import re
import logging

logger = logging.getLogger(__name__)


class Preprocessor:
    def __init__(self):
        """Initialize preprocessor without external dependencies"""
        pass
    def clean_text(self, text: str) -> str:
        """Clean and normalize text"""
        if not text:
            return ""
        # Remove extra whitespace
        text = text.strip()
        text = re.sub(r'\s+', ' ', text)
        # Remove special characters but keep punctuation
        text = re.sub(r'[^\w\s\.\,\!\?\;\:\-\(\)]', '', text)
        return text.strip()
    def extract_sentences(self, text: str) -> List[str]:
        """Extract sentences from text (simplified version without NLTK)"""
        if not text:
            return []
        # Simple sentence splitting based on terminal punctuation
        sentences = re.split(r'[.!?]+', text)
        sentences = [s.strip() for s in sentences if s.strip()]
        return sentences
    def tokenize(self, text: str) -> List[str]:
        """Tokenize text into lowercase words (simplified version)"""
        if not text:
            return []
        # Simple word tokenization on word boundaries
        words = re.findall(r'\b\w+\b', text.lower())
        return words
    def preprocess_passages(self, passages: List[str]) -> List[Dict[str, Any]]:
        """Preprocess a list of passages"""
        processed = []
        for i, passage in enumerate(passages):
            if not passage:
                continue
            cleaned = self.clean_text(passage)
            sentences = self.extract_sentences(cleaned)
            tokens = self.tokenize(cleaned)
            processed.append({
                'id': i,
                'text': cleaned,
                'sentences': sentences,
                'tokens': tokens,
                'length': len(tokens)
            })
        return processed
    def preprocess_qa_data(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Preprocess QA data, automatically converting dict/list fields to strings"""
        processed = []

        def to_str(val):
            if isinstance(val, dict):
                # Concatenate all values recursively
                return " ".join([to_str(v) for v in val.values()])
            elif isinstance(val, list):
                return " ".join([to_str(v) for v in val])
            elif val is None:
                return ""
            return str(val)

        for item in data:
            if not isinstance(item, dict):
                continue
            question = to_str(item.get('question', ''))
            answer = to_str(item.get('answer', ''))
            context = to_str(item.get('context', ''))
            processed_item = {
                'question': self.clean_text(question),
                'answer': self.clean_text(answer),
                'context': self.clean_text(context),
                'question_tokens': self.tokenize(question),
                'answer_tokens': self.tokenize(answer),
                'context_tokens': self.tokenize(context)
            }
            processed.append(processed_item)
        return processed
    def create_chunks(self, text: str, chunk_size: int = 512, overlap: int = 50) -> List[str]:
        """Create overlapping text chunks"""
        if not text:
            return []
        if overlap >= chunk_size:
            # Guard against a non-positive step, which would otherwise silently yield no chunks
            raise ValueError("overlap must be smaller than chunk_size")
        tokens = self.tokenize(text)
        chunks = []
        # Slide a window of chunk_size tokens, stepping by chunk_size - overlap
        for i in range(0, len(tokens), chunk_size - overlap):
            chunk_tokens = tokens[i:i + chunk_size]
            chunk_text = ' '.join(chunk_tokens)
            chunks.append(chunk_text)
        return chunks