Updates for rclone parallel uploads

This commit is contained in:
Radhakrishnan Sethuraman
2024-11-18 17:27:29 -06:00
parent c067dbd9f7
commit aa75ec6f97
4 changed files with 52 additions and 30 deletions

View File

@@ -226,6 +226,13 @@ what the web UI does. This might be more stable if you are experiencing
a lot of failed downloads with the default downloader. a lot of failed downloads with the default downloader.
""", """,
) )
@click.option(
"--rclone-parallel-uploads",
default=1,
show_default=True,
envvar="RCLONE_PARALLEL_UPLOADS",
help="Number of parallel uploads",
)
def main(**kwargs): def main(**kwargs):
"""A Python based tool for backing up Unifi Protect event clips as they occur.""" """A Python based tool for backing up Unifi Protect event clips as they occur."""
event_listener = UnifiProtectBackup(**kwargs) event_listener = UnifiProtectBackup(**kwargs)

View File

@@ -97,9 +97,9 @@ class MissingEventChecker:
downloading_event_ids.add(current_download.id) downloading_event_ids.add(current_download.id)
uploading_event_ids = {event.id for event, video in self._uploader.upload_queue._queue} # type: ignore uploading_event_ids = {event.id for event, video in self._uploader.upload_queue._queue} # type: ignore
current_upload = self._uploader.current_event for current_upload in self._uploader.current_events:
if current_upload is not None: if current_upload is not None:
uploading_event_ids.add(current_upload.id) uploading_event_ids.add(current_upload.id)
missing_event_ids = set(unifi_events.keys()) - (db_event_ids | downloading_event_ids | uploading_event_ids) missing_event_ids = set(unifi_events.keys()) - (db_event_ids | downloading_event_ids | uploading_event_ids)

View File

@@ -76,6 +76,7 @@ class UnifiProtectBackup:
download_rate_limit: float | None = None, download_rate_limit: float | None = None,
port: int = 443, port: int = 443,
use_experimental_downloader: bool = False, use_experimental_downloader: bool = False,
rclone_parallel_uploads: int = 1,
): ):
"""Will configure logging settings and the Unifi Protect API (but not actually connect). """Will configure logging settings and the Unifi Protect API (but not actually connect).
@@ -270,6 +271,7 @@ class UnifiProtectBackup:
self.file_structure_format, self.file_structure_format,
self._db, self._db,
self.color_logging, self.color_logging,
self.rclone_parallel_uploads
) )
tasks.append(uploader.start()) tasks.append(uploader.start())

View File

@@ -4,7 +4,8 @@ import logging
import pathlib import pathlib
import re import re
from datetime import datetime from datetime import datetime
import os
import asyncio
import aiosqlite import aiosqlite
from uiprotect import ProtectApiClient from uiprotect import ProtectApiClient
from uiprotect.data.nvr import Event from uiprotect.data.nvr import Event
@@ -34,6 +35,7 @@ class VideoUploader:
file_structure_format: str, file_structure_format: str,
db: aiosqlite.Connection, db: aiosqlite.Connection,
color_logging: bool, color_logging: bool,
rclone_parallel_uploads: int
): ):
"""Init. """Init.
@@ -53,11 +55,42 @@ class VideoUploader:
self._file_structure_format: str = file_structure_format self._file_structure_format: str = file_structure_format
self._db: aiosqlite.Connection = db self._db: aiosqlite.Connection = db
self.current_event = None self.current_event = None
self.rclone_parallel_uploads = rclone_parallel_uploads
self.base_logger = logging.getLogger(__name__) self.base_logger = logging.getLogger(__name__)
setup_event_logger(self.base_logger, color_logging) setup_event_logger(self.base_logger, color_logging)
self.logger = logging.LoggerAdapter(self.base_logger, {"event": ""}) self.logger = logging.LoggerAdapter(self.base_logger, {"event": ""})
async def _upload_worker(self, semaphore, worker_id):
async with semaphore:
while True:
try:
event, video = await self.upload_queue.get()
self.current_events[worker_id] = event
logger = logging.LoggerAdapter(self.base_logger, {'event': f' [{event.id}]'})
logger.info(f"Uploading event: {event.id}")
logger.debug(
f" Remaining Upload Queue: {self.upload_queue.qsize_files()}"
f" ({human_readable_size(self.upload_queue.qsize())})"
)
destination = await self._generate_file_path(event)
logger.debug(f" Destination: {destination}")
try:
await self._upload_video(video, destination, self._rclone_args)
await self._update_database(event, destination)
logger.debug("Uploaded")
except SubprocessException:
logger.error(f" Failed to upload file: '{destination}'")
except Exception as e:
logger.error(f"Unexpected exception occurred, abandoning event {event.id}:", exc_info=e)
self.current_events[worker_id] = None
async def start(self): async def start(self):
"""Main loop. """Main loop.
@@ -65,33 +98,13 @@ class VideoUploader:
using rclone, finally it updates the database using rclone, finally it updates the database
""" """
self.logger.info("Starting Uploader") self.logger.info("Starting Uploader")
while True:
try: rclone_transfers = self.rclone_parallel_uploads
event, video = await self.upload_queue.get() self.current_events = [None] * rclone_transfers
self.current_event = event semaphore = asyncio.Semaphore(rclone_transfers)
self.logger = logging.LoggerAdapter(self.base_logger, {"event": f" [{event.id}]"}) workers = [self._upload_worker(semaphore, i) for i in range(rclone_transfers)]
await asyncio.gather(*workers)
self.logger.info(f"Uploading event: {event.id}")
self.logger.debug(
f" Remaining Upload Queue: {self.upload_queue.qsize_files()}"
f" ({human_readable_size(self.upload_queue.qsize())})"
)
destination = await self._generate_file_path(event)
self.logger.debug(f" Destination: {destination}")
try:
await self._upload_video(video, destination, self._rclone_args)
await self._update_database(event, destination)
self.logger.debug("Uploaded")
except SubprocessException:
self.logger.error(f" Failed to upload file: '{destination}'")
self.current_event = None
except Exception as e:
self.logger.error(f"Unexpected exception occurred, abandoning event {event.id}:", exc_info=e)
async def _upload_video(self, video: bytes, destination: pathlib.Path, rclone_args: str): async def _upload_video(self, video: bytes, destination: pathlib.Path, rclone_args: str):
"""Upload video using rclone. """Upload video using rclone.