Invoice Analyzer ML

Modern end-to-end document extraction pipeline showing the evolution of OCR accuracy and structural analysis.

Source Code Repository

pdf_to_image_1.py

PyMuPDF (Fitz)

ocr_extract_1.py

Spatial JSON

ocr_extract_2.py

Markdown Layout

ocr_extract_4.py

Hybrid Production

pdf_to_image_1.py
import fitz  # PyMuPDF
import os

def pdf_to_images(pdf_path, output_dir="images", dpi=300):
    """
    Render each page of a PDF to a PNG image using PyMuPDF (fitz).

    Args:
        pdf_path: Path to the input PDF file.
        output_dir: Directory the page images are written to (created if missing).
        dpi: Render resolution. PDFs use a 72-DPI coordinate space, so the
            zoom factor is dpi / 72.

    Returns:
        List of file paths of the rendered images, one per page, in page order.
        (The previous docstring said "Yields" — this function returns a list.)
    """
    os.makedirs(output_dir, exist_ok=True)

    doc = fitz.open(pdf_path)
    try:
        # Scale from the PDF's native 72-DPI space to the requested DPI.
        zoom = dpi / 72
        mat = fitz.Matrix(zoom, zoom)

        image_paths = []
        for page_num in range(len(doc)):
            page = doc.load_page(page_num)
            pix = page.get_pixmap(matrix=mat)
            img_path = os.path.join(output_dir, f"page_{page_num + 1}.png")
            pix.save(img_path)
            image_paths.append(img_path)
    finally:
        # Release the document handle even if rendering a page fails.
        doc.close()

    return image_paths

if __name__ == "__main__":
    # Smoke test against a sample scanned document.
    pdf_file = "scan-loan.pdf"
    if not os.path.exists(pdf_file):
        print(f"File {pdf_file} not found.")
    else:
        imgs = pdf_to_images(pdf_file)
        print(f"Generated {len(imgs)} images using PyMuPDF: {imgs}")
ocr_extract_1.py
import os
os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python"
from paddleocr import PaddleOCR
import os
import json
from PIL import Image
import numpy as np
import cv2
import matplotlib.pyplot as plt

# Initialize PaddleOCR once at import time (this loads the model weights);
# the single shared instance is reused for every page. use_angle_cls enables
# the text-angle classifier so rotated lines are recognized.
ocr = PaddleOCR(use_angle_cls=True, lang='en')

def load_image(path):
    """Load an image from disk and return it as an RGB numpy array.

    Uses a context manager so the underlying file handle is closed promptly
    (PIL opens lazily and the original version left the file open).
    """
    with Image.open(path) as img:
        # convert() forces a full decode, so the array is safe to use
        # after the file is closed.
        return np.array(img.convert("RGB"))

def visualize_ocr(img, blocks):
    """Display the page image with a red rectangle over every OCR block."""
    plt.figure(figsize=(12, 16))
    plt.imshow(img)

    ax = plt.gca()
    for block in blocks:
        x_min, y_min, x_max, y_max = block["bbox"]
        rect = plt.Rectangle(
            (x_min, y_min),
            x_max - x_min,
            y_max - y_min,
            fill=False,
            edgecolor="red",
            linewidth=1,
        )
        ax.add_patch(rect)

    plt.axis("off")
    plt.title("OCR Bounding Boxes (JSON Mode)")
    plt.show()

def run_ocr_json(image_path):
    """
    Implementation of Recommendation 1: Spatial JSON Payload.

    Runs PaddleOCR on the image and returns (payload, image), where payload
    is a list of dicts each holding the recognized text, an axis-aligned
    [x_min, y_min, x_max, y_max] bbox, and the recognition confidence.
    """
    img = load_image(image_path)
    result = ocr.ocr(img, cls=True)

    payload = []

    if result and result[0]:
        for detection in result[0]:
            # detection[0] is the quadrilateral [[x1,y1],...,[x4,y4]];
            # detection[1] is (text, confidence).
            quad = detection[0]
            text = detection[1][0]
            conf = float(detection[1][1])

            # Collapse the quad to its axis-aligned extremes.
            xs = [pt[0] for pt in quad]
            ys = [pt[1] for pt in quad]

            payload.append({
                "text": text,
                "bbox": [min(xs), min(ys), max(xs), max(ys)],
                "confidence": round(conf, 4),
            })

    return payload, img

