Spaces:
Runtime error
Runtime error
| import os | |
| import shutil | |
| import warnings | |
| from loguru import logger | |
| import mlflow | |
| import numpy as np | |
| from numpy import ndarray | |
| from sklearn.metrics import ( | |
| accuracy_score, | |
| classification_report, | |
| f1_score, | |
| precision_score, | |
| recall_score, | |
| ) | |
| import torch | |
| from torch.utils.data import Dataset | |
| from transformers import ( | |
| AutoModelForSequenceClassification, | |
| AutoTokenizer, | |
| EarlyStoppingCallback, | |
| Trainer, | |
| TrainingArguments, | |
| ) | |
| from turing.config import MODELS_DIR | |
| from ..baseModel import BaseModel | |
# Module-level side effect: installs an "ignore" filter for every warning
# category as soon as this module is imported, so warnings emitted by the
# libraries used below are not shown.
| warnings.filterwarnings("ignore") | |
def compute_metrics(eval_pred):
    """
    Compute macro-averaged multi-label metrics from raw logits.

    Intended to be passed as ``compute_metrics`` to the HF ``Trainer``.

    Args:
        eval_pred: Pair ``(predictions, labels)``. ``predictions`` are raw
            logits, or a tuple whose first element is the logits; ``labels``
            are the targets in the same (samples x classes) layout.

    Returns:
        dict: Keys ``f1``, ``precision`` and ``recall`` (macro average).
    """
    predictions, labels = eval_pred
    # Some models return extra outputs next to the logits; keep the logits only.
    if isinstance(predictions, tuple):
        predictions = predictions[0]

    # Sigmoid function to convert logits to probabilities
    probs = 1 / (1 + np.exp(-predictions))
    # Apply threshold of 0.5 (becomes 1 if > 0.5, otherwise 0)
    preds = (probs > 0.5).astype(int)

    # zero_division=0 on all three scores: a class without predicted or true
    # samples contributes 0 consistently instead of triggering a warning path.
    return {
        'f1': f1_score(labels, preds, average='macro', zero_division=0),
        'precision': precision_score(labels, preds, average='macro', zero_division=0),
        'recall': recall_score(labels, preds, average='macro', zero_division=0),
    }
class CodeBERTaDataset(Dataset):
    """
    Internal Dataset class for CodeBERTa.

    Wraps tokenized encodings and, optionally, their labels as torch tensors.
    """

    def __init__(self, encodings, labels=None, num_labels=None):
        """
        Initialize the dataset.

        Args:
            encodings (dict): Tokenized encodings; every value is converted to a tensor.
            labels (list, np.ndarray or torch.Tensor, optional): Corresponding labels,
                either class indices (1-D, or 2-D with a single column) or
                one-hot / multi-hot vectors.
            num_labels (int, optional): Total number of classes. Required for
                auto-converting indices to one-hot.
        """
        self.encodings = {key: torch.tensor(val) for key, val in encodings.items()}
        self.labels = None
        if labels is None:
            return

        # Work on a NumPy array so shape checks and indexing behave the same
        # for lists, arrays and tensors.
        if isinstance(labels, torch.Tensor):
            labels = labels.detach().cpu().numpy()
        elif not isinstance(labels, np.ndarray):
            labels = np.array(labels)

        is_index_vector = labels.ndim == 1 or (labels.ndim == 2 and labels.shape[1] == 1)

        if num_labels is not None and is_index_vector:
            # Case A: labels are class indices -> build a one-hot matrix.
            # Cast to int so float-typed indices (e.g. 2.0) are usable as indices.
            class_ids = labels.reshape(-1).astype(np.int64)
            one_hot = np.zeros((class_ids.shape[0], num_labels), dtype=np.float32)
            # Both bounds are checked: a negative index would otherwise wrap
            # around and silently mark the wrong class.
            in_range = (class_ids >= 0) & (class_ids < num_labels)
            one_hot[np.flatnonzero(in_range), class_ids[in_range]] = 1.0
            self.labels = torch.tensor(one_hot, dtype=torch.float)
        else:
            # Case B: labels are already vectors (e.g., One-Hot or Multi-Hot)
            self.labels = torch.tensor(labels, dtype=torch.float)

    def __getitem__(self, idx):
        """
        Retrieve item at index idx.

        Args:
            idx (int): Index of the item to retrieve.

        Returns:
            dict: One entry per encoding key, plus ``labels`` when labels were provided.
        """
        item = {key: val[idx] for key, val in self.encodings.items()}
        if self.labels is not None:
            item['labels'] = self.labels[idx]
        return item

    def __len__(self):
        """
        Return the length of the dataset.

        Returns:
            int: Number of rows in ``input_ids``.
        """
        return len(self.encodings['input_ids'])
