Compare commits

...

3 Commits

10 changed files with 418 additions and 207 deletions

View File

@@ -9,43 +9,38 @@
STORAGE_TYPE=r2 STORAGE_TYPE=r2
# ==================== Cloudflare R2 配置 ==================== # ==================== Cloudflare R2 配置 ====================
# R2 账户 ID
R2_ACCOUNT_ID=your-account-id
# R2 访问凭证 # R2 访问凭证
ACCESS_KEY_ID=YOUR_R2_ACCESS_KEY_ID R2_ACCESS_KEY_ID=YOUR_R2_ACCESS_KEY_ID
SECRET_ACCESS_KEY=YOUR_R2_SECRET_ACCESS_KEY R2_SECRET_ACCESS_KEY=YOUR_R2_SECRET_ACCESS_KEY
# R2 存储桶名称 # R2 存储桶名称
R2_BUCKET_NAME=drive R2_BUCKET_NAME=drive
# R2 Endpoint URL # R2 公共访问域名 (可选,例如: https://pub-<bucket-name>.r2.dev)
R2_ENDPOINT_URL=https://<your-account-id>.r2.cloudflarestorage.com R2_PUBLIC_DOMAIN=https://pub-<bucket-name>.r2.dev
# R2 区域 (默认: auto)
R2_REGION=auto
# R2 公共访问 URL (可选,例如: https://pub-<bucket-name>.r2.dev)
R2_PUBLIC_URL=https://pub-<bucket-name>.r2.dev
# R2 预签名 URL 过期时间(秒,默认: 3600
R2_PRESIGN_EXPIRES=3600
# ==================== GitHub 存储配置 ==================== # ==================== GitHub 存储配置 ====================
# GitHub 仓库所有者 # GitHub 仓库 (格式: owner/repo)
GITHUB_REPO_OWNER=your-username GITHUB_REPO=your-username/your-repo
# GitHub 仓库名称
GITHUB_REPO_NAME=your-repo
# GitHub 访问令牌 (需要 repo 权限) # GitHub 访问令牌 (需要 repo 权限)
GITHUB_ACCESS_TOKEN=your-access-token GITHUB_TOKEN=your-access-token
# GitHub 分支名称 (默认: main) # GitHub 分支名称 (默认: main)
GITHUB_BRANCH=main GITHUB_BRANCH=main
# GitHub Raw 文件反向代理 URL (可选,用于加速访问)
# 例如: https://raw.fastgit.org 或 https://ghproxy.com/https://raw.githubusercontent.com
# 留空则使用官方 raw.githubusercontent.com
GITHUB_RAW_PROXY_URL=
# ==================== 应用配置 ==================== # ==================== 应用配置 ====================
# 服务器配置
HOST=0.0.0.0
PORT=5000
DEBUG=false
# 缩略图缓存时间(秒,默认: 3600 # 缩略图缓存时间(秒,默认: 3600
THUMB_TTL_SECONDS=3600 THUMB_TTL_SECONDS=3600
# 预签名 URL 过期时间(秒,默认: 3600
PRESIGNED_URL_EXPIRES=3600
THUMB_TTL_SECONDS=3600

View File

