Finalize temp path

This commit is contained in:
henryruhs 2024-10-31 14:56:19 +01:00
parent 4566775c87
commit 30f3e952f6
5 changed files with 27 additions and 29 deletions

View File

@ -1,4 +1,5 @@
[paths]
temp_path =
jobs_path =
source_paths =
target_path =

View File

@ -233,8 +233,8 @@ def create_program() -> ArgumentParser:
program.add_argument('-v', '--version', version = metadata.get('name') + ' ' + metadata.get('version'), action = 'version')
sub_program = program.add_subparsers(dest = 'command')
# general
sub_program.add_parser('run', help = wording.get('help.run'), parents = [ create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), create_uis_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('headless-run', help = wording.get('help.headless_run'), parents = [ create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('run', help = wording.get('help.run'), parents = [ create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), create_uis_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('headless-run', help = wording.get('help.headless_run'), parents = [ create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('force-download', help = wording.get('help.force_download'), parents = [ create_log_level_program() ], formatter_class = create_help_formatter_large)
# job manager
sub_program.add_parser('job-list', help = wording.get('help.job_list'), parents = [ create_job_status_program(), create_jobs_path_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
@ -248,10 +248,10 @@ def create_program() -> ArgumentParser:
sub_program.add_parser('job-insert-step', help = wording.get('help.job_insert_step'), parents = [ create_job_id_program(), create_step_index_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-remove-step', help = wording.get('help.job_remove_step'), parents = [ create_job_id_program(), create_step_index_program(), create_jobs_path_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
# job runner
sub_program.add_parser('job-run', help = wording.get('help.job_run'), parents = [ create_job_id_program(), create_config_path_program(), create_jobs_path_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-run-all', help = wording.get('help.job_run_all'), parents = [ create_config_path_program(), create_jobs_path_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-retry', help = wording.get('help.job_retry'), parents = [ create_job_id_program(), create_config_path_program(), create_jobs_path_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-retry-all', help = wording.get('help.job_retry_all'), parents = [ create_config_path_program(), create_jobs_path_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-run', help = wording.get('help.job_run'), parents = [ create_job_id_program(), create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-run-all', help = wording.get('help.job_run_all'), parents = [ create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-retry', help = wording.get('help.job_retry'), parents = [ create_job_id_program(), create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-retry-all', help = wording.get('help.job_retry_all'), parents = [ create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
return ArgumentParser(parents = [ program ], formatter_class = create_help_formatter_small, add_help = True)

View File

@ -1,6 +1,5 @@
import glob
import os
import tempfile
from typing import List
from facefusion import state_manager
@ -18,19 +17,9 @@ def move_temp_file(file_path : str, move_path : str) -> bool:
return move_file(temp_file_path, move_path)
def get_temp_frame_paths(target_path : str) -> List[str]:
temp_frames_pattern = get_temp_frames_pattern(target_path, '*')
return sorted(glob.glob(temp_frames_pattern))
def get_temp_frames_pattern(target_path : str, temp_frame_prefix : str) -> str:
temp_directory_path = get_temp_directory_path(target_path)
return os.path.join(temp_directory_path, temp_frame_prefix + '.' + state_manager.get_item('temp_frame_format'))
def get_temp_directory_path(file_path : str) -> str:
temp_file_name, _ = os.path.splitext(os.path.basename(file_path))
return os.path.join(tempfile.gettempdir(), 'facefusion', temp_file_name)
return os.path.join(state_manager.get_item('temp_path'), 'facefusion', temp_file_name)
def create_temp_directory(file_path : str) -> bool:
@ -43,3 +32,13 @@ def clear_temp_directory(file_path : str) -> bool:
temp_directory_path = get_temp_directory_path(file_path)
return remove_directory(temp_directory_path)
return True
def get_temp_frame_paths(target_path : str) -> List[str]:
temp_frames_pattern = get_temp_frames_pattern(target_path, '*')
return sorted(glob.glob(temp_frames_pattern))
def get_temp_frames_pattern(target_path : str, temp_frame_prefix : str) -> str:
temp_directory_path = get_temp_directory_path(target_path)
return os.path.join(temp_directory_path, temp_frame_prefix + '.' + state_manager.get_item('temp_frame_format'))

View File

@ -1,5 +1,5 @@
import glob
import subprocess
import tempfile
import pytest
@ -7,7 +7,7 @@ from facefusion import process_manager, state_manager
from facefusion.download import conditional_download
from facefusion.ffmpeg import concat_video, extract_frames, read_audio_buffer, replace_audio, restore_audio
from facefusion.filesystem import copy_file
from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_directory_path, get_temp_file_path
from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_file_path, get_temp_frame_paths
from .helper import get_test_example_file, get_test_examples_directory, get_test_output_file, prepare_test_output_directory
@ -26,7 +26,8 @@ def before_all() -> None:
subprocess.run([ 'ffmpeg', '-i', get_test_example_file('target-240p.mp4'), '-vf', 'fps=60', get_test_example_file('target-240p-60fps.mp4') ])
subprocess.run([ 'ffmpeg', '-i', get_test_example_file('source.mp3'), '-i', get_test_example_file('target-240p.mp4'), '-ar', '16000', get_test_example_file('target-240p-16khz.mp4') ])
subprocess.run([ 'ffmpeg', '-i', get_test_example_file('source.mp3'), '-i', get_test_example_file('target-240p.mp4'), '-ar', '48000', get_test_example_file('target-240p-48khz.mp4') ])
state_manager.init_item('temp_frame_format', 'jpg')
state_manager.init_item('temp_path', tempfile.gettempdir())
state_manager.init_item('temp_frame_format', 'png')
state_manager.init_item('output_audio_encoder', 'aac')
@ -46,11 +47,10 @@ def test_extract_frames() -> None:
]
for target_path in target_paths:
temp_directory_path = get_temp_directory_path(target_path)
create_temp_directory(target_path)
assert extract_frames(target_path, '452x240', 30.0) is True
assert len(glob.glob1(temp_directory_path, '*.jpg')) == 324
assert len(get_temp_frame_paths(target_path)) == 324
clear_temp_directory(target_path)
@ -65,11 +65,10 @@ def test_extract_frames_with_trim_start() -> None:
]
for target_path, frame_total in target_paths:
temp_directory_path = get_temp_directory_path(target_path)
create_temp_directory(target_path)
assert extract_frames(target_path, '452x240', 30.0) is True
assert len(glob.glob1(temp_directory_path, '*.jpg')) == frame_total
assert len(get_temp_frame_paths(target_path)) == frame_total
clear_temp_directory(target_path)
@ -85,11 +84,10 @@ def test_extract_frames_with_trim_start_and_trim_end() -> None:
]
for target_path, frame_total in target_paths:
temp_directory_path = get_temp_directory_path(target_path)
create_temp_directory(target_path)
assert extract_frames(target_path, '452x240', 30.0) is True
assert len(glob.glob1(temp_directory_path, '*.jpg')) == frame_total
assert len(get_temp_frame_paths(target_path)) == frame_total
clear_temp_directory(target_path)
@ -104,11 +102,10 @@ def test_extract_frames_with_trim_end() -> None:
]
for target_path, frame_total in target_paths:
temp_directory_path = get_temp_directory_path(target_path)
create_temp_directory(target_path)
assert extract_frames(target_path, '426x240', 30.0) is True
assert len(glob.glob1(temp_directory_path, '*.jpg')) == frame_total
assert len(get_temp_frame_paths(target_path)) == frame_total
clear_temp_directory(target_path)

View File

@ -15,6 +15,7 @@ def before_all() -> None:
[
'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/target-240p.mp4'
])
state_manager.init_item('temp_path', tempfile.gettempdir())
state_manager.init_item('temp_frame_format', 'png')