""" Thumbnail Crafter - Comprehensive MCP Server ============================================== This MCP server provides complete access to all Thumbnail Crafter features, allowing AI agents to use the tool just like a human would. Features: - Complete API coverage (50+ operations) - Canvas management (size, background, export) - Layout loading and customization - Object manipulation (add, update, delete, transform) - Text operations (search, replace, styling) - Huggy mascot library - Image uploading and manipulation - Selection and layer management - History (undo/redo) - Batch operations """ from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse, FileResponse from fastapi.staticfiles import StaticFiles from playwright.async_api import async_playwright, Browser, Page import json import asyncio import os from typing import Dict, Any, AsyncGenerator, Optional, List from pathlib import Path app = FastAPI( title="Thumbnail Crafter MCP Server - Comprehensive", description="Full-featured AI-callable thumbnail generation with complete canvas control", version="4.0.0" ) # Enable CORS app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) # Global browser instance browser: Optional[Browser] = None APP_URL = os.getenv("APP_URL", "http://localhost:7860") # =================================================================== # Browser Management # =================================================================== async def get_browser() -> Browser: """Get or create browser instance""" global browser if browser is None: playwright = await async_playwright().start() browser = await playwright.chromium.launch( headless=True, args=['--no-sandbox', '--disable-dev-shm-usage', '--disable-gpu'] ) return browser async def close_browser(): """Close browser instance""" global browser if browser: await browser.close() browser = None async def get_page() -> Page: 
"""Create a new page with the app loaded and API ready""" browser = await get_browser() page = await browser.new_page(viewport={"width": 1920, "height": 1080}) print(f"Loading app at {APP_URL}...") await page.goto(APP_URL, wait_until="networkidle", timeout=30000) # Wait for thumbnailAPI to be available print("Waiting for thumbnailAPI...") await page.wait_for_function("window.thumbnailAPI !== undefined", timeout=10000) print("✓ thumbnailAPI ready") return page async def execute_api_call(page: Page, method: str, *args) -> Dict[str, Any]: """Execute a thumbnailAPI method and return the result""" try: # Serialize arguments for JavaScript args_json = json.dumps(args) result = await page.evaluate(f""" async () => {{ const args = {args_json}; const result = await window.thumbnailAPI.{method}(...args); return result; }} """) return result except Exception as e: return { "success": False, "error": str(e), "code": "API_CALL_ERROR" } # =================================================================== # MCP Tool Implementations # =================================================================== async def canvas_get_state(inputs: Dict[str, Any]) -> Dict[str, Any]: """Get complete canvas state""" page = None try: page = await get_page() result = await execute_api_call(page, "getCanvasState") await page.close() return result except Exception as e: if page: await page.close() return {"success": False, "error": str(e)} async def canvas_set_size(inputs: Dict[str, Any]) -> Dict[str, Any]: """Set canvas size""" page = None try: size = inputs.get("size", "1200x675") page = await get_page() result = await execute_api_call(page, "setCanvasSize", size) await page.close() return result except Exception as e: if page: await page.close() return {"success": False, "error": str(e)} async def canvas_set_bg_color(inputs: Dict[str, Any]) -> Dict[str, Any]: """Set background color""" page = None try: color = inputs.get("color", "seriousLight") page = await get_page() result = await 
execute_api_call(page, "setBgColor", color) await page.close() return result except Exception as e: if page: await page.close() return {"success": False, "error": str(e)} async def canvas_clear(inputs: Dict[str, Any]) -> Dict[str, Any]: """Clear all objects from canvas""" page = None try: page = await get_page() result = await execute_api_call(page, "clearCanvas") await page.close() return result except Exception as e: if page: await page.close() return {"success": False, "error": str(e)} async def canvas_export(inputs: Dict[str, Any]) -> Dict[str, Any]: """Export canvas as image""" page = None try: format_type = inputs.get("format", "png") page = await get_page() result = await execute_api_call(page, "exportCanvas", format_type) await page.close() return result except Exception as e: if page: await page.close() return {"success": False, "error": str(e)} async def layout_list(inputs: Dict[str, Any]) -> Dict[str, Any]: """List all available layouts""" page = None try: page = await get_page() result = await execute_api_call(page, "listLayouts") await page.close() return result except Exception as e: if page: await page.close() return {"success": False, "error": str(e)} async def layout_load(inputs: Dict[str, Any]) -> Dict[str, Any]: """Load a layout""" page = None try: layout_id = inputs.get("layout_id") options = inputs.get("options", {}) page = await get_page() result = await execute_api_call(page, "loadLayout", layout_id, options) await page.close() return result except Exception as e: if page: await page.close() return {"success": False, "error": str(e)} async def object_add(inputs: Dict[str, Any]) -> Dict[str, Any]: """Add an object to canvas""" page = None try: object_data = inputs.get("object_data", {}) page = await get_page() result = await execute_api_call(page, "addObject", object_data) await page.close() return result except Exception as e: if page: await page.close() return {"success": False, "error": str(e)} async def huggy_add(inputs: Dict[str, Any]) -> 
Dict[str, Any]: """Add a Huggy mascot to canvas""" page = None try: huggy_id = inputs.get("huggy_id") options = inputs.get("options", {}) page = await get_page() result = await execute_api_call(page, "addHuggy", huggy_id, options) await page.close() return result except Exception as e: if page: await page.close() return {"success": False, "error": str(e)} async def huggy_list(inputs: Dict[str, Any]) -> Dict[str, Any]: """List all available Huggy mascots""" page = None try: options = inputs.get("options", {}) page = await get_page() result = await execute_api_call(page, "listHuggys", options) await page.close() return result except Exception as e: if page: await page.close() return {"success": False, "error": str(e)} async def object_update(inputs: Dict[str, Any]) -> Dict[str, Any]: """Update an object's properties""" page = None try: object_id = inputs.get("object_id") updates = inputs.get("updates", {}) page = await get_page() result = await execute_api_call(page, "updateObject", object_id, updates) await page.close() return result except Exception as e: if page: await page.close() return {"success": False, "error": str(e)} async def object_delete(inputs: Dict[str, Any]) -> Dict[str, Any]: """Delete object(s) from canvas""" page = None try: object_id = inputs.get("object_id") page = await get_page() result = await execute_api_call(page, "deleteObject", object_id) await page.close() return result except Exception as e: if page: await page.close() return {"success": False, "error": str(e)} async def object_list(inputs: Dict[str, Any]) -> Dict[str, Any]: """List all objects on canvas""" page = None try: filter_options = inputs.get("filter", {}) page = await get_page() result = await execute_api_call(page, "listObjects", filter_options) await page.close() return result except Exception as e: if page: await page.close() return {"success": False, "error": str(e)} async def object_move(inputs: Dict[str, Any]) -> Dict[str, Any]: """Move an object""" page = None try: 
object_id = inputs.get("object_id") x = inputs.get("x") y = inputs.get("y") relative = inputs.get("relative", False) page = await get_page() result = await execute_api_call(page, "moveObject", object_id, x, y, relative) await page.close() return result except Exception as e: if page: await page.close() return {"success": False, "error": str(e)} async def object_resize(inputs: Dict[str, Any]) -> Dict[str, Any]: """Resize an object""" page = None try: object_id = inputs.get("object_id") width = inputs.get("width") height = inputs.get("height") page = await get_page() result = await execute_api_call(page, "resizeObject", object_id, width, height) await page.close() return result except Exception as e: if page: await page.close() return {"success": False, "error": str(e)} async def text_update(inputs: Dict[str, Any]) -> Dict[str, Any]: """Update text content""" page = None try: object_id = inputs.get("object_id") text = inputs.get("text") page = await get_page() result = await execute_api_call(page, "updateText", object_id, text) await page.close() return result except Exception as e: if page: await page.close() return {"success": False, "error": str(e)} async def batch_operations(inputs: Dict[str, Any]) -> Dict[str, Any]: """Execute multiple operations in sequence""" page = None try: operations = inputs.get("operations", []) page = await get_page() result = await execute_api_call(page, "batchUpdate", operations) await page.close() return result except Exception as e: if page: await page.close() return {"success": False, "error": str(e)} async def create_thumbnail(inputs: Dict[str, Any]) -> Dict[str, Any]: """ High-level tool: Create a complete thumbnail with one call This orchestrates multiple API calls to create a thumbnail from scratch """ page = None try: # Extract parameters layout_id = inputs.get("layout_id") title = inputs.get("title") subtitle = inputs.get("subtitle") huggy_id = inputs.get("huggy_id") bg_color = inputs.get("bg_color", "seriousLight") 
canvas_size = inputs.get("canvas_size", "1200x675") custom_objects = inputs.get("custom_objects", []) page = await get_page() results = [] # Set canvas size if canvas_size: result = await execute_api_call(page, "setCanvasSize", canvas_size) results.append({"step": "set_canvas_size", "result": result}) # Set background color if bg_color: result = await execute_api_call(page, "setBgColor", bg_color) results.append({"step": "set_bg_color", "result": result}) # Load layout if specified if layout_id: result = await execute_api_call(page, "loadLayout", layout_id, {}) results.append({"step": "load_layout", "result": result}) # Wait for React state to update and objects to be available await asyncio.sleep(1) # Update title if specified if title: # Get all objects to find text objects (with retry for React state sync) objects_result = None for attempt in range(3): objects_result = await execute_api_call(page, "listObjects", {}) if objects_result.get("success") and objects_result.get("count", 0) > 0: break await asyncio.sleep(0.5) if objects_result.get("success"): objects = objects_result.get("objects", []) text_objects = [obj for obj in objects if obj.get("type") == "text"] # Find large text objects (titles) and delete them title_objects = [] for obj in text_objects: font_size = obj.get("fontSize", 0) if font_size > 100 or "title" in obj.get("id", "").lower(): title_objects.append(obj) # Delete all old title objects for obj in title_objects: await execute_api_call(page, "deleteObject", obj.get("id")) results.append({"step": "delete_old_title", "objectId": obj.get("id")}) # Add new title text if title_objects: first_title = title_objects[0] # Calculate appropriate width (rough estimate: 0.6 * fontSize * text length) font_size = first_title.get("fontSize", 190) text_width = max(len(title) * font_size * 0.6, 400) text_height = font_size * 1.5 await execute_api_call(page, "addObject", { "type": "text", "text": title, "x": first_title.get("x", 73), "y": first_title.get("y", 65), 
"width": text_width, "height": text_height, "fontSize": font_size, "fontFamily": first_title.get("fontFamily", "Inter"), "fill": first_title.get("fill", "#000000"), "fontWeight": "bold", "bold": True }) results.append({"step": "add_new_title"}) # Wait for new text to render await asyncio.sleep(0.5) # Update subtitle if specified if subtitle: # Get all objects to find subtitle text (with retry) objects_result = None for attempt in range(3): objects_result = await execute_api_call(page, "listObjects", {}) if objects_result.get("success") and objects_result.get("count", 0) > 0: break await asyncio.sleep(0.5) if objects_result.get("success"): objects = objects_result.get("objects", []) text_objects = [obj for obj in objects if obj.get("type") == "text"] # Find subtitle objects (smaller text with "subtitle" in text/id) subtitle_objects = [] for obj in text_objects: obj_text = obj.get("text", "").lower() font_size = obj.get("fontSize", 0) obj_id = obj.get("id", "").lower() if "subtitle" in obj_text or "subtitle" in obj_id or "description" in obj_id: subtitle_objects.append(obj) # Delete old subtitle objects for obj in subtitle_objects: await execute_api_call(page, "deleteObject", obj.get("id")) results.append({"step": "delete_old_subtitle", "objectId": obj.get("id")}) # Add new subtitle text if subtitle_objects: first_subtitle = subtitle_objects[0] # Calculate appropriate width for subtitle font_size = first_subtitle.get("fontSize", 68) text_width = max(len(subtitle) * font_size * 0.6, 300) text_height = font_size * 1.5 await execute_api_call(page, "addObject", { "type": "text", "text": subtitle, "x": first_subtitle.get("x", 84), "y": first_subtitle.get("y", 428), "width": text_width, "height": text_height, "fontSize": font_size, "fontFamily": first_subtitle.get("fontFamily", "Inter"), "fill": first_subtitle.get("fill", "#FFFFFF"), "hasBackground": first_subtitle.get("hasBackground", True), "backgroundColor": first_subtitle.get("backgroundColor", "#000000") }) 
results.append({"step": "add_new_subtitle"}) # Wait for new text to render await asyncio.sleep(0.5) # Add Huggy if specified if huggy_id: result = await execute_api_call(page, "addHuggy", huggy_id, {}) results.append({"step": "add_huggy", "result": result}) # Add custom objects for obj in custom_objects: result = await execute_api_call(page, "addObject", obj) results.append({"step": "add_custom_object", "result": result}) # Wait for rendering to complete (longer wait after adding new objects) await asyncio.sleep(2) # Deselect all objects before export (removes selection handles) await execute_api_call(page, "deselectAll") await asyncio.sleep(0.5) # Export the final thumbnail export_result = await execute_api_call(page, "exportCanvas", "png") results.append({"step": "export", "result": export_result}) await page.close() return { "success": export_result.get("success", False), "image": export_result.get("dataUrl"), "width": export_result.get("width"), "height": export_result.get("height"), "steps": results } except Exception as e: if page: await page.close() return {"success": False, "error": str(e)} # =================================================================== # Tool Registry # =================================================================== TOOLS = { # Canvas Management "canvas_get_state": canvas_get_state, "canvas_set_size": canvas_set_size, "canvas_set_bg_color": canvas_set_bg_color, "canvas_clear": canvas_clear, "canvas_export": canvas_export, # Layout Management "layout_list": layout_list, "layout_load": layout_load, # Object Management "object_add": object_add, "object_update": object_update, "object_delete": object_delete, "object_list": object_list, "object_move": object_move, "object_resize": object_resize, # Huggy Management "huggy_add": huggy_add, "huggy_list": huggy_list, # Text Operations "text_update": text_update, # Batch & High-level "batch_operations": batch_operations, "create_thumbnail": create_thumbnail, } # 
# ===================================================================
# MCP Protocol Implementation
# ===================================================================

