diff --git a/unifi_protect_backup/downloader.py b/unifi_protect_backup/downloader.py index 01f3eae..471e0b8 100644 --- a/unifi_protect_backup/downloader.py +++ b/unifi_protect_backup/downloader.py @@ -39,11 +39,12 @@ async def get_video_length(video: bytes) -> float: class VideoDownloader: """Downloads event video clips from Unifi Protect""" - def __init__(self, protect: ProtectApiClient, download_queue: asyncio.Queue, buffer_size: int = 256 * 1024 * 1024): + def __init__(self, protect: ProtectApiClient, download_queue: asyncio.Queue, upload_queue: VideoQueue): self._protect: ProtectApiClient = protect - self._download_queue: asyncio.Queue = download_queue - self.video_queue = VideoQueue(buffer_size) + self.download_queue: asyncio.Queue = download_queue + self.upload_queue: VideoQueue = upload_queue self.logger = logging.LoggerAdapter(logger, {'event': ''}) + self.current_event = None # Check if `ffprobe` is available ffprobe = shutil.which('ffprobe') @@ -58,7 +59,8 @@ class VideoDownloader: self.logger.info("Starting Downloader") while True: try: - event = await self._download_queue.get() + event = await self.download_queue.get() + self.current_event = event self.logger = logging.LoggerAdapter(logger, {'event': f' [{event.id}]'}) # Fix timezones since pyunifiprotect sets all timestamps to UTC. Instead localize them to @@ -67,9 +69,9 @@ class VideoDownloader: event.end = event.end.replace(tzinfo=pytz.utc).astimezone(self._protect.bootstrap.nvr.timezone) self.logger.info(f"Downloading event: {event.id}") - self.logger.debug(f"Remaining Download Queue: {self._download_queue.qsize()}") - output_queue_current_size = human_readable_size(self.video_queue.qsize()) - output_queue_max_size = human_readable_size(self.video_queue.maxsize) + self.logger.debug(f"Remaining Download Queue: {self.download_queue.qsize()}") + output_queue_current_size = human_readable_size(self.upload_queue.qsize()) + output_queue_max_size = human_readable_size(self.upload_queue.maxsize) self.logger.debug(f"Video Download Buffer: {output_queue_current_size}/{output_queue_max_size}") self.logger.debug(f" Camera: {await get_camera_name(self._protect, event.camera_id)}") if event.type == EventType.SMART_DETECT: @@ -102,7 +104,7 @@ class VideoDownloader: if self._has_ffprobe: await self._check_video_length(video, duration) - await self.video_queue.put((event, video)) + await self.upload_queue.put((event, video)) self.logger.debug("Added to upload queue") except Exception as e: diff --git a/unifi_protect_backup/missing_event_checker.py b/unifi_protect_backup/missing_event_checker.py index cc065e2..60d6fd2 100644 --- a/unifi_protect_backup/missing_event_checker.py +++ b/unifi_protect_backup/missing_event_checker.py @@ -9,6 +9,8 @@ from dateutil.relativedelta import relativedelta from pyunifiprotect import ProtectApiClient from pyunifiprotect.data.types import EventType +from unifi_protect_backup import VideoDownloader, VideoUploader + logger = logging.getLogger(__name__) @@ -19,7 +21,9 @@ class MissingEventChecker: self, protect: ProtectApiClient, db: aiosqlite.Connection, - event_queue: asyncio.Queue, + download_queue: asyncio.Queue, + downloader: VideoDownloader, + uploader: VideoUploader, retention: relativedelta, detection_types: List[str], ignore_cameras: List[str], @@ -27,7 +31,9 @@ class MissingEventChecker: ) -> None: self._protect: ProtectApiClient = protect self._db: aiosqlite.Connection = db - self._event_queue: asyncio.Queue = event_queue + self._download_queue: asyncio.Queue = download_queue + self._downloader: VideoDownloader = downloader + self._uploader: VideoUploader = uploader self.retention: relativedelta = retention self.detection_types: List[str] = detection_types self.ignore_cameras: List[str] = ignore_cameras @@ -54,10 +60,15 @@ class MissingEventChecker: rows = await cursor.fetchall() db_event_ids = {row[0] for row in rows} - # Prevent re-adding events currently in the download queue - downloading_event_ids = {event.id for event in self._event_queue._queue} + # Prevent re-adding events currently in the download/upload queue + downloading_event_ids = {event.id for event in self._downloader.download_queue._queue} + downloading_event_ids.add(self._downloader.current_event) + uploading_event_ids = {event.id for event in self._uploader.upload_queue._queue} + uploading_event_ids.add(self._uploader.current_event) - missing_event_ids = set(unifi_events.keys()) - (db_event_ids | downloading_event_ids) + missing_event_ids = set(unifi_events.keys()) - ( + db_event_ids | downloading_event_ids | uploading_event_ids + ) logger.debug(f" Total undownloaded events: {len(missing_event_ids)}") def wanted_event_type(event_id): @@ -93,7 +104,7 @@ class MissingEventChecker: missing_logger( f" Adding missing event to backup queue: {event.id} ({', '.join(event.smart_detect_types)}) ({event.start.strftime('%Y-%m-%dT%H-%M-%S')} - {event.end.strftime('%Y-%m-%dT%H-%M-%S')})" ) - await self._event_queue.put(event) + await self._download_queue.put(event) except Exception as e: logger.warn(f"Unexpected exception occurred during missing event check:") diff --git a/unifi_protect_backup/unifi_protect_backup.py b/unifi_protect_backup/unifi_protect_backup.py index c5649b1..0c7c0d6 100644 --- a/unifi_protect_backup/unifi_protect_backup.py +++ b/unifi_protect_backup/unifi_protect_backup.py @@ -20,6 +20,7 @@ from unifi_protect_backup.utils import ( run_command, setup_logging, human_readable_size, + VideoQueue, ) logger = logging.getLogger(__name__) @@ -169,21 +170,22 @@ class UnifiProtectBackup: else: self._db = await aiosqlite.connect(self._sqlite_path) - event_queue = asyncio.Queue() + download_queue = asyncio.Queue() + upload_queue = VideoQueue(self._download_buffer_size) # Enable foreign keys in the database await self._db.execute("PRAGMA foreign_keys = ON;") # Create downloader task # This will download video files to its buffer - downloader = VideoDownloader(self._protect, event_queue, buffer_size=self._download_buffer_size) + downloader = VideoDownloader(self._protect, download_queue, upload_queue) tasks.append(asyncio.create_task(downloader.start())) # Create upload task # This will upload the videos in the downloader's buffer to the rclone remotes and log it in the database uploader = VideoUploader( self._protect, - downloader.video_queue, + upload_queue, self.rclone_destination, self.rclone_args, self.file_structure_format, @@ -194,7 +196,7 @@ class UnifiProtectBackup: # Create event listener task # This will connect to the unifi protect websocket and listen for events. When one is detected it will # be added to the queue of events to download - event_listener = EventListener(event_queue, self._protect, self.detection_types, self.ignore_cameras) + event_listener = EventListener(download_queue, self._protect, self.detection_types, self.ignore_cameras) tasks.append(asyncio.create_task(event_listener.start())) # Create purge task @@ -206,7 +208,14 @@ class UnifiProtectBackup: # This will check all the events within the retention period, if any have been missed and not backed up # they will be added to the event queue missing = MissingEventChecker( - self._protect, self._db, event_queue, self.retention, self.detection_types, self.ignore_cameras + self._protect, + self._db, + download_queue, + downloader, + uploader, + self.retention, + self.detection_types, + self.ignore_cameras, ) tasks.append(asyncio.create_task(missing.start())) diff --git a/unifi_protect_backup/uploader.py b/unifi_protect_backup/uploader.py index 42f868c..d44845d 100644 --- a/unifi_protect_backup/uploader.py +++ b/unifi_protect_backup/uploader.py @@ -23,19 +23,20 @@ class VideoUploader: def __init__( self, protect: ProtectApiClient, - video_queue: VideoQueue, + upload_queue: VideoQueue, rclone_destination: str, rclone_args: str, file_structure_format: str, db: aiosqlite.Connection, ): self._protect: ProtectApiClient = protect - self._video_queue: VideoQueue = video_queue + self.upload_queue: VideoQueue = upload_queue self._rclone_destination: str = rclone_destination self._rclone_args: str = rclone_args self._file_structure_format: str = file_structure_format self._db: aiosqlite.Connection = db self.logger = logging.LoggerAdapter(logger, {'event': ''}) + self.current_event = None async def start(self): """Main loop @@ -46,7 +47,9 @@ class VideoUploader: self.logger.info("Starting Uploader") while True: try: - event, video = await self._video_queue.get() + event, video = await self.upload_queue.get() + self.current_event = event + self.logger = logging.LoggerAdapter(logger, {'event': f' [{event.id}]'}) self.logger.info(f"Uploading event: {event.id}")