# : out dragons.py
"""Analyze developer allocation across a codebase."""

import argparse
import datetime
import logging
import os
import pathlib
import re
import subprocess
import sys
import typing


def find_user(line: str) -> str:
    """
    Find a person's name in a .mailmap-style identity line.

    Given 'Ben Sima <ben@example.com>', finds 'Ben Sima'. Everything before
    the first '<' is returned with surrounding whitespace stripped; a line
    that starts with '<' therefore yields the empty string.
    """
    return line.partition("<")[0].strip()


def authors_for(
    path: str,
    active_users: list[str],
) -> dict[str, str]:
    """
    Return a dictionary of {author: commits} for given path.

    The commit count is kept as the string printed by `git shortlog`.
    Authors are identified by name (see `find_user`); names not in the
    'active_users' list will be filtered out.

    Raises subprocess.CalledProcessError if git exits with an error.
    """
    # An explicit revision is required: without one, `git shortlog` reads the
    # log from stdin whenever stdin is not a terminal (cron, CI, pipes).
    raw = subprocess.check_output(
        [
            "git",
            "shortlog",
            "--numbered",
            "--summary",
            "--email",
            "HEAD",
            "--",
            path,
        ],
    ).decode("utf-8")
    active = set(active_users)
    data = {}
    for line in raw.split("\n"):
        # each summary line looks like '<count>\t<name> <email>'
        commits, tab, identity = line.strip().partition("\t")
        if not tab:
            # blank or malformed line
            continue
        author = find_user(identity)
        if author in active:
            data[author] = commits
    return data


def mailmap_users() -> list[str]:
    """
    Return user names from the .mailmap file in the current directory.

    Blank lines, '#' comment lines and entries without a name (email-only
    mappings) are skipped.
    """
    users = []
    with pathlib.Path(".mailmap").open(encoding="utf-8") as file:
        for line in file:
            entry = line.strip()
            if not entry or entry.startswith("#"):
                continue
            name = find_user(entry)
            if name:
                users.append(name)
    return users


MAX_SCORE = 10


def score(blackhole: float, liability: float, good: int, total: int) -> float:
    """
    Calculate the score on a scale of 0 to MAX_SCORE.

    'blackhole', 'liability' and 'good' are file counts and 'total' is the
    number of files considered. Blackholes and liabilities contribute to the
    score with the partial weights below; good files count fully. Returns 0.0
    when 'total' is 0 instead of dividing by zero.
    """
    if total == 0:
        return 0.0
    weights = {
        "blackhole": 0.5,
        "liability": 0.7,
    }
    weighted = (
        (blackhole * weights["blackhole"])
        + (liability * weights["liability"])
        + good
    )
    return MAX_SCORE * weighted / total


def get_args(
    argv: typing.Optional[list[str]] = None,
) -> argparse.Namespace:
    """
    Parse CLI arguments.

    'argv' is the list of arguments to parse; when None (the default),
    argparse falls back to sys.argv[1:].
    """
    cli = argparse.ArgumentParser(description=__doc__)
    # This must be an optional flag: a positional 'store_true' consumes no
    # arguments and is unconditionally set, which made every run a test run.
    cli.add_argument("--test", action="store_true", help="run the test suite")
    cli.add_argument(
        "repo",
        nargs="?",  # without this the default below is never used
        default=".",
        help="the git repo to run on",
        metavar="REPO",
    )
    cli.add_argument(
        "-b",
        "--blackholes",
        action="store_true",
        help="print the blackholes (files with 1 or 0 active contributors)",
    )
    cli.add_argument(
        "-l",
        "--liabilities",
        action="store_true",
        help="print the liabilities (files with < 3 active contributors)",
    )
    cli.add_argument(
        "-s",
        "--stale",
        action="store_true",
        help="print stale files (haven't been touched in 6 months)",
    )
    cli.add_argument(
        "-i",
        "--ignored",
        nargs="+",
        default=[],
        help="patterns to ignore in paths",
    )
    cli.add_argument(
        "--active-users",
        nargs="+",
        default=[],
        help="list of active user names. default: loaded from .mailmap",
    )
    cli.add_argument(
        "-v",
        "--verbosity",
        help="set the log level verbosity",
        choices=["debug", "warning", "error"],
        default="error",
    )
    return cli.parse_args(argv)


def staleness(path: str, now: datetime.datetime) -> int:
    """
    Return the number of days since the last commit touching 'path'.

    'now' must be timezone-aware because it is compared with git's
    offset-carrying author date. Returns 0 when git reports no commit for the
    path (e.g. a file that is staged but not yet committed).

    Raises subprocess.CalledProcessError if git exits with an error.
    """
    raw = (
        subprocess.check_output(
            ["git", "log", "-n1", "--pretty=%aI", "--", path],
        )
        .decode("utf-8")
        .strip()
    )
    if not raw:
        return 0
    timestamp = datetime.datetime.strptime(raw, "%Y-%m-%dT%H:%M:%S%z")
    return (now - timestamp).days


