Apps: - dwg-rooms: extract room numbers from DWG/DXF - dwg-counting: count symbols in PDF drawings (OpenCV template matching) - contract-check: review PDF contracts against a checklist (Claude vision + Tesseract OCR fallback) - email-drafter: bullet notes → polished Czech/English business emails - invoice-extractor: PDF/image invoice → structured data → Excel - translator: Czech-first translator across 19 languages with tone control - vv-check: find inconsistent unit prices across VV sheets in one workbook - vv-compare: diff original vs new VV files (changes / added / removed) - feature-request: portal users submit ideas + sample files Infrastructure: - LiteLLM gateway with per-app virtual keys + budgets - Langfuse observability - Geist font, shared theme, cross-subdomain back link + theme sync via cookie/URL - Caddy reverse proxy on *.klas.chat
352 lines
13 KiB
Python
352 lines
13 KiB
Python
"""Render DWG/DXF/PDF → PNG image(s) for vision model consumption.
|
|
|
|
Strategy: multi-floor architectural drawings are split per detected legend.
|
|
Each floor renders to its own PNG at a resolution where individual symbols
|
|
remain distinguishable for the vision model.
|
|
"""
|
|
import logging
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
import io
|
|
|
|
import ezdxf
|
|
from ezdxf.addons.drawing.config import (
|
|
BackgroundPolicy, ColorPolicy, Configuration,
|
|
)
|
|
from ezdxf.addons.drawing import Frontend, RenderContext, layout
|
|
from ezdxf.addons.drawing.svg import SVGBackend
|
|
import cairosvg
|
|
from PIL import Image
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Target pixel size of the longer edge of every rendered PNG; images that end
# up larger are scaled down to this. Chosen as the Claude vision maximum.
RENDER_PX = 8000
|
|
|
|
|
|
def dwg_to_dxf(dwg_path: Path, out_dir: Path) -> Path:
    """Convert a DWG file to DXF with the external ``dwgread`` tool.

    The DXF is written to ``out_dir`` under the DWG's stem. Success is judged
    by the output file existing afterwards, not by the exit status. A DXF left
    at the target path by an earlier run is removed first, so a failed
    conversion cannot silently return stale output.

    Args:
        dwg_path: Source DWG file.
        out_dir: Directory that receives ``<stem>.dxf``.

    Returns:
        Path of the DXF file written by this call.

    Raises:
        RuntimeError: If no DXF file exists once ``dwgread`` has finished.
        subprocess.TimeoutExpired: If the conversion exceeds 180 seconds.
    """
    dxf_path = out_dir / f"{dwg_path.stem}.dxf"

    # Drop stale output from a previous run, but never delete the input itself
    # (possible if a ".dxf" path is passed here by mistake).
    if dxf_path.resolve() != dwg_path.resolve():
        dxf_path.unlink(missing_ok=True)

    proc = subprocess.run(
        ["dwgread", "-O", "DXF", "-o", str(dxf_path), str(dwg_path)],
        capture_output=True, text=True, timeout=180,
    )
    if not dxf_path.exists():
        raise RuntimeError(
            f"DWG→DXF failed (exit {proc.returncode}): {proc.stderr or proc.stdout}"
        )
    return dxf_path
