textai-v2 / ui /models.py
rbt2025's picture
Deploy TextAI v2 - Clean architecture
72abdff verified
"""
Model Manager UI Component
Unified model management with HF search modal
"""
import gradio as gr
from typing import List, Dict, Optional
from core.models import get_model_service
from core.state import get_state
from core.logger import logger
from core.config import RECOMMENDED_QUANTS
def build_model_manager_ui():
"""Build the model manager interface with HF search modal"""
# ══════════════════════════════════════════════════════════════════
# HELPER FUNCTIONS
# ══════════════════════════════════════════════════════════════════
def get_installed_models_table() -> List[List]:
"""Get installed models for table display"""
service = get_model_service()
models = service.get_installed_models()
loaded_id = get_state().get_loaded_model_id()
rows = []
for m in models:
status = "● Loaded" if m["id"] == loaded_id else "Ready"
size_mb = m.get("size_bytes", 0) / (1024 * 1024)
rows.append([
m["id"],
m["name"],
m.get("model_type", "gguf").upper(),
f"{size_mb:.1f} MB",
m.get("quant", "-"),
status
])
return rows
def get_loaded_model_display() -> str:
"""Get currently loaded model name"""
service = get_model_service()
model = service.get_loaded_model()
return f"● {model['name']}" if model else "No model loaded"
# ══════════════════════════════════════════════════════════════════
# EVENT HANDLERS - INSTALLED MODELS
# ══════════════════════════════════════════════════════════════════
def on_select_model(evt: gr.SelectData, table_data) -> str:
"""Select model from table"""
try:
row_idx = evt.index[0] if isinstance(evt.index, (list, tuple)) else evt.index
# Handle both DataFrame and list
if hasattr(table_data, 'values'):
data = table_data.values.tolist()
else:
data = table_data if table_data else []
if data and row_idx < len(data):
model_id = data[row_idx][0]
logger.info("ModelMgr", f"Selected: {model_id}")
return model_id
except Exception as e:
logger.error("ModelMgr", f"Select error: {e}")
return ""
def on_load_model(model_id: str):
"""Load selected model"""
if not model_id:
return get_installed_models_table(), get_loaded_model_display(), "Select a model first"
service = get_model_service()
result = service.load_model(model_id)
if result["success"]:
return get_installed_models_table(), get_loaded_model_display(), f"βœ“ Loaded: {result.get('name', model_id)}"
else:
return get_installed_models_table(), get_loaded_model_display(), f"βœ— Error: {result.get('error')}"
def on_unload_model():
"""Unload current model"""
service = get_model_service()
service.unload_model()
return get_installed_models_table(), get_loaded_model_display(), "Model unloaded"
def on_delete_model(model_id: str):
"""Delete selected model"""
if not model_id:
return get_installed_models_table(), "", "Select a model first"
service = get_model_service()
result = service.delete_model(model_id)
if result["success"]:
return get_installed_models_table(), "", f"βœ“ {result.get('message')}"
else:
return get_installed_models_table(), model_id, f"βœ— Error: {result.get('error')}"
def on_refresh():
"""Refresh models table"""
return get_installed_models_table(), get_loaded_model_display()
# ══════════════════════════════════════════════════════════════════
# EVENT HANDLERS - HF SEARCH MODAL
# ══════════════════════════════════════════════════════════════════
def on_search_hf(query: str, max_params: str):
"""Search HuggingFace for models"""
logger.info("ModelMgr", f"HF Search: {query}, max_params={max_params}")
service = get_model_service()
max_p = float(max_params) if max_params != "any" else 100.0
results, status = service.search_hf_models(query, max_p, limit=15)
# Format for table
rows = []
for r in results:
params = f"{r['params_b']}B" if r.get('params_b') else "?"
size = f"~{r['est_size_gb']}GB" if r.get('est_size_gb') else "?"
compat = r.get('compatibility', {}).get('label', '?')
downloads = f"{r.get('downloads', 0):,}"
installed = "βœ“" if r.get('is_installed') else ""
rows.append([r['id'], params, size, compat, downloads, installed])
return rows, status
def on_select_hf_model(evt: gr.SelectData, table_data) -> tuple:
"""Select model from HF search results"""
try:
row_idx = evt.index[0] if isinstance(evt.index, (list, tuple)) else evt.index
# Handle both DataFrame and list
if hasattr(table_data, 'values'):
# It's a DataFrame
data = table_data.values.tolist()
else:
data = table_data if table_data else []
if data and row_idx < len(data):
repo_id = data[row_idx][0]
logger.info("ModelMgr", f"Selected HF model: {repo_id}")
return repo_id, gr.update(visible=True)
except Exception as e:
logger.error("ModelMgr", f"HF select error: {e}")
return "", gr.update(visible=False)
def on_get_files(repo_id: str):
"""Get GGUF files for selected model"""
if not repo_id:
return [], "Select a model first"
logger.info("ModelMgr", f"Getting files for: {repo_id}")
service = get_model_service()
files = service.get_hf_model_files(repo_id)
if not files:
return [], "No GGUF files found"
# Format for display with radio selection
rows = []
for f in files:
rec = "β˜… Recommended" if f["recommended"] else ""
installed = "βœ“ Installed" if f["is_installed"] else ""
rows.append([f["filename"], f["quant"], rec, installed])
return rows, f"Found {len(files)} files"
def on_select_file(evt: gr.SelectData, table_data) -> str:
"""Select file from files list"""
try:
row_idx = evt.index[0] if isinstance(evt.index, (list, tuple)) else evt.index
# Handle both DataFrame and list
if hasattr(table_data, 'values'):
data = table_data.values.tolist()
else:
data = table_data if table_data else []
if data and row_idx < len(data):
filename = data[row_idx][0]
return filename
except Exception as e:
logger.error("ModelMgr", f"File select error: {e}")
return ""
def on_download_model(repo_id: str, filename: str):
"""Download selected model"""
if not repo_id or not filename:
return "Select a model and file first", get_installed_models_table()
logger.info("ModelMgr", f"Downloading: {repo_id}/{filename}")
service = get_model_service()
result = service.download_model(repo_id, filename)
if result["success"]:
return f"βœ“ {result.get('message')}", get_installed_models_table()
elif result.get("duplicate"):
return f"⚠️ {result.get('error')} - Choose a different quantization.", get_installed_models_table()
else:
return f"βœ— Error: {result.get('error')}", get_installed_models_table()
def on_auto_download(repo_id: str):
"""Auto-download best quantization"""
if not repo_id:
return "Select a model first", get_installed_models_table()
service = get_model_service()
files = service.get_hf_model_files(repo_id)
if not files:
return "No GGUF files found", get_installed_models_table()
# Find best file (Q4_K_M preferred)
best_file = None
for quant in RECOMMENDED_QUANTS + ["Q4_0", "Q5_0"]:
for f in files:
if f["quant"] == quant and not f["is_installed"]:
best_file = f["filename"]
break
if best_file:
break
if not best_file:
# Try first non-installed file
for f in files:
if not f["is_installed"]:
best_file = f["filename"]
break
if not best_file:
return "⚠️ All quantizations already installed", get_installed_models_table()
return on_download_model(repo_id, best_file)
# ══════════════════════════════════════════════════════════════════
# BUILD UI
# ══════════════════════════════════════════════════════════════════
with gr.Column(elem_classes="model-manager"):
gr.Markdown("# Model Manager")
# Current Model Status
with gr.Row():
loaded_model_display = gr.Textbox(
value=get_loaded_model_display(),
label="Currently Loaded",
interactive=False,
scale=3
)
btn_unload = gr.Button("Unload", size="sm", variant="stop")
btn_refresh = gr.Button("πŸ”„ Refresh", size="sm")
gr.Markdown("---")
# ─────────────────────────────────────────────────────────────
# INSTALLED MODELS TABLE
# ─────────────────────────────────────────────────────────────
gr.Markdown("### Installed Models")
gr.Markdown("*Click a row to select, then use actions below*")
installed_table = gr.Dataframe(
headers=["ID", "Name", "Type", "Size", "Quant", "Status"],
value=get_installed_models_table(),
interactive=False,
row_count=6,
elem_classes="model-table"
)
selected_model_id = gr.Textbox(
label="Selected",
interactive=False,
visible=True
)
with gr.Row():
btn_load = gr.Button("β–Ά Load Selected", variant="primary")
btn_delete = gr.Button("πŸ—‘οΈ Delete Selected", variant="stop")
btn_configure = gr.Button("βš™οΈ Configure")
action_status = gr.Textbox(
label="",
show_label=False,
interactive=False
)
gr.Markdown("---")
# ─────────────────────────────────────────────────────────────
# ADD FROM HUGGINGFACE
# ─────────────────────────────────────────────────────────────
gr.Markdown("### Add from HuggingFace")
with gr.Row():
hf_search_input = gr.Textbox(
placeholder="Search models (tinyllama, phi, mistral...)",
show_label=False,
scale=4
)
hf_max_params = gr.Dropdown(
choices=[("< 3B (Fast)", "3"), ("< 7B (OK)", "7"), ("Any", "any")],
value="7",
label="Size",
scale=1
)
btn_search = gr.Button("πŸ” Search", variant="primary")
hf_status = gr.Textbox(
label="",
show_label=False,
interactive=False
)
# Search Results
hf_results_table = gr.Dataframe(
headers=["Model ID", "Params", "Est. Size", "Compat", "Downloads", "Installed"],
value=[],
interactive=False,
row_count=8,
elem_classes="model-table"
)
selected_hf_repo = gr.Textbox(
label="Selected Model",
interactive=False
)
# File Selection Panel (shown after selecting a model)
with gr.Column(visible=False) as file_panel:
gr.Markdown("#### Select Quantization")
files_table = gr.Dataframe(
headers=["Filename", "Quant", "Recommended", "Status"],
value=[],
interactive=False,
row_count=6
)
selected_file = gr.Textbox(
label="Selected File",
interactive=False
)
with gr.Row():
btn_download = gr.Button("⬇️ Download Selected", variant="primary")
btn_auto_download = gr.Button("⚑ Auto Download (Best Q4)")
btn_close_files = gr.Button("Close")
download_status = gr.Textbox(
label="",
show_label=False,
interactive=False
)
gr.Markdown("---")
gr.Markdown("**Legend:** βœ… Best (<1.5B) | βœ… Good (<3B) | ⚠️ OK (<7B) | ❌ Too Large (>7B)")
# ══════════════════════════════════════════════════════════════════
# WIRE UP EVENTS
# ══════════════════════════════════════════════════════════════════
# Installed models table
installed_table.select(
on_select_model,
inputs=[installed_table],
outputs=[selected_model_id]
)
btn_load.click(
on_load_model,
inputs=[selected_model_id],
outputs=[installed_table, loaded_model_display, action_status]
)
btn_unload.click(
on_unload_model,
outputs=[installed_table, loaded_model_display, action_status]
)
btn_delete.click(
on_delete_model,
inputs=[selected_model_id],
outputs=[installed_table, selected_model_id, action_status]
)
btn_refresh.click(
on_refresh,
outputs=[installed_table, loaded_model_display]
)
# HF Search
btn_search.click(
on_search_hf,
inputs=[hf_search_input, hf_max_params],
outputs=[hf_results_table, hf_status]
)
hf_search_input.submit(
on_search_hf,
inputs=[hf_search_input, hf_max_params],
outputs=[hf_results_table, hf_status]
)
# HF model selection
hf_results_table.select(
on_select_hf_model,
inputs=[hf_results_table],
outputs=[selected_hf_repo, file_panel]
)
# When HF model selected, load its files
selected_hf_repo.change(
on_get_files,
inputs=[selected_hf_repo],
outputs=[files_table, download_status]
)
# File selection
files_table.select(
on_select_file,
inputs=[files_table],
outputs=[selected_file]
)
# Download
btn_download.click(
on_download_model,
inputs=[selected_hf_repo, selected_file],
outputs=[download_status, installed_table]
)
btn_auto_download.click(
on_auto_download,
inputs=[selected_hf_repo],
outputs=[download_status, installed_table]
)
btn_close_files.click(
lambda: gr.update(visible=False),
outputs=[file_panel]
)
# Return components for external access
return {
"installed_table": installed_table,
"loaded_display": loaded_model_display,
"selected_id": selected_model_id,
"refresh": on_refresh
}