summaryrefslogtreecommitdiff
path: root/Devalloc/main.py
diff options
context:
space:
mode:
author: Ben Sima <ben@bsima.me> 2020-09-11 11:40:18 -0500
committer: Ben Sima <ben@bsima.me> 2020-09-16 15:34:23 -0400
commit: c6b32e05d15c428d37d5bf2ee6b5a7210d4a1dbf (patch)
tree: 6af0a0f1c67314392ac94d9f8dee91ee2bb3e3e1 /Devalloc/main.py
parent: 2351224340999dc21997a19db094f644c240318b (diff)
devalloc: prototype
Diffstat (limited to 'Devalloc/main.py')
-rwxr-xr-x Devalloc/main.py 168
1 files changed, 168 insertions, 0 deletions
diff --git a/Devalloc/main.py b/Devalloc/main.py
new file mode 100755
index 0000000..1010966
--- /dev/null
+++ b/Devalloc/main.py
@@ -0,0 +1,168 @@
+#!/usr/bin/env python
+"""
+Analyze developer allocation across a codebase.
+"""
+
+import argparse
+import logging
+import os
+import re
+import subprocess
+import sys
+
+
def extract_email(line):
    """Extract the email address from a git identity string.

    Given 'Ben Sima <ben@bsima.me>', return 'ben@bsima.me'. The address is
    the run of non-whitespace characters enclosed in angle brackets.

    Args:
        line: text containing an identity of the form 'Name <email>'.

    Returns:
        The text between the angle brackets (may be empty for '<>').

    Raises:
        ValueError: if the line contains no '<...>' section.
    """
    match = re.search(r"<(\S*)>", line)
    if match is None:
        # Name the offending input instead of failing with the opaque
        # AttributeError that calling .group() on None would produce.
        raise ValueError(f"no email address found in {line!r}")
    return match.group(1)
+
+
def authors_for(path, active_users):
    """Return a dictionary of {author email: commit count} for the given path.

    Runs `git shortlog --numbered --summary --email` for `path` in the
    current working directory and parses each output line, which consists of
    a commit count, a tab, and a 'Name <email>' identity.

    Args:
        path: file path handed to git after the `--` separator.
        active_users: iterable of email addresses to keep; authors whose
            email is not among them are filtered out.

    Returns:
        A dict mapping each active author's email to the integer number of
        commits touching `path`. Counts of several identities that share one
        email are added together.

    Raises:
        subprocess.CalledProcessError: if git exits with a non-zero status.
        ValueError: if a count is not an integer.
    """
    # Build the lookup set once rather than scanning a list per author.
    wanted = set(active_users)
    # An explicit revision is required: without one, `git shortlog` reads the
    # log from standard input whenever stdin is not a terminal (cron, CI,
    # pipes), which yields no data or blocks forever.
    raw = subprocess.check_output(
        ["git", "shortlog", "--numbered", "--summary", "--email", "HEAD", "--", path]
    ).decode("utf-8")
    data = {}
    for line in raw.splitlines():
        entry = line.strip()
        if not entry:
            continue
        count, _, identity = entry.partition("\t")
        author = extract_email(identity)
        if author in wanted:
            # shortlog groups by name *and* email, so one email can show up
            # on several lines; accumulate instead of overwriting.
            data[author] = data.get(author, 0) + int(count)
    return data
+
+
# Upper bound of the score scale; score() multiplies the weighted ratio by it.
MAX_SCORE = 10


def score(blackhole, liability, good, total):
    """Calculate the score on a 0..MAX_SCORE scale.

    Each category count is multiplied by its weight (blackholes 0.5,
    liabilities 0.7, good files 1), the weighted counts are summed, and the
    sum is scaled by MAX_SCORE / total.

    Args:
        blackhole: number of blackhole files.
        liability: number of liability files.
        good: number of remaining files.
        total: total number of files.

    Returns:
        The score as a number; 0.0 when `total` is zero or negative, since
        there is nothing to divide by.
    """
    if total <= 0:
        # An empty file set would otherwise raise ZeroDivisionError.
        return 0.0
    weights = {
        "blackhole": 0.5,
        "liability": 0.7,
    }
    weighted = (
        (blackhole * weights["blackhole"])
        + (liability * weights["liability"])
        + good
    )
    return MAX_SCORE * weighted / total
