Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.host = conf_parser.get('main', 'host')
self.access_token = conf_parser.get('main', 'access_token')
self.project_id = conf_parser.get('main', 'project_id')
self.community_id = conf_parser.get('main', 'community_id')
self.workflow_id = conf_parser.get('main', 'workflow_id')
self.locale = conf_parser.get('main', 'default_locale')
try:
# TODO: this try block stops at the first option that raises, so the remaining options are never read.
self.project_name = conf_parser.get('main', 'project_name')
self.download_dir = conf_parser.get('main', 'download_folder')
self.watch_dir = conf_parser.get('main', 'watch_folder')
watch_locales = conf_parser.get('main', 'watch_locales')
self.watch_locales = set(watch_locales.split(','))
except configparser.NoOptionError:
if not self.project_name:
self.api = ApiCalls(self.host, self.access_token)
project_info = self.api.get_project_info(self.community_id)
self.project_name = project_info[self.project_id]
config_file_name, conf_parser = self.init_config_file()
log_info = 'Updated project name'
self.update_config_file('project_name', self.project_name, conf_parser, config_file_name, log_info)
else:
self.git_username = ''
self.update_config_file('git_username', self.git_username, conf_parser, config_file_name, "")
if conf_parser.has_option('main', 'git_password'):
self.git_password = conf_parser.get('main', 'git_password')
else:
self.git_password = ''
self.update_config_file('git_password', self.git_password, conf_parser, config_file_name, "")
if conf_parser.has_option('main', 'append_option'):
self.append_option = conf_parser.get('main', 'append_option')
else:
self.append_option = 'none'
self.update_config_file('append_option', self.append_option, conf_parser, config_file_name, "")
except NoOptionError as e:
if not self.project_name:
self.api = ApiCalls(self.host, self.access_token)
project_info = self.api.get_project_info(self.community_id)
self.project_name = project_info[self.project_id]
config_file_name, conf_parser = self.init_config_file()
log_info = 'Updated project name'
self.update_config_file('project_name', self.project_name, conf_parser, config_file_name, log_info)
self.download_option = 'clone'
self.download_dir = None # directory where downloaded translation will be stored
self.watch_locales = set() # if specified, add these target locales to any files in the watch folder
self.git_autocommit = None
self.git_username = ''
self.git_password = ''
self.append_option = 'none'
self.locale_folders = {}
if not self._is_initialized():
raise exceptions.UninitializedError("This project is not initialized. Please run init command.")
self._initialize_self()
self.watch = watch
self.doc_manager = DocumentManager(self.path)
self.folder_manager = FolderManager(self.path)
self.timeout = timeout
self.api = ApiCalls(self.host, self.access_token, self.watch, self.timeout)
self.git_auto = Git_Auto(self.path)
self.error_file_name = os.path.join(self.path, CONF_DIR, ERROR_FN)
def get_communities(self, host):
    """Fetch the communities available to the stored access token.

    An ``ApiCalls`` client for ``host`` is created and kept on
    ``self.apiCall``; the fetched result is kept on ``self.communities``.
    When the first fetch comes back empty, ``self.create_global`` is called
    with the token and host and the fetch is retried once.

    Args:
        host: Host handed to ``ApiCalls`` and to ``self.create_global``.

    Returns:
        The non-empty communities result, or ``None`` when the access token
        is the empty string or no communities could be retrieved.
    """
    # Compare by value: ``is not ""`` checks object identity, which is not a
    # dependable emptiness test and raises a SyntaxWarning on Python 3.8+.
    if self.access_token == "":
        return None

    self.apiCall = ApiCalls(host, self.access_token)
    self.communities = self.apiCall.get_communities_info()
    if not self.communities:
        self.create_global(self.access_token, host)
        # Keep the retried result. It used to be bound to an unused local,
        # so the stale empty value was re-checked and the retry never helped.
        self.communities = self.apiCall.get_communities_info()

    # An empty result is reported as None, a non-empty one is returned as-is.
    if not self.communities:
        return None
    return self.communities
