from __future__ import annotations

import json
import os
import random
import time
import gradio as gr
import pandas as pd
from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from PIL import Image
from io import BytesIO
import base64
from datetime import datetime
from pathlib import Path
from uuid import uuid4

import trafilatura
from datasets import load_dataset
from datasets import Features, Value, Sequence
from huggingface_hub import CommitScheduler
from huggingface_hub import whoami

from languages import ISO_CODE_TO_LANGUAGE_NAME
from texts import ABOUT_TEXT

DISABLE_FETCH_URL = os.environ.get("DISABLE_FETCH_URL", False)

if DISABLE_FETCH_URL:
    print("Fetch URL is disabled: Only dummy screenshot and text will be returned.")

DATASET_REPO_ID = os.environ.get("DATASET_REPO_ID", "malteos/seed-crawl-urls")

JSON_DATASET_DIR = Path("jsonl_dataset")
JSON_DATASET_DIR.mkdir(parents=True, exist_ok=True)

# Each instance of this space spawns a unique file for its results.
# For the life of the space, it appends to that file, which is pushed to a dataset every so often.
# It is also append-only, so no previous data will be overwritten.
JSON_DATASET_PATH = JSON_DATASET_DIR / f"urls-{uuid4()}.jsonl"
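
# Illustrative only (not part of the app logic): each line of the JSONL file is one
# annotation record, roughly of the form
#   {"url": "https://example.com", "language_codes": ["fr"], "categories": ["News"],
#    "do_crawl": 1, "username": "alice", "submission_datetime": "2024-01-01T12:00:00"}
# The field names match what save_to_jsonl() receives below; the values here are made up.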

if os.getenv("HF_TOKEN"):
    scheduler = CommitScheduler(
        repo_id=DATASET_REPO_ID,
        repo_type="dataset",
        folder_path=JSON_DATASET_DIR,
        path_in_repo="data",
    )
else:
    scheduler = None
    print("No HF_TOKEN found, results will not be uploaded to the hub.")


def save_to_jsonl(obj: dict) -> None:
    """Append one JSON object per line; the scheduler lock keeps a background commit
    from capturing a partially written line."""
    if scheduler:
        with scheduler.lock:
            with JSON_DATASET_PATH.open("a") as f:
                json.dump(obj, f)
                f.write("\n")


def get_candidate_urls():
    return [
        "http://example.com",
        "https://wikipedia.org/",
        "https://occiglot.eu",
        "https://ostendorff.org",
        "https://fr.wikipedia.org/",
        "https://amazon.com/",
    ]


def pil_image_to_base64(image):
    # Save the image to a BytesIO buffer
    buffer = BytesIO()
    image.save(buffer, format="PNG")  # You can change the format if needed
    buffer.seek(0)

    # Encode the bytes into a base64 string
    img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")

    # Format the base64 string for use in an HTML image tag
    html_img_tag_src = f"data:image/png;base64,{img_base64}"

    return html_img_tag_src
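
# Illustrative output (not executed here): the returned string looks like
#   "data:image/png;base64,iVBORw0KGgo..." (truncated)
# and can be used directly as the src attribute of an <img> tag.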


def fetch_screenshot_and_text_from_url(url):
    screen_width = 1080
    height = 350
    text = ""

    if DISABLE_FETCH_URL:
        screenshot = Image.new('RGB', (350, height))
        text = f"Some dummy text for {url} (offline mode enabled)"
    else:
        options = webdriver.ChromeOptions()
        options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')

        driver = None
        try:
            driver = webdriver.Chrome(options=options)
            # driver.set_window_size(1080, 720)  # Adjust the window size here
            driver.get(url)
            driver.implicitly_wait(10)

            # Wait for the page to fully load; you may adjust the sleep time or implement a wait condition
            # time.sleep(2)

            # Fetch the HTML of the rendered page
            html_str = driver.page_source

            # Execute JS to find the full height of the rendered page
            scroll_height = driver.execute_script("return document.body.scrollHeight")

            # Resize the window to the full page height
            driver.set_window_size(screen_width, max(scroll_height + 200, 900))

            raw_screenshot = driver.get_screenshot_as_png()
            screenshot = Image.open(BytesIO(raw_screenshot))

            # Extract the main text content
            text = trafilatura.extract(html_str)
        except WebDriverException as e:
            print(f"Could not fetch {url}: {e}")
            screenshot = Image.new('RGB', (1, 1))
        finally:
            if driver:
                driver.quit()

    # Embed the base64-encoded image as an <img> tag in an HTML string
    screenshot_html_str = f"""<div style="width: 100%; height: {height}px; overflow-y: scroll;"><img src="{pil_image_to_base64(screenshot)}" /></div>"""

    # return gr.update(value=html_str, visible=True), text, gr.update(visible=True)

    return screenshot_html_str, text
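
# Rough usage sketch (assumes a working headless Chrome; not executed at import time):
#   html_snippet, page_text = fetch_screenshot_and_text_from_url("https://example.com")
# html_snippet is a scrollable <div> with the screenshot inlined as a data URI;
# page_text is the main text extracted by trafilatura (may be None or empty on failure).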


with gr.Blocks(fill_height=True) as demo:
    gr.Markdown(
        """
        # Seed Crawl Annotator
        """)

    with gr.Tab("Contribute"):
        gr.Markdown("Welcome! This is a crowd-sourced effort to improve crawling of low-resource languages. Your contributions will be part of a public dataset.")

        profile_state = gr.State([])
        gr.LoginButton()

        with gr.Column(visible=False) as wrapper_col:
            login_status = gr.Markdown("no", visible=False)

            def handle_login(profile: gr.OAuthProfile | None) -> dict:
                if profile:
                    gr.Info(f"Logged in as {profile.username}")
                    return {
                        profile_state: f"{profile.username}",
                        wrapper_col: gr.update(visible=True),
                        login_status: "yes",
                    }
                else:
                    gr.Warning("You need to login to use this app.")
                    return {
                        profile_state: [],
                        wrapper_col: gr.update(visible=False),
                        login_status: "no",
                    }

            demo.load(handle_login, inputs=None, outputs=[profile_state, wrapper_col, login_status])

            url_field = gr.Textbox(label="Website URL", placeholder="Enter a URL you want to annotate", interactive=True)

            with gr.Row():
                set_random_btn = gr.Button("Pick Random URL", variant="secondary", interactive=True)
                load_btn = gr.Button("Annotate URL", variant="primary", interactive=True)

            with gr.Row():
                extracted_text = gr.Textbox(
                    label="Extracted text",
                    max_lines=15,
                    lines=15,
                    visible=True,
                    placeholder="Click on `Annotate URL` to fetch the Web page's text content.",
                )
                screenshot_scrollable = gr.HTML("", visible=False)

            with gr.Column(visible=False) as output_col:
                with gr.Row():
                    language_codes = gr.Dropdown(
                        [("unknown", "unknown")] + [(f"{code}: {name}", code) for code, name in ISO_CODE_TO_LANGUAGE_NAME.items()],
                        label="Language codes",
                        multiselect=True,
                        # allow_custom_value=True,
                    )
                    categories = gr.CheckboxGroup(["News", "Culture/History", "Government", "Political Parties", "Other"], label="Categories")

                with gr.Row():
                    do_crawl_btn = gr.Button("✅ Do Crawl", elem_classes="success")
                    dont_crawl_btn = gr.Button("❌ Don't Crawl", elem_classes="error")
                    # random_subpage_btn = gr.Button("🔁 Load Another Subpage", variant="secondary")

            def set_random_url():
                candidate_urls = get_candidate_urls()
                selected_url = random.choice(candidate_urls)
                return selected_url

            set_random_btn.click(fn=set_random_url, outputs=url_field)

            def load_url(url):
                screenshot_html_str, text = fetch_screenshot_and_text_from_url(url)

                if not screenshot_html_str or not text:
                    # gr.Error must be raised (not just called) to abort the event and show the message
                    raise gr.Error("Could not fetch data for URL.")

                return {
                    screenshot_scrollable: gr.update(value=screenshot_html_str, visible=True),
                    extracted_text: gr.update(value=text, visible=True),
                    output_col: gr.update(visible=True),
                    language_codes: "unknown",  # reset the selection to the default "unknown" entry  # gr.update(None, label=url)
                    categories: gr.update(value=None),
                }

            load_btn.click(fn=load_url, inputs=url_field, outputs=[screenshot_scrollable, extracted_text, output_col, language_codes, categories], api_name="load_url")

            def do_crawl_error_handler(msg):
                # error response: leave all components unchanged
                print("error -> no changes")
                gr.Warning(f"❌ Error: {msg}")

                return {
                    url_field: gr.update(),
                    output_col: gr.update(),
                    extracted_text: gr.update(),
                    screenshot_scrollable: gr.update(),
                }

            def do_crawl_or_not(profile_state, url, language_codes, categories, do_crawl=True):
                print(f"{url=}")
                print(f"{language_codes=}")
                print(f"{categories=}")
                print(f"{do_crawl=}")

                if not profile_state:
                    return do_crawl_error_handler("You are not authenticated.")
                elif len(url) <= 0:
                    return do_crawl_error_handler("URL is empty.")
                elif len(categories) <= 0:
                    return do_crawl_error_handler("You must select at least one category.")
                elif len(language_codes) <= 0:
                    return do_crawl_error_handler("You must select at least one language.")
                else:
                    save_to_jsonl({
                        "url": url,
                        "language_codes": language_codes,
                        "categories": categories,
                        "do_crawl": int(do_crawl),
                        "username": profile_state,
                        "submission_datetime": datetime.now().isoformat(),
                    })

                    # html_str = f"<b>Thanks {profile_state}, we have saved your feedback!</b>"
                    gr.Info("✅ Thanks for your feedback. Let's continue!")

                    return {
                        url_field: "",  # TODO fetch new url
                        output_col: gr.update(visible=False),
                        extracted_text: gr.update(value=None, visible=True),
                        screenshot_scrollable: gr.update(value="", visible=False),
                    }

            def do_crawl(profile_state, url, language_codes, categories):
                return do_crawl_or_not(profile_state, url, language_codes, categories, do_crawl=True)

            def dont_crawl(profile_state, url, language_codes, categories):
                return do_crawl_or_not(profile_state, url, language_codes, categories, do_crawl=False)

            do_crawl_btn.click(
                fn=do_crawl,
                inputs=[profile_state, url_field, language_codes, categories],
                outputs=[
                    url_field,
                    output_col,
                    extracted_text,
                    screenshot_scrollable,
                ],
                api_name="do_crawl",
            )
            dont_crawl_btn.click(
                fn=dont_crawl,
                inputs=[profile_state, url_field, language_codes, categories],
                outputs=[
                    url_field,
                    output_col,
                    extracted_text,
                    screenshot_scrollable,
                ],
                api_name="dont_crawl",
            )

            # def random_subpage(url):
            #     new_url = "http://example.com"
            #     return [new_url, *fetch_screenshot_and_text_from_url(new_url)]

            # random_subpage_btn.click(fn=random_subpage, inputs=url, outputs=[url, screenshot_scrollable, extracted_text, output_col], api_name="load_random_subpage")
| with gr.Tab("Browse Contributions"): | |
| gr.Markdown("This page lists all the data we have collected so far. Please note that the list might be out-of-sync.") | |
| """ | |
| dataset_info: | |
| - config_name: base | |
| features: | |
| - name: url | |
| dtype: string | |
| - name: language_codes | |
| list: string | |
| - name: categories | |
| list: string | |
| - name: do_crawl | |
| dtype: int32 | |
| - name: username | |
| dtype: string | |
| - name: submission_datetime | |
| dtype: string | |
| """ | |

        features = Features({
            "url": Value("string"),
            "language_codes": Sequence(Value(dtype="string")),
            "categories": Sequence(Value(dtype="string")),
            "do_crawl": Value("int32"),
            "username": Value("string"),
            "submission_datetime": Value("string"),
        })
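
        # Why the explicit schema (an assumption, not stated in the original): letting
        # `datasets` infer types from many small JSONL shards could yield mismatched or
        # empty-shard schemas, so the columns are pinned to match the dict written by
        # save_to_jsonl() above.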

        try:
            ds = load_dataset(DATASET_REPO_ID, data_files={"train": "data/*.jsonl"}, features=features)
            df = ds["train"].to_pandas()
            gr.Dataframe(df)
        except ValueError as e:
            print(e)
            gr.Markdown("> Error: Dataset cannot be loaded.")

    with gr.Tab("About"):
        gr.Markdown(ABOUT_TEXT)


if __name__ == "__main__":
    demo.launch()
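
# Local run sketch (illustrative; the filename `app.py` is an assumption): `python app.py`.
# Set DISABLE_FETCH_URL=1 to skip the Selenium/Chrome dependency and work with dummy
# screenshots and text; set HF_TOKEN so that CommitScheduler uploads annotations to the Hub.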