|
|
import json
import re
from typing import Dict, List, Optional, Tuple
|
|
|
|
|
def parse_timestamp(timestamp: str) -> int:
    """Convert a timestamp string to a total number of seconds.

    Accepts 'MM:SS' (e.g. '00:15' -> 15) and 'HH:MM:SS'
    (e.g. '1:02:03' -> 3723).

    Args:
        timestamp: Colon-separated integer fields.

    Returns:
        The timestamp expressed in seconds.

    Raises:
        ValueError: If a field is not an integer, or the string does not
            have exactly two or three colon-separated fields.
    """
    fields = [int(field) for field in timestamp.split(':')]
    if len(fields) not in (2, 3):
        raise ValueError(
            f"expected 'MM:SS' or 'HH:MM:SS', got {timestamp!r}"
        )

    # Fold left-to-right so each earlier field is worth 60x the next one.
    total_seconds = 0
    for field in fields:
        total_seconds = total_seconds * 60 + field
    return total_seconds
|
|
|
|
|
def extract_time_and_speaker(
    line: str,
) -> Tuple[Optional[Tuple[int, int]], Optional[str]]:
    """Extract the time range and speaker label from one transcript line.

    A matching line looks like ``[00:05 - 00:12] Speaker A: ...``; leading
    whitespace before the opening bracket is tolerated so indented lines
    are not silently ignored.

    Args:
        line: A single line of transcript text.

    Returns:
        ``((start_seconds, end_seconds), speaker)`` when the line starts
        with a time range and a ``Speaker X`` label, otherwise
        ``(None, None)``.
    """
    time_match = re.match(
        r'\s*\[(\d{2}:\d{2}) - (\d{2}:\d{2})\] (Speaker [A-Z]):',
        line,
    )
    if not time_match:
        return None, None

    start_text, end_text, speaker = time_match.groups()
    return (parse_timestamp(start_text), parse_timestamp(end_text)), speaker
|
|
|
|
|
def has_overlap(range1: Tuple[int, int], range2: Tuple[int, int]) -> bool:
    """Check whether two (start, end) time ranges overlap.

    Ranges that merely touch (one ends exactly where the other starts)
    are not considered overlapping.

    Args:
        range1: First range as ``(start, end)``.
        range2: Second range as ``(start, end)``.

    Returns:
        True if each range starts strictly before the other one ends.
    """
    start1, end1 = range1
    start2, end2 = range2
    return start1 < end2 and start2 < end1
|
|
|
|
|
def has_same_speaker_overlap(transcript: str) -> bool:
    """Check whether any speaker has two overlapping time ranges.

    The transcript is scanned line by line. Blank lines and lines from
    which no time range / speaker can be extracted are ignored. Ranges are
    only compared against earlier ranges of the same speaker.

    Args:
        transcript: Newline-separated transcript text.

    Returns:
        True as soon as one same-speaker overlap is found, otherwise False.
    """
    # Ranges seen so far, grouped by speaker label.
    speaker_ranges: Dict[str, List[Tuple[int, int]]] = {}

    for line in transcript.split('\n'):
        if not line.strip():
            continue

        time_range, speaker = extract_time_and_speaker(line)
        if time_range is None or speaker is None:
            continue

        earlier_ranges = speaker_ranges.setdefault(speaker, [])
        if any(has_overlap(time_range, earlier) for earlier in earlier_ranges):
            return True
        earlier_ranges.append(time_range)

    return False
|
|
|
|
|
def process_file(input_file: str, output_file: str, delete_file: str) -> None:
    """Split JSON entries by whether they have same-speaker overlaps.

    Reads ``input_file`` (a JSON list of entries, or a single JSON object,
    which is treated as a one-entry list). Each entry with a
    ``'model_output'`` field is checked with ``has_same_speaker_overlap``:
    entries without an overlap go to ``output_file``, entries with one go
    to ``delete_file``. Entries that are not objects or that lack
    ``'model_output'`` are written to neither file; they are counted and
    reported in the summary so the drop is visible.

    Args:
        input_file: Path of the JSON file to read.
        output_file: Path that receives the kept entries.
        delete_file: Path that receives the removed entries.
    """
    with open(input_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # A single top-level object is handled as a list with one entry.
    if isinstance(data, dict):
        data = [data]

    cleaned_data = []
    deleted_data = []
    skipped_count = 0

    for entry in data:
        if not isinstance(entry, dict) or 'model_output' not in entry:
            skipped_count += 1
            continue

        if has_same_speaker_overlap(entry['model_output']):
            deleted_data.append(entry)
            print(f"Removing entry with key: {entry.get('key', 'unknown')}")
        else:
            cleaned_data.append(entry)

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(cleaned_data, f, ensure_ascii=False, indent=2)

    with open(delete_file, 'w', encoding='utf-8') as f:
        json.dump(deleted_data, f, ensure_ascii=False, indent=2)

    print("\nProcessing Summary:")
    print(f"Processed {len(data)} entries")
    print(f"Removed {len(deleted_data)} entries with same-speaker overlapping timestamps")
    print(f"Remaining entries: {len(cleaned_data)}")
    if skipped_count:
        # Surface entries that ended up in neither output file.
        print(f"Skipped {skipped_count} entries without 'model_output'")
|
|
|
|
|
# Script entry point: filter the transcription file using fixed paths.
if __name__ == '__main__':
    input_file = 'silence_overlaps/transcriptions.json'
    output_file = 'silence_overlaps/cleaned_transcriptions2.json'
    delete_file = 'silence_overlaps/delete_transcript2.json'

    process_file(input_file, output_file, delete_file)

    print(f"\nCleaned transcriptions have been saved to {output_file}")
    print(f"Deleted entries have been saved to {delete_file}")