added progress bar to ffmpeg extract and merge

This commit is contained in:
chuckkay 2024-09-07 08:01:56 -04:00 committed by GitHub
parent 96282f192f
commit ea992e4f92
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,175 +1,244 @@
import os import os
import subprocess import subprocess
import tempfile import tempfile
from typing import List, Optional from typing import List, Optional
from tqdm import tqdm # Import tqdm for progress bar
import filetype import filetype
import re
from facefusion import logger, process_manager, state_manager from facefusion import logger, process_manager, state_manager
from facefusion.filesystem import remove_file from facefusion.filesystem import remove_file
from facefusion.temp_helper import get_temp_file_path, get_temp_frames_pattern from facefusion.temp_helper import get_temp_file_path, get_temp_frames_pattern
from facefusion.typing import AudioBuffer, Fps, OutputVideoPreset from facefusion.typing import AudioBuffer, Fps, OutputVideoPreset
from facefusion.vision import restrict_video_fps from facefusion.vision import count_video_frame_total, restrict_video_fps
def run_ffmpeg(args: List[str]) -> subprocess.Popen[bytes]:
    """Start ffmpeg with the given arguments and poll it while processing is active.

    The command is always prefixed with ``-hide_banner -loglevel error`` and
    both stdout and stderr are captured through pipes.

    :param args: ffmpeg arguments appended after the fixed prefix.
    :return: the ``Popen`` handle; it is returned as soon as a 0.5 second
        ``wait`` succeeds, or after the polling loop ends.
    """
    commands = ['ffmpeg', '-hide_banner', '-loglevel', 'error', *args]
    process = subprocess.Popen(commands, stderr=subprocess.PIPE, stdout=subprocess.PIPE)

    while process_manager.is_processing():
        try:
            if state_manager.get_item('log_level') == 'debug':
                log_debug(process)
            process.wait(timeout=0.5)
        except subprocess.TimeoutExpired:
            # Still running: re-check the processing state and poll again.
            continue
        else:
            return process

    # The loop ended without a completed wait; terminate only on a stop request.
    if process_manager.is_stopping():
        process.terminate()
    return process