@@ -96,20 +96,18 @@ python app.py
```env ```env
STORAGE_TYPE=r2 STORAGE_TYPE=r2
# R2 账户 ID
R2_ACCOUNT_ID=your-account-id
# R2 访问凭证 # R2 访问凭证
ACCESS_KEY_ID=your_access_key_id R2_ACCESS_KEY_ID=your_access_key_id
SECRET_ACCESS_KEY=your_secret_access_key R2_SECRET_ACCESS_KEY=your_secret_access_key
# R2 存储桶配置 # R2 存储桶配置
R2_BUCKET_NAME=your_bucket_name R2_BUCKET_NAME=your_bucket_name
R2_ENDPOINT_URL=https://your-account-id.r2.cloudflarestorage.com
R2_REGION=auto
# 可选:公共访问 URL # 可选:公共访问域名
R2_PUBLIC_URL=https://pub-your-bucket.r2.dev R2_PUBLIC_DOMAIN=https://pub-your-bucket.r2.dev
# 可选:预签名 URL 过期时间(秒)
R2_PRESIGN_EXPIRES=3600
``` ```
### GitHub Repository 配置 ### GitHub Repository 配置
@@ -117,26 +115,15 @@ R2_PRESIGN_EXPIRES=3600
```env ```env
STORAGE_TYPE=github STORAGE_TYPE=github
# GitHub 仓库所有者(用户名或组织) # GitHub 仓库 (格式: owner/repo)
GITHUB_REPO_OWNER=your-username GITHUB_REPO=your-username/your-repo
# GitHub 仓库名称
GITHUB_REPO_NAME=your-repo
# GitHub 个人访问令牌(需要 repo 权限) # GitHub 个人访问令牌(需要 repo 权限)
# 获取方式https://github.com/settings/tokens # 获取方式https://github.com/settings/tokens
GITHUB_ACCESS_TOKEN=ghp_your_token_here GITHUB_TOKEN=ghp_your_token_here
# GitHub 分支名称(可选,默认: main # GitHub 分支名称(可选,默认: main
GITHUB_BRANCH=main GITHUB_BRANCH=main
# GitHub Raw 文件反向代理 URL可选用于加速访问
# 常用反向代理:
# - https://raw.fastgit.org (推荐,速度快)
# - https://ghproxy.com
# - https://raw.kgithub.com
# 留空则使用官方 raw.githubusercontent.com国内可能较慢
GITHUB_RAW_PROXY_URL=https://raw.fastgit.org
``` ```
## 项目结构 ## 项目结构
@@ -144,6 +131,8 @@ GITHUB_RAW_PROXY_URL=https://raw.fastgit.org
```bash ```bash
cloud-index/ cloud-index/
├── app.py # Flask 应用主入口 ├── app.py # Flask 应用主入口
├── config.py # 统一配置管理
├── utils.py # 工具函数模块
├── handlers/ ├── handlers/
│ └── routes.py # 路由处理器 │ └── routes.py # 路由处理器
├── storages/ # 存储后端实现 ├── storages/ # 存储后端实现
@@ -153,11 +142,16 @@ cloud-index/
│ ├── r2.py # Cloudflare R2 实现 │ ├── r2.py # Cloudflare R2 实现
│ └── github.py # GitHub Repository 实现 │ └── github.py # GitHub Repository 实现
├── templates/ # HTML 模板 ├── templates/ # HTML 模板
│ ├── base.html
│ ├── index.html │ ├── index.html
│ └── footer.html │ └── footer.html
├── static/ # 静态资源 ├── static/ # 静态资源
── thumbs/ ── css/
│ │ └── main.css
│ └── js/
│ └── main.js
├── .env.example # 环境变量示例 ├── .env.example # 环境变量示例
├── pyproject.toml # 项目配置和依赖
└── requirements.txt # Python 依赖 └── requirements.txt # Python 依赖
``` ```

76
app.py
View File

@@ -1,14 +1,15 @@
import os
import tomllib import tomllib
from pathlib import Path from pathlib import Path
import dotenv
from flask import Flask from flask import Flask
import utils
from config import Config
from handlers.routes import main_route from handlers.routes import main_route
from storages.factory import StorageFactory from storages.factory import StorageFactory
dotenv.load_dotenv() # 验证配置
Config.validate()
# 从 pyproject.toml 读取版本号 # 从 pyproject.toml 读取版本号
@@ -34,73 +35,18 @@ app.register_blueprint(main_route)
# 初始化存储(使用工厂模式) # 初始化存储(使用工厂模式)
storage = StorageFactory.get_storage() storage = StorageFactory.get_storage()
# 缩略图默认 TTL可通过环境变量覆盖
THUMB_TTL = int(os.environ.get("THUMB_TTL_SECONDS", "3600"))
# 注册模板过滤器
# 注册一个安全的 filesizeformat 过滤器,处理 None 和非数字值
@app.template_filter("filesizeformat") @app.template_filter("filesizeformat")
def filesizeformat_filter(value): def filesizeformat_filter(value):
try: """格式化文件大小"""
if value is None: return utils.format_file_size(value)
return "-"
num = float(value) # 使用 float 而不是 int 以保持精度
except Exception:
return "-"
for unit in ["B", "KB", "MB", "GB", "TB"]:
if num < 1024:
# 对于字节,显示整数
if unit == "B":
return f"{int(num)}{unit}"
# 其他单位保留两位小数
return f"{num:.2f}{unit}"
num = num / 1024.0
return f"{num:.2f}PB"
# 注册一个文件图标过滤器
@app.template_filter("fileicon") @app.template_filter("fileicon")
def fileicon_filter(filename): def fileicon_filter(filename):
if not filename: """获取文件图标"""
return "fas fa-file" return utils.get_file_icon(filename)
ext = filename.lower().split(".")[-1] if "." in filename else ""
# 图片文件
if ext in ["jpg", "jpeg", "png", "gif", "bmp", "webp", "svg"]:
return "fas fa-image"
# 音频文件
if ext in ["mp3", "wav", "ogg", "flac", "m4a", "aac"]:
return "fas fa-music"
# 视频文件
if ext in ["mp4", "webm", "avi", "mov", "wmv", "flv", "mkv"]:
return "fas fa-video"
# 文档文件
if ext in ["pdf", "doc", "docx", "txt", "md", "rtf"]:
return "fas fa-file-alt"
# 压缩文件
if ext in ["zip", "rar", "7z", "tar", "gz"]:
return "fas fa-file-archive"
# 代码文件
if ext in ["py", "js", "html", "css", "java", "cpp", "c", "php"]:
return "fas fa-file-code"
# 表格文件
if ext in ["xls", "xlsx", "csv"]:
return "fas fa-file-excel"
# 演示文件
if ext in ["ppt", "pptx"]:
return "fas fa-file-powerpoint"
# 默认文件图标
return "fas fa-file"
def get_public_url(key: str) -> str: def get_public_url(key: str) -> str:
@@ -135,6 +81,4 @@ def inject_version():
if __name__ == "__main__": if __name__ == "__main__":
port = int(os.environ.get("PORT", 5000)) app.run(host=Config.HOST, port=Config.PORT, debug=Config.DEBUG)
host = os.environ.get("HOST", "0.0.0.0")
app.run(host=host, port=port, debug=True)