if __name__ == "__main__":
    photos_dir = "../images"
    all_pages_data = {}

    if not os.path.exists(photos_dir):
        print(f"Error: {photos_dir} directory not found.")
    else:
        # Walk the page images in filename order, collecting spatial payloads.
        for img_name in sorted(f for f in os.listdir(photos_dir) if f.endswith(".png")):
            print(f"[INFO] Processing {img_name}...")
            page_data, img_np = run_ocr_json(os.path.join(photos_dir, img_name))
            all_pages_data[img_name] = page_data

            # Interactive preview of the detected boxes.
            visualize_ocr(img_np, page_data)

        # Persist the per-page payloads for Recommendation 1.
        output_file = "ocr_spatial_data.json"
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(all_pages_data, f, indent=2)

        print(f"\nāœ… Recommendation 1 Complete: Saved spatial data to {output_file}")
        print("šŸ’” Hint: Pass this JSON directly to your LLM payload.")
ocr_extract_2.py
from paddleocr import PaddleOCR
import os
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt

# Initialize PaddleOCR once at import time (loads model weights); reuse this
# single shared instance for every page. use_angle_cls enables the text-angle
# classifier for rotated lines.
ocr = PaddleOCR(use_angle_cls=True, lang='en')

def load_image(path):
    """Load an image from disk and return it as an RGB numpy array.

    Uses a context manager so the underlying file handle is closed promptly
    (PIL opens lazily and the original version left the file open).
    """
    with Image.open(path) as img:
        # convert() forces a full decode, so the array is safe to use
        # after the file is closed.
        return np.array(img.convert("RGB"))

def visualize_ocr(img, blocks):
    """Display the page image with a green rectangle over every OCR block."""
    plt.figure(figsize=(12, 16))
    plt.imshow(img)

    ax = plt.gca()
    for block in blocks:
        left, top = block["x_min"], block["y_min"]
        width = block["x_max"] - left
        height = block["y_max"] - top

        ax.add_patch(plt.Rectangle(
            (left, top),
            width,
            height,
            fill=False,
            edgecolor="green",
            linewidth=1,
        ))

    plt.axis("off")
    plt.title("OCR Bounding Boxes (Markdown Mode)")
    plt.show()

def group_into_lines(ocr_data, row_threshold=15):
    """
    Reconstruct a table-like Markdown layout from flat OCR blocks.

    Implementation of Recommendation 2: Markdown/Layout Reconstruction.
    Blocks whose y_min lies within `row_threshold` pixels of the first
    block of an existing row join that row; each row is then rendered
    left-to-right as pipe-separated Markdown cells.
    """
    rows = []
    for block in sorted(ocr_data, key=lambda b: b["y_min"]):
        # Attach to the first row whose anchor block is vertically close.
        target = next(
            (r for r in rows if abs(block["y_min"] - r[0]["y_min"]) < row_threshold),
            None,
        )
        if target is None:
            rows.append([block])
        else:
            target.append(block)

    rendered = []
    for row in rows:
        cells = sorted(row, key=lambda b: b["x_min"])
        rendered.append("| " + "   |   ".join(c["text"] for c in cells) + " |")

    return "\n".join(rendered)

def run_ocr_markdown(image_path):
    """
    Run OCR and reconstruct the page layout as Markdown.

    Returns (markdown_layout, blocks, image): the reconstructed text, the
    raw blocks with axis-aligned bbox extrema, and the RGB image array.
    """
    img = load_image(image_path)
    result = ocr.ocr(img, cls=True)

    blocks = []
    if result and result[0]:
        for detection in result[0]:
            # detection[0] is the quadrilateral; detection[1][0] the text.
            quad = detection[0]
            xs = [pt[0] for pt in quad]
            ys = [pt[1] for pt in quad]

            blocks.append({
                "text": detection[1][0],
                "x_min": min(xs),
                "y_min": min(ys),
                "x_max": max(xs),
                "y_max": max(ys),
            })

    return group_into_lines(blocks), blocks, img

