Skip to content

Sessions Module

voice_agent.sessions

Session management for Claude Code SDK.

ChatStoredState dataclass

Stored state for all sessions in a chat.

Attributes:

Name Type Description
active_session str

Name of the currently active session.

sessions dict[str, StoredSession]

Mapping of session names to stored sessions.

Source code in src/voice_agent/sessions/storage.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@dataclass
class ChatStoredState:
    """Stored state for all sessions in a chat.

    Attributes:
        active_session: Name of the currently active session.
        sessions: Mapping of session names to stored sessions.
    """

    active_session: str
    sessions: dict[str, StoredSession] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return {
            "active_session": self.active_session,
            "sessions": {name: s.to_dict() for name, s in self.sessions.items()},
        }

    @classmethod
    def from_dict(cls, chat_id: int, data: dict[str, Any]) -> "ChatStoredState":
        """Create from dictionary.

        Args:
            chat_id: Telegram chat ID (needed for session creation).
            data: Raw dictionary data.
        """
        sessions = {}
        for name, s_data in data.get("sessions", {}).items():
            # Ensure chat_id is set on each session
            s_data["chat_id"] = chat_id
            sessions[name] = StoredSession.from_dict(s_data)
        return cls(
            active_session=data.get("active_session", "main"),
            sessions=sessions,
        )

from_dict(chat_id, data) classmethod

Create from dictionary.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID (needed for session creation).

required
data dict[str, Any]

Raw dictionary data.

required
Source code in src/voice_agent/sessions/storage.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@classmethod
def from_dict(cls, chat_id: int, data: dict[str, Any]) -> "ChatStoredState":
    """Create from dictionary.

    Args:
        chat_id: Telegram chat ID (needed for session creation).
        data: Raw dictionary data.
    """
    sessions = {}
    for name, s_data in data.get("sessions", {}).items():
        # Ensure chat_id is set on each session
        s_data["chat_id"] = chat_id
        sessions[name] = StoredSession.from_dict(s_data)
    return cls(
        active_session=data.get("active_session", "main"),
        sessions=sessions,
    )

to_dict()

Convert to dictionary for JSON serialization.

