import hashlib
import os
import re
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path, PurePath
from typing import List, Optional, Union

import cv2
import gradio as gr
import img2pdf
import pandas as pd
import pytesseract
from pdf2image import convert_from_path

# Extensions that convert_images_to_pdf forwards to img2pdf.
SUPPORTED_IMAGE_SUFFIXES = (".png", ".jpg", ".jpeg", ".tiff", ".bmp")

# Control characters that openpyxl refuses to store in a cell. Tesseract output
# commonly ends with a form feed (\x0c), so OCR text is cleaned before export.
_XLSX_ILLEGAL_CHARS = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f]")


@dataclass
class MetaData:
    """
    Class to store metadata for each processed pdf file.
    """

    # Absolute path of the folder holding the rendered page images.
    image_path: Optional[str] = None
    # Absolute path of the per-page OCR workbook, or None if it was not written.
    xlsx_path: Optional[str] = None
    # Absolute paths of the per-page OCR text files, in page order.
    text_paths: List[str] = field(default_factory=list)


# Maps the MD5 hex digest of a PDF to the outputs produced for it.
# NOTE: indexing a defaultdict with a missing key inserts a blank entry; use
# ``in`` / ``.get`` when only probing, or the PDF will look "already processed".
cache = defaultdict(MetaData)


def get_latest_file(directory, pattern="*"):
    """
    Return the most recently modified entry of a directory.

    :param directory: Directory to search (non-recursive glob)
    :param pattern: Glob pattern the entries must match
    :return: Absolute ``Path`` of the newest match, or ``None`` if nothing matches
    """
    candidates = list(Path(directory).glob(pattern))
    if not candidates:
        return None
    return max(candidates, key=os.path.getmtime).absolute()


def convert_images_to_pdf(image_paths: List[str], output_pdf_path: str):
    """
    Convert images to PDF using img2pdf for better quality preservation.

    Paths whose extension is not in ``SUPPORTED_IMAGE_SUFFIXES`` are ignored.
    Nothing is written when no usable image remains.

    :param image_paths: List of paths to image files (a single path is accepted too)
    :param output_pdf_path: Path where the output PDF will be saved
    """
    if not image_paths:
        print("No images provided!")
        return

    # A bare path would otherwise be iterated character by character.
    if isinstance(image_paths, (str, os.PathLike)):
        image_paths = [image_paths]

    supported = [
        str(path)
        for path in image_paths
        if str(path).lower().endswith(SUPPORTED_IMAGE_SUFFIXES)
    ]
    if not supported:
        print("No supported image files found!")
        return

    # Convert before opening the target so a failure does not leave an empty PDF.
    pdf_bytes = img2pdf.convert(supported)
    with open(output_pdf_path, "wb") as f:
        f.write(pdf_bytes)


def hash_file(filepath) -> str:
    """Generate an MD5 hex digest for the file at the given path."""
    hasher = hashlib.md5()
    with open(filepath, "rb") as f:
        # Read in chunks so large PDFs are not loaded into memory at once.
        while chunk := f.read(8192):
            hasher.update(chunk)
    return hasher.hexdigest()


def _save_pages_to_excel(rows: List[dict], stem: str) -> Optional[str]:
    """Write the per-page OCR rows to ./excel/<stem>.xlsx and return its absolute path.

    Returns ``None`` (after printing the reason) when pandas cannot import its
    Excel writer, so the OCR text files stay usable without openpyxl.
    """
    excel_path = Path(f"./excel/{stem}.xlsx")
    excel_path.parent.mkdir(parents=True, exist_ok=True)

    cleaned = [
        {"text": _XLSX_ILLEGAL_CHARS.sub("", row["text"]), "page": row["page"]}
        for row in rows
    ]
    df = pd.DataFrame(cleaned, columns=["text", "page"])
    try:
        df.to_excel(str(excel_path), index=False)
    except ImportError as err:
        print(f"Could not write {excel_path}: {err}")
        return None
    return str(excel_path.absolute())


