import uvicorn from fastapi import FastAPI, HTTPException, UploadFile, File, Form from pydantic import BaseModel, Field from transformers import pipeline import torch import os import json import httpx import shutil import whisper import librosa import numpy as np from dotenv import load_dotenv from typing import Optional, List import uuid try: from src.pronunciation import grade_pronunciation_advanced except ImportError: from pronunciation import grade_pronunciation_advanced load_dotenv() SCORER_MODEL_ID_TASK1 = "diminch/ielts-task1-grader-ai-v2" SCORER_MODEL_ID_TASK2 = "diminch/ielts-grader-ai-v2" DEVICE = "cuda:0" if torch.cuda.is_available() else "cpu" print(f"API running on: {DEVICE}") OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") OPENAI_API_URL = "https://api.openai.com/v1/chat/completions" if not OPENAI_API_KEY: print("WARNING: OPENAI_API_KEY not found in .env") print("Loading Whisper...") try: whisper_model = whisper.load_model("base", device=DEVICE) print("Whisper Loaded.") except Exception as e: print(f"Error loading Whisper: {e}") whisper_model = None pipelines = {} def load_writing_model(task_name, model_id): try: print(f"Loading {task_name}: {model_id}...") pipelines[task_name] = pipeline( "text-classification", model=model_id, tokenizer=model_id, device=DEVICE, return_all_scores=True ) print(f"Loaded {task_name}.") except Exception as e: print(f"Error loading {task_name}: {e}") pipelines[task_name] = None load_writing_model("task1", SCORER_MODEL_ID_TASK1) load_writing_model("task2", SCORER_MODEL_ID_TASK2) class WritingRequest(BaseModel): task_type: int prompt: str essay: str image_url: Optional[str] = None class WritingScores(BaseModel): taskResponse: float coherenceCohesion: float lexicalResource: float grammaticalRange: float class ShortFeedbackWriting(BaseModel): taskResponse: str coherenceCohesion: str lexicalResource: str grammaticalRange: str class WritingResponse(BaseModel): overallScore: float imageDescription: Optional[str] = None criteriaScores: WritingScores shortFeedback: ShortFeedbackWriting detailedFeedback: str class SpeakingScores(BaseModel): fluencyCoherence: float lexicalResource: float grammaticalRange: float pronunciation: float class PronunciationWord(BaseModel): word: str score: int phonemes_expected: str phonemes_actual: str is_correct: bool error_type: Optional[str] = None class SpeakingResponse(BaseModel): overallScore: float transcript: str refinedTranscript: str betterVersion: str criteriaScores: SpeakingScores shortFeedback: dict detailedFeedback: str pronunciationBreakdown: List[PronunciationWord] def round_to_half(score: float) -> float: return round(score * 2) / 2 async def analyze_chart_image(image_url: str, prompt_text: str) -> str: """Vision AI for Task 1""" if not image_url: return "No image provided." print("Analyzing chart image...") headers = { "Authorization": f"Bearer {OPENAI_API_KEY}", "Content-Type": "application/json" } vision_prompt = f""" Act as a data analyst. Describe this IELTS Writing Task 1 image in detail. Focus strictly on the main trends, comparisons, and specific data points mentioned in the prompt: "{prompt_text}". Output a factual description paragraph representing the 'Ground Truth' of the image. """ payload = { "model": "gpt-4o", "messages": [{"role": "user", "content": [ {"type": "text", "text": vision_prompt}, {"type": "image_url", "image_url": {"url": image_url}} ]}], "max_tokens": 500 } async with httpx.AsyncClient(timeout=60.0) as client: try: resp = await client.post(OPENAI_API_URL, headers=headers, json=payload) return resp.json()['choices'][0]['message']['content'] except Exception as e: print(f"Vision Error: {e}") return "" async def generate_writing_feedback(prompt: str, essay: str, scores: WritingScores, task_type: int, img_desc: str = "") -> dict: print("Generating Writing feedback...") scores_dict = scores.model_dump() context_info = "" criterion_1_name = "Task Response" if task_type == 1: context_info = f"IMAGE GROUND TRUTH: {img_desc}\n(Check if the student accurately reported this data)" criterion_1_name = "Task Achievement" system_prompt = f""" You are a strict, expert IELTS Examiner. TASK INFO: - Type: Task {task_type} - Prompt: "{prompt}" {context_info} STUDENT ESSAY: "{essay}" SCORES GIVEN (0-9): {json.dumps(scores_dict)} YOUR GOAL: Provide a deeply analytical and educational feedback JSON. INSTRUCTIONS FOR 'detailedFeedback': The 'detailedFeedback' field MUST be a long Markdown string structured as follows: 1. **General Overview**: A brief summary of why the essay got this band score. 2. **Strengths & Weaknesses**: Bullet points highlighting what was done well and what was missing in each criteria (one by one, four criterias in total). 3. **Specific Corrections (CRITICAL)**: - Identify 3-4 specific errors (grammar, vocab, or data accuracy). - For each error, show the "Original Text" -> "Correction" -> "Explanation". - Example: *Original: "The data shows an increase." -> Better: "The data illustrates a significant upward trend." (Explanation: Use more precise academic vocabulary).* 4. **Actionable Advice**: Give 2-3 concrete steps the student should take to improve their score next time. Output JSON format: {{ "shortFeedback": {{ "{criterion_1_name}": "...", "Coherence and Cohesion": "...", "Lexical Resource": "...", "Grammatical Range and Accuracy": "..." }}, "detailedFeedback": "MARKDOWN STRING..." }} """ payload = { "model": "gpt-4o-mini", "messages": [{"role": "system", "content": system_prompt}], "response_format": {"type": "json_object"} } async with httpx.AsyncClient(timeout=60.0) as client: resp = await client.post(OPENAI_API_URL, headers={"Authorization": f"Bearer {OPENAI_API_KEY}"}, json=payload) return json.loads(resp.json()['choices'][0]['message']['content']) app = FastAPI(title="IELTS Full-Stack AI API (V15.0)") @app.post("/grade/writing", response_model=WritingResponse) async def grade_writing(request: WritingRequest): model = pipelines.get(f"task{request.task_type}") if not model: raise HTTPException(500, "Model not ready.") image_desc = "" if request.task_type == 1: if not request.image_url: raise HTTPException(400, "Task 1 requires image_url.") image_desc = await analyze_chart_image(request.image_url, request.prompt) final_input = f"PROMPT: {request.prompt}\n\nIMAGE CONTEXT: {image_desc} [SEP] {request.essay}" else: final_input = f"{request.prompt} [SEP] {request.essay}" results = model(final_input, truncation=True, max_length=512)[0] raw = {item['label']: item['score'] for item in results} def r(x): return round(x * 2) / 2 scores = WritingScores( taskResponse=r(raw.get('LABEL_0', 1.0)), coherenceCohesion=r(raw.get('LABEL_1', 1.0)), lexicalResource=r(raw.get('LABEL_2', 1.0)), grammaticalRange=r(raw.get('LABEL_3', 1.0)) ) overall = r((scores.taskResponse + scores.coherenceCohesion + scores.lexicalResource + scores.grammaticalRange) / 4) # Feedback fb = await generate_writing_feedback(request.prompt, request.essay, scores, request.task_type, image_desc) sf = fb.get("shortFeedback", {}) tr_fb = sf.get("Task Response") or sf.get("Task Achievement") or "No feedback" return WritingResponse( overallScore=overall, imageDescription=image_desc if request.task_type == 1 else None, criteriaScores=scores, shortFeedback=ShortFeedbackWriting( taskResponse=tr_fb, coherenceCohesion=sf.get("Coherence and Cohesion", ""), lexicalResource=sf.get("Lexical Resource", ""), grammaticalRange=sf.get("Grammatical Range and Accuracy", "") ), detailedFeedback=fb.get("detailedFeedback", "") ) async def grade_speaking_with_gpt(transcript: str, metrics: dict, ipa_data: dict, prompt_text: str) -> dict: """ Generate Speaking feedback with Pronunciation Breakdown array. """ print("Generating Speaking feedback...") system_prompt = f""" You are an expert IELTS Speaking Examiner and Phonetician. INPUT DATA: - Question: "{prompt_text}" - Transcript (Whisper): "{transcript}" - Raw Audio IPA (Actual): /{ipa_data.get('actual_ipa', '')}/ - Expected IPA (Standard): /{ipa_data.get('expected_ipa', '')}/ METRICS: - Speed: {metrics['wpm']:.1f} WPM - Pauses: {metrics['pause_ratio']*100:.1f}% YOUR TASK: 1. Score the 4 criteria (0-9). 2. **Pronunciation Breakdown**: Map words from Transcript to the IPA. Identify mispronounced words. - Compare Actual vs Expected IPA for each word. - Assign a score (1-10) for each word's pronunciation. - Flag errors (e.g., 'severe_substitution' if user said 'trip' but meant 'subject'). OUTPUT JSON FORMAT (This is sample structure, replace with actual data): {{ "scores": {{ "fluencyCoherence": 0.0, "lexicalResource": 0.0, "grammaticalRange": 0.0, "pronunciation": 0.0 }}, "shortFeedback": {{ "Fluency": "...", "Vocabulary": "...", "Grammar": "...", "Pronunciation": "..." }}, "detailedFeedback": "MARKDOWN string...", "refinedTranscript": "Corrected version...", "betterVersion": "Upgraded Band 8 version...", "pronunciationBreakdown": [ {{ "word": "subject", "score": 3, "phonemes_expected": "s ʌ b dʒ ɛ k t", "phonemes_actual": "t r ɪ p", "is_correct": false, "error_type": "severe_substitution" }}, ... (more words) ] }} """ payload = { "model": "gpt-4o-mini", "messages": [{"role": "system", "content": system_prompt}], "response_format": {"type": "json_object"} } async with httpx.AsyncClient(timeout=60.0) as client: resp = await client.post(OPENAI_API_URL, headers={"Authorization": f"Bearer {OPENAI_API_KEY}"}, json=payload) return json.loads(resp.json()['choices'][0]['message']['content']) @app.post("/grade/speaking", response_model=SpeakingResponse) async def grade_speaking(audio: UploadFile = File(...), prompt: str = Form(...)): temp_filename = f"temp_{uuid.uuid4()}.wav" try: with open(temp_filename, "wb") as buffer: shutil.copyfileobj(audio.file, buffer) # 1. Whisper & Acoustic Metrics if not whisper_model: raise HTTPException(500, "Whisper missing") res = whisper_model.transcribe(temp_filename) transcript = res["text"].strip() y, sr = librosa.load(temp_filename) duration = librosa.get_duration(y=y, sr=sr) word_count = len(transcript.split()) wpm = (word_count / duration) * 60 if duration > 0 else 0 non_silent = librosa.effects.split(y, top_db=20) silent_time = duration - sum([(e-s)/sr for s,e in non_silent]) pause_ratio = silent_time / duration if duration > 0 else 0 metrics = {"wpm": wpm, "pause_ratio": pause_ratio} # 2. IPA Analysis (Subprocess based) ipa_data = grade_pronunciation_advanced(temp_filename, transcript) # 3. GPT Analysis gpt_result = await grade_speaking_with_gpt(transcript, metrics, ipa_data, prompt) scores = gpt_result.get("scores", {}) # 4. Response criteria = SpeakingScores( fluencyCoherence=round_to_half(scores.get("fluencyCoherence", 0)), lexicalResource=round_to_half(scores.get("lexicalResource", 0)), grammaticalRange=round_to_half(scores.get("grammaticalRange", 0)), pronunciation=round_to_half(scores.get("pronunciation", 0)) ) overall = round_to_half((criteria.fluencyCoherence + criteria.lexicalResource + criteria.grammaticalRange + criteria.pronunciation) / 4) return SpeakingResponse( overallScore=overall, transcript=transcript, refinedTranscript=gpt_result.get("refinedTranscript", ""), betterVersion=gpt_result.get("betterVersion", ""), criteriaScores=criteria, shortFeedback=gpt_result.get("shortFeedback", {}), detailedFeedback=gpt_result.get("detailedFeedback", ""), pronunciationBreakdown=gpt_result.get("pronunciationBreakdown", []) ) except Exception as e: print(f"Speaking Error: {e}") import traceback traceback.print_exc() raise HTTPException(500, str(e)) finally: if os.path.exists(temp_filename): os.remove(temp_filename) @app.get("/") def read_root(): return {"message": "IELTS API is running."} if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000)