From 30ea7de5c2ae888919645a3c974a4544f753f358 Mon Sep 17 00:00:00 2001
From: Sebastian Goscik <sebastian@goscik.com>
Date: Thu, 6 Jun 2024 00:15:06 +0100
Subject: [PATCH] Add experimental downloader

This uses a new API to download events the way the web UI does, where it
first asks for a video to be prepared (on the unifi protect host) and
then downloads it.

This might be more stable than the existing downloader.
---
 unifi_protect_backup/__init__.py              |   1 +
 unifi_protect_backup/cli.py                   |  13 +
 .../downloader_experimental.py                | 228 ++++++++++++++++++
 .../unifi_protect_backup_core.py              |  13 +-
 4 files changed, 254 insertions(+), 1 deletion(-)
 create mode 100644 unifi_protect_backup/downloader_experimental.py

diff --git a/unifi_protect_backup/__init__.py b/unifi_protect_backup/__init__.py
index b161fb0..6ab655f 100644
--- a/unifi_protect_backup/__init__.py
+++ b/unifi_protect_backup/__init__.py
@@ -5,6 +5,7 @@ __email__ = 'sebastian@goscik.com'
 __version__ = '0.10.7'
 
 from .downloader import VideoDownloader
+from .downloader_experimental import VideoDownloaderExperimental
 from .event_listener import EventListener
 from .purge import Purge
 from .uploader import VideoUploader
diff --git a/unifi_protect_backup/cli.py b/unifi_protect_backup/cli.py
index 0e6972d..63c08dd 100644
--- a/unifi_protect_backup/cli.py
+++ b/unifi_protect_backup/cli.py
@@ -211,6 +211,19 @@ Subsequent missing events will be downloaded (e.g. a missed event)
     type=int,
     help="Only download events shorter than this maximum length, in seconds",
 )
+@click.option(
+    '--experimental-downloader',
+    'use_experimental_downloader',
+    default=False,
+    show_default=True,
+    is_flag=True,
+    envvar='EXPERIMENTAL_DOWNLOADER',
+    help="""\b
+If set, a new experimental download mechanism will be used to match
+what the web UI does. This might be more stable if you are experiencing
+a lot of failed downloads with the default downloader.
+""",
+)
 def main(**kwargs):
     """A Python based tool for backing up Unifi Protect event clips as they occur."""
     event_listener = UnifiProtectBackup(**kwargs)
