| """ | |
| ULTRA-ROBUST CALL CENTER ANALYTICS | |
| =================================== | |
| ✅ Multiple gender detection models with voting | |
| ✅ Best STT model (Whisper Large-v3 + optimizations) | |
| ✅ Enhanced for European accents | |
| ✅ Robust pitch analysis with multiple methods | |
| ✅ Production-grade accuracy | |
| MODELS USED: | |
| - STT: Whisper Large-v3 (best for accents) | |
| - Gender: 3 models + voting system | |
| - Age: Wav2Vec2 Large + validation | |
| - Diarization: pyannote 3.1 (SOTA) | |
| """ | |
import os
import sys
import gc
import json
import logging
import warnings
from collections import Counter, defaultdict
from datetime import datetime

# Silence noisy backends before the heavy imports load
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
warnings.filterwarnings("ignore")

import numpy as np
import torch
import librosa
import whisper
import parselmouth
from parselmouth.praat import call
from scipy import signal as scipy_signal
from scipy.stats import mode as scipy_mode
from keybert import KeyBERT
from sentence_transformers import SentenceTransformer
from pyannote.audio import Pipeline
from transformers import (
    pipeline,
    Wav2Vec2Processor,
    Wav2Vec2ForSequenceClassification,
    AutoModelForAudioClassification,
    AutoFeatureExtractor
)

logging.getLogger("pyannote").setLevel(logging.ERROR)
logging.getLogger("transformers").setLevel(logging.ERROR)
class NumpyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super().default(obj)
class UltraRobustCallAnalytics:
    def __init__(self, hf_token=None, device=None):
        # The device must be defined before any CUDA memory flush
        self.device = device if device else ("cuda" if torch.cuda.is_available() else "cpu")
        print(f"🚀 Initializing ULTRA-ROBUST Analytics Engine on {self.device}...")
        print("=" * 70)

        # Free any leftover GPU/CPU memory before loading models
        self._flush_memory()
        # ===== BEST STT MODEL: Whisper Large-v3 =====
        try:
            print(" → Loading Whisper Large-v3 (BEST for accents)...")
            self.stt_model = whisper.load_model("large-v3", device=self.device)
            self.stt_model_name = "large-v3"
            print(" ✓ Whisper Large-v3 loaded")
        except Exception:
            print(" ⚠ Falling back to Large-v2...")
            try:
                self.stt_model = whisper.load_model("large-v2", device=self.device)
                self.stt_model_name = "large-v2"
                print(" ✓ Whisper Large-v2 loaded")
            except Exception:
                print(" ⚠ Final fallback to Medium...")
                self.stt_model = whisper.load_model("medium", device=self.device)
                self.stt_model_name = "medium"
                print(" ✓ Whisper Medium loaded")
        # ===== DIARIZATION =====
        self.diarization_pipeline = None
        if hf_token:
            print(f" → Attempting to load Pyannote with token starting: {hf_token[:4]}...")
            # Universal loader: tries 'token' (new API) then 'use_auth_token' (old API)
            try:
                # Attempt 1: new syntax
                self.diarization_pipeline = Pipeline.from_pretrained(
                    "pyannote/speaker-diarization-3.1",
                    token=hf_token
                ).to(torch.device(self.device))
                print(" ✓ Diarization loaded (New Syntax)")
            except TypeError:
                # Attempt 2: old syntax (fallback)
                print(" ⚠ New syntax failed, trying legacy syntax...")
                try:
                    self.diarization_pipeline = Pipeline.from_pretrained(
                        "pyannote/speaker-diarization-3.1",
                        use_auth_token=hf_token
                    ).to(torch.device(self.device))
                    print(" ✓ Diarization loaded (Legacy Syntax)")
                except Exception as e:
                    print(f" ❌ CRITICAL PYANNOTE ERROR (Legacy): {e}")
            except Exception as e:
                print(f" ❌ CRITICAL PYANNOTE ERROR: {e}")
        # ===== EMOTION CLASSIFIER =====
        print(" → Loading emotion classifier...")
        self.emotion_classifier = pipeline(
            "audio-classification",
            model="superb/wav2vec2-base-superb-er",
            device=0 if self.device == "cuda" else -1
        )
        print(" ✓ Emotion classifier loaded")
        # ===== MULTIPLE GENDER MODELS FOR VOTING =====
        print("\n → Loading MULTIPLE gender detection models...")
        self.gender_models = {}

        # Model 1: Age-Gender (primary)
        try:
            print(" Loading Gender Model 1: audeering/wav2vec2-large...")
            self.ag_model_name = "audeering/wav2vec2-large-robust-24-ft-age-gender"
            self.ag_processor = Wav2Vec2Processor.from_pretrained(self.ag_model_name)
            self.ag_model = Wav2Vec2ForSequenceClassification.from_pretrained(self.ag_model_name)
            self.ag_model.to(self.device).eval()
            self.gender_models['audeering'] = {
                'processor': self.ag_processor,
                'model': self.ag_model
            }
            print(" ✓ Model 1 loaded")
        except Exception as e:
            print(f" ✗ Model 1 failed: {e}")

        # Model 2: Alefiury gender classifier
        try:
            print(" Loading Gender Model 2: alefiury/wav2vec2-large-xlsr-53-gender...")
            model2_name = "alefiury/wav2vec2-large-xlsr-53-gender-recognition-librispeech"
            processor2 = AutoFeatureExtractor.from_pretrained(model2_name)
            model2 = AutoModelForAudioClassification.from_pretrained(model2_name)
            model2.to(self.device).eval()
            self.gender_models['alefiury'] = {
                'processor': processor2,
                'model': model2
            }
            print(" ✓ Model 2 loaded")
        except Exception as e:
            print(f" ✗ Model 2 failed: {e}")

        # Model 3: AST speech-commands checkpoint (MIT). Note: it exposes keyword
        # labels, not gender labels, so it only contributes a vote if gender-like
        # labels are found at prediction time.
        try:
            print(" Loading Gender Model 3: MIT/ast-finetuned-speech-commands...")
            model3_name = "MIT/ast-finetuned-speech-commands-v2"
            processor3 = AutoFeatureExtractor.from_pretrained(model3_name)
            model3 = AutoModelForAudioClassification.from_pretrained(model3_name)
            model3.to(self.device).eval()
            self.gender_models['mit'] = {
                'processor': processor3,
                'model': model3
            }
            print(" ✓ Model 3 loaded")
        except Exception as e:
            print(f" ✗ Model 3 failed: {e}")

        print(f" ✓ Loaded {len(self.gender_models)} gender detection models")
