Add experimental downloader

This uses a new API to download events like the way the web ui does, where it first asks for a video to be prepared (on the unifi protect host) and then downloads it. This might be potentially more stable than the existing downloader.
This commit is contained in:
Sebastian Goscik
2024-06-06 00:15:06 +01:00
parent 2dac2cee23
commit 30ea7de5c2
4 changed files with 254 additions and 1 deletions

View File

@@ -5,6 +5,7 @@ __email__ = 'sebastian@goscik.com'
__version__ = '0.10.7'
from .downloader import VideoDownloader
from .downloader_experimental import VideoDownloaderExperimental
from .event_listener import EventListener
from .purge import Purge
from .uploader import VideoUploader

View File

@@ -211,6 +211,19 @@ Subsequent missing events will be downloaded (e.g. a missed event)
type=int,
help="Only download events shorter than this maximum length, in seconds",
)
@click.option(
'--experimental-downloader',
'use_experimental_downloader',
default=False,
show_default=True,
is_flag=True,
envvar='EXPERIMENTAL_DOWNLOADER',
help="""\b
If set, a new experimental download mechanism will be used to match
what the web UI does. This might be more stable if you are experiencing
a lot of failed downloads with the default downloader.
""",
)
def main(**kwargs):
"""A Python based tool for backing up Unifi Protect event clips as they occur."""
event_listener = UnifiProtectBackup(**kwargs)

View File

