"""Utility functions used throughout the code, kept here to allow re use and/or minimize clutter elsewhere."""
|
|
|
|
import asyncio
|
|
import logging
|
|
import re
|
|
from datetime import datetime
|
|
from typing import Optional, Set
|
|
|
|
from apprise import NotifyType
|
|
from async_lru import alru_cache
|
|
from uiprotect import ProtectApiClient
|
|
from uiprotect.data.nvr import Event
|
|
from uiprotect.data.types import EventType, SmartDetectObjectType, SmartDetectAudioType
|
|
|
|
from unifi_protect_backup import notifications
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def add_logging_level(levelName: str, levelNum: int, methodName: Optional[str] = None) -> None:
    """Comprehensively adds a new logging level to the `logging` module and the currently configured logging class.

    `levelName` becomes an attribute of the `logging` module with the value
    `levelNum`. `methodName` becomes a convenience method for both `logging`
    itself and the class returned by `logging.getLoggerClass()` (usually just
    `logging.Logger`).

    To avoid accidentally clobbering existing attributes, this function raises
    an `AttributeError` if the level name is already an attribute of the
    `logging` module or if the method name is already present.

    Credit: https://stackoverflow.com/a/35804945

    Args:
        levelName (str): The name of the new logging level (in all caps).
        levelNum (int): The priority value of the logging level, lower=more verbose.
        methodName (str): The name of the method used to log using this.
            If `methodName` is not specified, `levelName.lower()` is used.

    Example:
    ::
        >>> add_logging_level('TRACE', logging.DEBUG - 5)
        >>> logging.getLogger(__name__).setLevel("TRACE")
        >>> logging.getLogger(__name__).trace('that worked')
        >>> logging.trace('so did this')
        >>> logging.TRACE
        5

    """
    if not methodName:
        methodName = levelName.lower()

    if hasattr(logging, levelName):
        raise AttributeError("{} already defined in logging module".format(levelName))
    if hasattr(logging, methodName):
        raise AttributeError("{} already defined in logging module".format(methodName))
    if hasattr(logging.getLoggerClass(), methodName):
        raise AttributeError("{} already defined in logger class".format(methodName))

    # This method was inspired by the answers to Stack Overflow post
    # http://stackoverflow.com/q/2183233/2988730, especially
    # http://stackoverflow.com/a/13638084/2988730
    def logForLevel(self, message, *args, **kwargs):
        if self.isEnabledFor(levelNum):
            self._log(levelNum, message, args, **kwargs)

    def logToRoot(message, *args, **kwargs):
        logging.log(levelNum, message, *args, **kwargs)

    def adapterLog(self, msg, *args, **kwargs):
        """Delegate a call at the new level to the underlying logger."""
        self.log(levelNum, msg, *args, **kwargs)

    logging.addLevelName(levelNum, levelName)
    setattr(logging, levelName, levelNum)
    setattr(logging.getLoggerClass(), methodName, logForLevel)
    setattr(logging, methodName, logToRoot)
    setattr(logging.LoggerAdapter, methodName, adapterLog)


color_logging = False


def add_color_to_record_levelname(record):
    """Colorize a log record's level name with ANSI escape codes."""
    levelno = record.levelno
    if levelno >= logging.CRITICAL:
        color = "\x1b[31;1m"  # RED
    elif levelno >= logging.ERROR:
        color = "\x1b[31;1m"  # RED
    elif levelno >= logging.WARNING:
        color = "\x1b[33;1m"  # YELLOW
    elif levelno >= logging.INFO:
        color = "\x1b[32;1m"  # GREEN
    elif levelno >= logging.DEBUG:
        color = "\x1b[36;1m"  # CYAN
    elif levelno >= logging.EXTRA_DEBUG:
        color = "\x1b[35;1m"  # MAGENTA
    else:
        color = "\x1b[0m"

    return f"{color}{record.levelname}\x1b[0m"

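# Example (illustrative): for an INFO record this returns the level name wrapped
# in green and followed by a reset code, e.g. "\x1b[32;1mINFO\x1b[0m", so
# terminals render the level name colorized.
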
class AppriseStreamHandler(logging.StreamHandler):
    """Logging handler that also sends logging output to configured Apprise notifiers."""

    def __init__(self, color_logging: bool, *args, **kwargs):
        """Init.

        Args:
            color_logging (bool): If True, logging levels will be colorized.
            *args: Positional arguments to pass to StreamHandler.
            **kwargs: Keyword arguments to pass to StreamHandler.

        """
        super().__init__(*args, **kwargs)
        self.color_logging = color_logging

    def _emit_apprise(self, record):
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            return  # There is no running loop

        msg = self.format(record)
        logging_map = {
            logging.CRITICAL: NotifyType.FAILURE,
            logging.ERROR: NotifyType.FAILURE,
            logging.WARNING: NotifyType.WARNING,
            logging.INFO: NotifyType.INFO,
            logging.DEBUG: NotifyType.INFO,
            logging.EXTRA_DEBUG: NotifyType.INFO,
            logging.WEBSOCKET_DATA: NotifyType.INFO,
        }

        # Only try notifying if there are notification servers configured
        # and the asyncio loop isn't closed (aka we are quitting)
        if notifications.notifier.servers and not loop.is_closed():
            notify = notifications.notifier.async_notify(
                body=msg,
                title=record.levelname,
                # Fall back to INFO for any level not explicitly mapped
                notify_type=logging_map.get(record.levelno, NotifyType.INFO),
                tag=[record.levelname],
            )
            if loop.is_running():
                asyncio.create_task(notify)
            else:
                loop.run_until_complete(notify)

    def _emit_stream(self, record):
        record.levelname = f"{record.levelname:^11s}"  # Pad level name to max width
        if self.color_logging:
            record.levelname = add_color_to_record_levelname(record)

        msg = self.format(record)
        stream = self.stream
        # issue 35046: merged two stream.writes into one.
        stream.write(msg + self.terminator)
        self.flush()

    def emit(self, record):
        """Emit log to stdout and apprise."""
        try:
            self._emit_apprise(record)
        except RecursionError:  # See issue 36272
            raise
        except Exception:
            self.handleError(record)

        try:
            self._emit_stream(record)
        except RecursionError:  # See issue 36272
            raise
        except Exception:
            self.handleError(record)


def create_logging_handler(format, color_logging):
    """Construct an Apprise-aware logging handler for the given format."""
    date_format = "%Y-%m-%d %H:%M:%S"
    style = "{"

    sh = AppriseStreamHandler(color_logging)
    formatter = logging.Formatter(format, date_format, style)
    sh.setFormatter(formatter)
    return sh


def setup_logging(verbosity: int, color_logging: bool = False) -> None:
    """Configure loggers to provide the desired level of verbosity.

    Verbosity 0: Only log info messages created by `unifi-protect-backup`, and all warnings
    Verbosity 1: Only log info & debug messages created by `unifi-protect-backup`, and all warnings
    Verbosity 2: Log info & debug messages created by `unifi-protect-backup`, command output, and
                 all warnings
    Verbosity 3: Log debug messages created by `unifi-protect-backup`, command output, all info
                 messages, and all warnings
    Verbosity 4: Log debug messages created by `unifi-protect-backup`, command output, all info
                 messages, all warnings, and websocket data
    Verbosity 5: Log websocket data, command output, all debug messages, all info messages, and
                 all warnings

    Args:
        verbosity (int): The desired level of verbosity
        color_logging (bool): If colors should be used in the log (default=False)

    """
    add_logging_level(
        "EXTRA_DEBUG",
        logging.DEBUG - 1,
    )
    add_logging_level(
        "WEBSOCKET_DATA",
        logging.DEBUG - 2,
    )

    format = "{asctime} [{levelname:^11s}] {name:<42} : {message}"
    sh = create_logging_handler(format, color_logging)

    logger = logging.getLogger("unifi_protect_backup")
    logger.addHandler(sh)
    logger.propagate = False

    if verbosity == 0:
        logging.basicConfig(level=logging.WARN, handlers=[sh])
        logger.setLevel(logging.INFO)
    elif verbosity == 1:
        logging.basicConfig(level=logging.WARN, handlers=[sh])
        logger.setLevel(logging.DEBUG)
    elif verbosity == 2:
        logging.basicConfig(level=logging.WARN, handlers=[sh])
        logger.setLevel(logging.EXTRA_DEBUG)  # type: ignore
    elif verbosity == 3:
        logging.basicConfig(level=logging.INFO, handlers=[sh])
        logger.setLevel(logging.EXTRA_DEBUG)  # type: ignore
    elif verbosity == 4:
        logging.basicConfig(level=logging.INFO, handlers=[sh])
        logger.setLevel(logging.WEBSOCKET_DATA)  # type: ignore
    elif verbosity >= 5:
        logging.basicConfig(level=logging.DEBUG, handlers=[sh])
        logger.setLevel(logging.WEBSOCKET_DATA)  # type: ignore

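# Example (illustrative): a CLI passing two `-v` flags would typically call:
#
#   setup_logging(verbosity=2, color_logging=True)
#
# which keeps third-party loggers at WARNING while enabling EXTRA_DEBUG output
# (including command output) from `unifi_protect_backup` itself.
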
_initialized_loggers = []


def setup_event_logger(logger, color_logging):
    """Set up a logger that also displays the event ID currently being processed."""
    global _initialized_loggers
    if logger not in _initialized_loggers:
        format = "{asctime} [{levelname:^11s}] {name:<42} :{event} {message}"
        sh = create_logging_handler(format, color_logging)
        logger.addHandler(sh)
        logger.propagate = False
        _initialized_loggers.append(logger)

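# The "{event}" field in the format above is not a standard LogRecord attribute,
# so records logged through such a logger must supply it. A minimal sketch, e.g.
# via a LoggerAdapter (names are illustrative):
#
#   event_logger = logging.LoggerAdapter(some_logger, {"event": " [event-id]"})
#   event_logger.info("Backing up event")
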
_suffixes = ["B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"]
|
|
|
|
|
|
def human_readable_size(num: float):
|
|
"""Turn a number into a human readable number with ISO/IEC 80000 binary prefixes.
|
|
|
|
Based on: https://stackoverflow.com/a/1094933
|
|
|
|
Args:
|
|
num (int): The number to be converted into human readable format
|
|
|
|
"""
|
|
for unit in _suffixes:
|
|
if abs(num) < 1024.0:
|
|
return f"{num:3.1f}{unit}"
|
|
num /= 1024.0
|
|
raise ValueError("`num` too large, ran out of prefixes")
|
|
|
|
|
|
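# Example (values follow directly from the code above):
#
#   human_readable_size(1536)   # -> "1.5KiB"
#   human_readable_size(512)    # -> "512.0B"
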
def human_readable_to_float(num: str):
    """Turn a human-readable ISO/IEC 80000 suffix value into its full float value."""
    # Anchor the pattern so trailing garbage (e.g. "1.5KiBx") is rejected
    pattern = r"([\d.]+)(" + "|".join(_suffixes) + r")$"
    result = re.match(pattern, num)
    if result is None:
        raise ValueError(f"Value '{num}' is not a valid ISO/IEC 80000 binary value")

    value = float(result[1])
    suffix = result[2]
    multiplier = 1024 ** _suffixes.index(suffix)
    return value * multiplier

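# Example (the inverse of human_readable_size):
#
#   human_readable_to_float("1.5KiB")   # -> 1536.0
#   human_readable_to_float("2MiB")     # -> 2097152.0
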
# Cached so that actions like uploads can continue when the connection to the API is lost
# No max size, and a 6 hour TTL
@alru_cache(None, ttl=60 * 60 * 6)
async def get_camera_name(protect: ProtectApiClient, id: str):
    """Return the name for the camera with the given ID.

    If the camera ID is not known, it tries refreshing the cached data.
    """
    # Wait for UniFi Protect to be connected
    await protect.connect_event.wait()  # type: ignore

    try:
        return protect.bootstrap.cameras[id].name
    except KeyError:
        # Refresh cameras
        logger.debug(f"Unknown camera id: '{id}', checking API")

        await protect.update()

        try:
            name = protect.bootstrap.cameras[id].name
        except KeyError:
            logger.debug(f"Unknown camera id: '{id}'")
            raise

        logger.debug(f"Found camera - {id}: {name}")
        return name

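# Example (illustrative, from inside an event-handling coroutine):
#
#   camera_name = await get_camera_name(protect, event.camera_id)
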
class SubprocessException(Exception):
    """Captures the stdout, stderr, and return code of subprocess errors."""

    def __init__(self, stdout, stderr, returncode):
        """Exception raised when a subprocess (such as rclone) does not exit with `0`.

        Args:
            stdout (str): What the process wrote to stdout
            stderr (str): What the process wrote to stderr
            returncode (int): The return code of the process

        """
        super().__init__()
        self.stdout: str = stdout
        self.stderr: str = stderr
        self.returncode: int = returncode

    def __str__(self):
        """Turn exception into a human-readable form."""
        return f"Return Code: {self.returncode}\nStdout:\n{self.stdout}\nStderr:\n{self.stderr}"


async def run_command(cmd: str, data=None):
    """Run the given command, returning the exit code, stdout, and stderr."""
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate(data)
    stdout = stdout.decode()
    stdout_indented = "\t" + stdout.replace("\n", "\n\t").strip()
    stderr = stderr.decode()
    stderr_indented = "\t" + stderr.replace("\n", "\n\t").strip()

    if proc.returncode != 0:
        logger.error(f"Failed to run: '{cmd}'")
        logger.error(f"stdout:\n{stdout_indented}")
        logger.error(f"stderr:\n{stderr_indented}")
    else:
        logger.extra_debug(f"stdout:\n{stdout_indented}")  # type: ignore
        logger.extra_debug(f"stderr:\n{stderr_indented}")  # type: ignore

    return proc.returncode, stdout, stderr

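# Example (illustrative):
#
#   returncode, stdout, stderr = await run_command("rclone version")
#   if returncode != 0:
#       raise SubprocessException(stdout, stderr, returncode)
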
class VideoQueue(asyncio.Queue):
    """A queue that limits the number of bytes it can store rather than discrete entries."""

    def __init__(self, *args, **kwargs):
        """Init."""
        super().__init__(*args, **kwargs)
        self._bytes_sum = 0

    def qsize(self):
        """Get the number of bytes in the queue."""
        return self._bytes_sum

    def qsize_files(self):
        """Get the number of items (videos) in the queue."""
        return super().qsize()

    def _get(self):
        data = self._queue.popleft()
        self._bytes_sum -= len(data[1])
        return data

    def _put(self, item: tuple[Event, bytes]):
        self._queue.append(item)  # type: ignore
        self._bytes_sum += len(item[1])

    def full(self, item: tuple[Event, bytes] | None = None):
        """Return True if there are maxsize bytes in the queue.

        Optionally, if `item` is provided, return False if there is enough space
        to fit it, otherwise return True.

        Note: if the Queue was initialized with maxsize=0 (the default),
        then full() is never True.
        """
        if self._maxsize <= 0:  # type: ignore
            return False
        if item is None:
            return self.qsize() >= self._maxsize  # type: ignore
        # An item fits as long as the total would not exceed maxsize
        return self.qsize() + len(item[1]) > self._maxsize  # type: ignore

    async def put(self, item: tuple[Event, bytes]):
        """Put an item into the queue.

        If the queue is full, wait until a free slot is available before
        adding the item.
        """
        # Only enforce the size limit if this queue is bounded (maxsize > 0)
        if 0 < self._maxsize < len(item[1]):  # type: ignore
            raise ValueError(
                f"Item is larger ({human_readable_size(len(item[1]))}) "
                f"than the size of the buffer ({human_readable_size(self._maxsize)})"  # type: ignore
            )

        while self.full(item):
            putter = self._get_loop().create_future()  # type: ignore
            self._putters.append(putter)  # type: ignore
            try:
                await putter
            except:  # noqa: E722
                putter.cancel()  # Just in case putter is not done yet.
                try:
                    # Clean self._putters from canceled putters.
                    self._putters.remove(putter)  # type: ignore
                except ValueError:
                    # The putter could be removed from self._putters by a
                    # previous get_nowait call.
                    pass
                if not self.full(item) and not putter.cancelled():
                    # We were woken up by get_nowait(), but can't take
                    # the call. Wake up the next in line.
                    self._wakeup_next(self._putters)  # type: ignore
                raise
        return self.put_nowait(item)

    def put_nowait(self, item: tuple[Event, bytes]):
        """Put an item into the queue without blocking.

        If no free slot is immediately available, raise QueueFull.
        """
        if self.full(item):
            raise asyncio.QueueFull
        self._put(item)
        self._unfinished_tasks += 1  # type: ignore
        self._finished.clear()  # type: ignore
        self._wakeup_next(self._getters)  # type: ignore

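# Example (illustrative): a buffer bounded by bytes of video rather than item count.
#
#   buffer = VideoQueue(maxsize=int(human_readable_to_float("50MiB")))
#   await buffer.put((event, video_bytes))   # blocks once ~50MiB is buffered
#   event, video_bytes = await buffer.get()
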
async def wait_until(dt: datetime):
    """Sleep until the specified datetime."""
    now = datetime.now()
    await asyncio.sleep((dt - now).total_seconds())

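# Example (illustrative): pause a periodic task until 03:00 on a given day.
#
#   await wait_until(datetime(2025, 1, 1, 3, 0))
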
EVENT_TYPES_MAP = {
    EventType.MOTION: {"motion"},
    EventType.RING: {"ring"},
    EventType.SMART_DETECT_LINE: {"line"},
    EventType.FINGERPRINT_IDENTIFIED: {"fingerprint"},
    EventType.NFC_CARD_SCANNED: {"nfc"},
    EventType.SMART_DETECT: {t for t in SmartDetectObjectType.values() if t not in SmartDetectAudioType.values()},
    EventType.SMART_AUDIO_DETECT: set(SmartDetectAudioType.values()),
}


def wanted_event_type(event, wanted_detection_types: Set[str], cameras: Set[str], ignore_cameras: Set[str]):
    """Return True if this event is one we want to back up."""
    if event.start is None or event.end is None:
        return False  # This event is still ongoing

    if event.camera_id in ignore_cameras:
        return False

    if cameras and event.camera_id not in cameras:
        return False

    if event.type not in EVENT_TYPES_MAP:
        return False

    if event.type in [EventType.SMART_DETECT, EventType.SMART_AUDIO_DETECT]:
        detection_types = set(event.smart_detect_types)
    else:
        detection_types = EVENT_TYPES_MAP[event.type]
    if not detection_types & wanted_detection_types:  # No intersection
        return False

    return True
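

# Example (illustrative): an empty `cameras` set means "all cameras".
#
#   if wanted_event_type(event, {"motion", "person"}, cameras=set(), ignore_cameras=set()):
#       ...  # queue this event for backup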