"""Split transcription entries by whether a speaker overlaps themselves in time.

Each entry's ``model_output`` is a transcript whose lines start with a header
such as ``[00:15 - 00:20] Speaker A:``. Entries in which the same speaker has
two overlapping time ranges are written to a "deleted" file; all other entries
are written to a "cleaned" file.
"""

import json
import re
from typing import Dict, List, Optional, Tuple

# A (start, end) pair of offsets expressed in whole seconds.
TimeRange = Tuple[int, int]

# Header of a transcript line, e.g. "[00:15 - 00:20] Speaker A:".
# Compiled once at import time because it is applied to every transcript line.
_LINE_HEADER = re.compile(
    r'\[(\d{2}:\d{2}) - (\d{2}:\d{2})\] (Speaker [A-Z]):'
)


def parse_timestamp(timestamp: str) -> int:
    """Convert a timestamp string like '00:15' to a number of seconds.

    Args:
        timestamp: Text of the form ``MM:SS``.

    Returns:
        ``minutes * 60 + seconds`` as an int.

    Raises:
        ValueError: If the text does not split into exactly two integers
            around a single ``:``.
    """
    minutes, seconds = map(int, timestamp.split(':'))
    return minutes * 60 + seconds


def extract_time_and_speaker(
    line: str,
) -> Tuple[Optional[TimeRange], Optional[str]]:
    """Extract the time range and speaker label from a transcript line.

    Args:
        line: One transcript line. The header must sit at the very start of
            the line; leading whitespace prevents a match.

    Returns:
        ``((start_seconds, end_seconds), speaker)`` when the line starts with
        a header like ``[00:15 - 00:20] Speaker A:``, otherwise
        ``(None, None)``.
    """
    header = _LINE_HEADER.match(line)
    if header is None:
        return None, None

    start_text, end_text, speaker = header.groups()
    return (parse_timestamp(start_text), parse_timestamp(end_text)), speaker


def has_overlap(range1: TimeRange, range2: TimeRange) -> bool:
    """Check if two time ranges overlap.

    Ranges that merely touch (one ends exactly where the other starts) do
    not count as overlapping.
    """
    start1, end1 = range1
    start2, end2 = range2
    # Equivalent to: not (end1 <= start2 or end2 <= start1).
    return end1 > start2 and end2 > start1


def has_same_speaker_overlap(transcript: str) -> bool:
    """Check if a transcript has overlapping time ranges for one speaker.

    Lines that are blank or that do not start with a recognised header are
    ignored. Ranges belonging to different speakers are never compared.

    Args:
        transcript: Newline-separated transcript text.

    Returns:
        True as soon as any speaker is found with two overlapping ranges,
        False if no such pair exists.
    """
    ranges_by_speaker: Dict[str, List[TimeRange]] = {}

    for line in transcript.split('\n'):
        if not line.strip():
            continue

        time_range, speaker = extract_time_and_speaker(line)
        if time_range is None or speaker is None:
            continue

        earlier_ranges = ranges_by_speaker.setdefault(speaker, [])
        if any(has_overlap(time_range, earlier) for earlier in earlier_ranges):
            return True
        earlier_ranges.append(time_range)

    return False


def process_file(input_file: str, output_file: str, delete_file: str) -> None:
    """Separate entries with same-speaker overlapping timestamps.

    Reads a JSON document (a list of entries, or a single entry object that
    is treated as a one-element list), then writes entries without
    same-speaker overlaps to ``output_file`` and entries with them to
    ``delete_file``. Progress and a summary are printed to stdout.

    Entries that have no ``'model_output'`` key are written to neither file;
    their count is reported in the summary so the totals can be reconciled.

    Args:
        input_file: Path of the JSON file to read.
        output_file: Path that receives the retained entries.
        delete_file: Path that receives the removed entries.
    """
    with open(input_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    if isinstance(data, dict):
        data = [data]

    cleaned_data = []
    deleted_data = []
    skipped_count = 0

    for entry in data:
        if 'model_output' not in entry:
            # Nothing to inspect; counted so the drop is visible to the user.
            skipped_count += 1
            continue

        if has_same_speaker_overlap(entry['model_output']):
            deleted_data.append(entry)
            print(f"Removing entry with key: {entry.get('key', 'unknown')}")
        else:
            cleaned_data.append(entry)

    # Save cleaned data
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(cleaned_data, f, ensure_ascii=False, indent=2)

    # Save deleted data
    with open(delete_file, 'w', encoding='utf-8') as f:
        json.dump(deleted_data, f, ensure_ascii=False, indent=2)

    print("\nProcessing Summary:")
    print(f"Processed {len(data)} entries")
    print(
        f"Removed {len(deleted_data)} entries with same-speaker "
        f"overlapping timestamps"
    )
    print(f"Remaining entries: {len(cleaned_data)}")
    if skipped_count:
        print(
            f"Skipped {skipped_count} entries without 'model_output' "
            f"(written to neither output file)"
        )


if __name__ == '__main__':
    input_file = 'silence_overlaps/transcriptions.json'
    output_file = 'silence_overlaps/cleaned_transcriptions2.json'
    delete_file = 'silence_overlaps/delete_transcript2.json'

    process_file(input_file, output_file, delete_file)
    print(f"\nCleaned transcriptions have been saved to {output_file}")
    print(f"Deleted entries have been saved to {delete_file}")