Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _error_handling(self, res):
"""
Helper function to do the error handling
:params res: the response from a request
"""
# making the error handling generic so if an status_code starting with 2 doesn't exist, we raise the error
if res.status_code // 100 != 2:
error = self._get_response_json(res)
raise exceptions.KeenApiError(error)
def _throw_key_missing(key, relying_on_master):
message = ("The Keen IO API requires a {0} key to perform queries. "
"Please set a '{0}_key' when initializing the "
"KeenApi object.")
if relying_on_master:
message += ' The "master_key" is set, but one should prefer the key with least privilege.'
raise exceptions.InvalidEnvironmentError(message.format(key))
"""
super(KeenClient, self).__init__()
# do some validation
self.check_project_id(project_id)
# Set up an api client to be used for querying and optionally passed
# into a default persistence strategy.
self.api = api_class(project_id, write_key=write_key, read_key=read_key,
get_timeout=get_timeout, post_timeout=post_timeout,
master_key=master_key, base_url=base_url)
if persistence_strategy:
# validate the given persistence strategy
if not isinstance(persistence_strategy, BasePersistenceStrategy):
raise exceptions.InvalidPersistenceStrategyError()
if not persistence_strategy:
# setup a default persistence strategy
persistence_strategy = persistence_strategies \
.DirectPersistenceStrategy(self.api)
self.project_id = project_id
self.persistence_strategy = persistence_strategy
self.get_timeout = get_timeout
self.post_timeout = post_timeout
self.saved_queries = saved_queries.SavedQueriesInterface(self.api)
self.cached_datasets = cached_datasets.CachedDatasetsInterface(self.api)