#!/usr/bin/env python3
#
# gitctl-helper: server helper for gitctl, run as the git user.
# Copyright (C) 2026 Danilo M.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""gitctl-helper: server helper for gitctl, run as the git user.

Dispatches on SSH_ORIGINAL_COMMAND."""

import argparse
import datetime
import os
import re
import shlex
import shutil
import subprocess
import sys
import tempfile

REPO_BASE = "/var/lib/gitolite3/repositories"
CGITRC = "/etc/cgitrc"
SYNC_SCRIPT = "/usr/local/bin/sync-cgit-descs.py"
# Backups go here, not next to CGITRC: the helper runs as the git user, which
# owns /etc/cgitrc (the file) but cannot create new files in /etc. Must be a
# git-writable directory.
BACKUP_DIR = "/var/lib/gitolite3/cgitrc-backups"
DEFAULT_OWNER = "by Your Name"
BANNER = "# ---------- {name} ------------#"
# \A...\Z rather than ^...$: "$" also matches just before a trailing newline,
# which would let a name like "repo\n" through validation.
URL_RE = re.compile(r"\A[A-Za-z0-9._-]+\Z")


def die(msg, code=1):
    """Print `msg` to stderr and exit the process with status `code`."""
    print(msg, file=sys.stderr)
    sys.exit(code)


def validate_url(u):
    """Return `u` if it is an acceptable repo name, otherwise exit via die().

    A name must be non-empty, must not contain "..", and may only consist of
    the characters allowed by URL_RE.
    """
    if not u or ".." in u or not URL_RE.match(u):
        die(f"error: invalid repo name: {u!r}")
    return u


def validate_section(s):
    """Return `s` if it is an acceptable section name, otherwise exit via die().

    A section name must be non-empty and must fit on one cgitrc line, so
    newlines and carriage returns are rejected.
    """
    if not s or "\n" in s or "\r" in s:
        die(f"error: invalid section name: {s!r}")
    return s


SECTION_RE = re.compile(r"^section=(.*)$")
URL_LINE_RE = re.compile(r"^repo\.url=(.+)$")
# a section banner comment, e.g. "# ---------- Linux ------------#". It belongs
# to the section below it, so it bounds the preceding section's span.
BANNER_RE = re.compile(r"^#\s*-{3,}.*-{3,}#\s*$")


def list_sections_from(lines):
    """Return the name of every section= line in `lines`, in file order."""
    matches = (SECTION_RE.match(line.rstrip("\n")) for line in lines)
    return [m.group(1) for m in matches if m]


def find_repo_section(lines, url):
    """Return the section a repo.url= belongs to, or None if absent.

    The section is the most recent section= line seen before the matching
    repo.url= line; it is None when the repo precedes any section header.
    """
    current = None
    for line in lines:
        text = line.rstrip("\n")
        s = SECTION_RE.match(text)
        if s:
            current = s.group(1)
            continue
        u = URL_LINE_RE.match(text)
        if u and u.group(1) == url:
            return current
    return None


def repo_path_for(url):
    """Return the bare repository path for repo name `url` under REPO_BASE."""
    return os.path.join(REPO_BASE, url + ".git")


def build_repo_block(url, owner):
    """Return the cgitrc lines (url, path, owner) describing one repo."""
    return [
        f"repo.url={url}\n",
        f"repo.path={repo_path_for(url)}\n",
        f"repo.owner={owner}\n",
    ]


def insert_repo_block(lines, section, block):
    """Insert block at the end of `section`'s span. Raise KeyError if absent.

    Returns a new list; `lines` is not modified. The block is surrounded by
    exactly one blank line on each side: blank lines that previously trailed
    the span are replaced, not kept, so repeated insertions do not pile up
    empty lines in front of the next section banner.
    """
    start = None
    for i, line in enumerate(lines):
        m = SECTION_RE.match(line.rstrip("\n"))
        if m and m.group(1) == section:
            start = i
            break
    if start is None:
        raise KeyError(section)

    # end of span = next section= line OR next banner comment (a banner belongs
    # to the section it precedes), or EOF
    end = len(lines)
    for j in range(start + 1, len(lines)):
        stripped = lines[j].rstrip("\n")
        if SECTION_RE.match(stripped) or BANNER_RE.match(stripped):
            end = j
            break

    # trim trailing blank lines inside the span so we control spacing
    insert_at = end
    while insert_at > start + 1 and lines[insert_at - 1].strip() == "":
        insert_at -= 1

    payload = ["\n"] + block + ["\n"]
    # lines[insert_at:end] are whitespace-only; the payload's own trailing
    # blank line takes their place.
    return lines[:insert_at] + payload + lines[end:]


def write_cgitrc_lines(path, lines, backup_dir=None):
    """Back up then write `path` in place.

    Not atomic: the helper runs as the git user, which owns /etc/cgitrc but
    cannot create temp inodes in /etc, so an os.replace rename is impossible.
    The pre-write backup makes a truncated write (crash mid-write) recoverable.

    backup_dir defaults to the file's own directory; for the real /etc/cgitrc
    the caller passes BACKUP_DIR, since git cannot create files in /etc.

    The backup is named "<basename>.bak.<YYYYmmdd-HHMMSS>". If that name is
    already taken (two writes within the same second), ".1", ".2", ... is
    appended so an earlier backup is never overwritten.
    """
    if os.path.exists(path):
        bdir = backup_dir or os.path.dirname(os.path.abspath(path))
        os.makedirs(bdir, exist_ok=True)
        ts = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        stem = os.path.join(bdir, os.path.basename(path) + ".bak." + ts)
        target = stem
        n = 1
        while os.path.exists(target):
            target = f"{stem}.{n}"
            n += 1
        shutil.copy2(path, target)
    with open(path, "w") as f:  # in place, preserves ownership
        f.writelines(lines)


def ensure_section(cgitrc, name, backup_dir=None):
    """Append a section banner+header if absent. Return True if added."""
    with open(cgitrc) as f:
        lines = f.readlines()
    if name in list_sections_from(lines):
        return False
    addition = ["\n", BANNER.format(name=name) + "\n", f"section={name}\n"]
    write_cgitrc_lines(cgitrc, lines + addition, backup_dir)
    return True


def do_add_repo(cgitrc, url, section, owner, run_sync, backup_dir=None):
    """Insert a repo block for `url` under `section` in `cgitrc`, then sync.

    Returns 0 on success (including the no-op case where the repo is already
    filed under `section`) and 1 on any refusal: an owner string containing a
    line break, a repo already filed under a different section, or a section
    that does not exist. `run_sync` is called only after a successful write.
    """
    # owner is written verbatim into cgitrc; a line break in it would let the
    # caller smuggle in arbitrary extra directives.
    if "\n" in owner or "\r" in owner:
        print(f"error: invalid owner: {owner!r}", file=sys.stderr)
        return 1
    with open(cgitrc) as f:
        lines = f.readlines()
    existing = find_repo_section(lines, url)
    if existing is not None:
        if existing == section:
            print(f"repo {url} already present under section {section}")
            return 0
        print(f"error: repo {url} already exists under section {existing!r}, "
              f"refusing to move it to {section!r}", file=sys.stderr)
        return 1
    if section not in list_sections_from(lines):
        print(f"error: section {section!r} does not exist; run add-section first",
              file=sys.stderr)
        return 1
    new = insert_repo_block(lines, section, build_repo_block(url, owner))
    write_cgitrc_lines(cgitrc, new, backup_dir)
    print(f"added repo {url} under section {section}")
    run_sync()
    return 0


def do_set_desc(desc_path, desc, run_sync, url=None):
    """Overwrite the description file at `desc_path` with `desc`, then sync.

    Returns 1 without writing or syncing when `desc_path` does not exist;
    `url`, if given, is only used to word that error. Otherwise writes `desc`
    with exactly one trailing newline, calls `run_sync`, and returns 0.
    """
    if not os.path.exists(desc_path):
        what = f"repo {url!r} not found" if url else f"description file not found: {desc_path}"
        print(f"error: {what}", file=sys.stderr)
        return 1
    st = os.stat(desc_path)
    with open(desc_path, "w") as f:  # in place, preserves owner
        f.write(desc.rstrip("\n") + "\n")
    try:
        os.chown(desc_path, st.st_uid, st.st_gid)  # insurance
    except PermissionError:
        pass  # best effort: the in-place write already kept the owner
    print(f"wrote description to {desc_path}")
    run_sync()
    return 0


def _self_test():
    """Run the built-in assertions against temporary files; print on success."""

    def read_text(path):
        with open(path) as f:
            return f.read()

    def write_text(path, text):
        with open(path, "w") as f:
            f.write(text)

    assert validate_url("publisher") == "publisher"
    assert validate_url("a.b_c-1") == "a.b_c-1"
    for bad in ["", "../etc", "a/b", "a b", "a;b", "a$b", "a\nb", "a\n"]:
        try:
            validate_url(bad)
            assert False, bad
        except SystemExit:
            pass

    assert validate_section("Generic Projects") == "Generic Projects"
    for bad in ["", "a\nb"]:
        try:
            validate_section(bad)
            assert False, bad
        except SystemExit:
            pass

    fixture = [
        "section=Generic Projects\n",
        "repo.url=cad-projects\n",
        "repo.path=/var/lib/gitolite3/repositories/cad-projects.git\n",
        "\n",
        "# ---------- SlackBuilds ------------#\n",
        "section=SlackBuilds\n",
        "repo.url=my-slackbuilds\n",
        "repo.path=/var/lib/gitolite3/repositories/my-slackbuilds.git\n",
        "\n",
    ]
    assert list_sections_from(fixture) == ["Generic Projects", "SlackBuilds"]
    assert find_repo_section(fixture, "cad-projects") == "Generic Projects"
    assert find_repo_section(fixture, "my-slackbuilds") == "SlackBuilds"
    assert find_repo_section(fixture, "nope") is None

    block = build_repo_block("publisher", "by Your Name")
    assert block == [
        "repo.url=publisher\n",
        "repo.path=/var/lib/gitolite3/repositories/publisher.git\n",
        "repo.owner=by Your Name\n",
    ]

    # insert under the FIRST section: must land before the SlackBuilds banner,
    # NOT between the banner and its section= line (the banner belongs to the
    # section it precedes)
    new = insert_repo_block(fixture, "Generic Projects", block)
    text = "".join(new)
    assert "repo.url=publisher" in text
    # publisher must precede the SlackBuilds banner (and thus its section header)
    assert text.index("repo.url=publisher") < text.index("# ---------- SlackBuilds")
    # original repo still there
    assert "repo.url=cad-projects" in text
    # exactly one blank line on each side of the block, no pile-up
    assert "\n\n\n" not in text
    # insert under the LAST section: lands before EOF
    new2 = insert_repo_block(fixture, "SlackBuilds", block)
    assert "repo.url=publisher" in "".join(new2)
    # missing section raises
    try:
        insert_repo_block(fixture, "Nope", block)
        assert False
    except KeyError:
        pass

    with tempfile.TemporaryDirectory() as d:
        p = os.path.join(d, "cgitrc")
        write_text(p, "section=X\nrepo.url=a\n")
        write_cgitrc_lines(p, ["section=X\n", "repo.url=a\n", "repo.url=b\n"], backup_dir=d)
        assert read_text(p) == "section=X\nrepo.url=a\nrepo.url=b\n"
        baks = [f for f in os.listdir(d) if f.startswith("cgitrc.bak.")]
        assert len(baks) == 1, baks
        # a second write, even within the same second, keeps the first backup
        write_cgitrc_lines(p, ["section=X\n"], backup_dir=d)
        baks = [f for f in os.listdir(d) if f.startswith("cgitrc.bak.")]
        assert len(baks) == 2, baks

        p2 = os.path.join(d, "cgitrc2")
        write_text(p2, "section=Existing\nrepo.url=z\n")
        added = ensure_section(p2, "NewSec")
        assert added is True
        txt = read_text(p2)
        assert "section=NewSec" in txt
        assert "# ---------- NewSec ------------#" in txt
        # idempotent: second call is a no-op
        added2 = ensure_section(p2, "NewSec")
        assert added2 is False
        assert read_text(p2).count("section=NewSec") == 1

        p3 = os.path.join(d, "cgitrc3")
        write_text(p3, "".join([
            "# ---------- Generic ------------#\n",
            "section=Generic\n",
            "repo.url=cad\n",
            "repo.path=/var/lib/gitolite3/repositories/cad.git\n",
            "\n",
            "# ---------- SlackBuilds ------------#\n",
            "section=SlackBuilds\n",
            "repo.url=sps\n",
            "repo.path=/var/lib/gitolite3/repositories/sps.git\n",
            "\n",
        ]))
        calls = []
        rc = do_add_repo(p3, "publisher", "Generic", "by Your Name",
                         run_sync=lambda: calls.append(1))
        assert rc == 0
        t = read_text(p3)
        assert t.index("repo.url=publisher") < t.index("section=SlackBuilds")
        assert calls == [1]  # sync ran after insert
        # idempotent: re-add same url under same section -> no duplicate, success, no sync
        calls.clear()
        rc = do_add_repo(p3, "publisher", "Generic", "x", run_sync=lambda: calls.append(1))
        assert rc == 0
        assert read_text(p3).count("repo.url=publisher") == 1
        assert calls == []
        # exists under a DIFFERENT section -> report, non-zero, no write
        rc = do_add_repo(p3, "publisher", "SlackBuilds", "x", run_sync=lambda: calls.append(1))
        assert rc != 0
        # missing section -> non-zero
        rc = do_add_repo(p3, "newrepo", "Nope", "x", run_sync=lambda: calls.append(1))
        assert rc != 0
        # owner with an embedded newline -> non-zero, no write, no sync
        rc = do_add_repo(p3, "evil", "Generic", "x\nrepo.path=/etc",
                         run_sync=lambda: calls.append(1))
        assert rc != 0
        assert "repo.url=evil" not in read_text(p3)
        assert calls == []

        repo_dir = os.path.join(d, "publisher.git")
        os.makedirs(repo_dir)
        desc_path = os.path.join(repo_dir, "description")
        write_text(desc_path, "Unnamed repository; edit this file ...\n")
        calls.clear()
        rc = do_set_desc(desc_path, "my real description", run_sync=lambda: calls.append(1))
        assert rc == 0
        assert read_text(desc_path) == "my real description\n"
        assert calls == [1]
        # missing repo -> non-zero, no sync
        calls.clear()
        rc = do_set_desc(os.path.join(d, "ghost.git", "description"), "x",
                         run_sync=lambda: calls.append(1), url="ghost")
        assert rc != 0
        assert calls == []

    print("self-test OK")


def run_sync():
    """Run SYNC_SCRIPT with the current interpreter; exit if it fails."""
    r = subprocess.run([sys.executable, SYNC_SCRIPT])
    if r.returncode != 0:
        die(f"error: sync exited {r.returncode}", r.returncode)


def main():
    """Parse the verb and arguments, dispatch, and return the exit status.

    Arguments come from SSH_ORIGINAL_COMMAND when it is set and non-empty
    (shell-style split), otherwise from sys.argv.
    """
    raw = os.environ.get("SSH_ORIGINAL_COMMAND")
    if raw:
        try:
            argv = shlex.split(raw)
        except ValueError as exc:
            # e.g. an unbalanced quote: report it instead of a traceback
            die(f"error: cannot parse command: {exc}", 2)
    else:
        argv = sys.argv[1:]
    if argv and argv[0] == "--self-test":
        _self_test()
        return 0

    p = argparse.ArgumentParser(
        prog="gitctl-helper",
        description="Server helper for gitctl, run as the git user. Normally invoked by the "
                    "client over a command=-restricted SSH key, which forces this "
                    "script and passes the verb plus arguments in SSH_ORIGINAL_COMMAND. "
                    "It is the single privileged writer of /etc/cgitrc and of each bare "
                    "repo's description file. Can also be run directly for local "
                    "testing; pass --self-test to run the built-in assertions.")
    sub = p.add_subparsers(dest="verb", required=True,
                           help="server-side operation to perform")

    sub.add_parser(
        "list-sections",
        help="print every cgit section name, one per line",
        description="Read /etc/cgitrc and print each section= name in file order.")

    section_parser = sub.add_parser(
        "add-section",
        help="append a new cgit section header if absent",
        description="Append a banner and section= header to /etc/cgitrc. "
                    "Idempotent: a section that already exists is left "
                    "untouched. Prints 'added' or 'exists'.")
    section_parser.add_argument(
        "--name", required=True,
        help="section title as shown in cgit, e.g. 'Generic Projects'")

    add_repo_parser = sub.add_parser(
        "add-repo",
        help="insert a repo block under a section in cgitrc, then sync",
        description="Insert a repo.url/path/owner block at the end of the "
                    "named section's span in /etc/cgitrc (positionally, "
                    "before the next section or EOF), then run the cgit "
                    "description sync. Idempotent under the same section; "
                    "refuses to move a repo that already exists under a "
                    "different section.")
    add_repo_parser.add_argument(
        "--url", required=True,
        help="repo name as it appears in cgit and on disk (becomes URL.git)")
    add_repo_parser.add_argument(
        "--section", required=True,
        help="existing section to file the repo under; create it first with add-section")
    add_repo_parser.add_argument(
        "--owner", default=DEFAULT_OWNER,
        help="repo.owner string shown in cgit (default: %(default)r)")

    set_desc_parser = sub.add_parser(
        "set-desc",
        help="write a bare repo's description file, then sync",
        description="Write the bare repo's description file in place "
                    "(preserving its git:gitolite3 ownership) and run the "
                    "sync script. This is the only supported way to change "
                    "a cgit repo.desc=; it is never written directly.")
    set_desc_parser.add_argument(
        "--url", required=True,
        help="repo name (the bare repo is URL.git)")
    set_desc_parser.add_argument(
        "--desc", required=True,
        help="description text (a single trailing newline is enforced)")

    sub.add_parser(
        "sync",
        help="run the cgit description sync script",
        description="Run sync-cgit-descs.py, which copies each bare repo's "
                    "description into cgit's repo.desc=. The single writer of repo.desc=.")

    exists_parser = sub.add_parser(
        "repo-exists",
        help="exit 0 if the bare repo exists, 1 otherwise",
        description="Check whether URL.git exists under the repository "
                    "base. Used by the client to poll after a gitolite "
                    "push until gitolite has built the bare repo.")
    exists_parser.add_argument(
        "--url", required=True,
        help="repo name to test for (as URL.git)")

    a = p.parse_args(argv)

    if a.verb == "list-sections":
        with open(CGITRC) as f:
            for s in list_sections_from(f.readlines()):
                print(s)
        return 0
    if a.verb == "add-section":
        validate_section(a.name)
        print("added" if ensure_section(CGITRC, a.name, BACKUP_DIR) else "exists")
        return 0
    if a.verb == "add-repo":
        validate_url(a.url)
        validate_section(a.section)
        return do_add_repo(CGITRC, a.url, a.section, a.owner, run_sync, BACKUP_DIR)
    if a.verb == "set-desc":
        validate_url(a.url)
        return do_set_desc(os.path.join(repo_path_for(a.url), "description"),
                           a.desc, run_sync, url=a.url)
    if a.verb == "sync":
        run_sync()
        return 0
    if a.verb == "repo-exists":
        validate_url(a.url)
        return 0 if os.path.isdir(repo_path_for(a.url)) else 1
    return 1


if __name__ == "__main__":
    if (len(sys.argv) > 1 and sys.argv[1] == "--self-test"
            and not os.environ.get("SSH_ORIGINAL_COMMAND")):
        _self_test()
        sys.exit(0)
    sys.exit(main())