This commit is contained in:
alash2k
2025-01-29 16:59:09 +00:00
committed by GitHub
6 changed files with 58 additions and 71 deletions

View File

@@ -14,38 +14,6 @@ on:
# A workflow run is made up of one or more jobs that can run sequentially or in parallel
jobs:
# This workflow contains a single job called "test"
test:
# The type of runner that the job will run on
strategy:
matrix:
python-versions: [3.9]
os: [ubuntu-18.04, macos-latest, windows-latest]
runs-on: ${{ matrix.os }}
# Steps represent a sequence of tasks that will be executed as part of the job
steps:
# Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-versions }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install poetry tox tox-gh-actions
- name: test with tox
run:
tox
- name: list files
run: ls -l .
- uses: codecov/codecov-action@v1
with:
fail_ci_if_error: true
files: coverage.xml
dev_container:
name: Create dev container
@@ -57,9 +25,9 @@ jobs:
# Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
- uses: actions/setup-python@v5
with:
python-version: 3.10
python-version: 3.13
- name: Install dependencies
run: |

View File

@@ -175,16 +175,24 @@ class VideoDownloader:
async def _download(self, event: Event) -> Optional[bytes]:
"""Downloads the video clip for the given event."""
self.logger.debug(" Downloading video...")
for x in range(5):
self.logger.debug(" Downloading video...")
assert isinstance(event.camera_id, str)
assert isinstance(event.start, datetime)
assert isinstance(event.end, datetime)
request_start_time = datetime.now()
try:
video = await self._protect.get_camera_video(event.camera_id, event.start, event.end)
assert isinstance(video, bytes)
break
except (AssertionError, ClientPayloadError, TimeoutError) as e:
diff_seconds = (datetime.now() - request_start_time).total_seconds()
if diff_seconds > 60:
self.logger.error(f"Ignoring event. Total wait: {diff_seconds}. Camera: {await get_camera_name(self._protect, event.camera_id)}. Start: {event.start.strftime('%Y-%m-%dT%H-%M-%S')} ({event.start.timestamp()}) End: {event.end.strftime('%Y-%m-%dT%H-%M-%S')} ({event.end.timestamp()})", exc_info=e)
await self._ignore_event(event)
break
self.logger.warning(f" Failed download attempt {x+1}, retying in 1s", exc_info=e)
await asyncio.sleep(1)
else:

View File

@@ -103,9 +103,9 @@ class MissingEventChecker:
downloading_event_ids.add(current_download.id)
uploading_event_ids = {event.id for event, video in self._uploader.upload_queue._queue} # type: ignore
current_upload = self._uploader.current_event
if current_upload is not None:
uploading_event_ids.add(current_upload.id)
for current_upload in self._uploader.current_events:
if current_upload is not None:
uploading_event_ids.add(current_upload.id)
missing_event_ids = set(unifi_events.keys()) - (db_event_ids | downloading_event_ids | uploading_event_ids)

View File

@@ -85,5 +85,5 @@ class Purge:
logger.error("Unexpected exception occurred during purge:", exc_info=e)
next_purge_time = datetime.now() + self.interval
logger.extra_debug(f"sleeping until {next_purge_time}")
logger.debug(f"sleeping until {next_purge_time}")
await wait_until(next_purge_time)

View File

@@ -84,7 +84,7 @@ class UnifiProtectBackup:
color_logging: bool = False,
download_rate_limit: float | None = None,
port: int = 443,
use_experimental_downloader: bool = False,
use_experimental_downloader: bool = False
):
"""Will configure logging settings and the Unifi Protect API (but not actually connect).
@@ -281,7 +281,7 @@ class UnifiProtectBackup:
self.rclone_args,
self.file_structure_format,
self._db,
self.color_logging,
self.color_logging
)
tasks.append(uploader.start())

View File

@@ -4,7 +4,8 @@ import logging
import pathlib
import re
from datetime import datetime
import os
import asyncio
import aiosqlite
from uiprotect import ProtectApiClient
from uiprotect.data.nvr import Event
@@ -52,12 +53,42 @@ class VideoUploader:
self._rclone_args: str = rclone_args
self._file_structure_format: str = file_structure_format
self._db: aiosqlite.Connection = db
self.current_event = None
self.current_events = []
self.base_logger = logging.getLogger(__name__)
setup_event_logger(self.base_logger, color_logging)
self.logger = logging.LoggerAdapter(self.base_logger, {"event": ""})
async def _upload_worker(self, semaphore, worker_id):
async with semaphore:
while True:
try:
event, video = await self.upload_queue.get()
self.current_events[worker_id] = event
logger = logging.LoggerAdapter(self.base_logger, {'event': f' [{event.id}]'})
logger.info(f"Uploading event: {event.id}")
logger.debug(
f" Remaining Upload Queue: {self.upload_queue.qsize_files()}"
f" ({human_readable_size(self.upload_queue.qsize())})"
)
destination = await self._generate_file_path(event)
logger.debug(f" Destination: {destination}")
try:
await self._upload_video(video, destination, self._rclone_args)
await self._update_database(event, destination)
logger.debug("Uploaded")
except SubprocessException:
logger.error(f" Failed to upload file: '{destination}'")
except Exception as e:
logger.error(f"Unexpected exception occurred, abandoning event {event.id}:", exc_info=e)
self.current_events[worker_id] = None
async def start(self):
"""Main loop.
@@ -65,33 +96,13 @@ class VideoUploader:
using rclone, finally it updates the database
"""
self.logger.info("Starting Uploader")
while True:
try:
event, video = await self.upload_queue.get()
self.current_event = event
rclone_transfers = int(os.getenv('RCLONE_PARALLEL_UPLOADS', '1'))
self.current_events = [None] * rclone_transfers
semaphore = asyncio.Semaphore(rclone_transfers)
self.logger = logging.LoggerAdapter(self.base_logger, {"event": f" [{event.id}]"})
self.logger.info(f"Uploading event: {event.id}")
self.logger.debug(
f" Remaining Upload Queue: {self.upload_queue.qsize_files()}"
f" ({human_readable_size(self.upload_queue.qsize())})"
)
destination = await self._generate_file_path(event)
self.logger.debug(f" Destination: {destination}")
try:
await self._upload_video(video, destination, self._rclone_args)
await self._update_database(event, destination)
self.logger.debug("Uploaded")
except SubprocessException:
self.logger.error(f" Failed to upload file: '{destination}'")
self.current_event = None
except Exception as e:
self.logger.error(f"Unexpected exception occurred, abandoning event {event.id}:", exc_info=e)
workers = [self._upload_worker(semaphore, i) for i in range(rclone_transfers)]
await asyncio.gather(*workers)
async def _upload_video(self, video: bytes, destination: pathlib.Path, rclone_args: str):
"""Upload video using rclone.
@@ -163,7 +174,7 @@ class VideoUploader:
"camera_name": await get_camera_name(self._protect, event.camera_id),
}
file_path = self._file_structure_format.format(**format_context)
file_path = self._file_structure_format.format(**format_context).lower()
file_path = re.sub(r"[^\w\-_\.\(\)/ ]", "", file_path) # Sanitize any invalid chars
file_path = file_path.replace(" ", "_")
return pathlib.Path(f"{self._rclone_destination}/{file_path}")