|
|
|
|
|
|
# Substrings matched against lower-cased entity layer names. _clean_doc deletes
# every entity on a matching layer, but only when called with drop_layers=True.
SKIP_LAYER_PATTERNS = (
    "dimens", "kota", "kóta", "koty",  # dimensions
    "sit_konst", "konstrukce", "ckkoty",  # structural / dimensioning
    "ckprofi", "profily", "csprofily",  # steel/concrete profile sections
    "sanita", "vzt", "ov_kan", "tzb",  # plumbing / HVAC (when not the target)
    "viewport", "defpoints", "0_", "_b_",  # CAD bookkeeping
    "raster", "wipeout",
)
|
|
|
|
|
|
def _clean_doc(doc, drop_layers=False):
    """Remove modelspace content that clutters the rendered image (in place).

    Deletes INSERTs whose block definition is missing from the document and
    every HATCH / SOLID / MPOLYGON entity. When ``drop_layers`` is true, also
    deletes every entity whose lower-cased layer name contains one of
    ``SKIP_LAYER_PATTERNS``. Finally assigns lineweight 5 to whatever is left.
    """
    space = doc.modelspace()

    # Block references pointing at a definition the document does not contain.
    known_blocks = {block.name for block in doc.blocks}
    for ref in tuple(space.query("INSERT")):
        if ref.dxf.name in known_blocks:
            continue
        space.delete_entity(ref)

    # Filled areas are removed wholesale.
    for kind in ("HATCH", "SOLID", "MPOLYGON"):
        for filled in tuple(space.query(kind)):
            space.delete_entity(filled)

    if drop_layers:
        for ent in tuple(space):
            layer_name = str(getattr(ent.dxf, "layer", "")).lower()
            if not any(pattern in layer_name for pattern in SKIP_LAYER_PATTERNS):
                continue
            try:
                space.delete_entity(ent)
            except Exception:
                # Best effort: an entity that cannot be removed is left alone.
                pass

    # Uniform lineweight for everything that survived the cleanup.
    for ent in space:
        try:
            ent.dxf.lineweight = 5
        except Exception:
            # Entities that reject the attribute are skipped.
            pass
|
|
|
|
|
|
def find_floors(dxf_path: Path) -> list[dict]:
    """Locate 'LEGENDA' headings in modelspace; each one marks a floor.

    TEXT and MTEXT entities whose stripped, upper-cased content equals
    ``LEGENDA`` are collected. MTEXT is compared by its plain text, so inline
    formatting codes in a styled heading do not prevent a match.

    Args:
        dxf_path: DXF file to scan.

    Returns:
        A list of ``{"legend_xy": (x, y), "floor_bbox": (xmin, ymin, xmax,
        ymax)}`` dicts ordered top to bottom (descending legend y). If no
        legend is found, a single entry with both values set to ``None``.
    """
    doc = ezdxf.readfile(str(dxf_path))
    msp = doc.modelspace()

    positions = []
    for e in msp:
        kind = e.dxftype()
        if kind == "MTEXT":
            # Raw MTEXT content embeds formatting codes (font, colour, ...);
            # plain_text() strips them so a styled heading still matches.
            text = e.plain_text()
        elif kind == "TEXT":
            text = e.dxf.text
        else:
            continue
        if not text or text.strip().upper() != "LEGENDA":
            continue
        try:
            positions.append((e.dxf.insert.x, e.dxf.insert.y))
        except Exception:
            # Best effort: a legend without a usable insert point is skipped.
            logger.debug("find_floors: LEGENDA entity without insert point skipped")
    # Highest legend first.
    positions.sort(key=lambda p: -p[1])

    if not positions:
        return [{"legend_xy": None, "floor_bbox": None}]

    # Use the same cleanup as render_region so extents reflect what'll be drawn
    _clean_doc(doc)
    from ezdxf.bbox import extents
    try:
        ext = extents(msp, fast=True)
        mxmin, mymin = ext.extmin.x, ext.extmin.y
        mxmax, mymax = ext.extmax.x, ext.extmax.y
    except Exception:
        # Extents unavailable: fall back to an effectively unbounded box so the
        # clamping below never restricts the per-floor estimate.
        mxmin, mymin = -1e9, -1e9
        mxmax, mymax = 1e9, 1e9
    logger.info("find_floors: model extents x=(%.0f,%.0f) y=(%.0f,%.0f)",
                mxmin, mxmax, mymin, mymax)

    floors = []
    for i, (lx, ly) in enumerate(positions):
        # Floor height estimate: distance to the next legend below; the lowest
        # legend reuses the distance to the one above it.
        if i + 1 < len(positions):
            y_height = ly - positions[i + 1][1]
        elif i > 0:
            y_height = positions[i - 1][1] - ly
        else:
            # Single legend: fixed default height (presumably drawing units,
            # e.g. mm; TODO confirm).
            y_height = 60000
        # Legend appears at the TOP of its floor view; plan extends downward
        y_top = min(mymax, ly + 0.10 * y_height)
        y_bot = max(mymin, ly - 0.95 * y_height)
        # X span = whole model width (floor plans typically span the page width)
        floors.append({
            "legend_xy": (lx, ly),
            "floor_bbox": (mxmin, y_bot, mxmax, y_top),
        })
    return floors
