Invoice Analyzer ML
Modern end-to-end document extraction pipeline showing the evolution of OCR accuracy and structural analysis.
Source Code Repository
pdf_to_image_1.py
PyMuPDF (Fitz)
ocr_extract_1.py
Spatial JSON
ocr_extract_2.py
Markdown Layout
ocr_extract_4.py
Hybrid Production
pdf_to_image_1.py
import fitz # PyMuPDF
import os
def pdf_to_images(pdf_path, output_dir="images", dpi=300):
    """
    Convert each page of a PDF into a PNG image using PyMuPDF (Fitz).

    Args:
        pdf_path: Path to the input PDF file.
        output_dir: Directory the page images are written to (created if missing).
        dpi: Render resolution; the zoom factor is dpi / 72 since PDF's
            default resolution is 72 dpi.

    Returns:
        List of image file paths, one per page, named page_1.png, page_2.png, ...
        (FIX: the original docstring said "Yields", but the function returns a list.)
    """
    # FIX: exist_ok avoids the exists()/makedirs() race of the original.
    os.makedirs(output_dir, exist_ok=True)
    doc = fitz.open(pdf_path)
    image_paths = []
    try:
        # Hoisted out of the loop: the scaling matrix is page-independent.
        zoom = dpi / 72
        mat = fitz.Matrix(zoom, zoom)
        for page_num in range(len(doc)):
            page = doc.load_page(page_num)
            pix = page.get_pixmap(matrix=mat)
            img_path = os.path.join(output_dir, f"page_{page_num + 1}.png")
            pix.save(img_path)
            image_paths.append(img_path)
    finally:
        # FIX: ensure the document handle is released even if rendering fails.
        doc.close()
    return image_paths
if __name__ == "__main__":
    # Smoke test against a sample scanned document.
    pdf_file = "scan-loan.pdf"
    if not os.path.exists(pdf_file):
        print(f"File {pdf_file} not found.")
    else:
        imgs = pdf_to_images(pdf_file)
        print(f"Generated {len(imgs)} images using PyMuPDF: {imgs}")
ocr_extract_1.py
import os
# Must be set BEFORE importing paddleocr: selects the pure-Python protobuf
# implementation (presumably to work around protobuf C-extension conflicts
# with paddle — TODO confirm against the deployment environment).
os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python"
from paddleocr import PaddleOCR
import os
import json
from PIL import Image
import numpy as np
import cv2
import matplotlib.pyplot as plt
# Initialize PaddleOCR
# Module-level shared engine: text-angle classification enabled, English model.
ocr = PaddleOCR(use_angle_cls=True, lang='en')
def load_image(path):
    """Open an image file and return its pixels as an RGB numpy array."""
    return np.array(Image.open(path).convert("RGB"))
def visualize_ocr(img, blocks):
    """Shows the bounding boxes plotted over the original image."""
    plt.figure(figsize=(12, 16))
    plt.imshow(img)
    ax = plt.gca()
    for block in blocks:
        left, top, right, bottom = block["bbox"]
        rect = plt.Rectangle(
            (left, top),
            right - left,
            bottom - top,
            fill=False,
            edgecolor="red",
            linewidth=1,
        )
        ax.add_patch(rect)
    plt.axis("off")
    plt.title("OCR Bounding Boxes (JSON Mode)")
    plt.show()
def run_ocr_json(image_path):
    """
    Implementation of Recommendation 1: Spatial JSON Payload.
    Preserves text and bounding box coordinates for the LLM.

    Returns a (payload, image) pair: payload is a list of dicts with
    "text", "bbox" [x_min, y_min, x_max, y_max] and rounded "confidence".
    """
    img = load_image(image_path)
    result = ocr.ocr(img, cls=True)
    ocr_payload = []
    page = result[0] if result else None
    if page:
        for detection in page:
            quad = detection[0]  # four corner points of the detected text
            text, conf = detection[1][0], detection[1][1]
            # Collapse the 4-point quad into an axis-aligned box.
            xs = [pt[0] for pt in quad]
            ys = [pt[1] for pt in quad]
            ocr_payload.append({
                "text": text,
                "bbox": [min(xs), min(ys), max(xs), max(ys)],
                "confidence": round(float(conf), 4),
            })
    return ocr_payload, img
