File size: 8,405 Bytes
e406afb
 
 
 
 
 
 
 
 
de7d69a
 
 
e406afb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
"""
Session Service - Chat session management
"""
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

from core.config import SESSIONS_DIR
from core.state import get_state
from core.logger import logger


class SessionService:
    """
    Manages chat sessions.
    Sessions are stored in state + individual files for messages.
    """

    def __init__(self):
        self._state = get_state()

    # ══════════════════════════════════════════════════════════════════
    # SESSION CRUD
    # ══════════════════════════════════════════════════════════════════

    def create_session(
        self,
        title: str = "",
        session_type: str = "chat",
        system_prompt: str = ""
    ) -> Dict:
        """Create a new chat session"""
        session_id = str(uuid.uuid4())[:8]

        if not title:
            now = datetime.now()
            prefix = "Chat" if session_type == "chat" else "Roleplay"
            title = f"{prefix} {now.strftime('%m/%d %H:%M')}"

        session = {
            "id": session_id,
            "title": title,
            "type": session_type,
            "system_prompt": system_prompt,
            "created_at": datetime.now().isoformat(),
            "updated_at": datetime.now().isoformat(),
            "message_count": 0,
            "model_id": self._state.get_loaded_model_id()
        }

        # Save session metadata to state
        self._state.add_session(session)

        # Create empty messages file
        self._save_messages(session_id, [])

        logger.event("Sessions", f"Created: {session_id}", {"title": title})
        return session

    def get_session(self, session_id: str) -> Optional[Dict]:
        """Get session by ID with messages"""
        sessions = self._state.get_sessions()
        for s in sessions:
            if s["id"] == session_id:
                session = s.copy()
                session["messages"] = self._load_messages(session_id)
                return session
        return None

    def get_all_sessions(self) -> List[Dict]:
        """Get all sessions (without messages)"""
        return self._state.get_sessions()

    def update_session(self, session_id: str, updates: Dict) -> bool:
        """Update session metadata"""
        return self._state.update_session(session_id, updates)

    def delete_session(self, session_id: str) -> bool:
        """Delete session and its messages"""
        # Delete messages file
        msg_file = SESSIONS_DIR / f"{session_id}.json"
        if msg_file.exists():
            msg_file.unlink()

        # Remove from state
        result = self._state.delete_session(session_id)
        if result:
            logger.event("Sessions", f"Deleted: {session_id}")
        return result

    def rename_session(self, session_id: str, new_title: str) -> bool:
        """Rename a session"""
        return self._state.update_session(session_id, {"title": new_title})

    def clear_session(self, session_id: str) -> bool:
        """Clear all messages from a session"""
        self._save_messages(session_id, [])
        self._state.update_session(session_id, {"message_count": 0})
        return True

    # ══════════════════════════════════════════════════════════════════
    # MESSAGE HANDLING
    # ══════════════════════════════════════════════════════════════════

    def add_message(self, session_id: str, role: str, content: str) -> bool:
        """Add a message to session"""
        messages = self._load_messages(session_id)

        message = {
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat()
        }
        messages.append(message)

        self._save_messages(session_id, messages)

        # Update session metadata
        self._state.update_session(session_id, {
            "message_count": len(messages)
        })

        # Auto-generate title from first user message
        if role == "user" and len(messages) == 1:
            title = content[:40] + "..." if len(content) > 40 else content
            self._state.update_session(session_id, {"title": title})

        return True

    def get_messages(self, session_id: str) -> List[Dict]:
        """Get all messages for a session"""
        return self._load_messages(session_id)

    def _load_messages(self, session_id: str) -> List[Dict]:
        """Load messages from file"""
        msg_file = SESSIONS_DIR / f"{session_id}.json"
        if msg_file.exists():
            try:
                return json.loads(msg_file.read_text())
            except:
                pass
        return []

    def _save_messages(self, session_id: str, messages: List[Dict]):
        """Save messages to file"""
        msg_file = SESSIONS_DIR / f"{session_id}.json"
        msg_file.write_text(json.dumps(messages, indent=2))

    # ══════════════════════════════════════════════════════════════════
    # UTILITY
    # ══════════════════════════════════════════════════════════════════

    def get_active_session(self) -> Optional[Dict]:
        """Get the currently active session"""
        session_id = self._state.get_active_session_id()
        if session_id:
            return self.get_session(session_id)
        return None

    def set_active_session(self, session_id: str):
        """Set active session"""
        self._state.set_active_session(session_id)

    def get_session_for_display(self, session_id: str) -> Optional[Dict]:
        """Get session formatted for UI display"""
        session = self.get_session(session_id)
        if not session:
            return None

        # Format messages for Gradio chatbot (tuple format)
        history = []
        user_msg = None
        for msg in session.get("messages", []):
            if msg["role"] == "user":
                user_msg = msg["content"]
            elif msg["role"] == "assistant" and user_msg:
                history.append((user_msg, msg["content"]))
                user_msg = None
        if user_msg:
            history.append((user_msg, None))

        return {
            "id": session["id"],
            "title": session["title"],
            "type": session["type"],
            "system_prompt": session["system_prompt"],
            "history": history,
            "message_count": session["message_count"]
        }

    def group_sessions_by_date(self) -> Dict[str, List[Dict]]:
        """Group sessions by date for sidebar display"""
        sessions = self.get_all_sessions()
        today = datetime.now().date()

        groups = {
            "Today": [],
            "Yesterday": [],
            "This Week": [],
            "Older": []
        }

        for s in sessions:
            try:
                updated = datetime.fromisoformat(s["updated_at"]).date()
                diff = (today - updated).days

                if diff == 0:
                    groups["Today"].append(s)
                elif diff == 1:
                    groups["Yesterday"].append(s)
                elif diff < 7:
                    groups["This Week"].append(s)
                else:
                    groups["Older"].append(s)
            except:
                groups["Older"].append(s)

        # Remove empty groups
        return {k: v for k, v in groups.items() if v}


# Singleton
_session_service: Optional[SessionService] = None


def get_session_service() -> SessionService:
    """Get singleton session service"""
    global _session_service
    if _session_service is None:
        _session_service = SessionService()
    return _session_service