Source code in src/voice_agent/sessions/storage.py
61
62
63
64
65
66
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary for JSON serialization."""
    return {
        "active_session": self.active_session,
        "sessions": {name: s.to_dict() for name, s in self.sessions.items()},
    }

ImageAttachment dataclass

An image attachment for sending to Claude.

Attributes:

Name Type Description
data str

Base64-encoded image data.

media_type str

MIME type of the image (e.g. "image/jpeg").

Source code in src/voice_agent/sessions/image.py
 6
 7
 8
 9
10
11
12
13
14
15
16
@dataclass
class ImageAttachment:
    """An image attachment for sending to Claude.

    Attributes:
        data: Base64-encoded image data.
        media_type: MIME type of the image (e.g. "image/jpeg").
    """

    data: str
    media_type: str

PermissionHandler

Handles permission requests for Claude SDK tool calls.

Attributes:

Name Type Description
pending PendingPermission | None

Current pending permission, if any.

timeout

Seconds to wait for user approval.

notify_callback

Async callback to notify user of permission request.

sticky_approvals list[StickyApproval]

List of sticky approval rules for auto-approving.

Source code in src/voice_agent/sessions/permissions.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
class PermissionHandler:
    """Handles permission requests for Claude SDK tool calls.

    Attributes:
        pending: Current pending permission, if any.
        timeout: Seconds to wait for user approval.
        notify_callback: Async callback to notify user of permission request.
        sticky_approvals: List of sticky approval rules for auto-approving.
    """

    def __init__(
        self,
        timeout: int = 300,
        notify_callback: Callable[[str, dict[str, Any]], Coroutine[Any, Any, None]]
        | None = None,
    ) -> None:
        """Initialize the permission handler.

        Args:
            timeout: Seconds to wait for user approval.
            notify_callback: Async function to notify user of permission request.
        """
        self.pending: PendingPermission | None = None
        self.timeout = timeout
        self.notify_callback = notify_callback
        self.sticky_approvals: list[StickyApproval] = []

    def has_pending(self) -> bool:
        """Check if there's a pending permission request.

        Returns:
            True if a permission is pending.
        """
        return (
            self.pending is not None and self.pending.state == PermissionState.PENDING
        )

    def get_pending_description(self) -> str | None:
        """Get a human-readable description of the pending permission.

        Returns:
            Description string or None if no pending permission.
        """
        if not self.pending:
            return None

        tool = self.pending.tool_name
        inputs = self.pending.input_data

        if tool == "Bash":
            return f"Run command: {inputs.get('command', 'unknown')}"
        if tool == "Write":
            return f"Write file: {inputs.get('file_path', 'unknown')}"
        if tool == "Edit":
            return f"Edit file: {inputs.get('file_path', 'unknown')}"

        return f"Use tool: {tool}"

    def _check_sticky_approval(
        self, tool_name: str, input_data: dict[str, Any]
    ) -> bool:
        """Check if a tool call matches any sticky approval.

        Args:
            tool_name: Name of the tool.
            input_data: Input parameters for the tool.

        Returns:
            True if the tool call matches a sticky approval.
        """
        return any(
            approval.matches(tool_name, input_data)
            for approval in self.sticky_approvals
        )

    async def request_permission(
        self, tool_name: str, input_data: dict[str, Any]
    ) -> tuple[bool, str | None]:
        """Request permission for a tool call.

        Auto-approves safe tools and sticky approvals, queues others for user.

        Args:
            tool_name: Name of the tool.
            input_data: Input parameters for the tool.

        Returns:
            Tuple of (approved, deny_message).
        """
        # Auto-approve safe tools
        if is_safe_tool_call(tool_name, input_data):
            return True, None

        # Check sticky approvals
        if self._check_sticky_approval(tool_name, input_data):
            return True, None

        # Create pending permission
        self.pending = PendingPermission(tool_name=tool_name, input_data=input_data)

        # Notify user if callback provided
        if self.notify_callback:
            await self.notify_callback(tool_name, input_data)

        # Wait for user response with timeout
        try:
            await asyncio.wait_for(self.pending.event.wait(), timeout=self.timeout)
            approved = self.pending.state == PermissionState.APPROVED
            message = self.pending.deny_message
            self.pending = None
            return approved, message
        except asyncio.TimeoutError:
            self.pending.state = PermissionState.TIMEOUT
            self.pending = None
            return False, "Permission request timed out"

    def approve(self) -> bool:
        """Approve the pending permission.

        Returns:
            True if there was a pending permission to approve.
        """
        if not self.pending or self.pending.state != PermissionState.PENDING:
            return False
        self.pending.state = PermissionState.APPROVED
        self.pending.event.set()
        return True

    def deny(self, message: str | None = None) -> bool:
        """Deny the pending permission.

        Args:
            message: Optional message explaining denial.

        Returns:
            True if there was a pending permission to deny.
        """
        if not self.pending or self.pending.state != PermissionState.PENDING:
            return False
        self.pending.state = PermissionState.DENIED
        self.pending.deny_message = message or "User rejected"
        self.pending.event.set()
        return True

    def sticky_approve(self) -> StickyApproval | None:
        """Approve pending permission and create sticky rule for similar calls.

        Creates a sticky approval rule based on the current pending permission,
        then approves it.

        Returns:
            The created StickyApproval or None if no pending permission.
        """
        if not self.pending or self.pending.state != PermissionState.PENDING:
            return None

        tool_name = self.pending.tool_name
        field_name = TOOL_FIELD_NAMES.get(tool_name)

        # Create sticky approval for all calls to this tool
        sticky = StickyApproval(
            tool_name=tool_name,
            pattern=None,
            field_name=field_name,
        )
        self.sticky_approvals.append(sticky)

        # Approve the current request
        self.pending.state = PermissionState.APPROVED
        self.pending.event.set()

        return sticky

    def get_sticky_approvals(self) -> list[StickyApproval]:
        """Get all active sticky approvals.

        Returns:
            List of sticky approval rules.
        """
        return list(self.sticky_approvals)

    def clear_sticky_approvals(self) -> int:
        """Clear all sticky approvals.

        Returns:
            Number of approvals cleared.
        """
        count = len(self.sticky_approvals)
        self.sticky_approvals.clear()
        return count

    def remove_sticky_approval(self, index: int) -> StickyApproval | None:
        """Remove a sticky approval by index.

        Args:
            index: Index of the approval to remove (0-based).

        Returns:
            The removed StickyApproval or None if index invalid.
        """
        if 0 <= index < len(self.sticky_approvals):
            return self.sticky_approvals.pop(index)
        return None

__init__(timeout=300, notify_callback=None)

Initialize the permission handler.

Parameters:

Name Type Description Default
timeout int

Seconds to wait for user approval.

300
notify_callback Callable[[str, dict[str, Any]], Coroutine[Any, Any, None]] | None

Async function to notify user of permission request.

None
Source code in src/voice_agent/sessions/permissions.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def __init__(
    self,
    timeout: int = 300,
    notify_callback: Callable[[str, dict[str, Any]], Coroutine[Any, Any, None]]
    | None = None,
) -> None:
    """Initialize the permission handler.

    Args:
        timeout: Seconds to wait for user approval.
        notify_callback: Async function to notify user of permission request.
    """
    self.pending: PendingPermission | None = None
    self.timeout = timeout
    self.notify_callback = notify_callback
    self.sticky_approvals: list[StickyApproval] = []

approve()

Approve the pending permission.

Returns:

Type Description
bool

True if there was a pending permission to approve.

Source code in src/voice_agent/sessions/permissions.py
276
277
278
279
280
281
282
283
284
285
286
def approve(self) -> bool:
    """Approve the pending permission.

    Returns:
        True if there was a pending permission to approve.
    """
    if not self.pending or self.pending.state != PermissionState.PENDING:
        return False
    self.pending.state = PermissionState.APPROVED
    self.pending.event.set()
    return True

clear_sticky_approvals()

Clear all sticky approvals.

Returns:

Type Description
int

Number of approvals cleared.

Source code in src/voice_agent/sessions/permissions.py
341
342
343
344
345
346
347
348
349
def clear_sticky_approvals(self) -> int:
    """Clear all sticky approvals.

    Returns:
        Number of approvals cleared.
    """
    count = len(self.sticky_approvals)
    self.sticky_approvals.clear()
    return count

deny(message=None)

Deny the pending permission.

Parameters:

Name Type Description Default
message str | None

Optional message explaining denial.

None

Returns:

Type Description
bool

True if there was a pending permission to deny.

Source code in src/voice_agent/sessions/permissions.py
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
def deny(self, message: str | None = None) -> bool:
    """Deny the pending permission.

    Args:
        message: Optional message explaining denial.

    Returns:
        True if there was a pending permission to deny.
    """
    if not self.pending or self.pending.state != PermissionState.PENDING:
        return False
    self.pending.state = PermissionState.DENIED
    self.pending.deny_message = message or "User rejected"
    self.pending.event.set()
    return True

get_pending_description()

Get a human-readable description of the pending permission.

Returns:

Type Description
str | None

Description string or None if no pending permission.

Source code in src/voice_agent/sessions/permissions.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def get_pending_description(self) -> str | None:
    """Get a human-readable description of the pending permission.

    Returns:
        Description string or None if no pending permission.
    """
    if not self.pending:
        return None

    tool = self.pending.tool_name
    inputs = self.pending.input_data

    if tool == "Bash":
        return f"Run command: {inputs.get('command', 'unknown')}"
    if tool == "Write":
        return f"Write file: {inputs.get('file_path', 'unknown')}"
    if tool == "Edit":
        return f"Edit file: {inputs.get('file_path', 'unknown')}"

    return f"Use tool: {tool}"

get_sticky_approvals()

Get all active sticky approvals.

Returns:

Type Description
list[StickyApproval]

List of sticky approval rules.

Source code in src/voice_agent/sessions/permissions.py
333
334
335
336
337
338
339
def get_sticky_approvals(self) -> list[StickyApproval]:
    """Get all active sticky approvals.

    Returns:
        List of sticky approval rules.
    """
    return list(self.sticky_approvals)

has_pending()

Check if there's a pending permission request.

Returns:

Type Description
bool

True if a permission is pending.

Source code in src/voice_agent/sessions/permissions.py
187
188
189
190
191
192
193
194
195
def has_pending(self) -> bool:
    """Check if there's a pending permission request.

    Returns:
        True if a permission is pending.
    """
    return (
        self.pending is not None and self.pending.state == PermissionState.PENDING
    )

remove_sticky_approval(index)

Remove a sticky approval by index.

Parameters:

Name Type Description Default
index int

Index of the approval to remove (0-based).

required

Returns:

Type Description
StickyApproval | None

The removed StickyApproval or None if index invalid.

Source code in src/voice_agent/sessions/permissions.py
351
352
353
354
355
356
357
358
359
360
361
362
def remove_sticky_approval(self, index: int) -> StickyApproval | None:
    """Remove a sticky approval by index.

    Args:
        index: Index of the approval to remove (0-based).

    Returns:
        The removed StickyApproval or None if index invalid.
    """
    if 0 <= index < len(self.sticky_approvals):
        return self.sticky_approvals.pop(index)
    return None

request_permission(tool_name, input_data) async

Request permission for a tool call.

Auto-approves safe tools and sticky approvals, queues others for user.

Parameters:

Name Type Description Default
tool_name str

Name of the tool.

required
input_data dict[str, Any]

Input parameters for the tool.

required

Returns:

Type Description
tuple[bool, str | None]

Tuple of (approved, deny_message).

Source code in src/voice_agent/sessions/permissions.py
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
async def request_permission(
    self, tool_name: str, input_data: dict[str, Any]
) -> tuple[bool, str | None]:
    """Request permission for a tool call.

    Auto-approves safe tools and sticky approvals, queues others for user.

    Args:
        tool_name: Name of the tool.
        input_data: Input parameters for the tool.

    Returns:
        Tuple of (approved, deny_message).
    """
    # Auto-approve safe tools
    if is_safe_tool_call(tool_name, input_data):
        return True, None

    # Check sticky approvals
    if self._check_sticky_approval(tool_name, input_data):
        return True, None

    # Create pending permission
    self.pending = PendingPermission(tool_name=tool_name, input_data=input_data)

    # Notify user if callback provided
    if self.notify_callback:
        await self.notify_callback(tool_name, input_data)

    # Wait for user response with timeout
    try:
        await asyncio.wait_for(self.pending.event.wait(), timeout=self.timeout)
        approved = self.pending.state == PermissionState.APPROVED
        message = self.pending.deny_message
        self.pending = None
        return approved, message
    except asyncio.TimeoutError:
        self.pending.state = PermissionState.TIMEOUT
        self.pending = None
        return False, "Permission request timed out"

sticky_approve()

Approve pending permission and create sticky rule for similar calls.

Creates a sticky approval rule based on the current pending permission, then approves it.

Returns:

Type Description
StickyApproval | None

The created StickyApproval or None if no pending permission.

Source code in src/voice_agent/sessions/permissions.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
def sticky_approve(self) -> StickyApproval | None:
    """Approve pending permission and create sticky rule for similar calls.

    Creates a sticky approval rule based on the current pending permission,
    then approves it.

    Returns:
        The created StickyApproval or None if no pending permission.
    """
    if not self.pending or self.pending.state != PermissionState.PENDING:
        return None

    tool_name = self.pending.tool_name
    field_name = TOOL_FIELD_NAMES.get(tool_name)

    # Create sticky approval for all calls to this tool
    sticky = StickyApproval(
        tool_name=tool_name,
        pattern=None,
        field_name=field_name,
    )
    self.sticky_approvals.append(sticky)

    # Approve the current request
    self.pending.state = PermissionState.APPROVED
    self.pending.event.set()

    return sticky

PermissionState

Bases: Enum

State of a pending permission request.

Source code in src/voice_agent/sessions/permissions.py
13
14
15
16
17
18
19
class PermissionState(Enum):
    """State of a pending permission request."""

    PENDING = auto()
    APPROVED = auto()
    DENIED = auto()
    TIMEOUT = auto()

Session dataclass

A Claude Code session.

Attributes:

Name Type Description
chat_id int

Telegram chat ID this session belongs to.

name str

Session name within the chat.

cwd str

Working directory for the session.

created_at datetime

When the session was created.

message_count int

Number of messages exchanged.

permission_handler PermissionHandler

Handler for permission requests.

sdk_client ClaudeSDKClient | None

Persistent ClaudeSDKClient instance.

claude_session_id str | None

Claude CLI session ID for resume.

Source code in src/voice_agent/sessions/manager.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@dataclass
class Session:
    """A Claude Code session.

    Attributes:
        chat_id: Telegram chat ID this session belongs to.
        name: Session name within the chat.
        cwd: Working directory for the session.
        created_at: When the session was created.
        message_count: Number of messages exchanged.
        permission_handler: Handler for permission requests.
        sdk_client: Persistent ClaudeSDKClient instance.
        claude_session_id: Claude CLI session ID for resume.
    """

    chat_id: int
    name: str
    cwd: str
    created_at: datetime = field(default_factory=datetime.now)
    message_count: int = 0
    permission_handler: PermissionHandler = field(default_factory=PermissionHandler)
    sdk_client: "ClaudeSDKClient | None" = None
    claude_session_id: str | None = None

    def get_status(self) -> str:
        """Get a human-readable status of this session.

        Returns:
            Status string.
        """
        age = datetime.now() - self.created_at
        hours, remainder = divmod(int(age.total_seconds()), 3600)
        minutes, _ = divmod(remainder, 60)

        status_parts = [
            f"Session: {self.name}",
            f"Working directory: {self.cwd}",
            f"Messages: {self.message_count}",
            f"Age: {hours}h {minutes}m",
        ]

        if self.permission_handler.has_pending():
            desc = self.permission_handler.get_pending_description()
            status_parts.append(f"Pending approval: {desc}")

        sticky_approvals = self.permission_handler.get_sticky_approvals()
        if sticky_approvals:
            status_parts.append(f"Sticky approvals ({len(sticky_approvals)}):")
            for approval in sticky_approvals:
                status_parts.append(f"  - {approval.describe()}")

        return "\n".join(status_parts)

get_status()

Get a human-readable status of this session.

Returns:

Type Description
str

Status string.

Source code in src/voice_agent/sessions/manager.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def get_status(self) -> str:
    """Get a human-readable status of this session.

    Returns:
        Status string.
    """
    age = datetime.now() - self.created_at
    hours, remainder = divmod(int(age.total_seconds()), 3600)
    minutes, _ = divmod(remainder, 60)

    status_parts = [
        f"Session: {self.name}",
        f"Working directory: {self.cwd}",
        f"Messages: {self.message_count}",
        f"Age: {hours}h {minutes}m",
    ]

    if self.permission_handler.has_pending():
        desc = self.permission_handler.get_pending_description()
        status_parts.append(f"Pending approval: {desc}")

    sticky_approvals = self.permission_handler.get_sticky_approvals()
    if sticky_approvals:
        status_parts.append(f"Sticky approvals ({len(sticky_approvals)}):")
        for approval in sticky_approvals:
            status_parts.append(f"  - {approval.describe()}")

    return "\n".join(status_parts)

SessionInfo dataclass

Summary info for a session, used in listings.

Attributes:

Name Type Description
name str

Session name.

message_count int

Number of messages exchanged.

cwd str

Working directory.

is_active bool

Whether this is the active session.

Source code in src/voice_agent/sessions/manager.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
@dataclass
class SessionInfo:
    """Summary info for a session, used in listings.

    Attributes:
        name: Session name.
        message_count: Number of messages exchanged.
        cwd: Working directory.
        is_active: Whether this is the active session.
    """

    name: str
    message_count: int
    cwd: str
    is_active: bool

SessionManager

Manages Claude Code sessions per chat with multi-session support.

Attributes:

Name Type Description
sessions dict[int, dict[str, Session]]

Mapping of chat_id to dict of session_name to Session.

active_sessions dict[int, str]

Mapping of chat_id to active session name.

default_cwd

Default working directory for new sessions.

permission_timeout

Timeout for permission requests.

storage

Optional persistent storage for sessions.

Source code in src/voice_agent/sessions/manager.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
class SessionManager:
    """Manages Claude Code sessions per chat with multi-session support.

    Attributes:
        sessions: Mapping of chat_id to dict of session_name to Session.
        active_sessions: Mapping of chat_id to active session name.
        default_cwd: Default working directory for new sessions.
        permission_timeout: Timeout for permission requests.
        storage: Optional persistent storage for sessions.
    """

    def __init__(
        self,
        default_cwd: str = "/code",
        permission_timeout: int = 300,
        storage: "SessionStorage | None" = None,
    ) -> None:
        """Initialize the session manager.

        Args:
            default_cwd: Default working directory for new sessions.
            permission_timeout: Timeout in seconds for permission requests.
            storage: Optional storage for session persistence.
        """
        self.sessions: dict[int, dict[str, Session]] = {}
        self.active_sessions: dict[int, str] = {}
        self.default_cwd = default_cwd
        self.permission_timeout = permission_timeout
        self.storage = storage
        self._notify_callbacks: dict[int, Any] = {}
        self._restore_sessions()

    def _restore_sessions(self) -> None:
        """Restore sessions from storage."""
        if not self.storage:
            return

        for chat_id in self.storage.list_all_chats():
            state = self.storage.get_chat_state(chat_id)
            if not state:
                continue

            self.sessions[chat_id] = {}
            self.active_sessions[chat_id] = state.active_session

            for stored in state.sessions.values():
                try:
                    created_at = datetime.fromisoformat(stored.created_at)
                except ValueError:
                    created_at = datetime.now()

                session = Session(
                    chat_id=stored.chat_id,
                    name=stored.name,
                    cwd=stored.cwd,
                    created_at=created_at,
                    message_count=stored.message_count,
                    claude_session_id=stored.claude_session_id,
                    permission_handler=PermissionHandler(
                        timeout=self.permission_timeout,
                        notify_callback=self._notify_callbacks.get(stored.chat_id),
                    ),
                )
                self.sessions[chat_id][stored.name] = session

    def _persist_session(self, session: Session) -> None:
        """Persist a session to storage."""
        if not self.storage:
            return

        from voice_agent.sessions.storage import StoredSession

        stored = StoredSession(
            chat_id=session.chat_id,
            name=session.name,
            cwd=session.cwd,
            created_at=session.created_at.isoformat(),
            message_count=session.message_count,
            claude_session_id=session.claude_session_id,
        )
        self.storage.save(stored)

    def set_notify_callback(self, chat_id: int, callback: Any) -> None:
        """Set the notification callback for a chat.

        Args:
            chat_id: Telegram chat ID.
            callback: Async function to call for notifications.
        """
        self._notify_callbacks[chat_id] = callback
        # Also update existing session's permission handler
        if chat_id in self.sessions:
            for session in self.sessions[chat_id].values():
                session.permission_handler.notify_callback = callback

    def _get_active_session_name(self, chat_id: int) -> str:
        """Get the active session name for a chat.

        Args:
            chat_id: Telegram chat ID.

        Returns:
            Active session name, defaults to "main".
        """
        return self.active_sessions.get(chat_id, "main")

    def get_or_create(
        self, chat_id: int, cwd: str | None = None, name: str | None = None
    ) -> Session:
        """Get existing active session or create new one.

        Args:
            chat_id: Telegram chat ID.
            cwd: Working directory (uses default if not specified).
            name: Session name (uses active session if not specified).

        Returns:
            The session for this chat.
        """
        if chat_id not in self.sessions:
            self.sessions[chat_id] = {}
            self.active_sessions[chat_id] = "main"

        session_name = name or self._get_active_session_name(chat_id)

        if session_name not in self.sessions[chat_id]:
            effective_cwd = cwd or self.default_cwd
            session = Session(
                chat_id=chat_id,
                name=session_name,
                cwd=effective_cwd,
                permission_handler=PermissionHandler(
                    timeout=self.permission_timeout,
                    notify_callback=self._notify_callbacks.get(chat_id),
                ),
            )
            self.sessions[chat_id][session_name] = session
            self._persist_session(session)
            # If creating the active session name, ensure it's set
            if session_name == self._get_active_session_name(chat_id):
                self.active_sessions[chat_id] = session_name
                if self.storage:
                    self.storage.set_active_session(chat_id, session_name)

        return self.sessions[chat_id][session_name]

    async def create_new_async(
        self, chat_id: int, cwd: str | None = None, name: str | None = None
    ) -> Session:
        """Create a new session, replacing any existing one with same name.

        Args:
            chat_id: Telegram chat ID.
            cwd: Working directory.
            name: Session name (uses "main" if not specified).

        Returns:
            The new session.
        """
        if chat_id not in self.sessions:
            self.sessions[chat_id] = {}
            self.active_sessions[chat_id] = "main"

        session_name = name or "main"

        # Clean up old session if exists
        if session_name in self.sessions.get(chat_id, {}):
            old_session = self.sessions[chat_id][session_name]
            await self._close_client(old_session)

        effective_cwd = cwd or self.default_cwd
        session = Session(
            chat_id=chat_id,
            name=session_name,
            cwd=effective_cwd,
            permission_handler=PermissionHandler(
                timeout=self.permission_timeout,
                notify_callback=self._notify_callbacks.get(chat_id),
            ),
        )
        self.sessions[chat_id][session_name] = session
        self.active_sessions[chat_id] = session_name
        self._persist_session(session)
        if self.storage:
            self.storage.set_active_session(chat_id, session_name)
        return session

    def create_new(
        self, chat_id: int, cwd: str | None = None, name: str | None = None
    ) -> Session:
        """Create a new session synchronously (closes client in background).

        Args:
            chat_id: Telegram chat ID.
            cwd: Working directory.
            name: Session name (uses "main" if not specified).

        Returns:
            The new session.
        """
        if chat_id not in self.sessions:
            self.sessions[chat_id] = {}
            self.active_sessions[chat_id] = "main"

        session_name = name or "main"

        # Clean up old session if exists
        if session_name in self.sessions.get(chat_id, {}):
            old_session = self.sessions[chat_id][session_name]
            if old_session.sdk_client is not None:
                asyncio.create_task(self._close_client(old_session))

        effective_cwd = cwd or self.default_cwd
        session = Session(
            chat_id=chat_id,
            name=session_name,
            cwd=effective_cwd,
            permission_handler=PermissionHandler(
                timeout=self.permission_timeout,
                notify_callback=self._notify_callbacks.get(chat_id),
            ),
        )
        self.sessions[chat_id][session_name] = session
        self.active_sessions[chat_id] = session_name
        self._persist_session(session)
        if self.storage:
            self.storage.set_active_session(chat_id, session_name)
        return session

    def get(self, chat_id: int, name: str | None = None) -> Session | None:
        """Get session for a chat if it exists.

        Args:
            chat_id: Telegram chat ID.
            name: Session name (uses active session if not specified).

        Returns:
            Session or None.
        """
        if chat_id not in self.sessions:
            return None

        session_name = name or self._get_active_session_name(chat_id)
        return self.sessions[chat_id].get(session_name)

    def list_sessions(self, chat_id: int) -> list[SessionInfo]:
        """List all sessions for a chat.

        Args:
            chat_id: Telegram chat ID.

        Returns:
            List of SessionInfo objects.
        """
        if chat_id not in self.sessions:
            return []

        active = self._get_active_session_name(chat_id)
        return [
            SessionInfo(
                name=session.name,
                message_count=session.message_count,
                cwd=session.cwd,
                is_active=session.name == active,
            )
            for session in self.sessions[chat_id].values()
        ]

    def get_active_session_name(self, chat_id: int) -> str | None:
        """Get the name of the active session.

        Args:
            chat_id: Telegram chat ID.

        Returns:
            Active session name or None if no sessions.
        """
        if chat_id not in self.sessions:
            return None
        return self._get_active_session_name(chat_id)

    def switch_session(self, chat_id: int, name: str) -> Session | None:
        """Switch to a different session.

        Args:
            chat_id: Telegram chat ID.
            name: Session name to switch to.

        Returns:
            The switched-to session, or None if not found.
        """
        if chat_id not in self.sessions:
            return None

        if name not in self.sessions[chat_id]:
            return None

        self.active_sessions[chat_id] = name
        if self.storage:
            self.storage.set_active_session(chat_id, name)
        return self.sessions[chat_id][name]

    def generate_session_name(self, chat_id: int) -> str:
        """Generate a unique session name for a chat.

        Args:
            chat_id: Telegram chat ID.

        Returns:
            A unique session name like "session-2", "session-3", etc.
        """
        if chat_id not in self.sessions:
            return "session-2"

        existing = set(self.sessions[chat_id].keys())
        counter = 2
        while f"session-{counter}" in existing:
            counter += 1
        return f"session-{counter}"

    def rename_session(self, chat_id: int, old_name: str, new_name: str) -> bool:
        """Rename a session.

        Args:
            chat_id: Telegram chat ID.
            old_name: Current session name.
            new_name: New session name.

        Returns:
            True if renamed, False if not found or name already exists.
        """
        if chat_id not in self.sessions:
            return False

        if old_name not in self.sessions[chat_id]:
            return False

        if new_name in self.sessions[chat_id]:
            return False

        session = self.sessions[chat_id].pop(old_name)
        session.name = new_name
        self.sessions[chat_id][new_name] = session

        if self.active_sessions.get(chat_id) == old_name:
            self.active_sessions[chat_id] = new_name

        if self.storage:
            self.storage.rename_session(chat_id, old_name, new_name)

        return True

    def set_cwd(self, chat_id: int, cwd: str) -> Session:
        """Set the working directory for the active session.

        Args:
            chat_id: Telegram chat ID.
            cwd: New working directory.

        Returns:
            The updated session.
        """
        session = self.get_or_create(chat_id)
        session.cwd = cwd
        self._persist_session(session)
        return session

    async def _get_or_create_client(self, session: Session) -> "ClaudeSDKClient":
        """Get or create a ClaudeSDKClient for the session.

        Args:
            session: The session to get/create client for.

        Returns:
            ClaudeSDKClient instance.
        """
        if session.sdk_client is None:
            import shutil

            from claude_agent_sdk import (
                ClaudeAgentOptions,
                ClaudeSDKClient,
                PermissionResultAllow,
                PermissionResultDeny,
                ToolPermissionContext,
            )

            # Use system Claude CLI (2.0+) instead of bundled SDK version (1.3.5)
            # The SDK's bundled CLI is too old and lacks required features
            cli_path = shutil.which("claude")

            async def permission_callback(
                tool_name: str,
                input_data: dict[str, Any],
                context: ToolPermissionContext,
            ) -> PermissionResultAllow | PermissionResultDeny:
                """Handle tool permission requests via the session's handler."""
                (
                    approved,
                    deny_message,
                ) = await session.permission_handler.request_permission(
                    tool_name, input_data
                )
                if approved:
                    return PermissionResultAllow()
                return PermissionResultDeny(message=deny_message or "Permission denied")

            options = ClaudeAgentOptions(
                cwd=session.cwd,
                can_use_tool=permission_callback,
                cli_path=cli_path,
                # Load user, project, and local settings (CLAUDE.md, MCP servers, etc.)
                setting_sources=["user", "project", "local"],
                # Resume prior conversation if we have a stored session ID
                resume=session.claude_session_id,
            )
            session.sdk_client = ClaudeSDKClient(options=options)
            await session.sdk_client.__aenter__()
            logger.info(
                "Created new SDK client for chat %s session %s (CLI: %s)",
                session.chat_id,
                session.name,
                cli_path,
            )

        return session.sdk_client

    async def _close_client(self, session: Session) -> None:
        """Close the SDK client for a session.

        Args:
            session: The session whose client to close.
        """
        if session.sdk_client is not None:
            client = session.sdk_client
            session.sdk_client = None

            # The SDK client cannot be closed from a different async task than
            # where it was created. Instead of calling __aexit__, we directly
            # terminate the underlying subprocess to avoid spinning task groups.
            try:
                transport = getattr(client, "_transport", None)
                if transport is not None:
                    process = getattr(transport, "_process", None)
                    if process is not None:
                        process.terminate()
                        logger.info("Terminated SDK client subprocess")
            except Exception as e:
                logger.warning("Error terminating SDK client: %s", e)

    def _build_multimodal_message(
        self,
        prompt: str,
        images: list[ImageAttachment],
    ) -> dict[str, Any]:
        """Build a stream-json user message with image content blocks.

        Args:
            prompt: Text prompt to accompany the images.
            images: Image attachments to include.

        Returns:
            A message dict suitable for ``client.query()``.
        """
        content: list[dict[str, Any]] = [
            {
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": img.media_type,
                    "data": img.data,
                },
            }
            for img in images
        ]
        content.append({"type": "text", "text": prompt})
        return {
            "type": "user",
            "message": {"role": "user", "content": content},
            "parent_tool_use_id": None,
            "session_id": "default",
        }

    async def send_prompt(
        self,
        chat_id: int,
        prompt: str,
        resume: bool = True,
        images: list[ImageAttachment] | None = None,
    ) -> AsyncIterator[str]:
        """Send a prompt to the active session and stream responses.

        Uses ClaudeSDKClient for persistent sessions - no token reload.

        Args:
            chat_id: Telegram chat ID.
            prompt: The prompt to send.
            resume: Whether to resume existing session if available.
            images: Optional image attachments for multimodal prompts.

        Yields:
            Response chunks from Claude.
        """
        from claude_agent_sdk import AssistantMessage, ResultMessage, TextBlock

        session = self.get_or_create(chat_id)
        session.message_count += 1

        try:
            client = await self._get_or_create_client(session)

            if images:
                user_msg = self._build_multimodal_message(prompt, images)

                async def _single_message() -> AsyncIterator[dict[str, Any]]:
                    yield user_msg

                await client.query(_single_message())
            else:
                await client.query(prompt)

            async for msg in client.receive_response():
                if isinstance(msg, AssistantMessage):
                    for block in msg.content:
                        if isinstance(block, TextBlock):
                            yield block.text
                elif isinstance(msg, ResultMessage):
                    if msg.session_id and msg.session_id != session.claude_session_id:
                        session.claude_session_id = msg.session_id
                        logger.info(
                            "Chat %s session %s claude_session_id: %s",
                            chat_id,
                            session.name,
                            msg.session_id,
                        )
                    if msg.total_cost_usd:
                        logger.info(
                            "Chat %s session %s cost: $%.4f",
                            chat_id,
                            session.name,
                            msg.total_cost_usd,
                        )

            # Persist updated session
            self._persist_session(session)

        except ImportError:
            yield "Error: claude-agent-sdk not installed."
        except Exception as e:
            logger.exception("Error in send_prompt for chat %s", chat_id)
            yield f"Error: {e}"
            # Close client on error so it can be recreated
            await self._close_client(session)

    def get_status(self, chat_id: int) -> str | None:
        """Get status of the active session.

        Args:
            chat_id: Telegram chat ID.

        Returns:
            Status string or None if no session.
        """
        session = self.get(chat_id)
        if not session:
            return None
        return session.get_status()

    async def close_session_async(self, chat_id: int, name: str) -> bool:
        """Close a specific session asynchronously.

        Args:
            chat_id: Telegram chat ID.
            name: Session name to close.

        Returns:
            True if session was closed, False if not found.
        """
        if chat_id not in self.sessions:
            return False

        if name not in self.sessions[chat_id]:
            return False

        session = self.sessions[chat_id][name]
        await self._close_client(session)

        del self.sessions[chat_id][name]

        if self.storage:
            self.storage.delete_session(chat_id, name)

        # Update active session if we closed the active one
        if self.active_sessions.get(chat_id) == name:
            if self.sessions[chat_id]:
                new_active = next(iter(self.sessions[chat_id]))
                self.active_sessions[chat_id] = new_active
                if self.storage:
                    self.storage.set_active_session(chat_id, new_active)
            else:
                del self.sessions[chat_id]
                del self.active_sessions[chat_id]

        return True

    def close_session(self, chat_id: int, name: str) -> bool:
        """Close a specific session synchronously.

        Args:
            chat_id: Telegram chat ID.
            name: Session name to close.

        Returns:
            True if session was closed, False if not found.
        """
        if chat_id not in self.sessions:
            return False

        if name not in self.sessions[chat_id]:
            return False

        session = self.sessions[chat_id][name]
        if session.sdk_client is not None:
            asyncio.create_task(self._close_client(session))

        del self.sessions[chat_id][name]

        if self.storage:
            self.storage.delete_session(chat_id, name)

        # Update active session if we closed the active one
        if self.active_sessions.get(chat_id) == name:
            if self.sessions[chat_id]:
                new_active = next(iter(self.sessions[chat_id]))
                self.active_sessions[chat_id] = new_active
                if self.storage:
                    self.storage.set_active_session(chat_id, new_active)
            else:
                del self.sessions[chat_id]
                del self.active_sessions[chat_id]

        return True

    # Legacy compatibility methods

    async def delete_session_async(self, chat_id: int) -> bool:
        """Delete the active session asynchronously (legacy compatibility).

        Args:
            chat_id: Telegram chat ID.

        Returns:
            True if session was deleted, False if not found.
        """
        name = self._get_active_session_name(chat_id)
        return await self.close_session_async(chat_id, name)

    def delete_session(self, chat_id: int) -> bool:
        """Delete the active session synchronously (legacy compatibility).

        Args:
            chat_id: Telegram chat ID.

        Returns:
            True if session was deleted, False if not found.
        """
        name = self._get_active_session_name(chat_id)
        return self.close_session(chat_id, name)

    def set_claude_session_id(self, chat_id: int, session_id: str | None) -> None:
        """Set the Claude session ID for resume capability.

        Args:
            chat_id: Telegram chat ID.
            session_id: Claude CLI session ID or None to clear.
        """
        session = self.get(chat_id)
        if session:
            session.claude_session_id = session_id
            self._persist_session(session)

    def has_resumable_session(self, chat_id: int) -> bool:
        """Check if a chat has a resumable Claude session.

        Args:
            chat_id: Telegram chat ID.

        Returns:
            True if there's a session with a Claude session ID.
        """
        session = self.get(chat_id)
        return session is not None and session.claude_session_id is not None