| print("\n" + "="*70) | |
| print("✅ Engine initialized successfully") | |
| print("="*70 + "\n") | |
| print(" → Loading KeyBERT for keyword extraction...") | |
| try: | |
| self.keyword_model = KeyBERT('all-MiniLM-L6-v2') | |
| print(" ✓ Keyword extractor loaded") | |
| except Exception as e: | |
| print(f" ⚠ Keyword model failed: {e}") | |
| self.keyword_model = None | |
| print(" → Loading zero-shot topic classifier...") | |
| try: | |
| self.topic_classifier = pipeline( | |
| "zero-shot-classification", | |
| model="facebook/bart-large-mnli", | |
| device=0 if self.device == "cuda" else -1 | |
| ) | |
| self.topic_labels = [ | |
| "billing_payment", | |
| "technical_support", | |
| "product_inquiry", | |
| "complaint_issue", | |
| "account_management", | |
| "sales_marketing", | |
| "service_cancellation", | |
| "feedback_survey", | |
| "appointment_scheduling", | |
| "general_inquiry" | |
| ] | |
| print(" ✓ Topic classifier loaded") | |
| except Exception as e: | |
| print(f" ⚠ Topic classifier failed: {e}") | |
| self.topic_classifier = None | |
    def process_call(self, audio_path):
        """Main processing with maximum robustness."""
        if not os.path.exists(audio_path):
            raise FileNotFoundError(f"Audio file not found: {audio_path}")

        self._flush_memory()
        print(f"📁 Processing: {audio_path}")
        print("=" * 70)

        # Load and preprocess
        wav, sr = librosa.load(audio_path, sr=16000, mono=True)
        wav = wav.astype(np.float32)

        # Audio enhancement for call center quality
        wav = self._enhance_audio_for_callcenter(wav, sr)
        duration = len(wav) / sr
        print(f" ✓ Audio loaded: {duration:.1f}s @ {sr}Hz")

        # Enhanced diarization
        print("\n → Running enhanced diarization...")
        segments = self._run_enhanced_diarization(wav, sr, audio_path)
        print(f" ✓ Found {len(set(s['speaker'] for s in segments))} speakers, {len(segments)} segments")

        # Smart merging
        merged = self._merge_segments_smart(segments, min_gap=0.25)
        print(f" ✓ Merged to {len(merged)} segments")

        # Process segments
        results = []
        spk_audio_buffer = defaultdict(list)
        pad = int(0.1 * sr)  # 100 ms of context padding around each segment

        print("\n → Transcribing with Whisper Large-v3...")
        for i, seg in enumerate(merged):
            seg_duration = seg['end'] - seg['start']
            if seg_duration < 0.1:
                continue

            start_idx = max(0, int(seg['start'] * sr) - pad)
            end_idx = min(len(wav), int(seg['end'] * sr) + pad)
            chunk = wav[start_idx:end_idx]

            if self._is_silence(chunk):
                continue

            # Collect audio for biometrics
            if seg_duration > 0.4:
                spk_audio_buffer[seg['speaker']].append(chunk)

            # Enhanced transcription
            text = self._transcribe_chunk_robust(chunk, sr)
            if not text:
                continue

            emotion = self._detect_emotion(chunk)
            sentiment = self._map_emotion_to_sentiment(emotion)
            speech_rate = self._calculate_speech_rate(text, seg_duration)
            keywords = self._extract_keywords(text, top_n=5)
            topic = self._classify_topic(text)

            results.append({
                "segment_id": i + 1,
                "start": round(float(seg['start']), 2),
                "end": round(float(seg['end']), 2),
                "duration": round(float(seg_duration), 2),
                "speaker": seg['speaker'],
                "role": "UNKNOWN",
                "text": text,
                "emotion": emotion,
                "sentiment": sentiment,
                "speech_rate": speech_rate,
                "keywords": keywords,
                "topic": topic,
                "tone": self._calculate_tone_advanced(chunk, sr, text)
            })

            if (i + 1) % 10 == 0:
                print(f" Processed {i + 1}/{len(merged)} segments...")

        print(f" ✓ Transcribed {len(results)} segments with text")

        # Assign roles
        print("\n → Assigning speaker roles...")
        results = self._assign_roles_smart(results)
        identification = {}
        for r in results:
            identification[r['speaker']] = r['role']
        print(f" ✓ Roles: {identification}")

        # Ultra-robust biometrics with multi-model voting
        print("\n → Analyzing biometrics with multi-model voting...")
        biometrics = self._analyze_biometrics_ultra_robust(spk_audio_buffer, results, wav, sr)
        for spk, bio in biometrics.items():
            print(f" {spk}: {bio['gender']} (confidence: {bio['gender_confidence']:.2f}), {bio['age_bracket']}")

        # Customer journey
        print("\n → Analyzing customer journey...")
        cust_metrics = self._analyze_customer_journey(results)
        print(f" ✓ Journey: {cust_metrics['emotional_arc']}")

        # Agent KPI
        print("\n → Analyzing agent performance...")
        agent_metrics = self._analyze_agent_kpi(results, cust_metrics['impact_score'])
        print(f" ✓ Agent score: {agent_metrics.get('overall_score', 'N/A')}/100")

        # Compile output
        call_summary = self._aggregate_call_insights(results)
        final_output = {
            "metadata": {
                "file": os.path.basename(audio_path),
                "duration_seconds": round(float(duration), 2),
                "sample_rate": sr,
                "total_segments": len(results),
                "stt_model": self.stt_model_name,
                "gender_models_used": len(self.gender_models),
                "speakers": biometrics,
                "call_summary": call_summary
            },
            "identification": identification,
            "agent_metrics": agent_metrics,
            "customer_metrics": cust_metrics,
            "transcript": results
        }

        self._flush_memory()
        print("\n" + "=" * 70)
        print("✅ Processing complete")
        print("=" * 70 + "\n")
        return final_output
    def _enhance_audio_for_callcenter(self, wav, sr):
        """Enhance audio quality for better transcription."""
        # 1. Normalize
        wav = wav / (np.max(np.abs(wav)) + 1e-7)

        # 2. High-pass filter to remove low-frequency noise
        try:
            sos = scipy_signal.butter(4, 80, 'hp', fs=sr, output='sos')
            wav = scipy_signal.sosfilt(sos, wav)
        except Exception:
            pass

        # 3. Gentle compression to balance volume
        wav = np.sign(wav) * np.log1p(np.abs(wav) * 10) / np.log1p(10)
        return wav.astype(np.float32)
    def _transcribe_chunk_robust(self, chunk, sr):
        """
        Ultra-robust transcription, optimized for:
        - European accents
        - Call center audio quality
        - Background noise
        """
        # Ensure minimum length
        if len(chunk) < sr * 0.3:
            pad = np.zeros(int(sr * 0.5), dtype=np.float32)
            chunk = np.concatenate([pad, chunk, pad])

        try:
            # Settings tuned for call center audio and European accents
            result = self.stt_model.transcribe(
                chunk.astype(np.float32),
                language="en",                      # English only
                task="transcribe",
                # Quality settings
                beam_size=5,                        # higher = more accurate but slower
                best_of=5,                          # sample best of 5 runs
                temperature=0.0,                    # deterministic
                # Accent handling
                condition_on_previous_text=True,    # use context
                # Noise handling
                compression_ratio_threshold=2.4,    # more lenient
                logprob_threshold=-1.0,             # more lenient
                no_speech_threshold=0.6,            # standard
                # Speed vs accuracy
                fp16=(self.device == "cuda"),       # use FP16 on GPU
                # Word timestamps for the quality check below
                word_timestamps=True
            )

            text = result['text'].strip()

            # Quality filters
            if len(text) < 2:
                return None

            # Filter common hallucinated fillers
            garbage = ["you", "thank you", ".", "...", "bye", "okay"]
            if text.lower() in garbage:
                return None

            # Check that it is actual speech (contains at least one vowel)
            if not any(c in text.lower() for c in 'aeiou'):
                return None

            # Check word-level confidence if available. Whisper attaches word
            # timestamps to each segment, not at the top level of the result.
            words = [w for s in result.get('segments', []) for w in s.get('words', [])]
            if words:
                avg_prob = np.mean([w.get('probability', 1.0) for w in words])
                if avg_prob < 0.3:  # very low confidence
                    return None

            return text
        except Exception as e:
            print(f" ⚠ Transcription error: {e}")
            return None
    def _analyze_biometrics_ultra_robust(self, audio_buffer, transcript, full_wav, sr):
        """
        Ultra-robust gender detection.
        Uses multiple models + voting + pitch + conversation context.
        """
        profiles = {}

        # Collect conversation context
        context_gender = self._extract_gender_from_conversation(transcript)

        for spk, chunks in audio_buffer.items():
            if not chunks:
                continue

            print(f"\n Analyzing {spk}...")

            # Concatenate audio (max 15 seconds, sampled from different parts)
            raw_audio = self._prepare_audio_for_analysis(chunks, sr)

            # ===== METHOD 1: ADVANCED PITCH ANALYSIS =====
            pitch_gender, pitch_confidence, pitch_stats = self._analyze_pitch_robust(raw_audio, sr, full_wav, transcript, spk)
            print(f" Pitch analysis: {pitch_gender} (conf: {pitch_confidence:.2f})")

            # ===== METHOD 2: MULTI-MODEL AI VOTING =====
            ai_gender, ai_confidence, all_predictions = self._multi_model_gender_detection(raw_audio, sr)
            print(f" AI models: {ai_gender} (conf: {ai_confidence:.2f})")
            print(f" Individual: {all_predictions}")

            # ===== METHOD 3: CONVERSATION CONTEXT =====
            context_gend = context_gender.get(spk, "UNKNOWN")
            print(f" Context clues: {context_gend}")

            # ===== METHOD 4: FORMANT ANALYSIS =====
            formant_gender, formant_confidence = self._analyze_formants(raw_audio, sr)
            print(f" Formant analysis: {formant_gender} (conf: {formant_confidence:.2f})")

            # ===== VOTING SYSTEM WITH CONFIDENCE WEIGHTING =====
            votes = []

            # Context vote (highest priority if available)
            if context_gend != "UNKNOWN":
                votes.extend([context_gend] * 4)  # 4 votes for context

            # Pitch vote (high priority)
            if pitch_confidence > 0.6:
                votes.extend([pitch_gender] * 3)  # 3 votes for confident pitch
            elif pitch_confidence > 0.4:
                votes.append(pitch_gender)        # 1 vote for moderate pitch

            # AI models vote (medium priority)
            if ai_confidence > 0.7:
                votes.extend([ai_gender] * 2)     # 2 votes for confident AI
            elif ai_confidence > 0.5:
                votes.append(ai_gender)           # 1 vote for moderate AI

            # Formant vote (medium priority)
            if formant_confidence > 0.6:
                votes.extend([formant_gender] * 2)
            elif formant_confidence > 0.4:
                votes.append(formant_gender)

            # Count votes
            if votes:
                vote_counts = Counter(votes)
                final_gender = vote_counts.most_common(1)[0][0]
                total_votes = len(votes)
                winning_votes = vote_counts[final_gender]
                final_confidence = winning_votes / total_votes
            else:
                # Fallback
                final_gender = ai_gender if ai_confidence > 0.5 else "UNKNOWN"
                final_confidence = ai_confidence

            print(f" FINAL: {final_gender} (confidence: {final_confidence:.2f})")
            print(f" Vote breakdown: {dict(Counter(votes))}")

            # ===== AGE DETECTION =====
            age_bracket = self._detect_age_robust(raw_audio, sr, pitch_stats)

            # Get role
            role = [r['role'] for r in transcript if r['speaker'] == spk]
            role = role[0] if role else "UNKNOWN"

            profiles[spk] = {
                "gender": final_gender,
                "gender_confidence": round(final_confidence, 2),
                "gender_methods": {
                    "context": context_gend,
                    "pitch": f"{pitch_gender} ({pitch_confidence:.2f})",
                    "ai_models": f"{ai_gender} ({ai_confidence:.2f})",
                    "formants": f"{formant_gender} ({formant_confidence:.2f})",
                    "vote_breakdown": dict(Counter(votes))
                },
                "age_bracket": age_bracket,
                "voice_stats": {
                    "avg_pitch_hz": pitch_stats['mean'],
                    "pitch_range": f"{pitch_stats['min']:.0f}-{pitch_stats['max']:.0f}Hz",
                    "pitch_std": pitch_stats['std']
                }
            }
        return profiles
    def _prepare_audio_for_analysis(self, chunks, sr, max_duration=15):
        """Prepare audio by taking samples from different parts."""
        raw = np.concatenate(chunks)

        # Take samples from the beginning, middle, and end
        if len(raw) > sr * max_duration:
            segment_len = sr * 5  # 5 seconds each
            total_len = len(raw)
            samples = []
            # Beginning
            samples.append(raw[:segment_len])
            # Middle
            mid_start = (total_len // 2) - (segment_len // 2)
            samples.append(raw[mid_start:mid_start + segment_len])
            # End
            samples.append(raw[-segment_len:])
            raw = np.concatenate(samples)

        # Normalize
        raw = raw - np.mean(raw)
        std = np.std(raw)
        if std > 1e-7:
            raw = raw / std
        return raw
    def _analyze_pitch_robust(self, audio, sr, full_wav, transcript, speaker):
        """Advanced pitch analysis using multiple methods."""
        # Collect pitch values already measured for this speaker's segments
        transcript_pitches = [
            t['tone']['pitch_hz']
            for t in transcript
            if t['speaker'] == speaker and t['tone']['pitch_hz'] > 60
        ]

        # Method 1: YIN algorithm
        try:
            f0_yin = librosa.yin(audio.astype(np.float64), fmin=60, fmax=400, sr=sr)
            f0_yin_valid = f0_yin[f0_yin > 0]
        except Exception:
            f0_yin_valid = []

        # Method 2: PYIN (probabilistic YIN)
        try:
            f0_pyin, voiced_flag, voiced_probs = librosa.pyin(
                audio.astype(np.float64),
                fmin=60,
                fmax=400,
                sr=sr
            )
            f0_pyin_valid = f0_pyin[~np.isnan(f0_pyin)]
        except Exception:
            f0_pyin_valid = []

        # Combine all pitch measurements
        all_pitches = []
        if len(f0_yin_valid) > 0:
            all_pitches.extend(f0_yin_valid)
        if len(f0_pyin_valid) > 0:
            all_pitches.extend(f0_pyin_valid)
        if len(transcript_pitches) > 0:
            all_pitches.extend(transcript_pitches)

        if len(all_pitches) == 0:
            return "UNKNOWN", 0.0, {'mean': 0, 'std': 0, 'min': 0, 'max': 0}

        # Calculate statistics
        mean_pitch = np.mean(all_pitches)
        std_pitch = np.std(all_pitches)
        min_pitch = np.min(all_pitches)
        max_pitch = np.max(all_pitches)

        pitch_stats = {
            'mean': round(mean_pitch, 1),
            'std': round(std_pitch, 1),
            'min': round(min_pitch, 1),
            'max': round(max_pitch, 1)
        }

        # Gender classification with refined thresholds.
        # Typical fundamental-frequency ranges:
        #   Male:   ~85-180 Hz (average ~120 Hz)
        #   Female: ~165-255 Hz (average ~210 Hz)
        if mean_pitch < 150:
            gender = "MALE"
            # Confidence grows the further the mean falls below 150 Hz
            confidence = min(1.0, (150 - mean_pitch) / 40)
        elif mean_pitch > 180:
            gender = "FEMALE"
            # Confidence grows the further the mean rises above 180 Hz
            confidence = min(1.0, (mean_pitch - 180) / 40)
        else:
            # Ambiguous range (150-180 Hz)
            if mean_pitch < 165:
                gender = "MALE"
            else:
                gender = "FEMALE"
            confidence = 0.5

        return gender, confidence, pitch_stats
    def _multi_model_gender_detection(self, audio, sr):
        """Run multiple AI models and aggregate their predictions."""
        predictions = []
        confidences = []

        for model_name, model_dict in self.gender_models.items():
            try:
                processor = model_dict['processor']
                model = model_dict['model']

                # Prepare inputs
                inputs = processor(
                    audio,
                    sampling_rate=sr,
                    return_tensors="pt",
                    padding=True
                ).to(self.device)

                # Predict
                with torch.no_grad():
                    outputs = model(**inputs)
                    logits = outputs.logits
                    probs = torch.softmax(logits, dim=-1)[0].cpu().numpy()

                # Extract gender prediction
                labels = model.config.id2label

                # Find male/female labels (different models use different names)
                male_score = 0
                female_score = 0
                for idx, label in labels.items():
                    label_lower = label.lower()
                    if 'male' in label_lower and 'female' not in label_lower:
                        male_score = max(male_score, probs[idx])
                    elif 'female' in label_lower:
                        female_score = max(female_score, probs[idx])

                # Skip models that expose no gender-related labels at all,
                # so they do not cast a meaningless vote
                if male_score == 0 and female_score == 0:
                    continue

                if male_score > female_score:
                    predictions.append("MALE")
                    confidences.append(male_score)
                else:
                    predictions.append("FEMALE")
                    confidences.append(female_score)
            except Exception as e:
                print(f" Model {model_name} error: {e}")
                continue

        if not predictions:
            return "UNKNOWN", 0.0, {}

        # Aggregate predictions
        pred_counter = Counter(predictions)
        majority_vote = pred_counter.most_common(1)[0][0]

        # Calculate confidence as the mean score of the majority class
        majority_indices = [i for i, p in enumerate(predictions) if p == majority_vote]
        avg_confidence = np.mean([confidences[i] for i in majority_indices])

        # Individual predictions
        individual = {
            f"model_{i+1}": f"{pred} ({conf:.2f})"
            for i, (pred, conf) in enumerate(zip(predictions, confidences))
        }

        return majority_vote, float(avg_confidence), individual
    def _extract_gender_from_conversation(self, transcript):
        """Extract gender clues from how the agent addresses the customer."""
        context_map = {}

        # Extended keyword sets, matched against whole words to avoid false
        # positives such as "he" matching inside "the"
        male_keywords = {
            "sir", "mr", "mister", "gentleman", "he", "him", "his",
            "man", "guy", "male", "father", "dad", "son", "brother", "husband"
        }
        female_keywords = {
            "ma'am", "miss", "mrs", "madam", "madame", "ms",
            "she", "her", "hers", "woman", "lady", "female", "mother", "mom",
            "daughter", "sister", "wife"
        }

        for line in transcript:
            if line['role'] == "AGENT":
                txt = line['text'].lower()
                tokens = {w.strip(".,!?;:\"") for w in txt.split()}

                # Find who the agent is talking to
                customers = [x['speaker'] for x in transcript if x['role'] == "CUSTOMER"]
                if not customers:
                    continue
                target = customers[0]

                # Check for keywords
                if tokens & male_keywords:
                    context_map[target] = "MALE"
                elif tokens & female_keywords:
                    context_map[target] = "FEMALE"

        return context_map
    def _analyze_formants(self, audio, sr):
        """Analyze formant frequencies (F1, F2) for gender detection."""
        try:
            # Use Praat (via parselmouth) for formant analysis
            snd = parselmouth.Sound(audio, sampling_frequency=sr)
            formant = snd.to_formant_burg()

            # Extract F1 and F2 for voiced frames
            f1_values = []
            f2_values = []
            duration = snd.get_total_duration()
            time_step = 0.01  # 10 ms steps

            for t in np.arange(0, duration, time_step):
                f1 = formant.get_value_at_time(1, t)
                f2 = formant.get_value_at_time(2, t)
                if not np.isnan(f1) and not np.isnan(f2):
                    f1_values.append(f1)
                    f2_values.append(f2)

            if len(f1_values) < 10:
                return "UNKNOWN", 0.0

            avg_f1 = np.mean(f1_values)
            avg_f2 = np.mean(f2_values)

            # Gender classification based on average formants; the values below
            # are heuristic cut-offs between typical male and female averages
            if avg_f1 < 170 and avg_f2 < 1650:
                gender = "MALE"
                confidence = 0.7
            elif avg_f1 > 190 and avg_f2 > 1750:
                gender = "FEMALE"
                confidence = 0.7
            else:
                # Use F2 as the primary indicator in the ambiguous zone
                if avg_f2 < 1600:
                    gender = "MALE"
                else:
                    gender = "FEMALE"
                confidence = 0.5

            return gender, confidence
        except Exception:
            return "UNKNOWN", 0.0
    def _detect_age_robust(self, audio, sr, pitch_stats):
        """Robust age detection."""
        try:
            if 'audeering' not in self.gender_models:
                return "26-35"  # default bracket when the model is unavailable

            processor = self.gender_models['audeering']['processor']
            model = self.gender_models['audeering']['model']

            inputs = processor(audio, sampling_rate=sr, return_tensors="pt").to(self.device)
            with torch.no_grad():
                logits = model(**inputs).logits
                probs = torch.softmax(logits, dim=-1)[0].cpu().numpy()

            # Map labels to age buckets (aggregating across genders).
            # Labels usually look like: 'female_20-29', 'male_20-29', etc.
            labels = model.config.id2label
            age_scores = defaultdict(float)
            for i, score in enumerate(probs):
                label = labels[i]
                # Extract the age part (assuming a gender_age label format)
                parts = label.split('_')
                if len(parts) > 1:
                    age_group = parts[-1]  # e.g., "20-29"
                    age_scores[age_group] += score

            # Get the best age bracket
            if age_scores:
                best_age = max(age_scores, key=age_scores.get)
                return best_age
            return "UNKNOWN"
        except Exception as e:
            print(f" ⚠ Age detection failed: {e}")
            return "UNKNOWN"
    def _run_enhanced_diarization(self, wav, sr, file_path):
        """Run pyannote diarization, or fall back to simple segmentation."""
        if self.diarization_pipeline is None:
            print(" ⚠ Diarization pipeline unavailable (no token or failed load), using energy-based fallback segmentation")
            return self._energy_based_segmentation(wav, sr)

        try:
            # Run pipeline (call-center audio is assumed to have exactly 2 speakers)
            diarization = self.diarization_pipeline(file_path, min_speakers=2, max_speakers=2)
            segments = []
            for turn, _, speaker in diarization.itertracks(yield_label=True):
                segments.append({
                    "start": turn.start,
                    "end": turn.end,
                    "speaker": speaker
                })
            return segments
        except Exception as e:
            print(f" ⚠ Diarization error: {e}, using fallback")
            return self._energy_based_segmentation(wav, sr)

    def _energy_based_segmentation(self, wav, sr):
        """Fallback if deep-learning diarization fails."""
        # Simple energy detection to split speech from silence,
        # treating everything as a single speaker (SPEAKER_00)
        intervals = librosa.effects.split(wav, top_db=30)
        segments = []
        for start, end in intervals:
            segments.append({
                "start": start / sr,
                "end": end / sr,
                "speaker": "SPEAKER_00"
            })
        return segments
    def _merge_segments_smart(self, segments, min_gap=0.5):
        """Merge segments from the same speaker that are close together."""
        if not segments:
            return []

        merged = []
        current = segments[0]
        for next_seg in segments[1:]:
            # Same speaker and a small gap: extend the current segment
            if (next_seg['speaker'] == current['speaker'] and
                    (next_seg['start'] - current['end']) < min_gap):
                current['end'] = next_seg['end']
            else:
                merged.append(current)
                current = next_seg
        merged.append(current)
        return merged

    def _is_silence(self, chunk, threshold=0.005):
        """Check if an audio chunk is essentially silence."""
        return np.max(np.abs(chunk)) < threshold
    def _detect_emotion(self, chunk):
        """Detect emotion from an audio chunk."""
        try:
            # Ensure the chunk is long enough for the model (audio is 16 kHz)
            if len(chunk) < 16000 * 0.5:
                return "neutral"
            # The pipeline loaded in __init__ accepts a file path or numpy array
            preds = self.emotion_classifier(chunk, top_k=1)
            return preds[0]['label']
        except Exception:
            return "neutral"
    def _calculate_tone_advanced(self, chunk, sr, text):
        """Calculate pitch, jitter, and shimmer using Parselmouth (Praat)."""
        try:
            if len(chunk) < sr * 0.1:
                return {"pitch_hz": 0, "jitter": 0, "shimmer": 0}

            snd = parselmouth.Sound(chunk, sampling_frequency=sr)

            # Pitch
            pitch = snd.to_pitch()
            pitch_val = pitch.selected_array['frequency']
            pitch_val = pitch_val[pitch_val != 0]
            avg_pitch = np.mean(pitch_val) if len(pitch_val) > 0 else 0

            # Point process (glottal pulses) for jitter/shimmer
            point_process = call(snd, "To PointProcess (periodic, cc)", 75, 500)
            try:
                jitter = call(point_process, "Get jitter (local)", 0, 0, 0.0001, 0.02, 1.3)
            except Exception:
                jitter = 0
            try:
                shimmer = call([snd, point_process], "Get shimmer (local)", 0, 0, 0.0001, 0.02, 1.3, 1.6)
            except Exception:
                shimmer = 0

            return {
                "pitch_hz": round(float(avg_pitch), 1),
                "jitter": round(float(jitter * 100), 2),   # percentage
                "shimmer": round(float(shimmer * 100), 2)  # percentage
            }
        except Exception:
            return {"pitch_hz": 0, "jitter": 0, "shimmer": 0}
    def _assign_roles_smart(self, results):
        """Assign AGENT vs CUSTOMER roles using golden phrases and verbosity."""
        speakers = list(set(r['speaker'] for r in results))
        if len(speakers) == 1:
            # If only one speaker is found, assume it is the agent monologuing
            for r in results:
                r['role'] = "AGENT"
            return results

        speaker_scores = defaultdict(int)
        word_counts = defaultdict(int)

        # 1. GOLDEN PHRASES (near-certain indicators of the agent).
        #    These override normal scoring.
        golden_agent_phrases = [
            "my name is", "this is steve", "this is sam", "this is mike",  # common names
            "calling from", "on a recorded line", "green solutions",
            "energy solutions", "federal government", "rebate program"
        ]

        # 2. STANDARD SCORING KEYWORDS
        agent_keywords = [
            "manager", "supervisor", "qualified", "eligible",
            "whatsapp", "ping you", "verification", "consumption"
        ]
        customer_keywords = [
            "who is this", "stop calling", "not interested",
            "take me off", "do not call", "why are you asking"
        ]

        agent_found_via_golden = None
        for res in results:
            text = res['text'].lower()
            spk = res['speaker']

            # Count words for the verbosity check
            words = text.split()
            word_counts[spk] += len(words)

            # Check golden phrases (instant win)
            if agent_found_via_golden is None:
                for phrase in golden_agent_phrases:
                    if phrase in text:
                        print(f" ★ Golden Phrase found for {spk}: '{phrase}'")
                        agent_found_via_golden = spk
                        break

            # Standard scoring
            if any(k in text for k in agent_keywords):
                speaker_scores[spk] += 2
            if any(k in text for k in customer_keywords):
                speaker_scores[spk] -= 3  # strong negative for objections

        # 3. DECISION LOGIC
        if agent_found_via_golden:
            # If we found a golden phrase, trust it implicitly
            final_agent = agent_found_via_golden
        else:
            # Fallback: verbosity check (the agent usually talks more)
            talkative_spk = max(word_counts, key=word_counts.get)
            total_words = sum(word_counts.values())

            # If one speaker dominates >60% of the conversation, they are likely the agent
            if word_counts[talkative_spk] / max(1, total_words) > 0.60:
                speaker_scores[talkative_spk] += 5

            # Pick the highest-scoring speaker; fall back to the most talkative
            # one if no keyword ever matched (speaker_scores would be empty)
            if speaker_scores:
                final_agent = max(speaker_scores, key=speaker_scores.get)
            else:
                final_agent = talkative_spk

        # 4. ASSIGN ROLES
        print(f" ✓ Role Assignment: Identified {final_agent} as AGENT")
        for res in results:
            res['role'] = "AGENT" if res['speaker'] == final_agent else "CUSTOMER"

        return results
    def _analyze_customer_journey(self, results):
        """Analyze the sentiment flow of the customer."""
        cust_segments = [r for r in results if r['role'] == "CUSTOMER"]
        if not cust_segments:
            return {"emotional_arc": "No customer audio", "impact_score": 0}

        # Map emotions to scores
        emo_map = {
            "happy": 1.0, "joy": 1.0, "neutral": 0.1,
            "sad": -0.5, "angry": -1.0, "frustrated": -1.0
        }

        start_score = sum(emo_map.get(s['emotion'], 0) for s in cust_segments[:3]) / min(3, len(cust_segments))
        end_score = sum(emo_map.get(s['emotion'], 0) for s in cust_segments[-3:]) / min(3, len(cust_segments))
        impact = end_score - start_score

        if impact > 0.2:
            arc = "Positive Resolution"
        elif impact < -0.2:
            arc = "Negative Escalation"
        else:
            arc = "Neutral/Unresolved"

        return {
            "emotional_arc": arc,
            "start_sentiment": round(start_score, 2),
            "end_sentiment": round(end_score, 2),
            "impact_score": round(impact, 2)
        }
    def _analyze_agent_kpi(self, results, customer_impact):
        """Calculate agent performance metrics."""
        agent_segments = [r for r in results if r['role'] == "AGENT"]
        if not agent_segments:
            return {"overall_score": 0}

        # 1. Politeness (keyword based)
        polite_words = ["please", "thank", "sorry", "apologize", "appreciate"]
        polite_count = sum(1 for s in agent_segments if any(w in s['text'].lower() for w in polite_words))
        politeness_score = min(100, (polite_count / max(1, len(agent_segments))) * 200)

        # 2. Tone consistency (variance of jitter across segments)
        jitter_vals = [s['tone']['jitter'] for s in agent_segments]
        tone_stability = 100 - min(100, np.std(jitter_vals) * 10) if jitter_vals else 50

        # 3. Resolution impact (from the customer journey):
        #    map the -1.0..1.0 impact range to 0..100
        resolution_score = 50 + (customer_impact * 50)
        resolution_score = max(0, min(100, resolution_score))

        # Overall weighted score
        overall = (
            (politeness_score * 0.3) +
            (tone_stability * 0.2) +
            (resolution_score * 0.5)
        )

        return {
            "overall_score": int(overall),
            "politeness": int(politeness_score),
            "tone_stability": int(tone_stability),
            "resolution_effectiveness": int(resolution_score)
        }
    def _flush_memory(self):
        """Aggressive memory cleanup."""
        gc.collect()
        if self.device == "cuda":
            torch.cuda.empty_cache()
    def _map_emotion_to_sentiment(self, emotion):
        """Map emotion labels to sentiment with a polarity score."""
        emotion_lower = emotion.lower()

        positive_emotions = {
            'happy': 0.8, 'joy': 0.9, 'excited': 0.85,
            'pleased': 0.7, 'satisfied': 0.75, 'content': 0.6
        }
        negative_emotions = {
            'sad': -0.6, 'angry': -0.9, 'frustrated': -0.8,
            'annoyed': -0.7, 'disappointed': -0.65, 'upset': -0.75
        }

        if emotion_lower in positive_emotions:
            return {
                "sentiment": "positive",
                "polarity_score": positive_emotions[emotion_lower],
                "confidence": "high"
            }
        if emotion_lower in negative_emotions:
            return {
                "sentiment": "negative",
                "polarity_score": negative_emotions[emotion_lower],
                "confidence": "high"
            }
        return {
            "sentiment": "neutral",
            "polarity_score": 0.0,
            "confidence": "medium"
        }
    def _calculate_speech_rate(self, text, duration_seconds):
        """Calculate words per minute (WPM) and classify the pace."""
        if duration_seconds < 0.1:
            return {"wpm": 0, "word_count": 0, "speech_pace": "unknown"}

        words = text.split()
        word_count = len(words)
        wpm = word_count / (duration_seconds / 60.0)

        if wpm < 100:
            pace = "slow"
        elif wpm < 140:
            pace = "normal"
        elif wpm < 180:
            pace = "fast"
        else:
            pace = "very_fast"

        return {
            "wpm": round(wpm, 1),
            "word_count": word_count,
            "speech_pace": pace
        }
    def _extract_keywords(self, text, top_n=5):
        """Extract keywords/keyphrases using KeyBERT."""
        if self.keyword_model is None or len(text.split()) < 3:
            return []
        try:
            keywords = self.keyword_model.extract_keywords(
                text,
                keyphrase_ngram_range=(1, 2),
                stop_words='english',
                top_n=top_n,
                use_maxsum=True,
                nr_candidates=20
            )
            return [
                {"keyword": kw[0], "relevance": round(float(kw[1]), 3)}
                for kw in keywords
            ]
        except Exception:
            return []
    def _classify_topic(self, text):
        """Classify text into call center topics."""
        if self.topic_classifier is None or len(text.split()) < 5:
            return {"topic": "unknown", "confidence": 0.0}
        try:
            result = self.topic_classifier(text, self.topic_labels, multi_label=False)
            return {
                "topic": result['labels'][0],
                "confidence": round(float(result['scores'][0]), 3),
                "top_3_topics": [
                    {"topic": label, "score": round(float(score), 3)}
                    for label, score in zip(result['labels'][:3], result['scores'][:3])
                ]
            }
        except Exception:
            return {"topic": "unknown", "confidence": 0.0}
    def _aggregate_call_insights(self, results):
        """Aggregate keywords and topics at the call level."""
        if not results:
            return {"top_keywords": [], "primary_topic": {"topic": "unknown"}}

        # Keep the best relevance score seen for each keyword
        all_keywords = {}
        for seg in results:
            if 'keywords' in seg:
                for kw in seg['keywords']:
                    keyword = kw['keyword']
                    score = kw['relevance']
                    all_keywords[keyword] = max(all_keywords.get(keyword, 0), score)

        top_keywords = [
            {"keyword": k, "relevance": round(v, 3)}
            for k, v in sorted(all_keywords.items(), key=lambda x: x[1], reverse=True)[:10]
        ]

        # Aggregate topics (confidence-weighted votes)
        topic_votes = defaultdict(float)
        for seg in results:
            if 'topic' in seg and seg['topic']['confidence'] > 0.5:
                topic_votes[seg['topic']['topic']] += seg['topic']['confidence']

        primary_topic = {
            "topic": max(topic_votes, key=topic_votes.get) if topic_votes else "unknown",
            "confidence": round(topic_votes[max(topic_votes, key=topic_votes.get)] / len(results), 3) if topic_votes else 0.0
        }

        # Call-level speech statistics
        total_words = sum(seg.get('speech_rate', {}).get('word_count', 0) for seg in results)
        wpm_values = [seg.get('speech_rate', {}).get('wpm', 0) for seg in results if seg.get('speech_rate', {}).get('wpm', 0) > 0]
        average_wpm = round(np.mean(wpm_values), 1) if wpm_values else 0

        return {
            "top_keywords": top_keywords,
            "primary_topic": primary_topic,
            "total_words": total_words,
            "average_wpm": average_wpm
        }
if __name__ == "__main__":
    # Example usage
    print("Initialize with: analyzer = UltraRobustCallAnalytics(hf_token='YOUR_TOKEN')")
    print("Process with: result = analyzer.process_call('path/to/audio.wav')")