Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def delete_statistical_test(self, statistical_test):
    """Delete a statistical test.

    The argument is checked with `check_resource_type` against
    STATISTICAL_TEST_PATH and then resolved to an id with
    `get_statistical_test_id`.

    Returns the result of `self._delete` on `self.url` followed by the id,
    or None when no id could be resolved.
    """
    check_resource_type(statistical_test, STATISTICAL_TEST_PATH,
                        message="A statistical test id is needed.")
    resource_id = get_statistical_test_id(statistical_test)
    if not resource_id:
        return None
    target_url = "%s%s" % (self.url, resource_id)
    return self._delete(target_url)
def update_anomaly(self, anomaly, changes):
    """Update an anomaly detector.

    The argument is checked with `check_resource_type` against ANOMALY_PATH
    and resolved to an id with `get_anomaly_id`. `changes` is serialized
    with `json.dumps` and sent as the request body.

    Returns the result of `self._update`, or None when no id could be
    resolved.
    """
    check_resource_type(anomaly, ANOMALY_PATH,
                        message="An anomaly detector id is needed.")
    resource_id = get_anomaly_id(anomaly)
    if not resource_id:
        return None
    payload = json.dumps(changes)
    target_url = "%s%s" % (self.url, resource_id)
    return self._update(target_url, payload)
def get_topic_model(self, topic_model, query_string='',
                    shared_username=None, shared_api_key=None):
    """Retrieve a Topic Model.

    `topic_model` is either a string holding the topic model ID or the
    dict returned by create_topic_model.

    A topic model keeps being processed until it reaches the FINISHED or
    FAULTY state, so the returned dict carries the topic model values and
    state info available at the moment of the call.

    For a shared topic model, the username and sharing api key must be
    supplied as well.

    Returns the result of `self._get`, or None when no id could be
    resolved.
    """
    check_resource_type(topic_model, TOPIC_MODEL_PATH,
                        message="A Topic Model id is needed.")
    resource_id = get_topic_model_id(topic_model)
    if not resource_id:
        return None
    target_url = "%s%s" % (self.url, resource_id)
    return self._get(target_url,
                     query_string=query_string,
                     shared_username=shared_username,
                     shared_api_key=shared_api_key)
def update_model(self, model, changes):
    """Update a model.

    The argument is checked with `check_resource_type` against MODEL_PATH
    and resolved to an id with `get_model_id`. `changes` is serialized with
    `json.dumps` and sent as the request body.

    Returns the result of `self._update`, or None when no id could be
    resolved.
    """
    check_resource_type(model, MODEL_PATH,
                        message="A model id is needed.")
    resource_id = get_model_id(model)
    if not resource_id:
        return None
    payload = json.dumps(changes)
    target_url = "%s%s" % (self.url, resource_id)
    return self._update(target_url, payload)
def update_pca(self, pca, changes):
    """Update a PCA.

    The argument is checked with `check_resource_type` against PCA_PATH and
    resolved to an id with `get_pca_id`. `changes` is serialized with
    `json.dumps` and sent as the request body.

    Returns the result of `self._update`, or None when no id could be
    resolved.
    """
    check_resource_type(pca, PCA_PATH,
                        message="A PCA id is needed.")
    resource_id = get_pca_id(pca)
    if not resource_id:
        return None
    payload = json.dumps(changes)
    target_url = "%s%s" % (self.url, resource_id)
    return self._update(target_url, payload)
def error_counts(self, dataset, raise_on_error=True):
    """Return the ids of the fields that contain errors and their totals.

    `dataset` may be a dataset resource structure (a dict holding an
    'object' key) or a dataset id. In the second case the resource is
    fetched through `check_resource` using `self.get_dataset`.

    Returns a dict mapping each field id found under the dataset's
    object -> status -> field_errors to that entry's 'total' value. The
    dict is empty when no dataset id is available.
    """
    already_a_resource = isinstance(dataset, dict) and 'object' in dataset
    if already_a_resource:
        dataset_id = get_dataset_id(dataset)
    else:
        check_resource_type(dataset, DATASET_PATH,
                            message="A dataset id is needed.")
        dataset_id = get_dataset_id(dataset)
        dataset = check_resource(dataset_id, self.get_dataset,
                                 raise_on_error=raise_on_error)
        # NOTE(review): presumably a failed retrieval is reported through the
        # resource's 'error' entry when raise_on_error is off - confirm
        # against check_resource. Such a dataset is treated as unavailable.
        if not raise_on_error and dataset['error'] is not None:
            dataset_id = None

    if not dataset_id:
        return {}

    status_info = dataset.get('object', {}).get('status', {})
    field_errors = status_info.get('field_errors', {})
    return {field_id: field_errors[field_id]['total']
            for field_id in field_errors}
def delete_association_set(self, association_set):
    """Delete an association set.

    The argument is checked with `check_resource_type` against
    ASSOCIATION_SET_PATH and then resolved to an id with
    `get_association_set_id`.

    Returns the result of `self._delete` on `self.url` followed by the id,
    or None when no id could be resolved.
    """
    check_resource_type(association_set, ASSOCIATION_SET_PATH,
                        message="An association set id is needed.")
    resource_id = get_association_set_id(association_set)
    if not resource_id:
        return None
    target_url = "%s%s" % (self.url, resource_id)
    return self._delete(target_url)
def delete_prediction(self, prediction):
    """Delete a prediction.

    The argument is checked with `check_resource_type` against
    PREDICTION_PATH and then resolved to an id with `get_prediction_id`.

    Returns the result of `self._delete` on `self.url` followed by the id,
    or None when no id could be resolved.
    """
    check_resource_type(prediction, PREDICTION_PATH,
                        message="A prediction id is needed.")
    resource_id = get_prediction_id(prediction)
    if not resource_id:
        return None
    target_url = "%s%s" % (self.url, resource_id)
    return self._delete(target_url)
def delete_anomaly_score(self, anomaly_score):
    """Delete an anomaly score.

    The argument is checked with `check_resource_type` against
    ANOMALY_SCORE_PATH and then resolved to an id with
    `get_anomaly_score_id`.

    Returns the result of `self._delete` on `self.url` followed by the id,
    or None when no id could be resolved.
    """
    check_resource_type(anomaly_score, ANOMALY_SCORE_PATH,
                        message="An anomaly_score id is needed.")
    resource_id = get_anomaly_score_id(anomaly_score)
    if not resource_id:
        return None
    target_url = "%s%s" % (self.url, resource_id)
    return self._delete(target_url)
def delete_model(self, model):
    """Delete a model.

    The argument is checked with `check_resource_type` against MODEL_PATH
    and then resolved to an id with `get_model_id`.

    Returns the result of `self._delete` on `self.url` followed by the id,
    or None when no id could be resolved.
    """
    check_resource_type(model, MODEL_PATH,
                        message="A model id is needed.")
    resource_id = get_model_id(model)
    if not resource_id:
        return None
    target_url = "%s%s" % (self.url, resource_id)
    return self._delete(target_url)