__init__(default_cwd='/code', permission_timeout=300, storage=None)

Initialize the session manager.

Parameters:

Name Type Description Default
default_cwd str

Default working directory for new sessions.

'/code'
permission_timeout int

Timeout in seconds for permission requests.

300
storage SessionStorage | None

Optional storage for session persistence.

None
Source code in src/voice_agent/sessions/manager.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def __init__(
    self,
    default_cwd: str = "/code",
    permission_timeout: int = 300,
    storage: "SessionStorage | None" = None,
) -> None:
    """Initialize the session manager.

    Args:
        default_cwd: Default working directory for new sessions.
        permission_timeout: Timeout in seconds for permission requests.
        storage: Optional storage for session persistence.
    """
    self.sessions: dict[int, dict[str, Session]] = {}
    self.active_sessions: dict[int, str] = {}
    self.default_cwd = default_cwd
    self.permission_timeout = permission_timeout
    self.storage = storage
    self._notify_callbacks: dict[int, Any] = {}
    self._restore_sessions()

close_session(chat_id, name)

Close a specific session synchronously.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
name str

Session name to close.

required

Returns:

Type Description
bool

True if session was closed, False if not found.

Source code in src/voice_agent/sessions/manager.py
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
def close_session(self, chat_id: int, name: str) -> bool:
    """Close a specific session synchronously.

    Args:
        chat_id: Telegram chat ID.
        name: Session name to close.

    Returns:
        True if session was closed, False if not found.
    """
    if chat_id not in self.sessions:
        return False

    if name not in self.sessions[chat_id]:
        return False

    session = self.sessions[chat_id][name]
    if session.sdk_client is not None:
        asyncio.create_task(self._close_client(session))

    del self.sessions[chat_id][name]

    if self.storage:
        self.storage.delete_session(chat_id, name)

    # Update active session if we closed the active one
    if self.active_sessions.get(chat_id) == name:
        if self.sessions[chat_id]:
            new_active = next(iter(self.sessions[chat_id]))
            self.active_sessions[chat_id] = new_active
            if self.storage:
                self.storage.set_active_session(chat_id, new_active)
        else:
            del self.sessions[chat_id]
            del self.active_sessions[chat_id]

    return True

close_session_async(chat_id, name) async

Close a specific session asynchronously.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
name str

Session name to close.

required

Returns:

Type Description
bool

True if session was closed, False if not found.

Source code in src/voice_agent/sessions/manager.py
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
async def close_session_async(self, chat_id: int, name: str) -> bool:
    """Close a specific session asynchronously.

    Args:
        chat_id: Telegram chat ID.
        name: Session name to close.

    Returns:
        True if session was closed, False if not found.
    """
    if chat_id not in self.sessions:
        return False

    if name not in self.sessions[chat_id]:
        return False

    session = self.sessions[chat_id][name]
    await self._close_client(session)

    del self.sessions[chat_id][name]

    if self.storage:
        self.storage.delete_session(chat_id, name)

    # Update active session if we closed the active one
    if self.active_sessions.get(chat_id) == name:
        if self.sessions[chat_id]:
            new_active = next(iter(self.sessions[chat_id]))
            self.active_sessions[chat_id] = new_active
            if self.storage:
                self.storage.set_active_session(chat_id, new_active)
        else:
            del self.sessions[chat_id]
            del self.active_sessions[chat_id]

    return True

create_new(chat_id, cwd=None, name=None)

Create a new session synchronously (closes client in background).

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
cwd str | None

Working directory.

None
name str | None

Session name (uses "main" if not specified).

None

Returns:

Type Description
Session

The new session.

Source code in src/voice_agent/sessions/manager.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
def create_new(
    self, chat_id: int, cwd: str | None = None, name: str | None = None
) -> Session:
    """Create a new session synchronously (closes client in background).

    Args:
        chat_id: Telegram chat ID.
        cwd: Working directory.
        name: Session name (uses "main" if not specified).

    Returns:
        The new session.
    """
    if chat_id not in self.sessions:
        self.sessions[chat_id] = {}
        self.active_sessions[chat_id] = "main"

    session_name = name or "main"

    # Clean up old session if exists
    if session_name in self.sessions.get(chat_id, {}):
        old_session = self.sessions[chat_id][session_name]
        if old_session.sdk_client is not None:
            asyncio.create_task(self._close_client(old_session))

    effective_cwd = cwd or self.default_cwd
    session = Session(
        chat_id=chat_id,
        name=session_name,
        cwd=effective_cwd,
        permission_handler=PermissionHandler(
            timeout=self.permission_timeout,
            notify_callback=self._notify_callbacks.get(chat_id),
        ),
    )
    self.sessions[chat_id][session_name] = session
    self.active_sessions[chat_id] = session_name
    self._persist_session(session)
    if self.storage:
        self.storage.set_active_session(chat_id, session_name)
    return session

create_new_async(chat_id, cwd=None, name=None) async

Create a new session, replacing any existing one with same name.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
cwd str | None

Working directory.

None
name str | None

Session name (uses "main" if not specified).

None

Returns:

Type Description
Session

The new session.

Source code in src/voice_agent/sessions/manager.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
async def create_new_async(
    self, chat_id: int, cwd: str | None = None, name: str | None = None
) -> Session:
    """Create a new session, replacing any existing one with same name.

    Args:
        chat_id: Telegram chat ID.
        cwd: Working directory.
        name: Session name (uses "main" if not specified).

    Returns:
        The new session.
    """
    if chat_id not in self.sessions:
        self.sessions[chat_id] = {}
        self.active_sessions[chat_id] = "main"

    session_name = name or "main"

    # Clean up old session if exists
    if session_name in self.sessions.get(chat_id, {}):
        old_session = self.sessions[chat_id][session_name]
        await self._close_client(old_session)

    effective_cwd = cwd or self.default_cwd
    session = Session(
        chat_id=chat_id,
        name=session_name,
        cwd=effective_cwd,
        permission_handler=PermissionHandler(
            timeout=self.permission_timeout,
            notify_callback=self._notify_callbacks.get(chat_id),
        ),
    )
    self.sessions[chat_id][session_name] = session
    self.active_sessions[chat_id] = session_name
    self._persist_session(session)
    if self.storage:
        self.storage.set_active_session(chat_id, session_name)
    return session

delete_session(chat_id)

Delete the active session synchronously (legacy compatibility).

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required

Returns:

Type Description
bool

True if session was deleted, False if not found.

Source code in src/voice_agent/sessions/manager.py
752
753
754
755
756
757
758
759
760
761
762
def delete_session(self, chat_id: int) -> bool:
    """Delete the active session synchronously (legacy compatibility).

    Args:
        chat_id: Telegram chat ID.

    Returns:
        True if session was deleted, False if not found.
    """
    name = self._get_active_session_name(chat_id)
    return self.close_session(chat_id, name)

delete_session_async(chat_id) async

Delete the active session asynchronously (legacy compatibility).

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required

Returns:

Type Description
bool

True if session was deleted, False if not found.

Source code in src/voice_agent/sessions/manager.py
740
741
742
743
744
745
746
747
748
749
750
async def delete_session_async(self, chat_id: int) -> bool:
    """Delete the active session asynchronously (legacy compatibility).

    Args:
        chat_id: Telegram chat ID.

    Returns:
        True if session was deleted, False if not found.
    """
    name = self._get_active_session_name(chat_id)
    return await self.close_session_async(chat_id, name)

generate_session_name(chat_id)

Generate a unique session name for a chat.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required

Returns:

Type Description
str

A unique session name like "session-2", "session-3", etc.

Source code in src/voice_agent/sessions/manager.py
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
def generate_session_name(self, chat_id: int) -> str:
    """Generate a unique session name for a chat.

    Args:
        chat_id: Telegram chat ID.

    Returns:
        A unique session name like "session-2", "session-3", etc.
    """
    if chat_id not in self.sessions:
        return "session-2"

    existing = set(self.sessions[chat_id].keys())
    counter = 2
    while f"session-{counter}" in existing:
        counter += 1
    return f"session-{counter}"

get(chat_id, name=None)

Get session for a chat if it exists.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
name str | None

Session name (uses active session if not specified).

None

Returns:

Type Description
Session | None

Session or None.

Source code in src/voice_agent/sessions/manager.py
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
def get(self, chat_id: int, name: str | None = None) -> Session | None:
    """Get session for a chat if it exists.

    Args:
        chat_id: Telegram chat ID.
        name: Session name (uses active session if not specified).

    Returns:
        Session or None.
    """
    if chat_id not in self.sessions:
        return None

    session_name = name or self._get_active_session_name(chat_id)
    return self.sessions[chat_id].get(session_name)

get_active_session_name(chat_id)

Get the name of the active session.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required

Returns:

Type Description
str | None

Active session name or None if no sessions.

Source code in src/voice_agent/sessions/manager.py
363
364
365
366
367
368
369
370
371
372
373
374
def get_active_session_name(self, chat_id: int) -> str | None:
    """Get the name of the active session.

    Args:
        chat_id: Telegram chat ID.

    Returns:
        Active session name or None if no sessions.
    """
    if chat_id not in self.sessions:
        return None
    return self._get_active_session_name(chat_id)

get_or_create(chat_id, cwd=None, name=None)

Get existing active session or create new one.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
cwd str | None

Working directory (uses default if not specified).

None
name str | None

Session name (uses active session if not specified).

None

Returns:

Type Description
Session

The session for this chat.

Source code in src/voice_agent/sessions/manager.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def get_or_create(
    self, chat_id: int, cwd: str | None = None, name: str | None = None
) -> Session:
    """Get existing active session or create new one.

    Args:
        chat_id: Telegram chat ID.
        cwd: Working directory (uses default if not specified).
        name: Session name (uses active session if not specified).

    Returns:
        The session for this chat.
    """
    if chat_id not in self.sessions:
        self.sessions[chat_id] = {}
        self.active_sessions[chat_id] = "main"

    session_name = name or self._get_active_session_name(chat_id)

    if session_name not in self.sessions[chat_id]:
        effective_cwd = cwd or self.default_cwd
        session = Session(
            chat_id=chat_id,
            name=session_name,
            cwd=effective_cwd,
            permission_handler=PermissionHandler(
                timeout=self.permission_timeout,
                notify_callback=self._notify_callbacks.get(chat_id),
            ),
        )
        self.sessions[chat_id][session_name] = session
        self._persist_session(session)
        # If creating the active session name, ensure it's set
        if session_name == self._get_active_session_name(chat_id):
            self.active_sessions[chat_id] = session_name
            if self.storage:
                self.storage.set_active_session(chat_id, session_name)

    return self.sessions[chat_id][session_name]

get_status(chat_id)

Get status of the active session.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required

Returns:

Type Description
str | None

Status string or None if no session.

Source code in src/voice_agent/sessions/manager.py
649
650
651
652
653
654
655
656
657
658
659
660
661
def get_status(self, chat_id: int) -> str | None:
    """Get status of the active session.

    Args:
        chat_id: Telegram chat ID.

    Returns:
        Status string or None if no session.
    """
    session = self.get(chat_id)
    if not session:
        return None
    return session.get_status()

has_resumable_session(chat_id)

Check if a chat has a resumable Claude session.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required

Returns:

Type Description
bool

True if there's a session with a Claude session ID.

Source code in src/voice_agent/sessions/manager.py
776
777
778
779
780
781
782
783
784
785
786
def has_resumable_session(self, chat_id: int) -> bool:
    """Check if a chat has a resumable Claude session.

    Args:
        chat_id: Telegram chat ID.

    Returns:
        True if there's a session with a Claude session ID.
    """
    session = self.get(chat_id)
    return session is not None and session.claude_session_id is not None

list_sessions(chat_id)

List all sessions for a chat.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required

Returns:

Type Description
list[SessionInfo]

List of SessionInfo objects.

Source code in src/voice_agent/sessions/manager.py
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
def list_sessions(self, chat_id: int) -> list[SessionInfo]:
    """List all sessions for a chat.

    Args:
        chat_id: Telegram chat ID.

    Returns:
        List of SessionInfo objects.
    """
    if chat_id not in self.sessions:
        return []

    active = self._get_active_session_name(chat_id)
    return [
        SessionInfo(
            name=session.name,
            message_count=session.message_count,
            cwd=session.cwd,
            is_active=session.name == active,
        )
        for session in self.sessions[chat_id].values()
    ]