@@ -0,0 +1,228 @@
# noqa: D100
import asyncio
import json
import logging
import shutil
from datetime import datetime, timedelta, timezone
from typing import Optional
import aiosqlite
import pytz
from aiohttp.client_exceptions import ClientPayloadError
from expiring_dict import ExpiringDict # type: ignore
from aiolimiter import AsyncLimiter
from pyunifiprotect import ProtectApiClient
from pyunifiprotect.data.nvr import Event
from pyunifiprotect.data.types import EventType
from unifi_protect_backup.utils import (
SubprocessException,
VideoQueue,
get_camera_name,
human_readable_size,
run_command,
setup_event_logger,
)
async def get_video_length(video: bytes) -> float:
"""Uses ffprobe to get the length of the video file passed in as a byte stream."""
returncode, stdout, stderr = await run_command(
'ffprobe -v quiet -show_streams -select_streams v:0 -of json -', video
)
if returncode != 0:
raise SubprocessException(stdout, stderr, returncode)
json_data = json.loads(stdout)
return float(json_data['streams'][0]['duration'])
class VideoDownloaderExperimental:
"""Downloads event video clips from Unifi Protect."""
def __init__(
self,
protect: ProtectApiClient,
db: aiosqlite.Connection,
download_queue: asyncio.Queue,
upload_queue: VideoQueue,
color_logging: bool,
download_rate_limit: float,
max_event_length: timedelta,
):
"""Init.
Args:
protect (ProtectApiClient): UniFi Protect API client to use
db (aiosqlite.Connection): Async SQLite database to check for missing events
download_queue (asyncio.Queue): Queue to get event details from
upload_queue (VideoQueue): Queue to place downloaded videos on
color_logging (bool): Whether or not to add color to logging output
download_rate_limit (float): Limit how events can be downloaded in one minute",
max_event_length (timedelta): Maximum length in seconds for an event to be considered valid and downloaded
"""
self._protect: ProtectApiClient = protect
self._db: aiosqlite.Connection = db
self.download_queue: asyncio.Queue = download_queue
self.upload_queue: VideoQueue = upload_queue
self.current_event = None
self._failures = ExpiringDict(60 * 60 * 12) # Time to live = 12h
self._download_rate_limit = download_rate_limit
self._max_event_length = max_event_length
self._limiter = AsyncLimiter(self._download_rate_limit) if self._download_rate_limit is not None else None
self.base_logger = logging.getLogger(__name__)
setup_event_logger(self.base_logger, color_logging)
self.logger = logging.LoggerAdapter(self.base_logger, {'event': ''})
# Check if `ffprobe` is available
ffprobe = shutil.which('ffprobe')
if ffprobe is not None:
self.logger.debug(f"ffprobe found: {ffprobe}")
self._has_ffprobe = True
else:
self._has_ffprobe = False
async def start(self):
"""Main loop."""
self.logger.info("Starting Downloader")
while True:
if self._limiter:
self.logger.debug("Waiting for rate limit")
await self._limiter.acquire()
try:
# Wait for unifi protect to be connected
await self._protect.connect_event.wait()
event = await self.download_queue.get()
self.current_event = event
self.logger = logging.LoggerAdapter(self.base_logger, {'event': f' [{event.id}]'})
# Fix timezones since pyunifiprotect sets all timestamps to UTC. Instead localize them to
# the timezone of the unifi protect NVR.
event.start = event.start.replace(tzinfo=pytz.utc).astimezone(self._protect.bootstrap.nvr.timezone)
event.end = event.end.replace(tzinfo=pytz.utc).astimezone(self._protect.bootstrap.nvr.timezone)
self.logger.info(f"Downloading event: {event.id}")
self.logger.debug(f"Remaining Download Queue: {self.download_queue.qsize()}")
output_queue_current_size = human_readable_size(self.upload_queue.qsize())
output_queue_max_size = human_readable_size(self.upload_queue.maxsize)
self.logger.debug(f"Video Download Buffer: {output_queue_current_size}/{output_queue_max_size}")
self.logger.debug(f" Camera: {await get_camera_name(self._protect, event.camera_id)}")
if event.type == EventType.SMART_DETECT:
self.logger.debug(f" Type: {event.type.value} ({', '.join(event.smart_detect_types)})")
else:
self.logger.debug(f" Type: {event.type.value}")
self.logger.debug(f" Start: {event.start.strftime('%Y-%m-%dT%H-%M-%S')} ({event.start.timestamp()})")
self.logger.debug(f" End: {event.end.strftime('%Y-%m-%dT%H-%M-%S')} ({event.end.timestamp()})")
duration = (event.end - event.start).total_seconds()
self.logger.debug(f" Duration: {duration}s")
# Skip invalid events
if not self._valid_event(event):
await self._ignore_event(event)
continue
# Unifi protect does not return full video clips if the clip is requested too soon.
# There are two issues at play here:
# - Protect will only cut a clip on an keyframe which happen every 5s
# - Protect's pipeline needs a finite amount of time to make a clip available
# So we will wait 1.5x the keyframe interval to ensure that there is always ample video
# stored and Protect can return a full clip (which should be at least the length requested,
# but often longer)
time_since_event_ended = datetime.utcnow().replace(tzinfo=timezone.utc) - event.end
sleep_time = (timedelta(seconds=5 * 1.5) - time_since_event_ended).total_seconds()
if sleep_time > 0:
self.logger.debug(f" Sleeping ({sleep_time}s) to ensure clip is ready to download...")
await asyncio.sleep(sleep_time)
try:
video = await self._download(event)
assert video is not None
except Exception as e:
# Increment failure count
if event.id not in self._failures:
self._failures[event.id] = 1
else:
self._failures[event.id] += 1
self.logger.warning(f"Event failed download attempt {self._failures[event.id]}", exc_info=e)
if self._failures[event.id] >= 10:
self.logger.error(
"Event has failed to download 10 times in a row. Permanently ignoring this event"
)
await self._ignore_event(event)
continue
# Remove successfully downloaded event from failures list
if event.id in self._failures:
del self._failures[event.id]
# Get the actual length of the downloaded video using ffprobe
if self._has_ffprobe:
await self._check_video_length(video, duration)
await self.upload_queue.put((event, video))
self.logger.debug("Added to upload queue")
self.current_event = None
except Exception as e:
self.logger.error(f"Unexpected exception occurred, abandoning event {event.id}:", exc_info=e)
async def _download(self, event: Event) -> Optional[bytes]:
"""Downloads the video clip for the given event."""
self.logger.debug(" Downloading video...")
for x in range(5):
assert isinstance(event.camera_id, str)
assert isinstance(event.start, datetime)
assert isinstance(event.end, datetime)
try:
prepared_video_file = await self._protect.prepare_camera_video(event.camera_id, event.start, event.end)
video = await self._protect.download_camera_video(event.camera_id, prepared_video_file['fileName'])
assert isinstance(video, bytes)
break
except (AssertionError, ClientPayloadError, TimeoutError) as e:
self.logger.warning(f" Failed download attempt {x+1}, retying in 1s", exc_info=e)
await asyncio.sleep(1)
else:
self.logger.error(f"Download failed after 5 attempts, abandoning event {event.id}:")
return None
self.logger.debug(f" Downloaded video size: {human_readable_size(len(video))}s")
return video
async def _ignore_event(self, event):
self.logger.warning("Ignoring event")
await self._db.execute(
"INSERT INTO events VALUES "
f"('{event.id}', '{event.type.value}', '{event.camera_id}',"
f"'{event.start.timestamp()}', '{event.end.timestamp()}')"
)
await self._db.commit()
async def _check_video_length(self, video, duration):
"""Check if the downloaded event is at least the length of the event, warn otherwise.
It is expected for events to regularly be slightly longer than the event specified
"""
try:
downloaded_duration = await get_video_length(video)
msg = f" Downloaded video length: {downloaded_duration:.3f}s" f"({downloaded_duration - duration:+.3f}s)"
if downloaded_duration < duration:
self.logger.warning(msg)
else:
self.logger.debug(msg)
except SubprocessException as e:
self.logger.warning(" `ffprobe` failed", exc_info=e)
def _valid_event(self, event):
duration = event.end - event.start
if duration > self._max_event_length:
self.logger.warning(f"Event longer ({duration}) than max allowed length {self._max_event_length}")
return False
return True