if __name__ == "__main__":
    photos_dir = "../images"
    all_pages_data = {}
    if not os.path.exists(photos_dir):
        print(f"Error: {photos_dir} directory not found.")
    else:
        image_files = sorted([f for f in os.listdir(photos_dir) if f.endswith(".png")])
        for img_name in image_files:
            print(f"[INFO] Processing {img_name}...")
            image_path = os.path.join(photos_dir, img_name)
            page_data, img_np = run_ocr_json(image_path)
            all_pages_data[img_name] = page_data
            # SHOW FIG: bounding-box overlay for visual QA of each page.
            visualize_ocr(img_np, page_data)
        # Save as JSON for Recommendation 1
        output_file = "ocr_spatial_data.json"
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(all_pages_data, f, indent=2)
        # FIX: the original literals were mojibake-corrupted ("ā", "š”");
        # restored the intended ✅ / 💡 characters.
        print(f"\n✅ Recommendation 1 Complete: Saved spatial data to {output_file}")
        print("💡 Hint: Pass this JSON directly to your LLM payload.")
ocr_extract_2.py
from paddleocr import PaddleOCR
import os
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
# Initialize PaddleOCR
# Module-level shared engine: text-angle classification enabled, English model.
ocr = PaddleOCR(use_angle_cls=True, lang='en')
def load_image(path):
    """Read an image from disk, force RGB, and return it as a numpy array."""
    rgb = Image.open(path).convert("RGB")
    return np.array(rgb)
def visualize_ocr(img, blocks):
    """Shows the bounding boxes plotted over the original image."""
    plt.figure(figsize=(12, 16))
    plt.imshow(img)
    ax = plt.gca()
    for block in blocks:
        width = block["x_max"] - block["x_min"]
        height = block["y_max"] - block["y_min"]
        ax.add_patch(plt.Rectangle(
            (block["x_min"], block["y_min"]),
            width,
            height,
            fill=False,
            edgecolor="green",
            linewidth=1,
        ))
    plt.axis("off")
    plt.title("OCR Bounding Boxes (Markdown Mode)")
    plt.show()
def group_into_lines(ocr_data, row_threshold=15):
    """
    Groups OCR blocks into virtual 'lines' based on their Y-coordinate.
    Implementation of Recommendation 2: Markdown/Layout Reconstruction.

    A block joins the first existing line whose anchor (its first block's
    y_min) is within row_threshold pixels; otherwise it starts a new line.
    Each line is rendered as a Markdown-style pipe row, left-to-right.
    """
    lines = []
    for block in sorted(ocr_data, key=lambda b: b["y_min"]):
        target = next(
            (ln for ln in lines
             if abs(block["y_min"] - ln[0]["y_min"]) < row_threshold),
            None,
        )
        if target is None:
            lines.append([block])
        else:
            target.append(block)
    rows = []
    for ln in lines:
        cells = sorted(ln, key=lambda b: b["x_min"])
        rows.append("| " + " | ".join(c["text"] for c in cells) + " |")
    return "\n".join(rows)
def run_ocr_markdown(image_path):
    """Run OCR on one image and reconstruct its layout as pipe-row Markdown.

    Returns (markdown_layout, ocr_blocks, img): the reconstructed text,
    the raw block dicts (text plus x/y min/max), and the image array.
    """
    img = load_image(image_path)
    result = ocr.ocr(img, cls=True)
    ocr_blocks = []
    page = result[0] if result else None
    if page:
        for detection in page:
            quad = detection[0]
            xs = [pt[0] for pt in quad]
            ys = [pt[1] for pt in quad]
            ocr_blocks.append({
                "text": detection[1][0],
                "x_min": min(xs),
                "y_min": min(ys),
                "x_max": max(xs),
                "y_max": max(ys),
            })
    return group_into_lines(ocr_blocks), ocr_blocks, img
