From 3a43c1b670c1088c006ced7db0b693ca41caf92d Mon Sep 17 00:00:00 2001 From: Sebastian Goscik Date: Wed, 9 Apr 2025 11:12:23 +0100 Subject: [PATCH] Enable multiple parallel uploaders --- unifi_protect_backup/cli.py | 8 +++++ unifi_protect_backup/missing_event_checker.py | 15 ++++----- .../unifi_protect_backup_core.py | 31 ++++++++++++------- 3 files changed, 35 insertions(+), 19 deletions(-) diff --git a/unifi_protect_backup/cli.py b/unifi_protect_backup/cli.py index 32300d3..4d4d493 100644 --- a/unifi_protect_backup/cli.py +++ b/unifi_protect_backup/cli.py @@ -239,6 +239,14 @@ what the web UI does. This might be more stable if you are experiencing a lot of failed downloads with the default downloader. """, ) +@click.option( + "--parallel-uploads", + default=1, + show_default=True, + envvar="PARALLEL_UPLOADS", + type=int, + help="Max number of parallel uploads to allow", +) def main(**kwargs): """A Python based tool for backing up Unifi Protect event clips as they occur.""" diff --git a/unifi_protect_backup/missing_event_checker.py b/unifi_protect_backup/missing_event_checker.py index 3d1286a..360442c 100644 --- a/unifi_protect_backup/missing_event_checker.py +++ b/unifi_protect_backup/missing_event_checker.py @@ -25,7 +25,7 @@ class MissingEventChecker: db: aiosqlite.Connection, download_queue: asyncio.Queue, downloader: VideoDownloader, - uploader: VideoUploader, + uploaders: List[VideoUploader], retention: relativedelta, detection_types: List[str], ignore_cameras: List[str], @@ -39,7 +39,7 @@ class MissingEventChecker: db (aiosqlite.Connection): Async SQLite database to check for missing events download_queue (asyncio.Queue): Download queue to check for on-going downloads downloader (VideoDownloader): Downloader to check for on-going downloads - uploader (VideoUploader): Uploader to check for on-going uploads + uploaders (List[VideoUploader]): Uploaders to check for on-going uploads retention (relativedelta): Retention period to limit search window detection_types (List[str]): Detection types wanted to limit search ignore_cameras (List[str]): Ignored camera IDs to limit search @@ -50,7 +50,7 @@ class MissingEventChecker: self._db: aiosqlite.Connection = db self._download_queue: asyncio.Queue = download_queue self._downloader: VideoDownloader = downloader - self._uploader: VideoUploader = uploader + self._uploaders: List[VideoUploader] = uploaders self.retention: relativedelta = retention self.detection_types: List[str] = detection_types self.ignore_cameras: List[str] = ignore_cameras @@ -102,10 +102,11 @@ class MissingEventChecker: if current_download is not None: downloading_event_ids.add(current_download.id) - uploading_event_ids = {event.id for event, video in self._uploader.upload_queue._queue} # type: ignore - current_upload = self._uploader.current_event - if current_upload is not None: - uploading_event_ids.add(current_upload.id) + uploading_event_ids = {event.id for event, video in self._downloader.upload_queue._queue} # type: ignore + for uploader in self._uploaders: + current_upload = uploader.current_event + if current_upload is not None: + uploading_event_ids.add(current_upload.id) missing_event_ids = set(unifi_events.keys()) - (db_event_ids | downloading_event_ids | uploading_event_ids) diff --git a/unifi_protect_backup/unifi_protect_backup_core.py b/unifi_protect_backup/unifi_protect_backup_core.py index b31508c..c9b943c 100644 --- a/unifi_protect_backup/unifi_protect_backup_core.py +++ b/unifi_protect_backup/unifi_protect_backup_core.py @@ -85,6 +85,7 @@ class UnifiProtectBackup: download_rate_limit: float | None = None, port: int = 443, use_experimental_downloader: bool = False, + parallel_uploads: int = 1, ): """Will configure logging settings and the Unifi Protect API (but not actually connect). @@ -117,6 +118,7 @@ class UnifiProtectBackup: download_rate_limit (float): Limit how events can be downloaded in one minute. Disabled by default", max_event_length (int): Maximum length in seconds for an event to be considered valid and downloaded use_experimental_downloader (bool): Use the new experimental downloader (the same method as used by the webUI) + parallel_uploads (int): Max number of parallel uploads to allow """ self.color_logging = color_logging setup_logging(verbose, self.color_logging) @@ -155,6 +157,7 @@ class UnifiProtectBackup: logger.debug(f" {download_rate_limit=} events per minute") logger.debug(f" {max_event_length=}s") logger.debug(f" {use_experimental_downloader=}") + logger.debug(f" {parallel_uploads=}") self.rclone_destination = rclone_destination self.retention = retention @@ -190,6 +193,7 @@ class UnifiProtectBackup: self._download_rate_limit = download_rate_limit self._max_event_length = timedelta(seconds=max_event_length) self._use_experimental_downloader = use_experimental_downloader + self._parallel_uploads = parallel_uploads async def start(self): """Bootstrap the backup process and kick off the main loop. @@ -272,18 +276,21 @@ class UnifiProtectBackup: ) tasks.append(downloader.start()) - # Create upload task + # Create upload tasks # This will upload the videos in the downloader's buffer to the rclone remotes and log it in the database - uploader = VideoUploader( - self._protect, - upload_queue, - self.rclone_destination, - self.rclone_args, - self.file_structure_format, - self._db, - self.color_logging, - ) - tasks.append(uploader.start()) + uploaders = [] + for i in range(self._parallel_uploads): + uploader = VideoUploader( + self._protect, + upload_queue, + self.rclone_destination, + self.rclone_args, + self.file_structure_format, + self._db, + self.color_logging, + ) + uploaders.append(uploader) + tasks.append(uploader.start()) # Create event listener task # This will connect to the unifi protect websocket and listen for events. When one is detected it will @@ -312,7 +319,7 @@ class UnifiProtectBackup: self._db, download_queue, downloader, - uploader, + uploaders, self.retention, self.detection_types, self.ignore_cameras,