Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@requires_key(KeenKeys.READ)
def results(self, dataset_name, index_by, timeframe):
    """Fetch results from a Cached Dataset. Read key must be set.

    ``index_by`` and ``timeframe`` are sent as query parameters. A value
    that is already a string is passed through unchanged; anything else is
    serialised with ``json.dumps`` first.

    Returns whatever ``self._get_json`` returns for the GET request.
    """
    def _encode(value):
        # Strings go out verbatim; every other value is JSON-encoded.
        if isinstance(value, str):
            return value
        return json.dumps(value)

    endpoint = "{0}/{1}/results".format(self._cached_datasets_url, dataset_name)
    request_params = {
        "index_by": _encode(index_by),
        "timeframe": _encode(timeframe),
    }
    return self._get_json(
        HTTPMethods.GET, endpoint, self._get_read_key(), params=request_params
    )
def method_wrapper(self, *args, **kwargs):
    """Verify the key selected by ``key_type`` is set, then call ``func``.

    For READ and WRITE, a falsy key leads to a ``_throw_key_missing`` call
    that also receives a flag telling whether a master key is set. For
    MASTER the flag is always ``False``. When no check fails, the wrapped
    function is invoked with the same arguments and its result is returned.
    """
    for matches in switch(key_type):
        if matches(KeenKeys.READ):
            read_key = self._get_read_key()
            if not read_key:
                _throw_key_missing(KeenKeys.READ, bool(self._get_master_key()))
            break

        if matches(KeenKeys.WRITE):
            write_key = self._get_write_key()
            if not write_key:
                _throw_key_missing(KeenKeys.WRITE, bool(self._get_master_key()))
            break

        if matches(KeenKeys.MASTER):
            master_key = self._get_master_key()
            if not master_key:
                _throw_key_missing(KeenKeys.MASTER, False)
            break

    return func(self, *args, **kwargs)
@requires_key(KeenKeys.READ)
def results(self, query_name):
    """
    Fetch a single saved query, including its 'result' object, for a
    project from the Keen IO API.

    The request is a GET against ``<saved_query_url>/<query_name>/result``
    and the value produced by ``self._get_json`` is returned as-is.
    Read or Master key must be set.
    """
    result_url = "{0}/{1}/result".format(self.saved_query_url, query_name)
    return self._get_json(HTTPMethods.GET, result_url, self._get_read_key())
@requires_key(KeenKeys.READ)
def query(self, analysis_type, params, all_keys=False):
    """
    Performs a query using the Keen IO analysis API. A read key must be set first.

    :param analysis_type: used as the final path segment of the query URL
    :param params: query parameters; checked by the order_by/limit helpers
        and sent as the request's ``params``
    :param all_keys: not referenced in the visible part of this method
    :raises ValueError: if the order_by or the limit check rejects ``params``

    NOTE(review): the visible code stores the response but neither processes
    nor returns it -- this excerpt may be cut short; confirm against the
    full source before relying on the return value.
    """
    # Validate before any request is issued.
    if not self._order_by_is_valid_or_none(params):
        raise ValueError("order_by given is invalid or is missing required group_by.")
    if not self._limit_is_valid_or_none(params):
        raise ValueError("limit given is invalid or is missing required order_by.")
    # URL layout: <base_url>/<api_version>/projects/<project_id>/queries/<analysis_type>
    url = "{0}/{1}/projects/{2}/queries/{3}".format(self.base_url, self.api_version,
                                                    self.project_id, analysis_type)
    # Headers are built from the read key attribute directly.
    headers = utilities.headers(self.read_key)
    payload = params
    # GET request with the query parameters and the configured GET timeout.
    response = self.fulfill(HTTPMethods.GET, url, params=payload, headers=headers, timeout=self.get_timeout)
def method_wrapper(self, *args, **kwargs):
    """Verify the key selected by ``key_type`` is set, then call ``func``.

    For READ and WRITE, a falsy key leads to a ``_throw_key_missing`` call
    that also receives a flag telling whether a master key is set. For
    MASTER the flag is always ``False``. When no check fails, the wrapped
    function is invoked with the same arguments and its result is returned.
    """
    for matches in switch(key_type):
        if matches(KeenKeys.READ):
            read_key = self._get_read_key()
            if not read_key:
                _throw_key_missing(KeenKeys.READ, bool(self._get_master_key()))
            break

        if matches(KeenKeys.WRITE):
            write_key = self._get_write_key()
            if not write_key:
                _throw_key_missing(KeenKeys.WRITE, bool(self._get_master_key()))
            break

        if matches(KeenKeys.MASTER):
            master_key = self._get_master_key()
            if not master_key:
                _throw_key_missing(KeenKeys.MASTER, False)
            break

    return func(self, *args, **kwargs)