#!/usr/bin/env python3
#
# gitctl-helper: server helper for gitctl, run as the git user.
# Copyright (C) 2026 Danilo M. <danix@danix.xyz>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""gitctl-helper: server helper for gitctl, run as the git user. Dispatches on SSH_ORIGINAL_COMMAND."""
import os, re, sys, shlex, shutil, subprocess, argparse, datetime

REPO_BASE   = "/var/lib/gitolite3/repositories"
CGITRC      = "/etc/cgitrc"
SYNC_SCRIPT = "/usr/local/bin/sync-cgit-descs.py"
# Backups go here, not next to CGITRC: the helper runs as the git user, which
# owns /etc/cgitrc (the file) but cannot create new files in /etc. Must be a
# git-writable directory.
BACKUP_DIR  = "/var/lib/gitolite3/cgitrc-backups"
DEFAULT_OWNER = "by Your Name"
BANNER = "# ---------- {name} ------------#"

URL_RE = re.compile(r"^[A-Za-z0-9._-]+$")


def die(msg, code=1):
    print(msg, file=sys.stderr)
    sys.exit(code)


def validate_url(u):
    if not u or ".." in u or not URL_RE.match(u):
        die(f"error: invalid repo name: {u!r}")
    return u


def validate_section(s):
    if not s or "\n" in s or "\r" in s:
        die(f"error: invalid section name: {s!r}")
    return s


SECTION_RE = re.compile(r"^section=(.*)$")
URL_LINE_RE = re.compile(r"^repo\.url=(.+)$")
# a section banner comment, e.g. "# ---------- Linux ------------#". It belongs
# to the section below it, so it bounds the preceding section's span.
BANNER_RE = re.compile(r"^#\s*-{3,}.*-{3,}#\s*$")


def list_sections_from(lines):
    out = []
    for line in lines:
        m = SECTION_RE.match(line.rstrip("\n"))
        if m:
            out.append(m.group(1))
    return out


def find_repo_section(lines, url):
    """Return the section a repo.url= belongs to, or None if absent."""
    current = None
    for line in lines:
        s = SECTION_RE.match(line.rstrip("\n"))
        if s:
            current = s.group(1)
            continue
        u = URL_LINE_RE.match(line.rstrip("\n"))
        if u and u.group(1) == url:
            return current
    return None


def repo_path_for(url):
    return os.path.join(REPO_BASE, url + ".git")


def build_repo_block(url, owner):
    return [
        f"repo.url={url}\n",
        f"repo.path={repo_path_for(url)}\n",
        f"repo.owner={owner}\n",
    ]


def insert_repo_block(lines, section, block):
    """Insert block at the end of `section`'s span. Raise KeyError if absent."""
    start = None
    for i, line in enumerate(lines):
        m = SECTION_RE.match(line.rstrip("\n"))
        if m and m.group(1) == section:
            start = i
            break
    if start is None:
        raise KeyError(section)
    # end of span = next section= line OR next banner comment (a banner belongs
    # to the section it precedes), or EOF
    end = len(lines)
    for j in range(start + 1, len(lines)):
        stripped = lines[j].rstrip("\n")
        if SECTION_RE.match(stripped) or BANNER_RE.match(stripped):
            end = j
            break
    # trim trailing blank lines inside the span so we control spacing
    insert_at = end
    while insert_at > start + 1 and lines[insert_at - 1].strip() == "":
        insert_at -= 1
    payload = ["\n"] + block + ["\n"]
    return lines[:insert_at] + payload + lines[insert_at:]


def write_cgitrc_lines(path, lines, backup_dir=None):
    """Back up then write `path` in place. Not atomic: the helper runs as the
    git user, which owns /etc/cgitrc but cannot create temp inodes in /etc, so
    an os.replace rename is impossible. The pre-write backup makes a truncated
    write (crash mid-write) recoverable. backup_dir defaults to the file's own
    directory; for the real /etc/cgitrc the caller passes BACKUP_DIR, since git
    cannot create files in /etc."""
    if os.path.exists(path):
        bdir = backup_dir or os.path.dirname(os.path.abspath(path))
        os.makedirs(bdir, exist_ok=True)
        ts = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        shutil.copy2(path, os.path.join(bdir, os.path.basename(path) + ".bak." + ts))
    with open(path, "w") as f:        # in place, preserves ownership
        f.writelines(lines)


