diff --git a/.env.example b/.env.example index e4aee31..e66bbf1 100644 --- a/.env.example +++ b/.env.example @@ -1,46 +1,75 @@ -# 存储类型选择 -## Done -# r2 | Cloudflare R2 -# github | GitHub Repository -## TODO LIST: -# cnbcool | Tencent cnb.cool -# s3 | Amazon S3 -# Microsoft Onedrive +# ==================== 存储类型选择 ==================== +# 支持的存储类型 (Supported Storage Types): +# - r2: Cloudflare R2 +# - github: GitHub Repository +# - onedrive: Microsoft OneDrive STORAGE_TYPE=r2 # ==================== Cloudflare R2 配置 ==================== -# R2 账户 ID +# 仅当 STORAGE_TYPE=r2 时需要配置 + +# R2 账户 ID (Cloudflare 账户 ID) R2_ACCOUNT_ID=your-account-id -# R2 访问凭证 +# R2 访问密钥 ID R2_ACCESS_KEY_ID=YOUR_R2_ACCESS_KEY_ID + +# R2 访问密钥密码 R2_SECRET_ACCESS_KEY=YOUR_R2_SECRET_ACCESS_KEY # R2 存储桶名称 R2_BUCKET_NAME=drive -# R2 公共访问域名 (可选,例如: https://pub-.r2.dev) +# R2 公共访问域名 (可选,用于生成公共 URL) +# 示例: https://pub-.r2.dev R2_PUBLIC_DOMAIN=https://pub-.r2.dev # ==================== GitHub 存储配置 ==================== -# GitHub 仓库 (格式: owner/repo) +# 仅当 STORAGE_TYPE=github 时需要配置 + +# GitHub 仓库地址 (格式: owner/repo) +# 示例: RhenCloud/Cloud-Index GITHUB_REPO=your-username/your-repo # GitHub 访问令牌 (需要 repo 权限) +# 生成: https://github.com/settings/tokens GITHUB_TOKEN=your-access-token # GitHub 分支名称 (默认: main) GITHUB_BRANCH=main +# ==================== Microsoft OneDrive 配置 ==================== +# 仅当 STORAGE_TYPE=onedrive 时需要配置 + +# OneDrive 访问令牌 +ONEDRIVE_REFRESH_TOKEN=your-refresh-token +ONEDRIVE_CLIENT_ID=your-client-id +ONEDRIVE_CLIENT_SECRET=your-client-secret + +# OneDrive 文件夹 ID (可选,留空则使用根目录) +# 默认值: 使用 /me/drive/root (OneDrive 根目录) +# ONEDRIVE_FOLDER_ID=folder-item-id + # ==================== 应用配置 ==================== -# 服务器配置 + +# 服务器监听地址 +# 0.0.0.0: 监听所有网卡 +# 127.0.0.1: 仅本地访问 HOST=0.0.0.0 + +# 服务器监听端口 (默认: 5000) PORT=5000 + +# 调试模式 (true/false,默认: false) +# 生产环境应设置为 false DEBUG=false -# 缩略图缓存时间(秒,默认: 3600) +# ==================== 缓存和 URL 配置 ==================== + +# 缩略图缓存时间 (秒,默认: 3600) +# 用于浏览器缓存缩略图,减少服务器负担 THUMB_TTL_SECONDS=3600 -# 预签名 URL 过期时间(秒,默认: 3600) +# 预签名 URL 过期时间 (秒,默认: 3600) +# 用于生成临时访问链接,3600 秒 = 1 小时 PRESIGNED_URL_EXPIRES=3600 -THUMB_TTL_SECONDS=3600 diff --git a/README.md b/README.md index 0826523..72be4e5 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ - [x] Github Repo 储存支持 - [ ] Github Release 储存支持 -- [ ] Microsoft Onedrive 储存支持 +- [x] Microsoft Onedrive 储存支持 - [ ] 基于数据库的用户/权限管理 - [ ] 操作日志记录 - [ ] Office Documents 预览支持 @@ -48,6 +48,7 @@ - **Cloudflare R2** - Cloudflare 的对象存储服务(S3 兼容) - **Amazon S3** - Amazon S3 对象存储服务 - **GitHub Repository** - 基于 GitHub Repository 的存储服务 +- **Microsoft Onedrive** - Microsoft Onedrive 云端硬盘 ## 快速开始 diff --git a/config.py b/config.py index 1be3829..bc1cbf5 100644 --- a/config.py +++ b/config.py @@ -30,6 +30,13 @@ class Config: GITHUB_REPO: Optional[str] = os.getenv("GITHUB_REPO") # 格式: owner/repo GITHUB_BRANCH: str = os.getenv("GITHUB_BRANCH", "main") + # OneDrive 配置 + ONEDRIVE_REFRESH_TOKEN: Optional[str] = os.getenv("ONEDRIVE_REFRESH_TOKEN") + ONEDRIVE_CLIENT_ID: Optional[str] = os.getenv("ONEDRIVE_CLIENT_ID") + ONEDRIVE_CLIENT_SECRET: Optional[str] = os.getenv("ONEDRIVE_CLIENT_SECRET") + ONEDRIVE_FOLDER_ID: Optional[str] = os.getenv("ONEDRIVE_FOLDER_ID") # 可选,默认使用 /me/drive/root + ONEDRIVE_REDIRECT_URI: Optional[str] = os.getenv("ONEDRIVE_REDIRECT_URI") # 可选,刷新令牌时某些应用需要 + # 应用配置 HOST: str = os.getenv("HOST", "0.0.0.0") PORT: int = int(os.getenv("PORT", "5000")) @@ -46,7 +53,7 @@ class Config: def validate(cls) -> None: """验证必需的配置项是否已设置""" if not cls.STORAGE_TYPE: - raise ValueError("STORAGE_TYPE environment variable is not set. Supported types: r2, github") + raise ValueError("STORAGE_TYPE environment variable is not set. Supported types: r2, github, onedrive") if cls.STORAGE_TYPE == "r2": required = ["R2_ACCOUNT_ID", "R2_ACCESS_KEY_ID", "R2_SECRET_ACCESS_KEY", "R2_BUCKET_NAME"] @@ -60,8 +67,14 @@ class Config: if missing: raise ValueError(f"Missing required GitHub configuration: {', '.join(missing)}") - elif cls.STORAGE_TYPE not in ["r2", "github"]: - raise ValueError(f"Unsupported storage type: {cls.STORAGE_TYPE}. Supported types: r2, github") + elif cls.STORAGE_TYPE == "onedrive": + required = ["ONEDRIVE_REFRESH_TOKEN", "ONEDRIVE_CLIENT_ID", "ONEDRIVE_CLIENT_SECRET"] + missing = [key for key in required if not getattr(cls, key)] + if missing: + raise ValueError(f"Missing required OneDrive configuration: {', '.join(missing)}") + + elif cls.STORAGE_TYPE not in ["r2", "github", "onedrive"]: + raise ValueError(f"Unsupported storage type: {cls.STORAGE_TYPE}. Supported types: r2, github, onedrive") @classmethod def get_storage_config(cls) -> dict: @@ -80,4 +93,12 @@ class Config: "repo": cls.GITHUB_REPO, "branch": cls.GITHUB_BRANCH, } + elif cls.STORAGE_TYPE == "onedrive": + return { + "client_id": cls.ONEDRIVE_CLIENT_ID, + "client_secret": cls.ONEDRIVE_CLIENT_SECRET, + "refresh_token": cls.ONEDRIVE_REFRESH_TOKEN, + "folder_id": cls.ONEDRIVE_FOLDER_ID, + "redirect_uri": cls.ONEDRIVE_REDIRECT_URI, + } return {} diff --git a/handlers/routes.py b/handlers/routes.py index 86197b5..6a50df1 100644 --- a/handlers/routes.py +++ b/handlers/routes.py @@ -50,13 +50,8 @@ def build_file_entry(obj: Dict[str, Any], prefix: str) -> Dict[str, Any] | None: "file_url": get_file_url(key), } - public_url = storage.get_public_url(key) - if public_url: - entry["public_url"] = public_url - - presigned = storage.generate_presigned_url(key) - if presigned: - entry["presigned_url"] = presigned + # 性能优化:避免在列表阶段为每个文件预取公共/预签名链接 + # 统一走 /file/ 路由,点击时再获取所需链接 return entry diff --git a/storages/factory.py b/storages/factory.py index 0d8408e..443ba52 100644 --- a/storages/factory.py +++ b/storages/factory.py @@ -4,6 +4,7 @@ from config import Config from .base import BaseStorage from .github import GitHubStorage +from .onedrive import OnedriveStorage from .r2 import R2Storage @@ -29,16 +30,16 @@ class StorageFactory: storage_type = Config.STORAGE_TYPE if not storage_type: - raise RuntimeError("STORAGE_TYPE environment variable is not set. Supported types: r2, github") + raise RuntimeError("STORAGE_TYPE environment variable is not set. Supported types: r2, github, onedrive") if storage_type == "r2": cls._instance = R2Storage() elif storage_type == "github": cls._instance = GitHubStorage() + elif storage_type == "onedrive": + cls._instance = OnedriveStorage() else: - raise RuntimeError(f"Unsupported storage type: {storage_type}. Supported types: r2, github") - - return cls._instance + raise RuntimeError(f"Unsupported storage type: {storage_type}. Supported types: r2, github, onedrive") return cls._instance diff --git a/storages/onedrive.py b/storages/onedrive.py new file mode 100644 index 0000000..8ea8e0d --- /dev/null +++ b/storages/onedrive.py @@ -0,0 +1,686 @@ +from datetime import datetime, timedelta +from io import BytesIO +from typing import Any, Dict, Optional + +import requests +from PIL import Image + +from config import Config + +from .base import BaseStorage + + +class _InvalidGrant(Exception): + """内部异常:用于标记 invalid_grant 以便进行下一次尝试""" + + pass + + +class OnedriveStorage(BaseStorage): + """基于 OneDrive 的存储实现,支持自动令牌刷新""" + + def __init__(self): + """初始化 OneDrive 存储客户端""" + self.client_id = Config.ONEDRIVE_CLIENT_ID + self.client_secret = Config.ONEDRIVE_CLIENT_SECRET + self.refresh_token = Config.ONEDRIVE_REFRESH_TOKEN + self.folder_id = Config.ONEDRIVE_FOLDER_ID + self.graph_api_url = "https://graph.microsoft.com/v1.0" + + if not (self.client_id and self.client_secret and self.refresh_token): + raise RuntimeError("ONEDRIVE_CLIENT_ID, ONEDRIVE_CLIENT_SECRET, and ONEDRIVE_REFRESH_TOKEN must be set") + + # 如果没有指定 folder_id,使用 /me/drive/root + self.folder_item_id = self.folder_id or "root" + + # 初始化 access_token 并刷新 + self.access_token = None + self._refresh_token() + + def _refresh_token(self) -> None: + """刷新 OneDrive 访问令牌""" + configured_scopes = getattr(Config, "ONEDRIVE_SCOPES", None) + attempts = [None] + ([configured_scopes] if configured_scopes else []) + ["Files.ReadWrite.All offline_access"] + token_json = self._perform_refresh_attempts(attempts) + self.access_token = token_json["access_token"] + new_refresh = token_json.get("refresh_token") + if new_refresh: + self.refresh_token = new_refresh + self._access_token_expires_in = token_json.get("expires_in") + + def _perform_refresh_attempts(self, attempts: list) -> dict: + errors: list[str] = [] + + for idx, scope in enumerate(attempts, start=1): + try: + token_json = self._do_refresh_attempt(scope) + return token_json + except _InvalidGrant as e: + errors.append(f"Attempt {idx} invalid_grant: {str(e)}") + continue + + raise RuntimeError("Failed to refresh OneDrive token: " + " | ".join(errors)) + + def _do_refresh_attempt(self, scope: str | None) -> dict: + url = "https://login.microsoftonline.com/common/oauth2/v2.0/token" + headers = {"Content-Type": "application/x-www-form-urlencoded"} + + payload = { + "client_id": self.client_id, + "grant_type": "refresh_token", + "refresh_token": self.refresh_token, + } + if scope: + payload["scope"] = scope + if self.client_secret: + payload["client_secret"] = self.client_secret + redirect_uri = getattr(Config, "ONEDRIVE_REDIRECT_URI", None) + if redirect_uri: + payload["redirect_uri"] = redirect_uri + + resp = requests.post(url, data=payload, headers=headers, timeout=20) + try: + detail = resp.json() + except Exception: + detail = resp.text + + if resp.status_code != 200: + # 如果是 invalid_grant,抛出内部异常以触发下一次尝试 + if isinstance(detail, dict) and detail.get("error") == "invalid_grant": + raise _InvalidGrant(detail) + raise RuntimeError(f"Token endpoint error {resp.status_code}: {detail}") + + if isinstance(detail, dict) and detail.get("error"): + if detail.get("error") == "invalid_grant": + raise _InvalidGrant(detail) + raise RuntimeError(f"Token response error: {detail}") + + token_json = detail if isinstance(detail, dict) else {} + access_token = token_json.get("access_token") + if not access_token: + # 视为无效,交给下一次尝试 + raise _InvalidGrant({"error": "missing_access_token", "detail": detail}) + return token_json + + def _try_refresh(self, func): + """尝试执行函数,如果失败则刷新令牌后重试(处理令牌过期)""" + try: + return func() + except Exception as e: + if "401" in str(e) or "Unauthorized" in str(e): + # 令牌过期,刷新并重试 + self._refresh_token() + return func() + raise + + def _api_request(self, method: str, url: str, **kwargs): + """ + 执行 API 请求,自动处理令牌过期 + + Args: + method: HTTP 方法 (GET, POST, PUT, DELETE, PATCH) + url: API URL + **kwargs: 其他请求参数 + + Returns: + 响应对象 + """ + + def _do_request(): + response = requests.request(method, url, headers=self._headers(), **kwargs) + if response.status_code == 401: + raise RuntimeError("Unauthorized - OneDrive access token expired") + return response + + return self._try_refresh(_do_request) + + def _headers(self) -> Dict[str, str]: + """返回 API 请求的公共头部信息""" + return { + "Authorization": f"Bearer {self.access_token}", + "Content-Type": "application/json", + } + + def _verify_connection(self) -> None: + """验证 OneDrive 连接是否有效""" + try: + url = f"{self.graph_api_url}/me/drive" + response = requests.get(url, headers=self._headers(), timeout=10) + if response.status_code != 200: + raise RuntimeError(f"OneDrive connection failed: {response.text}") + except Exception as e: + raise RuntimeError(f"Failed to connect to OneDrive: {str(e)}") from None + + def _get_folder_id(self, path: str) -> Optional[str]: + """获取指定路径的文件夹 ID""" + if not path or path == "/": + return self.folder_item_id + + try: + # 直接获取该路径对应项的元数据(而不是其子项),以拿到该文件夹自身的 id + path = path.strip("/") + if self.folder_item_id == "root": + url = f"{self.graph_api_url}/me/drive/root:/{path}:" + else: + url = f"{self.graph_api_url}/me/drive/items/{self.folder_item_id}:/{path}:" + response = requests.get(url, headers=self._headers(), timeout=10) + + if response.status_code == 200: + item = response.json() + if item.get("folder"): + return item.get("id") + except Exception: + pass + + return None + + def list_objects(self, prefix: str = "") -> Dict[str, Any]: + """ + 列出存储桶中的对象 + + Args: + prefix: 对象前缀(用于目录浏览) + + Returns: + 包含对象列表的字典 + """ + try: + prefix = prefix.rstrip("/") if prefix else "" + + # 直接基于路径列出,避免先查 ID 再列出造成的额外往返 + select = "$select=name,size,lastModifiedDateTime,id,folder,file" + if prefix: + if self.folder_item_id == "root": + url = f"{self.graph_api_url}/me/drive/root:/{prefix}:/children?{select}" + else: + url = f"{self.graph_api_url}/me/drive/items/{self.folder_item_id}:/{prefix}:/children?{select}" + else: + if self.folder_item_id == "root": + url = f"{self.graph_api_url}/me/drive/root/children?{select}" + else: + url = f"{self.graph_api_url}/me/drive/items/{self.folder_item_id}/children?{select}" + response = self._api_request("GET", url) + response.raise_for_status() + + items = response.json().get("value", []) + files = [] + folders = [] + + for item in items: + # 跳过特殊文件 + if item.get("name", "").startswith("."): + continue + + item_path = f"{prefix}/{item.get('name')}" if prefix else item.get("name") + + if "folder" in item: + # 这是一个文件夹 + folders.append({"Prefix": f"{item_path}/"}) + else: + # 这是一个文件 + size = item.get("size", 0) + modified_time = item.get("lastModifiedDateTime", datetime.now().isoformat()) + + # 解析 ISO 格式时间 + try: + if isinstance(modified_time, str): + modified_time = datetime.fromisoformat(modified_time.replace("Z", "+00:00")) + except Exception: + modified_time = datetime.now() + + files.append( + { + "Key": item_path, + "Size": size, + "LastModified": modified_time, + "ETag": item.get("id", ""), + } + ) + + return { + "Contents": sorted(files, key=lambda x: x["Key"]), + "CommonPrefixes": sorted(folders, key=lambda x: x["Prefix"]), + } + except Exception as e: + import traceback + + traceback.print_exc() + raise RuntimeError(f"Failed to list OneDrive objects: {str(e)}") from None + + def get_object_info(self, key: str) -> Dict[str, Any]: + """ + 获取对象基本信息 + + Args: + key: 对象键名 + + Returns: + 对象元数据 + """ + try: + key = key.strip("/") + url = f"{self.graph_api_url}/me/drive/items/{self.folder_item_id}:/{key}:" + + response = self._api_request("GET", url) + response.raise_for_status() + + item = response.json() + return { + "Key": key, + "Size": item.get("size", 0), + "LastModified": item.get("lastModifiedDateTime", datetime.now().isoformat()), + "ETag": item.get("id", ""), + } + except Exception as e: + raise RuntimeError(f"Failed to get OneDrive object info: {str(e)}") from None + + def get_object(self, key: str) -> Dict[str, Any]: + """ + 获取对象内容 + + Args: + key: 对象键名 + + Returns: + 包含对象内容的字典 + """ + try: + key = key.strip("/") + + # 获取下载 URL + url = f"{self.graph_api_url}/me/drive/items/{self.folder_item_id}:/{key}:/content" + + response = self._api_request("GET", url) + response.raise_for_status() + + return { + "Body": response.content, + "ContentType": response.headers.get("Content-Type", "application/octet-stream"), + } + except Exception as e: + raise RuntimeError(f"Failed to get OneDrive object: {str(e)}") from None + + def generate_presigned_url(self, key: str, expires: int = None) -> str: + """ + 为指定对象生成预签名 URL + + Args: + key: 对象键名 + expires: 过期时间(秒) + + Returns: + 预签名 URL,失败返回 None + """ + try: + expires = expires or Config.PRESIGNED_URL_EXPIRES + key = key.strip("/") + + # OneDrive 会自动处理预签名 URL + url = f"{self.graph_api_url}/me/drive/items/{self.folder_item_id}:/{key}:/createLink" + + request_body = { + "type": "view", + "expirationDateTime": (datetime.utcnow() + timedelta(seconds=expires)).isoformat() + "Z", + } + + response = requests.post(url, headers=self._headers(), json=request_body) + + if response.status_code == 201: + return response.json().get("link", {}).get("webUrl") + + return None + except Exception: + return None + + def get_public_url(self, key: str) -> str: + """ + 生成对象的公共访问 URL + + Args: + key: 对象键名 + + Returns: + 公共 URL,未配置返回 None + """ + try: + key = key.strip("/") + # OneDrive 的共享 URL + url = f"{self.graph_api_url}/me/drive/items/{self.folder_item_id}:/{key}:/webUrl" + + response = requests.get(url, headers=self._headers()) + if response.status_code == 200: + return response.json().get("webUrl") + + return None + except Exception: + return None + + def upload_file(self, key: str, file_data: bytes, content_type: str = None) -> bool: + """ + 上传文件 + + Args: + key: 对象键名 + file_data: 文件内容 + content_type: 文件类型(MIME type,可选) + + Returns: + 上传成功返回 True,失败返回 False + """ + try: + key = key.strip("/") + url = f"{self.graph_api_url}/me/drive/items/{self.folder_item_id}:/{key}:/content" + + # 自定义头部(因为我们需要指定 Content-Type) + headers = { + "Authorization": f"Bearer {self.access_token}", + "Content-Type": content_type or "application/octet-stream", + } + + response = requests.put(url, headers=headers, data=file_data) + response.raise_for_status() + + return response.status_code == 200 or response.status_code == 201 + except Exception as e: + raise RuntimeError( + f"Failed to upload file to OneDrive: {str(e)}" if isinstance(e, Exception) else str(e) + ) from None + + def delete_file(self, key: str) -> bool: + """ + 删除文件 + + Args: + key: 对象键名 + + Returns: + 删除成功返回 True,失败返回 False + """ + try: + key = key.strip("/") + url = f"{self.graph_api_url}/me/drive/items/{self.folder_item_id}:/{key}:" + + response = self._api_request("DELETE", url) + return response.status_code == 204 + except Exception as e: + raise RuntimeError(f"Failed to delete file from OneDrive: {str(e)}") from None + + def copy_file(self, source_key: str, dest_key: str) -> bool: + """ + 复制文件 + + Args: + source_key: 源文件键名 + dest_key: 目标文件键名 + + Returns: + 复制成功返回 True,失败返回 False + """ + try: + source = source_key.strip("/") + destination = dest_key.strip("/") + + # 获取源文件 ID + source_url = f"{self.graph_api_url}/me/drive/items/{self.folder_item_id}:/{source}:" + source_response = self._api_request("GET", source_url) + source_response.raise_for_status() + source_item = source_response.json() + + # 复制文件 + copy_url = f"{self.graph_api_url}/me/drive/items/{source_item.get('id')}/copy" + copy_body = { + "parentReference": {"id": self.folder_item_id}, + "name": destination.split("/")[-1], + } + + copy_response = self._api_request("POST", copy_url, json=copy_body) + copy_response.raise_for_status() + + return copy_response.status_code == 202 or copy_response.status_code == 200 + except Exception as e: + raise RuntimeError(f"Failed to copy file in OneDrive: {str(e)}") from None + + def generate_thumbnail(self, file_path: str) -> bytes: + """ + 生成图片缩略图 + + Args: + file_path: 文件路径 + + Returns: + 缩略图字节数据 + """ + try: + file_path = file_path.strip("/") + + # 获取缩略图 URL + url = f"{self.graph_api_url}/me/drive/items/{self.folder_item_id}:/{file_path}:/thumbnails" + + response = requests.get(url, headers=self._headers()) + response.raise_for_status() + + thumbnails = response.json().get("value", []) + if thumbnails: + thumb_set = thumbnails[0] + # 获取中等大小的缩略图 + thumb_url = thumb_set.get("c", {}).get("url") or thumb_set.get("m", {}).get("url") + + if thumb_url: + thumb_response = requests.get(thumb_url) + if thumb_response.status_code == 200: + return thumb_response.content + + # 如果没有缩略图,尝试生成一个 + return self._generate_fallback_thumbnail(file_path) + except Exception: + return None + + def _generate_fallback_thumbnail(self, file_path: str) -> bytes: + """ + 生成备用缩略图(当 OneDrive 没有缩略图时) + + Args: + file_path: 文件路径 + + Returns: + 缩略图字节数据 + """ + try: + file_obj = self.get_object(file_path) + img = Image.open(BytesIO(file_obj["Body"])) + + # 调整大小 + img.thumbnail(Config.THUMB_SIZE, Image.Resampling.LANCZOS) + + # 保存为 PNG + thumb_buffer = BytesIO() + img.save(thumb_buffer, format="PNG") + return thumb_buffer.getvalue() + except Exception: + return None + + def rename_file(self, old_key: str, new_key: str) -> bool: + """ + 重命名文件 + + Args: + old_key: 旧的文件键名 + new_key: 新的文件键名 + + Returns: + 重命名成功返回 True,失败返回 False + """ + try: + old_key = old_key.strip("/") + new_key = new_key.strip("/") + + # 获取旧文件的 ID + url = f"{self.graph_api_url}/me/drive/items/{self.folder_item_id}:/{old_key}:" + response = self._api_request("GET", url) + response.raise_for_status() + item_id = response.json().get("id") + + # 更新文件名 + update_url = f"{self.graph_api_url}/me/drive/items/{item_id}" + update_body = {"name": new_key.split("/")[-1]} + + response = self._api_request("PATCH", update_url, json=update_body) + response.raise_for_status() + + return response.status_code == 200 + except Exception as e: + raise RuntimeError( + f"Failed to rename file in OneDrive: {str(e)}" if isinstance(e, Exception) else str(e) + ) from None + + def delete_folder(self, prefix: str) -> bool: + """ + 删除文件夹及其中的所有文件 + + Args: + prefix: 文件夹前缀 + + Returns: + 删除成功返回 True,失败返回 False + """ + try: + prefix = prefix.rstrip("/") + + # 获取文件夹中的所有项目 + items = self.list_objects(prefix) + + # 删除所有文件 + for file in items.get("Contents", []): + self.delete_file(file["Key"]) + + # 删除所有子文件夹 + for folder in items.get("CommonPrefixes", []): + self.delete_folder(folder["Prefix"]) + + # 删除文件夹本身 + url = f"{self.graph_api_url}/me/drive/items/{self.folder_item_id}:/{prefix}:" + response = self._api_request("DELETE", url) + + return response.status_code == 204 + except Exception as e: + raise RuntimeError( + f"Failed to delete folder from OneDrive: {str(e)}" if isinstance(e, Exception) else str(e) + ) from None + + def rename_folder(self, old_prefix: str, new_prefix: str) -> bool: + """ + 重命名文件夹 + + Args: + old_prefix: 旧的文件夹前缀 + new_prefix: 新的文件夹前缀 + + Returns: + 重命名成功返回 True,失败返回 False + """ + try: + old_prefix = old_prefix.rstrip("/") + new_prefix = new_prefix.rstrip("/") + + # 获取旧文件夹的 ID + url = f"{self.graph_api_url}/me/drive/items/{self.folder_item_id}:/{old_prefix}:" + response = self._api_request("GET", url) + response.raise_for_status() + item_id = response.json().get("id") + + # 更新文件夹名 + update_url = f"{self.graph_api_url}/me/drive/items/{item_id}" + update_body = {"name": new_prefix.split("/")[-1]} + + response = self._api_request("PATCH", update_url, json=update_body) + response.raise_for_status() + + return response.status_code == 200 + except Exception as e: + raise RuntimeError( + f"Failed to rename folder in OneDrive: {str(e)}" if isinstance(e, Exception) else str(e) + ) from None + + def copy_folder(self, source_prefix: str, dest_prefix: str) -> bool: + """ + 复制文件夹及其中的所有文件 + + Args: + source_prefix: 源文件夹前缀 + dest_prefix: 目标文件夹前缀 + + Returns: + 复制成功返回 True,失败返回 False + """ + try: + source_prefix = source_prefix.rstrip("/") + dest_prefix = dest_prefix.rstrip("/") + + # 创建目标文件夹 + self.create_folder(dest_prefix + "/") + + # 获取源文件夹中的所有项目 + items = self.list_objects(source_prefix) + + # 复制所有文件 + for file in items.get("Contents", []): + source_key = file["Key"] + # 保持相对路径 + relative_path = source_key[len(source_prefix) + 1 :] + dest_key = f"{dest_prefix}/{relative_path}" + self.copy_file(source_key, dest_key) + + # 递归复制子文件夹 + for folder in items.get("CommonPrefixes", []): + source_folder = folder["Prefix"].rstrip("/") + relative_folder = source_folder[len(source_prefix) + 1 :] + dest_folder = f"{dest_prefix}/{relative_folder}" + self.copy_folder(source_folder, dest_folder) + + return True + except Exception as e: + raise RuntimeError( + f"Failed to copy folder in OneDrive: {str(e)}" if isinstance(e, Exception) else str(e) + ) from None + + def create_folder(self, key: str) -> bool: + """ + 创建文件夹 + + Args: + key: 文件夹路径(以 / 结尾) + + Returns: + 创建成功返回 True,失败返回 False + """ + try: + key = key.rstrip("/") + + # 获取父文件夹 ID + parts = key.split("/") + parent_id = self.folder_item_id + + # 创建每一层文件夹 + for _, folder_name in enumerate(parts): + # 检查文件夹是否已存在 + existing_url = f"{self.graph_api_url}/me/drive/items/{parent_id}:/{folder_name}:" + existing_response = self._api_request("GET", existing_url) + + if existing_response.status_code == 200: + parent_id = existing_response.json().get("id") + continue + + # 创建新文件夹 + create_url = f"{self.graph_api_url}/me/drive/items/{parent_id}/children" + create_body = {"name": folder_name, "folder": {}} + + create_response = self._api_request("POST", create_url, json=create_body) + create_response.raise_for_status() + + parent_id = create_response.json().get("id") + + return True + except Exception as e: + raise RuntimeError( + f"Failed to create folder in OneDrive: {str(e)}" if isinstance(e, Exception) else str(e) + ) from None