diff --git a/unifi_protect_backup/cli.py b/unifi_protect_backup/cli.py index 1cdba14..063ca83 100644 --- a/unifi_protect_backup/cli.py +++ b/unifi_protect_backup/cli.py @@ -226,6 +226,13 @@ what the web UI does. This might be more stable if you are experiencing a lot of failed downloads with the default downloader. """, ) +@click.option( + "--rclone-parallel-uploads", + default=1, + show_default=True, + envvar="RCLONE_PARALLEL_UPLOADS", + help="Number of parallel uploads", +) def main(**kwargs): """A Python based tool for backing up Unifi Protect event clips as they occur.""" event_listener = UnifiProtectBackup(**kwargs) diff --git a/unifi_protect_backup/missing_event_checker.py b/unifi_protect_backup/missing_event_checker.py index 6c245f4..fb0829f 100644 --- a/unifi_protect_backup/missing_event_checker.py +++ b/unifi_protect_backup/missing_event_checker.py @@ -97,9 +97,9 @@ class MissingEventChecker: downloading_event_ids.add(current_download.id) uploading_event_ids = {event.id for event, video in self._uploader.upload_queue._queue} # type: ignore - current_upload = self._uploader.current_event - if current_upload is not None: - uploading_event_ids.add(current_upload.id) + for current_upload in self._uploader.current_events: + if current_upload is not None: + uploading_event_ids.add(current_upload.id) missing_event_ids = set(unifi_events.keys()) - (db_event_ids | downloading_event_ids | uploading_event_ids) diff --git a/unifi_protect_backup/unifi_protect_backup_core.py b/unifi_protect_backup/unifi_protect_backup_core.py index 18715ba..b1a6179 100644 --- a/unifi_protect_backup/unifi_protect_backup_core.py +++ b/unifi_protect_backup/unifi_protect_backup_core.py @@ -76,6 +76,7 @@ class UnifiProtectBackup: download_rate_limit: float | None = None, port: int = 443, use_experimental_downloader: bool = False, + rclone_parallel_uploads: int = 1, ): """Will configure logging settings and the Unifi Protect API (but not actually connect). @@ -270,6 +271,7 @@ class UnifiProtectBackup: self.file_structure_format, self._db, self.color_logging, + self.rclone_parallel_uploads ) tasks.append(uploader.start()) diff --git a/unifi_protect_backup/uploader.py b/unifi_protect_backup/uploader.py index 2a860bc..b5df6fc 100644 --- a/unifi_protect_backup/uploader.py +++ b/unifi_protect_backup/uploader.py @@ -4,7 +4,8 @@ import logging import pathlib import re from datetime import datetime - +import os +import asyncio import aiosqlite from uiprotect import ProtectApiClient from uiprotect.data.nvr import Event @@ -34,6 +35,7 @@ class VideoUploader: file_structure_format: str, db: aiosqlite.Connection, color_logging: bool, + rclone_parallel_uploads: int ): """Init. @@ -53,11 +55,42 @@ class VideoUploader: self._file_structure_format: str = file_structure_format self._db: aiosqlite.Connection = db self.current_event = None + self.rclone_parallel_uploads = rclone_parallel_uploads self.base_logger = logging.getLogger(__name__) setup_event_logger(self.base_logger, color_logging) self.logger = logging.LoggerAdapter(self.base_logger, {"event": ""}) + async def _upload_worker(self, semaphore, worker_id): + async with semaphore: + while True: + try: + event, video = await self.upload_queue.get() + self.current_events[worker_id] = event + + logger = logging.LoggerAdapter(self.base_logger, {'event': f' [{event.id}]'}) + + logger.info(f"Uploading event: {event.id}") + logger.debug( + f" Remaining Upload Queue: {self.upload_queue.qsize_files()}" + f" ({human_readable_size(self.upload_queue.qsize())})" + ) + + destination = await self._generate_file_path(event) + logger.debug(f" Destination: {destination}") + + try: + await self._upload_video(video, destination, self._rclone_args) + await self._update_database(event, destination) + logger.debug("Uploaded") + except SubprocessException: + logger.error(f" Failed to upload file: '{destination}'") + + except Exception as e: + logger.error(f"Unexpected exception occurred, abandoning event {event.id}:", exc_info=e) + + self.current_events[worker_id] = None + async def start(self): """Main loop. @@ -65,33 +98,13 @@ class VideoUploader: using rclone, finally it updates the database """ self.logger.info("Starting Uploader") - while True: - try: - event, video = await self.upload_queue.get() - self.current_event = event + + rclone_transfers = self.rclone_parallel_uploads + self.current_events = [None] * rclone_transfers + semaphore = asyncio.Semaphore(rclone_transfers) - self.logger = logging.LoggerAdapter(self.base_logger, {"event": f" [{event.id}]"}) - - self.logger.info(f"Uploading event: {event.id}") - self.logger.debug( - f" Remaining Upload Queue: {self.upload_queue.qsize_files()}" - f" ({human_readable_size(self.upload_queue.qsize())})" - ) - - destination = await self._generate_file_path(event) - self.logger.debug(f" Destination: {destination}") - - try: - await self._upload_video(video, destination, self._rclone_args) - await self._update_database(event, destination) - self.logger.debug("Uploaded") - except SubprocessException: - self.logger.error(f" Failed to upload file: '{destination}'") - - self.current_event = None - - except Exception as e: - self.logger.error(f"Unexpected exception occurred, abandoning event {event.id}:", exc_info=e) + workers = [self._upload_worker(semaphore, i) for i in range(rclone_transfers)] + await asyncio.gather(*workers) async def _upload_video(self, video: bytes, destination: pathlib.Path, rclone_args: str): """Upload video using rclone.