class Repo:
    """
    Represents a repo and stats for the repo.

    Attributes set by __init__:
      paths:       tracked files that match none of the ignored patterns
      stats:       {path: {author: commits}} restricted to active users
      blackholes:  paths with no active author
      liabilities: {path: [authors]} for paths with 1 or 2 active authors
      stale:       {path: days} for paths untouched for more than 180 days
    """

    def __init__(
        self: "Repo",
        ignored_paths: list[str],
        active_users: list[str],
    ) -> None:
        """
        Create analysis of the git repo in the current working directory.

        A path is ignored when any entry of 'ignored_paths' occurs in it as a
        substring. Runs git once to list files and twice more per file.
        """
        # NUL-separated output keeps paths containing whitespace intact and
        # stops git from quoting unusual file names.
        listing = subprocess.check_output(
            ["git", "ls-files", "--no-deleted", "-z"],
        ).decode("utf-8")
        self.paths = [
            p
            for p in listing.split("\0")
            if p and not any(i in p for i in ignored_paths)
        ]
        logging.debug("collecting stats")
        self.stats = {
            path: authors_for(path, active_users) for path in self.paths
        }
        self.blackholes = [
            path for path, authors in self.stats.items() if not authors
        ]
        max_authors = 3
        self.liabilities = {
            path: list(authors)
            for path, authors in self.stats.items()
            if 1 <= len(authors) < max_authors
        }
        # Take an aware UTC timestamp directly; a naive utcnow() followed by
        # astimezone() would be misread as local time.
        now = datetime.datetime.now(tz=datetime.timezone.utc)
        self.stale = {}
        max_staleness = 180
        for path in self.stats:
            days = staleness(path, now)
            if days > max_staleness:
                self.stale[path] = days

    def print_blackholes(self: "Repo", *, full: bool) -> None:
        """Print number of blackholes and, if 'full', each blackhole path."""
        # note: file renames may result in false positives
        n_blackhole = len(self.blackholes)
        sys.stdout.write(f"Blackholes: {n_blackhole}\n")
        if full:
            for path in self.blackholes:
                sys.stdout.write(f" {path}\n")
        sys.stdout.flush()

    def print_liabilities(self: "Repo", *, full: bool) -> None:
        """Print number of liabilities and, if 'full', each with authors."""
        n_liabilities = len(self.liabilities)
        sys.stdout.write(f"Liabilities: {n_liabilities}\n")
        if full:
            for path, authors in self.liabilities.items():
                sys.stdout.write(f" {path} ({', '.join(authors)})\n")
        sys.stdout.flush()

    def print_score(self: "Repo") -> None:
        """Print the total number of files and the overall score."""
        n_total = len(self.stats)
        n_blackhole = len(self.blackholes)
        n_liabilities = len(self.liabilities)
        n_good = n_total - n_blackhole - n_liabilities
        sys.stdout.write(f"Total: {n_total}\n")
        this_score = score(n_blackhole, n_liabilities, n_good, n_total)
        sys.stdout.write(f"Score: {this_score:.2f}/{MAX_SCORE}\n")
        sys.stdout.flush()

    def print_stale(self: "Repo", *, full: bool) -> None:
        """Print number of stale files and, if 'full', each with its age."""
        n_stale = len(self.stale)
        sys.stdout.write(f"Stale files: {n_stale}\n")
        if full:
            for path, days in self.stale.items():
                sys.stdout.write(f" {path} ({days} days)\n")
        sys.stdout.flush()


def guard_git(repo: str) -> None:
    """
    Guard against non-git repos.

    Checks the current working directory with `git rev-parse` and exits the
    process with an error message if it fails; 'repo' is only used in that
    message.
    """
    is_git = subprocess.run(
        ["git", "rev-parse"],
        capture_output=True,
        check=False,
    ).returncode
    if is_git != 0:
        sys.exit(f"error: not a git repository: {repo}")


if __name__ == "__main__":
    ARGS = get_args()
    if ARGS.test:
        sys.stdout.write("ok\n")
        sys.exit()
    logging.basicConfig(stream=sys.stderr, level=ARGS.verbosity.upper())
    logging.debug("starting")
    os.chdir(pathlib.Path(ARGS.repo).resolve())
    guard_git(ARGS.repo)
    # if no active users provided, load from .mailmap
    if not ARGS.active_users and pathlib.Path(".mailmap").exists():
        ARGS.active_users = mailmap_users()
    # collect data
    REPO = Repo(ARGS.ignored, ARGS.active_users)
    # print data
    REPO.print_score()
    REPO.print_blackholes(full=ARGS.blackholes)
    REPO.print_liabilities(full=ARGS.liabilities)
    REPO.print_stale(full=ARGS.stale)