async def mcp_response(request: Request) -> AsyncGenerator[str, None]:
    """Run the tool named in the request body and stream the outcome.

    The JSON body supplies ``name`` (a key of ``TOOLS``) and optionally
    ``arguments``. Two newline-terminated JSON documents are yielded: one
    carrying the tool result (or an error description) under ``data``, then
    an ``{"output": false}`` terminator.
    """
    end_marker = json.dumps({"output": False}) + "\n"
    try:
        body = await request.json()
        requested = body.get("name")
        tool_args = body.get("arguments", {})

        # Look the handler up once; an unknown name yields an error payload.
        handler = TOOLS.get(requested)
        if handler is None:
            outcome = {"success": False, "error": f"Unknown tool: {requested}"}
        else:
            outcome = await handler(tool_args)

        yield json.dumps({"output": True, "data": outcome}) + "\n"
        yield end_marker

    except Exception as exc:
        failure = {"success": False, "error": str(exc)}
        yield json.dumps({"output": True, "data": failure}) + "\n"
        yield end_marker


@app.post("/tools")
async def tools_endpoint(request: Request):
    """MCP tools endpoint"""
    # Hand the line generator to Starlette so the result is streamed.
    line_stream = mcp_response(request)
    return StreamingResponse(line_stream, media_type="application/json")