def open_ffmpeg(args: List[str]) -> subprocess.Popen[bytes]:
    """Spawn ffmpeg with ``-loglevel quiet`` and piped stdin/stdout.

    :param args: ffmpeg arguments appended after the fixed prefix.
    :return: the ``Popen`` handle; the caller is responsible for
        communicating with it.
    """
    commands = ['ffmpeg', '-hide_banner', '-loglevel', 'quiet', *args]
    process = subprocess.Popen(commands, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    return process
def log_debug(process: subprocess.Popen[bytes]) -> None:
    """Send every non-blank stderr line of *process* to the debug logger.

    Uses ``communicate()``, so the call returns only once the process has
    finished and its output has been read.

    :param process: a process whose stderr is captured through a pipe.
    """
    _, stderr = process.communicate()

    for line in stderr.decode().split(os.linesep):
        message = line.strip()
        if message:
            logger.debug(message, __name__)
def _resolve_frame_total(target_path: str) -> int:
    """Estimate how many frames the trimmed target covers, for the progress bar.

    Reads ``trim_frame_start`` / ``trim_frame_end`` from the state manager and
    falls back to the frame total of the target video where a bound is unset.
    The result is clamped to zero so an inconsistent trim range cannot yield a
    negative total.

    NOTE(review): the total is expressed in source-video frames. When frames
    are resampled to a different fps the number ffmpeg reports may differ, so
    the value is only an estimate.
    """
    trim_frame_start = state_manager.get_item('trim_frame_start')
    trim_frame_end = state_manager.get_item('trim_frame_end')
    has_start = isinstance(trim_frame_start, int)
    has_end = isinstance(trim_frame_end, int)

    if has_start and has_end:
        frame_total = trim_frame_end - trim_frame_start
    elif has_start:
        frame_total = count_video_frame_total(target_path) - trim_frame_start
    elif has_end:
        frame_total = trim_frame_end
    else:
        frame_total = count_video_frame_total(target_path)
    return max(frame_total, 0)


def _run_ffmpeg_with_progress(args: List[str], frame_total: int, description: str) -> bool:
    """Run ffmpeg and mirror its ``frame=`` statistics into a tqdm progress bar.

    :param args: ffmpeg arguments appended after the fixed prefix.
    :param frame_total: expected number of frames; ``0`` means unknown.
    :param description: label shown next to the progress bar.
    :return: ``True`` when ffmpeg exits with code 0, ``False`` when it fails
        or processing is cancelled while it runs.
    """
    # '-stats' keeps the progress line on stderr although the log level is
    # lowered to 'error' (the same level run_ffmpeg uses).
    commands = ['ffmpeg', '-hide_banner', '-loglevel', 'error', '-stats']
    commands.extend(args)
    frame_pattern = re.compile(r'frame=\s*(\d+)')
    frame_number = 0
    messages = []
    cancelled = False

    # errors='replace' prevents a UnicodeDecodeError on undecodable output.
    # The Popen context manager closes the pipe and reaps the process on every
    # exit path, including cancellation.
    with subprocess.Popen(commands, stderr=subprocess.PIPE, text=True, errors='replace') as process:
        with tqdm(total=frame_total or None, desc=description, unit='frame', ascii=' =') as progress:
            # readline() returns '' at EOF only, which ends the iteration
            # instead of spinning until the process exits.
            for line in iter(process.stderr.readline, ''):
                if not process_manager.is_processing():
                    process.terminate()
                    cancelled = True
                    break
                match = frame_pattern.search(line)
                if match:
                    frame = int(match.group(1))
                    if frame > frame_number:
                        progress.update(frame - frame_number)
                        frame_number = frame
                elif line.strip():
                    messages.append(line.strip())
            return_code = process.wait()
            # Fill the bar only after a successful run, and never backwards.
            if return_code == 0 and not cancelled and frame_total > frame_number:
                progress.update(frame_total - frame_number)

    # Logged after the bar is closed so the messages do not interleave with it.
    for message in messages:
        logger.debug(message, __name__)
    return not cancelled and return_code == 0


def extract_frames(target_path: str, temp_video_resolution: str, temp_video_fps: Fps) -> bool:
    """Extract the (optionally trimmed) target video into numbered temp frames.

    :param target_path: path of the video to read.
    :param temp_video_resolution: value passed to ffmpeg ``-s``.
    :param temp_video_fps: frame rate used in the ``fps`` filter.
    :return: ``True`` when ffmpeg exits with code 0, ``False`` on failure or
        cancellation.
    """
    trim_frame_start = state_manager.get_item('trim_frame_start')
    trim_frame_end = state_manager.get_item('trim_frame_end')
    temp_frames_pattern = get_temp_frames_pattern(target_path, '%08d')

    trim_options = []
    if isinstance(trim_frame_start, int):
        trim_options.append('start_frame=' + str(trim_frame_start))
    if isinstance(trim_frame_end, int):
        trim_options.append('end_frame=' + str(trim_frame_end))
    video_filter = 'fps=' + str(temp_video_fps)
    if trim_options:
        video_filter = 'trim=' + ':'.join(trim_options) + ',' + video_filter

    commands = ['-i', target_path, '-s', str(temp_video_resolution), '-q:v', '0']
    commands.extend(['-vf', video_filter])
    commands.extend(['-vsync', '0', temp_frames_pattern])
    return _run_ffmpeg_with_progress(commands, _resolve_frame_total(target_path), 'Extracting frames')


def merge_video(target_path: str, output_video_resolution: str, output_video_fps: Fps) -> bool:
    """Encode the numbered temp frames of *target_path* into the temp video file.

    Encoder-specific quality and preset flags are derived from the
    ``output_video_*`` items of the state manager.

    :param target_path: path of the original video; used to locate the temp
        frames and the temp output file.
    :param output_video_resolution: value passed to ffmpeg ``-s``.
    :param output_video_fps: frame rate used in the ``framerate`` filter.
    :return: ``True`` when ffmpeg exits with code 0, ``False`` on failure or
        cancellation.
    """
    temp_video_fps = restrict_video_fps(target_path, output_video_fps)
    temp_file_path = get_temp_file_path(target_path)
    temp_frames_pattern = get_temp_frames_pattern(target_path, '%08d')
    output_video_encoder = state_manager.get_item('output_video_encoder')
    commands = ['-r', str(temp_video_fps), '-i', temp_frames_pattern, '-s', str(output_video_resolution), '-c:v', output_video_encoder]

    if output_video_encoder in ['libx264', 'libx265']:
        output_video_compression = round(51 - (state_manager.get_item('output_video_quality') * 0.51))
        commands.extend(['-crf', str(output_video_compression), '-preset', state_manager.get_item('output_video_preset')])
    if output_video_encoder in ['libvpx-vp9']:
        output_video_compression = round(63 - (state_manager.get_item('output_video_quality') * 0.63))
        commands.extend(['-crf', str(output_video_compression)])
    if output_video_encoder in ['h264_nvenc', 'hevc_nvenc']:
        output_video_compression = round(51 - (state_manager.get_item('output_video_quality') * 0.51))
        commands.extend(['-cq', str(output_video_compression), '-preset', map_nvenc_preset(state_manager.get_item('output_video_preset'))])
    if output_video_encoder in ['h264_amf', 'hevc_amf']:
        output_video_compression = round(51 - (state_manager.get_item('output_video_quality') * 0.51))
        commands.extend(['-qp_i', str(output_video_compression), '-qp_p', str(output_video_compression), '-quality', map_amf_preset(state_manager.get_item('output_video_preset'))])
    if output_video_encoder in ['h264_videotoolbox', 'hevc_videotoolbox']:
        commands.extend(['-q:v', str(state_manager.get_item('output_video_quality'))])
    commands.extend(['-vf', 'framerate=fps=' + str(output_video_fps), '-pix_fmt', 'yuv420p', '-colorspace', 'bt709', '-y', temp_file_path])
    return _run_ffmpeg_with_progress(commands, _resolve_frame_total(target_path), 'Merging video')
def concat_video(output_path: str, temp_output_paths: List[str]) -> bool:
    """Concatenate several temp videos into *output_path* via the concat demuxer.

    The video stream is copied and the audio stream is encoded with the
    ``output_audio_encoder`` item of the state manager.

    :param output_path: destination file; overwritten if it exists (``-y``).
    :param temp_output_paths: videos to join, in playback order.
    :return: ``True`` when ffmpeg exits with code 0; ``False`` otherwise or
        when there is nothing to concatenate.
    """
    if not temp_output_paths:
        return False

    # NamedTemporaryFile creates the file atomically, unlike the deprecated
    # and race-prone tempfile.mktemp(). delete=False keeps it on disk until
    # ffmpeg has read it.
    with tempfile.NamedTemporaryFile(mode='w', delete=False) as concat_video_file:
        concat_video_path = concat_video_file.name
        for temp_output_path in temp_output_paths:
            # Inside a single-quoted concat entry a literal quote must be
            # written as '\'' or the entry is cut short.
            escaped_path = os.path.abspath(temp_output_path).replace("'", "'\\''")
            # Text mode already translates '\n' to the platform line ending;
            # writing os.linesep would emit '\r\r\n' on Windows.
            concat_video_file.write("file '" + escaped_path + "'\n")

    try:
        commands = ['-f', 'concat', '-safe', '0', '-i', concat_video_path, '-c:v', 'copy', '-c:a', state_manager.get_item('output_audio_encoder'), '-y', os.path.abspath(output_path)]
        process = run_ffmpeg(commands)
        process.communicate()
    finally:
        # Remove the list file even when launching ffmpeg raised.
        remove_file(concat_video_path)
    return process.returncode == 0
def copy_image(target_path: str, temp_image_resolution: str) -> bool:
    """Write a resized copy of the target image to its temp file path.

    The compression value is computed for a quality setting of 100.

    :param target_path: path of the source image.
    :param temp_image_resolution: value passed to ffmpeg ``-s``.
    :return: ``True`` when ffmpeg exits with code 0.
    """
    temp_file_path = get_temp_file_path(target_path)
    temp_image_compression = calc_image_compression(target_path, 100)
    commands = ['-i', target_path]
    commands.extend(['-s', str(temp_image_resolution)])
    commands.extend(['-q:v', str(temp_image_compression)])
    commands.extend(['-y', temp_file_path])
    process = run_ffmpeg(commands)
    return process.returncode == 0
def finalize_image(target_path: str, output_path: str, output_image_resolution: str) -> bool:
    """Render the temp image of *target_path* to *output_path*.

    The compression value is derived from the ``output_image_quality`` item
    of the state manager.

    :param target_path: path of the original image; used to locate the temp
        file and to detect its type.
    :param output_path: destination file; overwritten if it exists (``-y``).
    :param output_image_resolution: value passed to ffmpeg ``-s``.
    :return: ``True`` when ffmpeg exits with code 0.
    """
    temp_file_path = get_temp_file_path(target_path)
    output_image_quality = state_manager.get_item('output_image_quality')
    output_image_compression = calc_image_compression(target_path, output_image_quality)
    commands = ['-i', temp_file_path]
    commands.extend(['-s', str(output_image_resolution)])
    commands.extend(['-q:v', str(output_image_compression)])
    commands.extend(['-y', output_path])
    process = run_ffmpeg(commands)
    return process.returncode == 0
def calc_image_compression(image_path: str, image_quality: int) -> int:
    """Map a quality value to the compression value passed to ffmpeg ``-q:v``.

    The result is ``round(31 - quality * 0.31)``. For images whose guessed
    MIME type is ``image/webp`` the quality is inverted (``100 - quality``)
    before the mapping is applied.

    :param image_path: image whose MIME type is inspected.
    :param image_quality: requested quality.
    :return: the rounded compression value.
    """
    if filetype.guess_mime(image_path) == 'image/webp':
        effective_quality = 100 - image_quality
    else:
        effective_quality = image_quality
    return round(31 - (effective_quality * 0.31))
def read_audio_buffer(target_path: str, sample_rate: int, channel_total: int) -> Optional[AudioBuffer]:
    """Decode the audio of *target_path* to raw ``pcm_s16le`` data via ffmpeg.

    :param target_path: media file to read.
    :param sample_rate: value passed to ffmpeg ``-ar``.
    :param channel_total: value passed to ffmpeg ``-ac``.
    :return: the data ffmpeg wrote to stdout, or ``None`` when ffmpeg exits
        with a non-zero code.
    """
    commands = ['-i', target_path, '-vn']
    commands.extend(['-f', 's16le', '-acodec', 'pcm_s16le'])
    commands.extend(['-ar', str(sample_rate), '-ac', str(channel_total)])
    # A trailing '-' makes ffmpeg write the result to stdout.
    commands.append('-')
    process = open_ffmpeg(commands)
    audio_buffer, _ = process.communicate()

    if process.returncode != 0:
        return None
    return audio_buffer
def restore_audio(target_path: str, output_path: str, output_video_fps: Fps) -> bool:
    """Mux the temp video with the first audio stream of the original target.

    Integer trim bounds from the state manager are converted to seconds by
    dividing by *output_video_fps* and passed as ``-ss`` / ``-to`` ahead of
    the target input.

    :param target_path: original media file providing the audio.
    :param output_path: destination file; overwritten if it exists (``-y``).
    :param output_video_fps: frame rate used to convert frame numbers to time.
    :return: ``True`` when ffmpeg exits with code 0.
    """
    trim_frame_start = state_manager.get_item('trim_frame_start')
    trim_frame_end = state_manager.get_item('trim_frame_end')
    commands = ['-i', get_temp_file_path(target_path)]

    for time_option, trim_frame in (('-ss', trim_frame_start), ('-to', trim_frame_end)):
        if isinstance(trim_frame, int):
            commands.extend([time_option, str(trim_frame / output_video_fps)])

    commands.extend(['-i', target_path])
    commands.extend(['-c:v', 'copy', '-c:a', state_manager.get_item('output_audio_encoder')])
    commands.extend(['-map', '0:v:0', '-map', '1:a:0', '-shortest', '-y', output_path])
    process = run_ffmpeg(commands)
    return process.returncode == 0
def replace_audio(target_path: str, audio_path: str, output_path: str) -> bool:
    """Combine the temp video of *target_path* with the audio in *audio_path*.

    The audio is encoded with the ``output_audio_encoder`` item of the state
    manager and run through the ``apad`` filter together with ``-shortest``.

    :param target_path: original media file; used to locate the temp video.
    :param audio_path: audio file passed as second input.
    :param output_path: destination file; overwritten if it exists (``-y``).
    :return: ``True`` when ffmpeg exits with code 0.
    """
    temp_file_path = get_temp_file_path(target_path)
    output_audio_encoder = state_manager.get_item('output_audio_encoder')
    commands = ['-i', temp_file_path, '-i', audio_path]
    commands.extend(['-c:a', output_audio_encoder])
    commands.extend(['-af', 'apad', '-shortest', '-y', output_path])
    process = run_ffmpeg(commands)
    return process.returncode == 0
def map_nvenc_preset(output_video_preset: OutputVideoPreset) -> Optional[str]:
    """Translate an output video preset into the preset name used for NVENC.

    :param output_video_preset: preset such as ``'veryfast'`` or ``'slow'``.
    :return: ``'fast'``, ``'medium'`` or ``'slow'``; ``None`` for a preset
        that is not listed.
    """
    preset_groups = (
        ('fast', ('ultrafast', 'superfast', 'veryfast', 'faster', 'fast')),
        ('medium', ('medium',)),
        ('slow', ('slow', 'slower', 'veryslow')),
    )

    for nvenc_preset, video_presets in preset_groups:
        if output_video_preset in video_presets:
            return nvenc_preset
    return None
def map_amf_preset(output_video_preset: OutputVideoPreset) -> Optional[str]:
    """Translate an output video preset into the quality name used for AMF.

    :param output_video_preset: preset such as ``'veryfast'`` or ``'slow'``.
    :return: ``'speed'``, ``'balanced'`` or ``'quality'``; ``None`` for a
        preset that is not listed.
    """
    preset_groups = (
        ('speed', ('ultrafast', 'superfast', 'veryfast')),
        ('balanced', ('faster', 'fast', 'medium')),
        ('quality', ('slow', 'slower', 'veryslow')),
    )

    for amf_preset, video_presets in preset_groups:
        if output_video_preset in video_presets:
            return amf_preset
    return None