class CodeBERTa(BaseModel):
    """
    HuggingFace implementation of BaseModel for Code Comment Classification.
    Uses CodeBERTa-small-v1 for efficient inference.
    """

    def __init__(self, language, path=None):
        """
        Initialize the CodeBERTa model with configuration parameters.

        Args:
            language (str): Language for the model. "java" uses 7 labels,
                "python" uses 5, any other value uses 6.
            path (str, optional): Path to load a pre-trained model. Defaults to None.
        """
        self.params = {
            "model_name_hf": "huggingface/CodeBERTa-small-v1",
            "num_labels": 7 if language == "java" else 5 if language == "python" else 6,
            "max_length": 128,
            "epochs": 15,
            "batch_size_train": 16,
            "batch_size_eval": 64,
            "learning_rate": 1e-5,
            "weight_decay": 0.02,
            "train_size": 0.8,
            "early_stopping_patience": 3,
            "early_stopping_threshold": 0.005
        }
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.tokenizer = None
        # Attributes above are set before delegating to the base class;
        # presumably BaseModel.__init__ relies on them - confirm in BaseModel.
        super().__init__(language, path)

    def setup_model(self):
        """
        Initialize the CodeBERTa tokenizer and model.

        Loads the pretrained checkpoint named in ``params["model_name_hf"]``
        with a multi-label classification head and moves it to ``self.device``.
        """
        logger.info(f"Initializing {self.params['model_name_hf']} on {self.device}...")
        self.tokenizer = AutoTokenizer.from_pretrained(self.params["model_name_hf"])
        self.model = AutoModelForSequenceClassification.from_pretrained(
            self.params["model_name_hf"],
            num_labels=self.params["num_labels"],
            problem_type="multi_label_classification"
        ).to(self.device)
        logger.info("CodeBERTa model initialized.")

    def _tokenize(self, texts):
        """
        Helper to tokenize list of texts efficiently.

        ``None`` and NaN entries become empty strings; every other entry is
        converted with ``str()``.

        Args:
            texts (list): List of text strings to tokenize.

        Returns:
            dict: Tokenized encodings (truncated to ``max_length`` and padded).
        """
        safe_texts = [
            # `t != t` is only true for NaN.
            "" if t is None or (isinstance(t, (int, float)) and t != t) else str(t)
            for t in texts
        ]
        return self.tokenizer(
            safe_texts,
            truncation=True,
            padding=True,
            max_length=self.params["max_length"]
        )

    def train(self, X_train, y_train) -> dict[str, float]:
        """
        Train the model using HF Trainer with early stopping.

        The data is split randomly into train/validation parts according to
        ``params["train_size"]``. Nothing is logged to MLflow here; the
        hyperparameters are returned so the caller can log them.

        Args:
            X_train (list): Training input texts.
            y_train (list or np.ndarray): Training labels.

        Returns:
            dict[str, float]: Numeric hyperparameters used for training
            (``model_name_hf`` and ``num_labels`` are excluded).

        Raises:
            ValueError: If the model has not been initialized.
        """
        if self.model is None:
            raise ValueError("Model is not initialized. Call setup_model() before training.")

        params_to_log = {
            k: v for k, v in self.params.items() if k not in ("model_name_hf", "num_labels")
        }
        logger.info(f"Starting training for: {self.language.upper()}")

        # Prepare dataset (train/val split)
        train_encodings = self._tokenize(X_train)
        full_dataset = CodeBERTaDataset(train_encodings, y_train, num_labels=self.params["num_labels"])
        train_size = int(self.params["train_size"] * len(full_dataset))
        val_size = len(full_dataset) - train_size
        train_dataset, val_dataset = torch.utils.data.random_split(full_dataset, [train_size, val_size])

        temp_ckpt_dir = os.path.join(MODELS_DIR, "temp_checkpoints")
        cuda_available = torch.cuda.is_available()
        if not cuda_available:
            logger.info("Mixed Precision (fp16) disabled because CUDA is not available.")

        training_args = TrainingArguments(
            output_dir=temp_ckpt_dir,
            num_train_epochs=self.params["epochs"],
            per_device_train_batch_size=self.params["batch_size_train"],
            per_device_eval_batch_size=self.params["batch_size_eval"],
            learning_rate=self.params["learning_rate"],
            weight_decay=self.params["weight_decay"],
            eval_strategy="epoch",
            save_strategy="epoch",
            load_best_model_at_end=True,
            metric_for_best_model="f1",
            greater_is_better=True,
            save_total_limit=2,
            logging_dir='./logs',
            logging_steps=50,
            fp16=cuda_available,
            optim="adamw_torch",
            report_to="none",
            # `use_cpu` is the supported replacement for the deprecated `no_cuda`.
            use_cpu=not cuda_available
        )
        trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            compute_metrics=compute_metrics,
            callbacks=[
                EarlyStoppingCallback(
                    early_stopping_patience=self.params["early_stopping_patience"],
                    early_stopping_threshold=self.params["early_stopping_threshold"],
                )
            ]
        )

        try:
            trainer.train()
            logger.info(f"Training for {self.language.upper()} completed.")
        finally:
            # Checkpoints are only needed during training; remove them even
            # when training fails so stale checkpoints do not pile up.
            shutil.rmtree(temp_ckpt_dir, ignore_errors=True)

        return params_to_log

    def evaluate(self, X_test, y_test) -> dict[str, float]:
        """
        Evaluate model on test data, return metrics and log to MLflow.
        Handles automatic conversion of y_test to match multi-label prediction shape.

        Args:
            X_test (list): Input test data.
            y_test (list, np.ndarray or torch.Tensor): True labels for test data,
                either class indices or one-hot / multi-hot rows.

        Returns:
            dict[str, float]: ``accuracy``, ``precision``, ``recall`` and
            ``f1_score`` (macro averages for the last three).
        """
        # Obtain predictions
        y_pred = self.predict(X_test)

        # Convert y_test to numpy array if needed
        if isinstance(y_test, torch.Tensor):
            y_test_np = y_test.detach().cpu().numpy()
        elif isinstance(y_test, np.ndarray):
            y_test_np = y_test
        else:
            y_test_np = np.array(y_test)

        num_labels = self.params["num_labels"]
        is_multilabel_pred = (y_pred.ndim == 2 and y_pred.shape[1] > 1)
        is_flat_truth = (y_test_np.ndim == 1) or (y_test_np.ndim == 2 and y_test_np.shape[1] == 1)

        if is_multilabel_pred and is_flat_truth:
            # Expand class indices to one-hot rows so they match y_pred's shape.
            indices = y_test_np.reshape(-1).astype(int)
            y_test_expanded = np.zeros((indices.shape[0], num_labels), dtype=int)
            # Out-of-range indices leave their row all zeros.
            in_range = (indices >= 0) & (indices < num_labels)
            y_test_expanded[np.flatnonzero(in_range), indices[in_range]] = 1
            y_test_np = y_test_expanded

        # Generate classification report
        report = classification_report(y_test_np, y_pred, zero_division=0)
        print("\n" + "=" * 50)
        print("CLASSIFICATION REPORT")
        print(report)
        print("=" * 50 + "\n")

        metrics = {
            "accuracy": accuracy_score(y_test_np, y_pred),
            "precision": precision_score(y_test_np, y_pred, average="macro", zero_division=0),
            "recall": recall_score(y_test_np, y_pred, average="macro", zero_division=0),
            # zero_division=0 for consistency with precision/recall above.
            "f1_score": f1_score(y_test_np, y_pred, average="macro", zero_division=0),
        }
        mlflow.log_metrics(metrics)
        logger.info(
            f"Evaluation completed — Accuracy: {metrics['accuracy']:.3f}, F1: {metrics['f1_score']:.3f}"
        )
        return metrics

    def predict(self, X) -> ndarray:
        """
        Make predictions for Multi-Label classification.
        Returns Binary Matrix (Multi-Hot) where multiple classes can be 1.

        Args:
            X (list): Input texts for prediction.

        Returns:
            np.ndarray: Multi-Hot Encoded predictions (e.g., [[0, 1, 1, 0], ...])

        Raises:
            ValueError: If no model is available.
        """
        if self.model is None:
            raise ValueError("Model is not trained. Call train() or load() before prediction.")

        # Set model to evaluation mode
        self.model.eval()
        encodings = self._tokenize(X)
        # Pass None as labels because we are in inference
        dataset = CodeBERTaDataset(encodings, labels=None)

        cuda_available = torch.cuda.is_available()
        pred_dir = "./pred_temp"
        try:
            training_args = TrainingArguments(
                output_dir=pred_dir,
                per_device_eval_batch_size=self.params["batch_size_eval"],
                fp16=cuda_available,
                report_to="none",
                # `use_cpu` is the supported replacement for the deprecated `no_cuda`.
                use_cpu=not cuda_available
            )
            trainer = Trainer(model=self.model, args=training_args)
            output = trainer.predict(dataset)
        finally:
            # Clean up temporary prediction directory, also when prediction fails
            shutil.rmtree(pred_dir, ignore_errors=True)

        logits = output.predictions
        # Some models return extra outputs next to the logits; keep the logits only.
        if isinstance(logits, tuple):
            logits = logits[0]

        # Convert logits to probabilities
        probs = 1 / (1 + np.exp(-logits))
        # Apply a threshold of 0.5 (if prob > 0.5, predict 1 else 0)
        return (probs > 0.5).astype(int)

    def save(self, path, model_name):
        """
        Save model locally and log to MLflow as artifact.

        The target directory is ``<path>/<model_name>_<language>``; an existing
        directory with that name is replaced. A failure while logging to
        MLflow is logged and not re-raised.

        Args:
            path (str): Directory path to save the model.
            model_name (str): Name for the saved model.

        Raises:
            ValueError: If no model is available.
        """
        if self.model is None:
            raise ValueError("Model is not trained. Cannot save uninitialized model.")

        # Local Saving
        artifact_name = f"{model_name}_{self.language}"
        complete_path = os.path.join(path, artifact_name)
        # Remove existing directory if it exists
        if os.path.isdir(complete_path):
            shutil.rmtree(complete_path)

        # Save model and tokenizer
        logger.info(f"Saving model to: {complete_path}")
        self.model.save_pretrained(complete_path)
        self.tokenizer.save_pretrained(complete_path)
        logger.info("Model saved locally.")

        try:
            # Log to MLflow
            logger.info("Logging artifacts to MLflow...")
            mlflow.log_artifacts(local_dir=complete_path, artifact_path=artifact_name)
        except Exception as e:
            # Deliberately best-effort: the local copy has already been written.
            logger.error(f"Failed to log model artifacts to MLflow: {e}")

    def load(self, model_path):
        """
        Load model from a local path OR an MLflow URI.

        URIs starting with ``models:/`` or ``runs:/`` are first downloaded to
        ``MODELS_DIR/mlflow_temp_models``.

        Args:
            model_path (str or os.PathLike): Local path or MLflow URI to load the model from.

        Raises:
            FileNotFoundError: If the resolved local path does not exist.
            Exception: Any error raised while downloading or loading is logged and re-raised.
        """
        # Accept path-like objects as well as plain strings.
        model_path = os.fspath(model_path)
        logger.info(f"Loading model from: {model_path}")
        local_model_path = model_path

        # Downloading model from MLflow and saving to local path
        if model_path.startswith(("models:/", "runs:/")):
            try:
                logger.info("Detected MLflow model URI. Attempting to load from MLflow...")
                local_model_path = os.path.join(MODELS_DIR, "mlflow_temp_models")
                local_model_path = mlflow.artifacts.download_artifacts(
                    artifact_uri=model_path, dst_path=local_model_path
                )
                logger.info(f"Model downloaded from MLflow to: {local_model_path}")
            except Exception as e:
                logger.error(f"Failed to load from MLflow: {e}")
                raise

        # Loading from local path
        try:
            if not os.path.exists(local_model_path):
                raise FileNotFoundError(f"Model path not found: {local_model_path}")
            # Load tokenizer and model from local path
            self.tokenizer = AutoTokenizer.from_pretrained(local_model_path)
            self.model = AutoModelForSequenceClassification.from_pretrained(
                local_model_path
            ).to(self.device)
            logger.info("Model loaded from local path successfully.")
        except Exception as e:
            logger.error(f"Failed to load model from local path: {e}")
            raise

        # Set model to evaluation mode
        self.model.eval()