Enable multiple parallel uploaders

This commit is contained in:
Sebastian Goscik
2025-04-09 11:12:23 +01:00
parent e0421c1dd1
commit 3a43c1b670
3 changed files with 35 additions and 19 deletions

View File

@@ -239,6 +239,14 @@ what the web UI does. This might be more stable if you are experiencing
a lot of failed downloads with the default downloader.
""",
)
@click.option(
"--parallel-uploads",
default=1,
show_default=True,
envvar="PARALLEL_UPLOADS",
type=int,
help="Max number of parallel uploads to allow",
)
def main(**kwargs):
"""A Python based tool for backing up Unifi Protect event clips as they occur."""

View File

@@ -25,7 +25,7 @@ class MissingEventChecker:
db: aiosqlite.Connection,
download_queue: asyncio.Queue,
downloader: VideoDownloader,
uploader: VideoUploader,
uploaders: List[VideoUploader],
retention: relativedelta,
detection_types: List[str],
ignore_cameras: List[str],
@@ -39,7 +39,7 @@ class MissingEventChecker:
db (aiosqlite.Connection): Async SQLite database to check for missing events
download_queue (asyncio.Queue): Download queue to check for on-going downloads
downloader (VideoDownloader): Downloader to check for on-going downloads
uploader (VideoUploader): Uploader to check for on-going uploads
uploaders (List[VideoUploader]): Uploaders to check for on-going uploads
retention (relativedelta): Retention period to limit search window
detection_types (List[str]): Detection types wanted to limit search
ignore_cameras (List[str]): Ignored camera IDs to limit search
@@ -50,7 +50,7 @@ class MissingEventChecker:
self._db: aiosqlite.Connection = db
self._download_queue: asyncio.Queue = download_queue
self._downloader: VideoDownloader = downloader
self._uploader: VideoUploader = uploader
self._uploaders: List[VideoUploader] = uploaders
self.retention: relativedelta = retention
self.detection_types: List[str] = detection_types
self.ignore_cameras: List[str] = ignore_cameras
@@ -102,10 +102,11 @@ class MissingEventChecker:
if current_download is not None:
downloading_event_ids.add(current_download.id)
uploading_event_ids = {event.id for event, video in self._uploader.upload_queue._queue} # type: ignore
current_upload = self._uploader.current_event
if current_upload is not None:
uploading_event_ids.add(current_upload.id)
uploading_event_ids = {event.id for event, video in self._downloader.upload_queue._queue} # type: ignore
for uploader in self._uploaders:
current_upload = uploader.current_event
if current_upload is not None:
uploading_event_ids.add(current_upload.id)
missing_event_ids = set(unifi_events.keys()) - (db_event_ids | downloading_event_ids | uploading_event_ids)

View File

@@ -85,6 +85,7 @@ class UnifiProtectBackup:
download_rate_limit: float | None = None,
port: int = 443,
use_experimental_downloader: bool = False,
parallel_uploads: int = 1,
):
"""Will configure logging settings and the Unifi Protect API (but not actually connect).
@@ -117,6 +118,7 @@ class UnifiProtectBackup:
download_rate_limit (float): Limit how events can be downloaded in one minute. Disabled by default",
max_event_length (int): Maximum length in seconds for an event to be considered valid and downloaded
use_experimental_downloader (bool): Use the new experimental downloader (the same method as used by the webUI)
parallel_uploads (int): Max number of parallel uploads to allow
"""
self.color_logging = color_logging
setup_logging(verbose, self.color_logging)
@@ -155,6 +157,7 @@ class UnifiProtectBackup:
logger.debug(f" {download_rate_limit=} events per minute")
logger.debug(f" {max_event_length=}s")
logger.debug(f" {use_experimental_downloader=}")
logger.debug(f" {parallel_uploads=}")
self.rclone_destination = rclone_destination
self.retention = retention
@@ -190,6 +193,7 @@ class UnifiProtectBackup:
self._download_rate_limit = download_rate_limit
self._max_event_length = timedelta(seconds=max_event_length)
self._use_experimental_downloader = use_experimental_downloader
self._parallel_uploads = parallel_uploads
async def start(self):
"""Bootstrap the backup process and kick off the main loop.
@@ -272,18 +276,21 @@ class UnifiProtectBackup:
)
tasks.append(downloader.start())
# Create upload task
# Create upload tasks
# This will upload the videos in the downloader's buffer to the rclone remotes and log it in the database
uploader = VideoUploader(
self._protect,
upload_queue,
self.rclone_destination,
self.rclone_args,
self.file_structure_format,
self._db,
self.color_logging,
)
tasks.append(uploader.start())
uploaders = []
for i in range(self._parallel_uploads):
uploader = VideoUploader(
self._protect,
upload_queue,
self.rclone_destination,
self.rclone_args,
self.file_structure_format,
self._db,
self.color_logging,
)
uploaders.append(uploader)
tasks.append(uploader.start())
# Create event listener task
# This will connect to the unifi protect websocket and listen for events. When one is detected it will
@@ -312,7 +319,7 @@ class UnifiProtectBackup:
self._db,
download_queue,
downloader,
uploader,
uploaders,
self.retention,
self.detection_types,
self.ignore_cameras,