Files
JARVIS/backend/app/services/webdav_service.py

128 lines
4.8 KiB
Python
Raw Normal View History

from dataclasses import dataclass, field
from urllib.parse import quote, urljoin
import xml.etree.ElementTree as ET
import httpx
from app.models.remote_mount import RemoteMount
from app.services.secret_service import decrypt_secret
WEBDAV_NAMESPACE = {
"d": "DAV:",
}
@dataclass
class WebDavNode:
path: str
name: str
is_dir: bool
size: int | None = None
modified_at: str | None = None
etag: str | None = None
children: list["WebDavNode"] = field(default_factory=list)
class WebDavService:
def __init__(self, mount: RemoteMount):
self.mount = mount
self.username = mount.username or None
self.password = decrypt_secret(mount.password_encrypted)
def _normalize_remote_path(self, remote_path: str | None = None) -> str:
path = remote_path or self.mount.root_path or "/"
if not path.startswith("/"):
path = f"/{path}"
return path
def _build_url(self, remote_path: str | None = None) -> str:
path = self._normalize_remote_path(remote_path)
encoded = "/".join(quote(segment) for segment in path.split("/") if segment)
if not encoded:
return self.mount.base_url.rstrip("/") + "/"
return urljoin(self.mount.base_url.rstrip("/") + "/", encoded)
async def list_directory(self, remote_path: str | None = None) -> list[WebDavNode]:
path = self._normalize_remote_path(remote_path)
body = """<?xml version="1.0" encoding="utf-8" ?>
<d:propfind xmlns:d="DAV:">
<d:prop>
<d:displayname />
<d:resourcetype />
<d:getcontentlength />
<d:getlastmodified />
<d:getetag />
</d:prop>
</d:propfind>"""
async with httpx.AsyncClient(timeout=30.0, auth=self._auth()) as client:
response = await client.request(
"PROPFIND",
self._build_url(path),
headers={"Depth": "1", "Content-Type": "application/xml"},
content=body,
)
response.raise_for_status()
return self._parse_propfind(path, response.text)
async def list_tree(self, remote_path: str | None = None, max_depth: int = 4) -> list[WebDavNode]:
path = self._normalize_remote_path(remote_path)
nodes = await self.list_directory(path)
if max_depth <= 1:
return nodes
for node in nodes:
if node.is_dir:
node.children = await self.list_tree(node.path, max_depth=max_depth - 1)
return nodes
async def download_file(self, remote_path: str) -> tuple[bytes, str]:
normalized = self._normalize_remote_path(remote_path)
async with httpx.AsyncClient(timeout=120.0, auth=self._auth()) as client:
response = await client.get(self._build_url(normalized))
response.raise_for_status()
name = normalized.rstrip("/").split("/")[-1] or "remote-file"
return response.content, name
def _auth(self) -> httpx.BasicAuth | None:
if self.username and self.password:
return httpx.BasicAuth(self.username, self.password)
return None
def _parse_propfind(self, parent_path: str, payload: str) -> list[WebDavNode]:
root = ET.fromstring(payload)
nodes: list[WebDavNode] = []
for response in root.findall("d:response", WEBDAV_NAMESPACE):
href = response.findtext("d:href", default="", namespaces=WEBDAV_NAMESPACE)
if not href:
continue
normalized_href = "/" + href.split("://", 1)[-1].split("/", 1)[-1].strip("/")
normalized_href = "/" if normalized_href == "/" else normalized_href.rstrip("/")
normalized_parent = self._normalize_remote_path(parent_path).rstrip("/") or "/"
if normalized_href.rstrip("/") == normalized_parent.rstrip("/"):
continue
prop = response.find("d:propstat/d:prop", WEBDAV_NAMESPACE)
if prop is None:
continue
is_dir = prop.find("d:resourcetype/d:collection", WEBDAV_NAMESPACE) is not None
display_name = prop.findtext("d:displayname", default="", namespaces=WEBDAV_NAMESPACE) or normalized_href.split("/")[-1]
size_text = prop.findtext("d:getcontentlength", default="", namespaces=WEBDAV_NAMESPACE)
etag = prop.findtext("d:getetag", default=None, namespaces=WEBDAV_NAMESPACE)
modified_at = prop.findtext("d:getlastmodified", default=None, namespaces=WEBDAV_NAMESPACE)
nodes.append(WebDavNode(
path=normalized_href,
name=display_name,
is_dir=is_dir,
size=int(size_text) if size_text.isdigit() else None,
etag=etag,
modified_at=modified_at,
))
nodes.sort(key=lambda item: (not item.is_dir, item.name.lower()))
return nodes