rename_session(chat_id, old_name, new_name)

Rename a session.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
old_name str

Current session name.

required
new_name str

New session name.

required

Returns:

Type Description
bool

True if renamed, False if not found or name already exists.

Source code in src/voice_agent/sessions/manager.py
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
def rename_session(self, chat_id: int, old_name: str, new_name: str) -> bool:
    """Rename a session.

    Args:
        chat_id: Telegram chat ID.
        old_name: Current session name.
        new_name: New session name.

    Returns:
        True if renamed, False if not found or name already exists.
    """
    if chat_id not in self.sessions:
        return False

    if old_name not in self.sessions[chat_id]:
        return False

    if new_name in self.sessions[chat_id]:
        return False

    session = self.sessions[chat_id].pop(old_name)
    session.name = new_name
    self.sessions[chat_id][new_name] = session

    if self.active_sessions.get(chat_id) == old_name:
        self.active_sessions[chat_id] = new_name

    if self.storage:
        self.storage.rename_session(chat_id, old_name, new_name)

    return True

send_prompt(chat_id, prompt, resume=True, images=None) async

Send a prompt to the active session and stream responses.

Uses ClaudeSDKClient for persistent sessions - no token reload.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
prompt str

The prompt to send.

required
resume bool

Whether to resume existing session if available.

True
images list[ImageAttachment] | None

Optional image attachments for multimodal prompts.

None

Yields:

Type Description
AsyncIterator[str]

Response chunks from Claude.

Source code in src/voice_agent/sessions/manager.py
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
async def send_prompt(
    self,
    chat_id: int,
    prompt: str,
    resume: bool = True,
    images: list[ImageAttachment] | None = None,
) -> AsyncIterator[str]:
    """Send a prompt to the active session and stream responses.

    Uses ClaudeSDKClient for persistent sessions - no token reload.

    Args:
        chat_id: Telegram chat ID.
        prompt: The prompt to send.
        resume: Whether to resume existing session if available.
        images: Optional image attachments for multimodal prompts.

    Yields:
        Response chunks from Claude.
    """
    from claude_agent_sdk import AssistantMessage, ResultMessage, TextBlock

    session = self.get_or_create(chat_id)
    session.message_count += 1

    try:
        client = await self._get_or_create_client(session)

        if images:
            user_msg = self._build_multimodal_message(prompt, images)

            async def _single_message() -> AsyncIterator[dict[str, Any]]:
                yield user_msg

            await client.query(_single_message())
        else:
            await client.query(prompt)

        async for msg in client.receive_response():
            if isinstance(msg, AssistantMessage):
                for block in msg.content:
                    if isinstance(block, TextBlock):
                        yield block.text
            elif isinstance(msg, ResultMessage):
                if msg.session_id and msg.session_id != session.claude_session_id:
                    session.claude_session_id = msg.session_id
                    logger.info(
                        "Chat %s session %s claude_session_id: %s",
                        chat_id,
                        session.name,
                        msg.session_id,
                    )
                if msg.total_cost_usd:
                    logger.info(
                        "Chat %s session %s cost: $%.4f",
                        chat_id,
                        session.name,
                        msg.total_cost_usd,
                    )

        # Persist updated session
        self._persist_session(session)

    except ImportError:
        yield "Error: claude-agent-sdk not installed."
    except Exception as e:
        logger.exception("Error in send_prompt for chat %s", chat_id)
        yield f"Error: {e}"
        # Close client on error so it can be recreated
        await self._close_client(session)

set_claude_session_id(chat_id, session_id)

Set the Claude session ID for resume capability.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
session_id str | None

Claude CLI session ID or None to clear.

required
Source code in src/voice_agent/sessions/manager.py
764
765
766
767
768
769
770
771
772
773
774
def set_claude_session_id(self, chat_id: int, session_id: str | None) -> None:
    """Set the Claude session ID for resume capability.

    Args:
        chat_id: Telegram chat ID.
        session_id: Claude CLI session ID or None to clear.
    """
    session = self.get(chat_id)
    if session:
        session.claude_session_id = session_id
        self._persist_session(session)

set_cwd(chat_id, cwd)

Set the working directory for the active session.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
cwd str

New working directory.

required

Returns:

Type Description
Session

The updated session.

Source code in src/voice_agent/sessions/manager.py
447
448
449
450
451
452
453
454
455
456
457
458
459
460
def set_cwd(self, chat_id: int, cwd: str) -> Session:
    """Set the working directory for the active session.

    Args:
        chat_id: Telegram chat ID.
        cwd: New working directory.

    Returns:
        The updated session.
    """
    session = self.get_or_create(chat_id)
    session.cwd = cwd
    self._persist_session(session)
    return session

set_notify_callback(chat_id, callback)

Set the notification callback for a chat.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
callback Any

Async function to call for notifications.

required
Source code in src/voice_agent/sessions/manager.py
177
178
179
180
181
182
183
184
185
186
187
188
def set_notify_callback(self, chat_id: int, callback: Any) -> None:
    """Set the notification callback for a chat.

    Args:
        chat_id: Telegram chat ID.
        callback: Async function to call for notifications.
    """
    self._notify_callbacks[chat_id] = callback
    # Also update existing session's permission handler
    if chat_id in self.sessions:
        for session in self.sessions[chat_id].values():
            session.permission_handler.notify_callback = callback

switch_session(chat_id, name)

Switch to a different session.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
name str

Session name to switch to.

required

Returns:

Type Description
Session | None

The switched-to session, or None if not found.

Source code in src/voice_agent/sessions/manager.py
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
def switch_session(self, chat_id: int, name: str) -> Session | None:
    """Switch to a different session.

    Args:
        chat_id: Telegram chat ID.
        name: Session name to switch to.

    Returns:
        The switched-to session, or None if not found.
    """
    if chat_id not in self.sessions:
        return None

    if name not in self.sessions[chat_id]:
        return None

    self.active_sessions[chat_id] = name
    if self.storage:
        self.storage.set_active_session(chat_id, name)
    return self.sessions[chat_id][name]

SessionStorage

Persistent storage for session data.

Stores sessions in a JSON file with multi-session support per chat.

Attributes:

Name Type Description
path

Path to the JSON storage file.

Source code in src/voice_agent/sessions/storage.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
class SessionStorage:
    """Persistent storage for session data.

    Stores sessions in a JSON file with multi-session support per chat.

    Attributes:
        path: Path to the JSON storage file.
    """

    def __init__(self, path: str | Path = "sessions.json") -> None:
        """Initialize storage.

        Args:
            path: Path to the JSON storage file.
        """
        self.path = Path(path)
        self._data: dict[int, ChatStoredState] = {}
        self._load()

    def _is_old_format(self, data: dict[str, Any]) -> bool:
        """Check if data is in old single-session format.

        Old format has "cwd" at top level of session data.
        New format has "active_session" and "sessions" keys.
        """
        return "cwd" in data

    def _migrate_old_format(
        self, chat_id: int, old_data: dict[str, Any]
    ) -> ChatStoredState:
        """Migrate old single-session format to new multi-session format.

        Args:
            chat_id: Telegram chat ID.
            old_data: Old format session data with cwd at top level.

        Returns:
            ChatStoredState with single "main" session.
        """
        old_data["name"] = "main"
        session = StoredSession.from_dict(old_data)
        return ChatStoredState(
            active_session="main",
            sessions={"main": session},
        )

    def _load(self) -> None:
        """Load sessions from disk with automatic migration."""
        if not self.path.exists():
            return

        try:
            with open(self.path) as f:
                raw = json.load(f)
                needs_save = False
                for chat_id_str, chat_data in raw.items():
                    chat_id = int(chat_id_str)
                    if self._is_old_format(chat_data):
                        # Migrate old format
                        self._data[chat_id] = self._migrate_old_format(
                            chat_id, chat_data
                        )
                        needs_save = True
                    else:
                        self._data[chat_id] = ChatStoredState.from_dict(
                            chat_id, chat_data
                        )
                # Save migrated data
                if needs_save:
                    self._save()
        except (json.JSONDecodeError, KeyError, ValueError):
            # Corrupted file, start fresh
            self._data = {}

    def _save(self) -> None:
        """Save sessions to disk."""
        raw = {str(k): v.to_dict() for k, v in self._data.items()}
        with open(self.path, "w") as f:
            json.dump(raw, f, indent=2)

    def get_chat_state(self, chat_id: int) -> ChatStoredState | None:
        """Get stored state for a chat.

        Args:
            chat_id: Telegram chat ID.

        Returns:
            ChatStoredState or None if not found.
        """
        return self._data.get(chat_id)

    def get_session(self, chat_id: int, name: str) -> StoredSession | None:
        """Get a specific session for a chat.

        Args:
            chat_id: Telegram chat ID.
            name: Session name.

        Returns:
            StoredSession or None if not found.
        """
        state = self._data.get(chat_id)
        if state:
            return state.sessions.get(name)
        return None

    def get_active_session(self, chat_id: int) -> StoredSession | None:
        """Get the active session for a chat.

        Args:
            chat_id: Telegram chat ID.

        Returns:
            Active StoredSession or None if not found.
        """
        state = self._data.get(chat_id)
        if state:
            return state.sessions.get(state.active_session)
        return None

    def save_session(self, session: StoredSession) -> None:
        """Save a session.

        Args:
            session: Session to save.
        """
        chat_id = session.chat_id
        if chat_id not in self._data:
            self._data[chat_id] = ChatStoredState(
                active_session=session.name,
                sessions={},
            )
        self._data[chat_id].sessions[session.name] = session
        self._save()

    def set_active_session(self, chat_id: int, name: str) -> bool:
        """Set the active session for a chat.

        Args:
            chat_id: Telegram chat ID.
            name: Session name to make active.

        Returns:
            True if successful, False if session not found.
        """
        state = self._data.get(chat_id)
        if state and name in state.sessions:
            state.active_session = name
            self._save()
            return True
        return False

    def delete_session(self, chat_id: int, name: str) -> bool:
        """Delete a specific session.

        Args:
            chat_id: Telegram chat ID.
            name: Session name to delete.

        Returns:
            True if deleted, False if not found.
        """
        state = self._data.get(chat_id)
        if not state or name not in state.sessions:
            return False

        del state.sessions[name]

        # If we deleted the active session, switch to another or remove chat
        if state.active_session == name:
            if state.sessions:
                state.active_session = next(iter(state.sessions))
            else:
                del self._data[chat_id]

        self._save()
        return True

    def rename_session(self, chat_id: int, old_name: str, new_name: str) -> bool:
        """Rename a session.

        Args:
            chat_id: Telegram chat ID.
            old_name: Current session name.
            new_name: New session name.

        Returns:
            True if renamed, False if not found or name exists.
        """
        state = self._data.get(chat_id)
        if not state or old_name not in state.sessions:
            return False

        if new_name in state.sessions:
            return False

        session = state.sessions.pop(old_name)
        session.name = new_name
        state.sessions[new_name] = session

        if state.active_session == old_name:
            state.active_session = new_name

        self._save()
        return True

    def delete_chat(self, chat_id: int) -> bool:
        """Delete all sessions for a chat.

        Args:
            chat_id: Telegram chat ID.

        Returns:
            True if deleted, False if not found.
        """
        if chat_id in self._data:
            del self._data[chat_id]
            self._save()
            return True
        return False

    def list_sessions(self, chat_id: int) -> list[StoredSession]:
        """List all sessions for a chat.

        Args:
            chat_id: Telegram chat ID.

        Returns:
            List of stored sessions.
        """
        state = self._data.get(chat_id)
        if state:
            return list(state.sessions.values())
        return []

    def list_all_chats(self) -> list[int]:
        """List all chat IDs with sessions.

        Returns:
            List of chat IDs.
        """
        return list(self._data.keys())

    # Legacy compatibility methods

    def get(self, chat_id: int) -> StoredSession | None:
        """Get active session for a chat (legacy compatibility).

        Args:
            chat_id: Telegram chat ID.

        Returns:
            Active StoredSession or None if not found.
        """
        return self.get_active_session(chat_id)

    def save(self, session: StoredSession) -> None:
        """Save a session (legacy compatibility).

        Args:
            session: Session to save.
        """
        self.save_session(session)

    def delete(self, chat_id: int) -> None:
        """Delete all sessions for a chat (legacy compatibility).

        Args:
            chat_id: Telegram chat ID.
        """
        self.delete_chat(chat_id)

    def list_all(self) -> list[StoredSession]:
        """List all active sessions across all chats (legacy compatibility).

        Returns:
            List of active stored sessions.
        """
        result = []
        for chat_id in self._data:
            session = self.get_active_session(chat_id)
            if session:
                result.append(session)
        return result

__init__(path='sessions.json')

Initialize storage.

Parameters:

Name Type Description Default
path str | Path

Path to the JSON storage file.

'sessions.json'
Source code in src/voice_agent/sessions/storage.py
 96
 97
 98
 99
100
101
102
103
104
def __init__(self, path: str | Path = "sessions.json") -> None:
    """Initialize storage.

    Args:
        path: Path to the JSON storage file.
    """
    self.path = Path(path)
    self._data: dict[int, ChatStoredState] = {}
    self._load()

delete(chat_id)

Delete all sessions for a chat (legacy compatibility).

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
Source code in src/voice_agent/sessions/storage.py
351
352
353
354
355
356
357
def delete(self, chat_id: int) -> None:
    """Delete all sessions for a chat (legacy compatibility).

    Args:
        chat_id: Telegram chat ID.
    """
    self.delete_chat(chat_id)

delete_chat(chat_id)

Delete all sessions for a chat.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required

Returns:

Type Description
bool

True if deleted, False if not found.

Source code in src/voice_agent/sessions/storage.py
293
294
295
296
297
298
299
300
301
302
303
304
305
306
def delete_chat(self, chat_id: int) -> bool:
    """Delete all sessions for a chat.

    Args:
        chat_id: Telegram chat ID.

    Returns:
        True if deleted, False if not found.
    """
    if chat_id in self._data:
        del self._data[chat_id]
        self._save()
        return True
    return False

delete_session(chat_id, name)

Delete a specific session.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
name str

Session name to delete.

required

Returns:

Type Description
bool

True if deleted, False if not found.

Source code in src/voice_agent/sessions/storage.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
def delete_session(self, chat_id: int, name: str) -> bool:
    """Delete a specific session.

    Args:
        chat_id: Telegram chat ID.
        name: Session name to delete.

    Returns:
        True if deleted, False if not found.
    """
    state = self._data.get(chat_id)
    if not state or name not in state.sessions:
        return False

    del state.sessions[name]

    # If we deleted the active session, switch to another or remove chat
    if state.active_session == name:
        if state.sessions:
            state.active_session = next(iter(state.sessions))
        else:
            del self._data[chat_id]

    self._save()
    return True

get(chat_id)

Get active session for a chat (legacy compatibility).

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required

Returns:

Type Description
StoredSession | None

Active StoredSession or None if not found.

Source code in src/voice_agent/sessions/storage.py
332
333
334
335
336
337
338
339
340
341
def get(self, chat_id: int) -> StoredSession | None:
    """Get active session for a chat (legacy compatibility).

    Args:
        chat_id: Telegram chat ID.

    Returns:
        Active StoredSession or None if not found.
    """
    return self.get_active_session(chat_id)

get_active_session(chat_id)

Get the active session for a chat.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required

Returns:

Type Description
StoredSession | None

Active StoredSession or None if not found.

Source code in src/voice_agent/sessions/storage.py
193
194
195
196
197
198
199
200
201
202
203
204
205
def get_active_session(self, chat_id: int) -> StoredSession | None:
    """Get the active session for a chat.

    Args:
        chat_id: Telegram chat ID.

    Returns:
        Active StoredSession or None if not found.
    """
    state = self._data.get(chat_id)
    if state:
        return state.sessions.get(state.active_session)
    return None

get_chat_state(chat_id)

Get stored state for a chat.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required

Returns:

Type Description
ChatStoredState | None

ChatStoredState or None if not found.

Source code in src/voice_agent/sessions/storage.py
167
168
169
170
171
172
173
174
175
176
def get_chat_state(self, chat_id: int) -> ChatStoredState | None:
    """Get stored state for a chat.

    Args:
        chat_id: Telegram chat ID.

    Returns:
        ChatStoredState or None if not found.
    """
    return self._data.get(chat_id)

get_session(chat_id, name)

Get a specific session for a chat.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
name str

Session name.

required

Returns:

Type Description
StoredSession | None

StoredSession or None if not found.

Source code in src/voice_agent/sessions/storage.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def get_session(self, chat_id: int, name: str) -> StoredSession | None:
    """Get a specific session for a chat.

    Args:
        chat_id: Telegram chat ID.
        name: Session name.

    Returns:
        StoredSession or None if not found.
    """
    state = self._data.get(chat_id)
    if state:
        return state.sessions.get(name)
    return None

