Compare commits

3 Commits

Author            SHA1        Message                                                    Date
Sebastian Goscik  2bf90b6763  Update readme with parallel downloads                      2025-04-09 11:27:45 +01:00
Sebastian Goscik  f275443a7a  Fix issue with duplicated logging with parallel loggers    2025-04-09 11:25:34 +01:00
Sebastian Goscik  3a43c1b670  Enable multiple parallel uploaders                         2025-04-09 11:25:34 +01:00
5 changed files with 48 additions and 24 deletions

View File

@@ -203,6 +203,7 @@ Options:
   --experimental-downloader       If set, a new experimental download mechanism will be used to match
                                   what the web UI does. This might be more stable if you are experiencing
                                   a lot of failed downloads with the default downloader. [default: False]
+  --parallel-uploads INTEGER      Max number of parallel uploads to allow [default: 1]
   --help                          Show this message and exit.
 ```
@@ -230,6 +231,7 @@ always take priority over environment variables):
 - `DOWNLOAD_RATELIMIT`
 - `MAX_EVENT_LENGTH`
 - `EXPERIMENTAL_DOWNLOADER`
+- `PARALLEL_UPLOADS`
 
 ## File path formatting

View File

@@ -239,6 +239,14 @@ what the web UI does. This might be more stable if you are experiencing
     a lot of failed downloads with the default downloader.
     """,
 )
+@click.option(
+    "--parallel-uploads",
+    default=1,
+    show_default=True,
+    envvar="PARALLEL_UPLOADS",
+    type=int,
+    help="Max number of parallel uploads to allow",
+)
 def main(**kwargs):
     """A Python based tool for backing up Unifi Protect event clips as they occur."""

View File

@@ -25,7 +25,7 @@ class MissingEventChecker:
         db: aiosqlite.Connection,
         download_queue: asyncio.Queue,
         downloader: VideoDownloader,
-        uploader: VideoUploader,
+        uploaders: List[VideoUploader],
         retention: relativedelta,
         detection_types: List[str],
         ignore_cameras: List[str],
@@ -39,7 +39,7 @@ class MissingEventChecker:
             db (aiosqlite.Connection): Async SQLite database to check for missing events
             download_queue (asyncio.Queue): Download queue to check for on-going downloads
             downloader (VideoDownloader): Downloader to check for on-going downloads
-            uploader (VideoUploader): Uploader to check for on-going uploads
+            uploaders (List[VideoUploader]): Uploaders to check for on-going uploads
             retention (relativedelta): Retention period to limit search window
             detection_types (List[str]): Detection types wanted to limit search
             ignore_cameras (List[str]): Ignored camera IDs to limit search
@@ -50,7 +50,7 @@ class MissingEventChecker:
         self._db: aiosqlite.Connection = db
         self._download_queue: asyncio.Queue = download_queue
         self._downloader: VideoDownloader = downloader
-        self._uploader: VideoUploader = uploader
+        self._uploaders: List[VideoUploader] = uploaders
         self.retention: relativedelta = retention
         self.detection_types: List[str] = detection_types
         self.ignore_cameras: List[str] = ignore_cameras
@@ -102,10 +102,11 @@ class MissingEventChecker:
         if current_download is not None:
             downloading_event_ids.add(current_download.id)
 
-        uploading_event_ids = {event.id for event, video in self._uploader.upload_queue._queue}  # type: ignore
-        current_upload = self._uploader.current_event
-        if current_upload is not None:
-            uploading_event_ids.add(current_upload.id)
+        uploading_event_ids = {event.id for event, video in self._downloader.upload_queue._queue}  # type: ignore
+        for uploader in self._uploaders:
+            current_upload = uploader.current_event
+            if current_upload is not None:
+                uploading_event_ids.add(current_upload.id)
 
         missing_event_ids = set(unifi_events.keys()) - (db_event_ids | downloading_event_ids | uploading_event_ids)
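
With several uploaders, the checker can no longer look at a single uploader's state: it has to collect in-flight event IDs from the shared upload queue and from every worker before deciding what is missing. A simplified illustration of the final set difference (stand-in names, not the real attributes):

```
def find_missing_event_ids(
    unifi_events: dict,
    db_event_ids: set,
    downloading_event_ids: set,
    uploading_event_ids: set,
) -> set:
    # An event still needs downloading if Unifi Protect reports it but it is
    # neither already backed up (in the database) nor currently being
    # downloaded or uploaded by any of the workers.
    in_flight_or_done = db_event_ids | downloading_event_ids | uploading_event_ids
    return set(unifi_events.keys()) - in_flight_or_done
```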

View File

@@ -85,6 +85,7 @@ class UnifiProtectBackup:
         download_rate_limit: float | None = None,
         port: int = 443,
         use_experimental_downloader: bool = False,
+        parallel_uploads: int = 1,
     ):
         """Will configure logging settings and the Unifi Protect API (but not actually connect).
@@ -117,6 +118,7 @@ class UnifiProtectBackup:
             download_rate_limit (float): Limit how events can be downloaded in one minute. Disabled by default",
             max_event_length (int): Maximum length in seconds for an event to be considered valid and downloaded
             use_experimental_downloader (bool): Use the new experimental downloader (the same method as used by the webUI)
+            parallel_uploads (int): Max number of parallel uploads to allow
         """
         self.color_logging = color_logging
         setup_logging(verbose, self.color_logging)
@@ -155,6 +157,7 @@ class UnifiProtectBackup:
         logger.debug(f" {download_rate_limit=} events per minute")
         logger.debug(f" {max_event_length=}s")
         logger.debug(f" {use_experimental_downloader=}")
+        logger.debug(f" {parallel_uploads=}")
         self.rclone_destination = rclone_destination
         self.retention = retention
@@ -190,6 +193,7 @@ class UnifiProtectBackup:
         self._download_rate_limit = download_rate_limit
         self._max_event_length = timedelta(seconds=max_event_length)
         self._use_experimental_downloader = use_experimental_downloader
+        self._parallel_uploads = parallel_uploads
 
     async def start(self):
         """Bootstrap the backup process and kick off the main loop.
@@ -272,18 +276,21 @@ class UnifiProtectBackup:
         )
         tasks.append(downloader.start())
 
-        # Create upload task
+        # Create upload tasks
         # This will upload the videos in the downloader's buffer to the rclone remotes and log it in the database
-        uploader = VideoUploader(
-            self._protect,
-            upload_queue,
-            self.rclone_destination,
-            self.rclone_args,
-            self.file_structure_format,
-            self._db,
-            self.color_logging,
-        )
-        tasks.append(uploader.start())
+        uploaders = []
+        for i in range(self._parallel_uploads):
+            uploader = VideoUploader(
+                self._protect,
+                upload_queue,
+                self.rclone_destination,
+                self.rclone_args,
+                self.file_structure_format,
+                self._db,
+                self.color_logging,
+            )
+            uploaders.append(uploader)
+            tasks.append(uploader.start())
 
         # Create event listener task
         # This will connect to the unifi protect websocket and listen for events. When one is detected it will
@@ -312,7 +319,7 @@ class UnifiProtectBackup:
             self._db,
             download_queue,
             downloader,
-            uploader,
+            uploaders,
             self.retention,
             self.detection_types,
             self.ignore_cameras,
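
The net effect of these hunks is that a single VideoUploader consuming `upload_queue` becomes `parallel_uploads` instances consuming the same queue, so several rclone uploads can be in flight at once. A minimal standalone sketch of that fan-out pattern (simplified names, not the project's actual classes):

```
import asyncio


async def uploader_worker(name: str, queue: asyncio.Queue) -> None:
    # Each worker takes the next clip off the shared queue; asyncio.Queue
    # hands every item to exactly one consumer, so nothing is uploaded twice.
    while True:
        clip = await queue.get()
        print(f"{name} uploading {clip}")
        await asyncio.sleep(0.1)  # stand-in for the real rclone upload
        queue.task_done()


async def main(parallel_uploads: int = 3) -> None:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(10):
        queue.put_nowait(f"clip-{i}")

    workers = [
        asyncio.create_task(uploader_worker(f"uploader-{i}", queue))
        for i in range(parallel_uploads)
    ]
    await queue.join()  # returns once every queued clip has been handled
    for worker in workers:
        worker.cancel()  # the workers loop forever, so stop them explicitly


asyncio.run(main())
```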

View File

@@ -238,12 +238,18 @@ def setup_logging(verbosity: int, color_logging: bool = False, apprise_notifiers
         logger.setLevel(logging.WEBSOCKET_DATA)  # type: ignore
 
 
+_initialized_loggers = []
+
+
 def setup_event_logger(logger, color_logging):
     """Sets up a logger that also displays the event ID currently being processed."""
-    format = "{asctime} [{levelname:^11s}] {name:<42} :{event} {message}"
-    sh = create_logging_handler(format, color_logging)
-    logger.addHandler(sh)
-    logger.propagate = False
+    global _initialized_loggers
+    if logger not in _initialized_loggers:
+        format = "{asctime} [{levelname:^11s}] {name:<42} :{event} {message}"
+        sh = create_logging_handler(format, color_logging)
+        logger.addHandler(sh)
+        logger.propagate = False
+        _initialized_loggers.append(logger)
 
 
 _suffixes = ["B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"]
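
The duplicated-logging fix matters because `setup_event_logger` is now called once per uploader on the same logger object; each call used to attach another handler, so every record was emitted once per attached handler. A reduced sketch of the guard using only the standard `logging` module (the real code builds its handler with `create_logging_handler` and an event-aware format):

```
import logging

_initialized_loggers: list[logging.Logger] = []


def setup_event_logger(logger: logging.Logger) -> None:
    # Only attach a handler the first time we see this logger; repeat calls
    # (one per parallel uploader) become no-ops instead of adding duplicates.
    if logger not in _initialized_loggers:
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("{asctime} [{levelname}] {name} {message}", style="{"))
        logger.addHandler(handler)
        logger.propagate = False
        _initialized_loggers.append(logger)


log = logging.getLogger("uploader")
log.setLevel(logging.INFO)
setup_event_logger(log)
setup_event_logger(log)   # second call changes nothing
log.info("uploaded clip")  # emitted exactly once
```
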
@@ -407,7 +413,7 @@ class VideoQueue(asyncio.Queue):
             )
         while self.full(item):
-            putter = self._loop.create_future()  # type: ignore
+            putter = self._get_loop().create_future()  # type: ignore
             self._putters.append(putter)  # type: ignore
             try:
                 await putter
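
This last hunk is independent of parallel uploads: it swaps the cached `self._loop` for asyncio's lazy `_get_loop()` helper, which is what `asyncio.Queue.put` itself uses on current Python versions (the queue no longer stores a loop reference at construction time). A rough sketch of the equivalent waiter pattern using only public APIs (illustrative names, not the project's code):

```
import asyncio


async def wait_for_space(putters: list[asyncio.Future]) -> None:
    # Create the waiter future from the loop that is actually running, rather
    # than from a loop reference captured when the queue was constructed.
    putter = asyncio.get_running_loop().create_future()
    putters.append(putter)
    await putter  # resolved by whoever frees up space in the queue


async def main() -> None:
    putters: list[asyncio.Future] = []
    waiter = asyncio.create_task(wait_for_space(putters))
    await asyncio.sleep(0)          # let the task register its future
    putters.pop().set_result(None)  # simulate space being freed
    await waiter


asyncio.run(main())
```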