""" Gradio app for multi-label toxicity prediction using a Hugging Face model. Standalone file: app.py The model is ALWAYS loaded from the Hugging Face Hub via `from_pretrained(MODEL_ID)`. No Inference API is used. This works locally and on Hugging Face Spaces. Usage: 1) Install requirements: pip install transformers torch gradio numpy pandas huggingface-hub 2) Run locally: python app.py On Hugging Face Spaces: - The app logs to /data/toxicity_history.csv (persistent, not in the repo). """ import os import json import re import csv from html import escape import tempfile from pathlib import Path from datetime import datetime import shutil import torch import numpy as np import pandas as pd import gradio as gr from transformers import AutoTokenizer, AutoModelForSequenceClassification from huggingface_hub import hf_hub_download # ---- Config ---- MODEL_ID = "NathanDB/toxic-bert-dsti" # change if needed LABEL_COLS = ["toxic", "severe_toxic", "obscene", "threat", "insult", "identity_hate"] MAX_LEN = 128 # max token length for the model # ---- Admin access (for downloading logs) ---- # On HF Spaces, set ADMIN_KEY in Settings -> Variables & secrets ADMIN_KEY = os.environ.get("ADMIN_KEY", None) def admin_get_logs(key: str): """ Gradio callback to let the admin download the log file. - Returns the path to a TEMP COPY of LOG_FILE (in /tmp) if key matches ADMIN_KEY. - Returns None otherwise (file output will stay empty). """ # If no admin key configured, disable download if not ADMIN_KEY: print("ADMIN_KEY not set, refusing admin download.") return None if key != ADMIN_KEY: print("Wrong admin key, refusing admin download.") return None if not LOG_FILE.exists(): print("Log file does not exist yet.") return None # Copy to a temp location Gradio is happy with (/tmp) tmpdir = Path(tempfile.gettempdir()) tmp_path = tmpdir / f"toxicity_history_{datetime.utcnow().strftime('%Y%m%dT%H%M%S')}.csv" shutil.copy2(LOG_FILE, tmp_path) print("Admin download authorized, serving temp copy:", tmp_path) return str(tmp_path) # ---- Human-readable info per category (for the report) ---- LABEL_INFO = { "toxic": { "name": "Toxic", "severity": "medium", "desc": "General rude, aggressive, or hostile language that may be harmful to the conversation.", }, "severe_toxic": { "name": "Severe toxic", "severity": "high", "desc": "Extremely aggressive, abusive, or hateful language, often stronger than standard toxicity.", }, "obscene": { "name": "Obscene", "severity": "medium", "desc": "Explicitly vulgar or coarse language, often including swear words and offensive expressions.", }, "threat": { "name": "Threat", "severity": "critical", "desc": "Language that suggests violence, self-harm, or other physical danger directed at someone.", }, "insult": { "name": "Insult", "severity": "medium", "desc": "Direct attacks or derogatory statements about a person or group.", }, "identity_hate": { "name": "Identity hate", "severity": "critical", "desc": "Attacks targeting a person or group based on identity (race, religion, gender, etc.).", }, } # ---- Simple lexical dictionary to highlight risky words in the text ---- TOXIC_KEYWORDS = { "idiot": "insult", "stupid": "insult", "moron": "insult", "dumb": "insult", "worthless": "insult", "loser": "insult", "hate": "toxic", "kill": "threat", "die": "threat", "hang": "threat", "punch": "threat", "slap": "threat", "shit": "obscene", "fuck": "obscene", "fucking": "obscene", "bitch": "insult", "bastard": "obscene", "asshole": "insult", "retard": "insult", "retarded": "insult", # identity / group related terms – contextual, but useful as signal "nigger": "identity_hate", "faggot": "identity_hate", "spic": "identity_hate", "chink": "identity_hate", "kike": "identity_hate", } # ---- Thresholds used to convert probabilities -> binary labels ---- def load_thresholds(): local_path = "test/label_thresholds_test_tuned.json" # 1) Local file exists → use it if os.path.exists(local_path): with open(local_path, "r") as f: thr_dict = json.load(f) print("Loaded thresholds from LOCAL file.") return np.array([thr_dict[label] for label in LABEL_COLS], dtype=np.float32) # 2) Try to load from Hugging Face Hub try: hf_file = hf_hub_download( repo_id=MODEL_ID, filename="label_thresholds_test_tuned.json", repo_type="model", ) with open(hf_file, "r") as f: thr_dict = json.load(f) print("Loaded thresholds from Hugging Face Hub.") return np.array([thr_dict[label] for label in LABEL_COLS], dtype=np.float32) except Exception as e: print("WARNING: No thresholds found on HF Hub or locally. Using default 0.5.") print("Error detail:", e) return np.full(len(LABEL_COLS), 0.5, dtype=np.float32) THRESHOLDS = load_thresholds() # ---- Logging config ---- # HF Spaces: log to /data (persistent, not in repo) # Local: log to /data (add it to .gitignore) BASE_DIR = Path(__file__).resolve().parent ON_SPACE = bool(os.environ.get("SPACE_ID") or os.environ.get("HF_SPACE_ID")) if ON_SPACE: base_log_dir = Path("/data") else: base_log_dir = BASE_DIR / "data" base_log_dir.mkdir(parents=True, exist_ok=True) LOG_FILE = base_log_dir / "toxicity_history.csv" def log_interaction(text: str, result_dict: dict): """ Automatic, private logging of: - timestamp_utc - raw text - predicted_labels (comma-separated) - probabilities_by_label_json On HF Spaces this goes to /data/toxicity_history.csv (not visible in repo). """ if not isinstance(text, str) or text.strip() == "": return timestamp = datetime.utcnow().isoformat() predicted_labels = [ lbl for lbl, v in result_dict.items() if v.get("predicted", False) ] predicted_labels_str = ",".join(predicted_labels) probs_by_label = { lbl: float(v.get("probability", 0.0)) for lbl, v in result_dict.items() } file_exists = LOG_FILE.exists() print("Writing to:", LOG_FILE) with LOG_FILE.open("a", newline="", encoding="utf-8") as f: writer = csv.writer(f) if not file_exists: writer.writerow( [ "timestamp_utc", "text", "predicted_labels", "probabilities_by_label_json", ] ) writer.writerow( [ timestamp, text, predicted_labels_str, json.dumps(probs_by_label, ensure_ascii=False), ] ) # ---- Admin log retrieval (used by Gradio callback) ---- def admin_get_logs(key: str): """ Gradio callback to let the admin download the log file. - Returns the path to LOG_FILE if key matches ADMIN_KEY. - Returns None otherwise (file output will stay empty). """ # If no admin key configured, disable download if not ADMIN_KEY: print("ADMIN_KEY not set, refusing admin download.") return None if key != ADMIN_KEY: print("Wrong admin key, refusing admin download.") return None if not LOG_FILE.exists(): print("Log file does not exist yet.") return None # Return as string path for gr.File return str(LOG_FILE) # ---- Device & model load (always from HF Hub) ---- DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu") print(f"Loading model '{MODEL_ID}' from Hugging Face Hub...") tokenizer = AutoTokenizer.from_pretrained(MODEL_ID) model = AutoModelForSequenceClassification.from_pretrained(MODEL_ID) model.to(DEVICE) model.eval() print("Model loaded successfully.") # ---- Prediction helpers ---- def predict_toxicity(text: str): """Run the model (loaded from HF Hub) and return df + result_dict.""" if not isinstance(text, str) or text.strip() == "": empty_df = pd.DataFrame(columns=["label", "probability", "predicted"]) return empty_df, {} enc = tokenizer( [text], padding=True, truncation=True, max_length=MAX_LEN, return_tensors="pt", ).to(DEVICE) with torch.no_grad(): logits = model(**enc).logits probs = torch.sigmoid(logits).cpu().numpy()[0] preds = (probs >= THRESHOLDS).astype(int) result_dict = { lbl: { "probability": float(round(float(probs[i]), 6)), "predicted": bool(preds[i]), } for i, lbl in enumerate(LABEL_COLS) } rows = [] for i, lbl in enumerate(LABEL_COLS): prob = float(probs[i]) pred = int(preds[i]) rows.append({"label": lbl, "probability": round(prob, 6), "predicted": pred}) df = pd.DataFrame(rows).sort_values("probability", ascending=False).reset_index( drop=True ) return df, result_dict # Helper to save CSV (per-request results, separate from history log) def save_df_to_csv(df: pd.DataFrame): tmpdir = Path(tempfile.gettempdir()) path = tmpdir / f"toxicity_result_{os.getpid()}.csv" df.to_csv(path, index=False) return str(path) def highlight_risky_words(text: str, result_dict: dict): """ Simple lexical highlighter: - Wrap known toxic words in colored spans. - Distinguish between words in a category predicted toxic vs only lexically risky. """ if not isinstance(text, str) or text.strip() == "": return "", 0, [] tokens = text.split() highlighted_tokens = [] hits = [] for tok in tokens: clean = re.sub(r"\W+", "", tok).lower() if not clean: highlighted_tokens.append(escape(tok)) continue if clean in TOXIC_KEYWORDS: label = TOXIC_KEYWORDS[clean] is_active = bool(result_dict.get(label, {}).get("predicted", False)) if is_active: span = f"{escape(tok)}" else: span = f"{escape(tok)}" highlighted_tokens.append(span) hits.append((clean, label, is_active)) else: highlighted_tokens.append(escape(tok)) highlighted_html = " ".join(highlighted_tokens) unique_hits = {} for w, lbl, active in hits: key = (w, lbl) if key not in unique_hits: unique_hits[key] = active else: unique_hits[key] = unique_hits[key] or active hits_list = [(w, lbl, active) for (w, lbl), active in unique_hits.items()] return highlighted_html, len(hits_list), hits_list def build_result_html(df: pd.DataFrame, result_dict: dict, text: str): """Build a stylized HTML report with summary, metrics, and highlighted text.""" style = """ """ # No data case if df is None or df.empty or not result_dict: return style + """

