From 66b3344e29707ec209737aca40dddd1729bf792a Mon Sep 17 00:00:00 2001
From: Sebastian Goscik
Date: Sat, 8 Jul 2023 20:57:43 +0100
Subject: [PATCH] Add download rate limiter

---
 README.md                                         |  3 +++
 poetry.lock                                       | 14 +++++++++++++-
 pyproject.toml                                    |  1 +
 unifi_protect_backup/cli.py                       |  8 ++++++++
 unifi_protect_backup/downloader.py                |  9 +++++++++
 unifi_protect_backup/unifi_protect_backup_core.py | 10 ++++++++--
 6 files changed, 42 insertions(+), 3 deletions(-)

diff --git a/README.md b/README.md
index 5e32a92..b06f606 100644
--- a/README.md
+++ b/README.md
@@ -189,6 +189,8 @@ Options:
                                   https://github.com/caronc/apprise
   --skip-missing                  If set, events which are 'missing' at the start will be ignored.
                                   Subsequent missing events will be downloaded (e.g. a missed event)
                                   [default: False]
+  --download-rate-limit FLOAT     Limit how many events can be downloaded in one minute. Disabled by
+                                  default
   --help                          Show this message and exit.
 ```
@@ -212,6 +214,7 @@ always take priority over environment variables):
 - `PURGE_INTERVAL`
 - `APPRISE_NOTIFIERS`
 - `SKIP_MISSING`
+- `DOWNLOAD_RATELIMIT`
 
 ## File path formatting
 
diff --git a/poetry.lock b/poetry.lock
index 4c8f130..432de2d 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -121,6 +121,18 @@ yarl = ">=1.0,<2.0"
 [package.extras]
 speedups = ["Brotli", "aiodns", "cchardet"]
 
+[[package]]
+name = "aiolimiter"
+version = "1.1.0"
+description = "asyncio rate limiter, a leaky bucket implementation"
+category = "main"
+optional = false
+python-versions = ">=3.7,<4.0"
+files = [
+    {file = "aiolimiter-1.1.0-py3-none-any.whl", hash = "sha256:0b4997961fc58b8df40279e739f9cf0d3e255e63e9a44f64df567a8c17241e24"},
+    {file = "aiolimiter-1.1.0.tar.gz", hash = "sha256:461cf02f82a29347340d031626c92853645c099cb5ff85577b831a7bd21132b5"},
+]
+
 [[package]]
 name = "aiorun"
 version = "2022.11.1"
@@ -2436,4 +2448,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more
 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.9.0,<4.0"
-content-hash = "42a25af6210e11892bc9e94fb4a4565e06f9dfe9239a8f277f4970e85de72218"
+content-hash = "d090a2822defef7c6dacec8b826d80f01718abe4f7dacebf301c119c98fd98b8"
diff --git a/pyproject.toml b/pyproject.toml
index 855583f..0ac2b9b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -30,6 +30,7 @@ python-dateutil = "^2.8.2"
 apprise = "^1.3.0"
 expiring-dict = "^1.1.0"
 async-lru = "^2.0.3"
+aiolimiter = "^1.1.0"
 
 [tool.poetry.group.dev]
 optional = true
diff --git a/unifi_protect_backup/cli.py b/unifi_protect_backup/cli.py
index 3ce1380..3fb8c89 100644
--- a/unifi_protect_backup/cli.py
+++ b/unifi_protect_backup/cli.py
@@ -195,6 +195,14 @@ If set, events which are 'missing' at the start will be ignored.
 Subsequent missing events will be downloaded (e.g. a missed event)
 """,
 )
+@click.option(
+    '--download-rate-limit',
+    default=None,
+    show_default=True,
+    envvar='DOWNLOAD_RATELIMIT',
+    type=float,
+    help="Limit how many events can be downloaded in one minute. Disabled by default",
+)
 def main(**kwargs):
     """A Python based tool for backing up Unifi Protect event clips as they occur."""
     event_listener = UnifiProtectBackup(**kwargs)
diff --git a/unifi_protect_backup/downloader.py b/unifi_protect_backup/downloader.py
index 67af7b9..ff20fc6 100644
--- a/unifi_protect_backup/downloader.py
+++ b/unifi_protect_backup/downloader.py
@@ -11,6 +11,7 @@ import aiosqlite
 import pytz
 from aiohttp.client_exceptions import ClientPayloadError
 from expiring_dict import ExpiringDict  # type: ignore
+from aiolimiter import AsyncLimiter
 from pyunifiprotect import ProtectApiClient
 from pyunifiprotect.data.nvr import Event
 from pyunifiprotect.data.types import EventType
@@ -48,6 +49,7 @@ class VideoDownloader:
         download_queue: asyncio.Queue,
         upload_queue: VideoQueue,
         color_logging: bool,
+        download_rate_limit: float,
     ):
         """Init.
 
@@ -57,6 +59,7 @@
             download_queue (asyncio.Queue): Queue to get event details from
             upload_queue (VideoQueue): Queue to place downloaded videos on
             color_logging (bool): Whether or not to add color to logging output
+            download_rate_limit (float): Limit how many events can be downloaded in one minute
         """
         self._protect: ProtectApiClient = protect
         self._db: aiosqlite.Connection = db
@@ -64,6 +67,8 @@
         self.upload_queue: VideoQueue = upload_queue
         self.current_event = None
         self._failures = ExpiringDict(60 * 60 * 12)  # Time to live = 12h
+        self._download_rate_limit = download_rate_limit
+        self._limiter = AsyncLimiter(self._download_rate_limit) if self._download_rate_limit is not None else None
 
         self.base_logger = logging.getLogger(__name__)
         setup_event_logger(self.base_logger, color_logging)
@@ -81,6 +86,10 @@
         """Main loop."""
         self.logger.info("Starting Downloader")
         while True:
+            if self._limiter:
+                self.logger.debug("Waiting for rate limit")
+                await self._limiter.acquire()
+
             try:
                 # Wait for unifi protect to be connected
                 await self._protect.connect_event.wait()
diff --git a/unifi_protect_backup/unifi_protect_backup_core.py b/unifi_protect_backup/unifi_protect_backup_core.py
index e37c91e..ab3b83e 100644
--- a/unifi_protect_backup/unifi_protect_backup_core.py
+++ b/unifi_protect_backup/unifi_protect_backup_core.py
@@ -63,7 +63,8 @@ class UnifiProtectBackup:
         apprise_notifiers: str,
         skip_missing: bool,
         sqlite_path: str = "events.sqlite",
-        color_logging=False,
+        color_logging: bool = False,
+        download_rate_limit: float = None,
         port: int = 443,
     ):
         """Will configure logging settings and the Unifi Protect API (but not actually connect).
@@ -93,6 +94,7 @@
             skip_missing (bool): If initial missing events should be ignored
             sqlite_path (str): Path where to find/create sqlite database
             color_logging (bool): Whether to add color to logging output or not
+            download_rate_limit (float): Limit how many events can be downloaded in one minute. Disabled by default
         """
         self.color_logging = color_logging
         setup_logging(verbose, self.color_logging)
@@ -127,6 +129,7 @@
         logger.debug(f"  {purge_interval=}")
         logger.debug(f"  {apprise_notifiers=}")
         logger.debug(f"  {skip_missing=}")
+        logger.debug(f"  {download_rate_limit=} events per minute")
 
         self.rclone_destination = rclone_destination
         self.retention = retention
@@ -158,6 +161,7 @@
         self._download_buffer_size = download_buffer_size
         self._purge_interval = purge_interval
         self._skip_missing = skip_missing
+        self._download_rate_limit = download_rate_limit
 
     async def start(self):
         """Bootstrap the backup process and kick off the main loop.
@@ -217,7 +221,9 @@
 
         # Create downloader task
         # This will download video files to its buffer
-        downloader = VideoDownloader(self._protect, self._db, download_queue, upload_queue, self.color_logging)
+        downloader = VideoDownloader(
+            self._protect, self._db, download_queue, upload_queue, self.color_logging, self._download_rate_limit
+        )
         tasks.append(downloader.start())
 
         # Create upload task
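
For reviewers, here is a minimal, self-contained sketch (not part of the patch) of the aiolimiter pattern the downloader now uses: an `AsyncLimiter` whose default 60-second window makes `max_rate` mean "events per minute", acquired at the top of the download loop and skipped entirely when the limit is `None`. The queue of fake event IDs and the simulated download are assumptions made purely for illustration.

```python
"""Illustrative sketch of rate-limiting a download loop with aiolimiter."""
import asyncio
from typing import Optional

from aiolimiter import AsyncLimiter


async def downloader(queue: asyncio.Queue, rate_limit: Optional[float]) -> None:
    # AsyncLimiter(max_rate, time_period=60) permits at most `max_rate` acquisitions
    # per 60-second window, i.e. "events per minute" as used by --download-rate-limit.
    limiter = AsyncLimiter(rate_limit) if rate_limit is not None else None

    while True:
        if limiter:
            # Blocks until the leaky bucket has capacity for one more download.
            await limiter.acquire()

        event_id = await queue.get()
        print(f"downloading clip for event {event_id}")
        await asyncio.sleep(0.1)  # stand-in for the real video download
        queue.task_done()


async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(3):
        queue.put_nowait(i)

    # Cap at 2 downloads per minute; the third item waits roughly 30 s for the
    # bucket to drain. Pass rate_limit=None to disable limiting entirely.
    task = asyncio.create_task(downloader(queue, rate_limit=2))
    await queue.join()
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)


if __name__ == "__main__":
    asyncio.run(main())
```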