import hashlib import os from datetime import datetime from typing import Any, Dict, List from flask import ( Blueprint, Response, abort, jsonify, redirect, render_template, request, ) from storages.factory import StorageFactory main_route = Blueprint("main", __name__) # 初始化存储(使用工厂模式) storage = StorageFactory.get_storage() # 缩略图默认 TTL(秒),可通过环境变量覆盖 THUMB_TTL = int(os.environ.get("THUMB_TTL_SECONDS", "3600")) def format_timestamp(timestamp) -> str: """ 格式化时间戳为人类可读的格式 """ return storage.format_timestamp(timestamp) def get_public_url(key: str) -> str: """ 生成对象的公共访问 URL """ return storage.get_public_url(key) def get_file_url(key: str) -> str: """生成通过服务器访问文件的 URL""" return f"/file/{key}" def build_file_entry(obj: Dict[str, Any], prefix: str) -> Dict[str, Any] | None: """根据对象信息构建文件条目。""" key = obj.get("Key", "") if not key: return None if prefix and key == prefix: return None if key.endswith("/"): return None rel_name = key[len(prefix) :] if prefix else key entry: Dict[str, Any] = { "name": rel_name, "key": key, "size": obj.get("Size"), "last_modified": format_timestamp(obj.get("LastModified")), "is_dir": False, "file_url": get_file_url(key), } public_url = get_public_url(key) if public_url: entry["public_url"] = public_url presigned = storage.generate_presigned_url(key) if presigned: entry["presigned_url"] = presigned return entry def build_directory_entry( prefix_value: str | None, current_prefix: str ) -> Dict[str, Any] | None: """根据前缀构建目录条目。""" if not prefix_value: return None rel = ( prefix_value[len(current_prefix) :].rstrip("/") if current_prefix else prefix_value.rstrip("/") ) return {"name": rel, "key": prefix_value, "is_dir": True} def build_entries(response: Dict[str, Any], prefix: str) -> List[Dict[str, Any]]: """将存储响应转换为用于模板渲染的条目列表。""" entries: List[Dict[str, Any]] = [] for obj in response.get("Contents", []): entry = build_file_entry(obj, prefix) if entry: entries.append(entry) for pref in response.get("CommonPrefixes", []): directory_entry = build_directory_entry(pref.get("Prefix"), prefix) if directory_entry: entries.append(directory_entry) entries.sort(key=lambda x: (not x.get("is_dir", False), x["name"])) return entries def build_crumbs(prefix: str) -> List[Dict[str, str]]: """根据当前前缀构建面包屑导航数据。""" crumbs: List[Dict[str, str]] = [] if prefix: segs = prefix.rstrip("/").split("/") acc = "" for seg in segs: acc = acc + seg + "/" crumbs.append({"name": seg, "prefix": acc}) return crumbs @main_route.route("/") def index(): """ 返回文件和目录列表的 HTML 页面。 """ try: prefix = request.args.get("prefix", "") or "" response = storage.list_objects(prefix) entries = build_entries(response, prefix) crumbs = build_crumbs(prefix) return render_template( "index.html", entries=entries, current_prefix=prefix, crumbs=crumbs, current_year=datetime.now().year, ) except Exception: abort(500) @main_route.route("/") def browse(prefix_path): """目录路由。将 URL /a/b 映射为 prefix 'a/b/' 并重用 index 的逻辑。""" try: prefix = prefix_path or "" if prefix and not prefix.endswith("/"): prefix = prefix + "/" response = storage.list_objects(prefix) entries = build_entries(response, prefix) crumbs = build_crumbs(prefix) return render_template( "index.html", entries=entries, current_prefix=prefix, crumbs=crumbs, current_year=datetime.now().year, ) except Exception: abort(500) @main_route.route("/file/") def serve_file(file_path): """通过服务器提供文件访问""" try: # 获取文件的基本信息,验证文件存在并检查大小 try: info = storage.get_object_info(file_path) except Exception: abort(404) size = int(info.get("ContentLength", 0) or 0) limit = 6 * 1024 * 1024 # 6 MB # 如果文件较大,避免通过 Serverless 函数传输,返回预签名 URL 的重定向 if size > limit: presigned = storage.generate_presigned_url(file_path) if presigned: return redirect(presigned) # 如果没有预签名 URL,则返回 413(Payload Too Large) abort(413) # 小文件:直接从存储获取并通过 Response 返回(流式) file_obj = storage.get_object(file_path) headers = { "Content-Type": file_obj.get("ContentType", "application/octet-stream"), "Content-Length": str(file_obj.get("ContentLength", 0)), } return Response( file_obj["Body"].iter_chunks(), headers=headers, direct_passthrough=True ) except Exception: abort(500) @main_route.route("/thumb/") def thumb(file_path): """返回图片的缩略图,使用 Vercel Cache Headers 避免重复从 R2 拉取""" # 设置更长的缓存控制头以支持浏览器本地缓存 cache_headers = { "Cache-Control": f"public, max-age={THUMB_TTL}", "ETag": f'W/"{hashlib.md5(file_path.encode("utf-8")).hexdigest()}"', } # 先检查客户端是否已经有缓存版本 etag = request.headers.get("If-None-Match") if etag and etag == cache_headers["ETag"]: return Response(status=304, headers=cache_headers) # 从获取原始对象并生成缩略图 try: # 对于较大的源文件,避免在函数内读取并处理,优先使用预签名 URL try: info = storage.get_object_info(file_path) except Exception: abort(404) size = int(info.get("ContentLength", 0) or 0) limit = 6 * 1024 * 1024 if size > limit: presigned = storage.generate_presigned_url(file_path) if presigned: return redirect(presigned) abort(413) thumb_bytes = storage.generate_thumbnail(file_path) response = Response(thumb_bytes, mimetype="image/jpeg") response.headers.update(cache_headers) return response except Exception: abort(404) @main_route.route("/upload", methods=["POST"]) def upload(): """上传文件到存储""" try: # 检查是否有文件 if "file" not in request.files: return jsonify({"success": False, "error": "No file provided"}), 400 file = request.files["file"] # 检查文件名 if file.filename == "": return jsonify({"success": False, "error": "No file selected"}), 400 # 获取目标路径(可选) prefix = request.form.get("prefix", "") if prefix and not prefix.endswith("/"): prefix = prefix + "/" # 构建完整的文件路径 file_path = prefix + file.filename # 读取文件数据 file_data = file.read() # 获取文件类型 content_type = file.content_type # 上传文件 success = storage.upload_file(file_path, file_data, content_type) if success: return jsonify( { "success": True, "message": "File uploaded successfully", "path": file_path, } ) else: return jsonify({"success": False, "error": "Upload failed"}), 500 except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @main_route.route("/delete/", methods=["DELETE", "POST"]) def delete(file_path): """删除存储中的文件""" try: # 删除文件 success = storage.delete_file(file_path) if success: return jsonify({"success": True, "message": "File deleted successfully"}) else: return jsonify({"success": False, "error": "Delete failed"}), 500 except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @main_route.route("/rename/", methods=["POST"]) def rename(old_key): """重命名存储中的文件""" try: data = request.get_json() new_name = data.get("newName") if not new_name: return jsonify({"success": False, "error": "New name not provided"}), 400 # 构建新的文件路径 prefix = os.path.dirname(old_key) if prefix: new_key = f"{prefix}/{new_name}" else: new_key = new_name # 重命名文件 success = storage.rename_file(old_key, new_key) if success: return jsonify( { "success": True, "message": "File renamed successfully", "newKey": new_key, } ) else: return jsonify({"success": False, "error": "Rename failed"}), 500 except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @main_route.route("/delete_folder/", methods=["DELETE"]) def delete_folder_route(prefix): """删除存储中的文件夹""" try: if not prefix.endswith("/"): prefix += "/" success = storage.delete_folder(prefix) if success: return jsonify({"success": True, "message": "Folder deleted successfully"}) else: return jsonify({"success": False, "error": "Folder delete failed"}), 500 except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @main_route.route("/rename_folder/", methods=["POST"]) def rename_folder_route(old_prefix): """重命名存储中的文件夹""" try: data = request.get_json() new_name = data.get("newName") if not new_name: return jsonify({"success": False, "error": "New name not provided"}), 400 if not old_prefix.endswith("/"): old_prefix += "/" # 构建新的文件夹路径 parent_prefix = os.path.dirname(os.path.dirname(old_prefix)) if parent_prefix: new_prefix = f"{parent_prefix}/{new_name}/" else: new_prefix = f"{new_name}/" success = storage.rename_folder(old_prefix, new_prefix) if success: return jsonify( { "success": True, "message": "Folder renamed successfully", "newPrefix": new_prefix, } ) else: return jsonify({"success": False, "error": "Folder rename failed"}), 500 except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @main_route.route("/copy", methods=["POST"]) def copy_item(): """复制文件或文件夹""" try: data = request.get_json() source = data.get("source") destination = data.get("destination") is_folder = data.get("is_folder", False) if not source or not destination: return ( jsonify( {"success": False, "error": "Source or destination not provided"} ), 400, ) if is_folder: if not source.endswith("/"): source += "/" if not destination.endswith("/"): destination += "/" success = storage.copy_folder(source, destination) else: success = storage.copy_file(source, destination) if success: return jsonify({"success": True, "message": "Item copied successfully"}) else: return jsonify({"success": False, "error": "Copy failed"}), 500 except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @main_route.route("/move", methods=["POST"]) def move_item(): """移动文件或文件夹""" try: data = request.get_json() source = data.get("source") destination = data.get("destination") is_folder = data.get("is_folder", False) if not source or not destination: return ( jsonify( {"success": False, "error": "Source or destination not provided"} ), 400, ) if is_folder: if not source.endswith("/"): source += "/" if not destination.endswith("/"): destination += "/" success = storage.rename_folder(source, destination) else: success = storage.rename_file(source, destination) if success: return jsonify({"success": True, "message": "Item moved successfully"}) else: return jsonify({"success": False, "error": "Move failed"}), 500 except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @main_route.route("/create_folder", methods=["POST"]) def create_folder_route(): """创建文件夹""" try: data = request.get_json() path = data.get("path") if not path: return jsonify({"success": False, "error": "Path not provided"}), 400 if not path.endswith("/"): path += "/" success = storage.create_folder(path) if success: return jsonify({"success": True, "message": "Folder created successfully"}) else: return jsonify({"success": False, "error": "Folder creation failed"}), 500 except Exception as e: return jsonify({"success": False, "error": str(e)}), 500