feat: 完整的 R2 文件管理系统初始化

- 实现 11 个 API 端点:上传、删除、重命名、复制、移动、创建文件夹等
- 构建存储抽象层,支持多后端(R2、S3、GitHub)
- 添加完整的前端界面:列表/网格视图、文件操作、批量管理
- 添加深色/浅色主题支持
- 迁移至 Jinja2 模板继承架构
- 添加完整的 API 文档和项目文档
- 优化 Vercel 部署配置
This commit is contained in:
2025-11-08 17:45:37 +08:00
parent dcdd617a68
commit b065133519
18 changed files with 4005 additions and 754 deletions

6
storages/__init__.py Normal file
View File

@@ -0,0 +1,6 @@
from .base import BaseStorage
from .cnbcool import CnbCoolStorage
from .factory import StorageFactory
from .r2 import R2Storage
__all__ = ["BaseStorage", "R2Storage", "CnbCoolStorage", "StorageFactory"]

210
storages/base.py Normal file
View File

@@ -0,0 +1,210 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Dict
class BaseStorage(ABC):
"""存储后端的基类,定义统一接口"""
@abstractmethod
def list_objects(self, prefix: str = "") -> Dict[str, Any]:
"""
列出存储桶中的对象
Args:
prefix: 对象前缀(用于目录浏览)
Returns:
包含对象列表的字典
"""
pass
@abstractmethod
def get_object_info(self, key: str) -> Dict[str, Any]:
"""
获取对象基本信息
Args:
key: 对象键名
Returns:
对象元数据
"""
pass
@abstractmethod
def get_object(self, key: str) -> Dict[str, Any]:
"""
获取对象内容
Args:
key: 对象键名
Returns:
包含对象内容的字典
"""
pass
@abstractmethod
def generate_presigned_url(self, key: str, expires: int = None) -> str:
"""
为指定对象生成预签名 URL
Args:
key: 对象键名
expires: 过期时间(秒)
Returns:
预签名 URL失败返回 None
"""
pass
@abstractmethod
def get_public_url(self, key: str) -> str:
"""
生成对象的公共访问 URL
Args:
key: 对象键名
Returns:
公共 URL未配置返回 None
"""
pass
def format_timestamp(self, timestamp) -> str:
"""
格式化时间戳为人类可读的格式
Args:
timestamp: 时间戳对象
Returns:
格式化后的时间字符串
"""
if isinstance(timestamp, datetime):
return timestamp.strftime("%Y-%m-%d %H:%M:%S")
return str(timestamp)
@abstractmethod
def generate_thumbnail(self, file_path: str) -> bytes:
"""
生成图片缩略图
Args:
file_path: 文件路径
Returns:
缩略图字节数据
"""
pass
@abstractmethod
def upload_file(self, key: str, file_data: bytes, content_type: str = None) -> bool:
"""
上传文件到存储
Args:
key: 对象键名(文件路径)
file_data: 文件二进制数据
content_type: 文件类型MIME type
Returns:
上传成功返回 True失败返回 False
"""
pass
@abstractmethod
def delete_file(self, key: str) -> bool:
"""
删除存储中的文件
Args:
key: 对象键名(文件路径)
Returns:
删除成功返回 True失败返回 False
"""
pass
@abstractmethod
def rename_file(self, old_key: str, new_key: str) -> bool:
"""
重命名存储中的文件
Args:
old_key: 旧的对象键名
new_key: 新的对象键名
Returns:
重命名成功返回 True失败返回 False
"""
pass
@abstractmethod
def delete_folder(self, prefix: str) -> bool:
"""
删除存储中的文件夹(前缀)
Args:
prefix: 要删除的文件夹前缀
Returns:
删除成功返回 True失败返回 False
"""
pass
@abstractmethod
def rename_folder(self, old_prefix: str, new_prefix: str) -> bool:
"""
重命名存储中的文件夹(前缀)
Args:
old_prefix: 旧的文件夹前缀
new_prefix: 新的文件夹前缀
Returns:
重命名成功返回 True失败返回 False
"""
pass
@abstractmethod
def copy_file(self, source_key: str, dest_key: str) -> bool:
"""
复制存储中的文件
Args:
source_key: 源对象键名
dest_key: 目标对象键名
Returns:
复制成功返回 True失败返回 False
"""
pass
@abstractmethod
def copy_folder(self, source_prefix: str, dest_prefix: str) -> bool:
"""
复制存储中的文件夹(前缀)
Args:
source_prefix: 源文件夹前缀
dest_prefix: 目标文件夹前缀
Returns:
复制成功返回 True失败返回 False
"""
pass
@abstractmethod
def create_folder(self, key: str) -> bool:
"""
创建文件夹
Args:
key: 文件夹路径(以 / 结尾)
Returns:
创建成功返回 True失败返回 False
"""
pass

54
storages/factory.py Normal file
View File