83
config.py Normal file
View File

@@ -0,0 +1,83 @@
"""
配置管理模块
集中管理所有环境变量和应用配置
"""
import os
from typing import Optional
import dotenv
# 加载环境变量
dotenv.load_dotenv()
class Config:
"""应用配置类"""
# 存储配置
STORAGE_TYPE: str = os.getenv("STORAGE_TYPE", "").lower()
# R2 配置
R2_ACCOUNT_ID: Optional[str] = os.getenv("R2_ACCOUNT_ID")
R2_ACCESS_KEY_ID: Optional[str] = os.getenv("R2_ACCESS_KEY_ID")
R2_SECRET_ACCESS_KEY: Optional[str] = os.getenv("R2_SECRET_ACCESS_KEY")
R2_BUCKET_NAME: Optional[str] = os.getenv("R2_BUCKET_NAME")
R2_PUBLIC_DOMAIN: Optional[str] = os.getenv("R2_PUBLIC_DOMAIN")
# GitHub 配置
GITHUB_TOKEN: Optional[str] = os.getenv("GITHUB_TOKEN")
GITHUB_REPO: Optional[str] = os.getenv("GITHUB_REPO") # 格式: owner/repo
GITHUB_BRANCH: str = os.getenv("GITHUB_BRANCH", "main")
# 应用配置
HOST: str = os.getenv("HOST", "0.0.0.0")
PORT: int = int(os.getenv("PORT", "5000"))
DEBUG: bool = os.getenv("DEBUG", "false").lower() == "true"
# 缩略图配置
THUMB_TTL_SECONDS: int = int(os.getenv("THUMB_TTL_SECONDS", "3600"))
THUMB_SIZE: tuple[int, int] = (300, 300) # 缩略图尺寸
# URL过期时间配置
PRESIGNED_URL_EXPIRES: int = int(os.getenv("PRESIGNED_URL_EXPIRES", "3600"))
@classmethod
def validate(cls) -> None:
"""验证必需的配置项是否已设置"""
if not cls.STORAGE_TYPE:
raise ValueError("STORAGE_TYPE environment variable is not set. Supported types: r2, github")
if cls.STORAGE_TYPE == "r2":
required = ["R2_ACCOUNT_ID", "R2_ACCESS_KEY_ID", "R2_SECRET_ACCESS_KEY", "R2_BUCKET_NAME"]
missing = [key for key in required if not getattr(cls, key)]
if missing:
raise ValueError(f"Missing required R2 configuration: {', '.join(missing)}")
elif cls.STORAGE_TYPE == "github":
required = ["GITHUB_TOKEN", "GITHUB_REPO"]
missing = [key for key in required if not getattr(cls, key)]
if missing:
raise ValueError(f"Missing required GitHub configuration: {', '.join(missing)}")
elif cls.STORAGE_TYPE not in ["r2", "github"]:
raise ValueError(f"Unsupported storage type: {cls.STORAGE_TYPE}. Supported types: r2, github")
@classmethod
def get_storage_config(cls) -> dict:
"""获取当前存储类型的配置字典"""
if cls.STORAGE_TYPE == "r2":
return {
"account_id": cls.R2_ACCOUNT_ID,
"access_key_id": cls.R2_ACCESS_KEY_ID,
"secret_access_key": cls.R2_SECRET_ACCESS_KEY,
"bucket_name": cls.R2_BUCKET_NAME,
"public_domain": cls.R2_PUBLIC_DOMAIN,
}
elif cls.STORAGE_TYPE == "github":
return {
"token": cls.GITHUB_TOKEN,
"repo": cls.GITHUB_REPO,
"branch": cls.GITHUB_BRANCH,
}
return {}

View File