def ensure_section(cgitrc, name, backup_dir=None):
    """Append a section banner+header if absent. Return True if added."""
    with open(cgitrc) as f:
        lines = f.readlines()
    if name in list_sections_from(lines):
        return False
    addition = ["\n", BANNER.format(name=name) + "\n", f"section={name}\n"]
    write_cgitrc_lines(cgitrc, lines + addition, backup_dir)
    return True


def do_add_repo(cgitrc, url, section, owner, run_sync, backup_dir=None):
    with open(cgitrc) as f:
        lines = f.readlines()
    existing = find_repo_section(lines, url)
    if existing is not None:
        if existing == section:
            print(f"repo {url} already present under section {section}")
            return 0
        print(f"error: repo {url} already exists under section {existing!r}, "
              f"refusing to move it to {section!r}", file=sys.stderr)
        return 1
    if section not in list_sections_from(lines):
        print(f"error: section {section!r} does not exist; run add-section first",
              file=sys.stderr)
        return 1
    new = insert_repo_block(lines, section, build_repo_block(url, owner))
    write_cgitrc_lines(cgitrc, new, backup_dir)
    print(f"added repo {url} under section {section}")
    run_sync()
    return 0


def do_set_desc(desc_path, desc, run_sync, url=None):
    if not os.path.exists(desc_path):
        what = f"repo {url!r} not found" if url else f"description file not found: {desc_path}"
        print(f"error: {what}", file=sys.stderr)
        return 1
    st = os.stat(desc_path)
    with open(desc_path, "w") as f:        # in place, preserves owner
        f.write(desc.rstrip("\n") + "\n")
    try:
        os.chown(desc_path, st.st_uid, st.st_gid)  # insurance
    except PermissionError:
        pass
    print(f"wrote description to {desc_path}")
    run_sync()
    return 0


