This commit is contained in:
alash2k
2025-01-18 15:38:32 +00:00
committed by GitHub
6 changed files with 58 additions and 71 deletions

View File

@@ -14,38 +14,6 @@ on:
# A workflow run is made up of one or more jobs that can run sequentially or in parallel # A workflow run is made up of one or more jobs that can run sequentially or in parallel
jobs: jobs:
# This workflow contains a single job called "test" # This workflow contains a single job called "test"
test:
# The type of runner that the job will run on
strategy:
matrix:
python-versions: [3.9]
os: [ubuntu-18.04, macos-latest, windows-latest]
runs-on: ${{ matrix.os }}
# Steps represent a sequence of tasks that will be executed as part of the job
steps:
# Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-versions }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install poetry tox tox-gh-actions
- name: test with tox
run:
tox
- name: list files
run: ls -l .
- uses: codecov/codecov-action@v1
with:
fail_ci_if_error: true
files: coverage.xml
dev_container: dev_container:
name: Create dev container name: Create dev container
@@ -57,9 +25,9 @@ jobs:
# Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- uses: actions/setup-python@v2 - uses: actions/setup-python@v5
with: with:
python-version: 3.10 python-version: 3.13
- name: Install dependencies - name: Install dependencies
run: | run: |

View File

@@ -175,16 +175,24 @@ class VideoDownloader:
async def _download(self, event: Event) -> Optional[bytes]: async def _download(self, event: Event) -> Optional[bytes]:
"""Downloads the video clip for the given event.""" """Downloads the video clip for the given event."""
self.logger.debug(" Downloading video...")
for x in range(5): for x in range(5):
self.logger.debug(" Downloading video...")
assert isinstance(event.camera_id, str) assert isinstance(event.camera_id, str)
assert isinstance(event.start, datetime) assert isinstance(event.start, datetime)
assert isinstance(event.end, datetime) assert isinstance(event.end, datetime)
request_start_time = datetime.now()
try: try:
video = await self._protect.get_camera_video(event.camera_id, event.start, event.end) video = await self._protect.get_camera_video(event.camera_id, event.start, event.end)
assert isinstance(video, bytes) assert isinstance(video, bytes)
break break
except (AssertionError, ClientPayloadError, TimeoutError) as e: except (AssertionError, ClientPayloadError, TimeoutError) as e:
diff_seconds = (datetime.now() - request_start_time).total_seconds()
if diff_seconds > 60:
self.logger.error(f"Ignoring event. Total wait: {diff_seconds}. Camera: {await get_camera_name(self._protect, event.camera_id)}. Start: {event.start.strftime('%Y-%m-%dT%H-%M-%S')} ({event.start.timestamp()}) End: {event.end.strftime('%Y-%m-%dT%H-%M-%S')} ({event.end.timestamp()})", exc_info=e)
await self._ignore_event(event)
break
self.logger.warning(f" Failed download attempt {x+1}, retying in 1s", exc_info=e) self.logger.warning(f" Failed download attempt {x+1}, retying in 1s", exc_info=e)
await asyncio.sleep(1) await asyncio.sleep(1)
else: else:

View File

@@ -103,7 +103,7 @@ class MissingEventChecker:
downloading_event_ids.add(current_download.id) downloading_event_ids.add(current_download.id)
uploading_event_ids = {event.id for event, video in self._uploader.upload_queue._queue} # type: ignore uploading_event_ids = {event.id for event, video in self._uploader.upload_queue._queue} # type: ignore
current_upload = self._uploader.current_event for current_upload in self._uploader.current_events:
if current_upload is not None: if current_upload is not None:
uploading_event_ids.add(current_upload.id) uploading_event_ids.add(current_upload.id)

View File

@@ -85,5 +85,5 @@ class Purge:
logger.error("Unexpected exception occurred during purge:", exc_info=e) logger.error("Unexpected exception occurred during purge:", exc_info=e)
next_purge_time = datetime.now() + self.interval next_purge_time = datetime.now() + self.interval
logger.extra_debug(f"sleeping until {next_purge_time}") logger.debug(f"sleeping until {next_purge_time}")
await wait_until(next_purge_time) await wait_until(next_purge_time)

View File

@@ -76,7 +76,7 @@ class UnifiProtectBackup:
color_logging: bool = False, color_logging: bool = False,
download_rate_limit: float | None = None, download_rate_limit: float | None = None,
port: int = 443, port: int = 443,
use_experimental_downloader: bool = False, use_experimental_downloader: bool = False
): ):
"""Will configure logging settings and the Unifi Protect API (but not actually connect). """Will configure logging settings and the Unifi Protect API (but not actually connect).
@@ -273,7 +273,7 @@ class UnifiProtectBackup:
self.rclone_args, self.rclone_args,
self.file_structure_format, self.file_structure_format,
self._db, self._db,
self.color_logging, self.color_logging
) )
tasks.append(uploader.start()) tasks.append(uploader.start())

