diff options
| author | Danilo M. <danix@danix.xyz> | 2026-06-23 11:18:05 +0200 |
|---|---|---|
| committer | Danilo M. <danix@danix.xyz> | 2026-06-23 11:18:05 +0200 |
| commit | 9b2328bcb46b1b5fd074fdbc2c4bea8855220276 (patch) | |
| tree | f3936da4c051796dbf51983bbfa309b095bd241c /gitctl-helper | |
| download | gitctl-9b2328bcb46b1b5fd074fdbc2c4bea8855220276.tar.gz gitctl-9b2328bcb46b1b5fd074fdbc2c4bea8855220276.zip | |
Initial commit: gitctl
Thin CLI plus a server helper (run as the git user) to manage repos on a
personal gitolite3 + cgit server over a command=-restricted SSH key. Includes
the client, the helper, bash completion, a Claude Code skill, and docs.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Diffstat (limited to 'gitctl-helper')
| -rwxr-xr-x | gitctl-helper | 422 |
1 files changed, 422 insertions, 0 deletions
diff --git a/gitctl-helper b/gitctl-helper new file mode 100755 index 0000000..8cdf460 --- /dev/null +++ b/gitctl-helper @@ -0,0 +1,422 @@ +#!/usr/bin/env python3 +# +# gitctl-helper: server helper for gitctl, run as the git user. +# Copyright (C) 2026 Danilo M. <danix@danix.xyz> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +"""gitctl-helper: server helper for gitctl, run as the git user. Dispatches on SSH_ORIGINAL_COMMAND.""" +import os, re, sys, shlex, shutil, subprocess, argparse, datetime + +REPO_BASE = "/var/lib/gitolite3/repositories" +CGITRC = "/etc/cgitrc" +SYNC_SCRIPT = "/usr/local/bin/sync-cgit-descs.py" +# Backups go here, not next to CGITRC: the helper runs as the git user, which +# owns /etc/cgitrc (the file) but cannot create new files in /etc. Must be a +# git-writable directory. +BACKUP_DIR = "/var/lib/gitolite3/cgitrc-backups" +DEFAULT_OWNER = "by Your Name" +BANNER = "# ---------- {name} ------------#" + +URL_RE = re.compile(r"^[A-Za-z0-9._-]+$") + + +def die(msg, code=1): + print(msg, file=sys.stderr) + sys.exit(code) + + +def validate_url(u): + if not u or ".." in u or not URL_RE.match(u): + die(f"error: invalid repo name: {u!r}") + return u + + +def validate_section(s): + if not s or "\n" in s or "\r" in s: + die(f"error: invalid section name: {s!r}") + return s + + +SECTION_RE = re.compile(r"^section=(.*)$") +URL_LINE_RE = re.compile(r"^repo\.url=(.+)$") +# a section banner comment, e.g. "# ---------- Linux ------------#". It belongs +# to the section below it, so it bounds the preceding section's span. +BANNER_RE = re.compile(r"^#\s*-{3,}.*-{3,}#\s*$") + + +def list_sections_from(lines): + out = [] + for line in lines: + m = SECTION_RE.match(line.rstrip("\n")) + if m: + out.append(m.group(1)) + return out + + +def find_repo_section(lines, url): + """Return the section a repo.url= belongs to, or None if absent.""" + current = None + for line in lines: + s = SECTION_RE.match(line.rstrip("\n")) + if s: + current = s.group(1) + continue + u = URL_LINE_RE.match(line.rstrip("\n")) + if u and u.group(1) == url: + return current + return None + + +def repo_path_for(url): + return os.path.join(REPO_BASE, url + ".git") + + +def build_repo_block(url, owner): + return [ + f"repo.url={url}\n", + f"repo.path={repo_path_for(url)}\n", + f"repo.owner={owner}\n", + ] + + +def insert_repo_block(lines, section, block): + """Insert block at the end of `section`'s span. Raise KeyError if absent.""" + start = None + for i, line in enumerate(lines): + m = SECTION_RE.match(line.rstrip("\n")) + if m and m.group(1) == section: + start = i + break + if start is None: + raise KeyError(section) + # end of span = next section= line OR next banner comment (a banner belongs + # to the section it precedes), or EOF + end = len(lines) + for j in range(start + 1, len(lines)): + stripped = lines[j].rstrip("\n") + if SECTION_RE.match(stripped) or BANNER_RE.match(stripped): + end = j + break + # trim trailing blank lines inside the span so we control spacing + insert_at = end + while insert_at > start + 1 and lines[insert_at - 1].strip() == "": + insert_at -= 1 + payload = ["\n"] + block + ["\n"] + return lines[:insert_at] + payload + lines[insert_at:] + + +def write_cgitrc_lines(path, lines, backup_dir=None): + """Back up then write `path` in place. Not atomic: the helper runs as the + git user, which owns /etc/cgitrc but cannot create temp inodes in /etc, so + an os.replace rename is impossible. The pre-write backup makes a truncated + write (crash mid-write) recoverable. backup_dir defaults to the file's own + directory; for the real /etc/cgitrc the caller passes BACKUP_DIR, since git + cannot create files in /etc.""" + if os.path.exists(path): + bdir = backup_dir or os.path.dirname(os.path.abspath(path)) + os.makedirs(bdir, exist_ok=True) + ts = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + shutil.copy2(path, os.path.join(bdir, os.path.basename(path) + ".bak." + ts)) + with open(path, "w") as f: # in place, preserves ownership + f.writelines(lines) + + +def ensure_section(cgitrc, name, backup_dir=None): + """Append a section banner+header if absent. Return True if added.""" + with open(cgitrc) as f: + lines = f.readlines() + if name in list_sections_from(lines): + return False + addition = ["\n", BANNER.format(name=name) + "\n", f"section={name}\n"] + write_cgitrc_lines(cgitrc, lines + addition, backup_dir) + return True + + +def do_add_repo(cgitrc, url, section, owner, run_sync, backup_dir=None): + with open(cgitrc) as f: + lines = f.readlines() + existing = find_repo_section(lines, url) + if existing is not None: + if existing == section: + print(f"repo {url} already present under section {section}") + return 0 + print(f"error: repo {url} already exists under section {existing!r}, " + f"refusing to move it to {section!r}", file=sys.stderr) + return 1 + if section not in list_sections_from(lines): + print(f"error: section {section!r} does not exist; run add-section first", + file=sys.stderr) + return 1 + new = insert_repo_block(lines, section, build_repo_block(url, owner)) + write_cgitrc_lines(cgitrc, new, backup_dir) + print(f"added repo {url} under section {section}") + run_sync() + return 0 + + +def do_set_desc(desc_path, desc, run_sync, url=None): + if not os.path.exists(desc_path): + what = f"repo {url!r} not found" if url else f"description file not found: {desc_path}" + print(f"error: {what}", file=sys.stderr) + return 1 + st = os.stat(desc_path) + with open(desc_path, "w") as f: # in place, preserves owner + f.write(desc.rstrip("\n") + "\n") + try: + os.chown(desc_path, st.st_uid, st.st_gid) # insurance + except PermissionError: + pass + print(f"wrote description to {desc_path}") + run_sync() + return 0 + + +def _self_test(): + assert validate_url("publisher") == "publisher" + assert validate_url("a.b_c-1") == "a.b_c-1" + for bad in ["", "../etc", "a/b", "a b", "a;b", "a$b", "a\nb"]: + try: + validate_url(bad); assert False, bad + except SystemExit: + pass + assert validate_section("Generic Projects") == "Generic Projects" + for bad in ["", "a\nb"]: + try: + validate_section(bad); assert False, bad + except SystemExit: + pass + fixture = [ + "section=Generic Projects\n", + "repo.url=cad-projects\n", + "repo.path=/var/lib/gitolite3/repositories/cad-projects.git\n", + "\n", + "# ---------- SlackBuilds ------------#\n", + "section=SlackBuilds\n", + "repo.url=my-slackbuilds\n", + "repo.path=/var/lib/gitolite3/repositories/my-slackbuilds.git\n", + "\n", + ] + assert list_sections_from(fixture) == ["Generic Projects", "SlackBuilds"] + assert find_repo_section(fixture, "cad-projects") == "Generic Projects" + assert find_repo_section(fixture, "my-slackbuilds") == "SlackBuilds" + assert find_repo_section(fixture, "nope") is None + + block = build_repo_block("publisher", "by Your Name") + assert block == [ + "repo.url=publisher\n", + "repo.path=/var/lib/gitolite3/repositories/publisher.git\n", + "repo.owner=by Your Name\n", + ] + + # insert under the FIRST section: must land before the SlackBuilds banner, + # NOT between the banner and its section= line (the banner belongs to the + # section it precedes) + new = insert_repo_block(fixture, "Generic Projects", block) + text = "".join(new) + assert "repo.url=publisher" in text + # publisher must precede the SlackBuilds banner (and thus its section header) + assert text.index("repo.url=publisher") < text.index("# ---------- SlackBuilds") + # original repo still there + assert "repo.url=cad-projects" in text + + # insert under the LAST section: lands before EOF + new2 = insert_repo_block(fixture, "SlackBuilds", block) + assert "repo.url=publisher" in "".join(new2) + + # missing section raises + try: + insert_repo_block(fixture, "Nope", block); assert False + except KeyError: + pass + + import tempfile as _tf + d = _tf.mkdtemp() + p = os.path.join(d, "cgitrc") + open(p, "w").write("section=X\nrepo.url=a\n") + write_cgitrc_lines(p, ["section=X\n", "repo.url=a\n", "repo.url=b\n"], backup_dir=d) + assert open(p).read() == "section=X\nrepo.url=a\nrepo.url=b\n" + baks = [f for f in os.listdir(d) if f.startswith("cgitrc.bak.")] + assert len(baks) == 1, baks + + p2 = os.path.join(d, "cgitrc2") + open(p2, "w").write("section=Existing\nrepo.url=z\n") + added = ensure_section(p2, "NewSec") + assert added is True + txt = open(p2).read() + assert "section=NewSec" in txt + assert "# ---------- NewSec ------------#" in txt + # idempotent: second call is a no-op + added2 = ensure_section(p2, "NewSec") + assert added2 is False + assert open(p2).read().count("section=NewSec") == 1 + + p3 = os.path.join(d, "cgitrc3") + open(p3, "w").writelines([ + "# ---------- Generic ------------#\n", + "section=Generic\n", + "repo.url=cad\n", + "repo.path=/var/lib/gitolite3/repositories/cad.git\n", + "\n", + "# ---------- SlackBuilds ------------#\n", + "section=SlackBuilds\n", + "repo.url=sps\n", + "repo.path=/var/lib/gitolite3/repositories/sps.git\n", + "\n", + ]) + calls = [] + rc = do_add_repo(p3, "publisher", "Generic", "by Your Name", + run_sync=lambda: calls.append(1)) + assert rc == 0 + t = open(p3).read() + assert t.index("repo.url=publisher") < t.index("section=SlackBuilds") + assert calls == [1] # sync ran after insert + + # idempotent: re-add same url under same section -> no duplicate, success, no sync + calls.clear() + rc = do_add_repo(p3, "publisher", "Generic", "x", run_sync=lambda: calls.append(1)) + assert rc == 0 + assert open(p3).read().count("repo.url=publisher") == 1 + assert calls == [] + + # exists under a DIFFERENT section -> report, non-zero, no write + rc = do_add_repo(p3, "publisher", "SlackBuilds", "x", run_sync=lambda: calls.append(1)) + assert rc != 0 + + # missing section -> non-zero + rc = do_add_repo(p3, "newrepo", "Nope", "x", run_sync=lambda: calls.append(1)) + assert rc != 0 + + repo_dir = os.path.join(d, "publisher.git") + os.makedirs(repo_dir) + desc_path = os.path.join(repo_dir, "description") + open(desc_path, "w").write("Unnamed repository; edit this file ...\n") + calls.clear() + rc = do_set_desc(desc_path, "my real description", + run_sync=lambda: calls.append(1)) + assert rc == 0 + assert open(desc_path).read() == "my real description\n" + assert calls == [1] + + # missing repo -> non-zero, no sync + calls.clear() + rc = do_set_desc(os.path.join(d, "ghost.git", "description"), "x", + run_sync=lambda: calls.append(1), url="ghost") + assert rc != 0 + assert calls == [] + + print("self-test OK") + + +def run_sync(): + r = subprocess.run([sys.executable, SYNC_SCRIPT]) + if r.returncode != 0: + die(f"error: sync exited {r.returncode}", r.returncode) + + +def main(): + raw = os.environ.get("SSH_ORIGINAL_COMMAND") + argv = shlex.split(raw) if raw else sys.argv[1:] + if argv and argv[0] == "--self-test": + _self_test(); return 0 + + p = argparse.ArgumentParser( + prog="gitctl-helper", + description="Server helper for gitctl, run as the git user. Normally invoked by the " + "client over a command=-restricted SSH key, which forces this " + "script and passes the verb plus arguments in SSH_ORIGINAL_COMMAND. " + "It is the single privileged writer of /etc/cgitrc and of each bare " + "repo's description file. Can also be run directly for local " + "testing; pass --self-test to run the built-in assertions.") + sub = p.add_subparsers(dest="verb", required=True, + help="server-side operation to perform") + + sub.add_parser("list-sections", + help="print every cgit section name, one per line", + description="Read /etc/cgitrc and print each section= name in file order.") + + sp = sub.add_parser("add-section", + help="append a new cgit section header if absent", + description="Append a banner and section= header to /etc/cgitrc. " + "Idempotent: a section that already exists is left " + "untouched. Prints 'added' or 'exists'.") + sp.add_argument("--name", required=True, + help="section title as shown in cgit, e.g. 'Generic Projects'") + + ar = sub.add_parser("add-repo", + help="insert a repo block under a section in cgitrc, then sync", + description="Insert a repo.url/path/owner block at the end of the " + "named section's span in /etc/cgitrc (positionally, " + "before the next section or EOF), then run the cgit " + "description sync. Idempotent under the same section; " + "refuses to move a repo that already exists under a " + "different section.") + ar.add_argument("--url", required=True, + help="repo name as it appears in cgit and on disk (becomes <url>.git)") + ar.add_argument("--section", required=True, + help="existing section to file the repo under; create it first with add-section") + ar.add_argument("--owner", default=DEFAULT_OWNER, + help="repo.owner string shown in cgit (default: %(default)r)") + + sd = sub.add_parser("set-desc", + help="write a bare repo's description file, then sync", + description="Write the bare repo's description file in place " + "(preserving its git:gitolite3 ownership) and run the " + "sync script. This is the only supported way to change " + "a cgit repo.desc=; it is never written directly.") + sd.add_argument("--url", required=True, help="repo name (the bare repo is <url>.git)") + sd.add_argument("--desc", required=True, help="description text (a single trailing newline is enforced)") + + sub.add_parser("sync", + help="run the cgit description sync script", + description="Run sync-cgit-descs.py, which copies each bare repo's " + "description into cgit's repo.desc=. The single writer of repo.desc=.") + + re_ = sub.add_parser("repo-exists", + help="exit 0 if the bare repo exists, 1 otherwise", + description="Check whether <url>.git exists under the repository " + "base. Used by the client to poll after a gitolite " + "push until gitolite has built the bare repo.") + re_.add_argument("--url", required=True, help="repo name to test for (as <url>.git)") + + a = p.parse_args(argv) + + if a.verb == "list-sections": + with open(CGITRC) as f: + for s in list_sections_from(f.readlines()): + print(s) + return 0 + if a.verb == "add-section": + validate_section(a.name) + print("added" if ensure_section(CGITRC, a.name, BACKUP_DIR) else "exists") + return 0 + if a.verb == "add-repo": + validate_url(a.url); validate_section(a.section) + return do_add_repo(CGITRC, a.url, a.section, a.owner, run_sync, BACKUP_DIR) + if a.verb == "set-desc": + validate_url(a.url) + return do_set_desc(os.path.join(repo_path_for(a.url), "description"), + a.desc, run_sync, url=a.url) + if a.verb == "sync": + run_sync(); return 0 + if a.verb == "repo-exists": + validate_url(a.url) + return 0 if os.path.isdir(repo_path_for(a.url)) else 1 + return 1 + + +if __name__ == "__main__": + if len(sys.argv) > 1 and sys.argv[1] == "--self-test" and not os.environ.get("SSH_ORIGINAL_COMMAND"): + _self_test(); sys.exit(0) + sys.exit(main()) |