if __name__ == "__main__":
    photos_dir = "Photos"
    all_pages_markdown = []
    if not os.path.exists(photos_dir):
        print(f"Error: {photos_dir} directory not found.")
    else:
        image_files = sorted([f for f in os.listdir(photos_dir) if f.endswith(".png")])
        for img_name in image_files:
            print(f"[INFO] Reconstructing layout for {img_name}...")
            image_path = os.path.join(photos_dir, img_name)
            reconstruction, blocks, img_np = run_ocr_markdown(image_path)
            # SHOW FIG: bounding-box overlay for visual QA of each page.
            visualize_ocr(img_np, blocks)
            page_header = f"### PAGE: {img_name}\n"
            all_pages_markdown.append(page_header + reconstruction)
        # Output to file
        output_file = "ocr_layout_reconstructed.md"
        with open(output_file, "w", encoding="utf-8") as f:
            f.write("\n\n---\n\n".join(all_pages_markdown))
        # FIX: the original literals were mojibake-corrupted ("ā", "š”");
        # restored the intended ✅ / 💡 characters.
        print(f"\n✅ Recommendation 2 Complete: Reconstructed layout saved to {output_file}")
        print("💡 Hint: This Markdown allows the LLM to 'see' the table structure.")
ocr_extract_4.py
from paddleocr import PPStructure, PaddleOCR
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import json
# Initialize BOTH engines for a Hybrid Approach
# 1. Standard OCR (Best for finding every word, including the name)
ocr_standard = PaddleOCR(use_angle_cls=True, lang='en', show_log=False)
# 2. Structural Engine (Best for tables and headers)
# table/ocr/layout all enabled so table regions come back with HTML content.
layout_engine = PPStructure(table=True, ocr=True, layout=True, show_log=False)
def load_image(path):
    """Load an image from disk and return it as an RGB numpy array.

    Raises:
        FileNotFoundError: if the file is missing or unreadable.
            FIX: cv2.imread returns None in that case, which previously
            surfaced as an opaque cv2.cvtColor error.
    """
    img = cv2.imread(path)
    if img is None:
        raise FileNotFoundError(f"Could not read image: {path}")
    # OpenCV loads BGR; downstream consumers (matplotlib, OCR) expect RGB.
    return cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
def detect_checkboxes_opencv(img_gray, all_text_bboxes):
    """
    Fixed detector: Uses a combined mask from BOTH engines.

    Every known text/table region is wiped from a thresholded copy of the
    page, then small square contours are treated as checkboxes; ink density
    inside the square decides whether it is checked.
    """
    text_mask = np.zeros_like(img_gray)
    for box in all_text_bboxes:
        left, top, right, bottom = (int(v) for v in box)
        # Pad by 5 px so glyph fringes don't survive the wipe.
        cv2.rectangle(text_mask, (left - 5, top - 5), (right + 5, bottom + 5), 255, -1)
    _, binary = cv2.threshold(img_gray, 200, 255, cv2.THRESH_BINARY_INV)
    binary[text_mask == 255] = 0  # Wipe out all detected text before finding checkboxes
    contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    found = []
    for contour in contours:
        x, y, w, h = cv2.boundingRect(contour)
        roughly_square = 0.8 < float(w) / h < 1.2
        if 16 < w < 40 and 16 < h < 40 and roughly_square:
            roi = binary[y:y + h, x:x + w]
            fill_ratio = cv2.countNonZero(roi) / (w * h)
            found.append({
                "type": "checkbox",
                "bbox": [float(x), float(y), float(x + w), float(y + h)],
                "is_checked": bool(fill_ratio > 0.25),
            })
    return found