Toxicity Analysis

Enter a sentence to get a detailed report.
No input
Type or paste a message on the left, then click Analyze.
""" # --- Basic metrics --- any_toxic = any(v["predicted"] for v in result_dict.values()) probs_arr = df["probability"].values.astype(float) max_prob = float(probs_arr.max()) max_label = df.iloc[0]["label"] avg_prob = float(probs_arr.mean()) clean_text = text.strip() n_chars = len(clean_text) n_tokens = len(clean_text.split()) if clean_text else 0 # Strong/weak positives, borderline labels, and margin-based confidence strong_pos = [] weak_pos = [] borderline = [] margin_abs_list = [] for lbl, v in result_dict.items(): prob = float(v["probability"]) thr = float(THRESHOLDS[LABEL_COLS.index(lbl)]) margin = prob - thr margin_abs = abs(margin) margin_abs_list.append(margin_abs) if v["predicted"]: if margin >= 0.2: strong_pos.append((lbl, prob, margin)) else: weak_pos.append((lbl, prob, margin)) else: if thr - 0.05 <= prob < thr: borderline.append((lbl, prob, thr - prob)) global_margin = float(np.mean(margin_abs_list)) if margin_abs_list else 0.0 if global_margin >= 0.3: confidence_label = "High" elif global_margin >= 0.15: confidence_label = "Medium" else: confidence_label = "Low" toxic_probs = [v["probability"] for k, v in result_dict.items() if v["predicted"]] max_toxic_prob = max(toxic_probs) if toxic_probs else max_prob if not any_toxic or max_toxic_prob < 0.20: severity_badge_class = "good" severity_text = "Low risk" elif max_toxic_prob < 0.50: severity_badge_class = "medium" severity_text = "Medium risk" else: severity_badge_class = "bad" severity_text = "High risk" toxic_categories = [ lbl.replace("_", " ").title() for lbl, v in result_dict.items() if v["predicted"] ] highlighted_html, n_hits, hits_list = highlight_risky_words(text, result_dict) # --- Build HTML --- html = style + "
" html += ( "

Toxicity Analysis

" "
Detailed model report for this message.
" f"
{severity_text}
" ) if any_toxic: toxic_str = ( ", ".join( [f"{cat}" for cat in toxic_categories] ) if toxic_categories else "toxic patterns" ) html += ( "
" f"
Message detected as toxic — " f"main categories: {toxic_str}.
" ) else: msg = "this message appears safe and appropriate according to the model and thresholds." if n_hits > 0: msg = ( "the model considers this message globally safe, although it contains some potentially risky terms used in a relatively neutral context." ) html += ( "
" f"
✅ No toxicity detected — {msg}
" ) html += "
" html += ( "
Max category" f"{max_label.replace('_',' ').title()} ({max_prob*100:.1f}%)
" ) html += "
Average toxicity" f"{avg_prob*100:.1f}%
" html += ( "
Model confidence" f"{confidence_label} (margin-based)
" ) html += ( "
Message length" f"{n_tokens} tokens · {n_chars} chars
" ) html += "
" html += "
Interpretation summary
" html += "" html += "
Message with highlighted risky words
" html += "
Red = words in categories predicted toxic, orange = lexically risky terms even if the model kept them below threshold.
" html += "
" if clean_text: html += highlighted_html else: html += "No text provided." html += "
" if n_hits > 0: html += "
" html += "Detected terms:" for word, lbl, active in sorted( hits_list, key=lambda x: (x[2], x[1], x[0]), reverse=True ): dot_class = "lex-dot-hard" if active else "lex-dot-soft" status_txt = "model: toxic" if active else "lexical only" html += ( f"" f"{escape(word)} · {lbl.replace('_',' ')}" f"({status_txt})" ) html += "
" html += "
Per-category probabilities
" html += "
Bars show the model probability for each label. The vertical red line is the decision threshold used in this app.
" html += "
" for _, row in df.iterrows(): label = row["label"] label_display = label.replace("_", " ") prob = float(row["probability"]) is_predicted = bool(result_dict[label]["predicted"]) threshold = float(THRESHOLDS[LABEL_COLS.index(label)]) threshold_percent = threshold * 100 prob_percent = prob * 100 margin = prob - threshold badge_class = "predicted-true" if is_predicted else "predicted-false" badge_text = "🚨 Toxic" if is_predicted else "✓ Safe" if margin >= 0: margin_txt = f"+{margin*100:.1f} pts over" else: margin_txt = f"{margin*100:.1f} pts under" html += "
" html += f"
{label_display}
" html += "
" html += "
" html += "
" html += f"
" html += f"
" html += "
" html += ( f"
0%" f"Threshold: {threshold_percent:.1f}%" "100%
" ) html += "
" html += f"
{prob_percent:.1f}%
" html += f"
{margin_txt}
" html += f"
{badge_text}
" html += "
" html += "
" html += "
Category definitions
" html += "
Short description of each label so you can interpret the scores.
" relevant_labels = set() for lbl, v in result_dict.items(): prob = float(v["probability"]) thr = float(THRESHOLDS[LABEL_COLS.index(lbl)]) if v["predicted"] or prob >= 0.20 or thr - 0.05 <= prob < thr: relevant_labels.add(lbl) if not relevant_labels: relevant_labels = set(LABEL_COLS) html += "
" for lbl in LABEL_COLS: if lbl not in relevant_labels: continue info = LABEL_INFO.get(lbl, {}) name = info.get("name", lbl.replace("_", " ").title()) sev = info.get("severity", "medium").title() desc = info.get("desc", "") html += "
" html += f"
{name}
" html += f"
Severity: {sev}
" html += f"
{escape(desc)}
" html += "
" html += "
" html += "" html += "
" return html # ---- Gradio UI ---- with gr.Blocks(title="Toxicity Analyzer") as demo: gr.HTML( "

🛡️ Toxicity Analyzer

" ) with gr.Row(): with gr.Column(scale=2): txt = gr.Textbox( label="Text to analyze (in English)", placeholder="Type or paste text here...", lines=6, ) with gr.Row(): btn = gr.Button("Analyze", variant="primary", scale=2) btn_clear = gr.Button("Clear", scale=1) with gr.Row(): out_html = gr.HTML() download_file = gr.File(label="📥 Download CSV (this run only)", visible=False) def analyze(text): df, result_dict = predict_toxicity(text) html = build_result_html(df, result_dict, text) # log every call (private, server-side only) if result_dict: log_interaction(text, result_dict) if df is None or df.empty: return html, gr.update(visible=False) csv_path = save_df_to_csv(df) return html, csv_path def clear_all(): return "", gr.update(visible=False) btn.click(analyze, inputs=txt, outputs=[out_html, download_file]) btn_clear.click(clear_all, inputs=None, outputs=[txt, download_file]) gr.Examples( examples=[ "I will kill you!", "You are wonderful and helpful.", "Get out of here, you idiot.", "This is the best day ever!", "I hate everything about this.", "You are so stupid and worthless.", "Let's grab coffee tomorrow.", "Go die in a fire.", "Have a great day!", "I'm going to punch you in the face.", ], inputs=txt, ) # ---- Admin-only log download section ---- with gr.Accordion("Admin Tools (restricted)", open=False): admin_key_box = gr.Textbox( label="Enter admin key", type="password", placeholder="Admin key required", ) admin_download = gr.File( label="Infos", interactive=False, ) admin_btn = gr.Button("Get Infos") admin_btn.click( admin_get_logs, # top-level function inputs=admin_key_box, outputs=admin_download, ) if __name__ == "__main__": demo.launch( server_name="0.0.0.0", server_port=7860, allowed_paths=[str(base_log_dir)], # this points to /data on Spaces share=False, # or just remove this argument )