Spaces:
Running
Running
| import gradio as gr | |
| import requests | |
| import json | |
| import os | |
| import time | |
| from collections import defaultdict | |
| from PIL import Image | |
| import io | |
| BASE_URL = "https://api.jigsawstack.com/v1" | |
| headers = { | |
| "x-api-key": os.getenv("JIGSAWSTACK_API_KEY") | |
| } | |
| # Rate limiting configuration | |
| request_times = defaultdict(list) | |
| MAX_REQUESTS = 20 # Maximum requests per time window | |
| TIME_WINDOW = 3600 # Time window in seconds (1 hour) | |
| def get_real_ip(request: gr.Request): | |
| """Extract real IP address using x-forwarded-for header or fallback""" | |
| if not request: | |
| return "unknown" | |
| forwarded = request.headers.get("x-forwarded-for") | |
| if forwarded: | |
| ip = forwarded.split(",")[0].strip() # First IP in the list is the client's | |
| else: | |
| ip = request.client.host # fallback | |
| return ip | |
| def check_rate_limit(request: gr.Request): | |
| """Check if the current request exceeds rate limits""" | |
| if not request: | |
| return True, "Rate limit check failed - no request info" | |
| ip = get_real_ip(request) | |
| now = time.time() | |
| # Clean up old timestamps outside the time window | |
| request_times[ip] = [t for t in request_times[ip] if now - t < TIME_WINDOW] | |
| # Check if rate limit exceeded | |
| if len(request_times[ip]) >= MAX_REQUESTS: | |
| time_remaining = int(TIME_WINDOW - (now - request_times[ip][0])) | |
| time_remaining_minutes = round(time_remaining / 60, 1) | |
| time_window_minutes = round(TIME_WINDOW / 60, 1) | |
| return False, f"Rate limit exceeded. You can make {MAX_REQUESTS} requests per {time_window_minutes} minutes. Try again in {time_remaining_minutes} minutes." | |
| # Add current request timestamp | |
| request_times[ip].append(now) | |
| return True, "" | |
| def enhanced_ai_scrape(input_method, url, html, prompts_str, selector, page_pos, request: gr.Request): | |
| def error_response(message): | |
| return ( | |
| message, | |
| gr.update(visible=False), | |
| gr.update(visible=False), | |
| gr.update(visible=False), | |
| gr.update(visible=False), | |
| gr.update(visible=False), | |
| ) | |
| # Check rate limit first | |
| rate_limit_ok, rate_limit_msg = check_rate_limit(request) | |
| if not rate_limit_ok: | |
| return error_response(f"Rate limit exceeded: {rate_limit_msg}") | |
| try: | |
| # Validate element prompts | |
| prompts = [p.strip() for p in prompts_str.split(",") if p.strip()] | |
| if not prompts: | |
| return error_response("Error: No element prompts provided.") | |
| if len(prompts) > 5: | |
| return error_response("Error: Maximum 5 element prompts allowed.") | |
| payload = { | |
| "element_prompts": prompts, | |
| "root_element_selector": selector or "main", | |
| "page_position": int(page_pos) if str(page_pos).strip().isdigit() else 1 | |
| } | |
| # Add URL or HTML based on input method | |
| if input_method == "URL": | |
| if not url or not url.strip(): | |
| return error_response("Error: URL is required when using URL input method.") | |
| payload["url"] = url.strip() | |
| elif input_method == "HTML Content": | |
| if not html or not html.strip(): | |
| return error_response("Error: HTML content is required when using HTML input method.") | |
| payload["html"] = html.strip() | |
| response = requests.post(f"{BASE_URL}/ai/scrape", headers=headers, json=payload) | |
| response.raise_for_status() | |
| result = response.json() | |
| if not result.get("success"): | |
| return error_response(f"Error: Scraping failed - {result.get('message', 'Unknown error')}") | |
| # Extract all the data | |
| context = result.get("context", {}) | |
| selectors = result.get("selectors", {}) | |
| data = result.get("data", []) | |
| links = result.get("link", []) | |
| current_page = result.get("page_position", 1) | |
| total_pages = result.get("page_position_length", 1) | |
| # Format pagination info | |
| pagination_text = f"Page {current_page} of {total_pages}" | |
| if total_pages > 1: | |
| pagination_text += f" (Total pages available: {total_pages})" | |
| status_text = f"✅ Successfully scraped {len(data)} data items" | |
| if context: | |
| status_text += f" with {len(context)} context elements" | |
| return ( | |
| status_text, | |
| gr.update(value=context, visible=True if context else False), | |
| gr.update(value=selectors, visible=True if selectors else False), | |
| gr.update(value=data, visible=True if data else False), | |
| gr.update(value=links, visible=True if links else False), | |
| gr.update(value=pagination_text, visible=True), | |
| ) | |
| except requests.exceptions.RequestException as req_err: | |
| return error_response(f"Request failed: {str(req_err)}") | |
| except Exception as e: | |
| return error_response(f"Unexpected error: {str(e)}") | |
| def get_rate_limit_status(request: gr.Request): | |
| """Get current rate limit status for the user""" | |
| if not request: | |
| return {"error": "Unable to get request info"} | |
| ip = get_real_ip(request) | |
| now = time.time() | |
| # Clean up old timestamps | |
| request_times[ip] = [t for t in request_times[ip] if now - t < TIME_WINDOW] | |
| current_requests = len(request_times[ip]) | |
| time_window_minutes = round(TIME_WINDOW / 60, 1) | |
| if current_requests >= MAX_REQUESTS: | |
| time_remaining = int(TIME_WINDOW - (now - request_times[ip][0])) | |
| time_remaining_minutes = round(time_remaining / 60, 1) | |
| return { | |
| "status": "Rate limited", | |
| "current_requests": current_requests, | |
| "max_requests": MAX_REQUESTS, | |
| "time_window_minutes": time_window_minutes, | |
| "time_remaining_minutes": time_remaining_minutes | |
| } | |
| else: | |
| return { | |
| "status": "Available", | |
| "current_requests": current_requests, | |
| "max_requests": MAX_REQUESTS, | |
| "time_window_minutes": time_window_minutes, | |
| "remaining_requests": MAX_REQUESTS - current_requests | |
| } | |
| # ----------------- Gradio UI ------------------ | |
| with gr.Blocks() as demo: | |
| gr.Markdown(""" | |
| <div style='text-align: center; margin-bottom: 24px;'> | |
| <h1 style='font-size:2.2em; margin-bottom: 0.2em;'>🧩 AI Scraper</h1> | |
| <p style='font-size:1.2em; margin-top: 0;'>Extract structured data from web pages with advanced AI models.</p> | |
| <p style='font-size:1em; margin-top: 0.5em;'>For more details and API usage, see the <a href='https://jigsawstack.com/docs/api-reference/ai/scrape' target='_blank'>documentation</a>.</p> | |
| <p style='font-size:0.9em; margin-top: 0.5em; color: #666;'>Rate limit: 1 request per hour per IP address</p> | |
| </div> | |
| """) | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("#### Input Method") | |
| input_method_scraper = gr.Radio( | |
| choices=["URL", "HTML Content"], | |
| label="Choose Input Method", | |
| value="URL" | |
| ) | |
| # Conditional inputs based on selection | |
| url_scraper = gr.Textbox( | |
| label="Page URL", | |
| placeholder="https://example.com/pricing", | |
| info="URL of the page to scrape" | |
| ) | |
| html_content = gr.Textbox( | |
| label="HTML Content", | |
| lines=8, | |
| placeholder="<html>...</html>", | |
| visible=False, | |
| info="Raw HTML content to scrape" | |
| ) | |
| gr.Markdown("#### Scraping Configuration") | |
| element_prompts = gr.Textbox( | |
| label="Element Prompts (comma-separated)", | |
| lines=3, | |
| placeholder="Plan title, Plan price, Features, Button text", | |
| info="Items to scrape (max 5). E.g., 'Plan price', 'Plan title'" | |
| ) | |
| root_selector = gr.Textbox( | |
| label="Root Element Selector", | |
| value="main", | |
| placeholder="main, .container, #content", | |
| info="CSS selector to limit scraping scope (default: main)" | |
| ) | |
| page_position = gr.Number( | |
| label="Page Position", | |
| value=1, | |
| minimum=1, | |
| info="For pagination, current page number (min: 1)" | |
| ) | |
| with gr.Column(): | |
| gr.Markdown("#### Results") | |
| scrape_status = gr.Textbox( | |
| label="Status", | |
| interactive=False, | |
| placeholder="Ready to scrape..." | |
| ) | |
| gr.Markdown("#### Extracted Data") | |
| context_output = gr.JSON( | |
| label="Context Data", | |
| visible=False | |
| ) | |
| selectors_output = gr.JSON( | |
| label="CSS Selectors Used", | |
| visible=False | |
| ) | |
| detailed_data = gr.JSON( | |
| label="Detailed Scrape Data", | |
| visible=False | |
| ) | |
| links_data = gr.JSON( | |
| label="Detected Links", | |
| visible=False | |
| ) | |
| gr.Markdown("#### Pagination Info") | |
| pagination_info = gr.Textbox( | |
| label="Page Information", | |
| interactive=False, | |
| visible=False | |
| ) | |
| scrape_btn = gr.Button("Scrape with AI", variant="primary") | |
| # Function to show/hide input groups based on selection | |
| def update_scraper_input_visibility(method): | |
| if method == "URL": | |
| return gr.Textbox(visible=True), gr.Textbox(visible=False) | |
| elif method == "HTML Content": | |
| return gr.Textbox(visible=False), gr.Textbox(visible=True) | |
| else: | |
| return gr.Textbox(visible=True), gr.Textbox(visible=False) | |
| input_method_scraper.change( | |
| update_scraper_input_visibility, | |
| inputs=input_method_scraper, | |
| outputs=[url_scraper, html_content] | |
| ) | |
| scrape_btn.click( | |
| enhanced_ai_scrape, | |
| inputs=[input_method_scraper, url_scraper, html_content, element_prompts, root_selector, page_position], | |
| outputs=[scrape_status, context_output, selectors_output, detailed_data, links_data, pagination_info], | |
| ) | |
| demo.launch() | |