Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_prepare_credentials_for_rasa_x_if_rasa_channel_not_given(tmpdir: Path):
credentials_path = str(tmpdir / "credentials.yml")
io_utils.write_yaml_file({}, credentials_path)
tmp_credentials = x._prepare_credentials_for_rasa_x(
credentials_path, "http://localhost:5002"
)
actual = io_utils.read_config_file(tmp_credentials)
assert actual["rasa"]["url"] == "http://localhost:5002"
def read_global_config() -> Dict[Text, Any]:
"""Read global Rasa configuration."""
# noinspection PyBroadException
try:
return rasa.utils.io.read_config_file(GLOBAL_USER_CONFIG_PATH)
except Exception:
# if things go south we pretend there is no config
return {}
def __init__(
self,
config_file: Text,
domain_path: Optional[Text] = None,
training_data_paths: Optional[Union[List[Text], Text]] = None,
project_directory: Optional[Text] = None,
):
self.config = io_utils.read_config_file(config_file)
if domain_path:
self._domain_paths = [domain_path]
else:
self._domain_paths = []
self._story_paths = []
self._nlu_paths = []
self._imports = []
self._additional_paths = training_data_paths or []
self._project_directory = project_directory or os.path.dirname(config_file)
self._init_from_dict(self.config, self._project_directory)
extra_story_files, extra_nlu_files = data.get_core_nlu_files(
training_data_paths
)
self._story_paths += list(extra_story_files)
def __init__(
self,
config_file: Optional[Text] = None,
domain_path: Optional[Text] = None,
training_data_paths: Optional[Union[List[Text], Text]] = None,
):
if config_file and os.path.exists(config_file):
self.config = io_utils.read_config_file(config_file)
else:
self.config = {}
self._domain_path = domain_path
self.story_files, self.nlu_files = data.get_core_nlu_files(training_data_paths)
domain_path: Optional[Text] = None,
training_data_paths: Optional[Union[List[Text], Text]] = None,
):
self._domain_path = domain_path
self._story_files, self._nlu_files = data.get_core_nlu_files(
training_data_paths
)
self.core_config = {}
self.nlu_config = {}
if config_file:
if not isinstance(config_file, list): config_file = [config_file]
for file in config_file:
if not os.path.exists(file): continue
config = io_utils.read_config_file(file)
lang = config["language"]
self.core_config = {"policies": config["policies"]}
self.nlu_config[lang] = {"pipeline": config["pipeline"], "data": lang}
def _prepare_credentials_for_rasa_x(
credentials_path: Optional[Text], rasa_x_url: Optional[Text] = None
) -> Text:
credentials_path = cli_utils.get_validated_path(
credentials_path, "credentials", DEFAULT_CREDENTIALS_PATH, True
)
if credentials_path:
credentials = io_utils.read_config_file(credentials_path)
else:
credentials = {}
# this makes sure the Rasa X is properly configured no matter what
if rasa_x_url:
credentials["rasa"] = {"url": rasa_x_url}
dumped_credentials = yaml.dump(credentials, default_flow_style=False)
tmp_credentials = io_utils.create_temporary_file(dumped_credentials, "yml")
return tmp_credentials
def _prepare_credentials_for_rasa_x(
credentials_path: Optional[Text], rasa_x_url: Optional[Text] = None
) -> Text:
credentials_path = cli_utils.get_validated_path(
credentials_path, "credentials", DEFAULT_CREDENTIALS_PATH, True
)
if credentials_path:
credentials = io_utils.read_config_file(credentials_path)
else:
credentials = {}
# this makes sure the Rasa X is properly configured no matter what
if rasa_x_url:
credentials["rasa"] = {"url": rasa_x_url}
dumped_credentials = yaml.dump(credentials, default_flow_style=False)
tmp_credentials = io_utils.create_temporary_file(dumped_credentials, "yml")
return tmp_credentials
def _init_from_file(self, path: Text) -> None:
path = os.path.abspath(path)
if os.path.exists(path) and data.is_config_file(path):
config = io_utils.read_config_file(path)
parent_directory = os.path.dirname(path)
self._init_from_dict(config, parent_directory)
else:
raise_warning(f"'{path}' does not exist or is not a valid config file.")
def create_http_input_channels(
channel: Optional[Text], credentials_file: Optional[Text]
) -> List["InputChannel"]:
"""Instantiate the chosen input channel."""
if credentials_file:
all_credentials = rasa.utils.io.read_config_file(credentials_file)
else:
all_credentials = {}
if channel:
if len(all_credentials) > 1:
logger.info(
"Connecting to channel '{}' which was specified by the "
"'--connector' argument. Any other channels will be ignored. "
"To connect to all given channels, omit the '--connector' "
"argument.".format(channel)
)
return [_create_single_channel(channel, all_credentials.get(channel))]
else:
return [_create_single_channel(c, k) for c, k in all_credentials.items()]