"""Team data processing module."""
from __future__ import annotations
import logging
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from collections.abc import Callable
from nhl_scrabble.interfaces import APIClientProtocol, ScorerProtocol
from nhl_scrabble.api.nhl_client import NHLApiNotFoundError
from nhl_scrabble.models.player import PlayerScore
from nhl_scrabble.models.standings import ConferenceStandings, DivisionStandings
from nhl_scrabble.models.team import TeamScore
logger = logging.getLogger(__name__)
[docs]
class TeamProcessor:
"""Process team roster data and calculate aggregate scores.
This class orchestrates fetching roster data for all NHL teams, calculating Scrabble scores for
all players, and aggregating statistics at team, division, and conference levels.
Supports concurrent fetching of team rosters to improve performance for I/O-bound operations.
"""
[docs]
def __init__(
self,
api_client: APIClientProtocol,
scorer: ScorerProtocol,
max_workers: int = 5,
) -> None:
"""Initialize the team processor.
Args:
api_client: NHL API client for fetching data (APIClientProtocol)
scorer: Scrabble scorer for calculating player scores (ScorerProtocol)
max_workers: Maximum number of concurrent API requests (default: 5)
Examples:
Initialize with mocked dependencies:
>>> class MockAPIClient:
... def get_teams(self):
... return {"TOR": {"division": "Atlantic", "conference": "Eastern"}}
... def get_team_roster(self, team):
... return {"forwards": [], "defensemen": [], "goalies": []}
... def close(self):
... pass
>>> class MockScorer:
... def score_player(self, player):
... return 10
>>> client = MockAPIClient()
>>> scorer = MockScorer()
>>> processor = TeamProcessor(client, scorer, max_workers=3)
>>> processor.max_workers
3
"""
self.api_client = api_client
self.scorer = scorer
self.max_workers = max_workers
def _process_team_roster(
self,
roster: dict[str, Any],
team_abbrev: str,
division: str,
conference: str,
) -> list[PlayerScore]:
"""Process a single team's roster and score all players.
Args:
roster: Roster data from NHL API
team_abbrev: Team abbreviation
division: Division name
conference: Conference name
Returns:
List of PlayerScore objects for all players on the team
"""
team_players: list[PlayerScore] = []
# Process all position groups
for position in ("forwards", "defensemen", "goalies"):
if position not in roster:
continue
for player_data in roster[position]:
player_score = self.scorer.score_player(
player_data,
team_abbrev,
division,
conference,
)
team_players.append(player_score)
# Use level guard to avoid expensive sum() call when DEBUG disabled
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
f"Scored {len(team_players)} players for {team_abbrev} "
f"(total: {sum(p.full_score for p in team_players)})",
)
return team_players
def _fetch_and_process_team(
self,
team_abbrev: str,
team_meta: dict[str, str],
season: str | None = None,
) -> tuple[TeamScore, list[PlayerScore]] | None:
"""Fetch and process a single team (thread-safe).
This method is called concurrently from multiple threads. It fetches the roster
for a single team, calculates scores for all players, and aggregates team statistics.
Args:
team_abbrev: Team abbreviation (e.g., "TOR", "BOS")
team_meta: Team metadata containing division and conference
season: Optional season to analyze (format: YYYYYYYY, e.g., 20222023)
Returns:
Tuple of (TeamScore, player list) if successful, None if team fetch failed
Note:
This method is thread-safe as it operates only on local variables and
thread-safe API client methods. No shared mutable state is accessed.
"""
try:
# Fetch roster (with built-in retry and rate limiting)
roster = self.api_client.get_team_roster(team_abbrev, season=season)
# Process roster
team_players = self._process_team_roster(
roster,
team_abbrev,
team_meta["division"],
team_meta["conference"],
)
# Calculate team score
team_total = sum(player.full_score for player in team_players)
team_score = TeamScore(
abbrev=team_abbrev,
name=team_meta.get("name", team_abbrev),
total=team_total,
players=team_players,
division=team_meta["division"],
conference=team_meta["conference"],
)
return (team_score, team_players)
except NHLApiNotFoundError:
# Team has no roster data
return None
[docs]
def process_all_teams(
self,
progress_callback: Callable[[str], None] | None = None,
season: str | None = None,
) -> tuple[dict[str, TeamScore], list[PlayerScore], list[str]]:
"""Process all NHL teams and calculate scores with concurrent fetching.
Uses ThreadPoolExecutor to fetch team rosters concurrently, improving performance
for I/O-bound operations. The number of concurrent workers is controlled by max_workers.
Args:
progress_callback: Optional callback to report progress after each team.
Called with team abbreviation after successfully processing each team.
season: Optional season to analyze (format: YYYYYYYY, e.g., 20222023).
If None, fetches current season data.
Returns:
Tuple containing:
- Dictionary mapping team abbreviations to TeamScore objects
- List of all PlayerScore objects across all teams
- List of team abbreviations that failed to fetch
Examples:
>>> client = NHLApiClient()
>>> scorer = ScrabbleScorer()
>>> processor = TeamProcessor(client, scorer, max_workers=5)
>>> teams, players, failed = processor.process_all_teams()
>>> len(teams) > 0
True
>>> teams_2022, players_2022, failed = processor.process_all_teams(season="20222023")
>>> len(teams_2022) > 0
True
"""
season_desc = f"for season {season}" if season else "for current season"
logger.debug(
f"Starting team processing {season_desc} "
f"(concurrent mode, max_workers={self.max_workers})",
)
# Fetch all teams metadata
teams_info = self.api_client.get_teams(season=season)
total_teams = len(teams_info)
logger.debug(f"Fetched {total_teams} teams from NHL API")
team_scores: dict[str, TeamScore] = {}
all_players: list[PlayerScore] = []
failed_teams: list[str] = []
# Concurrent fetching with controlled parallelism
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Submit all roster fetch jobs
future_to_team = {
executor.submit(
self._fetch_and_process_team,
team_abbrev,
team_meta,
season,
): team_abbrev
for team_abbrev, team_meta in teams_info.items()
}
# Process results as they complete
for completed, future in enumerate(as_completed(future_to_team), start=1):
team_abbrev = future_to_team[future]
try:
result = future.result()
if result is None:
# Team failed to fetch
failed_teams.append(team_abbrev)
logger.warning(
f"No roster data for {team_abbrev} ({completed}/{total_teams})",
)
else:
# Success
team_score, team_players = result
team_scores[team_abbrev] = team_score
all_players.extend(team_players)
logger.debug(f"Processed {team_abbrev} ({completed}/{total_teams})")
# Call progress callback if provided
if progress_callback:
progress_callback(team_abbrev)
except (OSError, ValueError) as e:
# OSError covers network/connection errors, ValueError for data parsing
logger.error(f"Error processing {team_abbrev}: {e}")
failed_teams.append(team_abbrev)
logger.debug(
f"Processing complete: {len(team_scores)} teams processed, "
f"{len(failed_teams)} failed (concurrent mode)",
)
return team_scores, all_players, failed_teams
[docs]
def calculate_division_standings(
self,
team_scores: dict[str, TeamScore],
) -> dict[str, DivisionStandings]:
"""Calculate division-level standings from team scores.
Args:
team_scores: Dictionary of TeamScore objects by team abbreviation
Returns:
Dictionary mapping division names to DivisionStandings objects
Examples:
>>> standings = processor.calculate_division_standings(teams)
>>> "Atlantic" in standings
True
"""
division_data: dict[str, dict[str, Any]] = defaultdict(
lambda: {"total": 0, "teams": [], "player_count": 0},
)
for team_abbrev, team_score in team_scores.items():
division = team_score.division
division_data[division]["total"] += team_score.total
division_data[division]["teams"].append(team_abbrev)
division_data[division]["player_count"] += team_score.player_count
# Convert to DivisionStandings objects
standings: dict[str, DivisionStandings] = {}
for division, data in division_data.items():
avg_per_team = data["total"] / len(data["teams"]) if data["teams"] else 0.0
standings[division] = DivisionStandings(
name=division,
total=data["total"],
teams=sorted(data["teams"]),
player_count=data["player_count"],
avg_per_team=avg_per_team,
)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Calculated standings for {len(standings)} divisions")
return standings
[docs]
def calculate_conference_standings(
self,
team_scores: dict[str, TeamScore],
) -> dict[str, ConferenceStandings]:
"""Calculate conference-level standings from team scores.
Args:
team_scores: Dictionary of TeamScore objects by team abbreviation
Returns:
Dictionary mapping conference names to ConferenceStandings objects
Examples:
>>> standings = processor.calculate_conference_standings(teams)
>>> "Eastern" in standings
True
"""
conference_data: dict[str, dict[str, Any]] = defaultdict(
lambda: {"total": 0, "teams": [], "player_count": 0},
)
for team_abbrev, team_score in team_scores.items():
conference = team_score.conference
conference_data[conference]["total"] += team_score.total
conference_data[conference]["teams"].append(team_abbrev)
conference_data[conference]["player_count"] += team_score.player_count
# Convert to ConferenceStandings objects
standings: dict[str, ConferenceStandings] = {}
for conference, data in conference_data.items():
avg_per_team = data["total"] / len(data["teams"]) if data["teams"] else 0.0
standings[conference] = ConferenceStandings(
name=conference,
total=data["total"],
teams=sorted(data["teams"]),
player_count=data["player_count"],
avg_per_team=avg_per_team,
)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Calculated standings for {len(standings)} conferences")
return standings