@@ -1,33 +1,24 @@
import hashlib import hashlib
import os
from datetime import datetime from datetime import datetime
from typing import Any, Dict, List from typing import Any, Dict, List
from flask import Blueprint, Response, abort, jsonify, redirect, render_template, request from flask import Blueprint, Response, abort, jsonify, redirect, render_template, request
from config import Config
from storages.factory import StorageFactory from storages.factory import StorageFactory
main_route = Blueprint("main", __name__) main_route = Blueprint("main", __name__)
# 初始化存储(使用工厂模式) # 延迟初始化存储实例
storage = StorageFactory.get_storage() _storage = None
# 缩略图默认 TTL可通过环境变量覆盖
THUMB_TTL = int(os.environ.get("THUMB_TTL_SECONDS", "3600"))
def format_timestamp(timestamp) -> str: def get_storage():
""" """获取存储实例(延迟初始化)"""
格式化时间戳为人类可读的格式 global _storage
""" if _storage is None:
return storage.format_timestamp(timestamp) _storage = StorageFactory.get_storage()
return _storage
def get_public_url(key: str) -> str:
"""
生成对象的公共访问 URL
"""
return storage.get_public_url(key)
def get_file_url(key: str) -> str: def get_file_url(key: str) -> str:
@@ -37,6 +28,7 @@ def get_file_url(key: str) -> str:
def build_file_entry(obj: Dict[str, Any], prefix: str) -> Dict[str, Any] | None: def build_file_entry(obj: Dict[str, Any], prefix: str) -> Dict[str, Any] | None:
"""根据对象信息构建文件条目。""" """根据对象信息构建文件条目。"""
storage = get_storage()
key = obj.get("Key", "") key = obj.get("Key", "")
if not key: if not key:
return None return None
@@ -53,12 +45,12 @@ def build_file_entry(obj: Dict[str, Any], prefix: str) -> Dict[str, Any] | None:
"name": rel_name, "name": rel_name,
"key": key, "key": key,
"size": obj.get("Size"), "size": obj.get("Size"),
"last_modified": format_timestamp(obj.get("LastModified")), "last_modified": storage.format_timestamp(obj.get("LastModified")),
"is_dir": False, "is_dir": False,
"file_url": get_file_url(key), "file_url": get_file_url(key),
} }
public_url = get_public_url(key) public_url = storage.get_public_url(key)
if public_url: if public_url:
entry["public_url"] = public_url entry["public_url"] = public_url
@@ -115,6 +107,7 @@ def index():
返回文件和目录列表的 HTML 页面。 返回文件和目录列表的 HTML 页面。
""" """
try: try:
storage = get_storage()
prefix = request.args.get("prefix", "") or "" prefix = request.args.get("prefix", "") or ""
response = storage.list_objects(prefix) response = storage.list_objects(prefix)
@@ -136,6 +129,7 @@ def index():
def browse(prefix_path): def browse(prefix_path):
"""目录路由。将 URL /a/b 映射为 prefix 'a/b/' 并重用 index 的逻辑。""" """目录路由。将 URL /a/b 映射为 prefix 'a/b/' 并重用 index 的逻辑。"""
try: try:
storage = get_storage()
prefix = prefix_path or "" prefix = prefix_path or ""
if prefix and not prefix.endswith("/"): if prefix and not prefix.endswith("/"):
prefix = prefix + "/" prefix = prefix + "/"
@@ -159,6 +153,7 @@ def browse(prefix_path):
def serve_file(file_path): def serve_file(file_path):
"""重定向到原始存储 URL节省服务器资源""" """重定向到原始存储 URL节省服务器资源"""
try: try:
storage = get_storage()
# 验证文件存在 # 验证文件存在
try: try:
storage.get_object_info(file_path) storage.get_object_info(file_path)
@@ -186,6 +181,7 @@ def serve_file(file_path):
def download_file(file_path): def download_file(file_path):
"""下载文件,支持所有存储类型""" """下载文件,支持所有存储类型"""
try: try:
storage = get_storage()
# 验证文件存在 # 验证文件存在
try: try:
storage.get_object_info(file_path) storage.get_object_info(file_path)
@@ -218,9 +214,10 @@ def download_file(file_path):
@main_route.route("/thumb/<path:file_path>") @main_route.route("/thumb/<path:file_path>")
def thumb(file_path): def thumb(file_path):
"""返回图片的缩略图,使用 Vercel Cache Headers 避免重复从 R2 拉取""" """返回图片的缩略图,使用 Vercel Cache Headers 避免重复从 R2 拉取"""
storage = get_storage()
# 设置更长的缓存控制头以支持浏览器本地缓存 # 设置更长的缓存控制头以支持浏览器本地缓存
cache_headers = { cache_headers = {
"Cache-Control": f"public, max-age={THUMB_TTL}", "Cache-Control": f"public, max-age={Config.THUMB_TTL_SECONDS}",
"ETag": f'W/"{hashlib.md5(file_path.encode("utf-8")).hexdigest()}"', "ETag": f'W/"{hashlib.md5(file_path.encode("utf-8")).hexdigest()}"',
} }
@@ -257,6 +254,7 @@ def thumb(file_path):
def upload(): def upload():
"""上传文件到存储""" """上传文件到存储"""
try: try:
storage = get_storage()
# 检查是否有文件 # 检查是否有文件
if "file" not in request.files: if "file" not in request.files:
return jsonify({"success": False, "error": "No file provided"}), 400 return jsonify({"success": False, "error": "No file provided"}), 400
@@ -303,6 +301,7 @@ def upload():
def delete(file_path): def delete(file_path):
"""删除存储中的文件""" """删除存储中的文件"""
try: try:
storage = get_storage()
# 删除文件 # 删除文件
success = storage.delete_file(file_path) success = storage.delete_file(file_path)
@@ -319,6 +318,7 @@ def delete(file_path):
def rename(old_key): def rename(old_key):
"""重命名存储中的文件""" """重命名存储中的文件"""
try: try:
storage = get_storage()
data = request.get_json() data = request.get_json()
new_name = data.get("newName") new_name = data.get("newName")
@@ -326,9 +326,9 @@ def rename(old_key):
return jsonify({"success": False, "error": "New name not provided"}), 400 return jsonify({"success": False, "error": "New name not provided"}), 400
# 构建新的文件路径 # 构建新的文件路径
prefix = os.path.dirname(old_key) old_key_parts = old_key.rsplit("/", 1)
if prefix: if len(old_key_parts) > 1:
new_key = f"{prefix}/{new_name}" new_key = f"{old_key_parts[0]}/{new_name}"
else: else:
new_key = new_name new_key = new_name
@@ -354,6 +354,7 @@ def rename(old_key):
def delete_folder_route(prefix): def delete_folder_route(prefix):
"""删除存储中的文件夹""" """删除存储中的文件夹"""
try: try:
storage = get_storage()
if not prefix.endswith("/"): if not prefix.endswith("/"):
prefix += "/" prefix += "/"
success = storage.delete_folder(prefix) success = storage.delete_folder(prefix)
@@ -369,6 +370,7 @@ def delete_folder_route(prefix):
def rename_folder_route(old_prefix): def rename_folder_route(old_prefix):
"""重命名存储中的文件夹""" """重命名存储中的文件夹"""
try: try:
storage = get_storage()
data = request.get_json() data = request.get_json()
new_name = data.get("newName") new_name = data.get("newName")
@@ -379,9 +381,9 @@ def rename_folder_route(old_prefix):
old_prefix += "/" old_prefix += "/"
# 构建新的文件夹路径 # 构建新的文件夹路径
parent_prefix = os.path.dirname(os.path.dirname(old_prefix)) prefix_parts = old_prefix.rstrip("/").rsplit("/", 1)
if parent_prefix: if len(prefix_parts) > 1:
new_prefix = f"{parent_prefix}/{new_name}/" new_prefix = f"{prefix_parts[0]}/{new_name}/"
else: else:
new_prefix = f"{new_name}/" new_prefix = f"{new_name}/"
@@ -405,6 +407,7 @@ def rename_folder_route(old_prefix):
def copy_item(): def copy_item():
"""复制文件或文件夹""" """复制文件或文件夹"""
try: try:
storage = get_storage()
data = request.get_json() data = request.get_json()
source = data.get("source") source = data.get("source")
destination = data.get("destination") destination = data.get("destination")
@@ -437,6 +440,7 @@ def copy_item():
def move_item(): def move_item():
"""移动文件或文件夹""" """移动文件或文件夹"""
try: try:
storage = get_storage()
data = request.get_json() data = request.get_json()
source = data.get("source") source = data.get("source")
destination = data.get("destination") destination = data.get("destination")
@@ -469,6 +473,7 @@ def move_item():
def create_folder_route(): def create_folder_route():
"""创建文件夹""" """创建文件夹"""
try: try:
storage = get_storage()
data = request.get_json() data = request.get_json()
path = data.get("path") path = data.get("path")

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "Cloud-Index" name = "Cloud-Index"
version = "0.9.0" version = "0.10.1"
description = "A cloud storage index system based on a number of cloud storage platforms" description = "A cloud storage index system based on a number of cloud storage platforms"
authors = [{ name = "RhenCloud", email = "i@rhen.cloud" }] authors = [{ name = "RhenCloud", email = "i@rhen.cloud" }]
readme = "README.md" readme = "README.md"

