#!/usr/bin/env python
# : out dragons.py
"""
Analyze developer allocation across a codebase.
"""

import argparse
import datetime
import logging
import os
import re
import subprocess
import sys
import typing

# Highest value `score` can return (every file has >= 3 active contributors).
MAX_SCORE = 10

# A file whose last commit is older than this many days is reported as stale.
STALE_DAYS = 180


def find_user(line: str) -> str:
    """Return the name part of an identity line.

    Given 'Ada Lovelace <ada@example.com>', returns 'Ada Lovelace': everything
    before the first '<', with surrounding whitespace stripped. A line that
    starts with '<' (or is empty) yields the empty string.
    """
    # The pattern can match the empty string, so findall always returns at
    # least one element and the [0] index is safe.
    return re.findall(r"^[^<]*", line)[0].strip()


def authors_for(path: str, active_users: typing.List[str]) -> typing.Dict[str, str]:
    """Return a dictionary of {author: commits} for the given path.

    Runs `git shortlog` in the current working directory. Authors whose name
    is not in `active_users` are filtered out. The commit count is kept as the
    string printed by git.

    Raises subprocess.CalledProcessError if git exits with a non-zero status.
    """
    # An explicit revision (HEAD) is required: without one, `git shortlog`
    # reads the log from standard input whenever stdin is not a terminal
    # (cron, CI, pipes), which makes the command hang or return nothing.
    raw = subprocess.check_output(
        ["git", "shortlog", "--numbered", "--summary", "--email", "HEAD", "--", path]
    ).decode("utf-8")
    data: typing.Dict[str, str] = {}
    for line in raw.splitlines():
        # Each summary line looks like "   12\tName <email>".
        commits, _, ident = line.strip().partition("\t")
        if not ident:
            continue
        author = find_user(ident)
        # NOTE: two identities sharing one name (different emails, no mailmap
        # entry) collapse to a single key; the later line wins.
        if author in active_users:
            data[author] = commits
    return data


def mailmap_users() -> typing.List[str]:
    """Return the user names listed in the .mailmap file of the current directory.

    Blank lines, '#' comment lines and entries without a name part (e.g.
    '<proper@email> <commit@email>') are skipped so they never show up as
    bogus users.

    Raises OSError if .mailmap cannot be opened.
    """
    users = []
    with open(".mailmap", encoding="utf-8") as file:
        for line in file:
            entry = line.strip()
            if not entry or entry.startswith("#"):
                continue
            name = find_user(entry)
            if name:
                users.append(name)
    return users


def score(blackhole: float, liability: float, good: int, total: int) -> float:
    """Calculate the score, a value between 0 and MAX_SCORE.

    `blackhole`, `liability` and `good` are file counts; blackholes and
    liabilities contribute only a fraction of a "good" file. The weighted sum
    is normalised by `total` and scaled to MAX_SCORE. When `total` is zero
    (no files to analyze) the score is 0.0 instead of a ZeroDivisionError.
    """
    if not total:
        return 0.0
    weights = {
        "blackhole": 0.5,
        "liability": 0.7,
    }
    return (
        MAX_SCORE
        * (
            (blackhole * weights["blackhole"])
            + (liability * weights["liability"])
            + good
        )
        / total
    )