def _self_test():
    assert validate_url("publisher") == "publisher"
    assert validate_url("a.b_c-1") == "a.b_c-1"
    for bad in ["", "../etc", "a/b", "a b", "a;b", "a$b", "a\nb"]:
        try:
            validate_url(bad); assert False, bad
        except SystemExit:
            pass
    assert validate_section("Generic Projects") == "Generic Projects"
    for bad in ["", "a\nb"]:
        try:
            validate_section(bad); assert False, bad
        except SystemExit:
            pass
    fixture = [
        "section=Generic Projects\n",
        "repo.url=cad-projects\n",
        "repo.path=/var/lib/gitolite3/repositories/cad-projects.git\n",
        "\n",
        "# ---------- SlackBuilds ------------#\n",
        "section=SlackBuilds\n",
        "repo.url=my-slackbuilds\n",
        "repo.path=/var/lib/gitolite3/repositories/my-slackbuilds.git\n",
        "\n",
    ]
    assert list_sections_from(fixture) == ["Generic Projects", "SlackBuilds"]
    assert find_repo_section(fixture, "cad-projects") == "Generic Projects"
    assert find_repo_section(fixture, "my-slackbuilds") == "SlackBuilds"
    assert find_repo_section(fixture, "nope") is None

    block = build_repo_block("publisher", "by Your Name")
    assert block == [
        "repo.url=publisher\n",
        "repo.path=/var/lib/gitolite3/repositories/publisher.git\n",
        "repo.owner=by Your Name\n",
    ]

    # insert under the FIRST section: must land before the SlackBuilds banner,
    # NOT between the banner and its section= line (the banner belongs to the
    # section it precedes)
    new = insert_repo_block(fixture, "Generic Projects", block)
    text = "".join(new)
    assert "repo.url=publisher" in text
    # publisher must precede the SlackBuilds banner (and thus its section header)
    assert text.index("repo.url=publisher") < text.index("# ---------- SlackBuilds")
    # original repo still there
    assert "repo.url=cad-projects" in text

    # insert under the LAST section: lands before EOF
    new2 = insert_repo_block(fixture, "SlackBuilds", block)
    assert "repo.url=publisher" in "".join(new2)

    # missing section raises
    try:
        insert_repo_block(fixture, "Nope", block); assert False
    except KeyError:
        pass

    import tempfile as _tf
    d = _tf.mkdtemp()
    p = os.path.join(d, "cgitrc")
    open(p, "w").write("section=X\nrepo.url=a\n")
    write_cgitrc_lines(p, ["section=X\n", "repo.url=a\n", "repo.url=b\n"], backup_dir=d)
    assert open(p).read() == "section=X\nrepo.url=a\nrepo.url=b\n"
    baks = [f for f in os.listdir(d) if f.startswith("cgitrc.bak.")]
    assert len(baks) == 1, baks

    p2 = os.path.join(d, "cgitrc2")
    open(p2, "w").write("section=Existing\nrepo.url=z\n")
    added = ensure_section(p2, "NewSec")
    assert added is True
    txt = open(p2).read()
    assert "section=NewSec" in txt
    assert "# ---------- NewSec ------------#" in txt
    # idempotent: second call is a no-op
    added2 = ensure_section(p2, "NewSec")
    assert added2 is False
    assert open(p2).read().count("section=NewSec") == 1

    p3 = os.path.join(d, "cgitrc3")
    open(p3, "w").writelines([
        "# ---------- Generic ------------#\n",
        "section=Generic\n",
        "repo.url=cad\n",
        "repo.path=/var/lib/gitolite3/repositories/cad.git\n",
        "\n",
        "# ---------- SlackBuilds ------------#\n",
        "section=SlackBuilds\n",
        "repo.url=sps\n",
        "repo.path=/var/lib/gitolite3/repositories/sps.git\n",
        "\n",
    ])
    calls = []
    rc = do_add_repo(p3, "publisher", "Generic", "by Your Name",
                     run_sync=lambda: calls.append(1))
    assert rc == 0
    t = open(p3).read()
    assert t.index("repo.url=publisher") < t.index("section=SlackBuilds")
    assert calls == [1]  # sync ran after insert

    # idempotent: re-add same url under same section -> no duplicate, success, no sync
    calls.clear()
    rc = do_add_repo(p3, "publisher", "Generic", "x", run_sync=lambda: calls.append(1))
    assert rc == 0
    assert open(p3).read().count("repo.url=publisher") == 1
    assert calls == []

    # exists under a DIFFERENT section -> report, non-zero, no write
    rc = do_add_repo(p3, "publisher", "SlackBuilds", "x", run_sync=lambda: calls.append(1))
    assert rc != 0

    # missing section -> non-zero
    rc = do_add_repo(p3, "newrepo", "Nope", "x", run_sync=lambda: calls.append(1))
    assert rc != 0

    repo_dir = os.path.join(d, "publisher.git")
    os.makedirs(repo_dir)
    desc_path = os.path.join(repo_dir, "description")
    open(desc_path, "w").write("Unnamed repository; edit this file ...\n")
    calls.clear()
    rc = do_set_desc(desc_path, "my real description",
                     run_sync=lambda: calls.append(1))
    assert rc == 0
    assert open(desc_path).read() == "my real description\n"
    assert calls == [1]

    # missing repo -> non-zero, no sync
    calls.clear()
    rc = do_set_desc(os.path.join(d, "ghost.git", "description"), "x",
                     run_sync=lambda: calls.append(1), url="ghost")
    assert rc != 0
    assert calls == []

    print("self-test OK")


def run_sync():
    r = subprocess.run([sys.executable, SYNC_SCRIPT])
    if r.returncode != 0:
        die(f"error: sync exited {r.returncode}", r.returncode)


