import cv2
import numpy as np
import onnxruntime as rt
import sys
from insightface.app import FaceAnalysis
sys.path.insert(1, './recognition')
from scrfd import SCRFD
from arcface_onnx import ArcFaceONNX
import os.path as osp
import os
from pathlib import Path
from tqdm import tqdm
import ffmpeg
import random
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor
from insightface.model_zoo.inswapper import INSwapper
import psutil
from enum import Enum
from insightface.app.common import Face
from insightface.utils.storage import ensure_available
import re
import subprocess
import urllib.request

# Face enhancement imports
try:
    from gfpgan import GFPGANer
    GFPGAN_AVAILABLE = True
except ImportError:
    GFPGAN_AVAILABLE = False
    print("GFPGAN not available - face enhancement disabled")


class RefacerMode(Enum):
    CPU, CUDA, COREML, TENSORRT = range(1, 5)

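# Illustrative helper (not part of the original pipeline): mode selection in
# Refacer.__check_providers below is driven by whatever execution providers the
# installed onnxruntime build reports. A CUDA wheel typically reports
# ['TensorrtExecutionProvider', 'CUDAExecutionProvider', 'CPUExecutionProvider'],
# while a CPU-only wheel reports just ['CPUExecutionProvider'].
def _available_providers():
    """Debug helper: list the ONNX Runtime execution providers on this machine."""
    return rt.get_available_providers()
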
class Refacer:
    def __init__(self, force_cpu=False, colab_performance=False):
        self.first_face = False
        self.force_cpu = force_cpu
        self.colab_performance = colab_performance
        self.__check_encoders()
        self.__check_providers()
        self.total_mem = psutil.virtual_memory().total
        self.__init_apps()

        # Advanced temporal smoothing for reducing flickering
        self.prev_faces = []  # Faces from the previous frame
        self.face_tracking_threshold = 0.15  # Low IOU threshold so fast motion still tracks
        self.face_memory = {}  # Face state carried across frames
        self.occlusion_tolerance = 10  # Frames a face may go missing (fast motion/occlusion) before reset
        self.last_swapped_frame = None  # Cache of the last successful swap, reused for stability
        self.stable_swap_count = 0  # Number of consecutive stable swaps

        # Quality enhancement settings
        self.enable_color_correction = True  # Match skin tone and lighting
        self.enable_seamless_clone = False  # Disabled - INSwapper already handles blending
        self.enable_temporal_blend = True  # Smooth frame transitions
        self.temporal_blend_alpha = 0.15  # Blend 15% of the previous frame into each frame
        self.prev_blended_frame = None  # For temporal smoothing
        self.enable_face_enhancement = GFPGAN_AVAILABLE  # Face restoration with GFPGAN
        self.face_enhancer = None

        # Initialize GFPGAN for face enhancement
        if self.enable_face_enhancement:
            try:
                print("Initializing GFPGAN face enhancer...")
                self.face_enhancer = GFPGANer(
                    model_path='https://github.com/TencentARC/GFPGAN/releases/download/v1.3.0/GFPGANv1.3.pth',
                    upscale=1,  # Don't upscale, just enhance
                    arch='clean',
                    channel_multiplier=2,
                    bg_upsampler=None  # Don't enhance the background
                )
                print("GFPGAN initialized successfully!")
            except Exception as e:
                print(f"GFPGAN initialization failed: {e}")
                self.enable_face_enhancement = False

    def __check_providers(self):
        if self.force_cpu:
            self.providers = ['CPUExecutionProvider']
        else:
            self.providers = rt.get_available_providers()
        rt.set_default_logger_severity(4)
        self.sess_options = rt.SessionOptions()
        self.sess_options.execution_mode = rt.ExecutionMode.ORT_SEQUENTIAL
        self.sess_options.graph_optimization_level = rt.GraphOptimizationLevel.ORT_ENABLE_ALL

        if len(self.providers) == 1 and 'CPUExecutionProvider' in self.providers:
            self.mode = RefacerMode.CPU
            self.use_num_cpus = mp.cpu_count() - 1
            self.sess_options.intra_op_num_threads = int(self.use_num_cpus / 3)
            print(f"CPU mode with providers {self.providers}")
        elif self.colab_performance:
            self.mode = RefacerMode.TENSORRT
            self.use_num_cpus = mp.cpu_count() - 1
            self.sess_options.intra_op_num_threads = int(self.use_num_cpus / 3)
            print(f"TENSORRT mode with providers {self.providers}")
        elif 'CoreMLExecutionProvider' in self.providers:
            self.mode = RefacerMode.COREML
            self.use_num_cpus = mp.cpu_count() - 1
            self.sess_options.intra_op_num_threads = int(self.use_num_cpus / 3)
            print(f"CoreML mode with providers {self.providers}")
        elif 'CUDAExecutionProvider' in self.providers:
            self.mode = RefacerMode.CUDA
            self.use_num_cpus = 2
            self.sess_options.intra_op_num_threads = 1
            if 'TensorrtExecutionProvider' in self.providers:
                self.providers.remove('TensorrtExecutionProvider')
            print(f"CUDA mode with providers {self.providers}")
        """
        elif 'TensorrtExecutionProvider' in self.providers:
            self.mode = RefacerMode.TENSORRT
            #self.use_num_cpus = 1
            #self.sess_options.intra_op_num_threads = 1
            self.use_num_cpus = mp.cpu_count()-1
            self.sess_options.intra_op_num_threads = int(self.use_num_cpus/3)
            print(f"TENSORRT mode with providers {self.providers}")
        """

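    # Thread-allocation note (an assumption, not stated in the original):
    # cpu_count() - 1 appears to leave one core for the OS, and dividing by
    # three roughly shares the remainder across the three ONNX sessions
    # (detector, recognizer, swapper) that __init_apps creates with this
    # same sess_options object.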
    def __download_model(self, model_path):
        """Download the inswapper model if it doesn't exist."""
        if os.path.exists(model_path):
            return

        print(f"Model file {model_path} not found. Downloading...")

        # Direct download from reliable sources
        sources = [
            {
                'name': 'Hugging Face - ezioruan',
                'url': 'https://huggingface.co/ezioruan/inswapper_128.onnx/resolve/main/inswapper_128.onnx',
            },
            {
                'name': 'Hugging Face - ashleykleynhans',
                'url': 'https://huggingface.co/ashleykleynhans/inswapper/resolve/main/inswapper_128.onnx',
            },
            {
                'name': 'Hugging Face - public-data',
                'url': 'https://huggingface.co/public-data/insightface/resolve/main/models/inswapper_128.onnx',
            }
        ]

        for source in sources:
            try:
                print(f"Trying to download from {source['name']}...")

                # Use urllib with headers to avoid blocking
                import ssl
                ssl._create_default_https_context = ssl._create_unverified_context

                req = urllib.request.Request(
                    source['url'],
                    headers={
                        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
                    }
                )

                with urllib.request.urlopen(req, timeout=300) as response:
                    total_size = int(response.headers.get('content-length', 0))
                    print(f"Downloading {total_size / (1024*1024):.1f} MB...")

                    with open(model_path, 'wb') as f:
                        downloaded = 0
                        chunk_size = 8192
                        while True:
                            chunk = response.read(chunk_size)
                            if not chunk:
                                break
                            f.write(chunk)
                            downloaded += len(chunk)
                            if total_size > 0:
                                percent = (downloaded / total_size) * 100
                                if int(percent) % 10 == 0:  # Print roughly every 10%
                                    print(f"Progress: {percent:.0f}%")

                # Verify the file
                if os.path.exists(model_path) and os.path.getsize(model_path) > 500000000:  # > 500 MB
                    print(f"✅ Successfully downloaded model from {source['name']} "
                          f"({os.path.getsize(model_path) / (1024*1024):.1f} MB)")
                    return
                else:
                    print(f"❌ Downloaded file seems incomplete "
                          f"(size: {os.path.getsize(model_path) if os.path.exists(model_path) else 0} bytes)")
                    if os.path.exists(model_path):
                        os.remove(model_path)

            except Exception as e:
                print(f"❌ Failed to download from {source['name']}: {str(e)}")
                if os.path.exists(model_path):
                    os.remove(model_path)
                continue

        raise Exception(
            "❌ Failed to download inswapper_128.onnx from all sources.\n\n"
            "Please upload the model file manually:\n"
            "1. Download from: https://huggingface.co/ezioruan/inswapper_128.onnx/resolve/main/inswapper_128.onnx\n"
            "2. Upload to your Space via the Files tab\n"
            "3. Name it: inswapper_128.onnx"
        )

    def __init_apps(self):
        assets_dir = ensure_available('models', 'buffalo_l', root='~/.insightface')

        model_path = os.path.join(assets_dir, 'det_10g.onnx')
        sess_face = rt.InferenceSession(model_path, self.sess_options, providers=self.providers)
        self.face_detector = SCRFD(model_path, sess_face)
        self.face_detector.prepare(0, input_size=(640, 640))

        model_path = os.path.join(assets_dir, 'w600k_r50.onnx')
        sess_rec = rt.InferenceSession(model_path, self.sess_options, providers=self.providers)
        self.rec_app = ArcFaceONNX(model_path, sess_rec)
        self.rec_app.prepare(0)

        model_path = 'inswapper_128.onnx'
        # Download the model if it doesn't exist
        self.__download_model(model_path)
        sess_swap = rt.InferenceSession(model_path, self.sess_options, providers=self.providers)
        self.face_swapper = INSwapper(model_path, sess_swap)

    def prepare_faces(self, faces):
        self.replacement_faces = []
        for face in faces:
            #image1 = cv2.imread(face.origin)
            if "origin" in face:
                face_threshold = face['threshold']
                bboxes1, kpss1 = self.face_detector.autodetect(face['origin'], max_num=1)
                if len(kpss1) < 1:
                    raise Exception('No face detected on "Face to replace" image')
                feat_original = self.rec_app.get(face['origin'], kpss1[0])
            else:
                face_threshold = 0
                self.first_face = True
                feat_original = None
                print('No origin image: First face change')
            #image2 = cv2.imread(face.destination)
            _faces = self.__get_faces(face['destination'], max_num=1)
            if len(_faces) < 1:
                raise Exception('No face detected on "Destination face" image')
            self.replacement_faces.append((feat_original, _faces[0], face_threshold))

    def __convert_video(self, video_path, output_video_path):
        if self.video_has_audio:
            print("Merging audio with the refaced video...")
            new_path = output_video_path + str(random.randint(0, 999)) + "_c.mp4"
            #stream = ffmpeg.input(output_video_path)
            in1 = ffmpeg.input(output_video_path)
            in2 = ffmpeg.input(video_path)
            out = ffmpeg.output(in1.video, in2.audio, new_path,
                                video_bitrate=self.ffmpeg_video_bitrate,
                                vcodec=self.ffmpeg_video_encoder)
            out.run(overwrite_output=True, quiet=True)
        else:
            new_path = output_video_path
            print("The video doesn't have audio, so post-processing is not necessary")

        print(f"The process has finished.\nThe refaced video can be found at {os.path.abspath(new_path)}")
        return new_path

    def __get_faces(self, frame, max_num=0):
        bboxes, kpss = self.face_detector.detect(frame, max_num=max_num, metric='default')
        if bboxes.shape[0] == 0:
            return []
        ret = []
        for i in range(bboxes.shape[0]):
            bbox = bboxes[i, 0:4]
            det_score = bboxes[i, 4]
            kps = None
            if kpss is not None:
                kps = kpss[i]
            face = Face(bbox=bbox, kps=kps, det_score=det_score)
            face.embedding = self.rec_app.get(frame, kps)
            ret.append(face)
        return ret

    def __compute_iou(self, box1, box2):
        """Compute Intersection over Union for face tracking."""
        x1 = max(box1[0], box2[0])
        y1 = max(box1[1], box2[1])
        x2 = min(box1[2], box2[2])
        y2 = min(box1[3], box2[3])

        intersection = max(0, x2 - x1) * max(0, y2 - y1)
        area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
        area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
        union = area1 + area2 - intersection

        return intersection / union if union > 0 else 0

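    # Worked example (illustrative): boxes (0, 0, 10, 10) and (5, 5, 15, 15)
    # overlap in a 5x5 patch, so IOU = 25 / (100 + 100 - 25) ≈ 0.143 -- just
    # under the 0.15 face_tracking_threshold, so a face displaced that far
    # between frames would not be treated as the same face.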
    def __enhance_face_gfpgan(self, swapped_face, bbox):
        """Enhance face quality using GFPGAN."""
        if not self.enable_face_enhancement or self.face_enhancer is None:
            return swapped_face
        try:
            x1, y1, x2, y2 = map(int, bbox)
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(swapped_face.shape[1], x2), min(swapped_face.shape[0], y2)
            if x2 <= x1 or y2 <= y1:
                return swapped_face

            # Extract the face region
            face_region = swapped_face[y1:y2, x1:x2].copy()

            # Enhance with GFPGAN
            _, _, enhanced_face = self.face_enhancer.enhance(
                face_region,
                has_aligned=False,
                only_center_face=True,
                paste_back=True
            )

            if enhanced_face is not None:
                # Paste the enhanced region back into a copy of the frame
                result = swapped_face.copy()
                result[y1:y2, x1:x2] = enhanced_face
                return result
            else:
                return swapped_face
        except Exception as e:
            print(f"GFPGAN enhancement failed: {e}")
            return swapped_face

    def __color_correct_face(self, swapped_face, target_face, bbox):
        """Apply color correction to match lighting and skin tone."""
        try:
            x1, y1, x2, y2 = map(int, bbox)
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(swapped_face.shape[1], x2), min(swapped_face.shape[0], y2)
            if x2 <= x1 or y2 <= y1:
                return swapped_face

            # Work on a copy to avoid modifying the original
            result = swapped_face.copy()

            # Extract the face regions
            swapped_region = result[y1:y2, x1:x2].copy()
            target_region = target_face[y1:y2, x1:x2]
            if swapped_region.size == 0 or target_region.size == 0:
                return swapped_face

            # Match mean and standard deviation per channel
            for i in range(3):  # BGR channels
                swapped_mean, swapped_std = cv2.meanStdDev(swapped_region[:, :, i])
                target_mean, target_std = cv2.meanStdDev(target_region[:, :, i])

                # Avoid division by zero; only adjust if there is enough variance
                if swapped_std[0][0] > 1:
                    # Match the color distribution (subtle adjustment)
                    factor = min(target_std[0][0] / swapped_std[0][0], 1.5)  # Limit the adjustment
                    swapped_region[:, :, i] = np.clip(
                        (swapped_region[:, :, i] - swapped_mean[0][0]) * factor * 0.5
                        + swapped_mean[0][0] * 0.5
                        + target_mean[0][0] * 0.5,
                        0, 255
                    ).astype(np.uint8)

            # Put the corrected region back
            result[y1:y2, x1:x2] = swapped_region
            return result
        except Exception as e:
            print(f"Color correction failed: {e}")
            return swapped_face

    def __seamless_blend(self, swapped_face, target_face, bbox):
        """Apply seamless cloning for better edge integration."""
        try:
            x1, y1, x2, y2 = map(int, bbox)
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(swapped_face.shape[1], x2), min(swapped_face.shape[0], y2)
            if x2 <= x1 or y2 <= y1:
                return swapped_face

            # Center point for the seamless clone
            center_x = (x1 + x2) // 2
            center_y = (y1 + y2) // 2
            center = (center_x, center_y)

            # Elliptical mask over the face region for more natural blending
            mask = np.zeros(target_face.shape[:2], dtype=np.uint8)
            width = x2 - x1
            height = y2 - y1
            cv2.ellipse(mask, center, (width // 2, height // 2), 0, 0, 360, 255, -1)

            # Gaussian blur softens the mask edges
            mask = cv2.GaussianBlur(mask, (15, 15), 0)

            # Use cv2.seamlessClone for better blending
            try:
                result = cv2.seamlessClone(swapped_face, target_face, mask, center, cv2.NORMAL_CLONE)
                return result
            except cv2.error:
                # Fall back to alpha blending if the seamless clone fails
                mask_3channel = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR) / 255.0
                result = (swapped_face * mask_3channel + target_face * (1 - mask_3channel)).astype(np.uint8)
                return result
        except Exception as e:
            print(f"Seamless blending failed: {e}")
            return swapped_face

    def __temporal_smooth(self, current_frame):
        """Apply temporal smoothing to reduce frame-to-frame jitter."""
        if not self.enable_temporal_blend or self.prev_blended_frame is None:
            self.prev_blended_frame = current_frame.copy()
            return current_frame
        try:
            # Blend with the previous frame for smoothness
            alpha = self.temporal_blend_alpha
            smoothed = cv2.addWeighted(current_frame, 1 - alpha, self.prev_blended_frame, alpha, 0)
            self.prev_blended_frame = smoothed.copy()
            return smoothed
        except cv2.error:
            self.prev_blended_frame = current_frame.copy()
            return current_frame

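    # Worked example (illustrative): with temporal_blend_alpha = 0.15, a pixel
    # that jumps from 100 to 200 between frames is rendered as
    # 0.85 * 200 + 0.15 * 100 = 185, so sudden changes are damped by 15%
    # while slow motion passes through almost untouched.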
    def __enhance_quality(self, swapped_frame, original_frame, bbox):
        """Apply all quality enhancements to the swapped frame."""
        result = swapped_frame.copy()

        # 1. GFPGAN face enhancement (if available)
        if self.enable_face_enhancement:
            try:
                result = self.__enhance_face_gfpgan(result, bbox)
            except Exception as e:
                print(f"Skipping GFPGAN enhancement: {e}")

        # 2. Subtle color correction to match lighting (optional, conservative)
        if self.enable_color_correction:
            try:
                result = self.__color_correct_face(result, original_frame, bbox)
            except Exception as e:
                print(f"Skipping color correction: {e}")

        # 3. Skip seamless blending - INSwapper already handles this.
        #    The seamless_clone was causing black backgrounds.

        # 4. Light sharpening, blended in at low strength
        try:
            # Very subtle sharpening to maintain detail
            kernel = np.array([[0, -0.25, 0],
                               [-0.25, 2, -0.25],
                               [0, -0.25, 0]])
            sharpened = cv2.filter2D(result, -1, kernel)
            # Blend 30% sharpened with 70% original
            result = cv2.addWeighted(result, 0.7, sharpened, 0.3, 0)
        except Exception as e:
            print(f"Skipping sharpening: {e}")

        # 5. Temporal smoothing for motion stability
        try:
            result = self.__temporal_smooth(result)
        except Exception as e:
            print(f"Skipping temporal smoothing: {e}")

        return result

    def process_first_face(self, frame):
        faces = self.__get_faces(frame, max_num=1)

        # Aggressive anti-flicker: handle no detection or weak detection
        if len(faces) == 0:
            # No face detected - check whether we have stable swap history
            if 'first_face' in self.face_memory:
                self.face_memory['first_face']['frames_missing'] += 1
                # During occlusion/fast motion, reuse the last good swap for stability
                if self.face_memory['first_face']['frames_missing'] <= self.occlusion_tolerance:
                    if self.last_swapped_frame is not None and self.stable_swap_count > 3:
                        # Use the cached swap result for stability
                        return self.last_swapped_frame.copy()
                    return frame  # Skip swapping but show the original
                else:
                    # Face gone too long; reset the tracking state
                    del self.face_memory['first_face']
                    self.last_swapped_frame = None
                    self.stable_swap_count = 0
            return frame

        # Face detected - decide whether to swap.
        # Check for a tracked face (motion consistency)
        tracked = False
        iou = 0
        if 'first_face' in self.face_memory and len(self.prev_faces) > 0:
            iou = self.__compute_iou(faces[0].bbox, self.prev_faces[0].bbox)
            # Very lenient tracking for fast motion
            if iou > self.face_tracking_threshold:
                tracked = True

        # Decision: swap on good confidence OR tracking OR stable history
        should_swap = False
        if faces[0].det_score > 0.5:
            should_swap = True  # High confidence
        elif tracked and faces[0].det_score > 0.3:
            should_swap = True  # Tracked with reasonable confidence
        elif self.stable_swap_count > 5 and faces[0].det_score > 0.25:
            should_swap = True  # Stable history, accept lower confidence

        if should_swap:
            # Perform the basic swap
            swapped_frame = self.face_swapper.get(frame.copy(), faces[0],
                                                  self.replacement_faces[0][1], paste_back=True)
            # Apply quality enhancements
            swapped_frame = self.__enhance_quality(swapped_frame, frame, faces[0].bbox)

            # Cache this successful swap
            self.last_swapped_frame = swapped_frame.copy()
            self.stable_swap_count += 1

            # Update memory
            self.face_memory['first_face'] = {
                'bbox': faces[0].bbox,
                'confidence': faces[0].det_score,
                'frames_missing': 0
            }
            self.prev_faces = [faces[0]]
            return swapped_frame
        else:
            # Low confidence and not tracked - use the cached swap if available
            if self.last_swapped_frame is not None and self.stable_swap_count > 3:
                return self.last_swapped_frame.copy()
            # Otherwise keep tracking but don't swap
            self.prev_faces = [faces[0]]
            return frame

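    # Swap decision summary for process_first_face (restating the logic above):
    #   det_score > 0.5                              -> swap (high confidence)
    #   tracked (IOU > 0.15) and det_score > 0.3     -> swap (motion-consistent)
    #   stable_swap_count > 5 and det_score > 0.25   -> swap (stable history)
    #   otherwise                                    -> cached swap or passthrough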
    def process_faces(self, frame):
        faces = self.__get_faces(frame, max_num=0)

        # No faces detected - reuse the cached swap if history is stable
        if len(faces) == 0:
            if self.last_swapped_frame is not None and self.stable_swap_count > 5:
                return self.last_swapped_frame.copy()
            return frame

        # Aggressive temporal smoothing with motion tracking
        matched_faces = []
        for face in faces:
            best_match = None
            best_iou = 0
            face_id = None
            # Try to match with previous-frame faces - lenient for fast motion
            for prev_idx, prev_face in enumerate(self.prev_faces):
                iou = self.__compute_iou(face.bbox, prev_face.bbox)
                if iou > best_iou and iou > self.face_tracking_threshold:
                    best_iou = iou
                    best_match = prev_face
                    face_id = prev_idx
            matched_faces.append((face, best_match, best_iou, face_id))

        # Process face swapping with aggressive stability
        swapped = False
        for rep_face in self.replacement_faces:
            for i in range(len(faces) - 1, -1, -1):
                matched_info = matched_faces[i] if i < len(matched_faces) else (faces[i], None, 0, None)
                current_face = matched_info[0]
                prev_match = matched_info[1]
                iou_score = matched_info[2]

                # Aggressively lower the threshold for tracked faces
                effective_threshold = rep_face[2]
                if prev_match is not None:
                    if iou_score > 0.3:  # Tracked with some overlap
                        effective_threshold = max(0, rep_face[2] - 0.25)  # Major boost
                    elif iou_score > 0.1:  # Weak tracking (fast motion)
                        effective_threshold = max(0, rep_face[2] - 0.15)

                # More lenient confidence check for tracked faces
                min_confidence = 0.5
                if prev_match is not None and iou_score > 0.2:
                    min_confidence = 0.25  # Much lower for tracked faces
                elif self.stable_swap_count > 5:
                    min_confidence = 0.35  # Lower when there is stable history
                if current_face.det_score < min_confidence:
                    continue

                # Compute face similarity
                sim = self.rec_app.compute_sim(rep_face[0], faces[i].embedding)

                # Swap if the similarity meets the threshold
                if sim >= effective_threshold:
                    # Perform the basic swap
                    temp_frame = self.face_swapper.get(frame.copy(), faces[i], rep_face[1], paste_back=True)
                    # Apply quality enhancements
                    frame = self.__enhance_quality(temp_frame, frame, current_face.bbox)
                    swapped = True
                    del faces[i]
                    break

        # Update tracking state
        if swapped:
            self.last_swapped_frame = frame.copy()
            self.stable_swap_count += 1
        else:
            self.stable_swap_count = max(0, self.stable_swap_count - 1)

        # Store current faces for next-frame tracking
        self.prev_faces = self.__get_faces(frame, max_num=0)

        return frame

    def __check_video_has_audio(self, video_path):
        self.video_has_audio = False
        probe = ffmpeg.probe(video_path)
        audio_stream = next((stream for stream in probe['streams']
                             if stream['codec_type'] == 'audio'), None)
        if audio_stream is not None:
            self.video_has_audio = True

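    # Illustrative: ffmpeg.probe returns a dict shaped roughly like
    #   {'streams': [{'codec_type': 'video', ...}, {'codec_type': 'audio', ...}], ...}
    # so the scan above only needs to find one stream whose codec_type is 'audio'.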
    def reface_group(self, faces, frames, output):
        # Sequential processing to maintain temporal consistency;
        # parallel processing breaks face tracking and causes flickering.
        process_func = self.process_first_face if self.first_face else self.process_faces
        for frame in tqdm(frames, desc="Processing frames"):
            result = process_func(frame)
            output.write(result)

    def reface(self, video_path, faces):
        self.__check_video_has_audio(video_path)
        output_video_path = os.path.join('out', Path(video_path).name)
        self.prepare_faces(faces)

        # Reset all temporal tracking for the new video
        self.prev_faces = []
        self.face_memory = {}
        self.last_swapped_frame = None
        self.stable_swap_count = 0
        self.prev_blended_frame = None  # Reset temporal smoothing

        cap = cv2.VideoCapture(video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f"Total frames: {total_frames}")

        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        output = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))

        frames = []
        self.k = 1
        with tqdm(total=total_frames, desc="Extracting frames") as pbar:
            while cap.isOpened():
                flag, frame = cap.read()
                if flag and len(frame) > 0:
                    frames.append(frame.copy())
                    pbar.update()
                else:
                    break
                # Process in groups of up to 1000 frames to bound memory usage
                if len(frames) > 1000:
                    self.reface_group(faces, frames, output)
                    frames = []
        cap.release()

        # Process any remaining frames
        self.reface_group(faces, frames, output)
        frames = []
        output.release()

        return self.__convert_video(video_path, output_video_path)

    def __try_ffmpeg_encoder(self, vcodec):
        print(f"Trying FFMPEG {vcodec} encoder")
        command = ['ffmpeg', '-y', '-f', 'lavfi', '-i',
                   'testsrc=duration=1:size=1280x720:rate=30', '-vcodec', vcodec, 'testsrc.mp4']
        try:
            subprocess.run(command, check=True, capture_output=True)
        except subprocess.CalledProcessError:
            print(f"FFMPEG {vcodec} encoder doesn't work -> Disabled.")
            return False
        print(f"FFMPEG {vcodec} encoder works")
        return True

    def __check_encoders(self):
        self.ffmpeg_video_encoder = 'libx264'
        self.ffmpeg_video_bitrate = '0'

        pattern = r"encoders: ([a-zA-Z0-9_]+(?: [a-zA-Z0-9_]+)*)"
        # 'ffmpeg -codecs' lists each codec with an "(encoders: ...)" field
        command = ['ffmpeg', '-codecs']
        commandout = subprocess.run(command, check=True, capture_output=True).stdout
        result = commandout.decode('utf-8').split('\n')
        for r in result:
            if "264" in r:
                match = re.search(pattern, r)
                if match is None:
                    continue  # Line mentions 264 but lists no encoders
                encoders = match.group(1).split(' ')
                for v_c in Refacer.VIDEO_CODECS:
                    for v_k in encoders:
                        if v_c == v_k:
                            if self.__try_ffmpeg_encoder(v_k):
                                self.ffmpeg_video_encoder = v_k
                                self.ffmpeg_video_bitrate = Refacer.VIDEO_CODECS[v_k]
                                print(f"Video codec for FFMPEG: {self.ffmpeg_video_encoder}")
                                return

    VIDEO_CODECS = {
        'h264_videotoolbox': '0',  # macOS HW acceleration
        'h264_nvenc': '0',         # NVIDIA HW acceleration
        #'h264_qsv',               # Intel HW acceleration
        #'h264_vaapi',             # Intel HW acceleration
        #'h264_omx',               # HW acceleration
        'libx264': '0'             # No HW acceleration
    }
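
# Minimal usage sketch (illustrative; the file names and threshold below are
# hypothetical). The faces list mirrors what prepare_faces expects: an optional
# 'origin' image, a required 'destination' image, and a similarity 'threshold'
# when 'origin' is given. Note that reface() writes to an 'out' directory,
# which is assumed to exist.
if __name__ == "__main__":
    refacer = Refacer(force_cpu=True)
    example_faces = [{
        'origin': cv2.imread('origin_face.jpg'),            # face to replace (hypothetical file)
        'destination': cv2.imread('destination_face.jpg'),  # face to insert (hypothetical file)
        'threshold': 0.2,                                   # hypothetical similarity threshold
    }]
    refaced_path = refacer.reface('input_video.mp4', example_faces)
    print(f"Refaced video written to {refaced_path}")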