Add a storage quota purger

Sebastian Goscik
2025-04-10 00:15:43 +01:00
parent ef06d2a4d4
commit be2a1ee921
6 changed files with 129 additions and 3 deletions

View File

@@ -203,6 +203,8 @@ Options:
what the web UI does. This might be more stable if you are experiencing
a lot of failed downloads with the default downloader. [default: False]
--parallel-uploads INTEGER Max number of parallel uploads to allow [default: 1]
--storage-quota TEXT The maximum amount of storage to use for storing clips (you can
use suffixes like "B", "KiB", "MiB", "GiB")
--help Show this message and exit.
```
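The suffixes are the binary (IEC) units, each a factor of 1024 larger than the last; for example:

```python
# "100GiB" parsed into bytes (illustrative):
quota = 100 * 1024**3
print(quota)  # 107374182400
```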
@@ -231,6 +233,7 @@ always take priority over environment variables):
- `MAX_EVENT_LENGTH`
- `EXPERIMENTAL_DOWNLOADER`
- `PARALLEL_UPLOADS`
- `STORAGE_QUOTA`
## File path formatting

View File

@@ -7,7 +7,7 @@ __version__ = "0.13.1"
from .downloader import VideoDownloader
from .downloader_experimental import VideoDownloaderExperimental
from .event_listener import EventListener
from .purge import Purge
from .purge import Purge, StorageQuotaPurge
from .uploader import VideoUploader
from .missing_event_checker import MissingEventChecker
@@ -16,6 +16,7 @@ __all__ = [
"VideoDownloaderExperimental",
"EventListener",
"Purge",
"StorageQuotaPurge",
"VideoUploader",
"MissingEventChecker",
]

View File

@@ -247,6 +247,12 @@ a lot of failed downloads with the default downloader.
type=int,
help="Max number of parallel uploads to allow",
)
@click.option(
"--storage-quota",
envvar="STORAGE_QUOTA",
help='The maximum amount of storage to use for storing clips (you can use suffixes like "B", "KiB", "MiB", "GiB")',
callback=lambda ctx, param, value: int(human_readable_to_float(value)) if value is not None else None,
)
def main(**kwargs):
"""Python based tool for backing up Unifi Protect event clips as they occur."""
try:

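The `human_readable_to_float` helper used by this callback lives in `unifi_protect_backup.utils` and is not part of this diff. As a rough sketch of the assumed behaviour (the real helper may differ):

```python
import re

# Hypothetical sketch of a size parser like utils.human_readable_to_float;
# the project's real implementation may differ.
_SUFFIXES = {"B": 1, "KiB": 1024, "MiB": 1024**2, "GiB": 1024**3, "TiB": 1024**4}


def human_readable_to_float(value: str) -> float:
    """Convert strings like '512MiB' or '2 GiB' into a number of bytes."""
    match = re.fullmatch(r"\s*([\d.]+)\s*(B|KiB|MiB|GiB|TiB)?\s*", value)
    if match is None:
        raise ValueError(f"Cannot parse size: {value!r}")
    number, suffix = match.groups()
    return float(number) * _SUFFIXES[suffix or "B"]
```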
View File

@@ -3,11 +3,13 @@
import logging
import time
from datetime import datetime
import json
import asyncio
import aiosqlite
from dateutil.relativedelta import relativedelta
from unifi_protect_backup.utils import run_command, wait_until
from unifi_protect_backup.utils import run_command, wait_until, human_readable_size
logger = logging.getLogger(__name__)
@@ -88,3 +90,96 @@ class Purge:
next_purge_time = datetime.now() + self.interval
logger.extra_debug(f"sleeping until {next_purge_time}")
await wait_until(next_purge_time)
async def get_utilisation(rclone_destination):
    """Get the storage utilisation of an rclone destination.

    Args:
        rclone_destination (str): The rclone destination the clips are stored in

    Returns:
        int: Total size, in bytes, of all files stored at the destination
    """
    returncode, stdout, stderr = await run_command(f"rclone size {rclone_destination} --json")
    if returncode != 0:
        logger.error(f" Failed to get size of: '{rclone_destination}'")
        # Don't attempt to parse output from a failed command
        raise RuntimeError(f"Failed to get size of '{rclone_destination}'")
    return json.loads(stdout)["bytes"]
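
# For reference, `rclone size <destination> --json` prints a JSON object such as
# {"count": 482, "bytes": 53687091200} (illustrative values), so get_utilisation()
# returns the total number of bytes currently stored at the destination.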
class StorageQuotaPurge:
"""Enforces maximum storage ultisation qutoa."""
def __init__(
self,
db: aiosqlite.Connection,
quota: int,
upload_event: asyncio.Event,
rclone_destination: str,
rclone_purge_args: str = "",
):
"""Init."""
self._db = db
self.quota = quota
self._upload_event = upload_event
self.rclone_destination = rclone_destination
self.rclone_purge_args = rclone_purge_args
async def start(self):
"""Run main loop."""
while True:
try:
# Wait for the uploaders to tell us there has been an upload
await self._upload_event.wait()
deleted_a_file = False
                utilisation = await get_utilisation(self.rclone_destination)
                # While we exceed the storage quota, purge the oldest events
                while utilisation > self.quota:
# Get the oldest event
async with self._db.execute("SELECT id FROM events ORDER BY end ASC LIMIT 1") as event_cursor:
row = await event_cursor.fetchone()
if row is None:
logger.warning(
"Storage quota exceeded, but there are no events in the database"
" - Do you have stray files?"
)
break
event_id = row[0]
                    # Only log this message once per quota breach, until utilisation
                    # drops back below the quota again
                    if not deleted_a_file:
logger.info(
f"Storage quota {human_readable_size(utilisation)}/{human_readable_size(self.quota)} "
"exceeded, purging oldest events"
)
# Get all the backups for this event
async with self._db.execute(f"SELECT * FROM backups WHERE id = '{event_id}'") as backup_cursor:
# Delete them
async for _, remote, file_path in backup_cursor:
logger.debug(f" Deleted: {remote}:{file_path}")
await delete_file(f"{remote}:{file_path}", self.rclone_purge_args)
deleted_a_file = True
                    # Delete the event from the database
                    # (entries in the `backups` table are automatically deleted by sqlite triggers)
                    await self._db.execute("DELETE FROM events WHERE id = ?", (event_id,))
                    await self._db.commit()
utilisation = await get_utilisation(self.rclone_destination)
logger.debug(
f"Storage utlisation: {human_readable_size(utilisation)}/{human_readable_size(self.quota)}"
)
if deleted_a_file:
await tidy_empty_dirs(self.rclone_destination)
logger.info(
"Storage utlisation back below quota limit: "
f"{human_readable_size(utilisation)}/{human_readable_size(self.quota)}"
)
self._upload_event.clear()
except Exception as e:
logger.error("Unexpected exception occurred during purge:", exc_info=e)
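The loop above deletes rows only from `events`, relying on database triggers to remove the matching `backups` rows. A hypothetical reconstruction of that schema, inferred purely from the queries above (the project's real schema is created elsewhere and may differ):

```python
# Hypothetical schema sketch, inferred from the queries in StorageQuotaPurge;
# the project's actual tables and triggers are defined elsewhere and may differ.
SCHEMA_SKETCH = """
CREATE TABLE events (
    id    TEXT PRIMARY KEY,
    start REAL,
    end   REAL  -- the purger orders by `end` to find the oldest event
);
CREATE TABLE backups (
    id        TEXT,  -- event ID
    remote    TEXT,  -- rclone remote the clip was uploaded to
    file_path TEXT   -- path of the clip on that remote
);
CREATE TRIGGER delete_backups AFTER DELETE ON events
BEGIN
    DELETE FROM backups WHERE id = OLD.id;
END;
"""
```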

View File

@@ -16,6 +16,7 @@ from unifi_protect_backup import (
EventListener,
MissingEventChecker,
Purge,
StorageQuotaPurge,
VideoDownloader,
VideoDownloaderExperimental,
VideoUploader,
@@ -86,6 +87,7 @@ class UnifiProtectBackup:
port: int = 443,
use_experimental_downloader: bool = False,
parallel_uploads: int = 1,
storage_quota: int | None = None,
):
"""Will configure logging settings and the Unifi Protect API (but not actually connect).
@@ -120,6 +122,8 @@ class UnifiProtectBackup:
use_experimental_downloader (bool): Use the new experimental downloader (the same method as used by the
webUI)
parallel_uploads (int): Max number of parallel uploads to allow
storage_quota (int): Maximum storage utilisation in bytes
"""
self.color_logging = color_logging
setup_logging(verbose, self.color_logging)
@@ -159,6 +163,7 @@ class UnifiProtectBackup:
logger.debug(f" {max_event_length=}s")
logger.debug(f" {use_experimental_downloader=}")
logger.debug(f" {parallel_uploads=}")
logger.debug(f" {storage_quota=}")
self.rclone_destination = rclone_destination
self.retention = retention
@@ -195,6 +200,7 @@ class UnifiProtectBackup:
self._max_event_length = timedelta(seconds=max_event_length)
self._use_experimental_downloader = use_experimental_downloader
self._parallel_uploads = parallel_uploads
self._storage_quota = storage_quota
async def start(self):
"""Bootstrap the backup process and kick off the main loop.
@@ -302,7 +308,7 @@ class UnifiProtectBackup:
tasks.append(event_listener.start())
# Create purge task
# This will, every midnight, purge old backups from the rclone remotes and database
# This will, every _purge_interval, purge old backups from the rclone remotes and database
purge = Purge(
self._db,
self.retention,
@@ -312,6 +318,16 @@ class UnifiProtectBackup:
)
tasks.append(purge.start())
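        # Create storage quota purge task
        # Whenever an upload completes, this purges the oldest backed-up events until
        # storage utilisation is back below the configured quota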
if self._storage_quota is not None:
storage_quota_purger = StorageQuotaPurge(
self._db,
self._storage_quota,
uploader.upload_signal,
self.rclone_destination,
self.rclone_purge_args,
)
tasks.append(storage_quota_purger.start())
# Create missing event task
# This will check all the events within the retention period, if any have been missed and not backed up
# they will be added to the event queue

View File

@@ -4,6 +4,7 @@ import logging
import pathlib
import re
from datetime import datetime
import asyncio
import aiosqlite
from uiprotect import ProtectApiClient
@@ -45,6 +46,8 @@ class VideoUploader:
file_structure_format (str): format string for how to structure the uploaded files
db (aiosqlite.Connection): Async SQlite database connection
color_logging (bool): Whether or not to add color to logging output
            upload_signal (asyncio.Event): Set by the uploader to signal that an upload has occurred
"""
self._protect: ProtectApiClient = protect
self.upload_queue: VideoQueue = upload_queue
@@ -53,6 +56,7 @@ class VideoUploader:
self._file_structure_format: str = file_structure_format
self._db: aiosqlite.Connection = db
self.current_event = None
        self.upload_signal = asyncio.Event()  # public: passed to StorageQuotaPurge as its upload_event
self.base_logger = logging.getLogger(__name__)
setup_event_logger(self.base_logger, color_logging)
@@ -84,6 +88,7 @@ class VideoUploader:
try:
await self._upload_video(video, destination, self._rclone_args)
await self._update_database(event, destination)
                self.upload_signal.set()
self.logger.debug("Uploaded")
except SubprocessException:
self.logger.error(f" Failed to upload file: '{destination}'")
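
Together, the uploader and `StorageQuotaPurge` form a simple `asyncio.Event` signalling pair: the uploader sets the event after each successful upload; the purger wakes, enforces the quota, then clears the event to re-arm. A minimal standalone sketch of the pattern (stand-in coroutines for illustration, not the project's code):

```python
import asyncio


async def upload_loop(signal: asyncio.Event) -> None:
    while True:
        await asyncio.sleep(1)  # stand-in for an actual upload
        signal.set()            # wake the quota purger


async def purge_loop(signal: asyncio.Event) -> None:
    while True:
        await signal.wait()     # sleep until at least one upload has happened
        # ... check utilisation and purge oldest events here ...
        signal.clear()          # re-arm for the next upload


async def main() -> None:
    signal = asyncio.Event()
    await asyncio.gather(upload_loop(signal), purge_loop(signal))

# asyncio.run(main())  # runs forever; for illustration only
```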