def main():
    raw = os.environ.get("SSH_ORIGINAL_COMMAND")
    argv = shlex.split(raw) if raw else sys.argv[1:]
    if argv and argv[0] == "--self-test":
        _self_test(); return 0

    p = argparse.ArgumentParser(
        prog="gitctl-helper",
        description="Server helper for gitctl, run as the git user. Normally invoked by the "
                    "client over a command=-restricted SSH key, which forces this "
                    "script and passes the verb plus arguments in SSH_ORIGINAL_COMMAND. "
                    "It is the single privileged writer of /etc/cgitrc and of each bare "
                    "repo's description file. Can also be run directly for local "
                    "testing; pass --self-test to run the built-in assertions.")
    sub = p.add_subparsers(dest="verb", required=True,
                           help="server-side operation to perform")

    sub.add_parser("list-sections",
                   help="print every cgit section name, one per line",
                   description="Read /etc/cgitrc and print each section= name in file order.")

    sp = sub.add_parser("add-section",
                        help="append a new cgit section header if absent",
                        description="Append a banner and section= header to /etc/cgitrc. "
                                    "Idempotent: a section that already exists is left "
                                    "untouched. Prints 'added' or 'exists'.")
    sp.add_argument("--name", required=True,
                    help="section title as shown in cgit, e.g. 'Generic Projects'")

    ar = sub.add_parser("add-repo",
                        help="insert a repo block under a section in cgitrc, then sync",
                        description="Insert a repo.url/path/owner block at the end of the "
                                    "named section's span in /etc/cgitrc (positionally, "
                                    "before the next section or EOF), then run the cgit "
                                    "description sync. Idempotent under the same section; "
                                    "refuses to move a repo that already exists under a "
                                    "different section.")
    ar.add_argument("--url", required=True,
                    help="repo name as it appears in cgit and on disk (becomes <url>.git)")
    ar.add_argument("--section", required=True,
                    help="existing section to file the repo under; create it first with add-section")
    ar.add_argument("--owner", default=DEFAULT_OWNER,
                    help="repo.owner string shown in cgit (default: %(default)r)")

    sd = sub.add_parser("set-desc",
                        help="write a bare repo's description file, then sync",
                        description="Write the bare repo's description file in place "
                                    "(preserving its git:gitolite3 ownership) and run the "
                                    "sync script. This is the only supported way to change "
                                    "a cgit repo.desc=; it is never written directly.")
    sd.add_argument("--url", required=True, help="repo name (the bare repo is <url>.git)")
    sd.add_argument("--desc", required=True, help="description text (a single trailing newline is enforced)")

    sub.add_parser("sync",
                   help="run the cgit description sync script",
                   description="Run sync-cgit-descs.py, which copies each bare repo's "
                               "description into cgit's repo.desc=. The single writer of repo.desc=.")

    re_ = sub.add_parser("repo-exists",
                         help="exit 0 if the bare repo exists, 1 otherwise",
                         description="Check whether <url>.git exists under the repository "
                                     "base. Used by the client to poll after a gitolite "
                                     "push until gitolite has built the bare repo.")
    re_.add_argument("--url", required=True, help="repo name to test for (as <url>.git)")

    a = p.parse_args(argv)

    if a.verb == "list-sections":
        with open(CGITRC) as f:
            for s in list_sections_from(f.readlines()):
                print(s)
        return 0
    if a.verb == "add-section":
        validate_section(a.name)
        print("added" if ensure_section(CGITRC, a.name, BACKUP_DIR) else "exists")
        return 0
    if a.verb == "add-repo":
        validate_url(a.url); validate_section(a.section)
        return do_add_repo(CGITRC, a.url, a.section, a.owner, run_sync, BACKUP_DIR)
    if a.verb == "set-desc":
        validate_url(a.url)
        return do_set_desc(os.path.join(repo_path_for(a.url), "description"),
                           a.desc, run_sync, url=a.url)
    if a.verb == "sync":
        run_sync(); return 0
    if a.verb == "repo-exists":
        validate_url(a.url)
        return 0 if os.path.isdir(repo_path_for(a.url)) else 1
    return 1


if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == "--self-test" and not os.environ.get("SSH_ORIGINAL_COMMAND"):
        _self_test(); sys.exit(0)
    sys.exit(main())
