Spaces:
Running
Running
| """REST API routes.""" | |
| import json | |
| from datetime import datetime | |
| from typing import Optional | |
| from fastapi import APIRouter, HTTPException, BackgroundTasks | |
| from fastapi.responses import JSONResponse | |
| from coderag.api.schemas import ( | |
| IndexRepositoryRequest, | |
| IndexRepositoryResponse, | |
| QueryRequest, | |
| QueryResponse, | |
| ListRepositoriesResponse, | |
| RepositoryInfo, | |
| CitationResponse, | |
| RetrievedChunkResponse, | |
| ErrorResponse, | |
| ) | |
| from coderag.config import get_settings | |
| from coderag.generation.generator import ResponseGenerator | |
| from coderag.indexing.embeddings import EmbeddingGenerator | |
| from coderag.indexing.vectorstore import VectorStore | |
| from coderag.ingestion.chunker import CodeChunker | |
| from coderag.ingestion.filter import FileFilter | |
| from coderag.ingestion.loader import RepositoryLoader | |
| from coderag.ingestion.validator import GitHubURLValidator, ValidationError | |
| from coderag.logging import get_logger | |
| from coderag.models.document import Document | |
| from coderag.models.query import Query as QueryModel | |
| from coderag.models.repository import Repository, RepositoryStatus | |
| logger = get_logger(__name__) | |
| router = APIRouter() | |
| # Global state (in production, use a proper database) | |
| settings = get_settings() | |
| repos_file = settings.data_dir / "repositories.json" | |
| repositories: dict[str, Repository] = {} | |
| def load_repositories() -> None: | |
| """Load repositories from disk.""" | |
| global repositories | |
| if repos_file.exists(): | |
| try: | |
| data = json.loads(repos_file.read_text()) | |
| repositories = {r["id"]: Repository.from_dict(r) for r in data} | |
| except Exception as e: | |
| logger.error("Failed to load repositories", error=str(e)) | |
| def save_repositories() -> None: | |
| """Save repositories to disk.""" | |
| repos_file.parent.mkdir(parents=True, exist_ok=True) | |
| data = [r.to_dict() for r in repositories.values()] | |
| repos_file.write_text(json.dumps(data, indent=2)) | |
| # Load on startup | |
| load_repositories() | |
| def resolve_repo_id(partial_id: str) -> Optional[str]: | |
| """Resolve a partial repository ID to a full ID. | |
| Supports both full UUIDs and partial IDs (first 8+ characters). | |
| Returns None if no match or multiple matches found. | |
| """ | |
| # First try exact match | |
| if partial_id in repositories: | |
| return partial_id | |
| # Try prefix match (minimum 8 characters recommended) | |
| matches = [rid for rid in repositories.keys() if rid.startswith(partial_id)] | |
| if len(matches) == 1: | |
| return matches[0] | |
| return None | |
| def get_repo_or_404(repo_id: str) -> Repository: | |
| """Get a repository by ID (full or partial), raising 404 if not found.""" | |
| full_id = resolve_repo_id(repo_id) | |
| if full_id is None: | |
| raise HTTPException(status_code=404, detail="Repository not found") | |
| return repositories[full_id] | |
| async def index_repository_task( | |
| url: str, | |
| repo_id: str, | |
| branch: Optional[str], | |
| include_patterns: Optional[list[str]], | |
| exclude_patterns: Optional[list[str]], | |
| ) -> None: | |
| """Background task to index a repository.""" | |
| repo = repositories[repo_id] | |
| try: | |
| # Validate and clone | |
| validator = GitHubURLValidator() | |
| repo_info = await validator.validate_repository(url) | |
| branch = branch or repo_info.branch or "main" | |
| loader = RepositoryLoader() | |
| repo_path = loader.clone_repository(repo_info, branch) | |
| repo.clone_path = repo_path | |
| repo.status = RepositoryStatus.INDEXING | |
| save_repositories() | |
| # Filter files | |
| file_filter = FileFilter( | |
| include_patterns=include_patterns, | |
| exclude_patterns=exclude_patterns, | |
| ) | |
| files = list(file_filter.filter_files(repo_path)) | |
| # Load documents | |
| documents = [] | |
| for file_path in files: | |
| try: | |
| doc = Document.from_file(file_path, repo_path, repo.id) | |
| documents.append(doc) | |
| except Exception as e: | |
| logger.warning("Failed to load file", path=str(file_path), error=str(e)) | |
| # Chunk | |
| chunker = CodeChunker() | |
| chunks = [] | |
| for doc in documents: | |
| for chunk in chunker.chunk_document(doc): | |
| chunks.append(chunk) | |
| # Embed and store | |
| if chunks: | |
| vectorstore = VectorStore() | |
| vectorstore.delete_repo_chunks(repo.id) | |
| embedder = EmbeddingGenerator() | |
| embedded_chunks = embedder.embed_chunks(chunks) | |
| vectorstore.add_chunks(embedded_chunks) | |
| # Update status | |
| repo.chunk_count = len(chunks) | |
| repo.indexed_at = datetime.now() | |
| repo.status = RepositoryStatus.READY | |
| save_repositories() | |
| logger.info("Repository indexed", repo_id=repo_id, chunks=len(chunks)) | |
| except Exception as e: | |
| logger.error("Indexing failed", repo_id=repo_id, error=str(e)) | |
| repo.status = RepositoryStatus.ERROR | |
| repo.error_message = str(e) | |
| save_repositories() | |
| async def index_repository( | |
| request: IndexRepositoryRequest, | |
| background_tasks: BackgroundTasks, | |
| ) -> IndexRepositoryResponse: | |
| """Index a GitHub repository.""" | |
| # Create repository record | |
| repo = Repository( | |
| url=request.url, | |
| branch=request.branch or "main", | |
| status=RepositoryStatus.PENDING, | |
| ) | |
| repositories[repo.id] = repo | |
| save_repositories() | |
| # Start background indexing | |
| background_tasks.add_task( | |
| index_repository_task, | |
| request.url, | |
| repo.id, | |
| request.branch, | |
| request.include_patterns, | |
| request.exclude_patterns, | |
| ) | |
| return IndexRepositoryResponse( | |
| repo_id=repo.id, | |
| status=repo.status.value, | |
| message="Repository indexing started", | |
| ) | |
| async def query_repository(request: QueryRequest) -> QueryResponse: | |
| """Query a repository. | |
| Supports both full repository IDs and partial IDs (first 8+ characters). | |
| """ | |
| # Check repository exists (supports partial IDs) | |
| repo = get_repo_or_404(request.repo_id) | |
| if repo.status != RepositoryStatus.READY: | |
| raise HTTPException( | |
| status_code=400, | |
| detail=f"Repository not ready (status: {repo.status.value})", | |
| ) | |
| try: | |
| # Generate response (use resolved repo.id for consistency) | |
| generator = ResponseGenerator() | |
| query = QueryModel( | |
| question=request.question, | |
| repo_id=repo.id, # Use resolved full ID | |
| top_k=request.top_k, | |
| ) | |
| response = generator.generate(query) | |
| # Convert to API schema | |
| return QueryResponse( | |
| answer=response.answer, | |
| citations=[ | |
| CitationResponse( | |
| file_path=c.file_path, | |
| start_line=c.start_line, | |
| end_line=c.end_line, | |
| ) | |
| for c in response.citations | |
| ], | |
| retrieved_chunks=[ | |
| RetrievedChunkResponse( | |
| chunk_id=c.chunk_id, | |
| file_path=c.file_path, | |
| start_line=c.start_line, | |
| end_line=c.end_line, | |
| relevance_score=c.relevance_score, | |
| chunk_type=c.chunk_type, | |
| name=c.name, | |
| content=c.content, | |
| ) | |
| for c in response.retrieved_chunks | |
| ], | |
| grounded=response.grounded, | |
| query_id=response.query_id, | |
| ) | |
| except Exception as e: | |
| logger.error("Query failed", error=str(e)) | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def list_repositories() -> ListRepositoriesResponse: | |
| """List all repositories.""" | |
| return ListRepositoriesResponse( | |
| repositories=[ | |
| RepositoryInfo( | |
| id=repo.id, | |
| url=repo.url, | |
| branch=repo.branch, | |
| chunk_count=repo.chunk_count, | |
| status=repo.status.value, | |
| indexed_at=repo.indexed_at, | |
| error_message=repo.error_message, | |
| ) | |
| for repo in repositories.values() | |
| ] | |
| ) | |
| async def get_repository(repo_id: str) -> RepositoryInfo: | |
| """Get repository details. | |
| Supports both full repository IDs and partial IDs (first 8+ characters). | |
| """ | |
| repo = get_repo_or_404(repo_id) | |
| return RepositoryInfo( | |
| id=repo.id, | |
| url=repo.url, | |
| branch=repo.branch, | |
| chunk_count=repo.chunk_count, | |
| status=repo.status.value, | |
| indexed_at=repo.indexed_at, | |
| error_message=repo.error_message, | |
| ) | |
| async def delete_repository(repo_id: str) -> dict: | |
| """Delete a repository. | |
| Supports both full repository IDs and partial IDs (first 8+ characters). | |
| """ | |
| repo = get_repo_or_404(repo_id) | |
| try: | |
| # Delete from vector store (use resolved full ID) | |
| vectorstore = VectorStore() | |
| vectorstore.delete_repo_chunks(repo.id) | |
| # Delete from records (use resolved full ID) | |
| del repositories[repo.id] | |
| save_repositories() | |
| return {"message": f"Repository {repo.full_name} deleted"} | |
| except Exception as e: | |
| logger.error("Delete failed", error=str(e)) | |
| raise HTTPException(status_code=500, detail=str(e)) | |