View File

@@ -1,14 +1,11 @@
import os
from typing import Optional from typing import Optional
import dotenv from config import Config
from .base import BaseStorage from .base import BaseStorage
from .github import GitHubStorage from .github import GitHubStorage
from .r2 import R2Storage from .r2 import R2Storage
dotenv.load_dotenv()
class StorageFactory: class StorageFactory:
"""存储工厂类,根据配置创建对应的存储实例""" """存储工厂类,根据配置创建对应的存储实例"""
@@ -29,24 +26,19 @@ class StorageFactory:
if cls._instance is not None: if cls._instance is not None:
return cls._instance return cls._instance
storage_type = os.getenv("STORAGE_TYPE") storage_type = Config.STORAGE_TYPE
if not storage_type: if not storage_type:
raise RuntimeError( raise RuntimeError("STORAGE_TYPE environment variable is not set. Supported types: r2, github")
"STORAGE_TYPE environment variable is not set. "
"Supported types: r2, github"
)
storage_type = storage_type.lower()
if storage_type == "r2": if storage_type == "r2":
cls._instance = R2Storage() cls._instance = R2Storage()
elif storage_type == "github": elif storage_type == "github":
cls._instance = GitHubStorage() cls._instance = GitHubStorage()
else: else:
raise RuntimeError( raise RuntimeError(f"Unsupported storage type: {storage_type}. Supported types: r2, github")
f"Unsupported storage type: {storage_type}. Supported types: r2, github"
) return cls._instance
return cls._instance return cls._instance