list_all()

List all active sessions across all chats (legacy compatibility).

Returns:

Type Description
list[StoredSession]

List of active stored sessions.

Source code in src/voice_agent/sessions/storage.py
359
360
361
362
363
364
365
366
367
368
369
370
def list_all(self) -> list[StoredSession]:
    """List all active sessions across all chats (legacy compatibility).

    Returns:
        List of active stored sessions.
    """
    result = []
    for chat_id in self._data:
        session = self.get_active_session(chat_id)
        if session:
            result.append(session)
    return result

list_all_chats()

List all chat IDs with sessions.

Returns:

Type Description
list[int]

List of chat IDs.

Source code in src/voice_agent/sessions/storage.py
322
323
324
325
326
327
328
def list_all_chats(self) -> list[int]:
    """List all chat IDs with sessions.

    Returns:
        List of chat IDs.
    """
    return list(self._data.keys())

list_sessions(chat_id)

List all sessions for a chat.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required

Returns:

Type Description
list[StoredSession]

List of stored sessions.

Source code in src/voice_agent/sessions/storage.py
308
309
310
311
312
313
314
315
316
317
318
319
320
def list_sessions(self, chat_id: int) -> list[StoredSession]:
    """List all sessions for a chat.

    Args:
        chat_id: Telegram chat ID.

    Returns:
        List of stored sessions.
    """
    state = self._data.get(chat_id)
    if state:
        return list(state.sessions.values())
    return []

rename_session(chat_id, old_name, new_name)

Rename a session.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
old_name str

Current session name.

required
new_name str

New session name.

required

Returns:

Type Description
bool

True if renamed, False if not found or name exists.

Source code in src/voice_agent/sessions/storage.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
def rename_session(self, chat_id: int, old_name: str, new_name: str) -> bool:
    """Rename a session.

    Args:
        chat_id: Telegram chat ID.
        old_name: Current session name.
        new_name: New session name.

    Returns:
        True if renamed, False if not found or name exists.
    """
    state = self._data.get(chat_id)
    if not state or old_name not in state.sessions:
        return False

    if new_name in state.sessions:
        return False

    session = state.sessions.pop(old_name)
    session.name = new_name
    state.sessions[new_name] = session

    if state.active_session == old_name:
        state.active_session = new_name

    self._save()
    return True

save(session)

Save a session (legacy compatibility).

Parameters:

Name Type Description Default
session StoredSession

Session to save.

required
Source code in src/voice_agent/sessions/storage.py
343
344
345
346
347
348
349
def save(self, session: StoredSession) -> None:
    """Save a session (legacy compatibility).

    Args:
        session: Session to save.
    """
    self.save_session(session)

save_session(session)

Save a session.

Parameters:

Name Type Description Default
session StoredSession

Session to save.

required
Source code in src/voice_agent/sessions/storage.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def save_session(self, session: StoredSession) -> None:
    """Save a session.

    Args:
        session: Session to save.
    """
    chat_id = session.chat_id
    if chat_id not in self._data:
        self._data[chat_id] = ChatStoredState(
            active_session=session.name,
            sessions={},
        )
    self._data[chat_id].sessions[session.name] = session
    self._save()

set_active_session(chat_id, name)

Set the active session for a chat.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
name str

Session name to make active.

required

Returns:

Type Description
bool

True if successful, False if session not found.

Source code in src/voice_agent/sessions/storage.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
def set_active_session(self, chat_id: int, name: str) -> bool:
    """Set the active session for a chat.

    Args:
        chat_id: Telegram chat ID.
        name: Session name to make active.

    Returns:
        True if successful, False if session not found.
    """
    state = self._data.get(chat_id)
    if state and name in state.sessions:
        state.active_session = name
        self._save()
        return True
    return False

StickyApproval dataclass

A sticky approval rule that auto-approves matching tool calls.

Attributes:

Name Type Description
tool_name str

Name of the tool to auto-approve.

pattern str | None

Optional regex pattern to match against field value.

field_name str | None

Field to match pattern against ('command', 'file_path', etc).

Source code in src/voice_agent/sessions/permissions.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@dataclass
class StickyApproval:
    """A sticky approval rule that auto-approves matching tool calls.

    Attributes:
        tool_name: Name of the tool to auto-approve.
        pattern: Optional regex pattern to match against field value.
        field_name: Field to match pattern against ('command', 'file_path', etc).
    """

    tool_name: str
    pattern: str | None = None
    field_name: str | None = None

    def matches(self, tool_name: str, input_data: dict[str, Any]) -> bool:
        """Check if this sticky approval matches a tool call.

        Args:
            tool_name: Name of the tool being called.
            input_data: Input parameters for the tool.

        Returns:
            True if the tool call matches this sticky approval.
        """
        if tool_name != self.tool_name:
            return False

        # No pattern means match all calls to this tool
        if self.pattern is None:
            return True

        # Get field value to match against
        if not self.field_name:
            return True

        field_value = input_data.get(self.field_name, "")
        if not field_value:
            return False

        return bool(re.search(self.pattern, field_value))

    def describe(self) -> str:
        """Get a human-readable description of this approval.

        Returns:
            Description string.
        """
        if self.pattern:
            return f"{self.tool_name} matching '{self.pattern}'"
        return f"all {self.tool_name}"

describe()

Get a human-readable description of this approval.

Returns:

Type Description
str

Description string.

Source code in src/voice_agent/sessions/permissions.py
72
73
74
75
76
77
78
79
80
def describe(self) -> str:
    """Get a human-readable description of this approval.

    Returns:
        Description string.
    """
    if self.pattern:
        return f"{self.tool_name} matching '{self.pattern}'"
    return f"all {self.tool_name}"

matches(tool_name, input_data)

Check if this sticky approval matches a tool call.

Parameters:

Name Type Description Default
tool_name str

Name of the tool being called.

required
input_data dict[str, Any]

Input parameters for the tool.

required

Returns:

Type Description
bool

True if the tool call matches this sticky approval.

Source code in src/voice_agent/sessions/permissions.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def matches(self, tool_name: str, input_data: dict[str, Any]) -> bool:
    """Check if this sticky approval matches a tool call.

    Args:
        tool_name: Name of the tool being called.
        input_data: Input parameters for the tool.

    Returns:
        True if the tool call matches this sticky approval.
    """
    if tool_name != self.tool_name:
        return False

    # No pattern means match all calls to this tool
    if self.pattern is None:
        return True

    # Get field value to match against
    if not self.field_name:
        return True

    field_value = input_data.get(self.field_name, "")
    if not field_value:
        return False

    return bool(re.search(self.pattern, field_value))

StoredSession dataclass

Serializable session data for persistence.

Attributes:

Name Type Description
chat_id int

Telegram chat ID.

name str

Session name within the chat.

cwd str

Working directory.

created_at str

ISO format timestamp.

message_count int

Number of messages exchanged.

claude_session_id str | None

Claude CLI session ID for resume.

Source code in src/voice_agent/sessions/storage.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@dataclass
class StoredSession:
    """Serializable session data for persistence.

    Attributes:
        chat_id: Telegram chat ID.
        name: Session name within the chat.
        cwd: Working directory.
        created_at: ISO format timestamp.
        message_count: Number of messages exchanged.
        claude_session_id: Claude CLI session ID for resume.
    """

    chat_id: int
    name: str
    cwd: str
    created_at: str
    message_count: int
    claude_session_id: str | None = None

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "StoredSession":
        """Create from dictionary."""
        return cls(
            chat_id=data["chat_id"],
            name=data.get("name", "main"),
            cwd=data["cwd"],
            created_at=data["created_at"],
            message_count=data["message_count"],
            claude_session_id=data.get("claude_session_id"),
        )

from_dict(data) classmethod

Create from dictionary.

Source code in src/voice_agent/sessions/storage.py
36
37
38
39
40
41
42
43
44
45
46
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "StoredSession":
    """Create from dictionary."""
    return cls(
        chat_id=data["chat_id"],
        name=data.get("name", "main"),
        cwd=data["cwd"],
        created_at=data["created_at"],
        message_count=data["message_count"],
        claude_session_id=data.get("claude_session_id"),
    )

to_dict()

Convert to dictionary for JSON serialization.