def convert_txt(pdf_paths: List[str], is_save_image: bool = False):
    """
    OCR a list of PDFs and return the paths of the per-page text files.

    For every PDF, each page is rendered to ``./tmp/<stem>/<stem>_<page>.jpg``,
    OCR'd with Tesseract (``lang="vie"``) into ``./text/<stem>/<stem>_<page>.txt``
    and collected into ``./excel/<stem>.xlsx`` (columns ``text`` and ``page``).
    Results are recorded in ``cache`` under the PDF's MD5 hash. A PDF whose
    hash is already cached is not processed again; its cached text paths are
    returned instead.

    :param pdf_paths: List of PDF paths
    :param is_save_image: Currently unused; page images are always kept on disk
    :return: Absolute paths of the text files, in input and page order
    """
    text_filepaths = []
    for pdf_path in pdf_paths:
        pdf_path = Path(pdf_path)
        stem = PurePath(pdf_path).stem

        pdf_hash = hash_file(pdf_path)
        print(f"Hash for {pdf_path}: {pdf_hash}")
        if pdf_hash in cache:
            print(f"Skipping {pdf_path}, already processed.")
            text_filepaths.extend(cache[pdf_hash].text_paths)
            continue

        # Convert PDF to images
        images = convert_from_path(pdf_path)
        image_folder_path = Path(f"./tmp/{stem}")
        text_folder_path = Path(f"./text/{stem}")
        image_folder_path.mkdir(parents=True, exist_ok=True)
        text_folder_path.mkdir(parents=True, exist_ok=True)

        page_text_paths = []
        rows = []
        for i, image in enumerate(images):
            page = i + 1
            image_path = image_folder_path / f"{stem}_{page}.jpg"
            image.save(image_path)

            # NOTE(review): pages from the third onward are reloaded and rotated
            # 90 degrees counter-clockwise before OCR; presumably those pages are
            # scanned sideways in the expected documents -- confirm.
            if i > 1:
                image = cv2.imread(str(image_path))
                image = cv2.rotate(image, cv2.ROTATE_90_COUNTERCLOCKWISE)

            txt = pytesseract.image_to_string(
                image, lang="vie", config="--oem 1 --psm 6"
            )
            text_filepath = text_folder_path / f"{stem}_{page}.txt"
            text_filepath.write_text(txt, encoding="utf-8")
            page_text_paths.append(str(text_filepath.absolute()))
            rows.append({"text": txt, "page": page})

        print(f"Saving {pdf_path} to ./excel/{stem}.xlsx")
        xlsx_path = _save_pages_to_excel(rows, stem)

        # Register the PDF only after every page succeeded, so a failed run is
        # retried next time instead of being reported as already processed.
        cache[pdf_hash] = MetaData(
            image_path=str(image_folder_path.absolute()),
            xlsx_path=xlsx_path,
            text_paths=page_text_paths,
        )
        text_filepaths.extend(page_text_paths)

    # NOTE(review): in the collapsed source the return could also sit inside the
    # loop; returning after all PDFs is assumed to be the intent.
    return text_filepaths


def filter_by_keyword(keywords: Union[str, List[str]], hash_id: str = ""):
    """
    Filter the text in the Excel file by keyword.

    The workbook is the one cached for ``hash_id``; when ``hash_id`` is empty
    the most recently modified ``./excel/*.xlsx`` is used. For every keyword a
    header line followed by the matching page numbers (case-insensitive
    substring match) is written to ``./page_id/<workbook stem>.txt``.

    :param keywords: Keyword or list of keywords to filter by
    :param hash_id: MD5 hash of a processed PDF, or "" for the latest workbook
    :return: The text written to the page-id file
    :raises FileNotFoundError: If no workbook is available for the request
    """
    if isinstance(keywords, str):
        keywords = [keywords]

    if hash_id:
        # .get() avoids inserting a blank entry for an unknown hash.
        meta = cache.get(hash_id)
        excel_path = meta.xlsx_path if meta is not None else None
    else:
        excel_path = get_latest_file(Path("./excel"), pattern="*.xlsx")
    print(f"-------Excel path --------- {excel_path}")
    if excel_path is None:
        raise FileNotFoundError(
            f"No Excel file available for hash_id={hash_id!r}; "
            "process the PDF with convert_txt first."
        )
    excel_path = Path(excel_path)

    page_id_folder = Path("./page_id")
    page_id_folder.mkdir(parents=True, exist_ok=True)
    page_id_path = page_id_folder / f"{excel_path.stem}.txt"

    df = pd.read_excel(str(excel_path.absolute()))
    lines = []
    for k in keywords:
        lines.append(f"\n{k}\n")
        needle = k.lower()
        for text, page in zip(df["text"], df["page"]):
            # Empty pages come back from Excel as NaN, not str.
            if isinstance(text, str) and needle in text.lower():
                lines.append(f"{page}\n")

    content = "".join(lines)
    page_id_path.write_text(content, encoding="utf-8")
    return content


def gradio_interface(file, keyword=None):
    """
    Gradio interface for the PDF processing and filtering.

    :param file: Uploaded PDF file (an object with ``.name`` or a plain path)
    :param keyword: Optional keyword; when given, the matching page numbers are
        written to ``./page_id`` via ``filter_by_keyword``
    :return: List of paths to the per-page OCR text files
    """
    if file is None:
        raise gr.Error("Please upload a PDF file.")

    pdf_path = getattr(file, "name", file)
    hash_id = hash_file(pdf_path)
    if hash_id in cache:
        print(f"Skipping {pdf_path}, already processed.")
        filepaths = list(cache[hash_id].text_paths)
    else:
        filepaths = convert_txt([pdf_path])

    if keyword:
        # The result is not shown in the UI yet; the call produces the page-id file.
        filter_by_keyword(keyword, hash_id)
    return filepaths
    # return content


if __name__ == "__main__":
    # Install the system and Python dependencies at start-up.
    os.system(
        "apt-get update && apt-get install -y poppler-utils tesseract-ocr tesseract-ocr-vie"
    )
    os.system("pip install -q pytesseract openpyxl")
    demo = gr.Interface(
        fn=gradio_interface,
        inputs=[
            gr.File(label="Upload PDF"),
            gr.Textbox(label="Keyword"),
        ],
        # outputs=gr.Textbox(label="Filtered Text"),
        outputs=gr.Files(label="Filtered Text File"),
        title="PDF Keyword Filter",
        description="Upload a PDF file and enter a keyword to filter the text.",
    )
    demo.launch(debug=True)