View File

@@ -1,16 +1,14 @@
import base64 import base64
import os
from datetime import datetime from datetime import datetime
from io import BytesIO from io import BytesIO
from typing import Any, Dict from typing import Any, Dict
import dotenv
import requests import requests
from PIL import Image from PIL import Image
from .base import BaseStorage from config import Config
dotenv.load_dotenv() from .base import BaseStorage
class StreamWrapper: class StreamWrapper:
@@ -50,29 +48,30 @@ class GitHubStorage(BaseStorage):
"""基于 GitHub 仓库的存储实现""" """基于 GitHub 仓库的存储实现"""
def __init__(self): def __init__(self):
self.repo_owner = os.getenv("GITHUB_REPO_OWNER") """初始化 GitHub 存储客户端"""
self.repo_name = os.getenv("GITHUB_REPO_NAME") self.token = Config.GITHUB_TOKEN
self.access_token = os.getenv("GITHUB_ACCESS_TOKEN") repo_full = Config.GITHUB_REPO # 格式: owner/repo
self.branch = os.getenv("GITHUB_BRANCH", "main") self.branch = Config.GITHUB_BRANCH
# 反向代理 URL用于加速 GitHub raw 文件访问
# 例如https://raw. githubusercontent.com/ 的反向代理 URL
self.raw_proxy_url = os.getenv("GITHUB_RAW_PROXY_URL", "").rstrip("/")
if not all([self.repo_owner, self.repo_name, self.access_token]): if not self.token or not repo_full:
raise RuntimeError("GITHUB_REPO_OWNER, GITHUB_REPO_NAME, GITHUB_ACCESS_TOKEN must be set") raise RuntimeError("GITHUB_TOKEN and GITHUB_REPO must be set")
# 解析 owner/repo
repo_parts = repo_full.split("/")
if len(repo_parts) != 2:
raise RuntimeError(f"GITHUB_REPO must be in format 'owner/repo', got: {repo_full}")
self.repo_owner = repo_parts[0]
self.repo_name = repo_parts[1]
self.repo = repo_full
self.api_base_url = f"https://api.github.com/repos/{self.repo_owner}/{self.repo_name}" self.api_base_url = f"https://api.github.com/repos/{self.repo_owner}/{self.repo_name}"
# 如果配置了代理 URL则使用代理 URL否则使用官方 raw.githubusercontent.com
if self.raw_proxy_url:
self.raw_content_url = f"{self.raw_proxy_url}/https://raw.githubusercontent.com/{self.repo_owner}/{self.repo_name}/{self.branch}"
else:
self.raw_content_url = f"https://raw.githubusercontent.com/{self.repo_owner}/{self.repo_name}/{self.branch}" self.raw_content_url = f"https://raw.githubusercontent.com/{self.repo_owner}/{self.repo_name}/{self.branch}"
def _headers(self) -> Dict[str, str]: def _headers(self) -> Dict[str, str]:
"""返回 API 请求的公共头部信息""" """返回 API 请求的公共头部信息"""
return { return {
"Authorization": f"token {self.access_token}", "Authorization": f"token {self.token}",
"Accept": "application/vnd.github.v3+json", "Accept": "application/vnd.github.v3+json",
} }

View File