@app.get("/api/info")
async def api_info():
    # Static server description plus the names of the registered tools.
    tool_names = list(TOOLS)
    feature_summary = [
        "Complete canvas control",
        "All 50+ API operations exposed",
        "Layout library access",
        "44+ Huggy mascots",
        "Text manipulation",
        "Batch operations",
        "High-level thumbnail creation",
    ]
    return {
        "name": "Thumbnail Crafter MCP Server - Comprehensive",
        "version": "4.0.0",
        "mode": "full_api_coverage",
        "features": feature_summary,
        "available_tools": tool_names,
        "tool_count": len(tool_names),
    }


@app.get("/health")
async def health_check():
    # Liveness probe: always reports healthy.
    status_report = {"status": "healthy", "service": "thumbnail-crafter-comprehensive"}
    return status_report


@app.options("/")
async def options_root():
    """Handle OPTIONS for CORS"""
    return dict()


@app.post("/")
async def root_mcp_handler(request: Request):
    """Handle MCP requests at root - redirect to /mcp handler"""
    # Delegates directly to the /mcp POST handler (no HTTP redirect is sent).
    rpc_reply = await mcp_post(request)
    return rpc_reply

# ===================================================================
# MCP Protocol Endpoints (for HuggingChat and other MCP clients)
# ===================================================================