Source code in src/voice_agent/sessions/storage.py
32
33
34
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary for JSON serialization."""
    return asdict(self)

voice_agent.sessions.manager

Session management for Claude Code SDK.

Manages Claude SDK client instances and their lifecycle.

Session dataclass

A Claude Code session.

Attributes:

Name Type Description
chat_id int

Telegram chat ID this session belongs to.

name str

Session name within the chat.

cwd str

Working directory for the session.

created_at datetime

When the session was created.

message_count int

Number of messages exchanged.

permission_handler PermissionHandler

Handler for permission requests.

sdk_client ClaudeSDKClient | None

Persistent ClaudeSDKClient instance.

claude_session_id str | None

Claude CLI session ID for resume.

Source code in src/voice_agent/sessions/manager.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@dataclass
class Session:
    """A Claude Code session.

    Attributes:
        chat_id: Telegram chat ID this session belongs to.
        name: Session name within the chat.
        cwd: Working directory for the session.
        created_at: When the session was created.
        message_count: Number of messages exchanged.
        permission_handler: Handler for permission requests.
        sdk_client: Persistent ClaudeSDKClient instance.
        claude_session_id: Claude CLI session ID for resume.
    """

    chat_id: int
    name: str
    cwd: str
    created_at: datetime = field(default_factory=datetime.now)
    message_count: int = 0
    permission_handler: PermissionHandler = field(default_factory=PermissionHandler)
    sdk_client: "ClaudeSDKClient | None" = None
    claude_session_id: str | None = None

    def get_status(self) -> str:
        """Get a human-readable status of this session.

        Returns:
            Status string.
        """
        age = datetime.now() - self.created_at
        hours, remainder = divmod(int(age.total_seconds()), 3600)
        minutes, _ = divmod(remainder, 60)

        status_parts = [
            f"Session: {self.name}",
            f"Working directory: {self.cwd}",
            f"Messages: {self.message_count}",
            f"Age: {hours}h {minutes}m",
        ]

        if self.permission_handler.has_pending():
            desc = self.permission_handler.get_pending_description()
            status_parts.append(f"Pending approval: {desc}")

        sticky_approvals = self.permission_handler.get_sticky_approvals()
        if sticky_approvals:
            status_parts.append(f"Sticky approvals ({len(sticky_approvals)}):")
            for approval in sticky_approvals:
                status_parts.append(f"  - {approval.describe()}")

        return "\n".join(status_parts)

get_status()

Get a human-readable status of this session.

Returns:

Type Description
str

Status string.

Source code in src/voice_agent/sessions/manager.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def get_status(self) -> str:
    """Get a human-readable status of this session.

    Returns:
        Status string.
    """
    age = datetime.now() - self.created_at
    hours, remainder = divmod(int(age.total_seconds()), 3600)
    minutes, _ = divmod(remainder, 60)

    status_parts = [
        f"Session: {self.name}",
        f"Working directory: {self.cwd}",
        f"Messages: {self.message_count}",
        f"Age: {hours}h {minutes}m",
    ]

    if self.permission_handler.has_pending():
        desc = self.permission_handler.get_pending_description()
        status_parts.append(f"Pending approval: {desc}")

    sticky_approvals = self.permission_handler.get_sticky_approvals()
    if sticky_approvals:
        status_parts.append(f"Sticky approvals ({len(sticky_approvals)}):")
        for approval in sticky_approvals:
            status_parts.append(f"  - {approval.describe()}")

    return "\n".join(status_parts)

SessionInfo dataclass

Summary info for a session, used in listings.

Attributes:

Name Type Description
name str

Session name.

message_count int

Number of messages exchanged.

cwd str

Working directory.

is_active bool

Whether this is the active session.

Source code in src/voice_agent/sessions/manager.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
@dataclass
class SessionInfo:
    """Summary info for a session, used in listings.

    Attributes:
        name: Session name.
        message_count: Number of messages exchanged.
        cwd: Working directory.
        is_active: Whether this is the active session.
    """

    name: str
    message_count: int
    cwd: str
    is_active: bool

SessionManager

Manages Claude Code sessions per chat with multi-session support.

Attributes:

Name Type Description
sessions dict[int, dict[str, Session]]

Mapping of chat_id to dict of session_name to Session.

active_sessions dict[int, str]

Mapping of chat_id to active session name.

default_cwd

Default working directory for new sessions.

permission_timeout

Timeout for permission requests.

storage

Optional persistent storage for sessions.

Source code in src/voice_agent/sessions/manager.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
class SessionManager:
    """Manages Claude Code sessions per chat with multi-session support.

    Attributes:
        sessions: Mapping of chat_id to dict of session_name to Session.
        active_sessions: Mapping of chat_id to active session name.
        default_cwd: Default working directory for new sessions.
        permission_timeout: Timeout for permission requests.
        storage: Optional persistent storage for sessions.
    """

    def __init__(
        self,
        default_cwd: str = "/code",
        permission_timeout: int = 300,
        storage: "SessionStorage | None" = None,
    ) -> None:
        """Initialize the session manager.

        Args:
            default_cwd: Default working directory for new sessions.
            permission_timeout: Timeout in seconds for permission requests.
            storage: Optional storage for session persistence.
        """
        self.sessions: dict[int, dict[str, Session]] = {}
        self.active_sessions: dict[int, str] = {}
        self.default_cwd = default_cwd
        self.permission_timeout = permission_timeout
        self.storage = storage
        self._notify_callbacks: dict[int, Any] = {}
        self._restore_sessions()

    def _restore_sessions(self) -> None:
        """Restore sessions from storage."""
        if not self.storage:
            return

        for chat_id in self.storage.list_all_chats():
            state = self.storage.get_chat_state(chat_id)
            if not state:
                continue

            self.sessions[chat_id] = {}
            self.active_sessions[chat_id] = state.active_session

            for stored in state.sessions.values():
                try:
                    created_at = datetime.fromisoformat(stored.created_at)
                except ValueError:
                    created_at = datetime.now()

                session = Session(
                    chat_id=stored.chat_id,
                    name=stored.name,
                    cwd=stored.cwd,
                    created_at=created_at,
                    message_count=stored.message_count,
                    claude_session_id=stored.claude_session_id,
                    permission_handler=PermissionHandler(
                        timeout=self.permission_timeout,
                        notify_callback=self._notify_callbacks.get(stored.chat_id),
                    ),
                )
                self.sessions[chat_id][stored.name] = session

    def _persist_session(self, session: Session) -> None:
        """Persist a session to storage."""
        if not self.storage:
            return

        from voice_agent.sessions.storage import StoredSession

        stored = StoredSession(
            chat_id=session.chat_id,
            name=session.name,
            cwd=session.cwd,
            created_at=session.created_at.isoformat(),
            message_count=session.message_count,
            claude_session_id=session.claude_session_id,
        )
        self.storage.save(stored)

    def set_notify_callback(self, chat_id: int, callback: Any) -> None:
        """Set the notification callback for a chat.

        Args:
            chat_id: Telegram chat ID.
            callback: Async function to call for notifications.
        """
        self._notify_callbacks[chat_id] = callback
        # Also update existing session's permission handler
        if chat_id in self.sessions:
            for session in self.sessions[chat_id].values():
                session.permission_handler.notify_callback = callback

    def _get_active_session_name(self, chat_id: int) -> str:
        """Get the active session name for a chat.

        Args:
            chat_id: Telegram chat ID.

        Returns:
            Active session name, defaults to "main".
        """
        return self.active_sessions.get(chat_id, "main")

    def get_or_create(
        self, chat_id: int, cwd: str | None = None, name: str | None = None
    ) -> Session:
        """Get existing active session or create new one.

        Args:
            chat_id: Telegram chat ID.
            cwd: Working directory (uses default if not specified).
            name: Session name (uses active session if not specified).

        Returns:
            The session for this chat.
        """
        if chat_id not in self.sessions:
            self.sessions[chat_id] = {}
            self.active_sessions[chat_id] = "main"

        session_name = name or self._get_active_session_name(chat_id)

        if session_name not in self.sessions[chat_id]:
            effective_cwd = cwd or self.default_cwd
            session = Session(
                chat_id=chat_id,
                name=session_name,
                cwd=effective_cwd,
                permission_handler=PermissionHandler(
                    timeout=self.permission_timeout,
                    notify_callback=self._notify_callbacks.get(chat_id),
                ),
            )
            self.sessions[chat_id][session_name] = session
            self._persist_session(session)
            # If creating the active session name, ensure it's set
            if session_name == self._get_active_session_name(chat_id):
                self.active_sessions[chat_id] = session_name
                if self.storage:
                    self.storage.set_active_session(chat_id, session_name)

        return self.sessions[chat_id][session_name]

    async def create_new_async(
        self, chat_id: int, cwd: str | None = None, name: str | None = None
    ) -> Session:
        """Create a new session, replacing any existing one with same name.

        Args:
            chat_id: Telegram chat ID.
            cwd: Working directory.
            name: Session name (uses "main" if not specified).

        Returns:
            The new session.
        """
        if chat_id not in self.sessions:
            self.sessions[chat_id] = {}
            self.active_sessions[chat_id] = "main"

        session_name = name or "main"

        # Clean up old session if exists
        if session_name in self.sessions.get(chat_id, {}):
            old_session = self.sessions[chat_id][session_name]
            await self._close_client(old_session)

        effective_cwd = cwd or self.default_cwd
        session = Session(
            chat_id=chat_id,
            name=session_name,
            cwd=effective_cwd,
            permission_handler=PermissionHandler(
                timeout=self.permission_timeout,
                notify_callback=self._notify_callbacks.get(chat_id),
            ),
        )
        self.sessions[chat_id][session_name] = session
        self.active_sessions[chat_id] = session_name
        self._persist_session(session)
        if self.storage:
            self.storage.set_active_session(chat_id, session_name)
        return session

    def create_new(
        self, chat_id: int, cwd: str | None = None, name: str | None = None
    ) -> Session:
        """Create a new session synchronously (closes client in background).

        Args:
            chat_id: Telegram chat ID.
            cwd: Working directory.
            name: Session name (uses "main" if not specified).

        Returns:
            The new session.
        """
        if chat_id not in self.sessions:
            self.sessions[chat_id] = {}
            self.active_sessions[chat_id] = "main"

        session_name = name or "main"

        # Clean up old session if exists
        if session_name in self.sessions.get(chat_id, {}):
            old_session = self.sessions[chat_id][session_name]
            if old_session.sdk_client is not None:
                asyncio.create_task(self._close_client(old_session))

        effective_cwd = cwd or self.default_cwd
        session = Session(
            chat_id=chat_id,
            name=session_name,
            cwd=effective_cwd,
            permission_handler=PermissionHandler(
                timeout=self.permission_timeout,
                notify_callback=self._notify_callbacks.get(chat_id),
            ),
        )
        self.sessions[chat_id][session_name] = session
        self.active_sessions[chat_id] = session_name
        self._persist_session(session)
        if self.storage:
            self.storage.set_active_session(chat_id, session_name)
        return session

    def get(self, chat_id: int, name: str | None = None) -> Session | None:
        """Get session for a chat if it exists.

        Args:
            chat_id: Telegram chat ID.
            name: Session name (uses active session if not specified).

        Returns:
            Session or None.
        """
        if chat_id not in self.sessions:
            return None

        session_name = name or self._get_active_session_name(chat_id)
        return self.sessions[chat_id].get(session_name)

    def list_sessions(self, chat_id: int) -> list[SessionInfo]:
        """List all sessions for a chat.

        Args:
            chat_id: Telegram chat ID.

        Returns:
            List of SessionInfo objects.
        """
        if chat_id not in self.sessions:
            return []

        active = self._get_active_session_name(chat_id)
        return [
            SessionInfo(
                name=session.name,
                message_count=session.message_count,
                cwd=session.cwd,
                is_active=session.name == active,
            )
            for session in self.sessions[chat_id].values()
        ]

    def get_active_session_name(self, chat_id: int) -> str | None:
        """Get the name of the active session.

        Args:
            chat_id: Telegram chat ID.

        Returns:
            Active session name or None if no sessions.
        """
        if chat_id not in self.sessions:
            return None
        return self._get_active_session_name(chat_id)

    def switch_session(self, chat_id: int, name: str) -> Session | None:
        """Switch to a different session.

        Args:
            chat_id: Telegram chat ID.
            name: Session name to switch to.

        Returns:
            The switched-to session, or None if not found.
        """
        if chat_id not in self.sessions:
            return None

        if name not in self.sessions[chat_id]:
            return None

        self.active_sessions[chat_id] = name
        if self.storage:
            self.storage.set_active_session(chat_id, name)
        return self.sessions[chat_id][name]

    def generate_session_name(self, chat_id: int) -> str:
        """Generate a unique session name for a chat.

        Args:
            chat_id: Telegram chat ID.

        Returns:
            A unique session name like "session-2", "session-3", etc.
        """
        if chat_id not in self.sessions:
            return "session-2"

        existing = set(self.sessions[chat_id].keys())
        counter = 2
        while f"session-{counter}" in existing:
            counter += 1
        return f"session-{counter}"

    def rename_session(self, chat_id: int, old_name: str, new_name: str) -> bool:
        """Rename a session.

        Args:
            chat_id: Telegram chat ID.
            old_name: Current session name.
            new_name: New session name.

        Returns:
            True if renamed, False if not found or name already exists.
        """
        if chat_id not in self.sessions:
            return False

        if old_name not in self.sessions[chat_id]:
            return False

        if new_name in self.sessions[chat_id]:
            return False

        session = self.sessions[chat_id].pop(old_name)
        session.name = new_name
        self.sessions[chat_id][new_name] = session

        if self.active_sessions.get(chat_id) == old_name:
            self.active_sessions[chat_id] = new_name

        if self.storage:
            self.storage.rename_session(chat_id, old_name, new_name)

        return True

    def set_cwd(self, chat_id: int, cwd: str) -> Session:
        """Set the working directory for the active session.

        Args:
            chat_id: Telegram chat ID.
            cwd: New working directory.

        Returns:
            The updated session.
        """
        session = self.get_or_create(chat_id)
        session.cwd = cwd
        self._persist_session(session)
        return session

    async def _get_or_create_client(self, session: Session) -> "ClaudeSDKClient":
        """Get or create a ClaudeSDKClient for the session.

        Args:
            session: The session to get/create client for.

        Returns:
            ClaudeSDKClient instance.
        """
        if session.sdk_client is None:
            import shutil

            from claude_agent_sdk import (
                ClaudeAgentOptions,
                ClaudeSDKClient,
                PermissionResultAllow,
                PermissionResultDeny,
                ToolPermissionContext,
            )

            # Use system Claude CLI (2.0+) instead of bundled SDK version (1.3.5)
            # The SDK's bundled CLI is too old and lacks required features
            cli_path = shutil.which("claude")

            async def permission_callback(
                tool_name: str,
                input_data: dict[str, Any],
                context: ToolPermissionContext,
            ) -> PermissionResultAllow | PermissionResultDeny:
                """Handle tool permission requests via the session's handler."""
                (
                    approved,
                    deny_message,
                ) = await session.permission_handler.request_permission(
                    tool_name, input_data
                )
                if approved:
                    return PermissionResultAllow()
                return PermissionResultDeny(message=deny_message or "Permission denied")

            options = ClaudeAgentOptions(
                cwd=session.cwd,
                can_use_tool=permission_callback,
                cli_path=cli_path,
                # Load user, project, and local settings (CLAUDE.md, MCP servers, etc.)
                setting_sources=["user", "project", "local"],
                # Resume prior conversation if we have a stored session ID
                resume=session.claude_session_id,
            )
            session.sdk_client = ClaudeSDKClient(options=options)
            await session.sdk_client.__aenter__()
            logger.info(
                "Created new SDK client for chat %s session %s (CLI: %s)",
                session.chat_id,
                session.name,
                cli_path,
            )

        return session.sdk_client

    async def _close_client(self, session: Session) -> None:
        """Close the SDK client for a session.

        Args:
            session: The session whose client to close.
        """
        if session.sdk_client is not None:
            client = session.sdk_client
            session.sdk_client = None

            # The SDK client cannot be closed from a different async task than
            # where it was created. Instead of calling __aexit__, we directly
            # terminate the underlying subprocess to avoid spinning task groups.
            try:
                transport = getattr(client, "_transport", None)
                if transport is not None:
                    process = getattr(transport, "_process", None)
                    if process is not None:
                        process.terminate()
                        logger.info("Terminated SDK client subprocess")
            except Exception as e:
                logger.warning("Error terminating SDK client: %s", e)

    def _build_multimodal_message(
        self,
        prompt: str,
        images: list[ImageAttachment],
    ) -> dict[str, Any]:
        """Build a stream-json user message with image content blocks.

        Args:
            prompt: Text prompt to accompany the images.
            images: Image attachments to include.

        Returns:
            A message dict suitable for ``client.query()``.
        """
        content: list[dict[str, Any]] = [
            {
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": img.media_type,
                    "data": img.data,
                },
            }
            for img in images
        ]
        content.append({"type": "text", "text": prompt})
        return {
            "type": "user",
            "message": {"role": "user", "content": content},
            "parent_tool_use_id": None,
            "session_id": "default",
        }

    async def send_prompt(
        self,
        chat_id: int,
        prompt: str,
        resume: bool = True,
        images: list[ImageAttachment] | None = None,
    ) -> AsyncIterator[str]:
        """Send a prompt to the active session and stream responses.

        Uses ClaudeSDKClient for persistent sessions - no token reload.

        Args:
            chat_id: Telegram chat ID.
            prompt: The prompt to send.
            resume: Whether to resume existing session if available.
            images: Optional image attachments for multimodal prompts.

        Yields:
            Response chunks from Claude.
        """
        from claude_agent_sdk import AssistantMessage, ResultMessage, TextBlock

        session = self.get_or_create(chat_id)
        session.message_count += 1

        try:
            client = await self._get_or_create_client(session)

            if images:
                user_msg = self._build_multimodal_message(prompt, images)

                async def _single_message() -> AsyncIterator[dict[str, Any]]:
                    yield user_msg

                await client.query(_single_message())
            else:
                await client.query(prompt)

            async for msg in client.receive_response():
                if isinstance(msg, AssistantMessage):
                    for block in msg.content:
                        if isinstance(block, TextBlock):
                            yield block.text
                elif isinstance(msg, ResultMessage):
                    if msg.session_id and msg.session_id != session.claude_session_id:
                        session.claude_session_id = msg.session_id
                        logger.info(
                            "Chat %s session %s claude_session_id: %s",
                            chat_id,
                            session.name,
                            msg.session_id,
                        )
                    if msg.total_cost_usd:
                        logger.info(
                            "Chat %s session %s cost: $%.4f",
                            chat_id,
                            session.name,
                            msg.total_cost_usd,
                        )

            # Persist updated session
            self._persist_session(session)

        except ImportError:
            yield "Error: claude-agent-sdk not installed."
        except Exception as e:
            logger.exception("Error in send_prompt for chat %s", chat_id)
            yield f"Error: {e}"
            # Close client on error so it can be recreated
            await self._close_client(session)

    def get_status(self, chat_id: int) -> str | None:
        """Get status of the active session.

        Args:
            chat_id: Telegram chat ID.

        Returns:
            Status string or None if no session.
        """
        session = self.get(chat_id)
        if not session:
            return None
        return session.get_status()

    async def close_session_async(self, chat_id: int, name: str) -> bool:
        """Close a specific session asynchronously.

        Args:
            chat_id: Telegram chat ID.
            name: Session name to close.

        Returns:
            True if session was closed, False if not found.
        """
        if chat_id not in self.sessions:
            return False

        if name not in self.sessions[chat_id]:
            return False

        session = self.sessions[chat_id][name]
        await self._close_client(session)

        del self.sessions[chat_id][name]

        if self.storage:
            self.storage.delete_session(chat_id, name)

        # Update active session if we closed the active one
        if self.active_sessions.get(chat_id) == name:
            if self.sessions[chat_id]:
                new_active = next(iter(self.sessions[chat_id]))
                self.active_sessions[chat_id] = new_active
                if self.storage:
                    self.storage.set_active_session(chat_id, new_active)
            else:
                del self.sessions[chat_id]
                del self.active_sessions[chat_id]

        return True

    def close_session(self, chat_id: int, name: str) -> bool:
        """Close a specific session synchronously.

        Args:
            chat_id: Telegram chat ID.
            name: Session name to close.

        Returns:
            True if session was closed, False if not found.
        """
        if chat_id not in self.sessions:
            return False

        if name not in self.sessions[chat_id]:
            return False

        session = self.sessions[chat_id][name]
        if session.sdk_client is not None:
            asyncio.create_task(self._close_client(session))

        del self.sessions[chat_id][name]

        if self.storage:
            self.storage.delete_session(chat_id, name)

        # Update active session if we closed the active one
        if self.active_sessions.get(chat_id) == name:
            if self.sessions[chat_id]:
                new_active = next(iter(self.sessions[chat_id]))
                self.active_sessions[chat_id] = new_active
                if self.storage:
                    self.storage.set_active_session(chat_id, new_active)
            else:
                del self.sessions[chat_id]
                del self.active_sessions[chat_id]

        return True

    # Legacy compatibility methods

    async def delete_session_async(self, chat_id: int) -> bool:
        """Delete the active session asynchronously (legacy compatibility).

        Args:
            chat_id: Telegram chat ID.

        Returns:
            True if session was deleted, False if not found.
        """
        name = self._get_active_session_name(chat_id)
        return await self.close_session_async(chat_id, name)

    def delete_session(self, chat_id: int) -> bool:
        """Delete the active session synchronously (legacy compatibility).

        Args:
            chat_id: Telegram chat ID.

        Returns:
            True if session was deleted, False if not found.
        """
        name = self._get_active_session_name(chat_id)
        return self.close_session(chat_id, name)

    def set_claude_session_id(self, chat_id: int, session_id: str | None) -> None:
        """Set the Claude session ID for resume capability.

        Args:
            chat_id: Telegram chat ID.
            session_id: Claude CLI session ID or None to clear.
        """
        session = self.get(chat_id)
        if session:
            session.claude_session_id = session_id
            self._persist_session(session)

    def has_resumable_session(self, chat_id: int) -> bool:
        """Check if a chat has a resumable Claude session.

        Args:
            chat_id: Telegram chat ID.

        Returns:
            True if there's a session with a Claude session ID.
        """
        session = self.get(chat_id)
        return session is not None and session.claude_session_id is not None

__init__(default_cwd='/code', permission_timeout=300, storage=None)

Initialize the session manager.

Parameters:

Name Type Description Default
default_cwd str

Default working directory for new sessions.

'/code'
permission_timeout int

Timeout in seconds for permission requests.

300
storage SessionStorage | None

Optional storage for session persistence.

None
Source code in src/voice_agent/sessions/manager.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def __init__(
    self,
    default_cwd: str = "/code",
    permission_timeout: int = 300,
    storage: "SessionStorage | None" = None,
) -> None:
    """Initialize the session manager.

    Args:
        default_cwd: Default working directory for new sessions.
        permission_timeout: Timeout in seconds for permission requests.
        storage: Optional storage for session persistence.
    """
    self.sessions: dict[int, dict[str, Session]] = {}
    self.active_sessions: dict[int, str] = {}
    self.default_cwd = default_cwd
    self.permission_timeout = permission_timeout
    self.storage = storage
    self._notify_callbacks: dict[int, Any] = {}
    self._restore_sessions()

close_session(chat_id, name)

Close a specific session synchronously.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
name str

Session name to close.

required

Returns:

Type Description
bool

True if session was closed, False if not found.

Source code in src/voice_agent/sessions/manager.py
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
def close_session(self, chat_id: int, name: str) -> bool:
    """Close a specific session synchronously.

    Args:
        chat_id: Telegram chat ID.
        name: Session name to close.

    Returns:
        True if session was closed, False if not found.
    """
    if chat_id not in self.sessions:
        return False

    if name not in self.sessions[chat_id]:
        return False

    session = self.sessions[chat_id][name]
    if session.sdk_client is not None:
        asyncio.create_task(self._close_client(session))

    del self.sessions[chat_id][name]

    if self.storage:
        self.storage.delete_session(chat_id, name)

    # Update active session if we closed the active one
    if self.active_sessions.get(chat_id) == name:
        if self.sessions[chat_id]:
            new_active = next(iter(self.sessions[chat_id]))
            self.active_sessions[chat_id] = new_active
            if self.storage:
                self.storage.set_active_session(chat_id, new_active)
        else:
            del self.sessions[chat_id]
            del self.active_sessions[chat_id]

    return True

close_session_async(chat_id, name) async

Close a specific session asynchronously.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
name str

Session name to close.

required

Returns:

Type Description
bool

True if session was closed, False if not found.

Source code in src/voice_agent/sessions/manager.py
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
async def close_session_async(self, chat_id: int, name: str) -> bool:
    """Close a specific session asynchronously.

    Args:
        chat_id: Telegram chat ID.
        name: Session name to close.

    Returns:
        True if session was closed, False if not found.
    """
    if chat_id not in self.sessions:
        return False

    if name not in self.sessions[chat_id]:
        return False

    session = self.sessions[chat_id][name]
    await self._close_client(session)

    del self.sessions[chat_id][name]

    if self.storage:
        self.storage.delete_session(chat_id, name)

    # Update active session if we closed the active one
    if self.active_sessions.get(chat_id) == name:
        if self.sessions[chat_id]:
            new_active = next(iter(self.sessions[chat_id]))
            self.active_sessions[chat_id] = new_active
            if self.storage:
                self.storage.set_active_session(chat_id, new_active)
        else:
            del self.sessions[chat_id]
            del self.active_sessions[chat_id]

    return True