diff --git a/unifi_protect_backup/downloader_experimental.py b/unifi_protect_backup/downloader_experimental.py
new file mode 100644
index 0000000..4b22f51
--- /dev/null
+++ b/unifi_protect_backup/downloader_experimental.py
@@ -0,0 +1,228 @@
+# noqa: D100
+
+import asyncio
+import json
+import logging
+import shutil
+from datetime import datetime, timedelta, timezone
+from typing import Optional
+
+import aiosqlite
+import pytz
+from aiohttp.client_exceptions import ClientPayloadError
+from expiring_dict import ExpiringDict  # type: ignore
+from aiolimiter import AsyncLimiter
+from pyunifiprotect import ProtectApiClient
+from pyunifiprotect.data.nvr import Event
+from pyunifiprotect.data.types import EventType
+
+from unifi_protect_backup.utils import (
+    SubprocessException,
+    VideoQueue,
+    get_camera_name,
+    human_readable_size,
+    run_command,
+    setup_event_logger,
+)
+
+
+async def get_video_length(video: bytes) -> float:
+    """Uses ffprobe to get the length of the video file passed in as a byte stream."""
+    returncode, stdout, stderr = await run_command(
+        'ffprobe -v quiet -show_streams -select_streams v:0 -of json -', video
+    )
+
+    if returncode != 0:
+        raise SubprocessException(stdout, stderr, returncode)
+
+    json_data = json.loads(stdout)
+    return float(json_data['streams'][0]['duration'])
+
+
+class VideoDownloaderExperimental:
+    """Downloads event video clips from Unifi Protect."""
+
+    def __init__(
+        self,
+        protect: ProtectApiClient,
+        db: aiosqlite.Connection,
+        download_queue: asyncio.Queue,
+        upload_queue: VideoQueue,
+        color_logging: bool,
+        download_rate_limit: float,
+        max_event_length: timedelta,
+    ):
+        """Init.
+
+        Args:
+            protect (ProtectApiClient): UniFi Protect API client to use
+            db (aiosqlite.Connection): Async SQLite database to check for missing events
+            download_queue (asyncio.Queue): Queue to get event details from
+            upload_queue (VideoQueue): Queue to place downloaded videos on
+            color_logging (bool): Whether or not to add color to logging output
+            download_rate_limit (float): Limit how many events can be downloaded in one minute
+            max_event_length (timedelta): Maximum length in seconds for an event to be considered valid and downloaded
+        """
+        self._protect: ProtectApiClient = protect
+        self._db: aiosqlite.Connection = db
+        self.download_queue: asyncio.Queue = download_queue
+        self.upload_queue: VideoQueue = upload_queue
+        self.current_event = None
+        self._failures = ExpiringDict(60 * 60 * 12)  # Time to live = 12h
+        self._download_rate_limit = download_rate_limit
+        self._max_event_length = max_event_length
+        self._limiter = AsyncLimiter(self._download_rate_limit) if self._download_rate_limit is not None else None
+
+        self.base_logger = logging.getLogger(__name__)
+        setup_event_logger(self.base_logger, color_logging)
+        self.logger = logging.LoggerAdapter(self.base_logger, {'event': ''})
+
+        # Check if `ffprobe` is available
+        ffprobe = shutil.which('ffprobe')
+        if ffprobe is not None:
+            self.logger.debug(f"ffprobe found: {ffprobe}")
+            self._has_ffprobe = True
+        else:
+            self._has_ffprobe = False
+
+    async def start(self):
+        """Main loop."""
+        self.logger.info("Starting Downloader")
+        while True:
+            if self._limiter:
+                self.logger.debug("Waiting for rate limit")
+                await self._limiter.acquire()
+
+            try:
+                # Wait for unifi protect to be connected
+                await self._protect.connect_event.wait()
+
+                event = await self.download_queue.get()
+
+                self.current_event = event
+                self.logger = logging.LoggerAdapter(self.base_logger, {'event': f' [{event.id}]'})
+
+                # Fix timezones since pyunifiprotect sets all timestamps to UTC. Instead localize them to
+                # the timezone of the unifi protect NVR.
+                event.start = event.start.replace(tzinfo=pytz.utc).astimezone(self._protect.bootstrap.nvr.timezone)
+                event.end = event.end.replace(tzinfo=pytz.utc).astimezone(self._protect.bootstrap.nvr.timezone)
+
+                self.logger.info(f"Downloading event: {event.id}")
+                self.logger.debug(f"Remaining Download Queue: {self.download_queue.qsize()}")
+                output_queue_current_size = human_readable_size(self.upload_queue.qsize())
+                output_queue_max_size = human_readable_size(self.upload_queue.maxsize)
+                self.logger.debug(f"Video Download Buffer: {output_queue_current_size}/{output_queue_max_size}")
+                self.logger.debug(f" Camera: {await get_camera_name(self._protect, event.camera_id)}")
+                if event.type == EventType.SMART_DETECT:
+                    self.logger.debug(f" Type: {event.type.value} ({', '.join(event.smart_detect_types)})")
+                else:
+                    self.logger.debug(f" Type: {event.type.value}")
+                self.logger.debug(f" Start: {event.start.strftime('%Y-%m-%dT%H-%M-%S')} ({event.start.timestamp()})")
+                self.logger.debug(f" End: {event.end.strftime('%Y-%m-%dT%H-%M-%S')} ({event.end.timestamp()})")
+                duration = (event.end - event.start).total_seconds()
+                self.logger.debug(f" Duration: {duration}s")
+
+                # Skip invalid events
+                if not self._valid_event(event):
+                    await self._ignore_event(event)
+                    continue
+
+                # Unifi protect does not return full video clips if the clip is requested too soon.
+                # There are two issues at play here:
+                # - Protect will only cut a clip on a keyframe, which happens every 5s
+                # - Protect's pipeline needs a finite amount of time to make a clip available
+                # So we will wait 1.5x the keyframe interval to ensure that there is always ample video
+                # stored and Protect can return a full clip (which should be at least the length requested,
+                # but often longer)
+                time_since_event_ended = datetime.utcnow().replace(tzinfo=timezone.utc) - event.end
+                sleep_time = (timedelta(seconds=5 * 1.5) - time_since_event_ended).total_seconds()
+                if sleep_time > 0:
+                    self.logger.debug(f" Sleeping ({sleep_time}s) to ensure clip is ready to download...")
+                    await asyncio.sleep(sleep_time)
+
+                try:
+                    video = await self._download(event)
+                    assert video is not None
+                except Exception as e:
+                    # Increment failure count
+                    if event.id not in self._failures:
+                        self._failures[event.id] = 1
+                    else:
+                        self._failures[event.id] += 1
+                    self.logger.warning(f"Event failed download attempt {self._failures[event.id]}", exc_info=e)
+
+                    if self._failures[event.id] >= 10:
+                        self.logger.error(
+                            "Event has failed to download 10 times in a row. Permanently ignoring this event"
+                        )
+                        await self._ignore_event(event)
+                    continue
+
+                # Remove successfully downloaded event from failures list
+                if event.id in self._failures:
+                    del self._failures[event.id]
+
+                # Get the actual length of the downloaded video using ffprobe
+                if self._has_ffprobe:
+                    await self._check_video_length(video, duration)
+
+                await self.upload_queue.put((event, video))
+                self.logger.debug("Added to upload queue")
+                self.current_event = None
+
+            except Exception as e:
+                self.logger.error(f"Unexpected exception occurred, abandoning event {event.id}:", exc_info=e)
+
+    async def _download(self, event: Event) -> Optional[bytes]:
+        """Downloads the video clip for the given event."""
+        self.logger.debug(" Downloading video...")
+        for x in range(5):
+            assert isinstance(event.camera_id, str)
+            assert isinstance(event.start, datetime)
+            assert isinstance(event.end, datetime)
+            try:
+                prepared_video_file = await self._protect.prepare_camera_video(event.camera_id, event.start, event.end)
+                video = await self._protect.download_camera_video(event.camera_id, prepared_video_file['fileName'])
+                assert isinstance(video, bytes)
+                break
+            except (AssertionError, ClientPayloadError, TimeoutError) as e:
+                self.logger.warning(f" Failed download attempt {x+1}, retying in 1s", exc_info=e)
+                await asyncio.sleep(1)
+        else:
+            self.logger.error(f"Download failed after 5 attempts, abandoning event {event.id}:")
+            return None
+
+        self.logger.debug(f" Downloaded video size: {human_readable_size(len(video))}s")
+        return video
+
+    async def _ignore_event(self, event):
+        self.logger.warning("Ignoring event")
+        await self._db.execute(
+            "INSERT INTO events VALUES "
+            f"('{event.id}', '{event.type.value}', '{event.camera_id}',"
+            f"'{event.start.timestamp()}', '{event.end.timestamp()}')"
+        )
+        await self._db.commit()
+
+    async def _check_video_length(self, video, duration):
+        """Check if the downloaded event is at least the length of the event, warn otherwise.
+
+        It is expected that downloaded clips will regularly be slightly longer than the event specified
+        """
+        try:
+            downloaded_duration = await get_video_length(video)
+            msg = f" Downloaded video length: {downloaded_duration:.3f}s" f"({downloaded_duration - duration:+.3f}s)"
+            if downloaded_duration < duration:
+                self.logger.warning(msg)
+            else:
+                self.logger.debug(msg)
+        except SubprocessException as e:
+            self.logger.warning(" `ffprobe` failed", exc_info=e)
+
+    def _valid_event(self, event):
+        duration = event.end - event.start
+        if duration > self._max_event_length:
+            self.logger.warning(f"Event longer ({duration}) than max allowed length {self._max_event_length}")
+            return False
+
+        return True
diff --git a/unifi_protect_backup/unifi_protect_backup_core.py b/unifi_protect_backup/unifi_protect_backup_core.py
index ecf8c44..0970c32 100644
--- a/unifi_protect_backup/unifi_protect_backup_core.py
+++ b/unifi_protect_backup/unifi_protect_backup_core.py
@@ -1,4 +1,5 @@
 """Main module."""