logger.error("\nReinit canceled")
# End Python 3
return
# confirm if deleting existing folder
if not confirm or confirm in ['n', 'N']:
return False
else:
# delete the corresponding project online
logger.info('Deleting old project folder and creating new one...')
config_file_name = os.path.join(project_path, CONF_DIR, CONF_FN)
if os.path.isfile(config_file_name):
old_config = ConfigParser()
old_config.read(config_file_name)
project_id = old_config.get('main', 'project_id')
access_token = old_config.get('main', 'access_token')
api = ApiCalls(host, access_token)
response = api.delete_project(project_id)
if response.status_code != 204 and response.status_code != 404:
try:
error = response.json()['messages'][0]
raise exceptions.RequestFailedError(error)
except (AttributeError, IndexError):
raise exceptions.RequestFailedError("Failed to delete and re-initialize project")
# delete existing folder
to_remove = os.path.join(project_path, CONF_DIR)
shutil.rmtree(to_remove)
else:
raise exceptions.ResourceNotFound("Cannot find config file, please re-initialize project")
return access_token
return True
access_token = to_init
ran_oauth = False
if not access_token:
access_token = check_global()
if not access_token or reset:
from ltk.auth import run_oauth
access_token = run_oauth(host)
ran_oauth = True
print("access_token: "+str(access_token))
if ran_oauth:
# create or overwrite global file
create_global(access_token)
api = ApiCalls(host, access_token)
# create a directory
try:
os.mkdir(os.path.join(project_path, CONF_DIR))
except OSError:
pass
logger.info('Initializing project...')
config_file_name = os.path.join(project_path, CONF_DIR, CONF_FN)
# create the config file and add info
config_file = open(config_file_name, 'w')
config_parser = configparser.ConfigParser()
config_parser.add_section('main')
config_parser.set('main', 'access_token', access_token)
config_parser.set('main', 'host', host)
# config_parser.set('main', 'root_path', project_path)
def get_workflows_solo(self, community_id, host, access_token):
    """List the workflows of a community as an id -> title mapping.

    Builds an ``ApiCalls`` client from ``host`` and ``access_token`` (kept on
    ``self.apiCall``), requests the workflows for ``community_id`` and stores
    the resulting mapping on ``self.workflow_info``.

    Args:
        community_id: Community whose workflows are listed.
        host: Host handed to ``ApiCalls``.
        access_token: Token handed to ``ApiCalls``.

    Returns:
        The dict stored on ``self.workflow_info``, pairing each id with its
        title as returned by ``log_id_names``.
    """
    client = ApiCalls(host, access_token)
    self.apiCall = client
    reply = client.list_workflows(community_id)

    # Any status other than 200 is reported through raise_error
    # (presumably raising -- confirm against its definition).
    if reply.status_code != 200:
        raise_error(reply.json(), "Failed to list workflows")

    workflow_ids, workflow_titles = log_id_names(reply.json())
    self.workflow_info = {
        workflow_id: title
        for workflow_id, title in zip(workflow_ids, workflow_titles)
    }
    return self.workflow_info
self.update_config_file('default_metadata', json.dumps(self.default_metadata), conf_parser, config_file_name, "")
if conf_parser.has_option('main', 'metadata_prompt'):
self.metadata_prompt = (conf_parser.get('main', 'metadata_prompt').lower() == 'on')
else:
self.metadata_prompt = False
self.update_config_file('metadata_prompt', 'off', conf_parser, config_file_name, "")
if conf_parser.has_option('main', 'metadata_fields'):
self.metadata_fields = json.loads(conf_parser.get('main', 'metadata_fields'))
else:
self.metadata_fields = METADATA_FIELDS
self.update_config_file('metadata_fields', json.dumps(self.metadata_fields), conf_parser, config_file_name, "")
except NoOptionError as e:
if not self.project_name:
self.api = ApiCalls(self.host, self.access_token)
project_info = self.api.get_project_info(self.community_id)
self.project_name = project_info[self.project_id]
config_file_name, conf_parser = self.init_config_file()
log_info = 'Updated project name'
self.update_config_file('project_name', self.project_name, conf_parser, config_file_name, log_info)
def get_workflows_solo(self, community_id, host, access_token):
    """Return ``{workflow_id: workflow_title}`` for the given community.

    A fresh ``ApiCalls`` client (built from ``host`` and ``access_token``) is
    stored on ``self.apiCall`` and used to list the workflows of
    ``community_id``. The mapping is also stored on ``self.workflow_info``.

    Args:
        community_id: Community whose workflows are requested.
        host: Host used to build the ``ApiCalls`` client.
        access_token: Token used to build the ``ApiCalls`` client.

    Returns:
        The id -> title dict that was stored on ``self.workflow_info``.
    """
    self.apiCall = ApiCalls(host, access_token)
    listing = self.apiCall.list_workflows(community_id)

    request_ok = listing.status_code == 200
    if not request_ok:
        # Failure is delegated to raise_error together with the JSON payload.
        raise_error(listing.json(), "Failed to list workflows")

    id_title_columns = log_id_names(listing.json())
    found_ids, found_titles = id_title_columns
    mapping = dict(zip(found_ids, found_titles))
    self.workflow_info = mapping
    return self.workflow_info