From 30f3e952f6fa6eca1b97dfa1578ab20e3cf18b77 Mon Sep 17 00:00:00 2001 From: henryruhs Date: Thu, 31 Oct 2024 14:56:19 +0100 Subject: [PATCH] Finalize temp path --- facefusion.ini | 1 + facefusion/program.py | 12 ++++++------ facefusion/temp_helper.py | 23 +++++++++++------------ tests/test_ffmpeg.py | 19 ++++++++----------- tests/test_temp_helper.py | 1 + 5 files changed, 27 insertions(+), 29 deletions(-) diff --git a/facefusion.ini b/facefusion.ini index 8b23a0e3..c8cbeadf 100644 --- a/facefusion.ini +++ b/facefusion.ini @@ -1,4 +1,5 @@ [paths] +temp_path = jobs_path = source_paths = target_path = diff --git a/facefusion/program.py b/facefusion/program.py index 713386c5..af3b52d7 100755 --- a/facefusion/program.py +++ b/facefusion/program.py @@ -233,8 +233,8 @@ def create_program() -> ArgumentParser: program.add_argument('-v', '--version', version = metadata.get('name') + ' ' + metadata.get('version'), action = 'version') sub_program = program.add_subparsers(dest = 'command') # general - sub_program.add_parser('run', help = wording.get('help.run'), parents = [ create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), create_uis_program(), collect_job_program() ], formatter_class = create_help_formatter_large) - sub_program.add_parser('headless-run', help = wording.get('help.headless_run'), parents = [ create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), collect_job_program() ], formatter_class = create_help_formatter_large) + sub_program.add_parser('run', help = wording.get('help.run'), parents = [ create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), create_uis_program(), collect_job_program() ], formatter_class = create_help_formatter_large) + sub_program.add_parser('headless-run', help = wording.get('help.headless_run'), parents = [ create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), collect_job_program() ], formatter_class = create_help_formatter_large) sub_program.add_parser('force-download', help = wording.get('help.force_download'), parents = [ create_log_level_program() ], formatter_class = create_help_formatter_large) # job manager sub_program.add_parser('job-list', help = wording.get('help.job_list'), parents = [ create_job_status_program(), create_jobs_path_program(), create_log_level_program() ], formatter_class = create_help_formatter_large) @@ -248,10 +248,10 @@ def create_program() -> ArgumentParser: sub_program.add_parser('job-insert-step', help = wording.get('help.job_insert_step'), parents = [ create_job_id_program(), create_step_index_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), create_log_level_program() ], formatter_class = create_help_formatter_large) sub_program.add_parser('job-remove-step', help = wording.get('help.job_remove_step'), parents = [ create_job_id_program(), create_step_index_program(), create_jobs_path_program(), create_log_level_program() ], formatter_class = create_help_formatter_large) # job runner - sub_program.add_parser('job-run', help = wording.get('help.job_run'), parents = [ create_job_id_program(), create_config_path_program(), create_jobs_path_program(), collect_job_program() ], formatter_class = create_help_formatter_large) - sub_program.add_parser('job-run-all', help = wording.get('help.job_run_all'), parents = [ create_config_path_program(), create_jobs_path_program(), collect_job_program() ], formatter_class = create_help_formatter_large) - sub_program.add_parser('job-retry', help = wording.get('help.job_retry'), parents = [ create_job_id_program(), create_config_path_program(), create_jobs_path_program(), collect_job_program() ], formatter_class = create_help_formatter_large) - sub_program.add_parser('job-retry-all', help = wording.get('help.job_retry_all'), parents = [ create_config_path_program(), create_jobs_path_program(), collect_job_program() ], formatter_class = create_help_formatter_large) + sub_program.add_parser('job-run', help = wording.get('help.job_run'), parents = [ create_job_id_program(), create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), collect_job_program() ], formatter_class = create_help_formatter_large) + sub_program.add_parser('job-run-all', help = wording.get('help.job_run_all'), parents = [ create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), collect_job_program() ], formatter_class = create_help_formatter_large) + sub_program.add_parser('job-retry', help = wording.get('help.job_retry'), parents = [ create_job_id_program(), create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), collect_job_program() ], formatter_class = create_help_formatter_large) + sub_program.add_parser('job-retry-all', help = wording.get('help.job_retry_all'), parents = [ create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), collect_job_program() ], formatter_class = create_help_formatter_large) return ArgumentParser(parents = [ program ], formatter_class = create_help_formatter_small, add_help = True) diff --git a/facefusion/temp_helper.py b/facefusion/temp_helper.py index 2f761bb4..5e5c69df 100644 --- a/facefusion/temp_helper.py +++ b/facefusion/temp_helper.py @@ -1,6 +1,5 @@ import glob import os -import tempfile from typing import List from facefusion import state_manager @@ -18,19 +17,9 @@ def move_temp_file(file_path : str, move_path : str) -> bool: return move_file(temp_file_path, move_path) -def get_temp_frame_paths(target_path : str) -> List[str]: - temp_frames_pattern = get_temp_frames_pattern(target_path, '*') - return sorted(glob.glob(temp_frames_pattern)) - - -def get_temp_frames_pattern(target_path : str, temp_frame_prefix : str) -> str: - temp_directory_path = get_temp_directory_path(target_path) - return os.path.join(temp_directory_path, temp_frame_prefix + '.' + state_manager.get_item('temp_frame_format')) - - def get_temp_directory_path(file_path : str) -> str: temp_file_name, _ = os.path.splitext(os.path.basename(file_path)) - return os.path.join(tempfile.gettempdir(), 'facefusion', temp_file_name) + return os.path.join(state_manager.get_item('temp_path'), 'facefusion', temp_file_name) def create_temp_directory(file_path : str) -> bool: @@ -43,3 +32,13 @@ def clear_temp_directory(file_path : str) -> bool: temp_directory_path = get_temp_directory_path(file_path) return remove_directory(temp_directory_path) return True + + +def get_temp_frame_paths(target_path : str) -> List[str]: + temp_frames_pattern = get_temp_frames_pattern(target_path, '*') + return sorted(glob.glob(temp_frames_pattern)) + + +def get_temp_frames_pattern(target_path : str, temp_frame_prefix : str) -> str: + temp_directory_path = get_temp_directory_path(target_path) + return os.path.join(temp_directory_path, temp_frame_prefix + '.' + state_manager.get_item('temp_frame_format')) diff --git a/tests/test_ffmpeg.py b/tests/test_ffmpeg.py index 52761656..fa8e014a 100644 --- a/tests/test_ffmpeg.py +++ b/tests/test_ffmpeg.py @@ -1,5 +1,5 @@ -import glob import subprocess +import tempfile import pytest @@ -7,7 +7,7 @@ from facefusion import process_manager, state_manager from facefusion.download import conditional_download from facefusion.ffmpeg import concat_video, extract_frames, read_audio_buffer, replace_audio, restore_audio from facefusion.filesystem import copy_file -from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_directory_path, get_temp_file_path +from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_file_path, get_temp_frame_paths from .helper import get_test_example_file, get_test_examples_directory, get_test_output_file, prepare_test_output_directory @@ -26,7 +26,8 @@ def before_all() -> None: subprocess.run([ 'ffmpeg', '-i', get_test_example_file('target-240p.mp4'), '-vf', 'fps=60', get_test_example_file('target-240p-60fps.mp4') ]) subprocess.run([ 'ffmpeg', '-i', get_test_example_file('source.mp3'), '-i', get_test_example_file('target-240p.mp4'), '-ar', '16000', get_test_example_file('target-240p-16khz.mp4') ]) subprocess.run([ 'ffmpeg', '-i', get_test_example_file('source.mp3'), '-i', get_test_example_file('target-240p.mp4'), '-ar', '48000', get_test_example_file('target-240p-48khz.mp4') ]) - state_manager.init_item('temp_frame_format', 'jpg') + state_manager.init_item('temp_path', tempfile.gettempdir()) + state_manager.init_item('temp_frame_format', 'png') state_manager.init_item('output_audio_encoder', 'aac') @@ -46,11 +47,10 @@ def test_extract_frames() -> None: ] for target_path in target_paths: - temp_directory_path = get_temp_directory_path(target_path) create_temp_directory(target_path) assert extract_frames(target_path, '452x240', 30.0) is True - assert len(glob.glob1(temp_directory_path, '*.jpg')) == 324 + assert len(get_temp_frame_paths(target_path)) == 324 clear_temp_directory(target_path) @@ -65,11 +65,10 @@ def test_extract_frames_with_trim_start() -> None: ] for target_path, frame_total in target_paths: - temp_directory_path = get_temp_directory_path(target_path) create_temp_directory(target_path) assert extract_frames(target_path, '452x240', 30.0) is True - assert len(glob.glob1(temp_directory_path, '*.jpg')) == frame_total + assert len(get_temp_frame_paths(target_path)) == frame_total clear_temp_directory(target_path) @@ -85,11 +84,10 @@ def test_extract_frames_with_trim_start_and_trim_end() -> None: ] for target_path, frame_total in target_paths: - temp_directory_path = get_temp_directory_path(target_path) create_temp_directory(target_path) assert extract_frames(target_path, '452x240', 30.0) is True - assert len(glob.glob1(temp_directory_path, '*.jpg')) == frame_total + assert len(get_temp_frame_paths(target_path)) == frame_total clear_temp_directory(target_path) @@ -104,11 +102,10 @@ def test_extract_frames_with_trim_end() -> None: ] for target_path, frame_total in target_paths: - temp_directory_path = get_temp_directory_path(target_path) create_temp_directory(target_path) assert extract_frames(target_path, '426x240', 30.0) is True - assert len(glob.glob1(temp_directory_path, '*.jpg')) == frame_total + assert len(get_temp_frame_paths(target_path)) == frame_total clear_temp_directory(target_path) diff --git a/tests/test_temp_helper.py b/tests/test_temp_helper.py index 48aad129..6903d2ca 100644 --- a/tests/test_temp_helper.py +++ b/tests/test_temp_helper.py @@ -15,6 +15,7 @@ def before_all() -> None: [ 'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/target-240p.mp4' ]) + state_manager.init_item('temp_path', tempfile.gettempdir()) state_manager.init_item('temp_frame_format', 'png')