import json
import logging
import math
import re

import datasets
import requests
from datasets import load_dataset, get_dataset_config_names, get_dataset_infos
from huggingface_hub import HfApi, DatasetCard, DatasetCardData

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DatasetCommandCenter:
    def __init__(self, token=None):
        self.token = token
        self.api = HfApi(token=token)
        self.username = self.api.whoami()['name']
        logger.info("Authenticated as: %s", self.username)
    # ==========================================
    # 1. METADATA & SCHEMA INSPECTION
    # ==========================================
    def get_dataset_metadata(self, dataset_id):
        """
        Fetches the available configs, splits, and the declared license for a dataset.
        Falls back to sensible defaults when Hub metadata is unavailable.
        """
        configs = ['default']
        splits = ['train', 'test', 'validation']
        license_name = "unknown"
        try:
            # 1. Fetch configs
            try:
                found_configs = get_dataset_config_names(dataset_id, token=self.token)
                if found_configs:
                    configs = found_configs
            except Exception:
                pass

            # 2. Fetch metadata (splits & license)
            try:
                selected = configs[0]
                infos = get_dataset_infos(dataset_id, token=self.token)
                info = None
                if selected in infos:
                    info = infos[selected]
                elif 'default' in infos:
                    info = infos['default']
                elif infos:
                    info = list(infos.values())[0]
                if info:
                    splits = list(info.splits.keys())
                    license_name = info.license or "unknown"
            except Exception:
                pass

            return {
                "status": "success",
                "configs": configs,
                "splits": splits,
                "license_detected": license_name
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}
    def get_splits_for_config(self, dataset_id, config_name):
        """Returns the splits available for a specific config, with fallbacks."""
        try:
            infos = get_dataset_infos(dataset_id, config_name=config_name, token=self.token)
            if config_name in infos:
                splits = list(infos[config_name].splits.keys())
            elif len(infos) > 0:
                splits = list(list(infos.values())[0].splits.keys())
            else:
                splits = ['train', 'test']
            return {"status": "success", "splits": splits}
        except Exception:
            return {"status": "success", "splits": ['train', 'test', 'validation']}
    def _sanitize_for_json(self, obj):
        """
        Recursively cleans data for JSON serialization.
        NaN/Infinity become None; unknown types are stringified.
        """
        if isinstance(obj, float):
            if math.isnan(obj) or math.isinf(obj):
                return None
            return obj
        elif isinstance(obj, dict):
            return {k: self._sanitize_for_json(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [self._sanitize_for_json(v) for v in obj]
        elif isinstance(obj, (str, int, bool, type(None))):
            return obj
        else:
            return str(obj)
    def _flatten_object(self, obj, parent_key='', sep='.'):
        """
        Recursively flattens nested keys into dotted paths for the UI dropdowns.
        """
        items = {}
        # Transparently parse JSON strings so nested keys are discoverable
        if isinstance(obj, str):
            s = obj.strip()
            if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
                try:
                    obj = json.loads(s)
                except (ValueError, TypeError):
                    pass
        if isinstance(obj, dict):
            for k, v in obj.items():
                new_key = f"{parent_key}{sep}{k}" if parent_key else k
                items.update(self._flatten_object(v, new_key, sep=sep))
        elif isinstance(obj, list):
            new_key = f"{parent_key}" if parent_key else "list_content"
            items[new_key] = "List"
        else:
            items[parent_key] = type(obj).__name__
        return items
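    # Example (illustrative, not part of the app's data): a row such as
    # {"meta": {"id": 7, "tags": ["a"]}, "text": "hi"} flattens to
    # {"meta.id": "int", "meta.tags": "List", "text": "str"},
    # i.e. dotted paths the UI can offer as source columns.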
    def inspect_dataset(self, dataset_id, config, split):
        """Streams a handful of rows to build sample previews and a schema map."""
        try:
            conf = config if config != 'default' else None
            ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
            sample_rows = []
            available_paths = set()
            schema_map = {}
            for i, row in enumerate(ds_stream):
                if i >= 10:
                    break
                # Force materialization from LazyRow to a plain dict
                row = dict(row)
                # Clean row for the UI
                clean_row = self._sanitize_for_json(row)
                sample_rows.append(clean_row)
                # Schema discovery
                flattened = self._flatten_object(row)
                available_paths.update(flattened.keys())
                # List-mode detection for top-level columns
                for k, v in row.items():
                    if k not in schema_map:
                        schema_map[k] = {"type": "Object"}
                    val = v
                    if isinstance(val, str):
                        try:
                            val = json.loads(val)
                        except (ValueError, TypeError):
                            pass
                    if isinstance(val, list):
                        schema_map[k]["type"] = "List"
            sorted_paths = sorted(available_paths)
            schema_tree = {}
            for path in sorted_paths:
                root = path.split('.')[0]
                if root not in schema_tree:
                    schema_tree[root] = []
                schema_tree[root].append(path)
            return {
                "status": "success",
                "samples": sample_rows,
                "schema_tree": schema_tree,
                "schema": schema_map,
                "dataset_id": dataset_id
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}
    # ==========================================
    # 2. CORE EXTRACTION LOGIC
    # ==========================================
    def _get_value_by_path(self, obj, path):
        """
        Retrieves a value by column name or dotted path.
        Priority: direct key access (fastest), then dot-notation traversal.
        """
        # Empty or None path returns the object itself
        if not path:
            return obj

        # 1. Try direct access first (handles simple column names).
        #    Works for dict, UserDict, and LazyRow via duck-typing.
        if '.' not in path:
            try:
                return obj[path]
            except (KeyError, TypeError, AttributeError):
                return None

        # 2. Path contains dots: walk the structure key by key
        keys = path.split('.')
        current = obj
        for i, key in enumerate(keys):
            if current is None:
                return None
            try:
                # Array/list index access support (e.g. solutions.0.code)
                if isinstance(current, list) and key.isdigit():
                    current = current[int(key)]
                else:
                    # Dictionary-style access
                    current = current[key]
            except (KeyError, TypeError, IndexError, AttributeError):
                return None
            # Lazy parsing: only decode a JSON string if we need to go deeper
            is_last_key = (i == len(keys) - 1)
            if not is_last_key and isinstance(current, str):
                s = current.strip()
                if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
                    try:
                        current = json.loads(s)
                    except (ValueError, TypeError):
                        return None
        return current
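    # Example (illustrative): given row = {"a": {"b": [10, 20]}, "text": "hi"}
    # _get_value_by_path(row, "text")        -> "hi"   (direct key access)
    # _get_value_by_path(row, "a.b.1")       -> 20     (dot path with list index)
    # _get_value_by_path(row, "missing.key") -> None   (lookups never raise)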
    def _extract_from_list_logic(self, row, source_col, filter_key, filter_val, target_path):
        """
        FROM source_col FIND ITEM WHERE filter_key == filter_val EXTRACT target_path
        """
        data = row.get(source_col)
        if isinstance(data, str):
            try:
                data = json.loads(data)
            except (ValueError, TypeError):
                return None
        if not isinstance(data, list):
            return None
        matched_item = None
        for item in data:
            # Compare as strings for safety across int/str mismatches
            if isinstance(item, dict) and str(item.get(filter_key, '')) == str(filter_val):
                matched_item = item
                break
        if matched_item:
            return self._get_value_by_path(matched_item, target_path)
        return None
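    # Example (illustrative): with
    # row["solutions"] = [{"lang": "py", "code": "..."}, {"lang": "cpp", "code": "..."}],
    # _extract_from_list_logic(row, "solutions", "lang", "cpp", "code")
    # returns the "code" value of the item whose "lang" equals "cpp".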
    def _apply_projection(self, row, recipe):
        """Builds one output row from an input row according to the recipe columns."""
        new_row = {}
        # Optimization: only build eval_context when a Python column is present.
        # This avoids an expensive row.copy() for simple path operations.
        eval_context = None
        for col_def in recipe['columns']:
            t_type = col_def.get('type', 'simple')
            target_col = col_def['name']
            try:
                if t_type == 'simple':
                    # Fast path: no context needed
                    new_row[target_col] = self._get_value_by_path(row, col_def['source'])
                elif t_type == 'list_search':
                    # Fast path: no context needed
                    new_row[target_col] = self._extract_from_list_logic(
                        row,
                        col_def['source'],
                        col_def['filter_key'],
                        col_def['filter_val'],
                        col_def['target_key']
                    )
                elif t_type == 'python':
                    # Lazy context creation: only pay the cost if used
                    if eval_context is None:
                        eval_context = row.copy()
                        eval_context['row'] = row
                        eval_context['json'] = json
                        eval_context['re'] = re
                        eval_context['requests'] = requests
                    # Evaluates the ENTIRE expression as Python
                    val = eval(col_def['expression'], {}, eval_context)
                    new_row[target_col] = val
                elif t_type == 'requests':
                    # Parse the JSON payload and POST it to the configured URL
                    payload = json.loads(col_def['rpay'])
                    new_row[target_col] = requests.post(col_def['rurl'], json=payload).text
            except Exception as e:
                raise ValueError(f"Column '{target_col}' failed: {str(e)}")
        return new_row
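    # Example recipe shape (illustrative; the column names below are hypothetical,
    # the keys are the ones this method reads):
    # recipe = {
    #     "filter_rule": "len(row['question']) > 0",
    #     "columns": [
    #         {"name": "prompt", "type": "simple", "source": "question"},
    #         {"name": "cpp_code", "type": "list_search", "source": "solutions",
    #          "filter_key": "lang", "filter_val": "cpp", "target_key": "code"},
    #         {"name": "n_words", "type": "python", "expression": "len(row['question'].split())"},
    #     ]
    # }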
    # ==========================================
    # 3. DOCUMENTATION (DATASET CARD)
    # ==========================================
    def _generate_card(self, source_id, target_id, recipe, license_name):
        card_data = DatasetCardData(
            language="en",
            license=license_name,
            tags=["dataset-command-center", "etl", "generated-dataset"],
            base_model=source_id,
        )
        content = f"""
# {target_id.split('/')[-1]}

This dataset is a transformation of [{source_id}](https://huggingface.co/datasets/{source_id}).
It was generated using the **Hugging Face Dataset Command Center**.

## Transformation Recipe

The following operations were applied to the source data:

| Target Column | Operation Type | Source / Logic |
|---------------|----------------|----------------|
"""
        for col in recipe['columns']:
            c_type = col.get('type', 'simple')
            c_name = col['name']
            c_src = col.get('source', '-')
            logic = "-"
            if c_type == 'simple':
                logic = f"Mapped from `{c_src}`"
            elif c_type == 'list_search':
                logic = f"Get `{col['target_key']}` where `{col['filter_key']} == {col['filter_val']}`"
            elif c_type == 'python':
                logic = f"Python: `{col.get('expression')}`"
            content += f"| **{c_name}** | {c_type} | {logic} |\n"
        if recipe.get('filter_rule'):
            content += f"\n### Row Filtering\n**Filter Applied:** `{recipe['filter_rule']}`\n"
        content += f"\n## Original License\nThis dataset inherits the license `{license_name}` from the source."
        card = DatasetCard.from_template(card_data, content=content)
        return card
    # ==========================================
    # 4. EXECUTION
    # ==========================================
    def process_and_push(self, source_id, config, split, target_id, recipe, max_rows=None, new_license=None):
        logger.info(f"Job started: {source_id} -> {target_id}")
        conf = config if config != 'default' else None

        def gen():
            ds_stream = load_dataset(source_id, name=conf, split=split, streaming=True, token=self.token)
            count = 0
            for i, row in enumerate(ds_stream):
                if max_rows and count >= int(max_rows):
                    break
                # Force materialization from LazyRow to a plain dict
                row = dict(row)
                # 1. Filter
                if recipe.get('filter_rule'):
                    try:
                        ctx = row.copy()
                        ctx['row'] = row
                        ctx['json'] = json
                        ctx['re'] = re
                        ctx['requests'] = requests
                        if not eval(recipe['filter_rule'], {}, ctx):
                            continue
                    except Exception as e:
                        raise ValueError(f"Filter crashed on row {i}: {e}")
                # 2. Projection
                try:
                    yield self._apply_projection(row, recipe)
                    count += 1
                except ValueError as ve:
                    raise ve
                except Exception as e:
                    raise ValueError(f"Unexpected crash on row {i}: {e}")

        try:
            # 1. Process & push
            new_dataset = datasets.Dataset.from_generator(gen)
            new_dataset.push_to_hub(target_id, token=self.token)
            # 2. Dataset card
            try:
                card = self._generate_card(source_id, target_id, recipe, new_license or "unknown")
                card.push_to_hub(f'{self.username}/{target_id}', token=self.token)
            except Exception as e:
                logger.error(f"Failed to push Dataset Card: {e}")
            return {"status": "success", "rows_processed": len(new_dataset)}
        except Exception as e:
            logger.error(f"Job Failed: {e}")
            return {"status": "failed", "error": str(e)}
    # ==========================================
    # 5. PREVIEW
    # ==========================================
    def preview_transform(self, dataset_id, config, split, recipe):
        conf = config if config != 'default' else None
        try:
            # Load the dataset in streaming mode
            ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
            processed = []
            for i, row in enumerate(ds_stream):
                # Stop after 5 successful rows
                if len(processed) >= 5:
                    break
                # Force materialization from LazyRow to a plain dict.
                # This avoids issues when serializing streaming rows to JSON.
                row = dict(row)
                # --- Filter logic ---
                passed = True
                if recipe.get('filter_rule'):
                    try:
                        # Create a context only for the filter check
                        ctx = row.copy()
                        ctx['row'] = row
                        ctx['json'] = json
                        ctx['re'] = re
                        if not eval(recipe['filter_rule'], {}, ctx):
                            passed = False
                    except Exception:
                        # If the filter errors out (e.g. missing column), treat the row as filtered out
                        passed = False
                if passed:
                    try:
                        # --- Projection logic ---
                        new_row = self._apply_projection(row, recipe)
                        # --- Sanitization ---
                        # Convert NaN, Infinity, and complex objects to keep the browser/Flask response valid
                        clean_new_row = self._sanitize_for_json(new_row)
                        processed.append(clean_new_row)
                    except Exception as e:
                        # Surface per-row errors to the UI
                        processed.append({"_preview_error": f"Row {i} Error: {str(e)}"})
            return processed
        except Exception:
            # Re-raise global errors (like 404 Dataset Not Found) so the UI sees them
            raise
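

# Minimal usage sketch (illustrative only; the dataset id, column name, and recipe
# below are assumptions for demonstration, not part of the application):
if __name__ == "__main__":
    # Assumes Hub credentials are available (e.g. via a cached `huggingface-cli login`)
    center = DatasetCommandCenter(token=None)
    meta = center.get_dataset_metadata("squad")
    print(meta)
    recipe = {
        "filter_rule": None,
        "columns": [{"name": "prompt", "type": "simple", "source": "question"}],
    }
    preview = center.preview_transform("squad", "default", "train", recipe)
    print(json.dumps(preview, indent=2))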