View File

@@ -1,4 +1,5 @@
"""Main module."""
import asyncio
import logging
import os
@@ -16,6 +17,7 @@ from unifi_protect_backup import (
MissingEventChecker,
Purge,
VideoDownloader,
VideoDownloaderExperimental,
VideoUploader,
notifications,
)
@@ -67,6 +69,7 @@ class UnifiProtectBackup:
color_logging: bool = False,
download_rate_limit: float = None,
port: int = 443,
use_experimental_downloader: bool = False,
):
"""Will configure logging settings and the Unifi Protect API (but not actually connect).
@@ -97,6 +100,7 @@ class UnifiProtectBackup:
color_logging (bool): Whether to add color to logging output or not
download_rate_limit (float): Limit how events can be downloaded in one minute. Disabled by default",
max_event_length (int): Maximum length in seconds for an event to be considered valid and downloaded
use_experimental_downloader (bool): Use the new experimental downloader (the same method as used by the webUI)
"""
self.color_logging = color_logging
setup_logging(verbose, self.color_logging)
@@ -133,6 +137,7 @@ class UnifiProtectBackup:
logger.debug(f" {skip_missing=}")
logger.debug(f" {download_rate_limit=} events per minute")
logger.debug(f" {max_event_length=}s")
logger.debug(f" {use_experimental_downloader=}")
self.rclone_destination = rclone_destination
self.retention = retention
@@ -166,6 +171,7 @@ class UnifiProtectBackup:
self._skip_missing = skip_missing
self._download_rate_limit = download_rate_limit
self._max_event_length = timedelta(seconds=max_event_length)
self._use_experimental_downloader = use_experimental_downloader
async def start(self):
"""Bootstrap the backup process and kick off the main loop.
@@ -225,7 +231,12 @@ class UnifiProtectBackup:
# Create downloader task
# This will download video files to its buffer
downloader = VideoDownloader(
if self._use_experimental_downloader:
downloader_cls = VideoDownloaderExperimental
else:
downloader_cls = VideoDownloader
downloader = downloader_cls(
self._protect,
self._db,
download_queue,