mirror of
https://github.com/RhenCloud/Cloud-Index.git
synced 2025-12-06 15:26:10 +08:00
489 lines
15 KiB
Python
489 lines
15 KiB
Python
import hashlib
|
||
import os
|
||
from datetime import datetime
|
||
from typing import Any, Dict, List
|
||
|
||
from flask import Blueprint, Response, abort, jsonify, redirect, render_template, request
|
||
|
||
from storages.factory import StorageFactory
|
||
|
||
main_route = Blueprint("main", __name__)
|
||
|
||
# 初始化存储(使用工厂模式)
|
||
storage = StorageFactory.get_storage()
|
||
|
||
# 缩略图默认 TTL(秒),可通过环境变量覆盖
|
||
THUMB_TTL = int(os.environ.get("THUMB_TTL_SECONDS", "3600"))
|
||
|
||
|
||
def format_timestamp(timestamp) -> str:
|
||
"""
|
||
格式化时间戳为人类可读的格式
|
||
"""
|
||
return storage.format_timestamp(timestamp)
|
||
|
||
|
||
def get_public_url(key: str) -> str:
|
||
"""
|
||
生成对象的公共访问 URL
|
||
"""
|
||
return storage.get_public_url(key)
|
||
|
||
|
||
def get_file_url(key: str) -> str:
|
||
"""生成通过服务器访问文件的 URL"""
|
||
return f"/file/{key}"
|
||
|
||
|
||
def build_file_entry(obj: Dict[str, Any], prefix: str) -> Dict[str, Any] | None:
|
||
"""根据对象信息构建文件条目。"""
|
||
key = obj.get("Key", "")
|
||
if not key:
|
||
return None
|
||
|
||
if prefix and key == prefix:
|
||
return None
|
||
|
||
if key.endswith("/"):
|
||
return None
|
||
|
||
rel_name = key[len(prefix) :] if prefix else key
|
||
|
||
entry: Dict[str, Any] = {
|
||
"name": rel_name,
|
||
"key": key,
|
||
"size": obj.get("Size"),
|
||
"last_modified": format_timestamp(obj.get("LastModified")),
|
||
"is_dir": False,
|
||
"file_url": get_file_url(key),
|
||
}
|
||
|
||
public_url = get_public_url(key)
|
||
if public_url:
|
||
entry["public_url"] = public_url
|
||
|
||
presigned = storage.generate_presigned_url(key)
|
||
if presigned:
|
||
entry["presigned_url"] = presigned
|
||
|
||
return entry
|
||
|
||
|
||
def build_directory_entry(prefix_value: str | None, current_prefix: str) -> Dict[str, Any] | None:
|
||
"""根据前缀构建目录条目。"""
|
||
if not prefix_value:
|
||
return None
|
||
|
||
rel = prefix_value[len(current_prefix) :].rstrip("/") if current_prefix else prefix_value.rstrip("/")
|
||
|
||
return {"name": rel, "key": prefix_value, "is_dir": True}
|
||
|
||
|
||
def build_entries(response: Dict[str, Any], prefix: str) -> List[Dict[str, Any]]:
|
||
"""将存储响应转换为用于模板渲染的条目列表。"""
|
||
entries: List[Dict[str, Any]] = []
|
||
|
||
for obj in response.get("Contents", []):
|
||
entry = build_file_entry(obj, prefix)
|
||
if entry:
|
||
entries.append(entry)
|
||
|
||
for pref in response.get("CommonPrefixes", []):
|
||
directory_entry = build_directory_entry(pref.get("Prefix"), prefix)
|
||
if directory_entry:
|
||
entries.append(directory_entry)
|
||
|
||
entries.sort(key=lambda x: (not x.get("is_dir", False), x["name"]))
|
||
return entries
|
||
|
||
|
||
def build_crumbs(prefix: str) -> List[Dict[str, str]]:
|
||
"""根据当前前缀构建导航数据。"""
|
||
crumbs: List[Dict[str, str]] = []
|
||
if prefix:
|
||
segs = prefix.rstrip("/").split("/")
|
||
acc = ""
|
||
for seg in segs:
|
||
acc = acc + seg + "/"
|
||
crumbs.append({"name": seg, "prefix": acc})
|
||
return crumbs
|
||
|
||
|
||
@main_route.route("/")
|
||
def index():
|
||
"""
|
||
返回文件和目录列表的 HTML 页面。
|
||
"""
|
||
try:
|
||
prefix = request.args.get("prefix", "") or ""
|
||
|
||
response = storage.list_objects(prefix)
|
||
entries = build_entries(response, prefix)
|
||
crumbs = build_crumbs(prefix)
|
||
|
||
return render_template(
|
||
"index.html",
|
||
entries=entries,
|
||
current_prefix=prefix,
|
||
crumbs=crumbs,
|
||
current_year=datetime.now().year,
|
||
)
|
||
except Exception:
|
||
abort(500)
|
||
|
||
|
||
@main_route.route("/<path:prefix_path>")
|
||
def browse(prefix_path):
|
||
"""目录路由。将 URL /a/b 映射为 prefix 'a/b/' 并重用 index 的逻辑。"""
|
||
try:
|
||
prefix = prefix_path or ""
|
||
if prefix and not prefix.endswith("/"):
|
||
prefix = prefix + "/"
|
||
|
||
response = storage.list_objects(prefix)
|
||
entries = build_entries(response, prefix)
|
||
crumbs = build_crumbs(prefix)
|
||
|
||
return render_template(
|
||
"index.html",
|
||
entries=entries,
|
||
current_prefix=prefix,
|
||
crumbs=crumbs,
|
||
current_year=datetime.now().year,
|
||
)
|
||
except Exception:
|
||
abort(500)
|
||
|
||
|
||
@main_route.route("/file/<path:file_path>")
|
||
def serve_file(file_path):
|
||
"""重定向到原始存储 URL,节省服务器资源"""
|
||
try:
|
||
# 验证文件存在
|
||
try:
|
||
storage.get_object_info(file_path)
|
||
except Exception:
|
||
abort(404)
|
||
|
||
# 尝试获取预签名 URL(用于私有存储或需要时间限制的 URL)
|
||
presigned = storage.generate_presigned_url(file_path)
|
||
if presigned:
|
||
return redirect(presigned)
|
||
|
||
# 如果没有预签名 URL,尝试获取公共 URL
|
||
public_url = storage.get_public_url(file_path)
|
||
if public_url:
|
||
return redirect(public_url)
|
||
|
||
# 如果都没有可用的 URL,返回错误
|
||
abort(403)
|
||
|
||
except Exception:
|
||
abort(500)
|
||
|
||
|
||
@main_route.route("/download/<path:file_path>")
|
||
def download_file(file_path):
|
||
"""下载文件,支持所有存储类型"""
|
||
try:
|
||
# 验证文件存在
|
||
try:
|
||
storage.get_object_info(file_path)
|
||
except Exception:
|
||
abort(404)
|
||
|
||
# 使用存储后端的统一接口生成下载响应
|
||
download_response = storage.generate_download_response(file_path)
|
||
|
||
if not download_response:
|
||
abort(403)
|
||
|
||
# 根据响应类型处理
|
||
if download_response["type"] == "redirect":
|
||
return redirect(download_response["url"])
|
||
elif download_response["type"] == "content":
|
||
return Response(
|
||
download_response["content"],
|
||
headers=download_response["headers"],
|
||
mimetype=download_response["mimetype"],
|
||
)
|
||
else:
|
||
abort(500)
|
||
|
||
except Exception as e:
|
||
print(f"Download error: {e}")
|
||
abort(500)
|
||
|
||
|
||
@main_route.route("/thumb/<path:file_path>")
|
||
def thumb(file_path):
|
||
"""返回图片的缩略图,使用 Vercel Cache Headers 避免重复从 R2 拉取"""
|
||
# 设置更长的缓存控制头以支持浏览器本地缓存
|
||
cache_headers = {
|
||
"Cache-Control": f"public, max-age={THUMB_TTL}",
|
||
"ETag": f'W/"{hashlib.md5(file_path.encode("utf-8")).hexdigest()}"',
|
||
}
|
||
|
||
# 先检查客户端是否已经有缓存版本
|
||
etag = request.headers.get("If-None-Match")
|
||
if etag and etag == cache_headers["ETag"]:
|
||
return Response(status=304, headers=cache_headers)
|
||
|
||
# 从获取原始对象并生成缩略图
|
||
try:
|
||
# 对于较大的源文件,避免在函数内读取并处理,优先使用预签名 URL
|
||
try:
|
||
info = storage.get_object_info(file_path)
|
||
except Exception:
|
||
abort(404)
|
||
|
||
size = int(info.get("ContentLength", 0) or 0)
|
||
limit = 6 * 1024 * 1024
|
||
if size > limit:
|
||
presigned = storage.generate_presigned_url(file_path)
|
||
if presigned:
|
||
return redirect(presigned)
|
||
abort(413)
|
||
|
||
thumb_bytes = storage.generate_thumbnail(file_path)
|
||
response = Response(thumb_bytes, mimetype="image/jpeg")
|
||
response.headers.update(cache_headers)
|
||
return response
|
||
except Exception:
|
||
abort(404)
|
||
|
||
|
||
@main_route.route("/upload", methods=["POST"])
|
||
def upload():
|
||
"""上传文件到存储"""
|
||
try:
|
||
# 检查是否有文件
|
||
if "file" not in request.files:
|
||
return jsonify({"success": False, "error": "No file provided"}), 400
|
||
|
||
file = request.files["file"]
|
||
|
||
# 检查文件名
|
||
if file.filename == "":
|
||
return jsonify({"success": False, "error": "No file selected"}), 400
|
||
|
||
# 获取目标路径(可选)
|
||
prefix = request.form.get("prefix", "")
|
||
if prefix and not prefix.endswith("/"):
|
||
prefix = prefix + "/"
|
||
|
||
# 构建完整的文件路径
|
||
file_path = prefix + file.filename
|
||
|
||
# 读取文件数据
|
||
file_data = file.read()
|
||
|
||
# 获取文件类型
|
||
content_type = file.content_type
|
||
|
||
# 上传文件
|
||
success = storage.upload_file(file_path, file_data, content_type)
|
||
|
||
if success:
|
||
return jsonify(
|
||
{
|
||
"success": True,
|
||
"message": "File uploaded successfully",
|
||
"path": file_path,
|
||
}
|
||
)
|
||
else:
|
||
return jsonify({"success": False, "error": "Upload failed"}), 500
|
||
|
||
except Exception as e:
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
|
||
@main_route.route("/delete/<path:file_path>", methods=["DELETE", "POST"])
|
||
def delete(file_path):
|
||
"""删除存储中的文件"""
|
||
try:
|
||
# 删除文件
|
||
success = storage.delete_file(file_path)
|
||
|
||
if success:
|
||
return jsonify({"success": True, "message": "File deleted successfully"})
|
||
else:
|
||
return jsonify({"success": False, "error": "Delete failed"}), 500
|
||
|
||
except Exception as e:
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
|
||
@main_route.route("/rename/<path:old_key>", methods=["POST"])
|
||
def rename(old_key):
|
||
"""重命名存储中的文件"""
|
||
try:
|
||
data = request.get_json()
|
||
new_name = data.get("newName")
|
||
|
||
if not new_name:
|
||
return jsonify({"success": False, "error": "New name not provided"}), 400
|
||
|
||
# 构建新的文件路径
|
||
prefix = os.path.dirname(old_key)
|
||
if prefix:
|
||
new_key = f"{prefix}/{new_name}"
|
||
else:
|
||
new_key = new_name
|
||
|
||
# 重命名文件
|
||
success = storage.rename_file(old_key, new_key)
|
||
|
||
if success:
|
||
return jsonify(
|
||
{
|
||
"success": True,
|
||
"message": "File renamed successfully",
|
||
"newKey": new_key,
|
||
}
|
||
)
|
||
else:
|
||
return jsonify({"success": False, "error": "Rename failed"}), 500
|
||
|
||
except Exception as e:
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
|
||
@main_route.route("/delete_folder/<path:prefix>", methods=["DELETE"])
|
||
def delete_folder_route(prefix):
|
||
"""删除存储中的文件夹"""
|
||
try:
|
||
if not prefix.endswith("/"):
|
||
prefix += "/"
|
||
success = storage.delete_folder(prefix)
|
||
if success:
|
||
return jsonify({"success": True, "message": "Folder deleted successfully"})
|
||
else:
|
||
return jsonify({"success": False, "error": "Folder delete failed"}), 500
|
||
except Exception as e:
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
|
||
@main_route.route("/rename_folder/<path:old_prefix>", methods=["POST"])
|
||
def rename_folder_route(old_prefix):
|
||
"""重命名存储中的文件夹"""
|
||
try:
|
||
data = request.get_json()
|
||
new_name = data.get("newName")
|
||
|
||
if not new_name:
|
||
return jsonify({"success": False, "error": "New name not provided"}), 400
|
||
|
||
if not old_prefix.endswith("/"):
|
||
old_prefix += "/"
|
||
|
||
# 构建新的文件夹路径
|
||
parent_prefix = os.path.dirname(os.path.dirname(old_prefix))
|
||
if parent_prefix:
|
||
new_prefix = f"{parent_prefix}/{new_name}/"
|
||
else:
|
||
new_prefix = f"{new_name}/"
|
||
|
||
success = storage.rename_folder(old_prefix, new_prefix)
|
||
|
||
if success:
|
||
return jsonify(
|
||
{
|
||
"success": True,
|
||
"message": "Folder renamed successfully",
|
||
"newPrefix": new_prefix,
|
||
}
|
||
)
|
||
else:
|
||
return jsonify({"success": False, "error": "Folder rename failed"}), 500
|
||
except Exception as e:
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
|
||
@main_route.route("/copy", methods=["POST"])
|
||
def copy_item():
|
||
"""复制文件或文件夹"""
|
||
try:
|
||
data = request.get_json()
|
||
source = data.get("source")
|
||
destination = data.get("destination")
|
||
is_folder = data.get("is_folder", False)
|
||
|
||
if not source or not destination:
|
||
return (
|
||
jsonify({"success": False, "error": "Source or destination not provided"}),
|
||
400,
|
||
)
|
||
|
||
if is_folder:
|
||
if not source.endswith("/"):
|
||
source += "/"
|
||
if not destination.endswith("/"):
|
||
destination += "/"
|
||
success = storage.copy_folder(source, destination)
|
||
else:
|
||
success = storage.copy_file(source, destination)
|
||
|
||
if success:
|
||
return jsonify({"success": True, "message": "Item copied successfully"})
|
||
else:
|
||
return jsonify({"success": False, "error": "Copy failed"}), 500
|
||
except Exception as e:
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
|
||
@main_route.route("/move", methods=["POST"])
|
||
def move_item():
|
||
"""移动文件或文件夹"""
|
||
try:
|
||
data = request.get_json()
|
||
source = data.get("source")
|
||
destination = data.get("destination")
|
||
is_folder = data.get("is_folder", False)
|
||
|
||
if not source or not destination:
|
||
return (
|
||
jsonify({"success": False, "error": "Source or destination not provided"}),
|
||
400,
|
||
)
|
||
|
||
if is_folder:
|
||
if not source.endswith("/"):
|
||
source += "/"
|
||
if not destination.endswith("/"):
|
||
destination += "/"
|
||
success = storage.rename_folder(source, destination)
|
||
else:
|
||
success = storage.rename_file(source, destination)
|
||
|
||
if success:
|
||
return jsonify({"success": True, "message": "Item moved successfully"})
|
||
else:
|
||
return jsonify({"success": False, "error": "Move failed"}), 500
|
||
except Exception as e:
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
|
||
@main_route.route("/create_folder", methods=["POST"])
|
||
def create_folder_route():
|
||
"""创建文件夹"""
|
||
try:
|
||
data = request.get_json()
|
||
path = data.get("path")
|
||
|
||
if not path:
|
||
return jsonify({"success": False, "error": "Path not provided"}), 400
|
||
|
||
if not path.endswith("/"):
|
||
path += "/"
|
||
|
||
success = storage.create_folder(path)
|
||
|
||
if success:
|
||
return jsonify({"success": True, "message": "Folder created successfully"})
|
||
else:
|
||
return jsonify({"success": False, "error": "Folder creation failed"}), 500
|
||
except Exception as e:
|
||
return jsonify({"success": False, "error": str(e)}), 500
|