@@ -3,31 +3,34 @@ from io import BytesIO
from typing import Any, Dict from typing import Any, Dict
import boto3 import boto3
import dotenv from botocore.config import Config as BotocoreConfig
from botocore.config import Config
from PIL import Image from PIL import Image
from config import Config
from .base import BaseStorage from .base import BaseStorage
dotenv.load_dotenv()
class R2Storage(BaseStorage): class R2Storage(BaseStorage):
def __init__(self): """Cloudflare R2 存储后端实现"""
self.endpoint = os.getenv("R2_ENDPOINT_URL")
if not self.endpoint:
raise RuntimeError("R2_ENDPOINT_URL environment variable is not set")
self.access_key = os.getenv("ACCESS_KEY_ID") or os.getenv("ACCESS_KEY_ID") def __init__(self):
self.secret_key = os.getenv("SECRET_ACCESS_KEY") or os.getenv( """初始化 R2 存储客户端"""
"SECRET_ACCESS_KEY" # 从统一配置中读取
) account_id = Config.R2_ACCOUNT_ID
if not account_id:
raise RuntimeError("R2_ACCOUNT_ID environment variable is not set")
self.endpoint = f"https://{account_id}.r2.cloudflarestorage.com"
self.access_key = Config.R2_ACCESS_KEY_ID
self.secret_key = Config.R2_SECRET_ACCESS_KEY
if not self.access_key or not self.secret_key: if not self.access_key or not self.secret_key:
raise RuntimeError("ACCESS_KEY_ID and SECRET_ACCESS_KEY must be set") raise RuntimeError("R2_ACCESS_KEY_ID and R2_SECRET_ACCESS_KEY must be set")
self.region_name = os.getenv("R2_REGION", "auto") self.region_name = "auto"
self.bucket_name = os.getenv("R2_BUCKET_NAME") self.bucket_name = Config.R2_BUCKET_NAME
self.public_domain = Config.R2_PUBLIC_DOMAIN
def get_s3_client(self): def get_s3_client(self):
""" """
@@ -38,7 +41,7 @@ class R2Storage(BaseStorage):
endpoint_url=self.endpoint, endpoint_url=self.endpoint,
aws_access_key_id=self.access_key, aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key, aws_secret_access_key=self.secret_key,
config=Config(signature_version="s3v4"), config=BotocoreConfig(signature_version="s3v4"),
region_name=self.region_name, region_name=self.region_name,
) )
@@ -95,10 +98,9 @@ class R2Storage(BaseStorage):
""" """
生成对象的公共访问 URL 生成对象的公共访问 URL
""" """
base_url = os.getenv("R2_PUBLIC_URL") if not self.public_domain:
if not base_url:
return None return None
return f"{base_url.rstrip('/')}/{key}" return f"{self.public_domain.rstrip('/')}/{key}"
def generate_thumbnail(self, file_path: str) -> bytes: def generate_thumbnail(self, file_path: str) -> bytes:
""" """
@@ -162,9 +164,7 @@ class R2Storage(BaseStorage):
# 复制对象到新路径 # 复制对象到新路径
copy_source = {"Bucket": self.bucket_name, "Key": old_key} copy_source = {"Bucket": self.bucket_name, "Key": old_key}
s3_client.copy_object( s3_client.copy_object(CopySource=copy_source, Bucket=self.bucket_name, Key=new_key)
CopySource=copy_source, Bucket=self.bucket_name, Key=new_key
)
# 删除原对象 # 删除原对象
s3_client.delete_object(Bucket=self.bucket_name, Key=old_key) s3_client.delete_object(Bucket=self.bucket_name, Key=old_key)
@@ -195,9 +195,7 @@ class R2Storage(BaseStorage):
# 分批次删除S3/R2 一次最多删除 1000 个 # 分批次删除S3/R2 一次最多删除 1000 个
for i in range(0, len(objects_to_delete), 1000): for i in range(0, len(objects_to_delete), 1000):
chunk = objects_to_delete[i : i + 1000] chunk = objects_to_delete[i : i + 1000]
s3_client.delete_objects( s3_client.delete_objects(Bucket=self.bucket_name, Delete={"Objects": chunk})
Bucket=self.bucket_name, Delete={"Objects": chunk}
)
return True return True
except Exception as e: except Exception as e:
@@ -225,9 +223,7 @@ class R2Storage(BaseStorage):
for old_key in objects_to_rename: for old_key in objects_to_rename:
new_key = old_key.replace(old_prefix, new_prefix, 1) new_key = old_key.replace(old_prefix, new_prefix, 1)
copy_source = {"Bucket": self.bucket_name, "Key": old_key} copy_source = {"Bucket": self.bucket_name, "Key": old_key}
s3_client.copy_object( s3_client.copy_object(CopySource=copy_source, Bucket=self.bucket_name, Key=new_key)
CopySource=copy_source, Bucket=self.bucket_name, Key=new_key
)
# 删除旧文件夹下的所有对象 # 删除旧文件夹下的所有对象
self.delete_folder(old_prefix) self.delete_folder(old_prefix)
@@ -244,9 +240,7 @@ class R2Storage(BaseStorage):
try: try:
s3_client = self.get_s3_client() s3_client = self.get_s3_client()
copy_source = {"Bucket": self.bucket_name, "Key": source_key} copy_source = {"Bucket": self.bucket_name, "Key": source_key}
s3_client.copy_object( s3_client.copy_object(CopySource=copy_source, Bucket=self.bucket_name, Key=dest_key)
CopySource=copy_source, Bucket=self.bucket_name, Key=dest_key
)
return True return True
except Exception as e: except Exception as e:
print(f"File copy failed: {str(e)}") print(f"File copy failed: {str(e)}")
@@ -322,3 +316,48 @@ class R2Storage(BaseStorage):
} }
return content_types.get(ext, "application/octet-stream") return content_types.get(ext, "application/octet-stream")
def generate_download_response(self, key: str) -> Dict[str, Any]:
"""
生成文件下载响应R2 实现)
Args:
key: 对象键名(文件路径)
Returns:
包含下载信息的字典
"""
try:
s3_client = self.get_s3_client()
file_name = key.split("/")[-1] if "/" in key else key
# 使用 RFC 5987 编码处理文件名
from urllib.parse import quote
encoded_filename = quote(file_name.encode("utf-8"), safe="")
# 生成带有 Content-Disposition 的预签名 URL
expires = int(os.getenv("R2_PRESIGN_EXPIRES", "3600"))
url = s3_client.generate_presigned_url(
"get_object",
Params={
"Bucket": self.bucket_name,
"Key": key,
"ResponseContentDisposition": f"attachment; filename=\"{file_name}\"; filename*=UTF-8''{encoded_filename}",
},
ExpiresIn=expires,
)
if url:
return {"type": "redirect", "url": url}
# 如果预签名 URL 失败,尝试公共 URL
public_url = self.get_public_url(key)
if public_url:
return {"type": "redirect", "url": public_url}
return None
except Exception as e:
print(f"R2 download response generation failed: {str(e)}")
return None

