Mirror of https://github.com/ep1cman/unifi-protect-backup.git, synced 2025-12-05 23:53:30 +00:00.
Comparing 23 commits: 8dd78d2083 ... e1c70d6b99
| SHA1 |
|---|
| e1c70d6b99 |
| 7439ac9bda |
| e3cbcc819e |
| ccb816ddbc |
| 9d2d6558a6 |
| 3c5056614c |
| 1f18c06e17 |
| 3181080bca |
| 2c0afeaaa4 |
| 8bbe8a08c2 |
| 4edd936cb6 |
| 88bb5ba378 |
| bab1d8f81d |
| 956ed73714 |
| d9216a14f9 |
| e79bbb206b |
| 018063c413 |
| 9f0ab3030f |
| 01a90b899d |
| df62f671e1 |
| d41d2ef083 |
| 61370be1c2 |
| aa75ec6f97 |
.bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.11.0
+current_version = 0.12.0
 commit = True
 tag = True

@@ -8,8 +8,8 @@ search = version = "{current_version}"
 replace = version = "{new_version}"

 [bumpversion:file:unifi_protect_backup/__init__.py]
-search = __version__ = '{current_version}'
-replace = __version__ = '{new_version}'
+search = __version__ = "{current_version}"
+replace = __version__ = "{new_version}"

 [bumpversion:file:Dockerfile]
 search = COPY dist/unifi_protect_backup-{current_version}.tar.gz sdist.tar.gz
.github/workflows/dev.yml (vendored) — 36 lines changed
@@ -14,38 +14,6 @@ on:
 # A workflow run is made up of one or more jobs that can run sequentially or in parallel
 jobs:
-  # This workflow contains a single job called "test"
-  test:
-    # The type of runner that the job will run on
-    strategy:
-      matrix:
-        python-versions: [3.9]
-        os: [ubuntu-18.04, macos-latest, windows-latest]
-    runs-on: ${{ matrix.os }}
-
-    # Steps represent a sequence of tasks that will be executed as part of the job
-    steps:
-      # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
-      - uses: actions/checkout@v2
-      - uses: actions/setup-python@v2
-        with:
-          python-version: ${{ matrix.python-versions }}
-
-      - name: Install dependencies
-        run: |
-          python -m pip install --upgrade pip
-          pip install poetry tox tox-gh-actions
-
-      - name: test with tox
-        run:
-          tox
-
-      - name: list files
-        run: ls -l .
-
-      - uses: codecov/codecov-action@v1
-        with:
-          fail_ci_if_error: true
-          files: coverage.xml
-
   dev_container:
     name: Create dev container

@@ -57,9 +25,9 @@ jobs:
       # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
       - uses: actions/checkout@v2

-      - uses: actions/setup-python@v2
+      - uses: actions/setup-python@v5
         with:
-          python-version: 3.10
+          python-version: 3.13

       - name: Install dependencies
         run: |
.github/workflows/release.yml (vendored) — 10 lines changed
@@ -2,24 +2,17 @@

 name: release & publish workflow

-# Controls when the action will run.
 on:
-  # Triggers the workflow on push events but only for the master branch
   push:
     tags:
       - 'v*'

-  # Allows you to run this workflow manually from the Actions tab
   workflow_dispatch:

-# A workflow run is made up of one or more jobs that can run sequentially or in parallel
 jobs:
-  # This workflow contains a single job called "release"
   release:
     name: Create Release
     runs-on: ubuntu-20.04

-    # Steps represent a sequence of tasks that will be executed as part of the job
     steps:
       - name: Get version from tag
         id: tag_name

@@ -27,7 +20,6 @@ jobs:
           echo ::set-output name=current_version::${GITHUB_REF#refs/tags/v}
         shell: bash

-      # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
       - uses: actions/checkout@v2

       - name: Get Changelog Entry

@@ -39,7 +31,7 @@ jobs:

       - uses: actions/setup-python@v2
         with:
-          python-version: 3.10
+          python-version: "3.10"

       - name: Install dependencies
         run: |
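A note on the `python-version: "3.10"` fix above: YAML reads an unquoted `3.10` as the float `3.1`, so the workflow was silently requesting Python 3.1. A quick demonstration of the parse (assuming PyYAML is available):

```python
import yaml  # PyYAML

# Unquoted, YAML treats 3.10 as a number and drops the trailing zero.
print(yaml.safe_load("python-version: 3.10"))    # {'python-version': 3.1}

# Quoted, the value survives as the literal string "3.10".
print(yaml.safe_load('python-version: "3.10"'))  # {'python-version': '3.10'}
```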
.pre-commit-config.yaml
@@ -19,7 +19,7 @@ repos:
       # Run the formatter.
       - id: ruff-format
   - repo: https://github.com/pre-commit/mirrors-mypy
-    rev: v1.11.1
+    rev: v1.14.1
     hooks:
       - id: mypy
         exclude: tests/

@@ -27,3 +27,4 @@ repos:
           - types-pytz
           - types-cryptography
           - types-python-dateutil
+          - types-aiofiles
CHANGELOG.md — 15 lines changed
@@ -4,11 +4,18 @@ All notable changes to this project will be documented in this file.

 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

-## [0.12.0] - 2024-08-06
-### Fixed
+## [0.12.0] - 2025-01-18
+### Added
 - Tool now targets UIProtect instead of pyunifiprotect which should help any lingering auth issues with Unifi OS 4.X
-- Python Version bumped to 3.10 (based on UIPortect need)
-- (had to make the dev and test dependencies required instead of extras to get poetry to work)
+- Python Version bumped to 3.10 (based on UIProtect need)
+- The ability to specify only specific cameras to backup
+- Re-enabled the experimental downloader after adding a monkey patch for UIProtect to include the unmerged code
+- Switched linter to `ruff`
+- Added support for SMART_DETECT_LINE events
+
+### Fixed
+- Unifi now returns unfinished events, this is now handled correctly
+- Login attempts now use an exponentially increasing delay to try work around aggressive rate limiting on logins

 ## [0.11.0] - 2024-06-08
 ### Added
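The diff does not include the login retry code itself, but the exponentially increasing delay described in the "Fixed" entry follows a standard pattern. A minimal sketch, with illustrative delay values and a hypothetical `authenticate()` call rather than the tool's actual implementation:

```python
import asyncio


async def login_with_backoff(protect, max_attempts: int = 8) -> None:
    """Retry login, doubling the wait after each failed attempt."""
    delay = 1.0
    for attempt in range(1, max_attempts + 1):
        try:
            await protect.authenticate()  # hypothetical login call
            return
        except Exception as exc:
            if attempt == max_attempts:
                raise
            print(f"Login attempt {attempt} failed ({exc}), retrying in {delay:.0f}s")
            await asyncio.sleep(delay)
            delay *= 2  # exponential growth backs off from the rate limiter
```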
Dockerfile
@@ -7,7 +7,7 @@ LABEL maintainer="ep1cman"

 WORKDIR /app

-COPY dist/unifi_protect_backup-0.11.0.tar.gz sdist.tar.gz
+COPY dist/unifi_protect_backup-0.12.0.tar.gz sdist.tar.gz

 # https://github.com/rust-lang/cargo/issues/2808
 ENV CARGO_NET_GIT_FETCH_WITH_CLI=true
poetry.lock (generated) — 1820 lines changed; diff suppressed because it is too large.
pyproject.toml
@@ -1,7 +1,7 @@
 [tool]
 [tool.poetry]
 name = "unifi_protect_backup"
-version = "0.11.0"
+version = "0.12.0"
 homepage = "https://github.com/ep1cman/unifi-protect-backup"
 description = "Python tool to backup unifi event clips in realtime."
 authors = ["sebastian.goscik <sebastian@goscik.com>"]

@@ -43,6 +43,7 @@ types-python-dateutil = "^2.8.19.10"
 bump2version = "^1.0.1"
 pre-commit = "^2.12.0"
 ruff = "^0.5.7"
+types-aiofiles = "^24.1.0.20241221"

 [tool.poetry.group.test]
 optional = true

@@ -66,6 +67,9 @@ target-version = "py310"

 [tool.mypy]
 allow_redefinition=true
+exclude = [
+    'unifi_protect_backup/uiprotect_patch.py'
+]

 [build-system]
 requires = ["poetry-core>=1.0.0"]
unifi_protect_backup/__init__.py
@@ -2,7 +2,7 @@

 __author__ = """sebastian.goscik"""
 __email__ = "sebastian@goscik.com"
-__version__ = "0.11.0"
+__version__ = "0.12.0"

 from .downloader import VideoDownloader
 from .downloader_experimental import VideoDownloaderExperimental
unifi_protect_backup/downloader.py
@@ -175,16 +175,24 @@ class VideoDownloader:

     async def _download(self, event: Event) -> Optional[bytes]:
         """Downloads the video clip for the given event."""
-        self.logger.debug("  Downloading video...")
-
         for x in range(5):
+            self.logger.debug("  Downloading video...")
             assert isinstance(event.camera_id, str)
             assert isinstance(event.start, datetime)
             assert isinstance(event.end, datetime)

+            request_start_time = datetime.now()
             try:
                 video = await self._protect.get_camera_video(event.camera_id, event.start, event.end)
                 assert isinstance(video, bytes)
                 break
             except (AssertionError, ClientPayloadError, TimeoutError) as e:
+                diff_seconds = (datetime.now() - request_start_time).total_seconds()
+                if diff_seconds > 60:
+                    self.logger.error(f"Ignoring event. Total wait: {diff_seconds}. Camera: {await get_camera_name(self._protect, event.camera_id)}. Start: {event.start.strftime('%Y-%m-%dT%H-%M-%S')} ({event.start.timestamp()}) End: {event.end.strftime('%Y-%m-%dT%H-%M-%S')} ({event.end.timestamp()})", exc_info=e)
+                    await self._ignore_event(event)
+                    break
                 self.logger.warning(f"  Failed download attempt {x+1}, retying in 1s", exc_info=e)
                 await asyncio.sleep(1)
         else:
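The retry loop above relies on Python's `for`/`else`: the `else` branch runs only when the loop completes without `break`, i.e. when all five attempts failed. A stripped-down sketch of the control flow (`fetch()` is a stand-in for the real download call):

```python
import asyncio
import random


async def fetch() -> bytes:
    """Stand-in for the real download; fails at random."""
    if random.random() < 0.5:
        raise TimeoutError("simulated timeout")
    return b"video-bytes"


async def download_with_retries() -> bytes | None:
    for attempt in range(5):
        try:
            video = await fetch()
            break  # success skips the else branch
        except TimeoutError:
            await asyncio.sleep(1)
    else:
        # Reached only if no attempt hit `break`: all five failed.
        return None
    return video


print(asyncio.run(download_with_retries()))
```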
unifi_protect_backup/downloader_experimental.py
@@ -85,8 +85,6 @@ class VideoDownloaderExperimental:
         else:
             self._has_ffprobe = False

-        raise RuntimeError("The `uiprotect` library is currently missing the features for this to work.")
-
     async def start(self):
         """Main loop."""
         self.logger.info("Starting Downloader")
unifi_protect_backup/event_listener.py
@@ -3,7 +3,7 @@

 import asyncio
 import logging
 from time import sleep
-from typing import List, Optional
+from typing import List

 from uiprotect.api import ProtectApiClient
 from uiprotect.websocket import WebsocketState

@@ -23,7 +23,7 @@ class EventListener:
         protect: ProtectApiClient,
         detection_types: List[str],
         ignore_cameras: List[str],
-        cameras: Optional[List[str]] = None,
+        cameras: List[str],
     ):
         """Init.

@@ -32,7 +32,7 @@ class EventListener:
             protect (ProtectApiClient): UniFI Protect API client to use
             detection_types (List[str]): Desired Event detection types to look for
             ignore_cameras (List[str]): Cameras IDs to ignore events from
-            cameras (Optional[List[str]]): Cameras IDs to ONLY include events from
+            cameras (List[str]): Cameras IDs to ONLY include events from
         """
         self._event_queue: asyncio.Queue = event_queue
         self._protect: ProtectApiClient = protect

@@ -40,7 +40,7 @@ class EventListener:
         self._unsub_websocketstate = None
         self.detection_types: List[str] = detection_types
         self.ignore_cameras: List[str] = ignore_cameras
-        self.cameras: Optional[List[str]] = cameras
+        self.cameras: List[str] = cameras

     async def start(self):
         """Main Loop."""

@@ -63,7 +63,7 @@ class EventListener:
             return
         if msg.new_obj.camera_id in self.ignore_cameras:
             return
-        if self.cameras is not None and msg.new_obj.camera_id not in self.cameras:
+        if self.cameras and msg.new_obj.camera_id not in self.cameras:
             return
         if "end" not in msg.changed_data:
             return
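Note the semantics of the new check: `cameras` is now always a list, and an empty list means "no allow-list", because `if self.cameras and ...` short-circuits on a falsy empty list. A small sketch of the resulting behaviour:

```python
def allowed(camera_id: str, cameras: list[str]) -> bool:
    """Mirrors the new filter: an empty list allows every camera."""
    if cameras and camera_id not in cameras:
        return False
    return True


print(allowed("cam-1", []))         # True: empty list disables filtering
print(allowed("cam-1", ["cam-1"]))  # True: explicitly included
print(allowed("cam-2", ["cam-1"]))  # False: not on the allow-list
```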
unifi_protect_backup/missing_event_checker.py
@@ -3,7 +3,7 @@

 import asyncio
 import logging
 from datetime import datetime
-from typing import AsyncIterator, List, Optional
+from typing import AsyncIterator, List

 import aiosqlite
 from dateutil.relativedelta import relativedelta

@@ -28,8 +28,8 @@ class MissingEventChecker:
         uploader: VideoUploader,
         retention: relativedelta,
         detection_types: List[str],
-        ignore_cameras: List[str] = [],
-        cameras: Optional[List[str]] = None,
+        ignore_cameras: List[str],
+        cameras: List[str],
         interval: int = 60 * 5,
     ) -> None:
         """Init.

@@ -43,7 +43,7 @@ class MissingEventChecker:
             retention (relativedelta): Retention period to limit search window
             detection_types (List[str]): Detection types wanted to limit search
             ignore_cameras (List[str]): Ignored camera IDs to limit search
-            cameras (Optional[List[str]]): Included (ONLY) camera IDs to limit search
+            cameras (List[str]): Included (ONLY) camera IDs to limit search
             interval (int): How frequently, in seconds, to check for missing events,
         """
         self._protect: ProtectApiClient = protect

@@ -54,7 +54,7 @@ class MissingEventChecker:
         self.retention: relativedelta = retention
         self.detection_types: List[str] = detection_types
         self.ignore_cameras: List[str] = ignore_cameras
-        self.cameras: Optional[List[str]] = cameras
+        self.cameras: List[str] = cameras
         self.interval: int = interval

     async def _get_missing_events(self) -> AsyncIterator[Event]:

@@ -87,7 +87,7 @@ class MissingEventChecker:
                 break  # No completed events to process

             # Next chunks start time should be the end of the oldest complete event in the current chunk
-            start_time = max([event.end for event in unifi_events.values()])
+            start_time = max([event.end for event in unifi_events.values() if event.end is not None])

             # Get list of events that have been backed up from the database

@@ -103,9 +103,9 @@ class MissingEventChecker:
             downloading_event_ids.add(current_download.id)

         uploading_event_ids = {event.id for event, video in self._uploader.upload_queue._queue}  # type: ignore
-        current_upload = self._uploader.current_event
-        if current_upload is not None:
-            uploading_event_ids.add(current_upload.id)
+        for current_upload in self._uploader.current_events:
+            if current_upload is not None:
+                uploading_event_ids.add(current_upload.id)

         missing_event_ids = set(unifi_events.keys()) - (db_event_ids | downloading_event_ids | uploading_event_ids)

@@ -116,7 +116,7 @@ class MissingEventChecker:
             return False  # This event is still on-going
         if event.camera_id in self.ignore_cameras:
             return False
-        if self.cameras is not None and event.camera_id not in self.cameras:
+        if self.cameras and event.camera_id not in self.cameras:
             return False
         if event.type is EventType.MOTION and "motion" not in self.detection_types:
             return False
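The missing-event computation in this file is plain set arithmetic: any event UniFi reports that is neither in the database nor currently in flight is missing. In isolation, with hypothetical event IDs:

```python
unifi_event_ids = {"e1", "e2", "e3", "e4", "e5"}  # reported by the NVR
db_event_ids = {"e1", "e2"}                       # already backed up
downloading_event_ids = {"e3"}                    # currently downloading
uploading_event_ids = {"e4"}                      # currently uploading

# Everything UniFi knows about, minus everything already accounted for.
missing = unifi_event_ids - (db_event_ids | downloading_event_ids | uploading_event_ids)
print(missing)  # {'e5'}
```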
unifi_protect_backup/purge.py
@@ -85,5 +85,5 @@ class Purge:
                 logger.error("Unexpected exception occurred during purge:", exc_info=e)

             next_purge_time = datetime.now() + self.interval
-            logger.extra_debug(f"sleeping until {next_purge_time}")
+            logger.debug(f"sleeping until {next_purge_time}")
             await wait_until(next_purge_time)
unifi_protect_backup/uiprotect_patch.py (new file) — 135 lines
@@ -0,0 +1,135 @@
+import enum
+from datetime import datetime
+from pathlib import Path
+from typing import Any, Optional
+
+import aiofiles
+
+from uiprotect.data import Version
+from uiprotect.exceptions import BadRequest
+from uiprotect.utils import to_js_time
+
+
+# First, let's add the new VideoExportType enum
+class VideoExportType(str, enum.Enum):
+    TIMELAPSE = "timelapse"
+    ROTATING = "rotating"
+
+
+def monkey_patch_experimental_downloader():
+    from uiprotect.api import ProtectApiClient
+
+    # Add the version constant
+    ProtectApiClient.NEW_DOWNLOAD_VERSION = Version("4.0.0")  # You'll need to import Version from uiprotect
+
+    async def _validate_channel_id(self, camera_id: str, channel_index: int) -> None:
+        if self._bootstrap is None:
+            await self.update()
+        try:
+            camera = self._bootstrap.cameras[camera_id]
+            camera.channels[channel_index]
+        except (IndexError, AttributeError, KeyError) as e:
+            raise BadRequest(f"Invalid input: {e}") from e
+
+    async def prepare_camera_video(
+        self,
+        camera_id: str,
+        start: datetime,
+        end: datetime,
+        channel_index: int = 0,
+        validate_channel_id: bool = True,
+        fps: Optional[int] = None,
+        filename: Optional[str] = None,
+    ) -> Optional[dict[str, Any]]:
+        if self.bootstrap.nvr.version < self.NEW_DOWNLOAD_VERSION:
+            raise ValueError("This method is only support from Unifi Protect version >= 4.0.0.")
+
+        if validate_channel_id:
+            await self._validate_channel_id(camera_id, channel_index)
+
+        params = {
+            "camera": camera_id,
+            "start": to_js_time(start),
+            "end": to_js_time(end),
+        }
+
+        if channel_index == 3:
+            params.update({"lens": 2})
+        else:
+            params.update({"channel": channel_index})
+
+        if fps is not None and fps > 0:
+            params["fps"] = fps
+            params["type"] = VideoExportType.TIMELAPSE.value
+        else:
+            params["type"] = VideoExportType.ROTATING.value
+
+        if not filename:
+            start_str = start.strftime("%m-%d-%Y, %H.%M.%S %Z")
+            end_str = end.strftime("%m-%d-%Y, %H.%M.%S %Z")
+            filename = f"{camera_id} {start_str} - {end_str}.mp4"
+
+        params["filename"] = filename
+
+        return await self.api_request(
+            "video/prepare",
+            params=params,
+            raise_exception=True,
+        )
+
+    async def download_camera_video(
+        self,
+        camera_id: str,
+        filename: str,
+        output_file: Optional[Path] = None,
+        iterator_callback: Optional[callable] = None,
+        progress_callback: Optional[callable] = None,
+        chunk_size: int = 65536,
+    ) -> Optional[bytes]:
+        if self.bootstrap.nvr.version < self.NEW_DOWNLOAD_VERSION:
+            raise ValueError("This method is only support from Unifi Protect version >= 4.0.0.")
+
+        params = {
+            "camera": camera_id,
+            "filename": filename,
+        }
+
+        if iterator_callback is None and progress_callback is None and output_file is None:
+            return await self.api_request_raw(
+                "video/download",
+                params=params,
+                raise_exception=False,
+            )
+
+        r = await self.request(
+            "get",
+            f"{self.api_path}video/download",
+            auto_close=False,
+            timeout=0,
+            params=params,
+        )
+
+        if output_file is not None:
+            async with aiofiles.open(output_file, "wb") as output:
+
+                async def callback(total: int, chunk: Optional[bytes]) -> None:
+                    if iterator_callback is not None:
+                        await iterator_callback(total, chunk)
+                    if chunk is not None:
+                        await output.write(chunk)
+
+                await self._stream_response(r, chunk_size, callback, progress_callback)
+        else:
+            await self._stream_response(
+                r,
+                chunk_size,
+                iterator_callback,
+                progress_callback,
+            )
+        r.close()
+        return None
+
+    # Patch the methods into the class
+    ProtectApiClient._validate_channel_id = _validate_channel_id
+    ProtectApiClient.prepare_camera_video = prepare_camera_video
+    ProtectApiClient.download_camera_video = download_camera_video
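The patching technique above is ordinary attribute assignment: functions (including `async def`) assigned onto a class become bound methods on every existing and future instance. A toy sketch of the same mechanism, independent of uiprotect:

```python
import asyncio


class Client:
    def __init__(self, name: str) -> None:
        self.name = name


def monkey_patch_greeting() -> None:
    async def greet(self) -> str:
        # `self` binds normally because the function lives on the class.
        return f"hello from {self.name}"

    Client.greet = greet  # every Client instance now has .greet()


monkey_patch_greeting()
print(asyncio.run(Client("nvr").greet()))  # hello from nvr
```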
unifi_protect_backup/unifi_protect_backup.py
@@ -29,11 +29,19 @@ from unifi_protect_backup.utils import (
     setup_logging,
 )

+from unifi_protect_backup.uiprotect_patch import monkey_patch_experimental_downloader
+
 logger = logging.getLogger(__name__)

 # TODO: https://github.com/cjrh/aiorun#id6 (smart shield)

+
+# We have been waiting for a long time for this PR to get merged
+# https://github.com/uilibs/uiprotect/pull/249
+# Since it has not progressed, we will for now patch in the functionality ourselves
+monkey_patch_experimental_downloader()
+

 async def create_database(path: str):
     """Creates sqlite database and creates the events abd backups tables."""
     db = await aiosqlite.connect(path)

@@ -76,7 +84,7 @@ class UnifiProtectBackup:
         color_logging: bool = False,
         download_rate_limit: float | None = None,
         port: int = 443,
-        use_experimental_downloader: bool = False,
+        use_experimental_downloader: bool = False
     ):
         """Will configure logging settings and the Unifi Protect API (but not actually connect).

@@ -273,7 +281,7 @@ class UnifiProtectBackup:
                 self.rclone_args,
                 self.file_structure_format,
                 self._db,
-                self.color_logging,
+                self.color_logging
             )
             tasks.append(uploader.start())
unifi_protect_backup/uploader.py
@@ -4,7 +4,8 @@ import logging
 import pathlib
 import re
 from datetime import datetime
-
+import os
+import asyncio
 import aiosqlite
 from uiprotect import ProtectApiClient
 from uiprotect.data.nvr import Event
@@ -52,12 +53,42 @@ class VideoUploader:
         self._rclone_args: str = rclone_args
         self._file_structure_format: str = file_structure_format
         self._db: aiosqlite.Connection = db
-        self.current_event = None
+        self.current_events = []

         self.base_logger = logging.getLogger(__name__)
         setup_event_logger(self.base_logger, color_logging)
         self.logger = logging.LoggerAdapter(self.base_logger, {"event": ""})

+    async def _upload_worker(self, semaphore, worker_id):
+        async with semaphore:
+            while True:
+                try:
+                    event, video = await self.upload_queue.get()
+                    self.current_events[worker_id] = event
+
+                    logger = logging.LoggerAdapter(self.base_logger, {'event': f' [{event.id}]'})
+
+                    logger.info(f"Uploading event: {event.id}")
+                    logger.debug(
+                        f"  Remaining Upload Queue: {self.upload_queue.qsize_files()}"
+                        f" ({human_readable_size(self.upload_queue.qsize())})"
+                    )
+
+                    destination = await self._generate_file_path(event)
+                    logger.debug(f"  Destination: {destination}")
+
+                    try:
+                        await self._upload_video(video, destination, self._rclone_args)
+                        await self._update_database(event, destination)
+                        logger.debug("Uploaded")
+                    except SubprocessException:
+                        logger.error(f"  Failed to upload file: '{destination}'")
+
+                except Exception as e:
+                    logger.error(f"Unexpected exception occurred, abandoning event {event.id}:", exc_info=e)
+
+                self.current_events[worker_id] = None
+
     async def start(self):
         """Main loop.

@@ -65,33 +96,13 @@ class VideoUploader:
         using rclone, finally it updates the database
         """
         self.logger.info("Starting Uploader")
-        while True:
-            try:
-                event, video = await self.upload_queue.get()
-                self.current_event = event
-
-                self.logger = logging.LoggerAdapter(self.base_logger, {"event": f" [{event.id}]"})
-
-                self.logger.info(f"Uploading event: {event.id}")
-                self.logger.debug(
-                    f"  Remaining Upload Queue: {self.upload_queue.qsize_files()}"
-                    f" ({human_readable_size(self.upload_queue.qsize())})"
-                )
-
-                destination = await self._generate_file_path(event)
-                self.logger.debug(f"  Destination: {destination}")
-
-                try:
-                    await self._upload_video(video, destination, self._rclone_args)
-                    await self._update_database(event, destination)
-                    self.logger.debug("Uploaded")
-                except SubprocessException:
-                    self.logger.error(f"  Failed to upload file: '{destination}'")
-
-                self.current_event = None
-
-            except Exception as e:
-                self.logger.error(f"Unexpected exception occurred, abandoning event {event.id}:", exc_info=e)
+        rclone_transfers = int(os.getenv('RCLONE_PARALLEL_UPLOADS', '1'))
+        self.current_events = [None] * rclone_transfers
+        semaphore = asyncio.Semaphore(rclone_transfers)
+
+        workers = [self._upload_worker(semaphore, i) for i in range(rclone_transfers)]
+        await asyncio.gather(*workers)

     async def _upload_video(self, video: bytes, destination: pathlib.Path, rclone_args: str):
         """Upload video using rclone.
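The new upload path is a fixed-size worker pool draining a shared queue, sized by the `RCLONE_PARALLEL_UPLOADS` environment variable. Since the semaphore is created with the same count as the number of workers, it is belt-and-braces rather than the thing doing the limiting. A self-contained sketch of the shape (the upload body is a placeholder):

```python
import asyncio
import os


async def worker(queue: asyncio.Queue, worker_id: int) -> None:
    while True:
        item = await queue.get()
        await asyncio.sleep(0.1)  # placeholder for the real rclone upload
        print(f"worker {worker_id} uploaded {item}")
        queue.task_done()


async def main() -> None:
    transfers = int(os.getenv("RCLONE_PARALLEL_UPLOADS", "1"))
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(10):
        queue.put_nowait(f"event-{i}")

    workers = [asyncio.create_task(worker(queue, i)) for i in range(transfers)]
    await queue.join()  # wait until every queued item is processed
    for w in workers:
        w.cancel()


asyncio.run(main())
```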
@@ -163,7 +174,7 @@ class VideoUploader:
             "camera_name": await get_camera_name(self._protect, event.camera_id),
         }

-        file_path = self._file_structure_format.format(**format_context)
+        file_path = self._file_structure_format.format(**format_context).lower()
         file_path = re.sub(r"[^\w\-_\.\(\)/ ]", "", file_path)  # Sanitize any invalid chars

         file_path = file_path.replace(" ", "_")
         return pathlib.Path(f"{self._rclone_destination}/{file_path}")
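The `.lower()` change means every generated remote path is now lowercased before sanitising. Tracing a hypothetical path through the same three steps:

```python
import re

file_path = "Front Door/2025-01-18/Smart Detect (Person).mp4".lower()
file_path = re.sub(r"[^\w\-_\.\(\)/ ]", "", file_path)  # strip invalid chars
file_path = file_path.replace(" ", "_")
print(file_path)  # front_door/2025-01-18/smart_detect_(person).mp4
```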