diff --git a/.github/workflows/dev.yml b/.github/workflows/dev.yml index 7b135fc..fd1a2e0 100644 --- a/.github/workflows/dev.yml +++ b/.github/workflows/dev.yml @@ -14,38 +14,6 @@ on: # A workflow run is made up of one or more jobs that can run sequentially or in parallel jobs: # This workflow contains a single job called "test" - test: - # The type of runner that the job will run on - strategy: - matrix: - python-versions: [3.9] - os: [ubuntu-18.04, macos-latest, windows-latest] - runs-on: ${{ matrix.os }} - - # Steps represent a sequence of tasks that will be executed as part of the job - steps: - # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it - - uses: actions/checkout@v2 - - uses: actions/setup-python@v2 - with: - python-version: ${{ matrix.python-versions }} - - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install poetry tox tox-gh-actions - - - name: test with tox - run: - tox - - - name: list files - run: ls -l . - - - uses: codecov/codecov-action@v1 - with: - fail_ci_if_error: true - files: coverage.xml dev_container: name: Create dev container @@ -57,9 +25,9 @@ jobs: # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it - uses: actions/checkout@v2 - - uses: actions/setup-python@v2 + - uses: actions/setup-python@v5 with: - python-version: 3.10 + python-version: 3.13 - name: Install dependencies run: | diff --git a/unifi_protect_backup/downloader.py b/unifi_protect_backup/downloader.py index 47a5d43..5635155 100644 --- a/unifi_protect_backup/downloader.py +++ b/unifi_protect_backup/downloader.py @@ -175,16 +175,24 @@ class VideoDownloader: async def _download(self, event: Event) -> Optional[bytes]: """Downloads the video clip for the given event.""" - self.logger.debug(" Downloading video...") + for x in range(5): + self.logger.debug(" Downloading video...") assert isinstance(event.camera_id, str) assert isinstance(event.start, datetime) assert isinstance(event.end, datetime) + + request_start_time = datetime.now() try: video = await self._protect.get_camera_video(event.camera_id, event.start, event.end) assert isinstance(video, bytes) break except (AssertionError, ClientPayloadError, TimeoutError) as e: + diff_seconds = (datetime.now() - request_start_time).total_seconds() + if diff_seconds > 60: + self.logger.error(f"Ignoring event. Total wait: {diff_seconds}. Camera: {await get_camera_name(self._protect, event.camera_id)}. Start: {event.start.strftime('%Y-%m-%dT%H-%M-%S')} ({event.start.timestamp()}) End: {event.end.strftime('%Y-%m-%dT%H-%M-%S')} ({event.end.timestamp()})", exc_info=e) + await self._ignore_event(event) + break self.logger.warning(f" Failed download attempt {x+1}, retying in 1s", exc_info=e) await asyncio.sleep(1) else: diff --git a/unifi_protect_backup/missing_event_checker.py b/unifi_protect_backup/missing_event_checker.py index e8f4876..23609cc 100644 --- a/unifi_protect_backup/missing_event_checker.py +++ b/unifi_protect_backup/missing_event_checker.py @@ -103,9 +103,9 @@ class MissingEventChecker: downloading_event_ids.add(current_download.id) uploading_event_ids = {event.id for event, video in self._uploader.upload_queue._queue} # type: ignore - current_upload = self._uploader.current_event - if current_upload is not None: - uploading_event_ids.add(current_upload.id) + for current_upload in self._uploader.current_events: + if current_upload is not None: + uploading_event_ids.add(current_upload.id) missing_event_ids = set(unifi_events.keys()) - (db_event_ids | downloading_event_ids | uploading_event_ids) diff --git a/unifi_protect_backup/purge.py b/unifi_protect_backup/purge.py index feda6e1..0957e70 100644 --- a/unifi_protect_backup/purge.py +++ b/unifi_protect_backup/purge.py @@ -85,5 +85,5 @@ class Purge: logger.error("Unexpected exception occurred during purge:", exc_info=e) next_purge_time = datetime.now() + self.interval - logger.extra_debug(f"sleeping until {next_purge_time}") + logger.debug(f"sleeping until {next_purge_time}") await wait_until(next_purge_time) diff --git a/unifi_protect_backup/unifi_protect_backup_core.py b/unifi_protect_backup/unifi_protect_backup_core.py index 4cdbb5b..18fc8e0 100644 --- a/unifi_protect_backup/unifi_protect_backup_core.py +++ b/unifi_protect_backup/unifi_protect_backup_core.py @@ -76,7 +76,7 @@ class UnifiProtectBackup: color_logging: bool = False, download_rate_limit: float | None = None, port: int = 443, - use_experimental_downloader: bool = False, + use_experimental_downloader: bool = False ): """Will configure logging settings and the Unifi Protect API (but not actually connect). @@ -273,7 +273,7 @@ class UnifiProtectBackup: self.rclone_args, self.file_structure_format, self._db, - self.color_logging, + self.color_logging ) tasks.append(uploader.start()) diff --git a/unifi_protect_backup/uploader.py b/unifi_protect_backup/uploader.py index 2a860bc..4be3ef0 100644 --- a/unifi_protect_backup/uploader.py +++ b/unifi_protect_backup/uploader.py @@ -4,7 +4,8 @@ import logging import pathlib import re from datetime import datetime - +import os +import asyncio import aiosqlite from uiprotect import ProtectApiClient from uiprotect.data.nvr import Event @@ -52,12 +53,42 @@ class VideoUploader: self._rclone_args: str = rclone_args self._file_structure_format: str = file_structure_format self._db: aiosqlite.Connection = db - self.current_event = None + self.current_events = [] self.base_logger = logging.getLogger(__name__) setup_event_logger(self.base_logger, color_logging) self.logger = logging.LoggerAdapter(self.base_logger, {"event": ""}) + async def _upload_worker(self, semaphore, worker_id): + async with semaphore: + while True: + try: + event, video = await self.upload_queue.get() + self.current_events[worker_id] = event + + logger = logging.LoggerAdapter(self.base_logger, {'event': f' [{event.id}]'}) + + logger.info(f"Uploading event: {event.id}") + logger.debug( + f" Remaining Upload Queue: {self.upload_queue.qsize_files()}" + f" ({human_readable_size(self.upload_queue.qsize())})" + ) + + destination = await self._generate_file_path(event) + logger.debug(f" Destination: {destination}") + + try: + await self._upload_video(video, destination, self._rclone_args) + await self._update_database(event, destination) + logger.debug("Uploaded") + except SubprocessException: + logger.error(f" Failed to upload file: '{destination}'") + + except Exception as e: + logger.error(f"Unexpected exception occurred, abandoning event {event.id}:", exc_info=e) + + self.current_events[worker_id] = None + async def start(self): """Main loop. @@ -65,33 +96,13 @@ class VideoUploader: using rclone, finally it updates the database """ self.logger.info("Starting Uploader") - while True: - try: - event, video = await self.upload_queue.get() - self.current_event = event + + rclone_transfers = int(os.getenv('RCLONE_PARALLEL_UPLOADS', '1')) + self.current_events = [None] * rclone_transfers + semaphore = asyncio.Semaphore(rclone_transfers) - self.logger = logging.LoggerAdapter(self.base_logger, {"event": f" [{event.id}]"}) - - self.logger.info(f"Uploading event: {event.id}") - self.logger.debug( - f" Remaining Upload Queue: {self.upload_queue.qsize_files()}" - f" ({human_readable_size(self.upload_queue.qsize())})" - ) - - destination = await self._generate_file_path(event) - self.logger.debug(f" Destination: {destination}") - - try: - await self._upload_video(video, destination, self._rclone_args) - await self._update_database(event, destination) - self.logger.debug("Uploaded") - except SubprocessException: - self.logger.error(f" Failed to upload file: '{destination}'") - - self.current_event = None - - except Exception as e: - self.logger.error(f"Unexpected exception occurred, abandoning event {event.id}:", exc_info=e) + workers = [self._upload_worker(semaphore, i) for i in range(rclone_transfers)] + await asyncio.gather(*workers) async def _upload_video(self, video: bytes, destination: pathlib.Path, rclone_args: str): """Upload video using rclone. @@ -163,7 +174,7 @@ class VideoUploader: "camera_name": await get_camera_name(self._protect, event.camera_id), } - file_path = self._file_structure_format.format(**format_context) + file_path = self._file_structure_format.format(**format_context).lower() file_path = re.sub(r"[^\w\-_\.\(\)/ ]", "", file_path) # Sanitize any invalid chars - + file_path = file_path.replace(" ", "_") return pathlib.Path(f"{self._rclone_destination}/{file_path}")