Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def reset_api(self):
    """Rebuild both API connections from the stored credentials.

    ``self.api`` receives a BigML connection created from
    ``self.USERNAME`` and ``self.API_KEY``; ``self.api_dev_mode``
    receives one created from the same credentials with
    ``dev_mode=True``.
    """
    credentials = (self.USERNAME, self.API_KEY)
    self.api = BigML(*credentials)
    self.api_dev_mode = BigML(*credentials, dev_mode=True)
@before.each_feature
def setup_resources(feature):
    """Open the API connections and record the initial resource counts.

    Registered through ``before.each_feature``. Stores a regular and a
    dev-mode connection on ``world``, then lists sources, datasets and
    non-ensemble models, asserting that every listing answers with
    ``HTTP_OK``. The source and dataset totals are kept on ``world`` as
    ``init_sources_count`` and ``init_datasets_count``.
    """
    world.api = BigML(world.USERNAME, world.API_KEY)
    world.api_dev_mode = BigML(world.USERNAME, world.API_KEY, dev_mode=True)

    # The first listing doubles as a credentials check: on failure the
    # assertion message carries the connection details being used.
    sources = world.api.list_sources()
    assert sources['code'] == HTTP_OK, (
        "Unable to list your sources. Please check the"
        " BigML domain and credentials to be:\n\n%s" %
        world.api.connection_info())
    world.init_sources_count = sources['meta']['total_count']

    datasets = world.api.list_datasets()
    assert datasets['code'] == HTTP_OK
    world.init_datasets_count = datasets['meta']['total_count']

    models = world.api.list_models("ensemble=false")
    assert models['code'] == HTTP_OK