+
 import asyncio
 import logging
 import os
@@ -16,6 +17,7 @@ from unifi_protect_backup import (
     MissingEventChecker,
     Purge,
     VideoDownloader,
+    VideoDownloaderExperimental,
     VideoUploader,
     notifications,
 )
@@ -67,6 +69,7 @@ class UnifiProtectBackup:
         color_logging: bool = False,
         download_rate_limit: float = None,
         port: int = 443,
+        use_experimental_downloader: bool = False,
     ):
         """Will configure logging settings and the Unifi Protect API (but not actually connect).
 
@@ -97,6 +100,7 @@ class UnifiProtectBackup:
             color_logging (bool): Whether to add color to logging output or not
             download_rate_limit (float): Limit how events can be downloaded in one minute. Disabled by default",
             max_event_length (int): Maximum length in seconds for an event to be considered valid and downloaded
+            use_experimental_downloader (bool): Use the new experimental downloader (the same method as used by the webUI)
         """
         self.color_logging = color_logging
         setup_logging(verbose, self.color_logging)
@@ -133,6 +137,7 @@ class UnifiProtectBackup:
         logger.debug(f" {skip_missing=}")
         logger.debug(f" {download_rate_limit=} events per minute")
         logger.debug(f" {max_event_length=}s")
+        logger.debug(f" {use_experimental_downloader=}")
 
         self.rclone_destination = rclone_destination
         self.retention = retention
@@ -166,6 +171,7 @@ class UnifiProtectBackup:
         self._skip_missing = skip_missing
         self._download_rate_limit = download_rate_limit
         self._max_event_length = timedelta(seconds=max_event_length)
+        self._use_experimental_downloader = use_experimental_downloader
 
     async def start(self):
         """Bootstrap the backup process and kick off the main loop.
@@ -225,7 +231,12 @@ class UnifiProtectBackup:
 
         # Create downloader task
         # This will download video files to its buffer
-        downloader = VideoDownloader(
+        if self._use_experimental_downloader:
+            downloader_cls = VideoDownloaderExperimental
+        else:
+            downloader_cls = VideoDownloader
+
+        downloader = downloader_cls(
             self._protect,
             self._db,
             download_queue,