Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from .plugins import Clean, Create, Link, Shell
plugin_paths = []
for directory in plugin_directories:
for plugin_path in glob.glob(os.path.join(directory, '*.py')):
plugin_paths.append(plugin_path)
for plugin_path in options.plugins:
plugin_paths.append(plugin_path)
for plugin_path in plugin_paths:
abspath = os.path.abspath(plugin_path)
module.load(abspath)
if not options.config_file:
log.error('No configuration file specified')
exit(1)
tasks = read_config(options.config_file)
if not isinstance(tasks, list):
raise ReadingError('Configuration file must be a list of tasks')
if options.base_directory:
base_directory = options.base_directory
else:
# default to directory of config file
base_directory = os.path.dirname(os.path.realpath(options.config_file))
os.chdir(base_directory)
dispatcher = Dispatcher(base_directory)
success = dispatcher.dispatch(tasks)
if success:
log.info('\n==> All tasks executed successfully')
else:
raise DispatchError('\n==> Some tasks were not executed successfully')
except (ReadingError, DispatchError) as e:
log.error('%s' % e)
exit(1)
except KeyboardInterrupt:
tasks = read_config(options.config_file)
if not isinstance(tasks, list):
raise ReadingError('Configuration file must be a list of tasks')
if options.base_directory:
base_directory = options.base_directory
else:
# default to directory of config file
base_directory = os.path.dirname(os.path.realpath(options.config_file))
os.chdir(base_directory)
dispatcher = Dispatcher(base_directory)
success = dispatcher.dispatch(tasks)
if success:
log.info('\n==> All tasks executed successfully')
else:
raise DispatchError('\n==> Some tasks were not executed successfully')
except (ReadingError, DispatchError) as e:
log.error('%s' % e)
exit(1)
except KeyboardInterrupt:
log.error('\n==> Operation aborted')
exit(1)
def _read(self, config_file_path):
    """Load the configuration file at ``config_file_path`` and return its data.

    A file whose extension is exactly ``.json`` is parsed with ``json.load``;
    any other file is parsed with ``yaml.safe_load``.

    Raises:
        ReadingError: if anything goes wrong while opening or parsing the
            file. The message embeds the original error text after it has
            been passed through ``string.indent_lines``.
    """
    try:
        extension = os.path.splitext(config_file_path)[1]
        # Only the exact '.json' suffix selects the JSON parser; everything
        # else falls through to the YAML parser.
        parse = json.load if extension == '.json' else yaml.safe_load
        with open(config_file_path) as handle:
            return parse(handle)
    except Exception as error:
        # Collapse every failure (I/O or parse) into the single error type
        # raised by this reader.
        details = string.indent_lines(str(error))
        raise ReadingError('Could not read config file:\n%s' % details)