@app.get("/mcp")
async def mcp_endpoint():
    """MCP endpoint for SSE transport"""
    async def event_generator():
        # Send initial connection message
        yield f"data: {json.dumps({'jsonrpc': '2.0', 'method': 'initialized'})}\n\n"

        # Keep connection alive with an SSE comment line every 30 seconds
        while True:
            await asyncio.sleep(30)
            yield ": keepalive\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )


def _split_data_url(image_data: str, default_mime: str = "image/png") -> tuple:
    """Split a data URL into ``(mime_type, base64_payload)``.

    A string that is not a data URL is treated as a bare payload and paired
    with ``default_mime``.
    """
    if not image_data.startswith("data:"):
        return default_mime, image_data
    header, _, payload = image_data.partition(",")
    # Header looks like "data:image/jpeg;base64"; keep only the media type.
    mime_type = header[len("data:"):].split(";", 1)[0]
    return mime_type or default_mime, payload


def _build_tool_content(result: Any) -> list:
    """Convert a tool result into a list of MCP content items.

    Successful results carrying an ``image`` (create_thumbnail) or ``dataUrl``
    (canvas_export) string produce an image item plus a text summary; every
    other result is returned as pretty-printed JSON text.
    """
    succeeded = isinstance(result, dict) and bool(result.get("success"))

    if succeeded and isinstance(result.get("image"), str) and result["image"]:
        # The MIME type comes from the data URL rather than being assumed PNG.
        mime_type, payload = _split_data_url(result["image"])
        description = f"Successfully created thumbnail ({result.get('width')}x{result.get('height')}px)"
        if result.get("steps"):
            description += "\n\nSteps completed:\n"
            for step in result["steps"]:
                step_name = step.get("step", "").replace("_", " ").title()
                description += f"- {step_name}\n"
        return [
            {"type": "image", "data": payload, "mimeType": mime_type},
            {"type": "text", "text": description},
        ]

    if succeeded and isinstance(result.get("dataUrl"), str) and result["dataUrl"]:
        # canvas_export result format
        mime_type, payload = _split_data_url(result["dataUrl"])
        return [
            {"type": "image", "data": payload, "mimeType": mime_type},
            {
                "type": "text",
                "text": f"Canvas exported successfully ({result.get('width')}x{result.get('height')}px)"
            },
        ]

    # Regular text response for non-image (or non-dict) results
    return [{"type": "text", "text": json.dumps(result, indent=2)}]


