ruff format

This commit is contained in:
Sebastian Goscik
2024-08-09 23:43:03 +01:00
parent f9d74c27f9
commit bbd70f49bf
10 changed files with 90 additions and 91 deletions

View File

@@ -1,8 +1,8 @@
"""Top-level package for Unifi Protect Backup."""
__author__ = """sebastian.goscik"""
__email__ = 'sebastian@goscik.com'
__version__ = '0.11.0'
__email__ = "sebastian@goscik.com"
__version__ = "0.11.0"
from .downloader import VideoDownloader
from .downloader_experimental import VideoDownloaderExperimental

View File

@@ -15,7 +15,7 @@ DETECTION_TYPES = ["motion", "person", "vehicle", "ring"]
def _parse_detection_types(ctx, param, value):
# split columns by ',' and remove whitespace
types = [t.strip() for t in value.split(',')]
types = [t.strip() for t in value.split(",")]
# validate passed columns
for t in types:
@@ -30,7 +30,7 @@ def parse_rclone_retention(ctx, param, retention) -> relativedelta:
matches = {k: int(v) for v, k in re.findall(r"([\d]+)(ms|s|m|h|d|w|M|y)", retention)}
# Check that we matched the whole string
if len(retention) != len(''.join([f"{v}{k}" for k, v in matches.items()])):
if len(retention) != len("".join([f"{v}{k}" for k, v in matches.items()])):
raise click.BadParameter("See here for expected format: https://rclone.org/docs/#time-option")
return relativedelta(
@@ -47,60 +47,60 @@ def parse_rclone_retention(ctx, param, retention) -> relativedelta:
@click.command(context_settings=dict(max_content_width=100))
@click.version_option(__version__)
@click.option('--address', required=True, envvar='UFP_ADDRESS', help='Address of Unifi Protect instance')
@click.option('--port', default=443, envvar='UFP_PORT', show_default=True, help='Port of Unifi Protect instance')
@click.option('--username', required=True, envvar='UFP_USERNAME', help='Username to login to Unifi Protect instance')
@click.option('--password', required=True, envvar='UFP_PASSWORD', help='Password for Unifi Protect user')
@click.option("--address", required=True, envvar="UFP_ADDRESS", help="Address of Unifi Protect instance")
@click.option("--port", default=443, envvar="UFP_PORT", show_default=True, help="Port of Unifi Protect instance")
@click.option("--username", required=True, envvar="UFP_USERNAME", help="Username to login to Unifi Protect instance")
@click.option("--password", required=True, envvar="UFP_PASSWORD", help="Password for Unifi Protect user")
@click.option(
'--verify-ssl/--no-verify-ssl',
"--verify-ssl/--no-verify-ssl",
default=True,
show_default=True,
envvar='UFP_SSL_VERIFY',
envvar="UFP_SSL_VERIFY",
help="Set if you do not have a valid HTTPS Certificate for your instance",
)
@click.option(
'--rclone-destination',
"--rclone-destination",
required=True,
envvar='RCLONE_DESTINATION',
envvar="RCLONE_DESTINATION",
help="`rclone` destination path in the format {rclone remote}:{path on remote}."
" E.g. `gdrive:/backups/unifi_protect`",
)
@click.option(
'--retention',
default='7d',
"--retention",
default="7d",
show_default=True,
envvar='RCLONE_RETENTION',
envvar="RCLONE_RETENTION",
help="How long should event clips be backed up for. Format as per the `--max-age` argument of `rclone` "
"(https://rclone.org/filtering/#max-age-don-t-transfer-any-file-older-than-this)",
callback=parse_rclone_retention,
)
@click.option(
'--rclone-args',
default='',
envvar='RCLONE_ARGS',
"--rclone-args",
default="",
envvar="RCLONE_ARGS",
help="Optional extra arguments to pass to `rclone rcat` directly. Common usage for this would "
"be to set a bandwidth limit, for example.",
)
@click.option(
'--rclone-purge-args',
default='',
envvar='RCLONE_PURGE_ARGS',
"--rclone-purge-args",
default="",
envvar="RCLONE_PURGE_ARGS",
help="Optional extra arguments to pass to `rclone delete` directly. Common usage for this would "
"be to execute a permanent delete instead of using the recycle bin on a destination. "
"Google Drive example: `--drive-use-trash=false`",
)
@click.option(
'--detection-types',
envvar='DETECTION_TYPES',
default=','.join(DETECTION_TYPES),
"--detection-types",
envvar="DETECTION_TYPES",
default=",".join(DETECTION_TYPES),
show_default=True,
help="A comma separated list of which types of detections to backup. "
f"Valid options are: {', '.join([f'`{t}`' for t in DETECTION_TYPES])}",
callback=_parse_detection_types,
)
@click.option(
'--ignore-camera',
'ignore_cameras',
"--ignore-camera",
"ignore_cameras",
multiple=True,
envvar="IGNORE_CAMERAS",
help="IDs of cameras for which events should not be backed up. Use multiple times to ignore "
@@ -109,16 +109,16 @@ def parse_rclone_retention(ctx, param, retention) -> relativedelta:
"that you wish to backup.",
)
@click.option(
'--file-structure-format',
envvar='FILE_STRUCTURE_FORMAT',
"--file-structure-format",
envvar="FILE_STRUCTURE_FORMAT",
default="{camera_name}/{event.start:%Y-%m-%d}/{event.end:%Y-%m-%dT%H-%M-%S} {detection_type}.mp4",
show_default=True,
help="A Python format string used to generate the file structure/name on the rclone remote."
"For details of the fields available, see the projects `README.md` file.",
)
@click.option(
'-v',
'--verbose',
"-v",
"--verbose",
count=True,
help="How verbose the logging output should be."
"""
@@ -138,38 +138,38 @@ all warnings, and websocket data
""",
)
@click.option(
'--sqlite_path',
default='events.sqlite',
envvar='SQLITE_PATH',
"--sqlite_path",
default="events.sqlite",
envvar="SQLITE_PATH",
help="Path to the SQLite database to use/create",
)
@click.option(
'--color-logging/--plain-logging',
"--color-logging/--plain-logging",
default=False,
show_default=True,
envvar='COLOR_LOGGING',
envvar="COLOR_LOGGING",
help="Set if you want to use color in logging output",
)
@click.option(
'--download-buffer-size',
default='512MiB',
"--download-buffer-size",
default="512MiB",
show_default=True,
envvar='DOWNLOAD_BUFFER_SIZE',
envvar="DOWNLOAD_BUFFER_SIZE",
help='How big the download buffer should be (you can use suffixes like "B", "KiB", "MiB", "GiB")',
callback=lambda ctx, param, value: human_readable_to_float(value),
)
@click.option(
'--purge_interval',
default='1d',
"--purge_interval",
default="1d",
show_default=True,
envvar='PURGE_INTERVAL',
envvar="PURGE_INTERVAL",
help="How frequently to check for file to purge.\n\nNOTE: Can create a lot of API calls, so be careful if "
"your cloud provider charges you per api call",
callback=parse_rclone_retention,
)
@click.option(
'--apprise-notifier',
'apprise_notifiers',
"--apprise-notifier",
"apprise_notifiers",
multiple=True,
envvar="APPRISE_NOTIFIERS",
help="""\b
@@ -187,39 +187,39 @@ If no tags are specified, it defaults to ERROR
More details about supported platforms can be found here: https://github.com/caronc/apprise""",
)
@click.option(
'--skip-missing',
"--skip-missing",
default=False,
show_default=True,
is_flag=True,
envvar='SKIP_MISSING',
envvar="SKIP_MISSING",
help="""\b
If set, events which are 'missing' at the start will be ignored.
Subsequent missing events will be downloaded (e.g. a missed event)
""",
)
@click.option(
'--download-rate-limit',
"--download-rate-limit",
default=None,
show_default=True,
envvar='DOWNLOAD_RATELIMIT',
envvar="DOWNLOAD_RATELIMIT",
type=float,
help="Limit how events can be downloaded in one minute. Disabled by default",
)
@click.option(
'--max-event-length',
"--max-event-length",
default=2 * 60 * 60,
show_default=True,
envvar='MAX_EVENT_LENGTH',
envvar="MAX_EVENT_LENGTH",
type=int,
help="Only download events shorter than this maximum length, in seconds",
)
@click.option(
'--experimental-downloader',
'use_experimental_downloader',
"--experimental-downloader",
"use_experimental_downloader",
default=False,
show_default=True,
is_flag=True,
envvar='EXPERIMENTAL_DOWNLOADER',
envvar="EXPERIMENTAL_DOWNLOADER",
help="""\b
If set, a new experimental download mechanism will be used to match
what the web UI does. This might be more stable if you are experiencing

View File

@@ -29,14 +29,14 @@ from unifi_protect_backup.utils import (
async def get_video_length(video: bytes) -> float:
"""Uses ffprobe to get the length of the video file passed in as a byte stream."""
returncode, stdout, stderr = await run_command(
'ffprobe -v quiet -show_streams -select_streams v:0 -of json -', video
"ffprobe -v quiet -show_streams -select_streams v:0 -of json -", video
)
if returncode != 0:
raise SubprocessException(stdout, stderr, returncode)
json_data = json.loads(stdout)
return float(json_data['streams'][0]['duration'])
return float(json_data["streams"][0]["duration"])
class VideoDownloader:
@@ -75,10 +75,10 @@ class VideoDownloader:
self.base_logger = logging.getLogger(__name__)
setup_event_logger(self.base_logger, color_logging)
self.logger = logging.LoggerAdapter(self.base_logger, {'event': ''})
self.logger = logging.LoggerAdapter(self.base_logger, {"event": ""})
# Check if `ffprobe` is available
ffprobe = shutil.which('ffprobe')
ffprobe = shutil.which("ffprobe")
if ffprobe is not None:
self.logger.debug(f"ffprobe found: {ffprobe}")
self._has_ffprobe = True
@@ -100,7 +100,7 @@ class VideoDownloader:
event = await self.download_queue.get()
self.current_event = event
self.logger = logging.LoggerAdapter(self.base_logger, {'event': f' [{event.id}]'})
self.logger = logging.LoggerAdapter(self.base_logger, {"event": f" [{event.id}]"})
# Fix timezones since uiprotect sets all timestamps to UTC. Instead localize them to
# the timezone of the unifi protect NVR.

View File

@@ -29,14 +29,14 @@ from unifi_protect_backup.utils import (
async def get_video_length(video: bytes) -> float:
"""Uses ffprobe to get the length of the video file passed in as a byte stream."""
returncode, stdout, stderr = await run_command(
'ffprobe -v quiet -show_streams -select_streams v:0 -of json -', video
"ffprobe -v quiet -show_streams -select_streams v:0 -of json -", video
)
if returncode != 0:
raise SubprocessException(stdout, stderr, returncode)
json_data = json.loads(stdout)
return float(json_data['streams'][0]['duration'])
return float(json_data["streams"][0]["duration"])
class VideoDownloaderExperimental:
@@ -75,10 +75,10 @@ class VideoDownloaderExperimental:
self.base_logger = logging.getLogger(__name__)
setup_event_logger(self.base_logger, color_logging)
self.logger = logging.LoggerAdapter(self.base_logger, {'event': ''})
self.logger = logging.LoggerAdapter(self.base_logger, {"event": ""})
# Check if `ffprobe` is available
ffprobe = shutil.which('ffprobe')
ffprobe = shutil.which("ffprobe")
if ffprobe is not None:
self.logger.debug(f"ffprobe found: {ffprobe}")
self._has_ffprobe = True
@@ -100,7 +100,7 @@ class VideoDownloaderExperimental:
event = await self.download_queue.get()
self.current_event = event
self.logger = logging.LoggerAdapter(self.base_logger, {'event': f' [{event.id}]'})
self.logger = logging.LoggerAdapter(self.base_logger, {"event": f" [{event.id}]"})
# Fix timezones since uiprotect sets all timestamps to UTC. Instead localize them to
# the timezone of the unifi protect NVR.
@@ -182,7 +182,7 @@ class VideoDownloaderExperimental:
assert isinstance(event.end, datetime)
try:
prepared_video_file = await self._protect.prepare_camera_video(event.camera_id, event.start, event.end)
video = await self._protect.download_camera_video(event.camera_id, prepared_video_file['fileName'])
video = await self._protect.download_camera_video(event.camera_id, prepared_video_file["fileName"])
assert isinstance(video, bytes)
break
except (AssertionError, ClientPayloadError, TimeoutError) as e:

View File

@@ -61,7 +61,7 @@ class EventListener:
return
if msg.new_obj.camera_id in self.ignore_cameras:
return
if 'end' not in msg.changed_data:
if "end" not in msg.changed_data:
return
if msg.new_obj.type not in [EventType.MOTION, EventType.SMART_DETECT, EventType.RING]:
return
@@ -89,8 +89,8 @@ class EventListener:
# Unifi protect has started sending the event id in the websocket as a {event_id}-{camera_id} but when the
# API is queried they only have {event_id}. Keeping track of these both of these would be complicated so
# instead we fudge the ID here to match what the API returns
if '-' in msg.new_obj.id:
msg.new_obj.id = msg.new_obj.id.split('-')[0]
if "-" in msg.new_obj.id:
msg.new_obj.id = msg.new_obj.id.split("-")[0]
logger.debug(f"Adding event {msg.new_obj.id} to queue (Current download queue={self._event_queue.qsize()})")

View File

@@ -8,11 +8,11 @@ notifier = apprise.Apprise()
def add_notification_service(url):
"""Add apprise URI with support for tags e.g. TAG1,TAG2=PROTOCOL://settings."""
config = apprise.AppriseConfig()
config.add_config(url, format='text')
config.add_config(url, format="text")
# If not tags are specified, default to errors otherwise ALL logging will
# be spammed to the notification service
if not config.servers()[0].tags:
config.servers()[0].tags = {'ERROR'}
config.servers()[0].tags = {"ERROR"}
notifier.add(config)

View File

@@ -64,7 +64,6 @@ class Purge:
f"SELECT * FROM events WHERE end < {retention_oldest_time}"
) as event_cursor:
async for event_id, event_type, camera_id, event_start, event_end in event_cursor:
logger.info(f"Purging event: {event_id}.")
# For every backup for this event
@@ -86,5 +85,5 @@ class Purge:
logger.error("Unexpected exception occurred during purge:", exc_info=e)
next_purge_time = datetime.now() + self.interval
logger.extra_debug(f'sleeping until {next_purge_time}')
logger.extra_debug(f"sleeping until {next_purge_time}")
await wait_until(next_purge_time)

View File

@@ -212,8 +212,8 @@ class UnifiProtectBackup:
logger.info(f" - {camera.id}: {camera.name}")
# Print timezone info for debugging
logger.debug(f'NVR TZ: {self._protect.bootstrap.nvr.timezone}')
logger.debug(f'Local TZ: {datetime.now(timezone.utc).astimezone().tzinfo}')
logger.debug(f"NVR TZ: {self._protect.bootstrap.nvr.timezone}")
logger.debug(f"Local TZ: {datetime.now(timezone.utc).astimezone().tzinfo}")
tasks = []
@@ -313,7 +313,7 @@ class UnifiProtectBackup:
ValueError: The given rclone destination is for a remote that is not configured
"""
rclone = shutil.which('rclone')
rclone = shutil.which("rclone")
if not rclone:
raise RuntimeError("`rclone` is not installed on this system")
logger.debug(f"rclone found: {rclone}")

View File

@@ -56,7 +56,7 @@ class VideoUploader:
self.base_logger = logging.getLogger(__name__)
setup_event_logger(self.base_logger, color_logging)
self.logger = logging.LoggerAdapter(self.base_logger, {'event': ''})
self.logger = logging.LoggerAdapter(self.base_logger, {"event": ""})
async def start(self):
"""Main loop.
@@ -70,7 +70,7 @@ class VideoUploader:
event, video = await self.upload_queue.get()
self.current_event = event
self.logger = logging.LoggerAdapter(self.base_logger, {'event': f' [{event.id}]'})
self.logger = logging.LoggerAdapter(self.base_logger, {"event": f" [{event.id}]"})
self.logger.info(f"Uploading event: {event.id}")
self.logger.debug(
@@ -164,6 +164,6 @@ class VideoUploader:
}
file_path = self._file_structure_format.format(**format_context)
file_path = re.sub(r'[^\w\-_\.\(\)/ ]', '', file_path) # Sanitize any invalid chars
file_path = re.sub(r"[^\w\-_\.\(\)/ ]", "", file_path) # Sanitize any invalid chars
return pathlib.Path(f"{self._rclone_destination}/{file_path}")

View File

@@ -50,11 +50,11 @@ def add_logging_level(levelName: str, levelNum: int, methodName: Optional[str] =
methodName = levelName.lower()
if hasattr(logging, levelName):
raise AttributeError('{} already defined in logging module'.format(levelName))
raise AttributeError("{} already defined in logging module".format(levelName))
if hasattr(logging, methodName):
raise AttributeError('{} already defined in logging module'.format(methodName))
raise AttributeError("{} already defined in logging module".format(methodName))
if hasattr(logging.getLoggerClass(), methodName):
raise AttributeError('{} already defined in logger class'.format(methodName))
raise AttributeError("{} already defined in logger class".format(methodName))
# This method was inspired by the answers to Stack Overflow post
# http://stackoverflow.com/q/2183233/2988730, especially
@@ -84,19 +84,19 @@ def add_color_to_record_levelname(record):
"""Colorizes logging level names."""
levelno = record.levelno
if levelno >= logging.CRITICAL:
color = '\x1b[31;1m' # RED
color = "\x1b[31;1m" # RED
elif levelno >= logging.ERROR:
color = '\x1b[31;1m' # RED
color = "\x1b[31;1m" # RED
elif levelno >= logging.WARNING:
color = '\x1b[33;1m' # YELLOW
color = "\x1b[33;1m" # YELLOW
elif levelno >= logging.INFO:
color = '\x1b[32;1m' # GREEN
color = "\x1b[32;1m" # GREEN
elif levelno >= logging.DEBUG:
color = '\x1b[36;1m' # CYAN
color = "\x1b[36;1m" # CYAN
elif levelno >= logging.EXTRA_DEBUG:
color = '\x1b[35;1m' # MAGENTA
color = "\x1b[35;1m" # MAGENTA
else:
color = '\x1b[0m'
color = "\x1b[0m"
return f"{color}{record.levelname}\x1b[0m"
@@ -174,7 +174,7 @@ class AppriseStreamHandler(logging.StreamHandler):
def create_logging_handler(format, color_logging):
"""Constructs apprise logging handler for the given format."""
date_format = "%Y-%m-%d %H:%M:%S"
style = '{'
style = "{"
sh = AppriseStreamHandler(color_logging)
formatter = logging.Formatter(format, date_format, style)
@@ -203,11 +203,11 @@ def setup_logging(verbosity: int, color_logging: bool = False, apprise_notifiers
"""
add_logging_level(
'EXTRA_DEBUG',
"EXTRA_DEBUG",
logging.DEBUG - 1,
)
add_logging_level(
'WEBSOCKET_DATA',
"WEBSOCKET_DATA",
logging.DEBUG - 2,
)
@@ -337,9 +337,9 @@ async def run_command(cmd: str, data=None):
)
stdout, stderr = await proc.communicate(data)
stdout = stdout.decode()
stdout_indented = '\t' + stdout.replace('\n', '\n\t').strip()
stdout_indented = "\t" + stdout.replace("\n", "\n\t").strip()
stderr = stderr.decode()
stderr_indented = '\t' + stderr.replace('\n', '\n\t').strip()
stderr_indented = "\t" + stderr.replace("\n", "\n\t").strip()
if proc.returncode != 0:
logger.error(f"Failed to run: '{cmd}")