summaryrefslogtreecommitdiff
path: root/Biz/Dragons
diff options
context:
space:
mode:
Diffstat (limited to 'Biz/Dragons')
-rw-r--r--[-rwxr-xr-x]Biz/Dragons/main.py132
1 files changed, 70 insertions, 62 deletions
diff --git a/Biz/Dragons/main.py b/Biz/Dragons/main.py
index 7a94f99..6e2c995 100755..100644
--- a/Biz/Dragons/main.py
+++ b/Biz/Dragons/main.py
@@ -1,13 +1,11 @@
-#!/usr/bin/env python
# : out dragons.py
-"""
-Analyze developer allocation across a codebase.
-"""
+"""Analyze developer allocation across a codebase."""
import argparse
import datetime
import logging
import os
+import pathlib
import re
import subprocess
import sys
@@ -15,18 +13,26 @@ import typing
def find_user(line: str) -> typing.Any:
- """Given 'Ben Sima <ben@bsima.me>', finds `Ben Sima'. Returns the first
- matching string."""
+ """
+ Find a person's name in a .mailmap file.
+
+ Given 'Ben Sima <ben@bsima.me>', finds `Ben Sima'. Returns the first
+ matching string.
+ """
return re.findall(r"^[^<]*", line)[0].strip()
def authors_for(
- path: str, active_users: typing.List[str]
-) -> typing.Dict[str, str]:
- """Return a dictionary of {author: commits} for given path. Usernames not in
- the 'active_users' list will be filtered out."""
+ path: str,
+ active_users: list[str],
+) -> dict[str, str]:
+ """
+ Return a dictionary of {author: commits} for given path.
+
+ Usernames not in the 'active_users' list will be filtered out.
+ """
raw = subprocess.check_output(
- ["git", "shortlog", "--numbered", "--summary", "--email", "--", path]
+ ["git", "shortlog", "--numbered", "--summary", "--email", "--", path],
).decode("utf-8")
lines = [s for s in raw.split("\n") if s]
data = {}
@@ -39,21 +45,18 @@ def authors_for(
return data
-def mailmap_users() -> typing.List[str]:
- """Returns users from the .mailmap file."""
- users = []
- with open(".mailmap", encoding="utf-8") as file:
+def mailmap_users() -> list[str]:
+ """Return users from the .mailmap file."""
+ with pathlib.Path(".mailmap").open() as file:
lines = file.readlines()
- for line in lines:
- users.append(find_user(line))
- return users
+ return [find_user(line) for line in lines]
MAX_SCORE = 10
def score(blackhole: float, liability: float, good: int, total: int) -> float:
- "Calculate the score."
+ """Calculate the score."""
weights = {
"blackhole": 0.5,
"liability": 0.7,
@@ -70,17 +73,20 @@ def score(blackhole: float, liability: float, good: int, total: int) -> float:
def get_args() -> typing.Any:
- "Parse CLI arguments."
+ """Parse CLI arguments."""
cli = argparse.ArgumentParser(description=__doc__)
cli.add_argument("test", action="store_true", help="run the test suite")
cli.add_argument(
- "repo", default=".", help="the git repo to run on", metavar="REPO"
+ "repo",
+ default=".",
+ help="the git repo to run on",
+ metavar="REPO",
)
cli.add_argument(
"-b",
"--blackholes",
action="store_true",
- help="print the blackholes (files with one or zero active contributors)",
+ help="print the blackholes (files with 1 or 0 active contributors)",
)
cli.add_argument(
"-l",
@@ -105,12 +111,7 @@ def get_args() -> typing.Any:
"--active-users",
nargs="+",
default=[],
- help=" ".join(
- [
- "list of active user emails."
- "if not provided, this is loaded from .mailmap"
- ]
- ),
+ help="list of active user emails. default: loaded from .mailmap",
)
cli.add_argument(
"-v",
@@ -123,7 +124,7 @@ def get_args() -> typing.Any:
def staleness(path: str, now: datetime.datetime) -> int:
- "How long has it been since this file was touched?"
+ """How long has it been since this file was touched?."""
timestamp = datetime.datetime.strptime(
subprocess.check_output(["git", "log", "-n1", "--pretty=%aI", path])
.decode("utf-8")
@@ -135,15 +136,18 @@ def staleness(path: str, now: datetime.datetime) -> int:
class Repo:
- "Represents a repo and stats for the repo."
+ """Represents a repo and stats for the repo."""
def __init__(
- self, ignored_paths: typing.List[str], active_users: typing.List[str]
+ self: "Repo",
+ ignored_paths: list[str],
+ active_users: list[str],
) -> None:
+ """Create analysis of a git repo."""
self.paths = [
p
for p in subprocess.check_output(
- ["git", "ls-files", "--no-deleted"]
+ ["git", "ls-files", "--no-deleted"],
)
.decode("utf-8")
.split()
@@ -156,60 +160,65 @@ class Repo:
self.blackholes = [
path for path, authors in self.stats.items() if not authors
]
+ max_authors = 3
self.liabilities = {
path: list(authors)
for path, authors in self.stats.items()
- if 1 <= len(authors) < 3
+ if 1 <= len(authors) < max_authors
}
now = datetime.datetime.utcnow().astimezone()
self.stale = {}
- for path, _ in self.stats.items():
+ max_staleness = 180
+ for path in self.stats:
_staleness = staleness(path, now)
- if _staleness > 180:
+ if _staleness > max_staleness:
self.stale[path] = _staleness
- def print_blackholes(self, full: bool) -> None:
- "Print number of blackholes, or list of all blackholes."
+ def print_blackholes(self: "Repo", *, full: bool) -> None:
+ """Print number of blackholes, or list of all blackholes."""
# note: file renames may result in false positives
n_blackhole = len(self.blackholes)
- print(f"Blackholes: {n_blackhole}")
+ sys.stdout.write(f"Blackholes: {n_blackhole}")
if full:
for path in self.blackholes:
- print(f" {path}")
+ sys.stdout.write(f" {path}")
+ sys.stdout.flush()
- def print_liabilities(self, full: bool) -> None:
- "Print number of liabilities, or list of all liabilities."
+ def print_liabilities(self: "Repo", *, full: bool) -> None:
+ """Print number of liabilities, or list of all liabilities."""
n_liabilities = len(self.liabilities)
- print(f"Liabilities: {n_liabilities}")
+ sys.stdout.write(f"Liabilities: {n_liabilities}")
if full:
for path, authors in self.liabilities.items():
- print(f" {path} ({', '.join(authors)})")
+ sys.stdout.write(f" {path} ({', '.join(authors)})")
+ sys.stdout.flush()
- def print_score(self) -> None:
- "Print the overall score."
+ def print_score(self: "Repo") -> None:
+ """Print the overall score."""
n_total = len(self.stats.keys())
n_blackhole = len(self.blackholes)
n_liabilities = len(self.liabilities)
n_good = n_total - n_blackhole - n_liabilities
- print("Total:", n_total)
+ sys.stdout.write(f"Total: {n_total}")
this_score = score(n_blackhole, n_liabilities, n_good, n_total)
- print(f"Score: {this_score:.2f}/{MAX_SCORE}".format())
+ sys.stdout.write(f"Score: {this_score:.2f}/{MAX_SCORE}".format())
+ sys.stdout.flush()
- def print_stale(self, full: bool) -> None:
- "Print stale files"
+ def print_stale(self: "Repo", *, full: bool) -> None:
+ """Print stale files."""
n_stale = len(self.stale)
- print(f"Stale files: {n_stale}")
+ sys.stdout.write(f"Stale files: {n_stale}")
if full:
for path, days in self.stale.items():
- print(f" {path} ({days} days)")
+ sys.stdout.write(f" {path} ({days} days)")
+ sys.stdout.flush()
def guard_git(repo: Repo) -> None:
- "Guard against non-git repos."
+ """Guard against non-git repos."""
is_git = subprocess.run(
["git", "rev-parse"],
- stderr=subprocess.PIPE,
- stdout=subprocess.PIPE,
+ capture_output=True,
check=False,
).returncode
if is_git != 0:
@@ -219,25 +228,24 @@ def guard_git(repo: Repo) -> None:
if __name__ == "__main__":
ARGS = get_args()
if ARGS.test:
- print("ok")
+ sys.stdout.write("ok")
sys.exit()
logging.basicConfig(stream=sys.stderr, level=ARGS.verbosity.upper())
logging.debug("starting")
- os.chdir(os.path.abspath(ARGS.repo))
+ os.chdir(pathlib.Path(ARGS.repo).resolve())
guard_git(ARGS.repo)
# if no active users provided, load from .mailmap
- if ARGS.active_users == []:
- if os.path.exists(".mailmap"):
- ARGS.active_users = mailmap_users()
+ if ARGS.active_users == [] and pathlib.Path(".mailmap").exists():
+ ARGS.active_users = mailmap_users()
# collect data
REPO = Repo(ARGS.ignored, ARGS.active_users)
# print data
REPO.print_score()
- REPO.print_blackholes(ARGS.blackholes)
- REPO.print_liabilities(ARGS.liabilities)
- REPO.print_stale(ARGS.stale)
+ REPO.print_blackholes(full=ARGS.blackholes)
+ REPO.print_liabilities(full=ARGS.liabilities)
+ REPO.print_stale(full=ARGS.stale)