From be2a1ee92118e77bc5525ad7bf187e01c010bb11 Mon Sep 17 00:00:00 2001 From: Sebastian Goscik Date: Thu, 10 Apr 2025 00:15:43 +0100 Subject: [PATCH] Add a storage quota purger --- README.md | 3 + unifi_protect_backup/__init__.py | 3 +- unifi_protect_backup/cli.py | 6 ++ unifi_protect_backup/purge.py | 97 ++++++++++++++++++- .../unifi_protect_backup_core.py | 18 +++- unifi_protect_backup/uploader.py | 5 + 6 files changed, 129 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 6f96d3f..35c640e 100644 --- a/README.md +++ b/README.md @@ -203,6 +203,8 @@ Options: what the web UI does. This might be more stable if you are experiencing a lot of failed downloads with the default downloader. [default: False] --parallel-uploads INTEGER Max number of parallel uploads to allow [default: 1] + --storage-quota TEXT The maximum amount of storage to use for storing clips (you can + use suffixes like "B", "KiB", "MiB", "GiB") --help Show this message and exit. ``` @@ -231,6 +233,7 @@ always take priority over environment variables): - `MAX_EVENT_LENGTH` - `EXPERIMENTAL_DOWNLOADER` - `PARALLEL_UPLOADS` +- `STORAGE_QUOTA` ## File path formatting diff --git a/unifi_protect_backup/__init__.py b/unifi_protect_backup/__init__.py index 6d23dd4..c1d2ddd 100644 --- a/unifi_protect_backup/__init__.py +++ b/unifi_protect_backup/__init__.py @@ -7,7 +7,7 @@ __version__ = "0.13.1" from .downloader import VideoDownloader from .downloader_experimental import VideoDownloaderExperimental from .event_listener import EventListener -from .purge import Purge +from .purge import Purge, StorageQuotaPurge from .uploader import VideoUploader from .missing_event_checker import MissingEventChecker @@ -16,6 +16,7 @@ __all__ = [ "VideoDownloaderExperimental", "EventListener", "Purge", + "StorageQuotaPurge", "VideoUploader", "MissingEventChecker", ] diff --git a/unifi_protect_backup/cli.py b/unifi_protect_backup/cli.py index 2834df7..aefb782 100644 --- a/unifi_protect_backup/cli.py +++ b/unifi_protect_backup/cli.py @@ -247,6 +247,12 @@ a lot of failed downloads with the default downloader. type=int, help="Max number of parallel uploads to allow", ) +@click.option( + "--storage-quota", + envvar="STORAGE_QUOTA", + help='The maximum amount of storage to use for storing clips (you can use suffixes like "B", "KiB", "MiB", "GiB")', + callback=lambda ctx, param, value: int(human_readable_to_float(value)) if value is not None else None, +) def main(**kwargs): """Python based tool for backing up Unifi Protect event clips as they occur.""" try: diff --git a/unifi_protect_backup/purge.py b/unifi_protect_backup/purge.py index a52a340..b0771cf 100644 --- a/unifi_protect_backup/purge.py +++ b/unifi_protect_backup/purge.py @@ -3,11 +3,13 @@ import logging import time from datetime import datetime +import json +import asyncio import aiosqlite from dateutil.relativedelta import relativedelta -from unifi_protect_backup.utils import run_command, wait_until +from unifi_protect_backup.utils import run_command, wait_until, human_readable_size logger = logging.getLogger(__name__) @@ -88,3 +90,96 @@ class Purge: next_purge_time = datetime.now() + self.interval logger.extra_debug(f"sleeping until {next_purge_time}") await wait_until(next_purge_time) + + +async def get_utilisation(rclone_destination): + """Get storage utilisation of rclone destination. + + Args: + rclone_destination (str): What rclone destination the clips are stored in + + """ + returncode, stdout, stderr = await run_command(f"rclone size {rclone_destination} --json") + if returncode != 0: + logger.error(f" Failed to get size of: '{rclone_destination}'") + return json.loads(stdout)["bytes"] + + +class StorageQuotaPurge: + """Enforces maximum storage ultisation qutoa.""" + + def __init__( + self, + db: aiosqlite.Connection, + quota: int, + upload_event: asyncio.Event, + rclone_destination: str, + rclone_purge_args: str = "", + ): + """Init.""" + self._db = db + self.quota = quota + self._upload_event = upload_event + self.rclone_destination = rclone_destination + self.rclone_purge_args = rclone_purge_args + + async def start(self): + """Run main loop.""" + while True: + try: + # Wait for the uploaders to tell us there has been an upload + await self._upload_event.wait() + + deleted_a_file = False + + # While we exceed the storage quota + utilisation = await get_utilisation(self.rclone_destination) + while utilisation > self.quota: + # Get the oldest event + async with self._db.execute("SELECT id FROM events ORDER BY end ASC LIMIT 1") as event_cursor: + row = await event_cursor.fetchone() + if row is None: + logger.warning( + "Storage quota exceeded, but there are no events in the database" + " - Do you have stray files?" + ) + break + event_id = row[0] + + if ( + not deleted_a_file + ): # Only show this message once when the quota is exceeded till we drop below it again + logger.info( + f"Storage quota {human_readable_size(utilisation)}/{human_readable_size(self.quota)} " + "exceeded, purging oldest events" + ) + + # Get all the backups for this event + async with self._db.execute(f"SELECT * FROM backups WHERE id = '{event_id}'") as backup_cursor: + # Delete them + async for _, remote, file_path in backup_cursor: + logger.debug(f" Deleted: {remote}:{file_path}") + await delete_file(f"{remote}:{file_path}", self.rclone_purge_args) + deleted_a_file = True + + # delete event from database + # entries in the `backups` table are automatically deleted by sqlite triggers + await self._db.execute(f"DELETE FROM events WHERE id = '{event_id}'") + await self._db.commit() + + utilisation = await get_utilisation(self.rclone_destination) + logger.debug( + f"Storage utlisation: {human_readable_size(utilisation)}/{human_readable_size(self.quota)}" + ) + + if deleted_a_file: + await tidy_empty_dirs(self.rclone_destination) + logger.info( + "Storage utlisation back below quota limit: " + f"{human_readable_size(utilisation)}/{human_readable_size(self.quota)}" + ) + + self._upload_event.clear() + + except Exception as e: + logger.error("Unexpected exception occurred during purge:", exc_info=e) diff --git a/unifi_protect_backup/unifi_protect_backup_core.py b/unifi_protect_backup/unifi_protect_backup_core.py index 4762f56..2e898f4 100644 --- a/unifi_protect_backup/unifi_protect_backup_core.py +++ b/unifi_protect_backup/unifi_protect_backup_core.py @@ -16,6 +16,7 @@ from unifi_protect_backup import ( EventListener, MissingEventChecker, Purge, + StorageQuotaPurge, VideoDownloader, VideoDownloaderExperimental, VideoUploader, @@ -86,6 +87,7 @@ class UnifiProtectBackup: port: int = 443, use_experimental_downloader: bool = False, parallel_uploads: int = 1, + storage_quota: int | None = None, ): """Will configure logging settings and the Unifi Protect API (but not actually connect). @@ -120,6 +122,8 @@ class UnifiProtectBackup: use_experimental_downloader (bool): Use the new experimental downloader (the same method as used by the webUI) parallel_uploads (int): Max number of parallel uploads to allow + storage_quota (int): Maximum storage utilisation in bytes + """ self.color_logging = color_logging setup_logging(verbose, self.color_logging) @@ -159,6 +163,7 @@ class UnifiProtectBackup: logger.debug(f" {max_event_length=}s") logger.debug(f" {use_experimental_downloader=}") logger.debug(f" {parallel_uploads=}") + logger.debug(f" {storage_quota=}") self.rclone_destination = rclone_destination self.retention = retention @@ -195,6 +200,7 @@ class UnifiProtectBackup: self._max_event_length = timedelta(seconds=max_event_length) self._use_experimental_downloader = use_experimental_downloader self._parallel_uploads = parallel_uploads + self._storage_quota = storage_quota async def start(self): """Bootstrap the backup process and kick off the main loop. @@ -302,7 +308,7 @@ class UnifiProtectBackup: tasks.append(event_listener.start()) # Create purge task - # This will, every midnight, purge old backups from the rclone remotes and database + # This will, every _purge_interval, purge old backups from the rclone remotes and database purge = Purge( self._db, self.retention, @@ -312,6 +318,16 @@ class UnifiProtectBackup: ) tasks.append(purge.start()) + if self._storage_quota is not None: + storage_quota_purger = StorageQuotaPurge( + self._db, + self._storage_quota, + uploader.upload_signal, + self.rclone_destination, + self.rclone_purge_args, + ) + tasks.append(storage_quota_purger.start()) + # Create missing event task # This will check all the events within the retention period, if any have been missed and not backed up # they will be added to the event queue diff --git a/unifi_protect_backup/uploader.py b/unifi_protect_backup/uploader.py index 1d2c337..1969019 100644 --- a/unifi_protect_backup/uploader.py +++ b/unifi_protect_backup/uploader.py @@ -4,6 +4,7 @@ import logging import pathlib import re from datetime import datetime +import asyncio import aiosqlite from uiprotect import ProtectApiClient @@ -45,6 +46,8 @@ class VideoUploader: file_structure_format (str): format string for how to structure the uploaded files db (aiosqlite.Connection): Async SQlite database connection color_logging (bool): Whether or not to add color to logging output + upload_signal (asyncio.Event): Set by the uploader to signal an upload has occured + """ self._protect: ProtectApiClient = protect self.upload_queue: VideoQueue = upload_queue @@ -53,6 +56,7 @@ class VideoUploader: self._file_structure_format: str = file_structure_format self._db: aiosqlite.Connection = db self.current_event = None + self._upload_signal = asyncio.Event() self.base_logger = logging.getLogger(__name__) setup_event_logger(self.base_logger, color_logging) @@ -84,6 +88,7 @@ class VideoUploader: try: await self._upload_video(video, destination, self._rclone_args) await self._update_database(event, destination) + self._upload_signal.set() self.logger.debug("Uploaded") except SubprocessException: self.logger.error(f" Failed to upload file: '{destination}'")