@@ -0,0 +1,54 @@
import os
from typing import Optional
import dotenv
from .base import BaseStorage
# from .cnbcool import CnbCoolStorage
from .r2 import R2Storage
dotenv.load_dotenv()
class StorageFactory:
"""存储工厂类,根据配置创建对应的存储实例"""
_instance: Optional[BaseStorage] = None
@classmethod
def get_storage(cls) -> BaseStorage:
"""
获取存储实例(单例模式)
Returns:
BaseStorage: 存储实例
Raises:
RuntimeError: 当存储类型未配置或不支持时
"""
if cls._instance is not None:
return cls._instance
storage_type = os.getenv(
"STORAGE_TYPE",
).lower()
if storage_type == "r2":
cls._instance = R2Storage()
# elif storage_type == "github":
# cls._instance = GithubStorage()
# elif storage_type == "cnbcool":
# cls._instance = CnbCoolStorage()
else:
raise RuntimeError(
f"Unsupported storage type: {storage_type}. "
f"Supported types: r2, cnbcool"
)
return cls._instance
@classmethod
def reset(cls):
"""重置单例实例(主要用于测试)"""
cls._instance = None

324
storages/r2.py Normal file
View File

@@ -0,0 +1,324 @@
import os
from io import BytesIO
from typing import Any, Dict
import boto3
import dotenv
from botocore.config import Config
from PIL import Image
from .base import BaseStorage
dotenv.load_dotenv()
class R2Storage(BaseStorage):
def __init__(self):
self.endpoint = os.getenv("R2_ENDPOINT_URL")
if not self.endpoint:
raise RuntimeError("R2_ENDPOINT_URL environment variable is not set")
self.access_key = os.getenv("ACCESS_KEY_ID") or os.getenv("ACCESS_KEY_ID")
self.secret_key = os.getenv("SECRET_ACCESS_KEY") or os.getenv(
"SECRET_ACCESS_KEY"
)
if not self.access_key or not self.secret_key:
raise RuntimeError("ACCESS_KEY_ID and SECRET_ACCESS_KEY must be set")
self.region_name = os.getenv("R2_REGION", "auto")
self.bucket_name = os.getenv("R2_BUCKET_NAME")
def get_s3_client(self):
"""
创建并返回配置好的 S3 客户端,用于访问 R2 存储
"""
return boto3.client(
"s3",
endpoint_url=self.endpoint,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
config=Config(signature_version="s3v4"),
region_name=self.region_name,
)
def list_objects(self, prefix: str = "") -> Dict[str, Any]:
"""
列出存储桶中的对象
"""
s3_client = self.get_s3_client()
if prefix and not prefix.endswith("/"):
prefix = prefix + "/"
list_kwargs = {"Bucket": self.bucket_name, "Delimiter": "/"}
if prefix:
list_kwargs["Prefix"] = prefix
return s3_client.list_objects_v2(**list_kwargs)
def get_object_info(self, key: str) -> Dict[str, Any]:
"""
获取对象基本信息
"""
s3_client = self.get_s3_client()
return s3_client.head_object(Bucket=self.bucket_name, Key=key)
def get_object(self, key: str) -> Dict[str, Any]:
"""
获取对象内容
"""
s3_client = self.get_s3_client()
return s3_client.get_object(Bucket=self.bucket_name, Key=key)
def generate_presigned_url(self, key: str, expires: int = None) -> str:
"""为指定对象生成 presigned URLGET"""
s3_client = self.get_s3_client()
if expires is None:
try:
expires = int(os.getenv("R2_PRESIGN_EXPIRES", "3600"))
except Exception:
expires = 3600
try:
url = s3_client.generate_presigned_url(
"get_object",
Params={"Bucket": self.bucket_name, "Key": key},
ExpiresIn=expires,
)
return url
except Exception:
return None
def get_public_url(self, key: str) -> str:
"""
生成对象的公共访问 URL
"""
base_url = os.getenv("R2_PUBLIC_URL")
if not base_url:
return None
return f"{base_url.rstrip('/')}/{key}"
def generate_thumbnail(self, file_path: str) -> bytes:
"""
生成图片缩略图
"""
try:
obj = self.get_object(file_path)
data = obj["Body"].read()
img = Image.open(BytesIO(data))
img = img.convert("RGB")
img.thumbnail((320, 320))
buf = BytesIO()
img.save(buf, "JPEG", quality=80, optimize=True)
buf.seek(0)
return buf.getvalue()
except Exception:
raise
def upload_file(self, key: str, file_data: bytes, content_type: str = None) -> bool:
"""
上传文件到 R2 存储
"""
try:
s3_client = self.get_s3_client()
# 如果没有指定 content_type尝试根据文件扩展名猜测
if not content_type:
content_type = self._guess_content_type(key)
# 上传文件
s3_client.put_object(
Bucket=self.bucket_name,
Key=key,
Body=file_data,
ContentType=content_type,
)
return True
except Exception as e:
print(f"Upload failed: {str(e)}")
return False
def delete_file(self, key: str) -> bool:
"""
从 R2 存储删除文件
"""
try:
s3_client = self.get_s3_client()
s3_client.delete_object(Bucket=self.bucket_name, Key=key)
return True
except Exception as e:
print(f"Delete failed: {str(e)}")
return False
def rename_file(self, old_key: str, new_key: str) -> bool:
"""
重命名 R2 中的对象,通过复制和删除实现
"""
try:
s3_client = self.get_s3_client()
# 复制对象到新路径
copy_source = {"Bucket": self.bucket_name, "Key": old_key}
s3_client.copy_object(
CopySource=copy_source, Bucket=self.bucket_name, Key=new_key
)
# 删除原对象
s3_client.delete_object(Bucket=self.bucket_name, Key=old_key)
return True
except Exception as e:
print(f"Rename failed: {str(e)}")
return False
def delete_folder(self, prefix: str) -> bool:
"""
删除 R2 中整个文件夹(前缀)下的所有对象
"""
try:
s3_client = self.get_s3_client()
paginator = s3_client.get_paginator("list_objects_v2")
pages = paginator.paginate(Bucket=self.bucket_name, Prefix=prefix)
objects_to_delete = []
for page in pages:
if "Contents" in page:
for obj in page["Contents"]:
objects_to_delete.append({"Key": obj["Key"]})
if not objects_to_delete:
return True # 文件夹为空,直接返回成功
# 分批次删除S3/R2 一次最多删除 1000 个
for i in range(0, len(objects_to_delete), 1000):
chunk = objects_to_delete[i : i + 1000]
s3_client.delete_objects(
Bucket=self.bucket_name, Delete={"Objects": chunk}
)
return True
except Exception as e:
print(f"Folder delete failed: {str(e)}")
return False
def rename_folder(self, old_prefix: str, new_prefix: str) -> bool:
"""
重命名 R2 中的文件夹(前缀),通过复制和删除实现
"""
try:
s3_client = self.get_s3_client()
paginator = s3_client.get_paginator("list_objects_v2")
pages = paginator.paginate(Bucket=self.bucket_name, Prefix=old_prefix)
objects_to_rename = []
for page in pages:
if "Contents" in page:
for obj in page["Contents"]:
objects_to_rename.append(obj["Key"])
if not objects_to_rename:
return True # 文件夹为空,直接返回成功
for old_key in objects_to_rename:
new_key = old_key.replace(old_prefix, new_prefix, 1)
copy_source = {"Bucket": self.bucket_name, "Key": old_key}
s3_client.copy_object(
CopySource=copy_source, Bucket=self.bucket_name, Key=new_key
)
# 删除旧文件夹下的所有对象
self.delete_folder(old_prefix)
return True
except Exception as e:
print(f"Folder rename failed: {str(e)}")
return False
def copy_file(self, source_key: str, dest_key: str) -> bool:
"""
复制 R2 中的对象
"""
try:
s3_client = self.get_s3_client()
copy_source = {"Bucket": self.bucket_name, "Key": source_key}
s3_client.copy_object(
CopySource=copy_source, Bucket=self.bucket_name, Key=dest_key
)
return True
except Exception as e:
print(f"File copy failed: {str(e)}")
return False
def copy_folder(self, source_prefix: str, dest_prefix: str) -> bool:
"""
复制 R2 中的文件夹(前缀)
"""
try:
s3_client = self.get_s3_client()
paginator = s3_client.get_paginator("list_objects_v2")
pages = paginator.paginate(Bucket=self.bucket_name, Prefix=source_prefix)
for page in pages:
if "Contents" in page:
for obj in page["Contents"]:
old_key = obj["Key"]
new_key = old_key.replace(source_prefix, dest_prefix, 1)
copy_source = {"Bucket": self.bucket_name, "Key": old_key}
s3_client.copy_object(
CopySource=copy_source,
Bucket=self.bucket_name,
Key=new_key,
)
return True
except Exception as e:
print(f"Folder copy failed: {str(e)}")
return False
def create_folder(self, key: str) -> bool:
"""
在 R2 中创建文件夹(通过创建一个以 / 结尾的 0 字节对象)
"""
try:
s3_client = self.get_s3_client()
s3_client.put_object(Bucket=self.bucket_name, Key=key, Body=b"")
return True
except Exception as e:
print(f"Folder creation failed: {str(e)}")
return False
def _guess_content_type(self, filename: str) -> str:
"""
根据文件扩展名猜测 Content-Type
"""
ext = filename.lower().split(".")[-1] if "." in filename else ""
content_types = {
"jpg": "image/jpeg",
"jpeg": "image/jpeg",
"png": "image/png",
"gif": "image/gif",
"webp": "image/webp",
"svg": "image/svg+xml",
"pdf": "application/pdf",
"html": "text/html",
"css": "text/css",
"js": "application/javascript",
"json": "application/json",
"xml": "application/xml",
"txt": "text/plain",
"md": "text/markdown",
"mp4": "video/mp4",
"webm": "video/webm",
"mp3": "audio/mpeg",
"wav": "audio/wav",
"zip": "application/zip",
"rar": "application/x-rar-compressed",
"7z": "application/x-7z-compressed",
"tar": "application/x-tar",
"gz": "application/gzip",
}
return content_types.get(ext, "application/octet-stream")