# NOTE(review): this fragment has lost its indentation and is cut off inside
# the final sys.exit(...) call, so only the visible steps are described here.
# The `log` parameter is not used in the visible part.
def create_batch_topic_distribution(topic_model, test_dataset,
batch_topic_distribution_args, args,
api=None, session_file=None,
path=None, log=None):
"""Creates remote batch topic distribution
"""
# Use a default-constructed BigML connection when the caller passes none.
if api is None:
api = bigml.api.BigML()
# Announce the operation: the message goes to session_file, with
# args.verbosity passed as the console setting.
message = dated("Creating batch topic distribution.\n")
log_message(message, log_file=session_file, console=args.verbosity)
# Send the create request for the given topic model and test dataset.
batch_topic_distribution = api.create_batch_topic_distribution( \
topic_model, test_dataset, batch_topic_distribution_args, retries=None)
# Log the id of the created resource under path (mode='a'; presumably
# append mode -- confirm in log_created_resources).
log_created_resources( \
"batch_topic_distribution", path,
bigml.api.get_batch_topic_distribution_id(batch_topic_distribution),
mode='a')
# The helper's return value is kept as the resource id; the string is the
# error-message prefix. Presumably it aborts on a failed creation -- confirm.
batch_topic_distribution_id = check_resource_error(
batch_topic_distribution,
"Failed to create batch topic distribution: ")
# Fetch the resource through check_resource; a ValueError ends the process.
try:
batch_topic_distribution = check_resource( \
batch_topic_distribution, api.get_batch_topic_distribution)
except ValueError, exception:
sys.exit("Failed to get a finished batch topic distribution: %s"
def get_script(script, api=None, verbosity=True,
               session_file=None):
    """Retrieves the script in its actual state

    `script` may be a string or a script resource. It is fetched through
    `check_resource` only when it is a string or when its status code is
    not `bigml.api.FINISHED`; otherwise it is returned unchanged.

    :param script: script string or script resource
    :param api: BigML connection; a default-constructed
                `bigml.api.BigML()` is used when None
    :param verbosity: passed as the `console` setting of the log message
    :param session_file: file the retrieval message is logged to
    :return: the script as given, or as returned by `check_resource`

    Exits the process through `sys.exit` when `check_resource` raises
    ValueError.
    """
    if api is None:
        api = bigml.api.BigML()
    # NOTE(review): `basestring` exists only on Python 2; kept as-is so
    # unicode and byte strings are both still accepted there.
    if (isinstance(script, basestring) or
            bigml.api.get_status(script)['code'] != bigml.api.FINISHED):
        message = dated("Retrieving script. %s\n" %
                        get_url(script))
        log_message(message, log_file=session_file,
                    console=verbosity)
        try:
            script = check_resource(script, api.get_script)
        # `except X as name` is valid on Python 2.6+ and Python 3; the
        # former `except X, name` form is a SyntaxError on Python 3.
        except ValueError as exception:
            sys.exit("Failed to get a compiled script: %s" % str(exception))
    return script
# NOTE(review): this fragment has lost its indentation and is cut off inside
# the final log_message(...) call, so only the visible steps are described.
def create_time_series(datasets, time_series_ids,
time_series_args,
args, api=None, path=None,
session_file=None, log=None):
"""Create remote time-series
"""
# Use a default-constructed BigML connection when the caller passes none.
if api is None:
api = bigml.api.BigML()
# Shallow copy of the ids received; its length is the number of
# time-series that already exist.
time_series_set = time_series_ids[:]
existing_time_series = len(time_series_set)
time_series_args_list = []
# Drop one leading dataset per already-existing time-series id.
datasets = datasets[existing_time_series:]
# if resuming and all time-series were created,
# there will be no datasets left
if datasets:
# Arguments that already come as a list are used directly as the
# argument list.
if isinstance(time_series_args, list):
time_series_args_list = time_series_args
# Only one time-series per command, at present
number_of_time_series = 1
message = dated("Creating %s time-series.\n" %
number_of_time_series)
log_message(message, log_file=session_file,
# NOTE(review): this fragment has lost its indentation. The visible text stops
# right after the shared-link message is built, without using it and without
# a return, so it is probably truncated. The `path` parameter is not used in
# the visible part.
def update_topic_model(topic_model, topic_model_args,
args, api=None, path=None, session_file=None):
"""Updates topic model properties
"""
# Use a default-constructed BigML connection when the caller passes none.
if api is None:
api = bigml.api.BigML()
# Announce the update: the message goes to session_file, with
# args.verbosity passed as the console setting.
message = dated("Updating topic model. %s\n" %
get_url(topic_model))
log_message(message, log_file=session_file,
console=args.verbosity)
# Send the update request; the response replaces the local topic_model.
topic_model = api.update_topic_model(topic_model, \
topic_model_args)
# The error text includes the 'resource' entry of the response. Presumably
# the helper aborts when the update failed -- confirm in check_resource_error.
check_resource_error(topic_model,
"Failed to update topic model: %s"
% topic_model['resource'])
# Re-fetch the topic model through check_resource, passing FIELDS_QS as the
# query string.
topic_model = check_resource(topic_model,
api.get_topic_model,
query_string=FIELDS_QS)
# For shared topic models a second message with the shared URL is built.
if is_shared(topic_model):
message = dated("Shared topic model link. %s\n" %
get_url(topic_model, shared=True))
from bigml.api import BigML
# Open a connection with no explicit arguments, create a named source from
# the local file "iris.csv", then call api.ok on it (presumably waits until
# the source is ready -- confirm in the bindings documentation).
api = BigML()
source1 = api.create_source("iris.csv", {'name': u'my_source_name'})
api.ok(source1)
# NOTE(review): this fragment has lost its indentation and is cut off at the
# opening of `query_string = (`, so only the visible steps are described.
def get_models(model_ids, args, api=None, session_file=None):
"""Retrieves remote models in its actual status
"""
# Use a default-constructed BigML connection when the caller passes none.
if api is None:
api = bigml.api.BigML()
# model_id stays "" unless exactly one id was given, so get_url below
# receives a real id only in the single-model case.
model_id = ""
models = model_ids
single_model = len(model_ids) == 1
if single_model:
model_id = model_ids[0]
message = dated("Retrieving %s. %s\n" %
(plural("model", len(model_ids)),
get_url(model_id)))
log_message(message, log_file=session_file, console=args.verbosity)
# Below the args.max_batch_models threshold the models list is rebuilt by
# looping over the ids; in the visible part, models otherwise keeps
# referring to model_ids.
if len(model_ids) < args.max_batch_models:
models = []
for model in model_ids:
try:
# if there's more than one model the first one must contain
# the entire field structure to be used as reference.
query_string = (
from bigml.api import BigML
# NOTE(review): example script, cut off inside the input dict passed to
# create_anomaly_score(...).
api = BigML()
# Each resource is created from the previous one, and api.ok(...) is called
# on it before the next step (presumably waits until the resource is ready
# -- confirm in the bindings documentation).
source1 = api.create_source("iris.csv")
api.ok(source1)
dataset1 = api.create_dataset(source1, \
{'name': u'iris'})
api.ok(dataset1)
anomaly1 = api.create_anomaly(dataset1, \
{'anomaly_seed': u'2c249dda00fbf54ab4cdd850532a584f286af5b6', 'name': u'iris'})
api.ok(anomaly1)
# The input data for the score is a dict keyed by field name.
anomalyscore1 = api.create_anomaly_score(anomaly1, \
{u'petal length': 0.5,
u'petal width': 0.5,
u'sepal length': 1,
def update_time_series(time_series, time_series_args,
args, api=None, path=None, session_file=None):
"""Updates time-series properties
"""
if api is None:
api = bigml.api.BigML()
message = dated("Updating time-series. %s\n" %
get_url(time_series))
log_message(message, log_file=session_file,
console=args.verbosity)
time_series = api.update_time_series(time_series, \
time_series_args)
check_resource_error(time_series,
"Failed to update time-series: %s"
% time_series['resource'])
time_series = check_resource(time_series,
api.get_time_series,
query_string=FIELDS_QS)
if is_shared(time_series):
message = dated("Shared time-series link. %s\n" %
get_url(time_series, shared=True))