if __name__ == "__main__":
    photos_dir = "Photos"
    all_pages_markdown = []

    if not os.path.exists(photos_dir):
        print(f"Error: {photos_dir} directory not found.")
    else:
        # Reconstruct each page in filename order.
        for img_name in sorted(f for f in os.listdir(photos_dir) if f.endswith(".png")):
            print(f"[INFO] Reconstructing layout for {img_name}...")
            reconstruction, blocks, img_np = run_ocr_markdown(
                os.path.join(photos_dir, img_name)
            )

            # Interactive preview of the detected boxes.
            visualize_ocr(img_np, blocks)

            all_pages_markdown.append(f"### PAGE: {img_name}\n" + reconstruction)

        # Write all pages into a single Markdown document.
        output_file = "ocr_layout_reconstructed.md"
        with open(output_file, "w", encoding="utf-8") as f:
            f.write("\n\n---\n\n".join(all_pages_markdown))

        print(f"\nāœ… Recommendation 2 Complete: Reconstructed layout saved to {output_file}")
        print("šŸ’” Hint: This Markdown allows the LLM to 'see' the table structure.")
ocr_extract_4.py
from paddleocr import PPStructure, PaddleOCR
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import json

# Initialize BOTH engines for a Hybrid Approach. Both are created once at
# import time (model weights are loaded here) and shared across all pages.
# 1. Standard OCR: best at finding every word, including free-text fields
#    such as the name.
ocr_standard = PaddleOCR(use_angle_cls=True, lang='en', show_log=False)
# 2. Structural engine: best for tables and headers; table=True enables
#    HTML table reconstruction in the result blocks.
layout_engine = PPStructure(table=True, ocr=True, layout=True, show_log=False)

def load_image(path):
    """Read an image from disk and return it as an RGB numpy array.

    Raises:
        FileNotFoundError: if the file is missing or undecodable.
        cv2.imread signals failure by returning None, which previously
        surfaced as a cryptic cv2.error inside cvtColor.
    """
    img = cv2.imread(path)
    if img is None:
        raise FileNotFoundError(f"Could not read image: {path}")
    # OpenCV loads BGR; the rest of the pipeline expects RGB.
    return cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

def detect_checkboxes_opencv(img_gray, all_text_bboxes):
    """
    Locate checkbox-like squares with OpenCV contour analysis.

    Every region already claimed by either OCR engine is masked out first
    (with 5 px padding), so only leftover roughly-square contours
    (16-40 px per side, aspect ratio near 1:1) are considered. A box is
    reported checked when more than 25% of its area is ink.
    """
    # Mask covering all detected text/table regions.
    text_mask = np.zeros_like(img_gray)
    for box in all_text_bboxes:
        x1, y1, x2, y2 = (int(v) for v in box)
        cv2.rectangle(text_mask, (x1 - 5, y1 - 5), (x2 + 5, y2 + 5), 255, -1)

    _, binary = cv2.threshold(img_gray, 200, 255, cv2.THRESH_BINARY_INV)
    # Wipe out all detected text before hunting for checkbox contours.
    binary[text_mask == 255] = 0

    contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    found = []
    for contour in contours:
        x, y, w, h = cv2.boundingRect(contour)
        if not (16 < w < 40 and 16 < h < 40):
            continue
        ratio = float(w) / h
        if not (0.8 < ratio < 1.2):
            continue

        region = binary[y:y + h, x:x + w]
        fill_fraction = cv2.countNonZero(region) / (w * h)
        found.append({
            "type": "checkbox",
            "bbox": [float(x), float(y), float(x + w), float(y + h)],
            "is_checked": bool(fill_fraction > 0.25),
        })

    return found

