--- /dev/null
+from __future__ import annotations
+from PyQt6.QtCore import QObject, QProcess, pyqtSignal
+from pathlib import Path
+
+class HugoWorker(QObject):
+ log_line = pyqtSignal(str)
+ started = pyqtSignal()
+ stopped = pyqtSignal()
+ error = pyqtSignal(str)
+
+ def __init__(self, repo_path: Path, parent=None):
+ super().__init__(parent)
+ self._repo_path = repo_path
+ self._process = QProcess(self)
+ self._process.readyReadStandardOutput.connect(self._on_stdout)
+ self._process.readyReadStandardError.connect(self._on_stderr)
+ self._process.finished.connect(self._on_finished)
+
+ @property
+ def is_running(self) -> bool:
+ return self._process.state() == QProcess.ProcessState.Running
+
+ def start(self):
+ if self.is_running:
+ return
+ self._process.setWorkingDirectory(str(self._repo_path))
+ self._process.start("hugo", ["server", "-D"])
+ self.started.emit()
+
+ def stop(self):
+ if self.is_running:
+ self._process.terminate()
+ self._process.waitForFinished(3000)
+ self.stopped.emit()
+
+ def _on_stdout(self):
+ data = self._process.readAllStandardOutput().data().decode("utf-8", errors="replace")
+ for line in data.splitlines():
+ self.log_line.emit(line)
+
+ def _on_stderr(self):
+ data = self._process.readAllStandardError().data().decode("utf-8", errors="replace")
+ for line in data.splitlines():
+ self.log_line.emit(line)
+
+ def _on_finished(self, exit_code: int, _):
+ if exit_code != 0:
+ self.error.emit(f"hugo server exited with code {exit_code}")
+ self.stopped.emit()