Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_load_error(adapter):
""" Test load ffprobe exception """
with raises(SpleeterError):
adapter.load(
'Paris City Jazz',
TEST_OFFSET,
TEST_DURATION,
TEST_SAMPLE_RATE)
def test_filename_confilct():
""" Test error handling with static pattern. """
separator = Separator(TEST_CONFIGURATIONS[0][0])
with TemporaryDirectory() as directory:
with pytest.raises(SpleeterError):
separator.separate_to_file(
TEST_AUDIO_DESCRIPTOR,
directory,
filename_format='I wanna be your lover')
:param descriptor: Configuration descriptor to use for lookup.
:returns: Loaded description as dict.
:raise ValueError: If required embedded configuration does not exists.
:raise SpleeterError: If required configuration file does not exists.
"""
# Embedded configuration reading.
if descriptor.startswith(_EMBEDDED_CONFIGURATION_PREFIX):
name = descriptor[len(_EMBEDDED_CONFIGURATION_PREFIX):]
if not loader.is_resource(resources, f'{name}.json'):
raise SpleeterError(f'No embedded configuration {name} found')
with loader.open_text(resources, f'{name}.json') as stream:
return json.load(stream)
# Standard file reading.
if not exists(descriptor):
raise SpleeterError(f'Configuration file {descriptor} not found')
with open(descriptor, 'r') as stream:
return json.load(stream)
"""
try:
parser = create_argument_parser()
arguments = parser.parse_args(argv[1:])
enable_logging()
if arguments.verbose:
enable_tensorflow_logging()
if arguments.command == 'separate':
from .commands.separate import entrypoint
elif arguments.command == 'train':
from .commands.train import entrypoint
elif arguments.command == 'evaluate':
from .commands.evaluate import entrypoint
params = load_configuration(arguments.configuration)
entrypoint(arguments, params)
except SpleeterError as e:
get_logger().error(e)
"""
waveform, _ = audio_adapter.load(
audio_descriptor,
offset=offset,
duration=duration,
sample_rate=self._sample_rate)
sources = self.separate(waveform)
filename = splitext(basename(audio_descriptor))[0]
generated = []
for instrument, data in sources.items():
path = join(destination, filename_format.format(
filename=filename,
instrument=instrument,
codec=codec))
if path in generated:
raise SpleeterError((
f'Separated source path conflict : {path},'
'please check your filename format'))
generated.append(path)
task = self._pool.apply_async(audio_adapter.save, (
path,
data,
self._sample_rate,
codec,
bitrate))
self._tasks.append(task)
if synchronous:
self.join()