# interactSpeech / clean_transcripts.py
# Student0809's picture
# Add files using upload-large-folder tool
# 3b47bbc verified
import json
import re
from typing import Dict, List, Optional, Tuple
def parse_timestamp(timestamp: str) -> int:
    """Convert an ``MM:SS`` timestamp string into a total number of seconds.

    Args:
        timestamp: Two colon-separated integer fields, minutes then seconds
            (for example ``'00:15'``).

    Returns:
        ``minutes * 60 + seconds`` as a single integer.

    Raises:
        ValueError: If the string does not split into exactly two fields on
            ``':'``, or a field is not a valid integer.
    """
    minutes, seconds = map(int, timestamp.split(':'))
    return minutes * 60 + seconds
def extract_time_and_speaker(
    line: str,
) -> Tuple[Optional[Tuple[int, int]], Optional[str]]:
    """Extract the time range and speaker label from one transcript line.

    A line is recognised only when it begins with
    ``[MM:SS - MM:SS] Speaker X:``, where each time field is exactly two
    digits and ``X`` is a single upper-case letter.

    Args:
        line: One line of transcript text.

    Returns:
        ``((start_seconds, end_seconds), speaker)`` for a recognised line,
        where ``speaker`` is the matched label such as ``'Speaker A'``;
        ``(None, None)`` when the line does not start with that pattern.
    """
    # re.match anchors at the start of the string, so any leading text
    # (including whitespace) makes the line unrecognised.
    time_match = re.match(
        r'\[(\d{2}:\d{2}) - (\d{2}:\d{2})\] (Speaker [A-Z]):', line
    )
    if not time_match:
        return None, None

    start_stamp, end_stamp, speaker = time_match.groups()
    return (parse_timestamp(start_stamp), parse_timestamp(end_stamp)), speaker
def has_overlap(range1: Tuple[int, int], range2: Tuple[int, int]) -> bool:
    """Report whether two ``(start, end)`` ranges share any interior time.

    Ranges that merely touch (one ends exactly where the other starts) are
    not considered overlapping.
    """
    first_start, first_end = range1
    second_start, second_end = range2
    # Overlap requires each range to end strictly after the other begins.
    return first_end > second_start and second_end > first_start
def has_same_speaker_overlap(transcript: str) -> bool:
    """Tell whether any speaker has two time ranges that overlap each other.

    The transcript is scanned line by line. Blank lines and lines from which
    no time range and speaker can be extracted are ignored. Each remaining
    range is compared against the earlier ranges of the same speaker only;
    ranges of different speakers are never compared.

    Returns:
        True as soon as one same-speaker overlap is found, otherwise False.
    """
    seen_by_speaker = {}  # speaker label -> ranges collected so far
    for raw_line in transcript.split('\n'):
        if not raw_line.strip():
            continue
        current, who = extract_time_and_speaker(raw_line)
        if current is None or who is None:
            continue
        earlier = seen_by_speaker.setdefault(who, [])
        if any(has_overlap(current, past) for past in earlier):
            return True
        earlier.append(current)
    return False
def process_file(input_file: str, output_file: str, delete_file: str):
    """Split the entries of a JSON file by same-speaker timestamp overlap.

    Loads the parsed JSON from ``input_file``; a single top-level object is
    wrapped into a one-element list. Every entry that has a
    ``'model_output'`` key is checked with ``has_same_speaker_overlap``:
    entries without an overlap are written to ``output_file``, entries with
    one are written to ``delete_file`` and their ``'key'`` is printed.
    Entries lacking ``'model_output'`` are written to neither file. A short
    summary is printed at the end.
    """
    with open(input_file, 'r', encoding='utf-8') as src:
        entries = json.load(src)
    if isinstance(entries, dict):
        entries = [entries]

    kept, dropped = [], []
    for item in entries:
        # Entries without a transcript are skipped entirely.
        if 'model_output' not in item:
            continue
        if has_same_speaker_overlap(item['model_output']):
            dropped.append(item)
            print(f"Removing entry with key: {item.get('key', 'unknown')}")
        else:
            kept.append(item)

    # Write the kept entries first, then the removed ones.
    for path, payload in ((output_file, kept), (delete_file, dropped)):
        with open(path, 'w', encoding='utf-8') as sink:
            json.dump(payload, sink, ensure_ascii=False, indent=2)

    print("\nProcessing Summary:")
    print(f"Processed {len(entries)} entries")
    print(f"Removed {len(dropped)} entries with same-speaker overlapping timestamps")
    print(f"Remaining entries: {len(kept)}")
if __name__ == '__main__':
    # Fixed locations of the raw transcriptions and the two result files.
    source_path = 'silence_overlaps/transcriptions.json'
    kept_path = 'silence_overlaps/cleaned_transcriptions2.json'
    dropped_path = 'silence_overlaps/delete_transcript2.json'

    process_file(source_path, kept_path, dropped_path)

    # Tell the user where each result file was written.
    print(f"\nCleaned transcriptions have been saved to {kept_path}")
    print(f"Deleted entries have been saved to {dropped_path}")