160
utils.py Normal file
View File

@@ -0,0 +1,160 @@
"""
工具函数模块
集中管理常用的辅助函数
"""
from datetime import datetime
from typing import Optional
def format_timestamp(timestamp) -> str:
"""
格式化时间戳为人类可读的格式
Args:
timestamp: 时间戳对象datetime 或其他)
Returns:
格式化后的时间字符串
"""
if isinstance(timestamp, datetime):
return timestamp.strftime("%Y-%m-%d %H:%M:%S")
return str(timestamp)
def format_file_size(size_bytes: Optional[int]) -> str:
"""
格式化文件大小为人类可读的格式
Args:
size_bytes: 文件大小(字节)
Returns:
格式化后的大小字符串(如 "1.23MB"
"""
try:
if size_bytes is None:
return "-"
num = float(size_bytes)
except (ValueError, TypeError):
return "-"
for unit in ["B", "KB", "MB", "GB", "TB"]:
if num < 1024:
# 对于字节,显示整数
if unit == "B":
return f"{int(num)}{unit}"
# 其他单位保留两位小数
return f"{num:.2f}{unit}"
num = num / 1024.0
return f"{num:.2f}PB"
def get_file_icon(filename: Optional[str]) -> str:
"""
根据文件名返回对应的 Font Awesome 图标类名
Args:
filename: 文件名
Returns:
Font Awesome 图标类名
"""
if not filename:
return "fas fa-file"
ext = filename.lower().split(".")[-1] if "." in filename else ""
# 定义文件类型映射
icon_map = {
"fas fa-image": ["jpg", "jpeg", "png", "gif", "bmp", "webp", "svg"],
"fas fa-music": ["mp3", "wav", "ogg", "flac", "m4a", "aac"],
"fas fa-video": ["mp4", "webm", "avi", "mov", "wmv", "flv", "mkv"],
"fas fa-file-alt": ["pdf", "doc", "docx", "txt", "md", "rtf"],
"fas fa-file-archive": ["zip", "rar", "7z", "tar", "gz"],
"fas fa-file-code": ["py", "js", "html", "css", "java", "cpp", "c", "php"],
"fas fa-file-excel": ["xls", "xlsx", "csv"],
"fas fa-file-powerpoint": ["ppt", "pptx"],
}
for icon, extensions in icon_map.items():
if ext in extensions:
return icon
return "fas fa-file"
def normalize_path(path: str, is_folder: bool = False) -> str:
"""
规范化路径格式
Args:
path: 原始路径
is_folder: 是否为文件夹
Returns:
规范化后的路径(文件夹以 / 结尾)
"""
path = path.strip()
if is_folder and not path.endswith("/"):
return path + "/"
if not is_folder and path.endswith("/"):
return path.rstrip("/")
return path
def get_file_extension(filename: str) -> str:
"""
获取文件扩展名
Args:
filename: 文件名
Returns:
小写的文件扩展名(不含点)
"""
if not filename or "." not in filename:
return ""
return filename.lower().split(".")[-1]
def is_image_file(filename: str) -> bool:
"""
判断文件是否为图片
Args:
filename: 文件名
Returns:
如果是图片文件返回 True
"""
image_extensions = ["jpg", "jpeg", "png", "gif", "bmp", "webp", "svg", "ico"]
return get_file_extension(filename) in image_extensions
def is_video_file(filename: str) -> bool:
"""
判断文件是否为视频
Args:
filename: 文件名
Returns:
如果是视频文件返回 True
"""
video_extensions = ["mp4", "webm", "ogg", "mov", "avi", "mkv", "m4v"]
return get_file_extension(filename) in video_extensions
def is_audio_file(filename: str) -> bool:
"""
判断文件是否为音频
Args:
filename: 文件名
Returns:
如果是音频文件返回 True
"""
audio_extensions = ["mp3", "wav", "ogg", "m4a", "aac", "flac", "opus", "weba"]
return get_file_extension(filename) in audio_extensions