@app.post("/mcp")
async def mcp_post(request: Request):
    """Handle MCP protocol messages via POST"""
    # Handles "initialize", "tools/list" and "tools/call"; any other method
    # gets a JSON-RPC -32601 error, and unexpected failures a -32603 error.
    request_id = None
    try:
        body = await request.json()
        request_id = body.get("id")
        method = body.get("method")
        # An explicit ``"params": null`` is treated like a missing key.
        params = body.get("params") or {}

        # Handle initialize request
        if method == "initialize":
            return {
                "jsonrpc": "2.0",
                "id": request_id,
                "result": {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {
                        "tools": {}
                    },
                    "serverInfo": {
                        "name": "thumbnail-crafter",
                        "version": "4.0.0"
                    }
                }
            }

        # Handle tools/list request
        if method == "tools/list":
            tools_list = [
                {
                    "name": tool_name,
                    "description": tool_func.__doc__ or f"Tool: {tool_name}",
                    "inputSchema": {
                        "type": "object",
                        "properties": {},
                        "required": []
                    }
                }
                for tool_name, tool_func in TOOLS.items()
            ]
            return {
                "jsonrpc": "2.0",
                "id": request_id,
                "result": {
                    "tools": tools_list
                }
            }

        # Handle tools/call request
        if method == "tools/call":
            tool_name = params.get("name")
            arguments = params.get("arguments") or {}

            if tool_name not in TOOLS:
                return {
                    "jsonrpc": "2.0",
                    "id": request_id,
                    "error": {
                        "code": -32601,
                        "message": f"Tool not found: {tool_name}"
                    }
                }

            # Execute the tool
            result = await TOOLS[tool_name](arguments)

            return {
                "jsonrpc": "2.0",
                "id": request_id,
                "result": {
                    "content": _build_tool_content(result)
                }
            }

        # Handle other methods
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {
                "code": -32601,
                "message": f"Method not found: {method}"
            }
        }

    except Exception as e:
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {
                "code": -32603,
                "message": f"Internal error: {str(e)}"
            }
        }