create_new(chat_id, cwd=None, name=None)

Create a new session synchronously (closes client in background).

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
cwd str | None

Working directory.

None
name str | None

Session name (uses "main" if not specified).

None

Returns:

Type Description
Session

The new session.

Source code in src/voice_agent/sessions/manager.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
def create_new(
    self, chat_id: int, cwd: str | None = None, name: str | None = None
) -> Session:
    """Create a new session synchronously (closes client in background).

    Args:
        chat_id: Telegram chat ID.
        cwd: Working directory.
        name: Session name (uses "main" if not specified).

    Returns:
        The new session.
    """
    if chat_id not in self.sessions:
        self.sessions[chat_id] = {}
        self.active_sessions[chat_id] = "main"

    session_name = name or "main"

    # Clean up old session if exists
    if session_name in self.sessions.get(chat_id, {}):
        old_session = self.sessions[chat_id][session_name]
        if old_session.sdk_client is not None:
            asyncio.create_task(self._close_client(old_session))

    effective_cwd = cwd or self.default_cwd
    session = Session(
        chat_id=chat_id,
        name=session_name,
        cwd=effective_cwd,
        permission_handler=PermissionHandler(
            timeout=self.permission_timeout,
            notify_callback=self._notify_callbacks.get(chat_id),
        ),
    )
    self.sessions[chat_id][session_name] = session
    self.active_sessions[chat_id] = session_name
    self._persist_session(session)
    if self.storage:
        self.storage.set_active_session(chat_id, session_name)
    return session

create_new_async(chat_id, cwd=None, name=None) async

Create a new session, replacing any existing one with same name.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
cwd str | None

Working directory.

None
name str | None

Session name (uses "main" if not specified).

None

Returns:

Type Description
Session

The new session.

Source code in src/voice_agent/sessions/manager.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
async def create_new_async(
    self, chat_id: int, cwd: str | None = None, name: str | None = None
) -> Session:
    """Create a new session, replacing any existing one with same name.

    Args:
        chat_id: Telegram chat ID.
        cwd: Working directory.
        name: Session name (uses "main" if not specified).

    Returns:
        The new session.
    """
    if chat_id not in self.sessions:
        self.sessions[chat_id] = {}
        self.active_sessions[chat_id] = "main"

    session_name = name or "main"

    # Clean up old session if exists
    if session_name in self.sessions.get(chat_id, {}):
        old_session = self.sessions[chat_id][session_name]
        await self._close_client(old_session)

    effective_cwd = cwd or self.default_cwd
    session = Session(
        chat_id=chat_id,
        name=session_name,
        cwd=effective_cwd,
        permission_handler=PermissionHandler(
            timeout=self.permission_timeout,
            notify_callback=self._notify_callbacks.get(chat_id),
        ),
    )
    self.sessions[chat_id][session_name] = session
    self.active_sessions[chat_id] = session_name
    self._persist_session(session)
    if self.storage:
        self.storage.set_active_session(chat_id, session_name)
    return session

delete_session(chat_id)

Delete the active session synchronously (legacy compatibility).

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required

Returns:

Type Description
bool

True if session was deleted, False if not found.

Source code in src/voice_agent/sessions/manager.py
752
753
754
755
756
757
758
759
760
761
762
def delete_session(self, chat_id: int) -> bool:
    """Delete the active session synchronously (legacy compatibility).

    Args:
        chat_id: Telegram chat ID.

    Returns:
        True if session was deleted, False if not found.
    """
    name = self._get_active_session_name(chat_id)
    return self.close_session(chat_id, name)

delete_session_async(chat_id) async

Delete the active session asynchronously (legacy compatibility).

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required

Returns:

Type Description
bool

True if session was deleted, False if not found.

Source code in src/voice_agent/sessions/manager.py
740
741
742
743
744
745
746
747
748
749
750
async def delete_session_async(self, chat_id: int) -> bool:
    """Delete the active session asynchronously (legacy compatibility).

    Args:
        chat_id: Telegram chat ID.

    Returns:
        True if session was deleted, False if not found.
    """
    name = self._get_active_session_name(chat_id)
    return await self.close_session_async(chat_id, name)

generate_session_name(chat_id)

Generate a unique session name for a chat.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required

Returns:

Type Description
str

A unique session name like "session-2", "session-3", etc.

Source code in src/voice_agent/sessions/manager.py
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
def generate_session_name(self, chat_id: int) -> str:
    """Generate a unique session name for a chat.

    Args:
        chat_id: Telegram chat ID.

    Returns:
        A unique session name like "session-2", "session-3", etc.
    """
    if chat_id not in self.sessions:
        return "session-2"

    existing = set(self.sessions[chat_id].keys())
    counter = 2
    while f"session-{counter}" in existing:
        counter += 1
    return f"session-{counter}"

get(chat_id, name=None)

Get session for a chat if it exists.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
name str | None

Session name (uses active session if not specified).

None

Returns:

Type Description
Session | None

Session or None.

Source code in src/voice_agent/sessions/manager.py
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
def get(self, chat_id: int, name: str | None = None) -> Session | None:
    """Get session for a chat if it exists.

    Args:
        chat_id: Telegram chat ID.
        name: Session name (uses active session if not specified).

    Returns:
        Session or None.
    """
    if chat_id not in self.sessions:
        return None

    session_name = name or self._get_active_session_name(chat_id)
    return self.sessions[chat_id].get(session_name)

get_active_session_name(chat_id)

Get the name of the active session.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required

Returns:

Type Description
str | None

Active session name or None if no sessions.

Source code in src/voice_agent/sessions/manager.py
363
364
365
366
367
368
369
370
371
372
373
374
def get_active_session_name(self, chat_id: int) -> str | None:
    """Get the name of the active session.

    Args:
        chat_id: Telegram chat ID.

    Returns:
        Active session name or None if no sessions.
    """
    if chat_id not in self.sessions:
        return None
    return self._get_active_session_name(chat_id)

get_or_create(chat_id, cwd=None, name=None)

Get existing active session or create new one.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
cwd str | None

Working directory (uses default if not specified).

None
name str | None

Session name (uses active session if not specified).

None

Returns:

Type Description
Session

The session for this chat.

Source code in src/voice_agent/sessions/manager.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def get_or_create(
    self, chat_id: int, cwd: str | None = None, name: str | None = None
) -> Session:
    """Get existing active session or create new one.

    Args:
        chat_id: Telegram chat ID.
        cwd: Working directory (uses default if not specified).
        name: Session name (uses active session if not specified).

    Returns:
        The session for this chat.
    """
    if chat_id not in self.sessions:
        self.sessions[chat_id] = {}
        self.active_sessions[chat_id] = "main"

    session_name = name or self._get_active_session_name(chat_id)

    if session_name not in self.sessions[chat_id]:
        effective_cwd = cwd or self.default_cwd
        session = Session(
            chat_id=chat_id,
            name=session_name,
            cwd=effective_cwd,
            permission_handler=PermissionHandler(
                timeout=self.permission_timeout,
                notify_callback=self._notify_callbacks.get(chat_id),
            ),
        )
        self.sessions[chat_id][session_name] = session
        self._persist_session(session)
        # If creating the active session name, ensure it's set
        if session_name == self._get_active_session_name(chat_id):
            self.active_sessions[chat_id] = session_name
            if self.storage:
                self.storage.set_active_session(chat_id, session_name)

    return self.sessions[chat_id][session_name]

get_status(chat_id)

Get status of the active session.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required

Returns:

Type Description
str | None

Status string or None if no session.

Source code in src/voice_agent/sessions/manager.py
649
650
651
652
653
654
655
656
657
658
659
660
661
def get_status(self, chat_id: int) -> str | None:
    """Get status of the active session.

    Args:
        chat_id: Telegram chat ID.

    Returns:
        Status string or None if no session.
    """
    session = self.get(chat_id)
    if not session:
        return None
    return session.get_status()

has_resumable_session(chat_id)

Check if a chat has a resumable Claude session.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required

Returns:

Type Description
bool

True if there's a session with a Claude session ID.

Source code in src/voice_agent/sessions/manager.py
776
777
778
779
780
781
782
783
784
785
786
def has_resumable_session(self, chat_id: int) -> bool:
    """Check if a chat has a resumable Claude session.

    Args:
        chat_id: Telegram chat ID.

    Returns:
        True if there's a session with a Claude session ID.
    """
    session = self.get(chat_id)
    return session is not None and session.claude_session_id is not None

list_sessions(chat_id)

List all sessions for a chat.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required

Returns:

Type Description
list[SessionInfo]

List of SessionInfo objects.

Source code in src/voice_agent/sessions/manager.py
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
def list_sessions(self, chat_id: int) -> list[SessionInfo]:
    """List all sessions for a chat.

    Args:
        chat_id: Telegram chat ID.

    Returns:
        List of SessionInfo objects.
    """
    if chat_id not in self.sessions:
        return []

    active = self._get_active_session_name(chat_id)
    return [
        SessionInfo(
            name=session.name,
            message_count=session.message_count,
            cwd=session.cwd,
            is_active=session.name == active,
        )
        for session in self.sessions[chat_id].values()
    ]

rename_session(chat_id, old_name, new_name)

Rename a session.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
old_name str

Current session name.

required
new_name str

New session name.

required

Returns:

Type Description
bool

True if renamed, False if not found or name already exists.

Source code in src/voice_agent/sessions/manager.py
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
def rename_session(self, chat_id: int, old_name: str, new_name: str) -> bool:
    """Rename a session.

    Args:
        chat_id: Telegram chat ID.
        old_name: Current session name.
        new_name: New session name.

    Returns:
        True if renamed, False if not found or name already exists.
    """
    if chat_id not in self.sessions:
        return False

    if old_name not in self.sessions[chat_id]:
        return False

    if new_name in self.sessions[chat_id]:
        return False

    session = self.sessions[chat_id].pop(old_name)
    session.name = new_name
    self.sessions[chat_id][new_name] = session

    if self.active_sessions.get(chat_id) == old_name:
        self.active_sessions[chat_id] = new_name

    if self.storage:
        self.storage.rename_session(chat_id, old_name, new_name)

    return True

send_prompt(chat_id, prompt, resume=True, images=None) async

Send a prompt to the active session and stream responses.

Uses ClaudeSDKClient for persistent sessions - no token reload.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
prompt str

The prompt to send.

required
resume bool

Whether to resume existing session if available.

True
images list[ImageAttachment] | None

Optional image attachments for multimodal prompts.

None

Yields:

Type Description
AsyncIterator[str]

Response chunks from Claude.

Source code in src/voice_agent/sessions/manager.py
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
async def send_prompt(
    self,
    chat_id: int,
    prompt: str,
    resume: bool = True,
    images: list[ImageAttachment] | None = None,
) -> AsyncIterator[str]:
    """Send a prompt to the active session and stream responses.

    Uses ClaudeSDKClient for persistent sessions - no token reload.

    Args:
        chat_id: Telegram chat ID.
        prompt: The prompt to send.
        resume: Whether to resume existing session if available.
        images: Optional image attachments for multimodal prompts.

    Yields:
        Response chunks from Claude.
    """
    from claude_agent_sdk import AssistantMessage, ResultMessage, TextBlock

    session = self.get_or_create(chat_id)
    session.message_count += 1

    try:
        client = await self._get_or_create_client(session)

        if images:
            user_msg = self._build_multimodal_message(prompt, images)

            async def _single_message() -> AsyncIterator[dict[str, Any]]:
                yield user_msg

            await client.query(_single_message())
        else:
            await client.query(prompt)

        async for msg in client.receive_response():
            if isinstance(msg, AssistantMessage):
                for block in msg.content:
                    if isinstance(block, TextBlock):
                        yield block.text
            elif isinstance(msg, ResultMessage):
                if msg.session_id and msg.session_id != session.claude_session_id:
                    session.claude_session_id = msg.session_id
                    logger.info(
                        "Chat %s session %s claude_session_id: %s",
                        chat_id,
                        session.name,
                        msg.session_id,
                    )
                if msg.total_cost_usd:
                    logger.info(
                        "Chat %s session %s cost: $%.4f",
                        chat_id,
                        session.name,
                        msg.total_cost_usd,
                    )

        # Persist updated session
        self._persist_session(session)

    except ImportError:
        yield "Error: claude-agent-sdk not installed."
    except Exception as e:
        logger.exception("Error in send_prompt for chat %s", chat_id)
        yield f"Error: {e}"
        # Close client on error so it can be recreated
        await self._close_client(session)

set_claude_session_id(chat_id, session_id)

Set the Claude session ID for resume capability.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
session_id str | None

Claude CLI session ID or None to clear.

required
Source code in src/voice_agent/sessions/manager.py
764
765
766
767
768
769
770
771
772
773
774
def set_claude_session_id(self, chat_id: int, session_id: str | None) -> None:
    """Set the Claude session ID for resume capability.

    Args:
        chat_id: Telegram chat ID.
        session_id: Claude CLI session ID or None to clear.
    """
    session = self.get(chat_id)
    if session:
        session.claude_session_id = session_id
        self._persist_session(session)

set_cwd(chat_id, cwd)

Set the working directory for the active session.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
cwd str

New working directory.

required

Returns:

Type Description
Session

The updated session.

Source code in src/voice_agent/sessions/manager.py
447
448
449
450
451
452
453
454
455
456
457
458
459
460
def set_cwd(self, chat_id: int, cwd: str) -> Session:
    """Set the working directory for the active session.

    Args:
        chat_id: Telegram chat ID.
        cwd: New working directory.

    Returns:
        The updated session.
    """
    session = self.get_or_create(chat_id)
    session.cwd = cwd
    self._persist_session(session)
    return session

set_notify_callback(chat_id, callback)

Set the notification callback for a chat.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
callback Any

Async function to call for notifications.

required
Source code in src/voice_agent/sessions/manager.py
177
178
179
180
181
182
183
184
185
186
187
188
def set_notify_callback(self, chat_id: int, callback: Any) -> None:
    """Set the notification callback for a chat.

    Args:
        chat_id: Telegram chat ID.
        callback: Async function to call for notifications.
    """
    self._notify_callbacks[chat_id] = callback
    # Also update existing session's permission handler
    if chat_id in self.sessions:
        for session in self.sessions[chat_id].values():
            session.permission_handler.notify_callback = callback

switch_session(chat_id, name)

Switch to a different session.

Parameters:

Name Type Description Default
chat_id int

Telegram chat ID.

required
name str

Session name to switch to.

required

Returns:

Type Description
Session | None

The switched-to session, or None if not found.

Source code in src/voice_agent/sessions/manager.py
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
def switch_session(self, chat_id: int, name: str) -> Session | None:
    """Switch to a different session.

    Args:
        chat_id: Telegram chat ID.
        name: Session name to switch to.

    Returns:
        The switched-to session, or None if not found.
    """
    if chat_id not in self.sessions:
        return None

    if name not in self.sessions[chat_id]:
        return None

    self.active_sessions[chat_id] = name
    if self.storage:
        self.storage.set_active_session(chat_id, name)
    return self.sessions[chat_id][name]

voice_agent.sessions.permissions

Permission handling for Claude SDK tool calls.

Manages the canUseTool callback flow for requesting user approval.

PendingPermission dataclass

A permission request waiting for user approval.

Attributes:

Name Type Description
tool_name str

Name of the tool requesting permission.

input_data dict[str, Any]

Input parameters for the tool.

event Event

Event to signal when user responds.

state PermissionState

Current state of the permission.

deny_message str | None

Optional message explaining denial.

Source code in src/voice_agent/sessions/permissions.py
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
@dataclass
class PendingPermission:
    """A permission request waiting for user approval.

    Attributes:
        tool_name: Name of the tool requesting permission.
        input_data: Input parameters for the tool.
        event: Event to signal when user responds.
        state: Current state of the permission.
        deny_message: Optional message explaining denial.
    """

    tool_name: str
    input_data: dict[str, Any]
    event: asyncio.Event = field(default_factory=asyncio.Event)
    state: PermissionState = PermissionState.PENDING
    deny_message: str | None = None

PermissionHandler

Handles permission requests for Claude SDK tool calls.

Attributes:

Name Type Description
pending PendingPermission | None

Current pending permission, if any.

timeout

Seconds to wait for user approval.

notify_callback

Async callback to notify user of permission request.

sticky_approvals list[StickyApproval]

List of sticky approval rules for auto-approving.

