diff --git a/poetry.lock b/poetry.lock index 2f32e27..5155427 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2169,6 +2169,18 @@ files = [ {file = "types_cryptography-3.3.23.2-py3-none-any.whl", hash = "sha256:b965d548f148f8e87f353ccf2b7bd92719fdf6c845ff7cedf2abb393a0643e4f"}, ] +[[package]] +name = "types-python-dateutil" +version = "2.8.19.10" +description = "Typing stubs for python-dateutil" +category = "dev" +optional = false +python-versions = "*" +files = [ + {file = "types-python-dateutil-2.8.19.10.tar.gz", hash = "sha256:c640f2eb71b4b94a9d3bfda4c04250d29a24e51b8bad6e12fddec0cf6e96f7a3"}, + {file = "types_python_dateutil-2.8.19.10-py3-none-any.whl", hash = "sha256:fbecd02c19cac383bf4a16248d45ffcff17c93a04c0794be5f95d42c6aa5de39"}, +] + [[package]] name = "types-pytz" version = "2021.3.8" @@ -2382,4 +2394,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more [metadata] lock-version = "2.0" python-versions = ">=3.9.0,<4.0" -content-hash = "9025e6deb7dca5f2c5cbf1f5545dcf82789c5190634993b8b88899865d86208c" +content-hash = "403929920e1f93f1267d3ffc57b3586451d6c880407c2e399be10f8f028315dd" diff --git a/pyproject.toml b/pyproject.toml index 478a2cd..7888ba1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,7 @@ types-cryptography = "^3.3.18" twine = "^3.3.0" bump2version = "^1.0.1" pre-commit = "^2.12.0" +types-python-dateutil = "^2.8.19.10" [tool.poetry.group.test] optional = true @@ -88,6 +89,9 @@ skip_gitignore = true # you can skip files as below #skip_glob = docs/conf.py +[tool.mypy] +allow_redefinition=true + [build-system] requires = ["poetry-core>=1.0.0"] build-backend = "poetry.core.masonry.api" diff --git a/tests/test_unifi_protect_backup.py b/tests/test_unifi_protect_backup.py index 316d55a..c00cfee 100644 --- a/tests/test_unifi_protect_backup.py +++ b/tests/test_unifi_protect_backup.py @@ -1,7 +1,7 @@ #!/usr/bin/env python """Tests for `unifi_protect_backup` package.""" -import pytest +import pytest # type: ignore # from click.testing import CliRunner diff --git a/unifi_protect_backup/cli.py b/unifi_protect_backup/cli.py index 8c2ce44..67e7f94 100644 --- a/unifi_protect_backup/cli.py +++ b/unifi_protect_backup/cli.py @@ -1,9 +1,7 @@ """Console script for unifi_protect_backup.""" -import asyncio - import click -from aiorun import run +from aiorun import run # type: ignore from unifi_protect_backup import __version__ from unifi_protect_backup.unifi_protect_backup_core import UnifiProtectBackup @@ -149,7 +147,7 @@ The following notification tags are available (corresponding to the respective l ERROR, WARNING, INFO, DEBUG, EXTRA_DEBUG, WEBSOCKET_DATA -If no tags are specified, it defaults to ERROR +If no tags are specified, it defaults to ERROR More details about supported platforms can be found here: https://github.com/caronc/apprise""", ) diff --git a/unifi_protect_backup/downloader.py b/unifi_protect_backup/downloader.py index 26a07d5..cbdfee8 100644 --- a/unifi_protect_backup/downloader.py +++ b/unifi_protect_backup/downloader.py @@ -1,8 +1,11 @@ +# noqa: D100 + import asyncio import json import logging import shutil from datetime import datetime, timedelta, timezone +from typing import Optional import pytz from aiohttp.client_exceptions import ClientPayloadError @@ -21,7 +24,7 @@ from unifi_protect_backup.utils import ( async def get_video_length(video: bytes) -> float: - """Uses ffprobe to get the length of the video file passed in as a byte stream""" + """Uses ffprobe to get the length of the video file passed in as a byte stream.""" returncode, stdout, stderr = await run_command( 'ffprobe -v quiet -show_streams -select_streams v:0 -of json -', video ) @@ -34,11 +37,19 @@ async def get_video_length(video: bytes) -> float: class VideoDownloader: - """Downloads event video clips from Unifi Protect""" + """Downloads event video clips from Unifi Protect.""" def __init__( self, protect: ProtectApiClient, download_queue: asyncio.Queue, upload_queue: VideoQueue, color_logging: bool ): + """Init. + + Args: + protect (ProtectApiClient): UniFi Protect API client to use + download_queue (asyncio.Queue): Queue to get event details from + upload_queue (VideoQueue): Queue to place downloaded videos on + color_logging (bool): Whether or not to add color to logging output + """ self._protect: ProtectApiClient = protect self.download_queue: asyncio.Queue = download_queue self.upload_queue: VideoQueue = upload_queue @@ -57,7 +68,7 @@ class VideoDownloader: self._has_ffprobe = False async def start(self): - """Main loop""" + """Main loop.""" self.logger.info("Starting Downloader") while True: try: @@ -113,11 +124,14 @@ class VideoDownloader: except Exception as e: self.logger.error(f"Unexpected exception occurred, abandoning event {event.id}:", exc_info=e) - async def _download(self, event: Event) -> bytes: - """Downloads the video clip for the given event""" + async def _download(self, event: Event) -> Optional[bytes]: + """Downloads the video clip for the given event.""" self.logger.debug(" Downloading video...") for x in range(5): try: + assert isinstance(event.camera_id, str) + assert isinstance(event.start, datetime) + assert isinstance(event.end, datetime) video = await self._protect.get_camera_video(event.camera_id, event.start, event.end) assert isinstance(video, bytes) break @@ -126,15 +140,16 @@ class VideoDownloader: await asyncio.sleep(1) else: self.logger.error(f"Download failed after 5 attempts, abandoning event {event.id}:") - return + return None self.logger.debug(f" Downloaded video size: {human_readable_size(len(video))}s") return video async def _check_video_length(self, video, duration): - """Check if the downloaded event is at least the length of the event, warn otherwise + """Check if the downloaded event is at least the length of the event, warn otherwise. - It is expected for events to regularly be slightly longer than the event specified""" + It is expected for events to regularly be slightly longer than the event specified + """ try: downloaded_duration = await get_video_length(video) msg = f" Downloaded video length: {downloaded_duration:.3f}s" f"({downloaded_duration - duration:+.3f}s)" @@ -143,4 +158,4 @@ class VideoDownloader: else: self.logger.debug(msg) except SubprocessException as e: - self.logger.warning(" `ffprobe` failed") + self.logger.warning(" `ffprobe` failed", exc_info=e) diff --git a/unifi_protect_backup/event_listener.py b/unifi_protect_backup/event_listener.py index 31c8c6b..5672ccc 100644 --- a/unifi_protect_backup/event_listener.py +++ b/unifi_protect_backup/event_listener.py @@ -1,3 +1,5 @@ +# noqa: D100 + import asyncio import logging from time import sleep @@ -12,7 +14,7 @@ logger = logging.getLogger(__name__) class EventListener: - """Listens to the unifi protect websocket for new events to backup""" + """Listens to the unifi protect websocket for new events to backup.""" def __init__( self, @@ -21,6 +23,14 @@ class EventListener: detection_types: List[str], ignore_cameras: List[str], ): + """Init. + + Args: + event_queue (asyncio.Queue): Queue to place events to backup on + protect (ProtectApiClient): UniFI Protect API client to use + detection_types (List[str]): Desired Event detection types to look for + ignore_cameras (List[str]): Cameras IDs to ignore events from + """ self._event_queue: asyncio.Queue = event_queue self._protect: ProtectApiClient = protect self._unsub = None @@ -28,7 +38,7 @@ class EventListener: self.ignore_cameras: List[str] = ignore_cameras async def start(self): - """Main Loop""" + """Main Loop.""" logger.debug("Subscribed to websocket") self._unsub = self._protect.subscribe_websocket(self._websocket_callback) @@ -71,7 +81,7 @@ class EventListener: # TODO: Will this even work? I think it will block the async loop while self._event_queue.full(): - logger.extra_debug("Event queue full, waiting 1s...") + logger.extra_debug("Event queue full, waiting 1s...") # type: ignore sleep(1) self._event_queue.put_nowait(msg.new_obj) @@ -85,8 +95,7 @@ class EventListener: logger.debug(f"Adding event {msg.new_obj.id} to queue (Current download queue={self._event_queue.qsize()})") async def _check_websocket_and_reconnect(self): - """Checks for websocket disconnect and triggers a reconnect""" - + """Checks for websocket disconnect and triggers a reconnect.""" logger.extra_debug("Checking the status of the websocket...") if self._protect.check_ws(): logger.extra_debug("Websocket is connected.") diff --git a/unifi_protect_backup/missing_event_checker.py b/unifi_protect_backup/missing_event_checker.py index 5a14b8d..105b6fd 100644 --- a/unifi_protect_backup/missing_event_checker.py +++ b/unifi_protect_backup/missing_event_checker.py @@ -1,3 +1,5 @@ +# noqa: D100 + import asyncio import logging from datetime import datetime @@ -14,7 +16,7 @@ logger = logging.getLogger(__name__) class MissingEventChecker: - """Periodically checks if any unifi protect events exist within the retention period that are not backed up""" + """Periodically checks if any unifi protect events exist within the retention period that are not backed up.""" def __init__( self, @@ -28,6 +30,19 @@ class MissingEventChecker: ignore_cameras: List[str], interval: int = 60 * 5, ) -> None: + """Init. + + Args: + protect (ProtectApiClient): UniFi Protect API client to use + db (aiosqlite.Connection): Async SQLite database to check for missing events + download_queue (asyncio.Queue): Download queue to check for on-going downloads + downloader (VideoDownloader): Downloader to check for on-going downloads + uploader (VideoUploader): Uploader to check for on-going uploads + retention (relativedelta): Retention period to limit search window + detection_types (List[str]): Detection types wanted to limit search + ignore_cameras (List[str]): Ignored camera IDs to limit search + interval (int): How frequently, in seconds, to check for missing events, + """ self._protect: ProtectApiClient = protect self._db: aiosqlite.Connection = db self._download_queue: asyncio.Queue = download_queue @@ -39,7 +54,7 @@ class MissingEventChecker: self.interval: int = interval async def start(self): - """main loop""" + """Main loop.""" logger.info("Starting Missing Event Checker") while True: try: @@ -102,15 +117,19 @@ class MissingEventChecker: event = unifi_events[event_id] if event.type != EventType.SMART_DETECT: missing_logger( - f" Adding missing event to backup queue: {event.id} ({event.type}) ({event.start.strftime('%Y-%m-%dT%H-%M-%S')} - {event.end.strftime('%Y-%m-%dT%H-%M-%S')})" + f" Adding missing event to backup queue: {event.id} ({event.type})" + f" ({event.start.strftime('%Y-%m-%dT%H-%M-%S')} -" + f" {event.end.strftime('%Y-%m-%dT%H-%M-%S')})" ) else: missing_logger( - f" Adding missing event to backup queue: {event.id} ({', '.join(event.smart_detect_types)}) ({event.start.strftime('%Y-%m-%dT%H-%M-%S')} - {event.end.strftime('%Y-%m-%dT%H-%M-%S')})" + f" Adding missing event to backup queue: {event.id} ({', '.join(event.smart_detect_types)})" + f" ({event.start.strftime('%Y-%m-%dT%H-%M-%S')} -" + f" {event.end.strftime('%Y-%m-%dT%H-%M-%S')})" ) await self._download_queue.put(event) except Exception as e: - logger.error(f"Unexpected exception occurred during missing event check:", exc_info=e) + logger.error("Unexpected exception occurred during missing event check:", exc_info=e) await asyncio.sleep(self.interval) diff --git a/unifi_protect_backup/notifications.py b/unifi_protect_backup/notifications.py index 60efd8f..c7b0649 100644 --- a/unifi_protect_backup/notifications.py +++ b/unifi_protect_backup/notifications.py @@ -1,9 +1,12 @@ +"""A 'singleton' module for registering apprise notifiers.""" + import apprise notifier = apprise.Apprise() def add_notification_service(url): + """Add apprise URI with support for tags e.g. TAG1,TAG2=PROTOCOL://settings.""" config = apprise.AppriseConfig() config.add_config(url, format='text') diff --git a/unifi_protect_backup/purge.py b/unifi_protect_backup/purge.py index 4a1de06..43d678d 100644 --- a/unifi_protect_backup/purge.py +++ b/unifi_protect_backup/purge.py @@ -1,4 +1,5 @@ -import asyncio +# noqa: D100 + import logging import time from datetime import datetime @@ -12,34 +13,44 @@ logger = logging.getLogger(__name__) async def delete_file(file_path): + """Deletes `file_path` via rclone.""" returncode, stdout, stderr = await run_command(f'rclone delete -vv "{file_path}"') if returncode != 0: logger.error(f" Failed to delete file: '{file_path}'") async def tidy_empty_dirs(base_dir_path): + """Deletes any empty directories in `base_dir_path` via rclone.""" returncode, stdout, stderr = await run_command(f'rclone rmdirs -vv --ignore-errors --leave-root "{base_dir_path}"') if returncode != 0: - logger.error(f" Failed to tidy empty dirs") + logger.error(" Failed to tidy empty dirs") class Purge: - """Deletes old files from rclone remotes""" + """Deletes old files from rclone remotes.""" def __init__( self, db: aiosqlite.Connection, retention: relativedelta, rclone_destination: str, - interval: relativedelta(days=1), + interval: relativedelta = relativedelta(days=1), ): + """Init. + + Args: + db (aiosqlite.Connection): Async SQlite database connection to purge clips from + retention (relativedelta): How long clips should be kept + rclone_destination (str): What rclone destination the clips are stored in + interval (relativedelta): How often to purge old clips + """ self._db: aiosqlite.Connection = db self.retention: relativedelta = retention self.rclone_destination: str = rclone_destination self.interval: relativedelta = interval async def start(self): - """Main loop - runs forever""" + """Main loop - runs forever.""" while True: try: deleted_a_file = False @@ -69,7 +80,7 @@ class Purge: await tidy_empty_dirs(self.rclone_destination) except Exception as e: - logger.error(f"Unexpected exception occurred during purge:", exc_info=e) + logger.error("Unexpected exception occurred during purge:", exc_info=e) next_purge_time = datetime.now() + self.interval logger.extra_debug(f'sleeping until {next_purge_time}') diff --git a/unifi_protect_backup/unifi_protect_backup_core.py b/unifi_protect_backup/unifi_protect_backup_core.py index c55253b..4f402b6 100644 --- a/unifi_protect_backup/unifi_protect_backup_core.py +++ b/unifi_protect_backup/unifi_protect_backup_core.py @@ -3,9 +3,7 @@ import asyncio import logging import os import shutil -from cmath import log from datetime import datetime, timezone -from time import sleep from typing import Callable, List import aiosqlite @@ -35,7 +33,7 @@ logger = logging.getLogger(__name__) async def create_database(path: str): - """Creates sqlite database and creates the events abd backups tables""" + """Creates sqlite database and creates the events abd backups tables.""" db = await aiosqlite.connect(path) await db.execute("CREATE TABLE events(id PRIMARY KEY, type, camera_id, start REAL, end REAL)") await db.execute( @@ -92,8 +90,11 @@ class UnifiProtectBackup: ignore_cameras (List[str]): List of camera IDs for which to not backup events. file_structure_format (str): A Python format string for output file path. verbose (int): How verbose to setup logging, see :func:`setup_logging` for details. - sqlite_path (str): Path where to find/create sqlite database + download_buffer_size (int): How many bytes big the download buffer should be purge_interval (str): How often to check for files to delete + apprise_notifiers (str): Apprise URIs for notifications + sqlite_path (str): Path where to find/create sqlite database + color_logging (bool): Whether to add color to logging output or not """ for notifier in apprise_notifiers: notifications.add_notification_service(notifier) @@ -256,7 +257,7 @@ class UnifiProtectBackup: await self._db.close() except Exception as e: - logger.error(f"Unexpected exception occurred in main loop:", exc_info=e) + logger.error("Unexpected exception occurred in main loop:", exc_info=e) await asyncio.sleep(10) # Give remaining tasks a chance to complete e.g sending notifications raise diff --git a/unifi_protect_backup/uploader.py b/unifi_protect_backup/uploader.py index 19246a8..0a3ae05 100644 --- a/unifi_protect_backup/uploader.py +++ b/unifi_protect_backup/uploader.py @@ -1,4 +1,5 @@ -import asyncio +# noqa: D100 + import logging import pathlib import re @@ -12,7 +13,7 @@ from unifi_protect_backup.utils import VideoQueue, get_camera_name, human_readab class VideoUploader: - """Uploads videos from the video_queue to the provided rclone destination + """Uploads videos from the video_queue to the provided rclone destination. Keeps a log of what its uploaded in `db` """ @@ -27,6 +28,17 @@ class VideoUploader: db: aiosqlite.Connection, color_logging: bool, ): + """Init. + + Args: + protect (ProtectApiClient): UniFi Protect API client to use + upload_queue (VideoQueue): Queue to get video files from + rclone_destination (str): rclone file destination URI + rclone_args (str): arguments to pass to the rclone command + file_structure_format (str): format string for how to structure the uploaded files + db (aiosqlite.Connection): Async SQlite database connection + color_logging (bool): Whether or not to add color to logging output + """ self._protect: ProtectApiClient = protect self.upload_queue: VideoQueue = upload_queue self._rclone_destination: str = rclone_destination @@ -40,11 +52,11 @@ class VideoUploader: self.logger = logging.LoggerAdapter(self.base_logger, {'event': ''}) async def start(self): - """Main loop + """Main loop. - Runs forever looking for video data in the video queue and then uploads it using rclone, finally it updates the database + Runs forever looking for video data in the video queue and then uploads it + using rclone, finally it updates the database """ - self.logger.info("Starting Uploader") while True: try: @@ -55,7 +67,8 @@ class VideoUploader: self.logger.info(f"Uploading event: {event.id}") self.logger.debug( - f" Remaining Upload Queue: {self.upload_queue.qsize_files()} ({human_readable_size(self.upload_queue.qsize())})" + f" Remaining Upload Queue: {self.upload_queue.qsize_files()}" + f" ({human_readable_size(self.upload_queue.qsize())})" ) destination = await self._generate_file_path(event) @@ -64,7 +77,7 @@ class VideoUploader: await self._upload_video(video, destination, self._rclone_args) await self._update_database(event, destination) - self.logger.debug(f"Uploaded") + self.logger.debug("Uploaded") self.current_event = None except Exception as e: @@ -89,13 +102,13 @@ class VideoUploader: self.logger.error(f" Failed to upload file: '{destination}'") async def _update_database(self, event: Event, destination: str): - """ - Add the backed up event to the database along with where it was backed up to - """ + """Add the backed up event to the database along with where it was backed up to.""" + assert isinstance(event.start, datetime) + assert isinstance(event.end, datetime) await self._db.execute( - f"""INSERT INTO events VALUES - ('{event.id}', '{event.type}', '{event.camera_id}', '{event.start.timestamp()}', '{event.end.timestamp()}') - """ + "INSERT INTO events VALUES " + f"('{event.id}', '{event.type}', '{event.camera_id}'," + f"'{event.start.timestamp()}', '{event.end.timestamp()}')" ) remote, file_path = str(destination).split(":") diff --git a/unifi_protect_backup/utils.py b/unifi_protect_backup/utils.py index 25f063a..1e8ae56 100644 --- a/unifi_protect_backup/utils.py +++ b/unifi_protect_backup/utils.py @@ -1,3 +1,5 @@ +"""Utility functions used throughout the code, kept here to allow re use and/or minimize clutter elsewhere.""" + import asyncio import logging import re @@ -7,6 +9,7 @@ from typing import List, Optional from apprise import NotifyType from dateutil.relativedelta import relativedelta from pyunifiprotect import ProtectApiClient +from pyunifiprotect.data.nvr import Event from unifi_protect_backup import notifications @@ -64,9 +67,7 @@ def add_logging_level(levelName: str, levelNum: int, methodName: Optional[str] = logging.log(levelNum, message, *args, **kwargs) def adapterLog(self, msg, *args, **kwargs): - """ - Delegate an error call to the underlying logger. - """ + """Delegate an error call to the underlying logger.""" self.log(levelNum, msg, *args, **kwargs) logging.addLevelName(levelNum, levelName) @@ -80,6 +81,7 @@ color_logging = False def add_color_to_record_levelname(record): + """Colorizes logging level names.""" levelno = record.levelno if levelno >= logging.CRITICAL: color = '\x1b[31;1m' # RED @@ -100,11 +102,18 @@ def add_color_to_record_levelname(record): class AppriseStreamHandler(logging.StreamHandler): - def __init__(self, color_logging, *args, **kwargs): + """Logging handler that also sends logging output to configured Apprise notifiers.""" + + def __init__(self, color_logging: bool, *args, **kwargs): + """Init. + + Args: + color_logging (bool): If true logging levels will be colorized + """ super().__init__(*args, **kwargs) self.color_logging = color_logging - def emit_apprise(self, record): + def _emit_apprise(self, record): try: loop = asyncio.get_event_loop() except RuntimeError: @@ -134,7 +143,7 @@ class AppriseStreamHandler(logging.StreamHandler): else: loop.run_until_complete(notify) - def emit_stream(self, record): + def _emit_stream(self, record): record.levelname = f"{record.levelname:^11s}" # Pad level name to max width if self.color_logging: record.levelname = add_color_to_record_levelname(record) @@ -146,15 +155,16 @@ class AppriseStreamHandler(logging.StreamHandler): self.flush() def emit(self, record): + """Emit log to stdout and apprise.""" try: - self.emit_apprise(record) + self._emit_apprise(record) except RecursionError: # See issue 36272 raise except Exception: self.handleError(record) try: - self.emit_stream(record) + self._emit_stream(record) except RecursionError: # See issue 36272 raise except Exception: @@ -162,6 +172,7 @@ class AppriseStreamHandler(logging.StreamHandler): def create_logging_handler(format, color_logging): + """Constructs apprise logging handler for the given format.""" date_format = "%Y-%m-%d %H:%M:%S" style = '{' @@ -228,6 +239,7 @@ def setup_logging(verbosity: int, color_logging: bool = False, apprise_notifiers def setup_event_logger(logger, color_logging): + """Sets up a logger that also displays the event ID currently being processed.""" format = "{asctime} [{levelname:^11s}] {name:<42} :{event} {message}" sh = create_logging_handler(format, color_logging) logger.addHandler(sh) @@ -253,6 +265,7 @@ def human_readable_size(num: float): def human_readable_to_float(num: str): + """Turns a human readable ISO/IEC 80000 suffix value to its full float value.""" pattern = r"([\d.]+)(" + "|".join(_suffixes) + ")" result = re.match(pattern, num) if result is None: @@ -265,8 +278,7 @@ def human_readable_to_float(num: str): async def get_camera_name(protect: ProtectApiClient, id: str): - """ - Returns the name for the camera with the given ID + """Returns the name for the camera with the given ID. If the camera ID is not know, it tries refreshing the cached data """ @@ -289,6 +301,8 @@ async def get_camera_name(protect: ProtectApiClient, id: str): class SubprocessException(Exception): + """Class to capture: stdout, stderr, and return code of Subprocess errors.""" + def __init__(self, stdout, stderr, returncode): """Exception class for when rclone does not exit with `0`. @@ -308,11 +322,7 @@ class SubprocessException(Exception): def parse_rclone_retention(retention: str) -> relativedelta: - """ - Parses the rclone `retention` parameter into a relativedelta which can then be used - to calculate datetimes - """ - + """Parses the rclone `retention` parameter into a relativedelta which can then be used to calculate datetimes.""" matches = {k: int(v) for v, k in re.findall(r"([\d]+)(ms|s|m|h|d|w|M|y)", retention)} return relativedelta( microseconds=matches.get("ms", 0) * 1000, @@ -327,9 +337,7 @@ def parse_rclone_retention(retention: str) -> relativedelta: async def run_command(cmd: str, data=None): - """ - Runs the given command returning the exit code, stdout and stderr - """ + """Runs the given command returning the exit code, stdout and stderr.""" proc = await asyncio.create_subprocess_shell( cmd, stdin=asyncio.subprocess.PIPE, @@ -347,16 +355,17 @@ async def run_command(cmd: str, data=None): logger.error(f"stdout:\n{stdout_indented}") logger.error(f"stderr:\n{stderr_indented}") else: - logger.extra_debug(f"stdout:\n{stdout_indented}") - logger.extra_debug(f"stderr:\n{stderr_indented}") + logger.extra_debug(f"stdout:\n{stdout_indented}") # type: ignore + logger.extra_debug(f"stderr:\n{stderr_indented}") # type: ignore return proc.returncode, stdout, stderr class VideoQueue(asyncio.Queue): - """A queue that limits the number of bytes it can store rather than discrete entries""" + """A queue that limits the number of bytes it can store rather than discrete entries.""" def __init__(self, *args, **kwargs): + """Init.""" super().__init__(*args, **kwargs) self._bytes_sum = 0 @@ -373,11 +382,11 @@ class VideoQueue(asyncio.Queue): self._bytes_sum -= len(data[1]) return data - def _put(self, item: bytes): - self._queue.append(item) + def _put(self, item: tuple[Event, bytes]): + self._queue.append(item) # type: ignore self._bytes_sum += len(item[1]) - def full(self, item: bytes = None): + def full(self, item: tuple[Event, bytes] = None): """Return True if there are maxsize bytes in the queue. optionally if `item` is provided, it will return False if there is enough space to @@ -386,35 +395,36 @@ class VideoQueue(asyncio.Queue): Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True. """ - if self._maxsize <= 0: + if self._maxsize <= 0: # type: ignore return False else: if item is None: - return self.qsize() >= self._maxsize + return self.qsize() >= self._maxsize # type: ignore else: - return self.qsize() + len(item[1]) >= self._maxsize + return self.qsize() + len(item[1]) >= self._maxsize # type: ignore - async def put(self, item: bytes): + async def put(self, item: tuple[Event, bytes]): """Put an item into the queue. Put an item into the queue. If the queue is full, wait until a free slot is available before adding item. """ - if len(item[1]) > self._maxsize: + if len(item[1]) > self._maxsize: # type: ignore raise ValueError( - f"Item is larger ({human_readable_size(len(item[1]))}) than the size of the buffer ({human_readable_size(self._maxsize)})" + f"Item is larger ({human_readable_size(len(item[1]))}) " + f"than the size of the buffer ({human_readable_size(self._maxsize)})" # type: ignore ) while self.full(item): - putter = self._loop.create_future() - self._putters.append(putter) + putter = self._loop.create_future() # type: ignore + self._putters.append(putter) # type: ignore try: await putter - except: + except: # noqa: E722 putter.cancel() # Just in case putter is not done yet. try: # Clean self._putters from canceled putters. - self._putters.remove(putter) + self._putters.remove(putter) # type: ignore except ValueError: # The putter could be removed from self._putters by a # previous get_nowait call. @@ -422,11 +432,11 @@ class VideoQueue(asyncio.Queue): if not self.full(item) and not putter.cancelled(): # We were woken up by get_nowait(), but can't take # the call. Wake up the next in line. - self._wakeup_next(self._putters) + self._wakeup_next(self._putters) # type: ignore raise return self.put_nowait(item) - def put_nowait(self, item: bytes): + def put_nowait(self, item: tuple[Event, bytes]): """Put an item into the queue without blocking. If no free slot is immediately available, raise QueueFull. @@ -434,12 +444,12 @@ class VideoQueue(asyncio.Queue): if self.full(item): raise asyncio.QueueFull self._put(item) - self._unfinished_tasks += 1 - self._finished.clear() - self._wakeup_next(self._getters) + self._unfinished_tasks += 1 # type: ignore + self._finished.clear() # type: ignore + self._wakeup_next(self._getters) # type: ignore async def wait_until(dt): - # sleep until the specified datetime + """Sleep until the specified datetime.""" now = datetime.now() await asyncio.sleep((dt - now).total_seconds())