Files
Sebastian Goscik eaabfbdb4e Add support for Finger Print, NFC Card Scan, and Audio Detections
Also refactored code that checks if an event should be backed up into one common shared function.
2025-07-18 22:44:01 +01:00

172 lines
6.4 KiB
Python

# noqa: D100
import logging
import pathlib
import re
from datetime import datetime
import aiosqlite
from uiprotect import ProtectApiClient
from uiprotect.data.nvr import Event
from unifi_protect_backup.utils import (
SubprocessException,
VideoQueue,
get_camera_name,
human_readable_size,
run_command,
setup_event_logger,
)
class VideoUploader:
"""Uploads videos from the video_queue to the provided rclone destination.
Keeps a log of what its uploaded in `db`
"""
def __init__(
self,
protect: ProtectApiClient,
upload_queue: VideoQueue,
rclone_destination: str,
rclone_args: str,
file_structure_format: str,
db: aiosqlite.Connection,
color_logging: bool,
):
"""Init.
Args:
protect (ProtectApiClient): UniFi Protect API client to use
upload_queue (VideoQueue): Queue to get video files from
rclone_destination (str): rclone file destination URI
rclone_args (str): arguments to pass to the rclone command
file_structure_format (str): format string for how to structure the uploaded files
db (aiosqlite.Connection): Async SQlite database connection
color_logging (bool): Whether or not to add color to logging output
"""
self._protect: ProtectApiClient = protect
self.upload_queue: VideoQueue = upload_queue
self._rclone_destination: str = rclone_destination
self._rclone_args: str = rclone_args
self._file_structure_format: str = file_structure_format
self._db: aiosqlite.Connection = db
self.current_event = None
self.base_logger = logging.getLogger(__name__)
setup_event_logger(self.base_logger, color_logging)
self.logger = logging.LoggerAdapter(self.base_logger, {"event": ""})
async def start(self):
"""Run main loop.
Runs forever looking for video data in the video queue and then uploads it
using rclone, finally it updates the database
"""
self.logger.info("Starting Uploader")
while True:
try:
event, video = await self.upload_queue.get()
self.current_event = event
self.logger = logging.LoggerAdapter(self.base_logger, {"event": f" [{event.id}]"})
self.logger.info(f"Uploading event: {event.id}")
self.logger.debug(
f" Remaining Upload Queue: {self.upload_queue.qsize_files()}"
f" ({human_readable_size(self.upload_queue.qsize())})"
)
destination = await self._generate_file_path(event)
self.logger.debug(f" Destination: {destination}")
try:
await self._upload_video(video, destination, self._rclone_args)
await self._update_database(event, destination)
self.logger.debug("Uploaded")
except SubprocessException:
self.logger.error(f" Failed to upload file: '{destination}'")
self.current_event = None
except Exception as e:
self.logger.error(f"Unexpected exception occurred, abandoning event {event.id}:", exc_info=e)
async def _upload_video(self, video: bytes, destination: pathlib.Path, rclone_args: str):
"""Upload video using rclone.
In order to avoid writing to disk, the video file data is piped directly
to the rclone process and uploaded using the `rcat` function of rclone.
Args:
video (bytes): The data to be written to the file
destination (pathlib.Path): Where rclone should write the file
rclone_args (str): Optional extra arguments to pass to `rclone`
Raises:
RuntimeError: If rclone returns a non-zero exit code
"""
returncode, stdout, stderr = await run_command(f'rclone rcat -vv {rclone_args} "{destination}"', video)
if returncode != 0:
raise SubprocessException(stdout, stderr, returncode)
async def _update_database(self, event: Event, destination: str):
"""Add the backed up event to the database along with where it was backed up to."""
assert isinstance(event.start, datetime)
assert isinstance(event.end, datetime)
await self._db.execute(
"INSERT INTO events VALUES "
f"('{event.id}', '{event.type.value}', '{event.camera_id}',"
f"'{event.start.timestamp()}', '{event.end.timestamp()}')"
)
remote, file_path = str(destination).split(":")
await self._db.execute(
f"""INSERT INTO backups VALUES
('{event.id}', '{remote}', '{file_path}')
"""
)
await self._db.commit()
async def _generate_file_path(self, event: Event) -> pathlib.Path:
"""Generate the rclone destination path for the provided event.
Generates rclone destination path for the given even based upon the format string
in `self.file_structure_format`.
Provides the following fields to the format string:
event: The `Event` object as per
https://github.com/briis/uiprotect/blob/master/uiprotect/data/nvr.py
duration_seconds: The duration of the event in seconds
detection_type: A nicely formatted list of the event detection type and the smart detection types (if any)
camera_name: The name of the camera that generated this event
Args:
event: The event for which to create an output path
Returns:
pathlib.Path: The rclone path the event should be backed up to
"""
assert isinstance(event.camera_id, str)
assert isinstance(event.start, datetime)
assert isinstance(event.end, datetime)
format_context = {
"event": event,
"duration_seconds": (event.end - event.start).total_seconds(),
"detection_type": f"{event.type.value} ({' '.join(event.smart_detect_types)})"
if event.smart_detect_types
else f"{event.type.value}",
"camera_name": await get_camera_name(self._protect, event.camera_id),
}
file_path = self._file_structure_format.format(**format_context)
file_path = re.sub(r"[^\w\-_\.\(\)/ ]", "", file_path) # Sanitize any invalid chars
return pathlib.Path(f"{self._rclone_destination}/{file_path}")