def run_production_ocr(image_path):
    """Hybrid extraction for one page: standard OCR for text, PP-Structure
    for tables, then an OpenCV pass for checkboxes.

    Returns (final_blocks, img) where final_blocks mixes dicts of type
    "text", "table" and "checkbox".
    """
    img = load_image(image_path)
    img_gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)

    # STEP 1: Run Standard OCR to ensure we get the Name (and all other text)
    standard_res = ocr_standard.ocr(img)
    # STEP 2: Run Layout Engine for Tables
    structure_res = layout_engine(img)

    final_blocks = []
    all_text_bboxes = []

    # Standard OCR results first (high reliability for plain text).
    text_lines = standard_res[0] if standard_res else None
    if text_lines:
        for line in text_lines:
            quad = line[0]  # [[x1,y1],[x2,y2],[x3,y3],[x4,y4]]
            xs = [pt[0] for pt in quad]
            ys = [pt[1] for pt in quad]
            box = [min(xs), min(ys), max(xs), max(ys)]
            final_blocks.append({
                "type": "text",
                "text": line[1][0],
                "confidence": float(line[1][1]),
                "bbox": box,
            })
            all_text_bboxes.append(box)

    # Keep only table regions from the structural pass.
    for region in structure_res:
        if region["type"] != "table":
            continue
        box = [float(c) for c in region["bbox"]]
        final_blocks.append({
            "type": "table",
            "bbox": box,
            "html": region["res"].get("html", ""),
        })
        all_text_bboxes.append(box)

    # STEP 3: Detect Checkboxes (masking everything already detected).
    final_blocks.extend(detect_checkboxes_opencv(img_gray, all_text_bboxes))
    return final_blocks, img
def visualize_production(img, blocks):
    """Overlay color-coded boxes: red=table, green=text, blue/gray=checkbox,
    orange=text flagged for manual review."""
    plt.figure(figsize=(12, 16))
    plt.imshow(img)
    ax = plt.gca()
    for block in blocks:
        kind = block["type"]
        if kind == "checkbox":
            color = "blue" if block["is_checked"] else "gray"
        elif kind == "table":
            color = "red"
        elif block.get("manual_review"):
            color = "orange"
        else:
            color = "green"
        box = block["bbox"]
        if len(box) == 4:
            x1, y1, x2, y2 = box
        else:
            # Flattened polygon: (box[0], box[1]) is the top-left corner,
            # (box[4], box[5]) the bottom-right.
            x1, y1, x2, y2 = box[0], box[1], box[4], box[5]
        ax.add_patch(plt.Rectangle((x1, y1), x2 - x1, y2 - y1,
                                   fill=False, edgecolor=color, linewidth=1))
    plt.axis("off")
    plt.title("Production Pipeline (Red=Table, Green=Text, Blue=Checked, Gray=Empty, Orange=Review)")
    plt.show()
if __name__ == "__main__":
    photos_dir = "Photos"
    final_output = {}
    if not os.path.exists(photos_dir):
        print(f"Error: {photos_dir} directory not found.")
    else:
        # Sort and process page-by-page
        image_files = sorted([f for f in os.listdir(photos_dir) if f.endswith(".png")])
        for img_name in image_files:
            print(f"[INFO] Running Structural Analysis on {img_name}...")
            image_path = os.path.join(photos_dir, img_name)
            blocks, img_np = run_production_ocr(image_path)
            final_output[img_name] = blocks
            # Interactive visualization
            visualize_production(img_np, blocks)
        # High-Fidelity JSON Output
        output_file = "ocr_production_data.json"
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(final_output, f, indent=2)
        # FIX: the original literal was mojibake-corrupted ("ā");
        # restored the intended ✅ character.
        print(f"\n✅ Production Pipeline Complete: Saved complex structural data to {output_file}")