View File

@@ -4,7 +4,8 @@ import logging
import pathlib import pathlib
import re import re
from datetime import datetime from datetime import datetime
import os
import asyncio
import aiosqlite import aiosqlite
from uiprotect import ProtectApiClient from uiprotect import ProtectApiClient
from uiprotect.data.nvr import Event from uiprotect.data.nvr import Event
@@ -52,12 +53,42 @@ class VideoUploader:
self._rclone_args: str = rclone_args self._rclone_args: str = rclone_args
self._file_structure_format: str = file_structure_format self._file_structure_format: str = file_structure_format
self._db: aiosqlite.Connection = db self._db: aiosqlite.Connection = db
self.current_event = None self.current_events = []
self.base_logger = logging.getLogger(__name__) self.base_logger = logging.getLogger(__name__)
setup_event_logger(self.base_logger, color_logging) setup_event_logger(self.base_logger, color_logging)
self.logger = logging.LoggerAdapter(self.base_logger, {"event": ""}) self.logger = logging.LoggerAdapter(self.base_logger, {"event": ""})
async def _upload_worker(self, semaphore, worker_id):
async with semaphore:
while True:
try:
event, video = await self.upload_queue.get()
self.current_events[worker_id] = event
logger = logging.LoggerAdapter(self.base_logger, {'event': f' [{event.id}]'})
logger.info(f"Uploading event: {event.id}")
logger.debug(
f" Remaining Upload Queue: {self.upload_queue.qsize_files()}"
f" ({human_readable_size(self.upload_queue.qsize())})"
)
destination = await self._generate_file_path(event)
logger.debug(f" Destination: {destination}")
try:
await self._upload_video(video, destination, self._rclone_args)
await self._update_database(event, destination)
logger.debug("Uploaded")
except SubprocessException:
logger.error(f" Failed to upload file: '{destination}'")
except Exception as e:
logger.error(f"Unexpected exception occurred, abandoning event {event.id}:", exc_info=e)
self.current_events[worker_id] = None
async def start(self): async def start(self):
"""Main loop. """Main loop.
@@ -65,33 +96,13 @@ class VideoUploader:
using rclone, finally it updates the database using rclone, finally it updates the database
""" """
self.logger.info("Starting Uploader") self.logger.info("Starting Uploader")
while True:
try:
event, video = await self.upload_queue.get()
self.current_event = event
self.logger = logging.LoggerAdapter(self.base_logger, {"event": f" [{event.id}]"}) rclone_transfers = int(os.getenv('RCLONE_PARALLEL_UPLOADS', '1'))
self.current_events = [None] * rclone_transfers
semaphore = asyncio.Semaphore(rclone_transfers)
self.logger.info(f"Uploading event: {event.id}") workers = [self._upload_worker(semaphore, i) for i in range(rclone_transfers)]
self.logger.debug( await asyncio.gather(*workers)
f" Remaining Upload Queue: {self.upload_queue.qsize_files()}"
f" ({human_readable_size(self.upload_queue.qsize())})"
)
destination = await self._generate_file_path(event)
self.logger.debug(f" Destination: {destination}")
try:
await self._upload_video(video, destination, self._rclone_args)
await self._update_database(event, destination)
self.logger.debug("Uploaded")
except SubprocessException:
self.logger.error(f" Failed to upload file: '{destination}'")
self.current_event = None
except Exception as e:
self.logger.error(f"Unexpected exception occurred, abandoning event {event.id}:", exc_info=e)
async def _upload_video(self, video: bytes, destination: pathlib.Path, rclone_args: str): async def _upload_video(self, video: bytes, destination: pathlib.Path, rclone_args: str):
"""Upload video using rclone. """Upload video using rclone.
@@ -163,7 +174,7 @@ class VideoUploader:
"camera_name": await get_camera_name(self._protect, event.camera_id), "camera_name": await get_camera_name(self._protect, event.camera_id),
} }
file_path = self._file_structure_format.format(**format_context) file_path = self._file_structure_format.format(**format_context).lower()
file_path = re.sub(r"[^\w\-_\.\(\)/ ]", "", file_path) # Sanitize any invalid chars file_path = re.sub(r"[^\w\-_\.\(\)/ ]", "", file_path) # Sanitize any invalid chars
file_path = file_path.replace(" ", "_")
return pathlib.Path(f"{self._rclone_destination}/{file_path}") return pathlib.Path(f"{self._rclone_destination}/{file_path}")