summary | refs | log | tree | commit | diff
path: root/Biz/Devalloc/main.py
diff options
context:
space:
mode:
Diffstat (limited to 'Biz/Devalloc/main.py')
-rwxr-xr-x Biz/Devalloc/main.py 221
1 files changed, 221 insertions, 0 deletions
diff --git a/Biz/Devalloc/main.py b/Biz/Devalloc/main.py
new file mode 100755
index 0000000..bb10441
--- /dev/null
+++ b/Biz/Devalloc/main.py
@@ -0,0 +1,221 @@
+#!/usr/bin/env python
+"""
+Analyze developer allocation across a codebase.
+"""
+
+import argparse
+import datetime
+import logging
+import os
+import re
+import subprocess
+import sys
+
+
def find_user(line):
    """Extract the name portion of a 'Name <email>' string.

    Everything before the first '<' is returned with surrounding whitespace
    removed, so 'Ben Sima <ben@bsima.me>' yields 'Ben Sima'. When the line
    contains no '<' at all, the whole line is returned stripped.
    """
    name, _sep, _rest = line.partition("<")
    return name.strip()
+
+
def authors_for(path, active_users):
    """Return a dictionary of {author: commits} for the given path.

    Runs `git shortlog` restricted to `path` in the current working directory
    and keeps only authors whose name appears in `active_users`.

    Args:
        path: file path, relative to the current working directory.
        active_users: collection of author names to keep.

    Returns:
        dict mapping author name to the commit count exactly as printed by
        git (a string of digits).

    Raises:
        subprocess.CalledProcessError: if git exits with a non-zero status.
    """
    # The revision (HEAD) is passed explicitly: without one, `git shortlog`
    # reads the log from standard input whenever stdin is not a terminal
    # (cron, CI, pipes) instead of walking the repository history.
    raw = subprocess.check_output(
        [
            "git",
            "shortlog",
            "--numbered",
            "--summary",
            "--email",
            "HEAD",
            "--",
            path,
        ]
    ).decode("utf-8")
    data = {}
    for line in raw.split("\n"):
        entry = line.strip()
        if not entry:
            continue
        # Each summary line is "<count>\t<Name> <email>".
        commits, _tab, ident = entry.partition("\t")
        author = find_user(ident)
        if author in active_users:
            data[author] = commits
    return data
+
+
def mailmap_users():
    """Return the user names listed in the .mailmap file.

    Reads `.mailmap` from the current working directory and extracts the
    leading name of every entry via `find_user`. Blank lines, `#` comment
    lines and entries without a leading name (e.g. '<a@x> <b@y>') are
    skipped so they do not show up as bogus users.

    Returns:
        list of names, in file order.

    Raises:
        OSError: if `.mailmap` cannot be opened.
    """
    users = []
    # Read as UTF-8 so names compare equal to the UTF-8-decoded git output.
    with open(".mailmap", encoding="utf-8") as file:
        for line in file:
            entry = line.strip()
            if not entry or entry.startswith("#"):
                continue
            name = find_user(entry)
            if name:
                users.append(name)
    return users
+
+
# Upper bound of the score scale; a repo made up only of "good" files gets this.
MAX_SCORE = 10


def score(blackhole, liability, good, total):
    """Calculate the overall score on a 0..MAX_SCORE scale.

    Every good file counts fully, a liability counts 0.7 and a blackhole
    counts 0.5; the weighted sum is divided by the total number of files and
    scaled by MAX_SCORE.

    Args:
        blackhole: number of blackhole files.
        liability: number of liability files.
        good: number of files that are neither.
        total: total number of files considered.

    Returns:
        The score as a float. When `total` is 0 there is nothing to rate and
        0.0 is returned instead of raising ZeroDivisionError.
    """
    if not total:
        return 0.0
    weights = {
        "blackhole": 0.5,
        "liability": 0.7,
    }
    weighted = (
        (blackhole * weights["blackhole"])
        + (liability * weights["liability"])
        + good
    )
    return MAX_SCORE * weighted / total