def run_production_ocr(image_path):
    """
    Hybrid extraction pipeline for one page image.

    Combines three passes:
      1. Standard PaddleOCR for every text line (text + bbox + confidence).
      2. PPStructure for table regions (bbox + reconstructed HTML).
      3. OpenCV contour analysis for checkboxes, masked by (1) and (2).

    Returns:
        (final_blocks, img): a list of block dicts typed "text", "table",
        or "checkbox", and the page as an RGB numpy array.
    """
    img = load_image(image_path)
    img_gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    
    # STEP 1: Run Standard OCR to ensure we get the Name (and all other text)
    standard_res = ocr_standard.ocr(img)
    
    # STEP 2: Run Layout Engine for Tables
    structure_res = layout_engine(img)
    
    final_blocks = []
    all_text_bboxes = []

    # Process Standard OCR results first (High reliability for text)
    if standard_res and standard_res[0]:
        for line in standard_res[0]:
            bbox_raw = line[0] # [[x1,y1],[x2,y2],[x3,y3],[x4,y4]]
            text = line[1][0]
            conf = float(line[1][1])
            # Collapse the quadrilateral to an axis-aligned box.
            xs, ys = [p[0] for p in bbox_raw], [p[1] for p in bbox_raw]
            bbox = [min(xs), min(ys), max(xs), max(ys)]
            
            final_blocks.append({
                "type": "text",
                "text": text,
                "confidence": conf,
                "bbox": bbox
            })
            all_text_bboxes.append(bbox)

    # Process Layout Engine output, keeping only table blocks (structure).
    for block in structure_res:
        if block["type"] == "table":
            bbox = [float(c) for c in block["bbox"]]
            final_blocks.append({
                "type": "table",
                "bbox": bbox,
                # PPStructure reconstructs the table as HTML under "res".
                "html": block["res"].get("html", "")
            })
            # Table regions are also excluded from the checkbox search.
            all_text_bboxes.append(bbox)
                
    # STEP 3: Detect Checkboxes (using the mask from all detected text)
    checkboxes = detect_checkboxes_opencv(img_gray, all_text_bboxes)
    final_blocks.extend(checkboxes)
    
    return final_blocks, img

def visualize_production(img, blocks):
    """Overlay every extracted block on the page image, color-coded by kind."""
    plt.figure(figsize=(12, 16))
    plt.imshow(img)
    ax = plt.gca()

    for block in blocks:
        kind = block["type"]
        if kind == "checkbox":
            color = "blue" if block["is_checked"] else "gray"
        elif kind == "table":
            color = "red"
        elif block.get("manual_review"):
            color = "orange"
        else:
            color = "green"

        bbox = block["bbox"]
        if len(bbox) == 4:
            x1, y1, x2, y2 = bbox
        else:
            # Flattened 8-value polygon: take two opposite corners.
            x1, y1, x2, y2 = bbox[0], bbox[1], bbox[4], bbox[5]

        ax.add_patch(plt.Rectangle(
            (x1, y1), x2 - x1, y2 - y1,
            fill=False, edgecolor=color, linewidth=1,
        ))

    plt.axis("off")
    plt.title("Production Pipeline (Red=Table, Green=Text, Blue=Checked, Gray=Empty, Orange=Review)")
    plt.show()

if __name__ == "__main__":
    photos_dir = "Photos"
    final_output = {}

    if not os.path.exists(photos_dir):
        print(f"Error: {photos_dir} directory not found.")
    else:
        # Process the pages in filename order.
        for img_name in sorted(f for f in os.listdir(photos_dir) if f.endswith(".png")):
            print(f"[INFO] Running Structural Analysis on {img_name}...")
            blocks, img_np = run_production_ocr(os.path.join(photos_dir, img_name))
            final_output[img_name] = blocks

            # Interactive visualization
            visualize_production(img_np, blocks)

        # High-Fidelity JSON Output
        output_file = "ocr_production_data.json"
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(final_output, f, indent=2)

        print(f"\nāœ… Production Pipeline Complete: Saved complex structural data to {output_file}")