+
+
def get_args(argv=None):
    """Parse CLI arguments.

    Args:
        argv: optional list of argument strings to parse. When None (the
            default), argparse reads the process's command line.

    Returns:
        An argparse.Namespace with the attributes `repo`, `blackholes`,
        `liabilities`, `ignored`, `active_users` and `verbosity`.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    # nargs="?" is what makes the positional optional; without it argparse
    # always requires REPO and the "." default can never take effect.
    cli.add_argument(
        "repo",
        nargs="?",
        default=".",
        help="the git repo to run on",
        metavar="REPO",
    )
    cli.add_argument(
        "-b",
        "--blackholes",
        action="store_true",
        help="print the blackholes (files with one or zero active contributors)",
    )
    cli.add_argument(
        "-l",
        "--liabilities",
        action="store_true",
        help="print the liabilities (files with < 3 active contributors)",
    )
    cli.add_argument(
        "-i", "--ignored", nargs="+", default=[], help="patterns to ignore in paths",
    )
    cli.add_argument(
        "--active-users", nargs="+", default=[], help="list of active user emails",
    )
    cli.add_argument(
        "-v",
        "--verbosity",
        help="set the log level verbosity",
        choices=["debug", "warning", "error"],
        default="error",
    )
    return cli.parse_args(argv)
+
+
def guard_git(repo):
    """Guard against non-git repos.

    Runs `git rev-parse` in the process's current working directory and
    exits the program when it fails. The check is therefore about the
    current directory; `repo` is only used to label the error message.

    Args:
        repo: the repository path as given by the user, for the message.

    Raises:
        SystemExit: if the git executable cannot be found, or if
            `git rev-parse` exits with a non-zero status.
    """
    try:
        status = subprocess.run(
            ["git", "rev-parse"],
            # Only the exit status matters; discard the output outright.
            stderr=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            check=False,
        ).returncode
    except FileNotFoundError:
        # No git on PATH: report it instead of dumping a traceback.
        sys.exit("error: git executable not found")
    if status != 0:
        sys.exit(f"error: not a git repository: {repo}")
+
+
class Repo:
    """Represents a repo and stats for the repo.

    Everything is collected in the constructor by running git in the current
    working directory.

    Instance attributes:
        paths: tracked file paths that match none of the ignore patterns.
        stats: dict of path -> result of authors_for() for that path.
        blackholes: list of paths with one or zero active contributors.
        liabilities: dict of path -> list of contributors, for paths with
            exactly two active contributors.
    """

    def __init__(self, ignored_paths, active_users):
        """Collect per-file contributor stats.

        Args:
            ignored_paths: substrings; a path containing any of them is
                skipped.
            active_users: emails passed through to authors_for().
        """
        # -z yields NUL-terminated, unquoted paths, so file names containing
        # spaces or other whitespace stay in one piece.
        listing = subprocess.check_output(["git", "ls-files", "-z"]).decode("utf-8")
        self.paths = [
            p
            for p in listing.split("\0")
            if p and not any(i in p for i in ignored_paths)
        ]
        logging.debug("collecting stats")
        self.stats = {path: authors_for(path, active_users) for path in self.paths}
        self.blackholes, self.liabilities = self._classify(self.stats)

    @staticmethod
    def _classify(stats):
        """Split {path: authors} stats into (blackholes, liabilities).

        Blackholes are paths with one or zero authors; liabilities are paths
        with exactly two. Paths with three or more appear in neither.
        """
        # A single-contributor file is a blackhole, not a "good" file.
        blackholes = [path for path, authors in stats.items() if len(authors) <= 1]
        liabilities = {
            path: list(authors)
            for path, authors in stats.items()
            if len(authors) == 2
        }
        return blackholes, liabilities

    def print_blackholes(self, full):
        "Print number of blackholes, or list of all blackholes."
        # note: file renames may result in false positives
        n_blackhole = len(self.blackholes)
        print(f"Blackholes: {n_blackhole}")
        if full:
            for path in self.blackholes:
                print(f" {path}")

    def print_liabilities(self, full):
        "Print number of liabilities, or list of all liabilities."
        n_liabilities = len(self.liabilities)
        print(f"Liabilities: {n_liabilities}")
        if full:
            for path, authors in self.liabilities.items():
                print(f" {path} ({', '.join(authors)})")

    def print_score(self):
        "Print the total number of files and the overall score."
        n_total = len(self.stats)
        n_blackhole = len(self.blackholes)
        n_liabilities = len(self.liabilities)
        n_good = n_total - n_blackhole - n_liabilities
        # score() divides by the total, so skip the call when there are no
        # files at all and report zero instead.
        if n_total:
            value = score(n_blackhole, n_liabilities, n_good, n_total)
        else:
            value = 0.0
        print("Total:", n_total)
        print(f"Score: {value:.2f}/{MAX_SCORE}")
+
+
if __name__ == "__main__":
    ARGS = get_args()
    logging.basicConfig(stream=sys.stderr, level=ARGS.verbosity.upper())

    logging.debug("starting")
    # The git subprocesses are started without an explicit working
    # directory, so move into the target repo before running any of them.
    try:
        os.chdir(os.path.abspath(ARGS.repo))
    except OSError as err:
        # Missing path, not a directory, or no permission: exit with a
        # message instead of a traceback.
        sys.exit(f"error: cannot enter {ARGS.repo}: {err.strerror}")

    guard_git(ARGS.repo)

    # collect data
    REPO = Repo(ARGS.ignored, ARGS.active_users)

    # print data
    REPO.print_score()
    REPO.print_blackholes(ARGS.blackholes)
    REPO.print_liabilities(ARGS.liabilities)