mirror of
https://github.com/ep1cman/unifi-protect-backup.git
synced 2025-12-05 23:53:30 +00:00
Previously `-v` showed a lot of spam meesaged for each time the check was done, this is not particularly useful.
551 lines
23 KiB
Python
551 lines
23 KiB
Python
"""Main module."""
|
|
import asyncio
|
|
import logging
|
|
import pathlib
|
|
import shutil
|
|
from asyncio.exceptions import TimeoutError
|
|
from typing import Callable, List, Optional
|
|
|
|
import aiocron
|
|
from aiohttp.client_exceptions import ClientPayloadError
|
|
from pyunifiprotect import NvrError, ProtectApiClient
|
|
from pyunifiprotect.data.nvr import Event
|
|
from pyunifiprotect.data.types import EventType, ModelType
|
|
from pyunifiprotect.data.websocket import WSAction, WSSubscriptionMessage
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class RcloneException(Exception):
|
|
"""Exception class for when rclone does not exit with `0`."""
|
|
|
|
def __init__(self, stdout, stderr, returncode):
|
|
"""Exception class for when rclone does not exit with `0`.
|
|
|
|
Args:
|
|
stdout (str): What rclone output to stdout
|
|
stderr (str): What rclone output to stderr
|
|
returncode (str): The return code of the rclone process
|
|
"""
|
|
super().__init__()
|
|
self.stdout: str = stdout
|
|
self.stderr: str = stderr
|
|
self.returncode: int = returncode
|
|
|
|
def __str__(self):
|
|
"""Turns excpetion into a human readable form."""
|
|
return f"Return Code: {self.returncode}\nStdout:\n{self.stdout}\nStderr:\n{self.stderr}"
|
|
|
|
|
|
def add_logging_level(levelName: str, levelNum: int, methodName: Optional[str] = None) -> None:
|
|
"""Comprehensively adds a new logging level to the `logging` module and the currently configured logging class.
|
|
|
|
`levelName` becomes an attribute of the `logging` module with the value
|
|
`levelNum`. `methodName` becomes a convenience method for both `logging`
|
|
itself and the class returned by `logging.getLoggerClass()` (usually just
|
|
`logging.Logger`).
|
|
|
|
To avoid accidental clobbering of existing attributes, this method will
|
|
raise an `AttributeError` if the level name is already an attribute of the
|
|
`logging` module or if the method name is already present
|
|
|
|
Credit: https://stackoverflow.com/a/35804945
|
|
|
|
Args:
|
|
levelName (str): The name of the new logging level (in all caps).
|
|
levelNum (int): The priority value of the logging level, lower=more verbose.
|
|
methodName (str): The name of the method used to log using this.
|
|
If `methodName` is not specified, `levelName.lower()` is used.
|
|
|
|
Example:
|
|
::
|
|
>>> add_logging_level('TRACE', logging.DEBUG - 5)
|
|
>>> logging.getLogger(__name__).setLevel("TRACE")
|
|
>>> logging.getLogger(__name__).trace('that worked')
|
|
>>> logging.trace('so did this')
|
|
>>> logging.TRACE
|
|
5
|
|
|
|
"""
|
|
if not methodName:
|
|
methodName = levelName.lower()
|
|
|
|
if hasattr(logging, levelName):
|
|
raise AttributeError('{} already defined in logging module'.format(levelName))
|
|
if hasattr(logging, methodName):
|
|
raise AttributeError('{} already defined in logging module'.format(methodName))
|
|
if hasattr(logging.getLoggerClass(), methodName):
|
|
raise AttributeError('{} already defined in logger class'.format(methodName))
|
|
|
|
# This method was inspired by the answers to Stack Overflow post
|
|
# http://stackoverflow.com/q/2183233/2988730, especially
|
|
# http://stackoverflow.com/a/13638084/2988730
|
|
def logForLevel(self, message, *args, **kwargs):
|
|
if self.isEnabledFor(levelNum):
|
|
self._log(levelNum, message, args, **kwargs)
|
|
|
|
def logToRoot(message, *args, **kwargs):
|
|
logging.log(levelNum, message, *args, **kwargs)
|
|
|
|
logging.addLevelName(levelNum, levelName)
|
|
setattr(logging, levelName, levelNum)
|
|
setattr(logging.getLoggerClass(), methodName, logForLevel)
|
|
setattr(logging, methodName, logToRoot)
|
|
|
|
|
|
def setup_logging(verbosity: int) -> None:
|
|
"""Configures loggers to provided the desired level of verbosity.
|
|
|
|
Verbosity 0: Only log info messages created by `unifi-protect-backup`, and all warnings
|
|
verbosity 1: Only log info & debug messages created by `unifi-protect-backup`, and all warnings
|
|
verbosity 2: Log info & debug messages created by `unifi-protect-backup`, command output, and
|
|
all warnings
|
|
Verbosity 3: Log debug messages created by `unifi-protect-backup`, command output, all info
|
|
messages, and all warnings
|
|
Verbosity 4: Log debug messages created by `unifi-protect-backup` command output, all info
|
|
messages, all warnings, and websocket data
|
|
Verbosity 5: Log websocket data, command output, all debug messages, all info messages and all
|
|
warnings
|
|
|
|
Args:
|
|
verbosity (int): The desired level of verbosity
|
|
|
|
"""
|
|
add_logging_level(
|
|
'EXTRA_DEBUG',
|
|
logging.DEBUG - 1,
|
|
)
|
|
add_logging_level(
|
|
'WEBSOCKET_DATA',
|
|
logging.DEBUG - 2,
|
|
)
|
|
|
|
format = "{asctime} [{levelname}]:{name: <20}:\t{message}"
|
|
date_format = "%Y-%m-%d %H:%M:%S"
|
|
style = '{'
|
|
|
|
if verbosity == 0:
|
|
logging.basicConfig(level=logging.WARN, format=format, style=style, datefmt=date_format)
|
|
logger.setLevel(logging.INFO)
|
|
elif verbosity == 1:
|
|
logging.basicConfig(level=logging.WARN, format=format, style=style, datefmt=date_format)
|
|
logger.setLevel(logging.DEBUG)
|
|
elif verbosity == 2:
|
|
logging.basicConfig(level=logging.WARN, format=format, style=style, datefmt=date_format)
|
|
logger.setLevel(logging.EXTRA_DEBUG) # type: ignore
|
|
elif verbosity == 3:
|
|
logging.basicConfig(level=logging.INFO, format=format, style=style, datefmt=date_format)
|
|
logger.setLevel(logging.EXTRA_DEBUG) # type: ignore
|
|
elif verbosity == 4:
|
|
logging.basicConfig(level=logging.INFO, format=format, style=style, datefmt=date_format)
|
|
logger.setLevel(logging.WEBSOCKET_DATA) # type: ignore
|
|
elif verbosity == 5:
|
|
logging.basicConfig(level=logging.DEBUG, format=format, style=style, datefmt=date_format)
|
|
logger.setLevel(logging.WEBSOCKET_DATA) # type: ignore
|
|
|
|
|
|
def human_readable_size(num):
|
|
"""Turns a number into a human readable number with ISO/IEC 80000 binary prefixes.
|
|
|
|
Based on: https://stackoverflow.com/a/1094933
|
|
|
|
Args:
|
|
num (int): The number to be converted into human readable format
|
|
"""
|
|
for unit in ["B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"]:
|
|
if abs(num) < 1024.0:
|
|
return f"{num:3.1f}{unit}"
|
|
num /= 1024.0
|
|
raise ValueError("`num` too large, ran out of prefixes")
|
|
|
|
|
|
class UnifiProtectBackup:
|
|
"""Backup Unifi protect event clips using rclone.
|
|
|
|
Listens to the Unifi Protect websocket for events. When a completed motion or smart detection
|
|
event is detected, it will download the clip and back it up using rclone
|
|
|
|
Attributes:
|
|
retention (str): How long should event clips be backed up for. Format as per the
|
|
`--max-age` argument of `rclone`
|
|
(https://rclone.org/filtering/#max-age-don-t-transfer-any-file-older-than-this)
|
|
rclone_args (str): Extra args passed directly to `rclone rcat`.
|
|
ignore_cameras (List[str]): List of camera IDs for which to not backup events
|
|
verbose (int): How verbose to setup logging, see :func:`setup_logging` for details.
|
|
_download_queue (asyncio.Queue): Queue of events that need to be backed up
|
|
_unsub (Callable): Unsubscribe from the websocket callback
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
address: str,
|
|
username: str,
|
|
password: str,
|
|
verify_ssl: bool,
|
|
rclone_destination: str,
|
|
retention: str,
|
|
rclone_args: str,
|
|
ignore_cameras: List[str],
|
|
verbose: int,
|
|
port: int = 443,
|
|
):
|
|
"""Will configure logging settings and the Unifi Protect API (but not actually connect).
|
|
|
|
Args:
|
|
address (str): Base address of the Unifi Protect instance
|
|
port (int): Post of the Unifi Protect instance, usually 443
|
|
username (str): Username to log into Unifi Protect instance
|
|
password (str): Password for Unifi Protect user
|
|
verify_ssl (bool): Flag for if SSL certificates should be validated
|
|
rclone_destination (str): `rclone` destination path in the format
|
|
{rclone remote}:{path on remote}. E.g.
|
|
`gdrive:/backups/unifi_protect`
|
|
retention (str): How long should event clips be backed up for. Format as per the
|
|
`--max-age` argument of `rclone`
|
|
(https://rclone.org/filtering/#max-age-don-t-transfer-any-file-older-than-this)
|
|
rclone_args (str): A bandwidth limit which is passed to the `--bwlimit` argument of
|
|
`rclone` (https://rclone.org/docs/#bwlimit-bandwidth-spec)
|
|
ignore_cameras (List[str]): List of camera IDs for which to not backup events
|
|
verbose (int): How verbose to setup logging, see :func:`setup_logging` for details.
|
|
"""
|
|
setup_logging(verbose)
|
|
|
|
logger.debug("Config:")
|
|
logger.debug(f" {address=}")
|
|
logger.debug(f" {port=}")
|
|
logger.debug(f" {username=}")
|
|
if verbose < 5:
|
|
logger.debug(" password=REDACTED")
|
|
else:
|
|
logger.debug(f" {password=}")
|
|
|
|
logger.debug(f" {verify_ssl=}")
|
|
logger.debug(f" {rclone_destination=}")
|
|
logger.debug(f" {retention=}")
|
|
logger.debug(f" {rclone_args=}")
|
|
logger.debug(f" {ignore_cameras=}")
|
|
logger.debug(f" {verbose=}")
|
|
|
|
self.rclone_destination = rclone_destination
|
|
self.retention = retention
|
|
self.rclone_args = rclone_args
|
|
|
|
self.address = address
|
|
self.port = port
|
|
self.username = username
|
|
self.password = password
|
|
self.verify_ssl = verify_ssl
|
|
|
|
self._protect = ProtectApiClient(
|
|
self.address,
|
|
self.port,
|
|
self.username,
|
|
self.password,
|
|
verify_ssl=self.verify_ssl,
|
|
subscribed_models={ModelType.EVENT},
|
|
)
|
|
self.ignore_cameras = ignore_cameras
|
|
self._download_queue: asyncio.Queue = asyncio.Queue()
|
|
self._unsub: Callable[[], None]
|
|
|
|
async def start(self):
|
|
"""Bootstrap the backup process and kick off the main loop.
|
|
|
|
You should run this to start the realtime backup of Unifi Protect clips as they are created
|
|
|
|
"""
|
|
logger.info("Starting...")
|
|
|
|
# Ensure rclone is installed and properly configured
|
|
logger.info("Checking rclone configuration...")
|
|
await self._check_rclone()
|
|
|
|
# Start the pyunifiprotect connection by calling `update`
|
|
logger.info("Connecting to Unifi Protect...")
|
|
await self._protect.update()
|
|
|
|
# Get a mapping of camera ids -> names
|
|
logger.info("Found cameras:")
|
|
for camera in self._protect.bootstrap.cameras.values():
|
|
logger.info(f" - {camera.id}: {camera.name}")
|
|
|
|
# Subscribe to the websocket
|
|
self._unsub = self._protect.subscribe_websocket(self._websocket_callback)
|
|
|
|
# Set up a "purge" task to run at midnight each day to delete old recordings and empty directories
|
|
logger.info("Setting up purge task...")
|
|
|
|
@aiocron.crontab("0 0 * * *")
|
|
async def rclone_purge_old():
|
|
logger.info("Deleting old files...")
|
|
cmd = f"rclone delete -vv --min-age {self.retention} '{self.rclone_destination}'"
|
|
cmd += f" && rclone rmdirs -vv --leave-root '{self.rclone_destination}'"
|
|
proc = await asyncio.create_subprocess_shell(
|
|
cmd,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
stdout, stderr = await proc.communicate()
|
|
if proc.returncode == 0:
|
|
logger.extra_debug(f"stdout:\n{stdout.decode()}") # type: ignore
|
|
logger.extra_debug(f"stderr:\n{stderr.decode()}") # type: ignore
|
|
logger.info("Successfully deleted old files")
|
|
else:
|
|
logger.warn("Failed to purge old files")
|
|
logger.warn(f"stdout:\n{stdout.decode()}")
|
|
logger.warn(f"stderr:\n{stderr.decode()}")
|
|
|
|
# We need to catch websocket disconnect and trigger a reconnect.
|
|
@aiocron.crontab("* * * * *")
|
|
async def check_websocket_and_reconnect():
|
|
logger.extra_debug("Checking the status of the websocket...")
|
|
if self._protect.check_ws():
|
|
logger.extra_debug("Websocket is connected.")
|
|
else:
|
|
logger.warn("Lost connection to Unifi Protect.")
|
|
|
|
# Unsubscribe, close the session.
|
|
self._unsub()
|
|
await self._protect.close_session()
|
|
|
|
while True:
|
|
logger.warn("Attempting reconnect...")
|
|
|
|
try:
|
|
# Start again from scratch. In principle if Unifi
|
|
# Protect has not been restarted we should just be able
|
|
# to call self._protect.update() to reconnect to the
|
|
# websocket. However, if the server has been restarted a
|
|
# call to self._protect.check_ws() returns true and some
|
|
# seconds later pyunifiprotect detects the websocket as
|
|
# disconnected again. Therefore, kill it all and try
|
|
# again!
|
|
replacement_protect = ProtectApiClient(
|
|
self.address,
|
|
self.port,
|
|
self.username,
|
|
self.password,
|
|
verify_ssl=self.verify_ssl,
|
|
subscribed_models={ModelType.EVENT},
|
|
)
|
|
# Start the pyunifiprotect connection by calling `update`
|
|
await replacement_protect.update()
|
|
if replacement_protect.check_ws():
|
|
self._protect = replacement_protect
|
|
self._unsub = self._protect.subscribe_websocket(self._websocket_callback)
|
|
break
|
|
else:
|
|
logger.warn("Unable to establish connection to Unifi Protect")
|
|
except Exception as e:
|
|
logger.warn("Unexpected exception occurred while trying to reconnect:")
|
|
logger.exception(e)
|
|
finally:
|
|
# If we get here we need to close the replacement session again
|
|
await replacement_protect.close_session()
|
|
|
|
# Back off for a little while
|
|
await asyncio.sleep(10)
|
|
|
|
logger.info("Re-established connection to Unifi Protect and to the websocket.")
|
|
|
|
# Launches the main loop
|
|
logger.info("Listening for events...")
|
|
await self._backup_events()
|
|
|
|
logger.info("Stopping...")
|
|
|
|
# Unsubscribes from the websocket
|
|
self._unsub()
|
|
|
|
async def _check_rclone(self) -> None:
|
|
"""Check if rclone is installed and the specified remote is configured.
|
|
|
|
Raises:
|
|
RcloneException: If rclone is not installed or it failed to list remotes
|
|
ValueError: The given rclone destination is for a remote that is not configured
|
|
|
|
"""
|
|
rclone = shutil.which('rclone')
|
|
logger.debug(f"rclone found: {rclone}")
|
|
if not rclone:
|
|
raise RuntimeError("`rclone` is not installed on this system")
|
|
|
|
cmd = "rclone listremotes -vv"
|
|
proc = await asyncio.create_subprocess_shell(
|
|
cmd,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
stdout, stderr = await proc.communicate()
|
|
logger.extra_debug(f"stdout:\n{stdout.decode()}") # type: ignore
|
|
logger.extra_debug(f"stderr:\n{stderr.decode()}") # type: ignore
|
|
if proc.returncode != 0:
|
|
raise RcloneException(stdout.decode(), stderr.decode(), proc.returncode)
|
|
|
|
# Check if the destination is for a configured remote
|
|
for line in stdout.splitlines():
|
|
if self.rclone_destination.startswith(line.decode()):
|
|
break
|
|
else:
|
|
remote = self.rclone_destination.split(":")[0]
|
|
raise ValueError(f"rclone does not have a remote called `{remote}`")
|
|
|
|
def _websocket_callback(self, msg: WSSubscriptionMessage) -> None:
|
|
"""Callback for "EVENT" websocket messages.
|
|
|
|
Filters the incoming events, and puts completed events onto the download queue
|
|
|
|
Args:
|
|
msg (Event): Incoming event data
|
|
"""
|
|
logger.websocket_data(msg) # type: ignore
|
|
|
|
# We are only interested in updates that end motion/smartdetection event
|
|
assert isinstance(msg.new_obj, Event)
|
|
if msg.action != WSAction.UPDATE:
|
|
return
|
|
if msg.new_obj.camera_id in self.ignore_cameras:
|
|
return
|
|
if msg.new_obj.end is None:
|
|
return
|
|
if msg.new_obj.type not in {EventType.MOTION, EventType.SMART_DETECT}:
|
|
return
|
|
self._download_queue.put_nowait(msg.new_obj)
|
|
logger.debug(f"Adding event {msg.new_obj.id} to queue (Current queue={self._download_queue.qsize()})")
|
|
|
|
async def _backup_events(self) -> None:
|
|
"""Main loop for backing up events.
|
|
|
|
Waits for an event in the queue, then downloads the corresponding clip and uploads it using rclone.
|
|
If errors occur it will simply log the errors and wait for the next event. In a future release,
|
|
retries will be added.
|
|
|
|
"""
|
|
while True:
|
|
try:
|
|
event = await self._download_queue.get()
|
|
|
|
logger.info(f"Backing up event: {event.id}")
|
|
logger.debug(f"Remaining Queue: {self._download_queue.qsize()}")
|
|
logger.debug(f" Camera: {await self._get_camera_name(event.camera_id)}")
|
|
logger.debug(f" Type: {event.type}")
|
|
logger.debug(f" Start: {event.start.strftime('%Y-%m-%dT%H-%M-%S')}")
|
|
logger.debug(f" End: {event.end.strftime('%Y-%m-%dT%H-%M-%S')}")
|
|
logger.debug(f" Duration: {event.end-event.start}")
|
|
|
|
# Download video
|
|
logger.debug(" Downloading video...")
|
|
for x in range(5):
|
|
try:
|
|
video = await self._protect.get_camera_video(event.camera_id, event.start, event.end)
|
|
assert isinstance(video, bytes)
|
|
break
|
|
except (AssertionError, ClientPayloadError, TimeoutError) as e:
|
|
logger.warn(f" Failed download attempt {x+1}, retying in 1s")
|
|
logger.exception(e)
|
|
await asyncio.sleep(1)
|
|
else:
|
|
logger.warn(f"Download failed after 5 attempts, abandoning event {event.id}:")
|
|
continue
|
|
|
|
destination = await self.generate_file_path(event)
|
|
|
|
logger.debug(" Uploading video via rclone...")
|
|
logger.debug(f" To: {destination}")
|
|
logger.debug(f" Size: {human_readable_size(len(video))}")
|
|
for x in range(5):
|
|
try:
|
|
await self._upload_video(video, destination, self.rclone_args)
|
|
break
|
|
except RcloneException as e:
|
|
logger.warn(f" Failed upload attempt {x+1}, retying in 1s")
|
|
logger.exception(e)
|
|
await asyncio.sleep(1)
|
|
else:
|
|
logger.warn(f"Upload failed after 5 attempts, abandoning event {event.id}:")
|
|
continue
|
|
|
|
logger.info("Backed up successfully!")
|
|
|
|
except Exception as e:
|
|
logger.warn(f"Unexpected exception occurred, abandoning event {event.id}:")
|
|
logger.exception(e)
|
|
|
|
async def _upload_video(self, video: bytes, destination: pathlib.Path, rclone_args: str):
|
|
"""Upload video using rclone.
|
|
|
|
In order to avoid writing to disk, the video file data is piped directly
|
|
to the rclone process and uploaded using the `rcat` function of rclone.
|
|
|
|
Args:
|
|
video (bytes): The data to be written to the file
|
|
destination (pathlib.Path): Where rclone should write the file
|
|
rclone_args (str): Optional extra arguments to pass to `rclone`
|
|
|
|
Raises:
|
|
RuntimeError: If rclone returns a non-zero exit code
|
|
"""
|
|
cmd = f"rclone rcat -vv {rclone_args} '{destination}'"
|
|
proc = await asyncio.create_subprocess_shell(
|
|
cmd,
|
|
stdin=asyncio.subprocess.PIPE,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
stdout, stderr = await proc.communicate(video)
|
|
if proc.returncode == 0:
|
|
logger.extra_debug(f"stdout:\n{stdout.decode()}") # type: ignore
|
|
logger.extra_debug(f"stderr:\n{stderr.decode()}") # type: ignore
|
|
else:
|
|
raise RcloneException(stdout.decode(), stderr.decode(), proc.returncode)
|
|
|
|
async def generate_file_path(self, event: Event) -> pathlib.Path:
|
|
"""Generates the rclone destination path for the provided event.
|
|
|
|
Generates paths in the following structure:
|
|
::
|
|
rclone_destination
|
|
|- Camera Name
|
|
|- {Date}
|
|
|- {start timestamp} {event type} ({detections}).mp4
|
|
|
|
Args:
|
|
event: The event for which to create an output path
|
|
|
|
Returns:
|
|
pathlib.Path: The rclone path the event should be backed up to
|
|
|
|
"""
|
|
path = pathlib.Path(self.rclone_destination)
|
|
assert isinstance(event.camera_id, str)
|
|
path /= await self._get_camera_name(event.camera_id) # directory per camera
|
|
path /= event.start.strftime("%Y-%m-%d") # Directory per day
|
|
|
|
file_name = f"{event.start.strftime('%Y-%m-%dT%H-%M-%S')} {event.type}"
|
|
|
|
if event.smart_detect_types:
|
|
detections = " ".join(event.smart_detect_types)
|
|
file_name += f" ({detections})"
|
|
file_name += ".mp4"
|
|
|
|
path /= file_name
|
|
|
|
return path
|
|
|
|
async def _get_camera_name(self, id: str):
|
|
try:
|
|
return self._protect.bootstrap.cameras[id].name
|
|
except KeyError:
|
|
# Refresh cameras
|
|
logger.debug(f"Unknown camera id: '{id}', checking API")
|
|
|
|
try:
|
|
await self._protect.update(force=True)
|
|
except NvrError:
|
|
logger.debug(f"Unknown camera id: '{id}'")
|
|
raise
|
|
|
|
name = self._protect.bootstrap.cameras[id].name
|
|
logger.debug(f"Found camera - {id}: {name}")
|
|
return name
|