mirror of
https://github.com/RhenCloud/Cloud-Index.git
synced 2025-12-06 15:26:10 +08:00
Compare commits
3 Commits
e404764ea9
...
0a640a91fc
| Author | SHA1 | Date | |
|---|---|---|---|
| 0a640a91fc | |||
| 33fe06c59e | |||
| 07a8fafff2 |
43
.env.example
43
.env.example
@@ -9,43 +9,38 @@
|
|||||||
STORAGE_TYPE=r2
|
STORAGE_TYPE=r2
|
||||||
|
|
||||||
# ==================== Cloudflare R2 配置 ====================
|
# ==================== Cloudflare R2 配置 ====================
|
||||||
|
# R2 账户 ID
|
||||||
|
R2_ACCOUNT_ID=your-account-id
|
||||||
|
|
||||||
# R2 访问凭证
|
# R2 访问凭证
|
||||||
ACCESS_KEY_ID=YOUR_R2_ACCESS_KEY_ID
|
R2_ACCESS_KEY_ID=YOUR_R2_ACCESS_KEY_ID
|
||||||
SECRET_ACCESS_KEY=YOUR_R2_SECRET_ACCESS_KEY
|
R2_SECRET_ACCESS_KEY=YOUR_R2_SECRET_ACCESS_KEY
|
||||||
|
|
||||||
# R2 存储桶名称
|
# R2 存储桶名称
|
||||||
R2_BUCKET_NAME=drive
|
R2_BUCKET_NAME=drive
|
||||||
|
|
||||||
# R2 Endpoint URL
|
# R2 公共访问域名 (可选,例如: https://pub-<bucket-name>.r2.dev)
|
||||||
R2_ENDPOINT_URL=https://<your-account-id>.r2.cloudflarestorage.com
|
R2_PUBLIC_DOMAIN=https://pub-<bucket-name>.r2.dev
|
||||||
|
|
||||||
# R2 区域 (默认: auto)
|
|
||||||
R2_REGION=auto
|
|
||||||
|
|
||||||
# R2 公共访问 URL (可选,例如: https://pub-<bucket-name>.r2.dev)
|
|
||||||
R2_PUBLIC_URL=https://pub-<bucket-name>.r2.dev
|
|
||||||
|
|
||||||
# R2 预签名 URL 过期时间(秒,默认: 3600)
|
|
||||||
R2_PRESIGN_EXPIRES=3600
|
|
||||||
|
|
||||||
# ==================== GitHub 存储配置 ====================
|
# ==================== GitHub 存储配置 ====================
|
||||||
# GitHub 仓库所有者
|
# GitHub 仓库 (格式: owner/repo)
|
||||||
GITHUB_REPO_OWNER=your-username
|
GITHUB_REPO=your-username/your-repo
|
||||||
|
|
||||||
# GitHub 仓库名称
|
|
||||||
GITHUB_REPO_NAME=your-repo
|
|
||||||
|
|
||||||
# GitHub 访问令牌 (需要 repo 权限)
|
# GitHub 访问令牌 (需要 repo 权限)
|
||||||
GITHUB_ACCESS_TOKEN=your-access-token
|
GITHUB_TOKEN=your-access-token
|
||||||
|
|
||||||
# GitHub 分支名称 (默认: main)
|
# GitHub 分支名称 (默认: main)
|
||||||
GITHUB_BRANCH=main
|
GITHUB_BRANCH=main
|
||||||
|
|
||||||
# GitHub Raw 文件反向代理 URL (可选,用于加速访问)
|
|
||||||
# 例如: https://raw.fastgit.org 或 https://ghproxy.com/https://raw.githubusercontent.com
|
|
||||||
# 留空则使用官方 raw.githubusercontent.com
|
|
||||||
GITHUB_RAW_PROXY_URL=
|
|
||||||
|
|
||||||
# ==================== 应用配置 ====================
|
# ==================== 应用配置 ====================
|
||||||
|
# 服务器配置
|
||||||
|
HOST=0.0.0.0
|
||||||
|
PORT=5000
|
||||||
|
DEBUG=false
|
||||||
|
|
||||||
# 缩略图缓存时间(秒,默认: 3600)
|
# 缩略图缓存时间(秒,默认: 3600)
|
||||||
THUMB_TTL_SECONDS=3600
|
THUMB_TTL_SECONDS=3600
|
||||||
|
|
||||||
|
# 预签名 URL 过期时间(秒,默认: 3600)
|
||||||
|
PRESIGNED_URL_EXPIRES=3600
|
||||||
|
THUMB_TTL_SECONDS=3600
|
||||||
|
|||||||
42
README.md
42
README.md
@@ -96,20 +96,18 @@ python app.py
|
|||||||
```env
|
```env
|
||||||
STORAGE_TYPE=r2
|
STORAGE_TYPE=r2
|
||||||
|
|
||||||
|
# R2 账户 ID
|
||||||
|
R2_ACCOUNT_ID=your-account-id
|
||||||
|
|
||||||
# R2 访问凭证
|
# R2 访问凭证
|
||||||
ACCESS_KEY_ID=your_access_key_id
|
R2_ACCESS_KEY_ID=your_access_key_id
|
||||||
SECRET_ACCESS_KEY=your_secret_access_key
|
R2_SECRET_ACCESS_KEY=your_secret_access_key
|
||||||
|
|
||||||
# R2 存储桶配置
|
# R2 存储桶配置
|
||||||
R2_BUCKET_NAME=your_bucket_name
|
R2_BUCKET_NAME=your_bucket_name
|
||||||
R2_ENDPOINT_URL=https://your-account-id.r2.cloudflarestorage.com
|
|
||||||
R2_REGION=auto
|
|
||||||
|
|
||||||
# 可选:公共访问 URL
|
# 可选:公共访问域名
|
||||||
R2_PUBLIC_URL=https://pub-your-bucket.r2.dev
|
R2_PUBLIC_DOMAIN=https://pub-your-bucket.r2.dev
|
||||||
|
|
||||||
# 可选:预签名 URL 过期时间(秒)
|
|
||||||
R2_PRESIGN_EXPIRES=3600
|
|
||||||
```
|
```
|
||||||
|
|
||||||
### GitHub Repository 配置
|
### GitHub Repository 配置
|
||||||
@@ -117,26 +115,15 @@ R2_PRESIGN_EXPIRES=3600
|
|||||||
```env
|
```env
|
||||||
STORAGE_TYPE=github
|
STORAGE_TYPE=github
|
||||||
|
|
||||||
# GitHub 仓库所有者(用户名或组织)
|
# GitHub 仓库 (格式: owner/repo)
|
||||||
GITHUB_REPO_OWNER=your-username
|
GITHUB_REPO=your-username/your-repo
|
||||||
|
|
||||||
# GitHub 仓库名称
|
|
||||||
GITHUB_REPO_NAME=your-repo
|
|
||||||
|
|
||||||
# GitHub 个人访问令牌(需要 repo 权限)
|
# GitHub 个人访问令牌(需要 repo 权限)
|
||||||
# 获取方式:https://github.com/settings/tokens
|
# 获取方式:https://github.com/settings/tokens
|
||||||
GITHUB_ACCESS_TOKEN=ghp_your_token_here
|
GITHUB_TOKEN=ghp_your_token_here
|
||||||
|
|
||||||
# GitHub 分支名称(可选,默认: main)
|
# GitHub 分支名称(可选,默认: main)
|
||||||
GITHUB_BRANCH=main
|
GITHUB_BRANCH=main
|
||||||
|
|
||||||
# GitHub Raw 文件反向代理 URL(可选,用于加速访问)
|
|
||||||
# 常用反向代理:
|
|
||||||
# - https://raw.fastgit.org (推荐,速度快)
|
|
||||||
# - https://ghproxy.com
|
|
||||||
# - https://raw.kgithub.com
|
|
||||||
# 留空则使用官方 raw.githubusercontent.com(国内可能较慢)
|
|
||||||
GITHUB_RAW_PROXY_URL=https://raw.fastgit.org
|
|
||||||
```
|
```
|
||||||
|
|
||||||
## 项目结构
|
## 项目结构
|
||||||
@@ -144,6 +131,8 @@ GITHUB_RAW_PROXY_URL=https://raw.fastgit.org
|
|||||||
```bash
|
```bash
|
||||||
cloud-index/
|
cloud-index/
|
||||||
├── app.py # Flask 应用主入口
|
├── app.py # Flask 应用主入口
|
||||||
|
├── config.py # 统一配置管理
|
||||||
|
├── utils.py # 工具函数模块
|
||||||
├── handlers/
|
├── handlers/
|
||||||
│ └── routes.py # 路由处理器
|
│ └── routes.py # 路由处理器
|
||||||
├── storages/ # 存储后端实现
|
├── storages/ # 存储后端实现
|
||||||
@@ -153,11 +142,16 @@ cloud-index/
|
|||||||
│ ├── r2.py # Cloudflare R2 实现
|
│ ├── r2.py # Cloudflare R2 实现
|
||||||
│ └── github.py # GitHub Repository 实现
|
│ └── github.py # GitHub Repository 实现
|
||||||
├── templates/ # HTML 模板
|
├── templates/ # HTML 模板
|
||||||
|
│ ├── base.html
|
||||||
│ ├── index.html
|
│ ├── index.html
|
||||||
│ └── footer.html
|
│ └── footer.html
|
||||||
├── static/ # 静态资源
|
├── static/ # 静态资源
|
||||||
│ └── thumbs/
|
│ ├── css/
|
||||||
|
│ │ └── main.css
|
||||||
|
│ └── js/
|
||||||
|
│ └── main.js
|
||||||
├── .env.example # 环境变量示例
|
├── .env.example # 环境变量示例
|
||||||
|
├── pyproject.toml # 项目配置和依赖
|
||||||
└── requirements.txt # Python 依赖
|
└── requirements.txt # Python 依赖
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|||||||
76
app.py
76
app.py
@@ -1,14 +1,15 @@
|
|||||||
import os
|
|
||||||
import tomllib
|
import tomllib
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
import dotenv
|
|
||||||
from flask import Flask
|
from flask import Flask
|
||||||
|
|
||||||
|
import utils
|
||||||
|
from config import Config
|
||||||
from handlers.routes import main_route
|
from handlers.routes import main_route
|
||||||
from storages.factory import StorageFactory
|
from storages.factory import StorageFactory
|
||||||
|
|
||||||
dotenv.load_dotenv()
|
# 验证配置
|
||||||
|
Config.validate()
|
||||||
|
|
||||||
|
|
||||||
# 从 pyproject.toml 读取版本号
|
# 从 pyproject.toml 读取版本号
|
||||||
@@ -34,73 +35,18 @@ app.register_blueprint(main_route)
|
|||||||
# 初始化存储(使用工厂模式)
|
# 初始化存储(使用工厂模式)
|
||||||
storage = StorageFactory.get_storage()
|
storage = StorageFactory.get_storage()
|
||||||
|
|
||||||
# 缩略图默认 TTL(秒),可通过环境变量覆盖
|
|
||||||
THUMB_TTL = int(os.environ.get("THUMB_TTL_SECONDS", "3600"))
|
|
||||||
|
|
||||||
|
# 注册模板过滤器
|
||||||
# 注册一个安全的 filesizeformat 过滤器,处理 None 和非数字值
|
|
||||||
@app.template_filter("filesizeformat")
|
@app.template_filter("filesizeformat")
|
||||||
def filesizeformat_filter(value):
|
def filesizeformat_filter(value):
|
||||||
try:
|
"""格式化文件大小"""
|
||||||
if value is None:
|
return utils.format_file_size(value)
|
||||||
return "-"
|
|
||||||
num = float(value) # 使用 float 而不是 int 以保持精度
|
|
||||||
except Exception:
|
|
||||||
return "-"
|
|
||||||
|
|
||||||
for unit in ["B", "KB", "MB", "GB", "TB"]:
|
|
||||||
if num < 1024:
|
|
||||||
# 对于字节,显示整数
|
|
||||||
if unit == "B":
|
|
||||||
return f"{int(num)}{unit}"
|
|
||||||
# 其他单位保留两位小数
|
|
||||||
return f"{num:.2f}{unit}"
|
|
||||||
num = num / 1024.0
|
|
||||||
return f"{num:.2f}PB"
|
|
||||||
|
|
||||||
|
|
||||||
# 注册一个文件图标过滤器
|
|
||||||
@app.template_filter("fileicon")
|
@app.template_filter("fileicon")
|
||||||
def fileicon_filter(filename):
|
def fileicon_filter(filename):
|
||||||
if not filename:
|
"""获取文件图标"""
|
||||||
return "fas fa-file"
|
return utils.get_file_icon(filename)
|
||||||
|
|
||||||
ext = filename.lower().split(".")[-1] if "." in filename else ""
|
|
||||||
|
|
||||||
# 图片文件
|
|
||||||
if ext in ["jpg", "jpeg", "png", "gif", "bmp", "webp", "svg"]:
|
|
||||||
return "fas fa-image"
|
|
||||||
|
|
||||||
# 音频文件
|
|
||||||
if ext in ["mp3", "wav", "ogg", "flac", "m4a", "aac"]:
|
|
||||||
return "fas fa-music"
|
|
||||||
|
|
||||||
# 视频文件
|
|
||||||
if ext in ["mp4", "webm", "avi", "mov", "wmv", "flv", "mkv"]:
|
|
||||||
return "fas fa-video"
|
|
||||||
|
|
||||||
# 文档文件
|
|
||||||
if ext in ["pdf", "doc", "docx", "txt", "md", "rtf"]:
|
|
||||||
return "fas fa-file-alt"
|
|
||||||
|
|
||||||
# 压缩文件
|
|
||||||
if ext in ["zip", "rar", "7z", "tar", "gz"]:
|
|
||||||
return "fas fa-file-archive"
|
|
||||||
|
|
||||||
# 代码文件
|
|
||||||
if ext in ["py", "js", "html", "css", "java", "cpp", "c", "php"]:
|
|
||||||
return "fas fa-file-code"
|
|
||||||
|
|
||||||
# 表格文件
|
|
||||||
if ext in ["xls", "xlsx", "csv"]:
|
|
||||||
return "fas fa-file-excel"
|
|
||||||
|
|
||||||
# 演示文件
|
|
||||||
if ext in ["ppt", "pptx"]:
|
|
||||||
return "fas fa-file-powerpoint"
|
|
||||||
|
|
||||||
# 默认文件图标
|
|
||||||
return "fas fa-file"
|
|
||||||
|
|
||||||
|
|
||||||
def get_public_url(key: str) -> str:
|
def get_public_url(key: str) -> str:
|
||||||
@@ -135,6 +81,4 @@ def inject_version():
|
|||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
port = int(os.environ.get("PORT", 5000))
|
app.run(host=Config.HOST, port=Config.PORT, debug=Config.DEBUG)
|
||||||
host = os.environ.get("HOST", "0.0.0.0")
|
|
||||||
app.run(host=host, port=port, debug=True)
|
|
||||||
|
|||||||
83
config.py
Normal file
83
config.py
Normal file
@@ -0,0 +1,83 @@
|
|||||||
|
"""
|
||||||
|
配置管理模块
|
||||||
|
集中管理所有环境变量和应用配置
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import dotenv
|
||||||
|
|
||||||
|
# 加载环境变量
|
||||||
|
dotenv.load_dotenv()
|
||||||
|
|
||||||
|
|
||||||
|
class Config:
|
||||||
|
"""应用配置类"""
|
||||||
|
|
||||||
|
# 存储配置
|
||||||
|
STORAGE_TYPE: str = os.getenv("STORAGE_TYPE", "").lower()
|
||||||
|
|
||||||
|
# R2 配置
|
||||||
|
R2_ACCOUNT_ID: Optional[str] = os.getenv("R2_ACCOUNT_ID")
|
||||||
|
R2_ACCESS_KEY_ID: Optional[str] = os.getenv("R2_ACCESS_KEY_ID")
|
||||||
|
R2_SECRET_ACCESS_KEY: Optional[str] = os.getenv("R2_SECRET_ACCESS_KEY")
|
||||||
|
R2_BUCKET_NAME: Optional[str] = os.getenv("R2_BUCKET_NAME")
|
||||||
|
R2_PUBLIC_DOMAIN: Optional[str] = os.getenv("R2_PUBLIC_DOMAIN")
|
||||||
|
|
||||||
|
# GitHub 配置
|
||||||
|
GITHUB_TOKEN: Optional[str] = os.getenv("GITHUB_TOKEN")
|
||||||
|
GITHUB_REPO: Optional[str] = os.getenv("GITHUB_REPO") # 格式: owner/repo
|
||||||
|
GITHUB_BRANCH: str = os.getenv("GITHUB_BRANCH", "main")
|
||||||
|
|
||||||
|
# 应用配置
|
||||||
|
HOST: str = os.getenv("HOST", "0.0.0.0")
|
||||||
|
PORT: int = int(os.getenv("PORT", "5000"))
|
||||||
|
DEBUG: bool = os.getenv("DEBUG", "false").lower() == "true"
|
||||||
|
|
||||||
|
# 缩略图配置
|
||||||
|
THUMB_TTL_SECONDS: int = int(os.getenv("THUMB_TTL_SECONDS", "3600"))
|
||||||
|
THUMB_SIZE: tuple[int, int] = (300, 300) # 缩略图尺寸
|
||||||
|
|
||||||
|
# URL过期时间配置
|
||||||
|
PRESIGNED_URL_EXPIRES: int = int(os.getenv("PRESIGNED_URL_EXPIRES", "3600"))
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def validate(cls) -> None:
|
||||||
|
"""验证必需的配置项是否已设置"""
|
||||||
|
if not cls.STORAGE_TYPE:
|
||||||
|
raise ValueError("STORAGE_TYPE environment variable is not set. Supported types: r2, github")
|
||||||
|
|
||||||
|
if cls.STORAGE_TYPE == "r2":
|
||||||
|
required = ["R2_ACCOUNT_ID", "R2_ACCESS_KEY_ID", "R2_SECRET_ACCESS_KEY", "R2_BUCKET_NAME"]
|
||||||
|
missing = [key for key in required if not getattr(cls, key)]
|
||||||
|
if missing:
|
||||||
|
raise ValueError(f"Missing required R2 configuration: {', '.join(missing)}")
|
||||||
|
|
||||||
|
elif cls.STORAGE_TYPE == "github":
|
||||||
|
required = ["GITHUB_TOKEN", "GITHUB_REPO"]
|
||||||
|
missing = [key for key in required if not getattr(cls, key)]
|
||||||
|
if missing:
|
||||||
|
raise ValueError(f"Missing required GitHub configuration: {', '.join(missing)}")
|
||||||
|
|
||||||
|
elif cls.STORAGE_TYPE not in ["r2", "github"]:
|
||||||
|
raise ValueError(f"Unsupported storage type: {cls.STORAGE_TYPE}. Supported types: r2, github")
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_storage_config(cls) -> dict:
|
||||||
|
"""获取当前存储类型的配置字典"""
|
||||||
|
if cls.STORAGE_TYPE == "r2":
|
||||||
|
return {
|
||||||
|
"account_id": cls.R2_ACCOUNT_ID,
|
||||||
|
"access_key_id": cls.R2_ACCESS_KEY_ID,
|
||||||
|
"secret_access_key": cls.R2_SECRET_ACCESS_KEY,
|
||||||
|
"bucket_name": cls.R2_BUCKET_NAME,
|
||||||
|
"public_domain": cls.R2_PUBLIC_DOMAIN,
|
||||||
|
}
|
||||||
|
elif cls.STORAGE_TYPE == "github":
|
||||||
|
return {
|
||||||
|
"token": cls.GITHUB_TOKEN,
|
||||||
|
"repo": cls.GITHUB_REPO,
|
||||||
|
"branch": cls.GITHUB_BRANCH,
|
||||||
|
}
|
||||||
|
return {}
|
||||||
@@ -1,33 +1,24 @@
|
|||||||
import hashlib
|
import hashlib
|
||||||
import os
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import Any, Dict, List
|
from typing import Any, Dict, List
|
||||||
|
|
||||||
from flask import Blueprint, Response, abort, jsonify, redirect, render_template, request
|
from flask import Blueprint, Response, abort, jsonify, redirect, render_template, request
|
||||||
|
|
||||||
|
from config import Config
|
||||||
from storages.factory import StorageFactory
|
from storages.factory import StorageFactory
|
||||||
|
|
||||||
main_route = Blueprint("main", __name__)
|
main_route = Blueprint("main", __name__)
|
||||||
|
|
||||||
# 初始化存储(使用工厂模式)
|
# 延迟初始化的存储实例
|
||||||
storage = StorageFactory.get_storage()
|
_storage = None
|
||||||
|
|
||||||
# 缩略图默认 TTL(秒),可通过环境变量覆盖
|
|
||||||
THUMB_TTL = int(os.environ.get("THUMB_TTL_SECONDS", "3600"))
|
|
||||||
|
|
||||||
|
|
||||||
def format_timestamp(timestamp) -> str:
|
def get_storage():
|
||||||
"""
|
"""获取存储实例(延迟初始化)"""
|
||||||
格式化时间戳为人类可读的格式
|
global _storage
|
||||||
"""
|
if _storage is None:
|
||||||
return storage.format_timestamp(timestamp)
|
_storage = StorageFactory.get_storage()
|
||||||
|
return _storage
|
||||||
|
|
||||||
def get_public_url(key: str) -> str:
|
|
||||||
"""
|
|
||||||
生成对象的公共访问 URL
|
|
||||||
"""
|
|
||||||
return storage.get_public_url(key)
|
|
||||||
|
|
||||||
|
|
||||||
def get_file_url(key: str) -> str:
|
def get_file_url(key: str) -> str:
|
||||||
@@ -37,6 +28,7 @@ def get_file_url(key: str) -> str:
|
|||||||
|
|
||||||
def build_file_entry(obj: Dict[str, Any], prefix: str) -> Dict[str, Any] | None:
|
def build_file_entry(obj: Dict[str, Any], prefix: str) -> Dict[str, Any] | None:
|
||||||
"""根据对象信息构建文件条目。"""
|
"""根据对象信息构建文件条目。"""
|
||||||
|
storage = get_storage()
|
||||||
key = obj.get("Key", "")
|
key = obj.get("Key", "")
|
||||||
if not key:
|
if not key:
|
||||||
return None
|
return None
|
||||||
@@ -53,12 +45,12 @@ def build_file_entry(obj: Dict[str, Any], prefix: str) -> Dict[str, Any] | None:
|
|||||||
"name": rel_name,
|
"name": rel_name,
|
||||||
"key": key,
|
"key": key,
|
||||||
"size": obj.get("Size"),
|
"size": obj.get("Size"),
|
||||||
"last_modified": format_timestamp(obj.get("LastModified")),
|
"last_modified": storage.format_timestamp(obj.get("LastModified")),
|
||||||
"is_dir": False,
|
"is_dir": False,
|
||||||
"file_url": get_file_url(key),
|
"file_url": get_file_url(key),
|
||||||
}
|
}
|
||||||
|
|
||||||
public_url = get_public_url(key)
|
public_url = storage.get_public_url(key)
|
||||||
if public_url:
|
if public_url:
|
||||||
entry["public_url"] = public_url
|
entry["public_url"] = public_url
|
||||||
|
|
||||||
@@ -115,6 +107,7 @@ def index():
|
|||||||
返回文件和目录列表的 HTML 页面。
|
返回文件和目录列表的 HTML 页面。
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
|
storage = get_storage()
|
||||||
prefix = request.args.get("prefix", "") or ""
|
prefix = request.args.get("prefix", "") or ""
|
||||||
|
|
||||||
response = storage.list_objects(prefix)
|
response = storage.list_objects(prefix)
|
||||||
@@ -136,6 +129,7 @@ def index():
|
|||||||
def browse(prefix_path):
|
def browse(prefix_path):
|
||||||
"""目录路由。将 URL /a/b 映射为 prefix 'a/b/' 并重用 index 的逻辑。"""
|
"""目录路由。将 URL /a/b 映射为 prefix 'a/b/' 并重用 index 的逻辑。"""
|
||||||
try:
|
try:
|
||||||
|
storage = get_storage()
|
||||||
prefix = prefix_path or ""
|
prefix = prefix_path or ""
|
||||||
if prefix and not prefix.endswith("/"):
|
if prefix and not prefix.endswith("/"):
|
||||||
prefix = prefix + "/"
|
prefix = prefix + "/"
|
||||||
@@ -159,6 +153,7 @@ def browse(prefix_path):
|
|||||||
def serve_file(file_path):
|
def serve_file(file_path):
|
||||||
"""重定向到原始存储 URL,节省服务器资源"""
|
"""重定向到原始存储 URL,节省服务器资源"""
|
||||||
try:
|
try:
|
||||||
|
storage = get_storage()
|
||||||
# 验证文件存在
|
# 验证文件存在
|
||||||
try:
|
try:
|
||||||
storage.get_object_info(file_path)
|
storage.get_object_info(file_path)
|
||||||
@@ -186,6 +181,7 @@ def serve_file(file_path):
|
|||||||
def download_file(file_path):
|
def download_file(file_path):
|
||||||
"""下载文件,支持所有存储类型"""
|
"""下载文件,支持所有存储类型"""
|
||||||
try:
|
try:
|
||||||
|
storage = get_storage()
|
||||||
# 验证文件存在
|
# 验证文件存在
|
||||||
try:
|
try:
|
||||||
storage.get_object_info(file_path)
|
storage.get_object_info(file_path)
|
||||||
@@ -218,9 +214,10 @@ def download_file(file_path):
|
|||||||
@main_route.route("/thumb/<path:file_path>")
|
@main_route.route("/thumb/<path:file_path>")
|
||||||
def thumb(file_path):
|
def thumb(file_path):
|
||||||
"""返回图片的缩略图,使用 Vercel Cache Headers 避免重复从 R2 拉取"""
|
"""返回图片的缩略图,使用 Vercel Cache Headers 避免重复从 R2 拉取"""
|
||||||
|
storage = get_storage()
|
||||||
# 设置更长的缓存控制头以支持浏览器本地缓存
|
# 设置更长的缓存控制头以支持浏览器本地缓存
|
||||||
cache_headers = {
|
cache_headers = {
|
||||||
"Cache-Control": f"public, max-age={THUMB_TTL}",
|
"Cache-Control": f"public, max-age={Config.THUMB_TTL_SECONDS}",
|
||||||
"ETag": f'W/"{hashlib.md5(file_path.encode("utf-8")).hexdigest()}"',
|
"ETag": f'W/"{hashlib.md5(file_path.encode("utf-8")).hexdigest()}"',
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -257,6 +254,7 @@ def thumb(file_path):
|
|||||||
def upload():
|
def upload():
|
||||||
"""上传文件到存储"""
|
"""上传文件到存储"""
|
||||||
try:
|
try:
|
||||||
|
storage = get_storage()
|
||||||
# 检查是否有文件
|
# 检查是否有文件
|
||||||
if "file" not in request.files:
|
if "file" not in request.files:
|
||||||
return jsonify({"success": False, "error": "No file provided"}), 400
|
return jsonify({"success": False, "error": "No file provided"}), 400
|
||||||
@@ -303,6 +301,7 @@ def upload():
|
|||||||
def delete(file_path):
|
def delete(file_path):
|
||||||
"""删除存储中的文件"""
|
"""删除存储中的文件"""
|
||||||
try:
|
try:
|
||||||
|
storage = get_storage()
|
||||||
# 删除文件
|
# 删除文件
|
||||||
success = storage.delete_file(file_path)
|
success = storage.delete_file(file_path)
|
||||||
|
|
||||||
@@ -319,6 +318,7 @@ def delete(file_path):
|
|||||||
def rename(old_key):
|
def rename(old_key):
|
||||||
"""重命名存储中的文件"""
|
"""重命名存储中的文件"""
|
||||||
try:
|
try:
|
||||||
|
storage = get_storage()
|
||||||
data = request.get_json()
|
data = request.get_json()
|
||||||
new_name = data.get("newName")
|
new_name = data.get("newName")
|
||||||
|
|
||||||
@@ -326,9 +326,9 @@ def rename(old_key):
|
|||||||
return jsonify({"success": False, "error": "New name not provided"}), 400
|
return jsonify({"success": False, "error": "New name not provided"}), 400
|
||||||
|
|
||||||
# 构建新的文件路径
|
# 构建新的文件路径
|
||||||
prefix = os.path.dirname(old_key)
|
old_key_parts = old_key.rsplit("/", 1)
|
||||||
if prefix:
|
if len(old_key_parts) > 1:
|
||||||
new_key = f"{prefix}/{new_name}"
|
new_key = f"{old_key_parts[0]}/{new_name}"
|
||||||
else:
|
else:
|
||||||
new_key = new_name
|
new_key = new_name
|
||||||
|
|
||||||
@@ -354,6 +354,7 @@ def rename(old_key):
|
|||||||
def delete_folder_route(prefix):
|
def delete_folder_route(prefix):
|
||||||
"""删除存储中的文件夹"""
|
"""删除存储中的文件夹"""
|
||||||
try:
|
try:
|
||||||
|
storage = get_storage()
|
||||||
if not prefix.endswith("/"):
|
if not prefix.endswith("/"):
|
||||||
prefix += "/"
|
prefix += "/"
|
||||||
success = storage.delete_folder(prefix)
|
success = storage.delete_folder(prefix)
|
||||||
@@ -369,6 +370,7 @@ def delete_folder_route(prefix):
|
|||||||
def rename_folder_route(old_prefix):
|
def rename_folder_route(old_prefix):
|
||||||
"""重命名存储中的文件夹"""
|
"""重命名存储中的文件夹"""
|
||||||
try:
|
try:
|
||||||
|
storage = get_storage()
|
||||||
data = request.get_json()
|
data = request.get_json()
|
||||||
new_name = data.get("newName")
|
new_name = data.get("newName")
|
||||||
|
|
||||||
@@ -379,9 +381,9 @@ def rename_folder_route(old_prefix):
|
|||||||
old_prefix += "/"
|
old_prefix += "/"
|
||||||
|
|
||||||
# 构建新的文件夹路径
|
# 构建新的文件夹路径
|
||||||
parent_prefix = os.path.dirname(os.path.dirname(old_prefix))
|
prefix_parts = old_prefix.rstrip("/").rsplit("/", 1)
|
||||||
if parent_prefix:
|
if len(prefix_parts) > 1:
|
||||||
new_prefix = f"{parent_prefix}/{new_name}/"
|
new_prefix = f"{prefix_parts[0]}/{new_name}/"
|
||||||
else:
|
else:
|
||||||
new_prefix = f"{new_name}/"
|
new_prefix = f"{new_name}/"
|
||||||
|
|
||||||
@@ -405,6 +407,7 @@ def rename_folder_route(old_prefix):
|
|||||||
def copy_item():
|
def copy_item():
|
||||||
"""复制文件或文件夹"""
|
"""复制文件或文件夹"""
|
||||||
try:
|
try:
|
||||||
|
storage = get_storage()
|
||||||
data = request.get_json()
|
data = request.get_json()
|
||||||
source = data.get("source")
|
source = data.get("source")
|
||||||
destination = data.get("destination")
|
destination = data.get("destination")
|
||||||
@@ -437,6 +440,7 @@ def copy_item():
|
|||||||
def move_item():
|
def move_item():
|
||||||
"""移动文件或文件夹"""
|
"""移动文件或文件夹"""
|
||||||
try:
|
try:
|
||||||
|
storage = get_storage()
|
||||||
data = request.get_json()
|
data = request.get_json()
|
||||||
source = data.get("source")
|
source = data.get("source")
|
||||||
destination = data.get("destination")
|
destination = data.get("destination")
|
||||||
@@ -469,6 +473,7 @@ def move_item():
|
|||||||
def create_folder_route():
|
def create_folder_route():
|
||||||
"""创建文件夹"""
|
"""创建文件夹"""
|
||||||
try:
|
try:
|
||||||
|
storage = get_storage()
|
||||||
data = request.get_json()
|
data = request.get_json()
|
||||||
path = data.get("path")
|
path = data.get("path")
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[project]
|
[project]
|
||||||
name = "Cloud-Index"
|
name = "Cloud-Index"
|
||||||
version = "0.9.0"
|
version = "0.10.1"
|
||||||
description = "A cloud storage index system based on a number of cloud storage platforms"
|
description = "A cloud storage index system based on a number of cloud storage platforms"
|
||||||
authors = [{ name = "RhenCloud", email = "i@rhen.cloud" }]
|
authors = [{ name = "RhenCloud", email = "i@rhen.cloud" }]
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
|
|||||||
@@ -1,14 +1,11 @@
|
|||||||
import os
|
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
import dotenv
|
from config import Config
|
||||||
|
|
||||||
from .base import BaseStorage
|
from .base import BaseStorage
|
||||||
from .github import GitHubStorage
|
from .github import GitHubStorage
|
||||||
from .r2 import R2Storage
|
from .r2 import R2Storage
|
||||||
|
|
||||||
dotenv.load_dotenv()
|
|
||||||
|
|
||||||
|
|
||||||
class StorageFactory:
|
class StorageFactory:
|
||||||
"""存储工厂类,根据配置创建对应的存储实例"""
|
"""存储工厂类,根据配置创建对应的存储实例"""
|
||||||
@@ -29,24 +26,19 @@ class StorageFactory:
|
|||||||
if cls._instance is not None:
|
if cls._instance is not None:
|
||||||
return cls._instance
|
return cls._instance
|
||||||
|
|
||||||
storage_type = os.getenv("STORAGE_TYPE")
|
storage_type = Config.STORAGE_TYPE
|
||||||
|
|
||||||
if not storage_type:
|
if not storage_type:
|
||||||
raise RuntimeError(
|
raise RuntimeError("STORAGE_TYPE environment variable is not set. Supported types: r2, github")
|
||||||
"STORAGE_TYPE environment variable is not set. "
|
|
||||||
"Supported types: r2, github"
|
|
||||||
)
|
|
||||||
|
|
||||||
storage_type = storage_type.lower()
|
|
||||||
|
|
||||||
if storage_type == "r2":
|
if storage_type == "r2":
|
||||||
cls._instance = R2Storage()
|
cls._instance = R2Storage()
|
||||||
elif storage_type == "github":
|
elif storage_type == "github":
|
||||||
cls._instance = GitHubStorage()
|
cls._instance = GitHubStorage()
|
||||||
else:
|
else:
|
||||||
raise RuntimeError(
|
raise RuntimeError(f"Unsupported storage type: {storage_type}. Supported types: r2, github")
|
||||||
f"Unsupported storage type: {storage_type}. Supported types: r2, github"
|
|
||||||
)
|
return cls._instance
|
||||||
|
|
||||||
return cls._instance
|
return cls._instance
|
||||||
|
|
||||||
|
|||||||
@@ -1,16 +1,14 @@
|
|||||||
import base64
|
import base64
|
||||||
import os
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
from typing import Any, Dict
|
from typing import Any, Dict
|
||||||
|
|
||||||
import dotenv
|
|
||||||
import requests
|
import requests
|
||||||
from PIL import Image
|
from PIL import Image
|
||||||
|
|
||||||
from .base import BaseStorage
|
from config import Config
|
||||||
|
|
||||||
dotenv.load_dotenv()
|
from .base import BaseStorage
|
||||||
|
|
||||||
|
|
||||||
class StreamWrapper:
|
class StreamWrapper:
|
||||||
@@ -50,29 +48,30 @@ class GitHubStorage(BaseStorage):
|
|||||||
"""基于 GitHub 仓库的存储实现"""
|
"""基于 GitHub 仓库的存储实现"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.repo_owner = os.getenv("GITHUB_REPO_OWNER")
|
"""初始化 GitHub 存储客户端"""
|
||||||
self.repo_name = os.getenv("GITHUB_REPO_NAME")
|
self.token = Config.GITHUB_TOKEN
|
||||||
self.access_token = os.getenv("GITHUB_ACCESS_TOKEN")
|
repo_full = Config.GITHUB_REPO # 格式: owner/repo
|
||||||
self.branch = os.getenv("GITHUB_BRANCH", "main")
|
self.branch = Config.GITHUB_BRANCH
|
||||||
# 反向代理 URL,用于加速 GitHub raw 文件访问
|
|
||||||
# 例如:https://raw. githubusercontent.com/ 的反向代理 URL
|
|
||||||
self.raw_proxy_url = os.getenv("GITHUB_RAW_PROXY_URL", "").rstrip("/")
|
|
||||||
|
|
||||||
if not all([self.repo_owner, self.repo_name, self.access_token]):
|
if not self.token or not repo_full:
|
||||||
raise RuntimeError("GITHUB_REPO_OWNER, GITHUB_REPO_NAME, GITHUB_ACCESS_TOKEN must be set")
|
raise RuntimeError("GITHUB_TOKEN and GITHUB_REPO must be set")
|
||||||
|
|
||||||
|
# 解析 owner/repo
|
||||||
|
repo_parts = repo_full.split("/")
|
||||||
|
if len(repo_parts) != 2:
|
||||||
|
raise RuntimeError(f"GITHUB_REPO must be in format 'owner/repo', got: {repo_full}")
|
||||||
|
|
||||||
|
self.repo_owner = repo_parts[0]
|
||||||
|
self.repo_name = repo_parts[1]
|
||||||
|
self.repo = repo_full
|
||||||
|
|
||||||
self.api_base_url = f"https://api.github.com/repos/{self.repo_owner}/{self.repo_name}"
|
self.api_base_url = f"https://api.github.com/repos/{self.repo_owner}/{self.repo_name}"
|
||||||
|
self.raw_content_url = f"https://raw.githubusercontent.com/{self.repo_owner}/{self.repo_name}/{self.branch}"
|
||||||
# 如果配置了代理 URL,则使用代理 URL;否则使用官方 raw.githubusercontent.com
|
|
||||||
if self.raw_proxy_url:
|
|
||||||
self.raw_content_url = f"{self.raw_proxy_url}/https://raw.githubusercontent.com/{self.repo_owner}/{self.repo_name}/{self.branch}"
|
|
||||||
else:
|
|
||||||
self.raw_content_url = f"https://raw.githubusercontent.com/{self.repo_owner}/{self.repo_name}/{self.branch}"
|
|
||||||
|
|
||||||
def _headers(self) -> Dict[str, str]:
|
def _headers(self) -> Dict[str, str]:
|
||||||
"""返回 API 请求的公共头部信息"""
|
"""返回 API 请求的公共头部信息"""
|
||||||
return {
|
return {
|
||||||
"Authorization": f"token {self.access_token}",
|
"Authorization": f"token {self.token}",
|
||||||
"Accept": "application/vnd.github.v3+json",
|
"Accept": "application/vnd.github.v3+json",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
101
storages/r2.py
101
storages/r2.py
@@ -3,31 +3,34 @@ from io import BytesIO
|
|||||||
from typing import Any, Dict
|
from typing import Any, Dict
|
||||||
|
|
||||||
import boto3
|
import boto3
|
||||||
import dotenv
|
from botocore.config import Config as BotocoreConfig
|
||||||
from botocore.config import Config
|
|
||||||
from PIL import Image
|
from PIL import Image
|
||||||
|
|
||||||
|
from config import Config
|
||||||
|
|
||||||
from .base import BaseStorage
|
from .base import BaseStorage
|
||||||
|
|
||||||
dotenv.load_dotenv()
|
|
||||||
|
|
||||||
|
|
||||||
class R2Storage(BaseStorage):
|
class R2Storage(BaseStorage):
|
||||||
def __init__(self):
|
"""Cloudflare R2 存储后端实现"""
|
||||||
self.endpoint = os.getenv("R2_ENDPOINT_URL")
|
|
||||||
if not self.endpoint:
|
|
||||||
raise RuntimeError("R2_ENDPOINT_URL environment variable is not set")
|
|
||||||
|
|
||||||
self.access_key = os.getenv("ACCESS_KEY_ID") or os.getenv("ACCESS_KEY_ID")
|
def __init__(self):
|
||||||
self.secret_key = os.getenv("SECRET_ACCESS_KEY") or os.getenv(
|
"""初始化 R2 存储客户端"""
|
||||||
"SECRET_ACCESS_KEY"
|
# 从统一配置中读取
|
||||||
)
|
account_id = Config.R2_ACCOUNT_ID
|
||||||
|
if not account_id:
|
||||||
|
raise RuntimeError("R2_ACCOUNT_ID environment variable is not set")
|
||||||
|
|
||||||
|
self.endpoint = f"https://{account_id}.r2.cloudflarestorage.com"
|
||||||
|
self.access_key = Config.R2_ACCESS_KEY_ID
|
||||||
|
self.secret_key = Config.R2_SECRET_ACCESS_KEY
|
||||||
|
|
||||||
if not self.access_key or not self.secret_key:
|
if not self.access_key or not self.secret_key:
|
||||||
raise RuntimeError("ACCESS_KEY_ID and SECRET_ACCESS_KEY must be set")
|
raise RuntimeError("R2_ACCESS_KEY_ID and R2_SECRET_ACCESS_KEY must be set")
|
||||||
|
|
||||||
self.region_name = os.getenv("R2_REGION", "auto")
|
self.region_name = "auto"
|
||||||
self.bucket_name = os.getenv("R2_BUCKET_NAME")
|
self.bucket_name = Config.R2_BUCKET_NAME
|
||||||
|
self.public_domain = Config.R2_PUBLIC_DOMAIN
|
||||||
|
|
||||||
def get_s3_client(self):
|
def get_s3_client(self):
|
||||||
"""
|
"""
|
||||||
@@ -38,7 +41,7 @@ class R2Storage(BaseStorage):
|
|||||||
endpoint_url=self.endpoint,
|
endpoint_url=self.endpoint,
|
||||||
aws_access_key_id=self.access_key,
|
aws_access_key_id=self.access_key,
|
||||||
aws_secret_access_key=self.secret_key,
|
aws_secret_access_key=self.secret_key,
|
||||||
config=Config(signature_version="s3v4"),
|
config=BotocoreConfig(signature_version="s3v4"),
|
||||||
region_name=self.region_name,
|
region_name=self.region_name,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -95,10 +98,9 @@ class R2Storage(BaseStorage):
|
|||||||
"""
|
"""
|
||||||
生成对象的公共访问 URL
|
生成对象的公共访问 URL
|
||||||
"""
|
"""
|
||||||
base_url = os.getenv("R2_PUBLIC_URL")
|
if not self.public_domain:
|
||||||
if not base_url:
|
|
||||||
return None
|
return None
|
||||||
return f"{base_url.rstrip('/')}/{key}"
|
return f"{self.public_domain.rstrip('/')}/{key}"
|
||||||
|
|
||||||
def generate_thumbnail(self, file_path: str) -> bytes:
|
def generate_thumbnail(self, file_path: str) -> bytes:
|
||||||
"""
|
"""
|
||||||
@@ -162,9 +164,7 @@ class R2Storage(BaseStorage):
|
|||||||
|
|
||||||
# 复制对象到新路径
|
# 复制对象到新路径
|
||||||
copy_source = {"Bucket": self.bucket_name, "Key": old_key}
|
copy_source = {"Bucket": self.bucket_name, "Key": old_key}
|
||||||
s3_client.copy_object(
|
s3_client.copy_object(CopySource=copy_source, Bucket=self.bucket_name, Key=new_key)
|
||||||
CopySource=copy_source, Bucket=self.bucket_name, Key=new_key
|
|
||||||
)
|
|
||||||
|
|
||||||
# 删除原对象
|
# 删除原对象
|
||||||
s3_client.delete_object(Bucket=self.bucket_name, Key=old_key)
|
s3_client.delete_object(Bucket=self.bucket_name, Key=old_key)
|
||||||
@@ -195,9 +195,7 @@ class R2Storage(BaseStorage):
|
|||||||
# 分批次删除,S3/R2 一次最多删除 1000 个
|
# 分批次删除,S3/R2 一次最多删除 1000 个
|
||||||
for i in range(0, len(objects_to_delete), 1000):
|
for i in range(0, len(objects_to_delete), 1000):
|
||||||
chunk = objects_to_delete[i : i + 1000]
|
chunk = objects_to_delete[i : i + 1000]
|
||||||
s3_client.delete_objects(
|
s3_client.delete_objects(Bucket=self.bucket_name, Delete={"Objects": chunk})
|
||||||
Bucket=self.bucket_name, Delete={"Objects": chunk}
|
|
||||||
)
|
|
||||||
|
|
||||||
return True
|
return True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -225,9 +223,7 @@ class R2Storage(BaseStorage):
|
|||||||
for old_key in objects_to_rename:
|
for old_key in objects_to_rename:
|
||||||
new_key = old_key.replace(old_prefix, new_prefix, 1)
|
new_key = old_key.replace(old_prefix, new_prefix, 1)
|
||||||
copy_source = {"Bucket": self.bucket_name, "Key": old_key}
|
copy_source = {"Bucket": self.bucket_name, "Key": old_key}
|
||||||
s3_client.copy_object(
|
s3_client.copy_object(CopySource=copy_source, Bucket=self.bucket_name, Key=new_key)
|
||||||
CopySource=copy_source, Bucket=self.bucket_name, Key=new_key
|
|
||||||
)
|
|
||||||
|
|
||||||
# 删除旧文件夹下的所有对象
|
# 删除旧文件夹下的所有对象
|
||||||
self.delete_folder(old_prefix)
|
self.delete_folder(old_prefix)
|
||||||
@@ -244,9 +240,7 @@ class R2Storage(BaseStorage):
|
|||||||
try:
|
try:
|
||||||
s3_client = self.get_s3_client()
|
s3_client = self.get_s3_client()
|
||||||
copy_source = {"Bucket": self.bucket_name, "Key": source_key}
|
copy_source = {"Bucket": self.bucket_name, "Key": source_key}
|
||||||
s3_client.copy_object(
|
s3_client.copy_object(CopySource=copy_source, Bucket=self.bucket_name, Key=dest_key)
|
||||||
CopySource=copy_source, Bucket=self.bucket_name, Key=dest_key
|
|
||||||
)
|
|
||||||
return True
|
return True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"File copy failed: {str(e)}")
|
print(f"File copy failed: {str(e)}")
|
||||||
@@ -322,3 +316,48 @@ class R2Storage(BaseStorage):
|
|||||||
}
|
}
|
||||||
|
|
||||||
return content_types.get(ext, "application/octet-stream")
|
return content_types.get(ext, "application/octet-stream")
|
||||||
|
|
||||||
|
def generate_download_response(self, key: str) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
生成文件下载响应(R2 实现)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
key: 对象键名(文件路径)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
包含下载信息的字典
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
s3_client = self.get_s3_client()
|
||||||
|
file_name = key.split("/")[-1] if "/" in key else key
|
||||||
|
|
||||||
|
# 使用 RFC 5987 编码处理文件名
|
||||||
|
from urllib.parse import quote
|
||||||
|
|
||||||
|
encoded_filename = quote(file_name.encode("utf-8"), safe="")
|
||||||
|
|
||||||
|
# 生成带有 Content-Disposition 的预签名 URL
|
||||||
|
expires = int(os.getenv("R2_PRESIGN_EXPIRES", "3600"))
|
||||||
|
|
||||||
|
url = s3_client.generate_presigned_url(
|
||||||
|
"get_object",
|
||||||
|
Params={
|
||||||
|
"Bucket": self.bucket_name,
|
||||||
|
"Key": key,
|
||||||
|
"ResponseContentDisposition": f"attachment; filename=\"{file_name}\"; filename*=UTF-8''{encoded_filename}",
|
||||||
|
},
|
||||||
|
ExpiresIn=expires,
|
||||||
|
)
|
||||||
|
|
||||||
|
if url:
|
||||||
|
return {"type": "redirect", "url": url}
|
||||||
|
|
||||||
|
# 如果预签名 URL 失败,尝试公共 URL
|
||||||
|
public_url = self.get_public_url(key)
|
||||||
|
if public_url:
|
||||||
|
return {"type": "redirect", "url": public_url}
|
||||||
|
|
||||||
|
return None
|
||||||
|
except Exception as e:
|
||||||
|
print(f"R2 download response generation failed: {str(e)}")
|
||||||
|
return None
|
||||||
|
|||||||
160
utils.py
Normal file
160
utils.py
Normal file
@@ -0,0 +1,160 @@
|
|||||||
|
"""
|
||||||
|
工具函数模块
|
||||||
|
集中管理常用的辅助函数
|
||||||
|
"""
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
|
||||||
|
def format_timestamp(timestamp) -> str:
|
||||||
|
"""
|
||||||
|
格式化时间戳为人类可读的格式
|
||||||
|
|
||||||
|
Args:
|
||||||
|
timestamp: 时间戳对象(datetime 或其他)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
格式化后的时间字符串
|
||||||
|
"""
|
||||||
|
if isinstance(timestamp, datetime):
|
||||||
|
return timestamp.strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
return str(timestamp)
|
||||||
|
|
||||||
|
|
||||||
|
def format_file_size(size_bytes: Optional[int]) -> str:
|
||||||
|
"""
|
||||||
|
格式化文件大小为人类可读的格式
|
||||||
|
|
||||||
|
Args:
|
||||||
|
size_bytes: 文件大小(字节)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
格式化后的大小字符串(如 "1.23MB")
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if size_bytes is None:
|
||||||
|
return "-"
|
||||||
|
num = float(size_bytes)
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
return "-"
|
||||||
|
|
||||||
|
for unit in ["B", "KB", "MB", "GB", "TB"]:
|
||||||
|
if num < 1024:
|
||||||
|
# 对于字节,显示整数
|
||||||
|
if unit == "B":
|
||||||
|
return f"{int(num)}{unit}"
|
||||||
|
# 其他单位保留两位小数
|
||||||
|
return f"{num:.2f}{unit}"
|
||||||
|
num = num / 1024.0
|
||||||
|
return f"{num:.2f}PB"
|
||||||
|
|
||||||
|
|
||||||
|
def get_file_icon(filename: Optional[str]) -> str:
|
||||||
|
"""
|
||||||
|
根据文件名返回对应的 Font Awesome 图标类名
|
||||||
|
|
||||||
|
Args:
|
||||||
|
filename: 文件名
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Font Awesome 图标类名
|
||||||
|
"""
|
||||||
|
if not filename:
|
||||||
|
return "fas fa-file"
|
||||||
|
|
||||||
|
ext = filename.lower().split(".")[-1] if "." in filename else ""
|
||||||
|
|
||||||
|
# 定义文件类型映射
|
||||||
|
icon_map = {
|
||||||
|
"fas fa-image": ["jpg", "jpeg", "png", "gif", "bmp", "webp", "svg"],
|
||||||
|
"fas fa-music": ["mp3", "wav", "ogg", "flac", "m4a", "aac"],
|
||||||
|
"fas fa-video": ["mp4", "webm", "avi", "mov", "wmv", "flv", "mkv"],
|
||||||
|
"fas fa-file-alt": ["pdf", "doc", "docx", "txt", "md", "rtf"],
|
||||||
|
"fas fa-file-archive": ["zip", "rar", "7z", "tar", "gz"],
|
||||||
|
"fas fa-file-code": ["py", "js", "html", "css", "java", "cpp", "c", "php"],
|
||||||
|
"fas fa-file-excel": ["xls", "xlsx", "csv"],
|
||||||
|
"fas fa-file-powerpoint": ["ppt", "pptx"],
|
||||||
|
}
|
||||||
|
|
||||||
|
for icon, extensions in icon_map.items():
|
||||||
|
if ext in extensions:
|
||||||
|
return icon
|
||||||
|
|
||||||
|
return "fas fa-file"
|
||||||
|
|
||||||
|
|
||||||
|
def normalize_path(path: str, is_folder: bool = False) -> str:
|
||||||
|
"""
|
||||||
|
规范化路径格式
|
||||||
|
|
||||||
|
Args:
|
||||||
|
path: 原始路径
|
||||||
|
is_folder: 是否为文件夹
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
规范化后的路径(文件夹以 / 结尾)
|
||||||
|
"""
|
||||||
|
path = path.strip()
|
||||||
|
if is_folder and not path.endswith("/"):
|
||||||
|
return path + "/"
|
||||||
|
if not is_folder and path.endswith("/"):
|
||||||
|
return path.rstrip("/")
|
||||||
|
return path
|
||||||
|
|
||||||
|
|
||||||
|
def get_file_extension(filename: str) -> str:
|
||||||
|
"""
|
||||||
|
获取文件扩展名
|
||||||
|
|
||||||
|
Args:
|
||||||
|
filename: 文件名
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
小写的文件扩展名(不含点)
|
||||||
|
"""
|
||||||
|
if not filename or "." not in filename:
|
||||||
|
return ""
|
||||||
|
return filename.lower().split(".")[-1]
|
||||||
|
|
||||||
|
|
||||||
|
def is_image_file(filename: str) -> bool:
|
||||||
|
"""
|
||||||
|
判断文件是否为图片
|
||||||
|
|
||||||
|
Args:
|
||||||
|
filename: 文件名
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
如果是图片文件返回 True
|
||||||
|
"""
|
||||||
|
image_extensions = ["jpg", "jpeg", "png", "gif", "bmp", "webp", "svg", "ico"]
|
||||||
|
return get_file_extension(filename) in image_extensions
|
||||||
|
|
||||||
|
|
||||||
|
def is_video_file(filename: str) -> bool:
|
||||||
|
"""
|
||||||
|
判断文件是否为视频
|
||||||
|
|
||||||
|
Args:
|
||||||
|
filename: 文件名
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
如果是视频文件返回 True
|
||||||
|
"""
|
||||||
|
video_extensions = ["mp4", "webm", "ogg", "mov", "avi", "mkv", "m4v"]
|
||||||
|
return get_file_extension(filename) in video_extensions
|
||||||
|
|
||||||
|
|
||||||
|
def is_audio_file(filename: str) -> bool:
|
||||||
|
"""
|
||||||
|
判断文件是否为音频
|
||||||
|
|
||||||
|
Args:
|
||||||
|
filename: 文件名
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
如果是音频文件返回 True
|
||||||
|
"""
|
||||||
|
audio_extensions = ["mp3", "wav", "ogg", "m4a", "aac", "flac", "opus", "weba"]
|
||||||
|
return get_file_extension(filename) in audio_extensions
|
||||||
Reference in New Issue
Block a user