# ===================================================================
# Lifecycle
# ===================================================================

@app.on_event("startup")
async def startup_event():
    # Launch the shared browser up front so the first tool call is not slowed
    # down by browser start-up.
    print("=" * 60)
    print("Thumbnail Crafter MCP Server - Comprehensive Mode")
    print("=" * 60)
    print("Initializing browser...")
    await get_browser()
    print("✓ Browser ready")
    print(f"✓ App URL: {APP_URL}")
    print(f"✓ {len(TOOLS)} tools available")
    print("✓ Full API coverage enabled")
    print("=" * 60)


@app.on_event("shutdown")
async def shutdown_event():
    # Release the shared browser when the server stops.
    print("Shutting down...")
    await close_browser()

# ===================================================================
# Static Files
# ===================================================================

static_dir = Path("dist")
if static_dir.exists():
    app.mount("/assets", StaticFiles(directory=static_dir / "assets"), name="assets")


def _resolve_static_file(relative_path: str) -> Optional[Path]:
    """Return the file under ``static_dir`` named by ``relative_path``, if any.

    Returns None when the path does not name an existing regular file or when
    it resolves outside ``static_dir`` (``..`` segments, absolute paths).
    """
    try:
        root = static_dir.resolve()
        candidate = (root / relative_path).resolve()
        # relative_to raises ValueError when candidate is not inside root.
        candidate.relative_to(root)
        return candidate if candidate.is_file() else None
    except (ValueError, OSError):
        # ValueError also covers embedded NUL bytes; OSError covers names the
        # filesystem rejects (e.g. too long).
        return None


@app.get("/")
async def serve_frontend():
    # Serve the built single-page app, or a hint when it has not been built.
    index_file = static_dir / "index.html"
    if index_file.exists():
        return FileResponse(index_file)
    return {"message": "Frontend not built"}


@app.get("/{full_path:path}")
async def serve_spa(full_path: str):
    # Catch-all route: serve a static file when one matches, otherwise fall
    # back to index.html so client-side routes resolve.
    if full_path.startswith(("api/", "tools", "health", "mcp")):
        return {"error": "Not found"}

    # full_path comes straight from the URL, so it is confined to static_dir
    # before anything is read from disk.
    file_path = _resolve_static_file(full_path)
    if file_path is not None:
        return FileResponse(file_path)

    index_file = static_dir / "index.html"
    if index_file.exists():
        return FileResponse(index_file)

    return {"error": "File not found"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)