Source code in src/voice_agent/sessions/permissions.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
class PermissionHandler:
    """Handles permission requests for Claude SDK tool calls.

    Attributes:
        pending: Current pending permission, if any.
        timeout: Seconds to wait for user approval.
        notify_callback: Async callback to notify user of permission request.
        sticky_approvals: List of sticky approval rules for auto-approving.
    """

    def __init__(
        self,
        timeout: int = 300,
        notify_callback: Callable[[str, dict[str, Any]], Coroutine[Any, Any, None]]
        | None = None,
    ) -> None:
        """Initialize the permission handler.

        Args:
            timeout: Seconds to wait for user approval.
            notify_callback: Async function to notify user of permission request.
        """
        self.pending: PendingPermission | None = None
        self.timeout = timeout
        self.notify_callback = notify_callback
        self.sticky_approvals: list[StickyApproval] = []

    def has_pending(self) -> bool:
        """Check if there's a pending permission request.

        Returns:
            True if a permission is pending.
        """
        return (
            self.pending is not None and self.pending.state == PermissionState.PENDING
        )

    def get_pending_description(self) -> str | None:
        """Get a human-readable description of the pending permission.

        Returns:
            Description string or None if no pending permission.
        """
        if not self.pending:
            return None

        tool = self.pending.tool_name
        inputs = self.pending.input_data

        if tool == "Bash":
            return f"Run command: {inputs.get('command', 'unknown')}"
        if tool == "Write":
            return f"Write file: {inputs.get('file_path', 'unknown')}"
        if tool == "Edit":
            return f"Edit file: {inputs.get('file_path', 'unknown')}"

        return f"Use tool: {tool}"

    def _check_sticky_approval(
        self, tool_name: str, input_data: dict[str, Any]
    ) -> bool:
        """Check if a tool call matches any sticky approval.

        Args:
            tool_name: Name of the tool.
            input_data: Input parameters for the tool.

        Returns:
            True if the tool call matches a sticky approval.
        """
        return any(
            approval.matches(tool_name, input_data)
            for approval in self.sticky_approvals
        )

    async def request_permission(
        self, tool_name: str, input_data: dict[str, Any]
    ) -> tuple[bool, str | None]:
        """Request permission for a tool call.

        Auto-approves safe tools and sticky approvals, queues others for user.

        Args:
            tool_name: Name of the tool.
            input_data: Input parameters for the tool.

        Returns:
            Tuple of (approved, deny_message).
        """
        # Auto-approve safe tools
        if is_safe_tool_call(tool_name, input_data):
            return True, None

        # Check sticky approvals
        if self._check_sticky_approval(tool_name, input_data):
            return True, None

        # Create pending permission
        self.pending = PendingPermission(tool_name=tool_name, input_data=input_data)

        # Notify user if callback provided
        if self.notify_callback:
            await self.notify_callback(tool_name, input_data)

        # Wait for user response with timeout
        try:
            await asyncio.wait_for(self.pending.event.wait(), timeout=self.timeout)
            approved = self.pending.state == PermissionState.APPROVED
            message = self.pending.deny_message
            self.pending = None
            return approved, message
        except asyncio.TimeoutError:
            self.pending.state = PermissionState.TIMEOUT
            self.pending = None
            return False, "Permission request timed out"

    def approve(self) -> bool:
        """Approve the pending permission.

        Returns:
            True if there was a pending permission to approve.
        """
        if not self.pending or self.pending.state != PermissionState.PENDING:
            return False
        self.pending.state = PermissionState.APPROVED
        self.pending.event.set()
        return True

    def deny(self, message: str | None = None) -> bool:
        """Deny the pending permission.

        Args:
            message: Optional message explaining denial.

        Returns:
            True if there was a pending permission to deny.
        """
        if not self.pending or self.pending.state != PermissionState.PENDING:
            return False
        self.pending.state = PermissionState.DENIED
        self.pending.deny_message = message or "User rejected"
        self.pending.event.set()
        return True

    def sticky_approve(self) -> StickyApproval | None:
        """Approve pending permission and create sticky rule for similar calls.

        Creates a sticky approval rule based on the current pending permission,
        then approves it.

        Returns:
            The created StickyApproval or None if no pending permission.
        """
        if not self.pending or self.pending.state != PermissionState.PENDING:
            return None

        tool_name = self.pending.tool_name
        field_name = TOOL_FIELD_NAMES.get(tool_name)

        # Create sticky approval for all calls to this tool
        sticky = StickyApproval(
            tool_name=tool_name,
            pattern=None,
            field_name=field_name,
        )
        self.sticky_approvals.append(sticky)

        # Approve the current request
        self.pending.state = PermissionState.APPROVED
        self.pending.event.set()

        return sticky

    def get_sticky_approvals(self) -> list[StickyApproval]:
        """Get all active sticky approvals.

        Returns:
            List of sticky approval rules.
        """
        return list(self.sticky_approvals)

    def clear_sticky_approvals(self) -> int:
        """Clear all sticky approvals.

        Returns:
            Number of approvals cleared.
        """
        count = len(self.sticky_approvals)
        self.sticky_approvals.clear()
        return count

    def remove_sticky_approval(self, index: int) -> StickyApproval | None:
        """Remove a sticky approval by index.

        Args:
            index: Index of the approval to remove (0-based).

        Returns:
            The removed StickyApproval or None if index invalid.
        """
        if 0 <= index < len(self.sticky_approvals):
            return self.sticky_approvals.pop(index)
        return None

__init__(timeout=300, notify_callback=None)

Initialize the permission handler.

Parameters:

Name Type Description Default
timeout int

Seconds to wait for user approval.

300
notify_callback Callable[[str, dict[str, Any]], Coroutine[Any, Any, None]] | None

Async function to notify user of permission request.

None
Source code in src/voice_agent/sessions/permissions.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def __init__(
    self,
    timeout: int = 300,
    notify_callback: Callable[[str, dict[str, Any]], Coroutine[Any, Any, None]]
    | None = None,
) -> None:
    """Initialize the permission handler.

    Args:
        timeout: Seconds to wait for user approval.
        notify_callback: Async function to notify user of permission request.
    """
    self.pending: PendingPermission | None = None
    self.timeout = timeout
    self.notify_callback = notify_callback
    self.sticky_approvals: list[StickyApproval] = []

approve()

Approve the pending permission.

Returns:

Type Description
bool

True if there was a pending permission to approve.

Source code in src/voice_agent/sessions/permissions.py
276
277
278
279
280
281
282
283
284
285
286
def approve(self) -> bool:
    """Approve the pending permission.

    Returns:
        True if there was a pending permission to approve.
    """
    if not self.pending or self.pending.state != PermissionState.PENDING:
        return False
    self.pending.state = PermissionState.APPROVED
    self.pending.event.set()
    return True

clear_sticky_approvals()

Clear all sticky approvals.

Returns:

Type Description
int

Number of approvals cleared.

Source code in src/voice_agent/sessions/permissions.py
341
342
343
344
345
346
347
348
349
def clear_sticky_approvals(self) -> int:
    """Clear all sticky approvals.

    Returns:
        Number of approvals cleared.
    """
    count = len(self.sticky_approvals)
    self.sticky_approvals.clear()
    return count

deny(message=None)

Deny the pending permission.

Parameters:

Name Type Description Default
message str | None

Optional message explaining denial.

None

Returns:

Type Description
bool

True if there was a pending permission to deny.

Source code in src/voice_agent/sessions/permissions.py
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
def deny(self, message: str | None = None) -> bool:
    """Deny the pending permission.

    Args:
        message: Optional message explaining denial.

    Returns:
        True if there was a pending permission to deny.
    """
    if not self.pending or self.pending.state != PermissionState.PENDING:
        return False
    self.pending.state = PermissionState.DENIED
    self.pending.deny_message = message or "User rejected"
    self.pending.event.set()
    return True

get_pending_description()

Get a human-readable description of the pending permission.

Returns:

Type Description
str | None

Description string or None if no pending permission.

Source code in src/voice_agent/sessions/permissions.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def get_pending_description(self) -> str | None:
    """Get a human-readable description of the pending permission.

    Returns:
        Description string or None if no pending permission.
    """
    if not self.pending:
        return None

    tool = self.pending.tool_name
    inputs = self.pending.input_data

    if tool == "Bash":
        return f"Run command: {inputs.get('command', 'unknown')}"
    if tool == "Write":
        return f"Write file: {inputs.get('file_path', 'unknown')}"
    if tool == "Edit":
        return f"Edit file: {inputs.get('file_path', 'unknown')}"

    return f"Use tool: {tool}"

get_sticky_approvals()

Get all active sticky approvals.

Returns:

Type Description
list[StickyApproval]

List of sticky approval rules.

Source code in src/voice_agent/sessions/permissions.py
333
334
335
336
337
338
339
def get_sticky_approvals(self) -> list[StickyApproval]:
    """Get all active sticky approvals.

    Returns:
        List of sticky approval rules.
    """
    return list(self.sticky_approvals)

has_pending()

Check if there's a pending permission request.

Returns:

Type Description
bool

True if a permission is pending.

Source code in src/voice_agent/sessions/permissions.py
187
188
189
190
191
192
193
194
195
def has_pending(self) -> bool:
    """Check if there's a pending permission request.

    Returns:
        True if a permission is pending.
    """
    return (
        self.pending is not None and self.pending.state == PermissionState.PENDING
    )

remove_sticky_approval(index)

Remove a sticky approval by index.

Parameters:

Name Type Description Default
index int

Index of the approval to remove (0-based).

required

Returns:

Type Description
StickyApproval | None

The removed StickyApproval or None if index invalid.

Source code in src/voice_agent/sessions/permissions.py
351
352
353
354
355
356
357
358
359
360
361
362
def remove_sticky_approval(self, index: int) -> StickyApproval | None:
    """Remove a sticky approval by index.

    Args:
        index: Index of the approval to remove (0-based).

    Returns:
        The removed StickyApproval or None if index invalid.
    """
    if 0 <= index < len(self.sticky_approvals):
        return self.sticky_approvals.pop(index)
    return None

request_permission(tool_name, input_data) async

Request permission for a tool call.

Auto-approves safe tools and sticky approvals, queues others for user.

Parameters:

Name Type Description Default
tool_name str

Name of the tool.

required
input_data dict[str, Any]

Input parameters for the tool.

required

Returns:

Type Description
tuple[bool, str | None]

Tuple of (approved, deny_message).

Source code in src/voice_agent/sessions/permissions.py
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
async def request_permission(
    self, tool_name: str, input_data: dict[str, Any]
) -> tuple[bool, str | None]:
    """Request permission for a tool call.

    Auto-approves safe tools and sticky approvals, queues others for user.

    Args:
        tool_name: Name of the tool.
        input_data: Input parameters for the tool.

    Returns:
        Tuple of (approved, deny_message).
    """
    # Auto-approve safe tools
    if is_safe_tool_call(tool_name, input_data):
        return True, None

    # Check sticky approvals
    if self._check_sticky_approval(tool_name, input_data):
        return True, None

    # Create pending permission
    self.pending = PendingPermission(tool_name=tool_name, input_data=input_data)

    # Notify user if callback provided
    if self.notify_callback:
        await self.notify_callback(tool_name, input_data)

    # Wait for user response with timeout
    try:
        await asyncio.wait_for(self.pending.event.wait(), timeout=self.timeout)
        approved = self.pending.state == PermissionState.APPROVED
        message = self.pending.deny_message
        self.pending = None
        return approved, message
    except asyncio.TimeoutError:
        self.pending.state = PermissionState.TIMEOUT
        self.pending = None
        return False, "Permission request timed out"

sticky_approve()

Approve pending permission and create sticky rule for similar calls.

Creates a sticky approval rule based on the current pending permission, then approves it.

Returns:

Type Description
StickyApproval | None

The created StickyApproval or None if no pending permission.

Source code in src/voice_agent/sessions/permissions.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
def sticky_approve(self) -> StickyApproval | None:
    """Approve pending permission and create sticky rule for similar calls.

    Creates a sticky approval rule based on the current pending permission,
    then approves it.

    Returns:
        The created StickyApproval or None if no pending permission.
    """
    if not self.pending or self.pending.state != PermissionState.PENDING:
        return None

    tool_name = self.pending.tool_name
    field_name = TOOL_FIELD_NAMES.get(tool_name)

    # Create sticky approval for all calls to this tool
    sticky = StickyApproval(
        tool_name=tool_name,
        pattern=None,
        field_name=field_name,
    )
    self.sticky_approvals.append(sticky)

    # Approve the current request
    self.pending.state = PermissionState.APPROVED
    self.pending.event.set()

    return sticky

PermissionState

Bases: Enum

State of a pending permission request.

Source code in src/voice_agent/sessions/permissions.py
13
14
15
16
17
18
19
class PermissionState(Enum):
    """State of a pending permission request."""

    PENDING = auto()
    APPROVED = auto()
    DENIED = auto()
    TIMEOUT = auto()

StickyApproval dataclass

A sticky approval rule that auto-approves matching tool calls.

Attributes:

Name Type Description
tool_name str

Name of the tool to auto-approve.

pattern str | None

Optional regex pattern to match against field value.

field_name str | None

Field to match pattern against ('command', 'file_path', etc).

Source code in src/voice_agent/sessions/permissions.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@dataclass
class StickyApproval:
    """A sticky approval rule that auto-approves matching tool calls.

    Attributes:
        tool_name: Name of the tool to auto-approve.
        pattern: Optional regex pattern to match against field value.
        field_name: Field to match pattern against ('command', 'file_path', etc).
    """

    tool_name: str
    pattern: str | None = None
    field_name: str | None = None

    def matches(self, tool_name: str, input_data: dict[str, Any]) -> bool:
        """Check if this sticky approval matches a tool call.

        Args:
            tool_name: Name of the tool being called.
            input_data: Input parameters for the tool.

        Returns:
            True if the tool call matches this sticky approval.
        """
        if tool_name != self.tool_name:
            return False

        # No pattern means match all calls to this tool
        if self.pattern is None:
            return True

        # Get field value to match against
        if not self.field_name:
            return True

        field_value = input_data.get(self.field_name, "")
        if not field_value:
            return False

        return bool(re.search(self.pattern, field_value))

    def describe(self) -> str:
        """Get a human-readable description of this approval.

        Returns:
            Description string.
        """
        if self.pattern:
            return f"{self.tool_name} matching '{self.pattern}'"
        return f"all {self.tool_name}"

describe()

Get a human-readable description of this approval.

Returns:

Type Description
str

Description string.

Source code in src/voice_agent/sessions/permissions.py
72
73
74
75
76
77
78
79
80
def describe(self) -> str:
    """Get a human-readable description of this approval.

    Returns:
        Description string.
    """
    if self.pattern:
        return f"{self.tool_name} matching '{self.pattern}'"
    return f"all {self.tool_name}"

matches(tool_name, input_data)

Check if this sticky approval matches a tool call.

Parameters:

Name Type Description Default
tool_name str

Name of the tool being called.

required
input_data dict[str, Any]

Input parameters for the tool.

required

Returns:

Type Description
bool

True if the tool call matches this sticky approval.

Source code in src/voice_agent/sessions/permissions.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def matches(self, tool_name: str, input_data: dict[str, Any]) -> bool:
    """Check if this sticky approval matches a tool call.

    Args:
        tool_name: Name of the tool being called.
        input_data: Input parameters for the tool.

    Returns:
        True if the tool call matches this sticky approval.
    """
    if tool_name != self.tool_name:
        return False

    # No pattern means match all calls to this tool
    if self.pattern is None:
        return True

    # Get field value to match against
    if not self.field_name:
        return True

    field_value = input_data.get(self.field_name, "")
    if not field_value:
        return False

    return bool(re.search(self.pattern, field_value))

is_safe_bash_command(command)

Check if a bash command is safe to auto-approve.

Parameters:

Name Type Description Default
command str

The bash command string.

required

Returns:

Type Description
bool

True if the command is safe (read-only).

Source code in src/voice_agent/sessions/permissions.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def is_safe_bash_command(command: str) -> bool:
    """Check if a bash command is safe to auto-approve.

    Args:
        command: The bash command string.

    Returns:
        True if the command is safe (read-only).
    """
    command = command.strip()
    for pattern in SAFE_BASH_PATTERNS:
        if command.startswith(pattern):
            return True
    return False

is_safe_tool_call(tool_name, input_data)

Check if a tool call is safe to auto-approve.

Parameters:

Name Type Description Default
tool_name str

Name of the tool.

required
input_data dict[str, Any]

Input parameters for the tool.

required

Returns:

Type Description
bool

True if the tool call is safe.

Source code in src/voice_agent/sessions/permissions.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def is_safe_tool_call(tool_name: str, input_data: dict[str, Any]) -> bool:
    """Check if a tool call is safe to auto-approve.

    Args:
        tool_name: Name of the tool.
        input_data: Input parameters for the tool.

    Returns:
        True if the tool call is safe.
    """
    if tool_name in SAFE_TOOLS:
        return True

    if tool_name == "Bash":
        command = input_data.get("command", "")
        return is_safe_bash_command(command)

    return False