354 lines
14 KiB
Python
354 lines
14 KiB
Python
import os
|
|
import re
|
|
import copy
|
|
import tempfile
|
|
import subprocess
|
|
from fastapi import FastAPI, UploadFile, File, HTTPException
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import StreamingResponse
|
|
import json
|
|
from fastapi.staticfiles import StaticFiles
|
|
|
|
from . import geviset
|
|
from . import excel
|
|
from pathlib import Path
|
|
|
|
app = FastAPI(title="GeViSet Web")
|
|
|
|
_LAST_TREE = None
|
|
_ROOT_DIR = Path(__file__).resolve().parents[2]
|
|
_SCRIPT_PATH = _ROOT_DIR / "scripts" / "build_mapping.js"
|
|
_TEMPLATE_RULES = _ROOT_DIR / "frontend" / "src" / "templates" / "templateRules.json"
|
|
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"],
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
|
|
@app.get("/api/health")
|
|
async def health():
|
|
return {"ok": True}
|
|
|
|
|
|
@app.post("/api/set/parse")
|
|
async def parse_set(file: UploadFile = File(...)):
|
|
if not file.filename.lower().endswith(".set"):
|
|
raise HTTPException(status_code=400, detail="Only .set files are supported")
|
|
contents = await file.read()
|
|
try:
|
|
tree = geviset.load_set_bytes(contents)
|
|
mapping = geviset.extract_mapping(tree)
|
|
gcore = geviset.extract_gcore(tree)
|
|
gsc = geviset.extract_gsc(tree)
|
|
except Exception as exc:
|
|
raise HTTPException(status_code=400, detail=str(exc))
|
|
global _LAST_TREE
|
|
_LAST_TREE = tree
|
|
return {
|
|
"filename": file.filename,
|
|
"token": "last",
|
|
"tree": tree,
|
|
"mapping": mapping,
|
|
"gcore": gcore,
|
|
"gsc": gsc,
|
|
}
|
|
|
|
|
|
@app.post("/api/set/export")
|
|
async def export_set(payload: dict):
|
|
try:
|
|
tree = payload.get("tree")
|
|
token = payload.get("token")
|
|
if tree is None:
|
|
if token == "last" and _LAST_TREE is not None:
|
|
tree = copy.deepcopy(_LAST_TREE)
|
|
else:
|
|
raise ValueError("Missing tree in payload")
|
|
if "mapping" in payload:
|
|
geviset.apply_mapping(tree, payload["mapping"])
|
|
mapping_for_ids = geviset.extract_mapping(tree)
|
|
camera_ids = set()
|
|
ptz_by_id = {}
|
|
for rule in payload["mapping"].get("rules", []):
|
|
inp = rule.get("input", {})
|
|
vid = inp.get("videoInputId")
|
|
if isinstance(vid, int) and vid > 0 and isinstance(inp.get("ptz"), bool):
|
|
ptz_by_id[vid] = inp["ptz"]
|
|
else:
|
|
name = inp.get("name", "")
|
|
match = re.search(r"(\\d+)$", str(name))
|
|
if match and isinstance(inp.get("ptz"), bool):
|
|
ptz_by_id[int(match.group(1))] = inp["ptz"]
|
|
|
|
for rule in mapping_for_ids.get("rules", []):
|
|
inp = rule.get("input", {})
|
|
vid = inp.get("videoInputId")
|
|
if isinstance(vid, int) and vid > 0:
|
|
camera_ids.add(vid)
|
|
else:
|
|
name = inp.get("name", "")
|
|
match = re.search(r"(\\d+)$", str(name))
|
|
if match:
|
|
camera_ids.add(int(match.group(1)))
|
|
for field in rule.get("fields", []):
|
|
if field.get("name") == "VideoInput":
|
|
val = field.get("value")
|
|
if isinstance(val, int) and val > 0:
|
|
camera_ids.add(val)
|
|
elif isinstance(val, str) and val.isdigit():
|
|
camera_ids.add(int(val))
|
|
for out in rule.get("outputs", []):
|
|
action = out.get("action", "")
|
|
match = re.search(r"Camera:\\s*(\\d+)", action)
|
|
if match:
|
|
camera_ids.add(int(match.group(1)))
|
|
geviset.ensure_video_inputs(tree, camera_ids, ptz_by_id)
|
|
geviset.ensure_global_video_inputs(tree, camera_ids, ptz_by_id)
|
|
geviset.ensure_vx3_video_inputs(tree, camera_ids, ptz_by_id)
|
|
geviset.prune_video_inputs(tree, camera_ids)
|
|
print(
|
|
f"EXPORT camera_ids={len(camera_ids)} contains_101027={101027 in camera_ids}",
|
|
flush=True,
|
|
)
|
|
if "gcore" in payload:
|
|
geviset.apply_gcore(tree, payload["gcore"])
|
|
if "gsc" in payload:
|
|
geviset.apply_gsc(tree, payload["gsc"])
|
|
if "keyboards" in payload:
|
|
geviset.apply_keyboards(tree, payload["keyboards"])
|
|
if "videoOutputs" in payload:
|
|
geviset.apply_video_outputs(tree, payload["videoOutputs"])
|
|
if "logging" in payload:
|
|
geviset.apply_logging(tree, payload["logging"])
|
|
data = geviset.save_set_bytes(tree)
|
|
except Exception as exc:
|
|
raise HTTPException(status_code=400, detail=str(exc))
|
|
|
|
return StreamingResponse(
|
|
iter([data]),
|
|
media_type="application/octet-stream",
|
|
headers={"Content-Disposition": "attachment; filename=updated.set"},
|
|
)
|
|
|
|
|
|
@app.post("/api/excel/servers")
|
|
async def import_servers(file: UploadFile = File(...)):
|
|
if not file.filename.lower().endswith(".xlsx"):
|
|
raise HTTPException(status_code=400, detail="Only .xlsx files are supported")
|
|
contents = await file.read()
|
|
try:
|
|
parsed = excel.parse_servers_xlsx(contents)
|
|
except Exception as exc:
|
|
raise HTTPException(status_code=400, detail=str(exc))
|
|
return parsed
|
|
|
|
|
|
@app.post("/api/excel/action-mappings")
|
|
async def import_action_mappings(file: UploadFile = File(...)):
|
|
if not file.filename.lower().endswith(".xlsx"):
|
|
raise HTTPException(status_code=400, detail="Only .xlsx files are supported")
|
|
contents = await file.read()
|
|
try:
|
|
parsed = excel.parse_actions_xlsx(contents)
|
|
except Exception as exc:
|
|
raise HTTPException(status_code=400, detail=str(exc))
|
|
return parsed
|
|
|
|
|
|
@app.post("/api/excel/keyboards")
|
|
async def import_keyboards(file: UploadFile = File(...)):
|
|
if not file.filename.lower().endswith(".xlsx"):
|
|
raise HTTPException(status_code=400, detail="Only .xlsx files are supported")
|
|
contents = await file.read()
|
|
try:
|
|
parsed = excel.parse_keyboards_xlsx(contents)
|
|
except Exception as exc:
|
|
raise HTTPException(status_code=400, detail=str(exc))
|
|
return parsed
|
|
|
|
|
|
@app.post("/api/excel/video-outputs")
|
|
async def export_video_outputs(payload: dict):
|
|
try:
|
|
tree = payload.get("tree")
|
|
if tree is None:
|
|
raise ValueError("Missing tree in payload")
|
|
outputs = geviset.extract_video_outputs(tree)
|
|
data = excel.build_video_outputs_xlsx(outputs)
|
|
except Exception as exc:
|
|
raise HTTPException(status_code=400, detail=str(exc))
|
|
return StreamingResponse(
|
|
iter([data]),
|
|
media_type="application/octet-stream",
|
|
headers={"Content-Disposition": "attachment; filename=video_outputs.xlsx"},
|
|
)
|
|
|
|
|
|
@app.post("/api/excel/video-outputs/import")
|
|
async def import_video_outputs(file: UploadFile = File(...)):
|
|
if not file.filename.lower().endswith(".xlsx"):
|
|
raise HTTPException(status_code=400, detail="Only .xlsx files are supported")
|
|
contents = await file.read()
|
|
try:
|
|
parsed = excel.parse_video_outputs_xlsx(contents)
|
|
except Exception as exc:
|
|
raise HTTPException(status_code=400, detail=str(exc))
|
|
return parsed
|
|
|
|
|
|
@app.post("/api/excel/logging")
|
|
async def export_logging(payload: dict):
|
|
try:
|
|
tree = payload.get("tree")
|
|
if tree is None:
|
|
raise ValueError("Missing tree in payload")
|
|
rows = geviset.extract_logging(tree)
|
|
data = excel.build_logging_xlsx(rows)
|
|
except Exception as exc:
|
|
raise HTTPException(status_code=400, detail=str(exc))
|
|
return StreamingResponse(
|
|
iter([data]),
|
|
media_type="application/octet-stream",
|
|
headers={"Content-Disposition": "attachment; filename=logging.xlsx"},
|
|
)
|
|
|
|
|
|
@app.post("/api/excel/logging/import")
|
|
async def import_logging(file: UploadFile = File(...)):
|
|
if not file.filename.lower().endswith(".xlsx"):
|
|
raise HTTPException(status_code=400, detail="Only .xlsx files are supported")
|
|
contents = await file.read()
|
|
try:
|
|
parsed = excel.parse_logging_xlsx(contents)
|
|
except Exception as exc:
|
|
raise HTTPException(status_code=400, detail=str(exc))
|
|
return parsed
|
|
|
|
|
|
@app.post("/api/batch/build")
|
|
async def build_from_excel(
|
|
base_set: UploadFile = File(...),
|
|
servers: UploadFile = File(None),
|
|
actions: UploadFile = File(None),
|
|
outputs: UploadFile = File(None),
|
|
keyboards: UploadFile = File(None),
|
|
logging: UploadFile = File(None),
|
|
):
|
|
if not base_set.filename.lower().endswith(".set"):
|
|
raise HTTPException(status_code=400, detail="Base file must be .set")
|
|
try:
|
|
tree = geviset.load_set_bytes(await base_set.read())
|
|
|
|
if actions and actions.filename:
|
|
if not actions.filename.lower().endswith(".xlsx"):
|
|
raise HTTPException(status_code=400, detail="Actions must be .xlsx")
|
|
actions_parsed = excel.parse_actions_xlsx(await actions.read())
|
|
mapping = geviset.extract_mapping(tree)
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
map_in = Path(tmpdir) / "mapping_in.json"
|
|
rows_in = Path(tmpdir) / "actions_rows.json"
|
|
map_out = Path(tmpdir) / "mapping_out.json"
|
|
map_in.write_text(json.dumps(mapping), encoding="utf-8")
|
|
rows_in.write_text(json.dumps(actions_parsed["rows"]), encoding="utf-8")
|
|
subprocess.check_call(
|
|
["node", str(_SCRIPT_PATH), str(map_in), str(rows_in), str(_TEMPLATE_RULES), str(map_out)]
|
|
)
|
|
mapping_updated = json.loads(map_out.read_text(encoding="utf-8"))
|
|
geviset.apply_mapping(tree, mapping_updated)
|
|
|
|
mapping_for_ids = geviset.extract_mapping(tree)
|
|
camera_ids = set()
|
|
ptz_by_id = {}
|
|
for rule in mapping_updated.get("rules", []):
|
|
inp = rule.get("input", {})
|
|
vid = inp.get("videoInputId")
|
|
if isinstance(vid, int) and vid > 0 and isinstance(inp.get("ptz"), bool):
|
|
ptz_by_id[vid] = inp["ptz"]
|
|
else:
|
|
name = inp.get("name", "")
|
|
match = re.search(r"(\\d+)$", str(name))
|
|
if match and isinstance(inp.get("ptz"), bool):
|
|
ptz_by_id[int(match.group(1))] = inp["ptz"]
|
|
|
|
for rule in mapping_for_ids.get("rules", []):
|
|
inp = rule.get("input", {})
|
|
vid = inp.get("videoInputId")
|
|
if isinstance(vid, int) and vid > 0:
|
|
camera_ids.add(vid)
|
|
else:
|
|
name = inp.get("name", "")
|
|
match = re.search(r"(\\d+)$", str(name))
|
|
if match:
|
|
camera_ids.add(int(match.group(1)))
|
|
for field in rule.get("fields", []):
|
|
if field.get("name") == "VideoInput":
|
|
val = field.get("value")
|
|
if isinstance(val, int) and val > 0:
|
|
camera_ids.add(val)
|
|
elif isinstance(val, str) and val.isdigit():
|
|
camera_ids.add(int(val))
|
|
for out in rule.get("outputs", []):
|
|
action = out.get("action", "")
|
|
match = re.search(r"Camera:\\s*(\\d+)", action)
|
|
if match:
|
|
camera_ids.add(int(match.group(1)))
|
|
|
|
geviset.ensure_video_inputs(tree, camera_ids, ptz_by_id)
|
|
geviset.ensure_global_video_inputs(tree, camera_ids, ptz_by_id)
|
|
geviset.ensure_vx3_video_inputs(tree, camera_ids, ptz_by_id)
|
|
geviset.prune_video_inputs(tree, camera_ids)
|
|
|
|
if servers and servers.filename:
|
|
if not servers.filename.lower().endswith(".xlsx"):
|
|
raise HTTPException(status_code=400, detail="Servers must be .xlsx")
|
|
servers_parsed = excel.parse_servers_xlsx(await servers.read())
|
|
gcore_list = [s for s in servers_parsed["servers"] if s.get("type") == "gcore"]
|
|
gsc_list = [s for s in servers_parsed["servers"] if s.get("type") == "geviscope"]
|
|
for idx, s in enumerate(gcore_list, start=1):
|
|
s["id"] = str(idx)
|
|
for idx, s in enumerate(gsc_list, start=1):
|
|
s["id"] = str(idx)
|
|
bundle_gcore = geviset.extract_gcore(tree)
|
|
bundle_gsc = geviset.extract_gsc(tree)
|
|
bundle_gcore["servers"] = gcore_list
|
|
bundle_gsc["servers"] = gsc_list
|
|
geviset.apply_gcore(tree, bundle_gcore)
|
|
geviset.apply_gsc(tree, bundle_gsc)
|
|
|
|
if outputs and outputs.filename:
|
|
if not outputs.filename.lower().endswith(".xlsx"):
|
|
raise HTTPException(status_code=400, detail="Outputs must be .xlsx")
|
|
outputs_parsed = excel.parse_video_outputs_xlsx(await outputs.read())
|
|
geviset.apply_video_outputs(tree, outputs_parsed["outputs"])
|
|
|
|
if keyboards and keyboards.filename:
|
|
if not keyboards.filename.lower().endswith(".xlsx"):
|
|
raise HTTPException(status_code=400, detail="Keyboards must be .xlsx")
|
|
keyboards_parsed = excel.parse_keyboards_xlsx(await keyboards.read())
|
|
geviset.apply_keyboards(tree, keyboards_parsed["keyboards"])
|
|
|
|
if logging and logging.filename:
|
|
if not logging.filename.lower().endswith(".xlsx"):
|
|
raise HTTPException(status_code=400, detail="Logging must be .xlsx")
|
|
logging_parsed = excel.parse_logging_xlsx(await logging.read())
|
|
geviset.apply_logging(tree, logging_parsed["rows"])
|
|
|
|
data = geviset.save_set_bytes(tree)
|
|
except Exception as exc:
|
|
raise HTTPException(status_code=400, detail=str(exc))
|
|
|
|
return StreamingResponse(
|
|
iter([data]),
|
|
media_type="application/octet-stream",
|
|
headers={"Content-Disposition": "attachment; filename=combined.set"},
|
|
)
|
|
|
|
|
|
static_dir = os.getenv("STATIC_DIR")
|
|
if static_dir and os.path.isdir(static_dir):
|
|
app.mount("/", StaticFiles(directory=static_dir, html=True), name="static")
|