|
|
|
|
|
|
def render_region(dxf_path: Path, out_path: Path, bbox: tuple | None) -> Path:
    """Rasterize a DXF to PNG, optionally cropped to a model-space rectangle.

    Pipeline: ezdxf draws the cleaned modelspace to SVG, cairosvg rasterizes
    the SVG, and the crop happens afterwards in pixel space by mapping
    ``bbox`` linearly from the model extents onto the image.

    Args:
        dxf_path: DXF file to render.
        out_path: Destination PNG path.
        bbox: ``(xmin, ymin, xmax, ymax)`` in model coordinates, or ``None``
            to keep the whole drawing.

    Returns:
        ``out_path``.
    """
    doc = ezdxf.readfile(str(dxf_path))
    audit = doc.audit()
    if audit.has_errors:
        logger.info("DXF audit: %d errors", len(audit.errors))
    _clean_doc(doc)
    msp = doc.modelspace()

    # Model extents drive both the page aspect ratio and the crop mapping.
    from ezdxf.bbox import extents
    try:
        box = extents(msp, fast=True)
        origin_x = box.extmin.x
        origin_y = box.extmin.y
        span_x = box.size.x or 1
        span_y = box.size.y or 1
    except Exception:
        origin_x = origin_y = 0
        span_x = span_y = 1

    render_cfg = Configuration(
        background_policy=BackgroundPolicy.WHITE,
        color_policy=ColorPolicy.BLACK,
        lineweight_scaling=0.5,
        min_lineweight=0.05,
    )

    # Page is 1000 mm on its longer side, with the model's aspect ratio.
    aspect = span_x / max(span_y, 1)
    landscape = aspect >= 1
    if landscape:
        page_size_mm = (1000, 1000 / aspect)
    else:
        page_size_mm = (1000 * aspect, 1000)
    page = layout.Page(
        width=page_size_mm[0],
        height=page_size_mm[1],
        units=layout.Units.mm,
        margins=layout.Margins.all(0),
    )

    svg_backend = SVGBackend()
    frontend = Frontend(RenderContext(doc), svg_backend, config=render_cfg)
    frontend.draw_layout(msp, finalize=True)
    svg_text = svg_backend.get_string(page)

    # Rasterize so that the longer edge comes out at RENDER_PX pixels.
    target_width = RENDER_PX if landscape else int(RENDER_PX * aspect)
    raster = cairosvg.svg2png(
        bytestring=svg_text.encode("utf-8"),
        output_width=target_width,
    )
    img = Image.open(io.BytesIO(raster))

    # Flatten any alpha channel onto a white background.
    if img.mode == "RGBA":
        alpha = img.split()[3]
        flattened = Image.new("RGB", img.size, (255, 255, 255))
        flattened.paste(img, mask=alpha)
        img = flattened

    if bbox is not None:
        width_px, height_px = img.size
        xmin, ymin, xmax, ymax = bbox
        logger.info("Crop input: model_w=%.0f model_h=%.0f W=%d H=%d bbox=%s",
                    span_x, span_y, width_px, height_px, bbox)
        # Image rows grow downward while model y grows upward, so vertical
        # positions are measured from the top of the model extents.
        model_top = origin_y + span_y
        left = max(0, int((xmin - origin_x) / span_x * width_px))
        right = min(width_px, int((xmax - origin_x) / span_x * width_px))
        upper = max(0, int((model_top - ymax) / span_y * height_px))
        lower = min(height_px, int((model_top - ymin) / span_y * height_px))
        logger.info("Crop pixels: (%d,%d) → (%d,%d)", left, upper, right, lower)
        # A degenerate rectangle leaves the image uncropped.
        if right > left and lower > upper:
            img = img.crop((left, upper, right, lower))

    longest = max(img.size)
    if longest > RENDER_PX:
        shrink = RENDER_PX / longest
        img = img.resize(
            (int(img.size[0] * shrink), int(img.size[1] * shrink)),
            Image.LANCZOS,
        )

    img.save(out_path, "PNG", optimize=True)
    logger.info("Rendered → %s (%dx%d)", out_path.name, img.size[0], img.size[1])
    return out_path
