aboutsummaryrefslogtreecommitdiffstats
path: root/gitctl-helper
diff options
context:
space:
mode:
Diffstat (limited to 'gitctl-helper')
-rwxr-xr-xgitctl-helper422
1 files changed, 422 insertions, 0 deletions
diff --git a/gitctl-helper b/gitctl-helper
new file mode 100755
index 0000000..8cdf460
--- /dev/null
+++ b/gitctl-helper
@@ -0,0 +1,422 @@
+#!/usr/bin/env python3
+#
+# gitctl-helper: server helper for gitctl, run as the git user.
+# Copyright (C) 2026 Danilo M. <danix@danix.xyz>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+"""gitctl-helper: server helper for gitctl, run as the git user. Dispatches on SSH_ORIGINAL_COMMAND."""
+import os, re, sys, shlex, shutil, subprocess, argparse, datetime
+
+REPO_BASE = "/var/lib/gitolite3/repositories"
+CGITRC = "/etc/cgitrc"
+SYNC_SCRIPT = "/usr/local/bin/sync-cgit-descs.py"
+# Backups go here, not next to CGITRC: the helper runs as the git user, which
+# owns /etc/cgitrc (the file) but cannot create new files in /etc. Must be a
+# git-writable directory.
+BACKUP_DIR = "/var/lib/gitolite3/cgitrc-backups"
+DEFAULT_OWNER = "by Your Name"
+BANNER = "# ---------- {name} ------------#"
+
+URL_RE = re.compile(r"^[A-Za-z0-9._-]+$")
+
+
+def die(msg, code=1):
+ print(msg, file=sys.stderr)
+ sys.exit(code)
+
+
+def validate_url(u):
+ if not u or ".." in u or not URL_RE.match(u):
+ die(f"error: invalid repo name: {u!r}")
+ return u
+
+
+def validate_section(s):
+ if not s or "\n" in s or "\r" in s:
+ die(f"error: invalid section name: {s!r}")
+ return s
+
+
+SECTION_RE = re.compile(r"^section=(.*)$")
+URL_LINE_RE = re.compile(r"^repo\.url=(.+)$")
+# a section banner comment, e.g. "# ---------- Linux ------------#". It belongs
+# to the section below it, so it bounds the preceding section's span.
+BANNER_RE = re.compile(r"^#\s*-{3,}.*-{3,}#\s*$")
+
+
+def list_sections_from(lines):
+ out = []
+ for line in lines:
+ m = SECTION_RE.match(line.rstrip("\n"))
+ if m:
+ out.append(m.group(1))
+ return out
+
+
+def find_repo_section(lines, url):
+ """Return the section a repo.url= belongs to, or None if absent."""
+ current = None
+ for line in lines:
+ s = SECTION_RE.match(line.rstrip("\n"))
+ if s:
+ current = s.group(1)
+ continue
+ u = URL_LINE_RE.match(line.rstrip("\n"))
+ if u and u.group(1) == url:
+ return current
+ return None
+
+
+def repo_path_for(url):
+ return os.path.join(REPO_BASE, url + ".git")
+
+
+def build_repo_block(url, owner):
+ return [
+ f"repo.url={url}\n",
+ f"repo.path={repo_path_for(url)}\n",
+ f"repo.owner={owner}\n",
+ ]
+
+
+def insert_repo_block(lines, section, block):
+ """Insert block at the end of `section`'s span. Raise KeyError if absent."""
+ start = None
+ for i, line in enumerate(lines):
+ m = SECTION_RE.match(line.rstrip("\n"))
+ if m and m.group(1) == section:
+ start = i
+ break
+ if start is None:
+ raise KeyError(section)
+ # end of span = next section= line OR next banner comment (a banner belongs
+ # to the section it precedes), or EOF
+ end = len(lines)
+ for j in range(start + 1, len(lines)):
+ stripped = lines[j].rstrip("\n")
+ if SECTION_RE.match(stripped) or BANNER_RE.match(stripped):
+ end = j
+ break
+ # trim trailing blank lines inside the span so we control spacing
+ insert_at = end
+ while insert_at > start + 1 and lines[insert_at - 1].strip() == "":
+ insert_at -= 1
+ payload = ["\n"] + block + ["\n"]
+ return lines[:insert_at] + payload + lines[insert_at:]
+
+
+def write_cgitrc_lines(path, lines, backup_dir=None):
+ """Back up then write `path` in place. Not atomic: the helper runs as the
+ git user, which owns /etc/cgitrc but cannot create temp inodes in /etc, so
+ an os.replace rename is impossible. The pre-write backup makes a truncated
+ write (crash mid-write) recoverable. backup_dir defaults to the file's own
+ directory; for the real /etc/cgitrc the caller passes BACKUP_DIR, since git
+ cannot create files in /etc."""
+ if os.path.exists(path):
+ bdir = backup_dir or os.path.dirname(os.path.abspath(path))
+ os.makedirs(bdir, exist_ok=True)
+ ts = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
+ shutil.copy2(path, os.path.join(bdir, os.path.basename(path) + ".bak." + ts))
+ with open(path, "w") as f: # in place, preserves ownership
+ f.writelines(lines)
+
+
+def ensure_section(cgitrc, name, backup_dir=None):
+ """Append a section banner+header if absent. Return True if added."""
+ with open(cgitrc) as f:
+ lines = f.readlines()
+ if name in list_sections_from(lines):
+ return False
+ addition = ["\n", BANNER.format(name=name) + "\n", f"section={name}\n"]
+ write_cgitrc_lines(cgitrc, lines + addition, backup_dir)
+ return True
+
+
+def do_add_repo(cgitrc, url, section, owner, run_sync, backup_dir=None):
+ with open(cgitrc) as f:
+ lines = f.readlines()
+ existing = find_repo_section(lines, url)
+ if existing is not None:
+ if existing == section:
+ print(f"repo {url} already present under section {section}")
+ return 0
+ print(f"error: repo {url} already exists under section {existing!r}, "
+ f"refusing to move it to {section!r}", file=sys.stderr)
+ return 1
+ if section not in list_sections_from(lines):
+ print(f"error: section {section!r} does not exist; run add-section first",
+ file=sys.stderr)
+ return 1
+ new = insert_repo_block(lines, section, build_repo_block(url, owner))
+ write_cgitrc_lines(cgitrc, new, backup_dir)
+ print(f"added repo {url} under section {section}")
+ run_sync()
+ return 0
+
+
+def do_set_desc(desc_path, desc, run_sync, url=None):
+ if not os.path.exists(desc_path):
+ what = f"repo {url!r} not found" if url else f"description file not found: {desc_path}"
+ print(f"error: {what}", file=sys.stderr)
+ return 1
+ st = os.stat(desc_path)
+ with open(desc_path, "w") as f: # in place, preserves owner
+ f.write(desc.rstrip("\n") + "\n")
+ try:
+ os.chown(desc_path, st.st_uid, st.st_gid) # insurance
+ except PermissionError:
+ pass
+ print(f"wrote description to {desc_path}")
+ run_sync()
+ return 0
+
+
+def _self_test():
+ assert validate_url("publisher") == "publisher"
+ assert validate_url("a.b_c-1") == "a.b_c-1"
+ for bad in ["", "../etc", "a/b", "a b", "a;b", "a$b", "a\nb"]:
+ try:
+ validate_url(bad); assert False, bad
+ except SystemExit:
+ pass
+ assert validate_section("Generic Projects") == "Generic Projects"
+ for bad in ["", "a\nb"]:
+ try:
+ validate_section(bad); assert False, bad
+ except SystemExit:
+ pass
+ fixture = [
+ "section=Generic Projects\n",
+ "repo.url=cad-projects\n",
+ "repo.path=/var/lib/gitolite3/repositories/cad-projects.git\n",
+ "\n",
+ "# ---------- SlackBuilds ------------#\n",
+ "section=SlackBuilds\n",
+ "repo.url=my-slackbuilds\n",
+ "repo.path=/var/lib/gitolite3/repositories/my-slackbuilds.git\n",
+ "\n",
+ ]
+ assert list_sections_from(fixture) == ["Generic Projects", "SlackBuilds"]
+ assert find_repo_section(fixture, "cad-projects") == "Generic Projects"
+ assert find_repo_section(fixture, "my-slackbuilds") == "SlackBuilds"
+ assert find_repo_section(fixture, "nope") is None
+
+ block = build_repo_block("publisher", "by Your Name")
+ assert block == [
+ "repo.url=publisher\n",
+ "repo.path=/var/lib/gitolite3/repositories/publisher.git\n",
+ "repo.owner=by Your Name\n",
+ ]
+
+ # insert under the FIRST section: must land before the SlackBuilds banner,
+ # NOT between the banner and its section= line (the banner belongs to the
+ # section it precedes)
+ new = insert_repo_block(fixture, "Generic Projects", block)
+ text = "".join(new)
+ assert "repo.url=publisher" in text
+ # publisher must precede the SlackBuilds banner (and thus its section header)
+ assert text.index("repo.url=publisher") < text.index("# ---------- SlackBuilds")
+ # original repo still there
+ assert "repo.url=cad-projects" in text
+
+ # insert under the LAST section: lands before EOF
+ new2 = insert_repo_block(fixture, "SlackBuilds", block)
+ assert "repo.url=publisher" in "".join(new2)
+
+ # missing section raises
+ try:
+ insert_repo_block(fixture, "Nope", block); assert False
+ except KeyError:
+ pass
+
+ import tempfile as _tf
+ d = _tf.mkdtemp()
+ p = os.path.join(d, "cgitrc")
+ open(p, "w").write("section=X\nrepo.url=a\n")
+ write_cgitrc_lines(p, ["section=X\n", "repo.url=a\n", "repo.url=b\n"], backup_dir=d)
+ assert open(p).read() == "section=X\nrepo.url=a\nrepo.url=b\n"
+ baks = [f for f in os.listdir(d) if f.startswith("cgitrc.bak.")]
+ assert len(baks) == 1, baks
+
+ p2 = os.path.join(d, "cgitrc2")
+ open(p2, "w").write("section=Existing\nrepo.url=z\n")
+ added = ensure_section(p2, "NewSec")
+ assert added is True
+ txt = open(p2).read()
+ assert "section=NewSec" in txt
+ assert "# ---------- NewSec ------------#" in txt
+ # idempotent: second call is a no-op
+ added2 = ensure_section(p2, "NewSec")
+ assert added2 is False
+ assert open(p2).read().count("section=NewSec") == 1
+
+ p3 = os.path.join(d, "cgitrc3")
+ open(p3, "w").writelines([
+ "# ---------- Generic ------------#\n",
+ "section=Generic\n",
+ "repo.url=cad\n",
+ "repo.path=/var/lib/gitolite3/repositories/cad.git\n",
+ "\n",
+ "# ---------- SlackBuilds ------------#\n",
+ "section=SlackBuilds\n",
+ "repo.url=sps\n",
+ "repo.path=/var/lib/gitolite3/repositories/sps.git\n",
+ "\n",
+ ])
+ calls = []
+ rc = do_add_repo(p3, "publisher", "Generic", "by Your Name",
+ run_sync=lambda: calls.append(1))
+ assert rc == 0
+ t = open(p3).read()
+ assert t.index("repo.url=publisher") < t.index("section=SlackBuilds")
+ assert calls == [1] # sync ran after insert
+
+ # idempotent: re-add same url under same section -> no duplicate, success, no sync
+ calls.clear()
+ rc = do_add_repo(p3, "publisher", "Generic", "x", run_sync=lambda: calls.append(1))
+ assert rc == 0
+ assert open(p3).read().count("repo.url=publisher") == 1
+ assert calls == []
+
+ # exists under a DIFFERENT section -> report, non-zero, no write
+ rc = do_add_repo(p3, "publisher", "SlackBuilds", "x", run_sync=lambda: calls.append(1))
+ assert rc != 0
+
+ # missing section -> non-zero
+ rc = do_add_repo(p3, "newrepo", "Nope", "x", run_sync=lambda: calls.append(1))
+ assert rc != 0
+
+ repo_dir = os.path.join(d, "publisher.git")
+ os.makedirs(repo_dir)
+ desc_path = os.path.join(repo_dir, "description")
+ open(desc_path, "w").write("Unnamed repository; edit this file ...\n")
+ calls.clear()
+ rc = do_set_desc(desc_path, "my real description",
+ run_sync=lambda: calls.append(1))
+ assert rc == 0
+ assert open(desc_path).read() == "my real description\n"
+ assert calls == [1]
+
+ # missing repo -> non-zero, no sync
+ calls.clear()
+ rc = do_set_desc(os.path.join(d, "ghost.git", "description"), "x",
+ run_sync=lambda: calls.append(1), url="ghost")
+ assert rc != 0
+ assert calls == []
+
+ print("self-test OK")
+
+
+def run_sync():
+ r = subprocess.run([sys.executable, SYNC_SCRIPT])
+ if r.returncode != 0:
+ die(f"error: sync exited {r.returncode}", r.returncode)
+
+
+def main():
+ raw = os.environ.get("SSH_ORIGINAL_COMMAND")
+ argv = shlex.split(raw) if raw else sys.argv[1:]
+ if argv and argv[0] == "--self-test":
+ _self_test(); return 0
+
+ p = argparse.ArgumentParser(
+ prog="gitctl-helper",
+ description="Server helper for gitctl, run as the git user. Normally invoked by the "
+ "client over a command=-restricted SSH key, which forces this "
+ "script and passes the verb plus arguments in SSH_ORIGINAL_COMMAND. "
+ "It is the single privileged writer of /etc/cgitrc and of each bare "
+ "repo's description file. Can also be run directly for local "
+ "testing; pass --self-test to run the built-in assertions.")
+ sub = p.add_subparsers(dest="verb", required=True,
+ help="server-side operation to perform")
+
+ sub.add_parser("list-sections",
+ help="print every cgit section name, one per line",
+ description="Read /etc/cgitrc and print each section= name in file order.")
+
+ sp = sub.add_parser("add-section",
+ help="append a new cgit section header if absent",
+ description="Append a banner and section= header to /etc/cgitrc. "
+ "Idempotent: a section that already exists is left "
+ "untouched. Prints 'added' or 'exists'.")
+ sp.add_argument("--name", required=True,
+ help="section title as shown in cgit, e.g. 'Generic Projects'")
+
+ ar = sub.add_parser("add-repo",
+ help="insert a repo block under a section in cgitrc, then sync",
+ description="Insert a repo.url/path/owner block at the end of the "
+ "named section's span in /etc/cgitrc (positionally, "
+ "before the next section or EOF), then run the cgit "
+ "description sync. Idempotent under the same section; "
+ "refuses to move a repo that already exists under a "
+ "different section.")
+ ar.add_argument("--url", required=True,
+ help="repo name as it appears in cgit and on disk (becomes <url>.git)")
+ ar.add_argument("--section", required=True,
+ help="existing section to file the repo under; create it first with add-section")
+ ar.add_argument("--owner", default=DEFAULT_OWNER,
+ help="repo.owner string shown in cgit (default: %(default)r)")
+
+ sd = sub.add_parser("set-desc",
+ help="write a bare repo's description file, then sync",
+ description="Write the bare repo's description file in place "
+ "(preserving its git:gitolite3 ownership) and run the "
+ "sync script. This is the only supported way to change "
+ "a cgit repo.desc=; it is never written directly.")
+ sd.add_argument("--url", required=True, help="repo name (the bare repo is <url>.git)")
+ sd.add_argument("--desc", required=True, help="description text (a single trailing newline is enforced)")
+
+ sub.add_parser("sync",
+ help="run the cgit description sync script",
+ description="Run sync-cgit-descs.py, which copies each bare repo's "
+ "description into cgit's repo.desc=. The single writer of repo.desc=.")
+
+ re_ = sub.add_parser("repo-exists",
+ help="exit 0 if the bare repo exists, 1 otherwise",
+ description="Check whether <url>.git exists under the repository "
+ "base. Used by the client to poll after a gitolite "
+ "push until gitolite has built the bare repo.")
+ re_.add_argument("--url", required=True, help="repo name to test for (as <url>.git)")
+
+ a = p.parse_args(argv)
+
+ if a.verb == "list-sections":
+ with open(CGITRC) as f:
+ for s in list_sections_from(f.readlines()):
+ print(s)
+ return 0
+ if a.verb == "add-section":
+ validate_section(a.name)
+ print("added" if ensure_section(CGITRC, a.name, BACKUP_DIR) else "exists")
+ return 0
+ if a.verb == "add-repo":
+ validate_url(a.url); validate_section(a.section)
+ return do_add_repo(CGITRC, a.url, a.section, a.owner, run_sync, BACKUP_DIR)
+ if a.verb == "set-desc":
+ validate_url(a.url)
+ return do_set_desc(os.path.join(repo_path_for(a.url), "description"),
+ a.desc, run_sync, url=a.url)
+ if a.verb == "sync":
+ run_sync(); return 0
+ if a.verb == "repo-exists":
+ validate_url(a.url)
+ return 0 if os.path.isdir(repo_path_for(a.url)) else 1
+ return 1
+
+
+if __name__ == "__main__":
+ if len(sys.argv) > 1 and sys.argv[1] == "--self-test" and not os.environ.get("SSH_ORIGINAL_COMMAND"):
+ _self_test(); sys.exit(0)
+ sys.exit(main())