# Project Files
# src/fastvlm_server/qwen3_vl_model.py
"""Qwen3-VL detection backend via persistent subprocess runner."""
from __future__ import annotations
import json
import logging
import subprocess
import threading
from pathlib import Path
# Module-level logger; the runner's stderr lines are forwarded to it.
logger = logging.getLogger(__name__)
# Handle to the runner subprocess: None until load() succeeds, and None again
# after shutdown().
_runner: subprocess.Popen | None = None
# Held by load(), detect() and shutdown() while they touch _runner or its pipes.
_runner_lock = threading.Lock()
# Model path given to the most recent load() call that tried to start a runner.
_model_path: str = ""
def _plugin_root() -> Path:
    """Return the directory three ``.parent`` hops above this file.

    For ``<root>/src/fastvlm_server/qwen3_vl_model.py`` the hops are
    ``fastvlm_server`` -> ``src`` -> ``<root>``.
    """
    package_dir = Path(__file__).parent
    source_dir = package_dir.parent
    return source_dir.parent
def _qwen3vl_python() -> str:
    """Return ``~/.fastvlm/qwen3vl_venv/bin/python3`` as a string.

    The path is built from the current user's home directory; no check is
    made that the interpreter actually exists.
    """
    venv_dir = Path.home().joinpath(".fastvlm", "qwen3vl_venv")
    interpreter = venv_dir.joinpath("bin", "python3")
    return str(interpreter)
def _runner_script() -> str:
    """Return the path of ``qwen3_vl_runner.py`` located next to this module."""
    sibling = Path(__file__).with_name("qwen3_vl_runner.py")
    return str(sibling)
def _drain_stderr(proc: subprocess.Popen) -> None:
    """Copy everything the runner writes to stderr into this module's log.

    Iterates ``proc.stderr`` line by line until the stream is exhausted and
    logs each line at INFO level with trailing whitespace removed.
    """
    stream = proc.stderr
    for raw_line in stream:
        message = raw_line.rstrip()
        logger.info("qwen3vl runner: %s", message)
def load(model_path: str) -> None:
    """Start (or reuse) the persistent Qwen3-VL runner subprocess.

    The runner is launched as ``<venv python> <runner script> <model_path>``
    with line-buffered text pipes. The call blocks, holding the module lock,
    until the runner prints its first stdout line, which must be a JSON
    object with a truthy ``"ready"`` key.

    If a runner is already alive it is reused as-is; a request for a
    different ``model_path`` is logged as a warning and otherwise ignored.

    Args:
        model_path: Passed to the runner script as its only argument.

    Raises:
        RuntimeError: The runner closed stdout before sending a line, or its
            first line did not report ``"ready"``.
        json.JSONDecodeError: The first stdout line was not valid JSON.
        OSError: The subprocess could not be started.
    """
    global _runner, _model_path
    with _runner_lock:
        if _runner is not None and _runner.poll() is None:
            if model_path != _model_path:
                # The live runner keeps the model it was started with; make
                # the ignored request visible instead of dropping it silently.
                logger.warning(
                    "Qwen3-VL runner already running with model %r; "
                    "ignoring load request for %r",
                    _model_path,
                    model_path,
                )
            return
        _model_path = model_path
        proc = subprocess.Popen(
            [_qwen3vl_python(), _runner_script(), model_path],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1,
        )
        # Forward stderr to the log in the background; an unread pipe would
        # eventually fill up and stall the runner.
        drain = threading.Thread(target=_drain_stderr, args=(proc,), daemon=True)
        drain.start()
        try:
            raw = proc.stdout.readline()
            if not raw:
                raise RuntimeError("Qwen3-VL runner exited before ready signal")
            resp = json.loads(raw)
            if not resp.get("ready"):
                raise RuntimeError(
                    f"Qwen3-VL runner failed to load: {resp.get('error', 'unknown')}"
                )
        except BaseException:
            # Whatever went wrong during the handshake, do not leave a
            # half-started child behind: stop it, reap it so it does not
            # linger as a zombie, and release our ends of the pipes. stderr
            # is left to the drain thread, which stops at EOF.
            proc.kill()
            proc.wait()
            for pipe in (proc.stdin, proc.stdout):
                try:
                    pipe.close()
                except OSError:
                    pass
            raise
        _runner = proc
        logger.info("Qwen3-VL runner ready (pid=%d)", proc.pid)
def detect(image_b64: str, task: str = "<OD>") -> dict:
    """Run one detection request through the runner and return its result.

    The request is written to the runner's stdin as a single JSON line with
    ``"image"`` and ``"task"`` keys; exactly one line is then read back from
    its stdout. The module lock is held for the whole exchange, so requests
    are processed one at a time.

    Args:
        image_b64: Value sent under the ``"image"`` key.
        task: Value sent under the ``"task"`` key.

    Returns:
        The ``"result"`` entry of the runner's JSON reply.

    Raises:
        RuntimeError: No runner is alive, the runner closed stdout, or the
            reply did not carry a truthy ``"ok"``.
    """
    with _runner_lock:
        runner = _runner
        alive = runner is not None and runner.poll() is None
        if not alive:
            raise RuntimeError("Qwen3-VL runner is not running")

        payload = {"image": image_b64, "task": task}
        to_runner, from_runner = runner.stdin, runner.stdout
        to_runner.write(json.dumps(payload) + "\n")
        to_runner.flush()

        reply_line = from_runner.readline()
        if not reply_line:
            raise RuntimeError("Qwen3-VL runner closed stdout unexpectedly")

        reply = json.loads(reply_line)
        if reply.get("ok"):
            return reply["result"]
        raise RuntimeError(f"Qwen3-VL detection error: {reply.get('error', 'unknown')}")
def shutdown(timeout: float = 5.0) -> None:
    """Terminate the runner subprocess gracefully.

    Closes the runner's stdin, then waits up to ``timeout`` seconds for it to
    exit. If it is still alive after that it is killed and reaped. Does
    nothing when no runner is registered.

    Args:
        timeout: Seconds to wait for a voluntary exit before killing the
            runner.
    """
    global _runner
    with _runner_lock:
        proc = _runner
        # Drop the handle first so that an unexpected error below can never
        # leave a half-shut-down runner registered.
        _runner = None
        if proc is None:
            return
        if proc.stdin is not None:
            try:
                proc.stdin.close()
            except (OSError, ValueError):
                # Best effort: the pipe may already be broken or closed.
                pass
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.kill()
            # Reap the killed child; without this it lingers as a zombie.
            proc.wait()
        if proc.stdout is not None:
            try:
                proc.stdout.close()
            except OSError:
                pass