def get_args(argv: typing.Optional[typing.List[str]] = None) -> typing.Any:
    """Parse CLI arguments.

    `argv` is the argument list to parse; when None (the default) argparse
    falls back to sys.argv[1:]. Returns the argparse namespace.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    # Must be an optional flag: a positional with action="store_true" consumes
    # no arguments, so it is either always true or rejected outright by newer
    # argparse versions.
    cli.add_argument("--test", action="store_true", help="run the test suite")
    # nargs="?" is what makes the "." default effective for a positional.
    cli.add_argument(
        "repo",
        nargs="?",
        default=".",
        help="the git repo to run on",
        metavar="REPO",
    )
    cli.add_argument(
        "-b",
        "--blackholes",
        action="store_true",
        help="print the blackholes (files with one or zero active contributors)",
    )
    cli.add_argument(
        "-l",
        "--liabilities",
        action="store_true",
        help="print the liabilities (files with < 3 active contributors)",
    )
    cli.add_argument(
        "-s",
        "--stale",
        action="store_true",
        help="print stale files (haven't been touched in 6 months)",
    )
    cli.add_argument(
        "-i",
        "--ignored",
        nargs="+",
        default=[],
        help="patterns to ignore in paths",
    )
    cli.add_argument(
        "--active-users",
        nargs="+",
        default=[],
        # Users are matched by name (see find_user / authors_for), so the help
        # text says "names"; the two sentences are separate list items so that
        # join() puts a space between them.
        help=" ".join(
            [
                "list of active user names.",
                "if not provided, this is loaded from .mailmap",
            ]
        ),
    )
    cli.add_argument(
        "-v",
        "--verbosity",
        help="set the log level verbosity",
        choices=["debug", "warning", "error"],
        default="error",
    )
    return cli.parse_args(argv)


def staleness(path: str, now: datetime.datetime) -> int:
    """Return the number of whole days between the last commit on `path` and `now`.

    `now` must be timezone-aware, because the author date reported by git
    carries a UTC offset. A path without any commit (e.g. staged but never
    committed) yields 0.

    Raises subprocess.CalledProcessError if git exits with a non-zero status.
    """
    # "--" keeps a path that looks like a revision or an option from being
    # misinterpreted by git.
    stamp = (
        subprocess.check_output(["git", "log", "-n1", "--pretty=%aI", "--", path])
        .decode("utf-8")
        .strip()
    )
    if not stamp:
        return 0
    timestamp = datetime.datetime.strptime(stamp, "%Y-%m-%dT%H:%M:%S%z")
    return (now - timestamp).days


class Repo:
    """Represents a repo and stats for the repo.

    All git commands run in the current working directory, which is expected
    to be inside the repository.

    Attributes (all filled in by __init__):
      paths       -- tracked files that match none of the ignored patterns
      stats       -- {path: {author: commits}} for active authors only
      blackholes  -- paths with no active author
      liabilities -- {path: [authors]} for paths with 1 or 2 active authors
      stale       -- {path: days} for paths untouched for > STALE_DAYS days
    """

    def __init__(
        self, ignored_paths: typing.List[str], active_users: typing.List[str]
    ) -> None:
        # -z gives NUL-separated, unquoted names, so paths containing spaces
        # or other special characters survive intact.
        listing = subprocess.check_output(
            ["git", "ls-files", "-z", "--no-deleted"]
        ).decode("utf-8")
        self.paths: typing.List[str] = [
            p
            for p in listing.split("\0")
            if p and not any(i in p for i in ignored_paths)
        ]
        logging.debug("collecting stats")
        self.stats: typing.Dict[str, typing.Dict[str, str]] = {
            path: authors_for(path, active_users) for path in self.paths
        }
        # NOTE(review): the --blackholes help text says "one or zero active
        # contributors", but only zero-contributor files are counted here;
        # confirm which definition is intended.
        self.blackholes: typing.List[str] = [
            path for path, authors in self.stats.items() if not authors
        ]
        self.liabilities: typing.Dict[str, typing.List[str]] = {
            path: list(authors)
            for path, authors in self.stats.items()
            if 1 <= len(authors) < 3
        }
        # Aware "now" in UTC. utcnow().astimezone() would reinterpret the
        # naive UTC value as local time and skew the result by the local
        # UTC offset.
        now = datetime.datetime.now(datetime.timezone.utc)
        self.stale: typing.Dict[str, int] = {}
        for path in self.stats:
            days = staleness(path, now)
            if days > STALE_DAYS:
                self.stale[path] = days

    def print_blackholes(self, full: bool) -> None:
        "Print number of blackholes, and the list of all blackholes if `full`."
        # note: file renames may result in false positives
        n_blackhole = len(self.blackholes)
        print(f"Blackholes: {n_blackhole}")
        if full:
            for path in self.blackholes:
                print(f" {path}")

    def print_liabilities(self, full: bool) -> None:
        "Print number of liabilities, and the list of all liabilities if `full`."
        n_liabilities = len(self.liabilities)
        print(f"Liabilities: {n_liabilities}")
        if full:
            for path, authors in self.liabilities.items():
                print(f" {path} ({', '.join(authors)})")

    def print_score(self) -> None:
        "Print the total number of files and the overall score."
        n_total = len(self.stats)
        n_blackhole = len(self.blackholes)
        n_liabilities = len(self.liabilities)
        n_good = n_total - n_blackhole - n_liabilities
        print("Total:", n_total)
        this_score = score(n_blackhole, n_liabilities, n_good, n_total)
        print(f"Score: {this_score:.2f}/{MAX_SCORE}")

    def print_stale(self, full: bool) -> None:
        "Print number of stale files, and the list of all stale files if `full`."
        n_stale = len(self.stale)
        print(f"Stale files: {n_stale}")
        if full:
            for path, days in self.stale.items():
                print(f" {path} ({days} days)")


def guard_git(repo: str) -> None:
    """Guard against non-git repos.

    Checks the current working directory with `git rev-parse` and exits the
    program with an error message if it is not inside a git repository.
    `repo` is only used in that message.
    """
    is_git = subprocess.run(
        ["git", "rev-parse"],
        stderr=subprocess.PIPE,
        stdout=subprocess.PIPE,
        check=False,
    ).returncode
    if is_git != 0:
        sys.exit(f"error: not a git repository: {repo}")


def main() -> None:
    "Entry point: parse arguments, collect the stats and print the report."
    args = get_args()
    if args.test:
        print("ok")
        sys.exit()
    logging.basicConfig(stream=sys.stderr, level=args.verbosity.upper())
    logging.debug("starting")
    try:
        os.chdir(os.path.abspath(args.repo))
    except OSError as err:
        # A missing or unreadable directory is a user error, not a crash.
        sys.exit(f"error: cannot enter {args.repo}: {err}")
    guard_git(args.repo)
    # if no active users provided, load from .mailmap
    if not args.active_users and os.path.exists(".mailmap"):
        args.active_users = mailmap_users()
    if not args.active_users:
        logging.warning("no active users known; every file will be a blackhole")
    # collect data
    repo = Repo(args.ignored, args.active_users)
    # print data
    repo.print_score()
    repo.print_blackholes(args.blackholes)
    repo.print_liabilities(args.liabilities)
    repo.print_stale(args.stale)


if __name__ == "__main__":
    main()