|
|
|
|
|
|
def render(input_path: Path, out_dir: Path) -> dict:
    """Render an input drawing to floor image(s) inside ``out_dir``.

    PDFs are handed to ``_render_pdf``. DWG files are converted to DXF first;
    DXF files are used as they are. For DWG/DXF only the first detected floor
    is rendered (MVP), to ``floor_0.png``.

    Returns: {"floors": [{"index":0, "png":"floor_0.png", "legend_xy":[x,y]}, ...]}.

    Raises:
        ValueError: If the file suffix is not .pdf, .dwg or .dxf.
    """
    ext = input_path.suffix.lower()

    if ext == ".pdf":
        return _render_pdf(input_path, out_dir)
    if ext not in (".dwg", ".dxf"):
        raise ValueError(f"Unsupported format: {ext}")

    dxf_path = dwg_to_dxf(input_path, out_dir) if ext == ".dwg" else input_path

    floors = find_floors(dxf_path)
    logger.info("Detected %d floor(s) via LEGENDA markers", len(floors))

    # MVP: render only the first floor. Multi-floor selection is a follow-up.
    first = floors[0]
    target = out_dir / "floor_0.png"
    render_region(dxf_path, target, first["floor_bbox"])

    legend = first["legend_xy"]
    entry = {
        "index": 0,
        "png": target.name,
        "legend_xy": list(legend) if legend else None,
    }
    return {"floors": [entry]}
