From 2be76d946d741ee2e4dd2cd65fa5742a83119fd3 Mon Sep 17 00:00:00 2001
From: Cade Mirchandani
Date: Thu, 24 Jul 2025 11:41:20 -0700
Subject: [PATCH 1/2] feat: add event dataclasses

---
 .../events.py | 238 ++++++++++++++++++
 1 file changed, 238 insertions(+)
 create mode 100644 src/snakemake_interface_logger_plugins/events.py

diff --git a/src/snakemake_interface_logger_plugins/events.py b/src/snakemake_interface_logger_plugins/events.py
new file mode 100644
index 0000000..03fdbc5
--- /dev/null
+++ b/src/snakemake_interface_logger_plugins/events.py
@@ -0,0 +1,238 @@
+import uuid
+from dataclasses import dataclass, field
+from logging import LogRecord
+from typing import Any, Dict, List, Optional
+
+
+@dataclass
+class Error:
+    exception: Optional[str] = None
+    location: Optional[str] = None
+    rule: Optional[str] = None
+    traceback: Optional[str] = None
+    file: Optional[str] = None
+    line: Optional[str] = None
+
+    @classmethod
+    def from_record(cls, record: LogRecord) -> "Error":
+        return cls(
+            exception=getattr(record, "exception", None),
+            location=getattr(record, "location", None),
+            rule=getattr(record, "rule", None),
+            traceback=getattr(record, "traceback", None),
+            file=getattr(record, "file", None),
+            line=getattr(record, "line", None),
+        )
+
+
+@dataclass
+class WorkflowStarted:
+    workflow_id: uuid.UUID
+    snakefile: Optional[str]
+
+    def __post_init__(self) -> None:
+        if self.snakefile is not None and not isinstance(self.snakefile, str):
+            try:
+                # Try to convert to string - this should work for PosixPath and other path-like objects
+                self.snakefile = str(self.snakefile)
+            except (TypeError, ValueError) as e:
+                raise ValueError(f"Could not convert snakefile to string: {e}") from e
+
+    @classmethod
+    def from_record(cls, record: LogRecord) -> "WorkflowStarted":
+        return cls(
+            workflow_id=getattr(record, "workflow_id"),
+            snakefile=getattr(record, "snakefile", None),
+        )
+
+
+@dataclass
+class JobInfo:
+    jobid: int
+    rule_name: str
+    threads: int
+    input: Optional[List[str]] = None
+    output: Optional[List[str]] = None
+    log: Optional[List[str]] = None
+    benchmark: Optional[List[str]] = None
+    rule_msg: Optional[str] = None
+    wildcards: Optional[Dict[str, Any]] = field(default_factory=dict)
+    reason: Optional[str] = None
+    shellcmd: Optional[str] = None
+    priority: Optional[int] = None
+    resources: Optional[Dict[str, Any]] = field(default_factory=dict)

+    @classmethod
+    def from_record(cls, record: LogRecord) -> "JobInfo":
+        resources = {}
+        if hasattr(record, "resources") and hasattr(record.resources, "_names"):  # type: ignore
+            resources = {
+                name: value
+                for name, value in zip(record.resources._names, record.resources)  # type: ignore
+                if name not in {"_cores", "_nodes"}
+            }
+
+        return cls(
+            jobid=getattr(record, "jobid", 0),
+            rule_name=getattr(record, "rule_name", ""),
+            threads=getattr(record, "threads", 1),
+            rule_msg=getattr(record, "rule_msg", None),
+            wildcards=getattr(record, "wildcards", {}),
+            reason=getattr(record, "reason", None),
+            shellcmd=getattr(record, "shellcmd", None),
+            priority=getattr(record, "priority", None),
+            input=getattr(record, "input", None),
+            log=getattr(record, "log", None),
+            output=getattr(record, "output", None),
+            benchmark=getattr(record, "benchmark", None),
+            resources=resources,
+        )
+
+
+@dataclass
+class JobStarted:
+    job_ids: List[int]
+
+    @classmethod
+    def from_record(cls, record: LogRecord) -> "JobStarted":
+        jobs = getattr(record, "jobs", [])
+
+        if jobs is None:
+            jobs = []
+        elif isinstance(jobs, int):
+            jobs = [jobs]
+
+        return cls(job_ids=jobs)
+
+
+@dataclass
+class JobFinished:
+    job_id: int
+
+    @classmethod
+    def from_record(cls, record: LogRecord) -> "JobFinished":
+        return cls(job_id=getattr(record, "job_id"))
+
+
+@dataclass
+class ShellCmd:
+    jobid: int
+    shellcmd: Optional[str] = None
+    rule_name: Optional[str] = None
+
+    @classmethod
+    def from_record(cls, record: LogRecord) -> "ShellCmd":
+        return cls(
+            jobid=getattr(record, "jobid", 0),
+            shellcmd=getattr(record, "shellcmd", ""),
+            rule_name=getattr(record, "name", None),
+        )
+
+
+@dataclass
+class JobError:
+    jobid: int
+
+    @classmethod
+    def from_record(cls, record: LogRecord) -> "JobError":
+        return cls(
+            jobid=getattr(record, "jobid", 0),
+        )
+
+
+@dataclass
+class GroupInfo:
+    group_id: int
+    jobs: List[Any] = field(default_factory=list)
+
+    @classmethod
+    def from_record(cls, record: LogRecord) -> "GroupInfo":
+        return cls(
+            group_id=getattr(record, "group_id", 0), jobs=getattr(record, "jobs", [])
+        )
+
+
+@dataclass
+class GroupError:
+    groupid: int
+    aux_logs: List[Any] = field(default_factory=list)
+    job_error_info: Dict[str, Any] = field(default_factory=dict)
+
+    @classmethod
+    def from_record(cls, record: LogRecord) -> "GroupError":
+        return cls(
+            groupid=getattr(record, "groupid", 0),
+            aux_logs=getattr(record, "aux_logs", []),
+            job_error_info=getattr(record, "job_error_info", {}),
+        )
+
+
+@dataclass
+class ResourcesInfo:
+    nodes: Optional[List[str]] = None
+    cores: Optional[int] = None
+    provided_resources: Optional[Dict[str, Any]] = None
+
+    @classmethod
+    def from_record(cls, record: LogRecord) -> "ResourcesInfo":
+        if hasattr(record, "nodes"):
+            return cls(nodes=record.nodes)  # type: ignore
+        elif hasattr(record, "cores"):
+            return cls(cores=record.cores)  # type: ignore
+        elif hasattr(record, "provided_resources"):
+            return cls(provided_resources=record.provided_resources)  # type: ignore
+        else:
+            return cls()
+
+
+@dataclass
+class DebugDag:
+    status: Optional[str] = None
+    job: Optional[Any] = None
+    file: Optional[str] = None
+    exception: Optional[str] = None
+
+    @classmethod
+    def from_record(cls, record: LogRecord) -> "DebugDag":
+        return cls(
+            status=getattr(record, "status", None),
+            job=getattr(record, "job", None),
+            file=getattr(record, "file", None),
+            exception=getattr(record, "exception", None),
+        )
+
+
+@dataclass
+class Progress:
+    done: int
+    total: int
+
+    @classmethod
+    def from_record(cls, record: LogRecord) -> "Progress":
+        return cls(done=getattr(record, "done", 0), total=getattr(record, "total", 0))
+
+
+@dataclass
+class RuleGraph:
+    rulegraph: Dict[str, Any]
+
+    @classmethod
+    def from_record(cls, record: LogRecord) -> "RuleGraph":
+        return cls(rulegraph=getattr(record, "rulegraph", {}))
+
+
+@dataclass
+class RunInfo:
+    per_rule_job_counts: Dict[str, int] = field(default_factory=dict)
+    total_job_count: int = 0
+
+    @classmethod
+    def from_record(cls, record: LogRecord) -> "RunInfo":
+        all_stats = getattr(record, "stats", {})
+
+        per_rule_job_counts = {k: v for k, v in all_stats.items() if k != "total"}
+
+        total_job_count = all_stats.get("total", 0)
+        return cls(
+            per_rule_job_counts=per_rule_job_counts, total_job_count=total_job_count
+        )

From 1cce5ea4ad9cf9613bd4c4024153ccd961ca4ca7 Mon Sep 17 00:00:00 2001
From: Cade Mirchandani
Date: Thu, 24 Jul 2025 11:47:06 -0700
Subject: [PATCH 2/2] docs: update docs for event dataclasses

---
 README.md | 170 +++++++++++++++++++++++------------------------
 1 file changed, 71 insertions(+), 99 deletions(-)

diff --git a/README.md b/README.md
index 6d3558f..03ed7c5 100644
--- a/README.md
+++ b/README.md
@@ -201,105 +201,77 @@ class LogHandler(LogHandlerBase):
 ## Available Log Events
 
-The `LogEvent` enum defines particularly important Snakemake events such as workflow starting, job submission, job failure, etc. Below are the available events and the fields you can typically expect in `LogRecord` objects for each event type. **Note: These field lists are guidelines only and may change between versions. Always use defensive programming practices like `getattr()` with defaults or `hasattr()` checks when accessing fields.**
-
-### Event Types and Typical Available Fields
-
-**`LogEvent.ERROR`**
-- `exception: Optional[str]` - Exception type
-- `location: Optional[str]` - Location where error occurred
-- `rule: Optional[str]` - Rule name associated with error
-- `traceback: Optional[str]` - Full traceback
-- `file: Optional[str]` - File where error occurred
-- `line: Optional[str]` - Line number where error occurred
-
-**`LogEvent.WORKFLOW_STARTED`**
-- `workflow_id: uuid.UUID` - Unique workflow identifier
-- `snakefile: Optional[str]` - Path to the Snakefile
-
-**`LogEvent.JOB_INFO`**
-- `jobid: int` - Job identifier
-- `rule_name: str` - Name of the rule
-- `threads: int` - Number of threads allocated
-- `input: Optional[List[str]]` - Input files
-- `output: Optional[List[str]]` - Output files
-- `log: Optional[List[str]]` - Log files
-- `benchmark: Optional[List[str]]` - Benchmark files
-- `rule_msg: Optional[str]` - Rule message
-- `wildcards: Optional[Dict[str, Any]]` - Wildcard values
-- `reason: Optional[str]` - Reason for job execution
-- `shellcmd: Optional[str]` - Shell command to execute
-- `priority: Optional[int]` - Job priority
-- `resources: Optional[Dict[str, Any]]` - Resource requirements
-
-**`LogEvent.JOB_STARTED`**
-- `job_ids: List[int]` - List of job IDs that started
-
-**`LogEvent.JOB_FINISHED`**
-- `job_id: int` - ID of the finished job
-
-**`LogEvent.SHELLCMD`**
-- `jobid: int` - Job identifier
-- `shellcmd: Optional[str]` - Shell command being executed
-- `rule_name: Optional[str]` - Name of the rule
-
-**`LogEvent.JOB_ERROR`**
-- `jobid: int` - ID of the job that failed
-
-**`LogEvent.GROUP_INFO`**
-- `group_id: int` - Group identifier
-- `jobs: List[Any]` - Jobs in the group
-
-**`LogEvent.GROUP_ERROR`**
-- `groupid: int` - Group identifier
-- `aux_logs: List[Any]` - Auxiliary log information
-- `job_error_info: Dict[str, Any]` - Job error details
-
-**`LogEvent.RESOURCES_INFO`**
-- `nodes: Optional[List[str]]` - Available nodes
-- `cores: Optional[int]` - Available cores
-- `provided_resources: Optional[Dict[str, Any]]` - Provided resources
-
-**`LogEvent.DEBUG_DAG`**
-- `status: Optional[str]` - DAG status
-- `job: Optional[Any]` - Job information
-- `file: Optional[str]` - Related file
-- `exception: Optional[str]` - Exception information
-
-**`LogEvent.PROGRESS`**
-- `done: int` - Number of completed jobs
-- `total: int` - Total number of jobs
-
-**`LogEvent.RULEGRAPH`**
-- `rulegraph: Dict[str, Any]` - Rule graph data structure
-
-**`LogEvent.RUN_INFO`**
-- `per_rule_job_counts: Dict[str, int]` - Job count per rule
-- `total_job_count: int` - Total number of jobs
-
-### Accessing Event Fields
-
-You can filter for specific events and access their fields in your `emit()` method:
+The `LogEvent` enum identifies important Snakemake events such as workflow start, job submission, and job failure.
+For each event, a corresponding dataclass is defined in `snakemake_interface_logger_plugins.events`.
+These dataclasses provide a typed interface for accessing event-specific fields from a `LogRecord`.
+
+**To extract event data from a `LogRecord`, use the appropriate dataclass's `from_record()` method.**
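+
+For example, a handler can decode a single event type directly (a minimal sketch; `handle` is an illustrative helper, and it assumes the record carries a `LogEvent.JOB_INFO` event):
+
+```python
+from snakemake_interface_logger_plugins.common import LogEvent
+from snakemake_interface_logger_plugins.events import JobInfo
+
+def handle(record):
+    # Only JOB_INFO records carry the fields that JobInfo expects.
+    if getattr(record, "event", None) == LogEvent.JOB_INFO:
+        job = JobInfo.from_record(record)
+        print(f"job {job.jobid}: rule {job.rule_name}, {job.threads} thread(s)")
+```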
+
+### Event Types, Dataclasses, and Typical Fields
+
+| LogEvent                    | Dataclass         | Typical Fields (see class for details)                                                                               |
+| --------------------------- | ----------------- | --------------------------------------------------------------------------------------------------------------------- |
+| `LogEvent.ERROR`            | `Error`           | exception, location, rule, traceback, file, line                                                                      |
+| `LogEvent.WORKFLOW_STARTED` | `WorkflowStarted` | workflow_id, snakefile                                                                                                |
+| `LogEvent.JOB_INFO`         | `JobInfo`         | jobid, rule_name, threads, input, output, log, benchmark, rule_msg, wildcards, reason, shellcmd, priority, resources |
+| `LogEvent.JOB_STARTED`      | `JobStarted`      | job_ids                                                                                                               |
+| `LogEvent.JOB_FINISHED`     | `JobFinished`     | job_id                                                                                                                |
+| `LogEvent.SHELLCMD`         | `ShellCmd`        | jobid, shellcmd, rule_name                                                                                            |
+| `LogEvent.JOB_ERROR`        | `JobError`        | jobid                                                                                                                 |
+| `LogEvent.GROUP_INFO`       | `GroupInfo`       | group_id, jobs                                                                                                        |
+| `LogEvent.GROUP_ERROR`      | `GroupError`      | groupid, aux_logs, job_error_info                                                                                     |
+| `LogEvent.RESOURCES_INFO`   | `ResourcesInfo`   | nodes, cores, provided_resources                                                                                      |
+| `LogEvent.DEBUG_DAG`        | `DebugDag`        | status, job, file, exception                                                                                          |
+| `LogEvent.PROGRESS`         | `Progress`        | done, total                                                                                                           |
+| `LogEvent.RULEGRAPH`        | `RuleGraph`       | rulegraph                                                                                                             |
+| `LogEvent.RUN_INFO`         | `RunInfo`         | per_rule_job_counts, total_job_count                                                                                  |
+
+**Note:** These field lists are guidelines only and may change between versions.
+Always use defensive programming practices like `getattr()` with defaults or `hasattr()` checks when accessing fields.
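+
+For instance, guarded access might look like this (a sketch; a `PROGRESS` record usually carries `done` and `total`):
+
+```python
+# Fall back to defaults in case a field is absent in this Snakemake version.
+done = getattr(record, "done", 0)
+total = getattr(record, "total", 0)
+```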
+
+#### Example: Selecting the Right Dataclass for a LogRecord
 
 ```python
+from snakemake_interface_logger_plugins.common import LogEvent
+from snakemake_interface_logger_plugins import events
+
+def parse_event(record):
+    if not hasattr(record, "event"):
+        return None
+
+    event = record.event
+
+    # Map LogEvent to the corresponding dataclass
+    event_map = {
+        LogEvent.ERROR: events.Error,
+        LogEvent.WORKFLOW_STARTED: events.WorkflowStarted,
+        LogEvent.JOB_INFO: events.JobInfo,
+        LogEvent.JOB_STARTED: events.JobStarted,
+        LogEvent.JOB_FINISHED: events.JobFinished,
+        LogEvent.SHELLCMD: events.ShellCmd,
+        LogEvent.JOB_ERROR: events.JobError,
+        LogEvent.GROUP_INFO: events.GroupInfo,
+        LogEvent.GROUP_ERROR: events.GroupError,
+        LogEvent.RESOURCES_INFO: events.ResourcesInfo,
+        LogEvent.DEBUG_DAG: events.DebugDag,
+        LogEvent.PROGRESS: events.Progress,
+        LogEvent.RULEGRAPH: events.RuleGraph,
+        LogEvent.RUN_INFO: events.RunInfo,
+    }
+
+    dataclass_type = event_map.get(event)
+    if dataclass_type is not None:
+        return dataclass_type.from_record(record)
+    else:
+        return None
+
+# Usage in a log handler:
 def emit(self, record):
-    if hasattr(record, 'event'):
-        if record.event == LogEvent.JOB_ERROR:
-            # Access job error fields
-            jobid = getattr(record, 'jobid', 0)
-            # Handle job errors
-            pass
-        elif record.event == LogEvent.JOB_FINISHED:
-            # Access job completion fields
-            job_id = getattr(record, 'job_id', 0)
-            # Handle job completion
-            pass
-        elif record.event == LogEvent.PROGRESS:
-            # Access progress fields
-            done = getattr(record, 'done', 0)
-            total = getattr(record, 'total', 0)
-            # Handle progress updates
-            pass
-```
-
-Always use `getattr(record, 'field_name', default_value)` or check with `hasattr(record, 'field_name')` before accessing fields, as not all fields may be present in every record.
\ No newline at end of file
+    event_data = parse_event(record)
+    if event_data:
+        # Now you can access event-specific fields, e.g.:
+        if isinstance(event_data, events.JobError):
+            print(f"Job error for jobid: {event_data.jobid}")
+        elif isinstance(event_data, events.Progress):
+            print(f"Progress: {event_data.done}/{event_data.total}")
+```
\ No newline at end of file