+
+
def get_args():
    """Parse CLI arguments from sys.argv.

    Returns:
        argparse.Namespace with the attributes `repo`, `blackholes`,
        `liabilities`, `stale`, `ignored`, `active_users` and `verbosity`.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    # nargs="?" makes the positional optional; without it argparse ignores
    # `default` and insists on an explicit REPO argument.
    cli.add_argument(
        "repo",
        nargs="?",
        default=".",
        help="the git repo to run on",
        metavar="REPO",
    )
    cli.add_argument(
        "-b",
        "--blackholes",
        action="store_true",
        help="print the blackholes (files with one or zero active contributors)",
    )
    cli.add_argument(
        "-l",
        "--liabilities",
        action="store_true",
        help="print the liabilities (files with < 3 active contributors)",
    )
    cli.add_argument(
        "-s",
        "--stale",
        action="store_true",
        help="print stale files (haven't been touched in 6 months)",
    )
    cli.add_argument(
        "-i",
        "--ignored",
        nargs="+",
        default=[],
        help="patterns to ignore in paths",
    )
    # The values are compared against author names (the part before '<'),
    # so the help text says names rather than emails.
    cli.add_argument(
        "--active-users",
        nargs="+",
        default=[],
        help="list of active user names. if not provided, this is loaded from .mailmap",
    )
    cli.add_argument(
        "-v",
        "--verbosity",
        help="set the log level verbosity",
        choices=["debug", "warning", "error"],
        default="error",
    )
    return cli.parse_args()
+
+
def guard_git(repo):
    """Exit the program unless the current directory is inside a git repo.

    `git rev-parse` is run in the current working directory with its output
    captured and discarded; `repo` is only used in the error message.
    """
    probe = subprocess.run(
        ["git", "rev-parse"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )
    if probe.returncode == 0:
        return
    sys.exit(f"error: not a git repository: {repo}")
+
+
def staleness(path, now):
    """Return how many whole days ago `path` was last committed to.

    Args:
        path: file path, relative to the current working directory.
        now: timezone-aware datetime to measure against.

    Returns:
        Number of days between the author date of the most recent commit
        touching `path` and `now`. Returns 0 when git reports no commit for
        the path (e.g. a file that is staged but was never committed).

    Raises:
        subprocess.CalledProcessError: if git exits with a non-zero status.
    """
    # "--" keeps git from mistaking a path for a revision or an option.
    stamp = (
        subprocess.check_output(
            ["git", "log", "-n1", "--pretty=%aI", "--", path]
        )
        .decode("utf-8")
        .strip()
    )
    if not stamp:
        return 0
    # %aI is the strict ISO 8601 author date, e.g. 2020-11-30T12:34:56-05:00.
    timestamp = datetime.datetime.strptime(stamp, "%Y-%m-%dT%H:%M:%S%z")
    return (now - timestamp).days
+
+
class Repo:
    """Represents a repo and stats for the repo.

    All data is collected in the constructor by running git in the current
    working directory; the print_* methods only report it.
    """

    def __init__(self, ignored_paths, active_users):
        """Collect per-file author stats, blackholes, liabilities and stale files.

        Args:
            ignored_paths: substrings; any tracked path containing one of
                them is skipped.
            active_users: author names that count as active contributors.
        """
        # -z yields NUL-separated, unquoted paths, so names containing spaces
        # or non-ASCII characters survive intact.
        listing = subprocess.check_output(
            ["git", "ls-files", "-z", "--no-deleted"]
        ).decode("utf-8")
        # Tracked paths that matched none of the ignore patterns.
        self.paths = [
            p
            for p in listing.split("\0")
            if p and not any(i in p for i in ignored_paths)
        ]
        logging.debug("collecting stats")
        # path -> {author: commits}, restricted to active users.
        self.stats = {path: authors_for(path, active_users) for path in self.paths}
        # Files without any active author.
        self.blackholes = [path for path, authors in self.stats.items() if not authors]
        # Files with one or two active authors, mapped to those authors.
        self.liabilities = {
            path: list(authors)
            for path, authors in self.stats.items()
            if 1 <= len(authors) < 3
        }
        # Aware "now" in UTC. utcnow().astimezone() would reinterpret the
        # naive UTC value as local time and shift it by the local offset.
        now = datetime.datetime.now(datetime.timezone.utc)
        # path -> days since last commit, for files untouched for > 180 days.
        self.stale = {}
        for path in self.stats:
            _staleness = staleness(path, now)
            if _staleness > 180:
                self.stale[path] = _staleness

    def print_blackholes(self, full):
        "Print number of blackholes, and the list of them when `full` is true."
        # note: file renames may result in false positives
        n_blackhole = len(self.blackholes)
        print(f"Blackholes: {n_blackhole}")
        if full:
            for path in self.blackholes:
                print(f" {path}")

    def print_liabilities(self, full):
        "Print number of liabilities, and the list of them when `full` is true."
        n_liabilities = len(self.liabilities)
        print(f"Liabilities: {n_liabilities}")
        if full:
            for path, authors in self.liabilities.items():
                print(f" {path} ({', '.join(authors)})")

    def print_score(self):
        "Print the total number of files and the overall score."
        n_total = len(self.stats)
        n_blackhole = len(self.blackholes)
        n_liabilities = len(self.liabilities)
        # Whatever is neither a blackhole nor a liability counts as good.
        n_good = n_total - n_blackhole - n_liabilities
        print("Total:", n_total)
        print(
            "Score: {:.2f}/{}".format(
                score(n_blackhole, n_liabilities, n_good, n_total), MAX_SCORE
            )
        )

    def print_stale(self, full):
        "Print number of stale files, and the list of them when `full` is true."
        n_stale = len(self.stale)
        print(f"Stale files: {n_stale}")
        if full:
            for path, days in self.stale.items():
                print(f" {path} ({days} days)")
+
+
if __name__ == "__main__":
    # Script entry point: parse arguments, enter the repo, collect, report.
    ARGS = get_args()
    logging.basicConfig(stream=sys.stderr, level=ARGS.verbosity.upper())
    logging.debug("starting")

    # All git commands below run relative to the repo directory.
    os.chdir(os.path.abspath(ARGS.repo))
    guard_git(ARGS.repo)

    # Fall back to the names in .mailmap when no active users were given.
    if not ARGS.active_users and os.path.exists(".mailmap"):
        ARGS.active_users = mailmap_users()

    # Gather every statistic up front.
    REPO = Repo(ARGS.ignored, ARGS.active_users)

    # Report: the score first, then each category with its detail flag.
    REPO.print_score()
    for PRINTER, FULL in (
        (REPO.print_blackholes, ARGS.blackholes),
        (REPO.print_liabilities, ARGS.liabilities),
        (REPO.print_stale, ARGS.stale),
    ):
        PRINTER(FULL)