|
|
|
|
|
|
def _render_pdf(pdf_path: Path, out_dir: Path) -> dict:
    """Rasterize each PDF page to ``floor_<n>.png`` and locate its legend.

    pdfplumber looks for a legend heading word on every page. If that word is
    not upright, the page image is rotated 90° counter-clockwise. The heading's
    bounding box is reported in normalized (0..1) coordinates of the final,
    possibly rotated, image.

    Returns:
        ``{"floors": [{"index", "png", "legend_xy": None,
        "legend_norm_bbox"}, ...]}`` with one entry per saved page;
        ``legend_norm_bbox`` is ``None`` when no heading was found.
    """
    from pdf2image import convert_from_path
    import pdfplumber

    # Allow large rasterizations (architectural PDFs can be 200M+ px at high DPI)
    Image.MAX_IMAGE_PIXELS = None

    legend_titles = ("LEGENDA", "VYSVĚTLIVKY", "LEGENDA:", "POPIS")
    source = str(pdf_path)

    # Choose a DPI (150..600) that puts the first page's longer edge near
    # RENDER_PX pixels.
    with pdfplumber.open(source) as pdf:
        first = pdf.pages[0]
        pw_in = max(first.width, first.height) / 72  # PDF points → inches
    target_dpi = max(150, min(600, int(RENDER_PX / max(pw_in, 1))))
    logger.info("PDF page longest edge %.1f in → using dpi=%d", pw_in, target_dpi)

    rasters = convert_from_path(source, dpi=target_dpi)
    saved = []
    legend_boxes = {}  # page index → normalized legend bbox

    with pdfplumber.open(source) as pdf:
        for page_no, plumb_page in enumerate(pdf.pages):
            # Pages without a rasterized counterpart are skipped.
            if page_no >= len(rasters):
                continue
            page_img = rasters[page_no]
            pw, ph = plumb_page.width, plumb_page.height

            # First word that matches one of the legend headings, if any.
            words = plumb_page.extract_words(extra_attrs=["upright"]) or []
            legend_word = next(
                (w for w in words if w["text"].strip().upper() in legend_titles),
                None,
            )

            sideways = (
                legend_word is not None
                and not legend_word.get("upright", True)
            )
            if sideways:
                # PIL.rotate uses CCW for positive angles.
                page_img = page_img.rotate(90, expand=True)
                logger.info("PDF page %d: rotated 90° CCW (LEGENDA was sideways)", page_no)

            if legend_word is not None:
                x0, y0 = legend_word["x0"], legend_word["top"]
                x1, y1 = legend_word["x1"], legend_word["bottom"]
                if sideways:
                    # CCW 90° rotation: original (x, y) → new (y, W-x).
                    # Rotated image has width=ph, height=pw.
                    norm_bbox = (y0 / ph, 1 - (x1 / pw), y1 / ph, 1 - (x0 / pw))
                else:
                    norm_bbox = (x0 / pw, y0 / ph, x1 / pw, y1 / ph)
                legend_boxes[page_no] = norm_bbox
                logger.info("PDF page %d: LEGENDA at norm bbox %s",
                            page_no, norm_bbox)

            longest = max(page_img.size)
            if longest > RENDER_PX:
                shrink = RENDER_PX / longest
                page_img = page_img.resize(
                    (int(page_img.size[0] * shrink), int(page_img.size[1] * shrink)),
                    Image.LANCZOS,
                )
            target = out_dir / f"floor_{page_no}.png"
            page_img.save(target, "PNG", optimize=True)
            saved.append(target)

    return {
        "floors": [
            {
                "index": idx,
                "png": path.name,
                "legend_xy": None,
                "legend_norm_bbox": legend_boxes.get(idx),
            }
            for idx, path in enumerate(saved)
        ]
    }
|
|
|
|
|
|
def crop_region(image_path: Path, bbox: dict, out_path: Path,
                pad: float = 1.5, min_px: int = 120) -> Path:
    """Crop a generously padded region around ``bbox`` and save it as PNG.

    Args:
        image_path: Source image.
        bbox: Mapping with keys ``x``, ``y``, ``w``, ``h``. The values are
            multiplied by the image width/height, i.e. they are fractions of
            the image size.
        out_path: Destination PNG path.
        pad: Padding added on each side, as a multiple of the bbox's full
            width/height (the default 1.5 gives a crop 4x the bbox per axis).
        min_px: Minimum width/height of the requested window in pixels. A
            smaller window is widened around the bbox centre. Clamping to the
            image borders afterwards can still make the result smaller.

    Returns:
        ``out_path``.
    """
    # Context manager so the source file handle is closed deterministically.
    with Image.open(image_path) as img:
        W, H = img.size
        bx, by, bw, bh = bbox["x"], bbox["y"], bbox["w"], bbox["h"]
        cx, cy = bx + bw / 2, by + bh / 2

        # Half-size of the padded window, still in normalized coordinates.
        half_w = bw / 2 + bw * pad
        half_h = bh / 2 + bh * pad

        # Enforce minimum pixel dimensions
        if (2 * half_w) * W < min_px:
            half_w = (min_px / 2) / W
        if (2 * half_h) * H < min_px:
            half_h = (min_px / 2) / H

        # Convert to pixels and clamp to the image.
        x0 = max(0, int((cx - half_w) * W))
        x1 = min(W, int((cx + half_w) * W))
        y0 = max(0, int((cy - half_h) * H))
        y1 = min(H, int((cy + half_h) * H))

        img.crop((x0, y0, x1, y1)).save(out_path, "PNG")
    return out_path
|