# Project file: src/ingestion/extractors.py
"""
ingestion/extractors.py — Text extraction for every supported file format.
Supported:
PDF (.pdf) — pypdf with OCR fallback hint
Markdown (.md/.markdown) — python-frontmatter for YAML front-matter stripping
Plain text (.txt)
HTML (.html/.htm) — BeautifulSoup (optional, graceful fallback)
DOCX (.docx) — python-docx (optional, graceful fallback)
EPUB (.epub) — ebooklib (optional, graceful fallback)
CSV (.csv) — joins rows into readable text
JSON (.json) — pretty-prints for indexing
"""
from __future__ import annotations
import csv
import io
import json
from pathlib import Path
from src.utils.logging import get_logger
log = get_logger("ingestion.extractors")
# Suffixes that extract_text() knows how to handle (it also falls back to a
# plain-text read for anything else).
# NOTE(review): this constant is not referenced inside this module —
# presumably the ingestion scanner uses it to filter files; confirm.
SUPPORTED_EXTENSIONS: frozenset[str] = frozenset({
    ".pdf",
    ".md", ".markdown",
    ".txt",
    ".html", ".htm",
    ".docx",
    ".epub",
    ".csv",
    ".json",
})
# ── Optional imports ──────────────────────────────────────────────────────────
# Each optional dependency is probed exactly once at import time. The _X flag
# gates the matching extractor below, so a missing package degrades to a
# logged skip at call time instead of an ImportError at import time.
try:
    from pypdf import PdfReader
    _PYPDF = True
except ImportError:
    _PYPDF = False
    log.warning("pypdf not installed — PDF extraction disabled.")
try:
    import frontmatter as _fm  # YAML front-matter parsing for Markdown
    _FRONTMATTER = True
except ImportError:
    _FRONTMATTER = False
try:
    from bs4 import BeautifulSoup  # HTML/EPUB tag stripping
    _BS4 = True
except ImportError:
    _BS4 = False
try:
    import docx as _docx  # python-docx
    _DOCX = True
except ImportError:
    _DOCX = False
try:
    import ebooklib
    from ebooklib import epub as _epub
    _EPUB = True
except ImportError:
    _EPUB = False
# ─────────────────────────────────────────────────────────────────────────────
# Dispatcher
# ─────────────────────────────────────────────────────────────────────────────
def extract_text(file: Path) -> str:
    """
    Route *file* to the extractor matching its (lower-cased) suffix.

    Never raises: any extraction failure is logged and reported as "".
    Unrecognized suffixes (including .txt) are read as plain UTF-8 text.
    """
    suffix = file.suffix.lower()
    try:
        if suffix == ".pdf":
            return _pdf(file)
        elif suffix in (".md", ".markdown"):
            return _markdown(file)
        elif suffix in (".html", ".htm"):
            return _html(file)
        elif suffix == ".docx":
            return _docx_text(file)
        elif suffix == ".epub":
            return _epub_text(file)
        elif suffix == ".csv":
            return _csv(file)
        elif suffix == ".json":
            return _json(file)
        # .txt and any unknown suffix: best-effort plain-text read.
        return file.read_text(encoding="utf-8", errors="ignore")
    except Exception as exc:
        log.error("Failed to extract %s: %s", file.name, exc)
        return ""
# ─────────────────────────────────────────────────────────────────────────────
# Per-format extractors
# ─────────────────────────────────────────────────────────────────────────────
def _pdf(file: Path) -> str:
    """Pull the text layer out of a PDF with pypdf; "" when pypdf is absent."""
    if not _PYPDF:
        log.warning("Skipping %s — pypdf not installed.", file.name)
        return ""
    reader = PdfReader(str(file))
    # Keep only pages that actually yielded text; blank line between pages.
    chunks = [txt for txt in (page.extract_text() for page in reader.pages) if txt]
    return "\n\n".join(chunks)
def _markdown(file: Path) -> str:
    """Return the Markdown body with any leading YAML front matter removed."""
    text = file.read_text(encoding="utf-8", errors="ignore")
    if _FRONTMATTER:
        try:
            return _fm.loads(text).content
        except Exception:
            pass  # fall through to the manual strip below
    # Manual fallback: drop a leading "---" ... "---" front-matter fence.
    split = text.splitlines()
    if len(split) >= 3 and split[0].strip() == "---":
        for pos in range(1, len(split)):
            if split[pos].strip() == "---":
                return "\n".join(split[pos + 1:])
    return text
def _html(file: Path) -> str:
    """Extract visible text from an HTML document."""
    markup = file.read_text(encoding="utf-8", errors="ignore")
    if not _BS4:
        # Crude fallback when BeautifulSoup is missing: blank out every tag.
        import re
        return re.sub(r"<[^>]+>", " ", markup)
    soup = BeautifulSoup(markup, "html.parser")
    # Drop non-content elements before flattening to text.
    for junk in soup(["script", "style", "nav", "footer", "header"]):
        junk.decompose()
    return soup.get_text(separator="\n", strip=True)
def _docx_text(file: Path) -> str:
    """Concatenate the non-blank paragraphs of a .docx file."""
    if not _DOCX:
        log.warning("Skipping %s — python-docx not installed.", file.name)
        return ""
    document = _docx.Document(str(file))
    paragraphs = [para.text for para in document.paragraphs if para.text.strip()]
    return "\n\n".join(paragraphs)
def _epub_text(file: Path) -> str:
    """Extract text from every XHTML document item inside an EPUB."""
    if not _EPUB:
        log.warning("Skipping %s — ebooklib not installed.", file.name)
        return ""
    book = _epub.read_epub(str(file))
    texts = []
    for item in book.get_items_of_type(ebooklib.ITEM_DOCUMENT):
        markup = item.get_content().decode("utf-8", errors="ignore")
        if _BS4:
            stripped = BeautifulSoup(markup, "html.parser").get_text(separator="\n", strip=True)
        else:
            import re
            stripped = re.sub(r"<[^>]+>", " ", markup)
        texts.append(stripped)
    return "\n\n".join(texts)
def _csv(file: Path) -> str:
raw = file.read_text(encoding="utf-8", errors="ignore")
reader = csv.DictReader(io.StringIO(raw))
rows = []
for row in reader:
rows.append(" | ".join(f"{k}: {v}" for k, v in row.items() if v))
return "\n".join(rows)
def _json(file: Path) -> str:
raw = file.read_text(encoding="utf-8", errors="ignore")
try:
data = json.loads(raw)
return json.dumps(data, indent=2, ensure_ascii=False)
except json.JSONDecodeError:
return raw