|
|
""" |
|
|
Session Service - Chat session management |
|
|
""" |
|
|
import json |
|
|
import uuid |
|
|
from datetime import datetime |
|
|
from pathlib import Path |
|
|
from typing import Dict, List, Optional |
|
|
|
|
|
from core.config import SESSIONS_DIR |
|
|
from core.state import get_state |
|
|
from core.logger import logger |
|
|
|
|
|
|
|
|
class SessionService: |
|
|
""" |
|
|
Manages chat sessions. |
|
|
Sessions are stored in state + individual files for messages. |
|
|
""" |
|
|
|
|
|
def __init__(self): |
|
|
self._state = get_state() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_session( |
|
|
self, |
|
|
title: str = "", |
|
|
session_type: str = "chat", |
|
|
system_prompt: str = "" |
|
|
) -> Dict: |
|
|
"""Create a new chat session""" |
|
|
session_id = str(uuid.uuid4())[:8] |
|
|
|
|
|
if not title: |
|
|
now = datetime.now() |
|
|
prefix = "Chat" if session_type == "chat" else "Roleplay" |
|
|
title = f"{prefix} {now.strftime('%m/%d %H:%M')}" |
|
|
|
|
|
session = { |
|
|
"id": session_id, |
|
|
"title": title, |
|
|
"type": session_type, |
|
|
"system_prompt": system_prompt, |
|
|
"created_at": datetime.now().isoformat(), |
|
|
"updated_at": datetime.now().isoformat(), |
|
|
"message_count": 0, |
|
|
"model_id": self._state.get_loaded_model_id() |
|
|
} |
|
|
|
|
|
|
|
|
self._state.add_session(session) |
|
|
|
|
|
|
|
|
self._save_messages(session_id, []) |
|
|
|
|
|
logger.event("Sessions", f"Created: {session_id}", {"title": title}) |
|
|
return session |
|
|
|
|
|
def get_session(self, session_id: str) -> Optional[Dict]: |
|
|
"""Get session by ID with messages""" |
|
|
sessions = self._state.get_sessions() |
|
|
for s in sessions: |
|
|
if s["id"] == session_id: |
|
|
session = s.copy() |
|
|
session["messages"] = self._load_messages(session_id) |
|
|
return session |
|
|
return None |
|
|
|
|
|
def get_all_sessions(self) -> List[Dict]: |
|
|
"""Get all sessions (without messages)""" |
|
|
return self._state.get_sessions() |
|
|
|
|
|
def update_session(self, session_id: str, updates: Dict) -> bool: |
|
|
"""Update session metadata""" |
|
|
return self._state.update_session(session_id, updates) |
|
|
|
|
|
def delete_session(self, session_id: str) -> bool: |
|
|
"""Delete session and its messages""" |
|
|
|
|
|
msg_file = SESSIONS_DIR / f"{session_id}.json" |
|
|
if msg_file.exists(): |
|
|
msg_file.unlink() |
|
|
|
|
|
|
|
|
result = self._state.delete_session(session_id) |
|
|
if result: |
|
|
logger.event("Sessions", f"Deleted: {session_id}") |
|
|
return result |
|
|
|
|
|
def rename_session(self, session_id: str, new_title: str) -> bool: |
|
|
"""Rename a session""" |
|
|
return self._state.update_session(session_id, {"title": new_title}) |
|
|
|
|
|
def clear_session(self, session_id: str) -> bool: |
|
|
"""Clear all messages from a session""" |
|
|
self._save_messages(session_id, []) |
|
|
self._state.update_session(session_id, {"message_count": 0}) |
|
|
return True |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def add_message(self, session_id: str, role: str, content: str) -> bool: |
|
|
"""Add a message to session""" |
|
|
messages = self._load_messages(session_id) |
|
|
|
|
|
message = { |
|
|
"role": role, |
|
|
"content": content, |
|
|
"timestamp": datetime.now().isoformat() |
|
|
} |
|
|
messages.append(message) |
|
|
|
|
|
self._save_messages(session_id, messages) |
|
|
|
|
|
|
|
|
self._state.update_session(session_id, { |
|
|
"message_count": len(messages) |
|
|
}) |
|
|
|
|
|
|
|
|
if role == "user" and len(messages) == 1: |
|
|
title = content[:40] + "..." if len(content) > 40 else content |
|
|
self._state.update_session(session_id, {"title": title}) |
|
|
|
|
|
return True |
|
|
|
|
|
def get_messages(self, session_id: str) -> List[Dict]: |
|
|
"""Get all messages for a session""" |
|
|
return self._load_messages(session_id) |
|
|
|
|
|
def _load_messages(self, session_id: str) -> List[Dict]: |
|
|
"""Load messages from file""" |
|
|
msg_file = SESSIONS_DIR / f"{session_id}.json" |
|
|
if msg_file.exists(): |
|
|
try: |
|
|
return json.loads(msg_file.read_text()) |
|
|
except: |
|
|
pass |
|
|
return [] |
|
|
|
|
|
def _save_messages(self, session_id: str, messages: List[Dict]): |
|
|
"""Save messages to file""" |
|
|
msg_file = SESSIONS_DIR / f"{session_id}.json" |
|
|
msg_file.write_text(json.dumps(messages, indent=2)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_active_session(self) -> Optional[Dict]: |
|
|
"""Get the currently active session""" |
|
|
session_id = self._state.get_active_session_id() |
|
|
if session_id: |
|
|
return self.get_session(session_id) |
|
|
return None |
|
|
|
|
|
def set_active_session(self, session_id: str): |
|
|
"""Set active session""" |
|
|
self._state.set_active_session(session_id) |
|
|
|
|
|
def get_session_for_display(self, session_id: str) -> Optional[Dict]: |
|
|
"""Get session formatted for UI display""" |
|
|
session = self.get_session(session_id) |
|
|
if not session: |
|
|
return None |
|
|
|
|
|
|
|
|
history = [] |
|
|
user_msg = None |
|
|
for msg in session.get("messages", []): |
|
|
if msg["role"] == "user": |
|
|
user_msg = msg["content"] |
|
|
elif msg["role"] == "assistant" and user_msg: |
|
|
history.append((user_msg, msg["content"])) |
|
|
user_msg = None |
|
|
if user_msg: |
|
|
history.append((user_msg, None)) |
|
|
|
|
|
return { |
|
|
"id": session["id"], |
|
|
"title": session["title"], |
|
|
"type": session["type"], |
|
|
"system_prompt": session["system_prompt"], |
|
|
"history": history, |
|
|
"message_count": session["message_count"] |
|
|
} |
|
|
|
|
|
def group_sessions_by_date(self) -> Dict[str, List[Dict]]: |
|
|
"""Group sessions by date for sidebar display""" |
|
|
sessions = self.get_all_sessions() |
|
|
today = datetime.now().date() |
|
|
|
|
|
groups = { |
|
|
"Today": [], |
|
|
"Yesterday": [], |
|
|
"This Week": [], |
|
|
"Older": [] |
|
|
} |
|
|
|
|
|
for s in sessions: |
|
|
try: |
|
|
updated = datetime.fromisoformat(s["updated_at"]).date() |
|
|
diff = (today - updated).days |
|
|
|
|
|
if diff == 0: |
|
|
groups["Today"].append(s) |
|
|
elif diff == 1: |
|
|
groups["Yesterday"].append(s) |
|
|
elif diff < 7: |
|
|
groups["This Week"].append(s) |
|
|
else: |
|
|
groups["Older"].append(s) |
|
|
except: |
|
|
groups["Older"].append(s) |
|
|
|
|
|
|
|
|
return {k: v for k, v in groups.items() if v} |
|
|
|
|
|
|
|
|
|
|
|
_session_service: Optional[SessionService] = None |
|
|
|
|
|
|
|
|
def get_session_service() -> SessionService: |
|
|
"""Get singleton session service""" |
|
|
global _session_service |
|
|
if _session_service is None: |
|
|
_session_service = SessionService() |
|
|
return _session_service |
|
|
|