# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# this is to create context specific Metadata classes that take the
# database from the given alias at the time of use
class Metadata(Document):
    """
    Metadata stores information about objects in OmegaStore

    Every stored object is described by one Metadata document; the
    combination of (name, bucket, prefix) is unique, enforced by the
    ``unique_with`` constraint on the ``name`` field.
    """
    # fields
    #: this is the name of the data; unique per (bucket, prefix)
    name = StringField(unique_with=['bucket', 'prefix'])
    #: bucket
    bucket = StringField()
    #: prefix
    prefix = StringField()
    #: kind of data; restricted to the kinds registered in MDREGISTRY.KINDS
    kind = StringField(choices=MDREGISTRY.KINDS)
    #: for PANDAS_HDF and SKLEARN_JOBLIB this is the gridfile
    #: (stored via the 'omega' db alias; note the collection name is
    #: resolved from settings() once, at class-definition time)
    gridfile = FileField(
        db_alias='omega',
        collection_name=settings().OMEGA_MONGO_COLLECTION)
    #: for PANDAS_DFROWS this is the collection
    collection = StringField()
    #: for PYTHON_DATA this is the actual document
    objid = ObjectIdField()
    #: omegaml technical attributes, e.g. column indicies
    kind_meta = DictField()
    #: customer-defined other meta attributes
    attributes = DictField()
    #: s3file attributes
    s3file = DictField()
    #: location URI
    uri = StringField()
def register_backend(self, kind, backend):
"""
register a backend class
:param kind: (str) the backend kind
:param backend: (class) the backend class
"""
self.defaults.OMEGA_STORE_BACKENDS[kind] = load_class(backend)
if kind not in MDREGISTRY.KINDS:
MDREGISTRY.KINDS.append(kind)
return self
        # NOTE(review): this region appears to be a detached fragment -- there
        # is no enclosing `def` visible, and it duplicates the kind-dispatch
        # logic that also appears later in this file. Kept byte-identical;
        # confirm against the upstream source before refactoring.
        # delegate to a registered backend when one was resolved
        if backend is not None:
            return backend.get(name, **kwargs)
        # otherwise dispatch on the stored object's kind
        if meta.kind == MDREGISTRY.SKLEARN_JOBLIB:
            backend = self.get_backend(name)
            return backend.get_model(name)
        elif meta.kind == MDREGISTRY.SPARK_MLLIB:
            backend = self.get_backend(name)
            return backend.get_model(name, version)
        elif meta.kind == MDREGISTRY.PANDAS_DFROWS:
            return self.get_dataframe_documents(name, version=version,
                                                **kwargs)
        elif meta.kind == MDREGISTRY.PANDAS_SEROWS:
            # same storage as PANDAS_DFROWS, returned as a Series
            return self.get_dataframe_documents(name, version=version,
                                                is_series=True,
                                                **kwargs)
        elif meta.kind == MDREGISTRY.PANDAS_DFGROUP:
            return self.get_dataframe_dfgroup(
                name, version=version, **kwargs)
        elif meta.kind == MDREGISTRY.PYTHON_DATA:
            return self.get_python_data(name, version=version)
        elif meta.kind == MDREGISTRY.PANDAS_HDF:
            return self.get_dataframe_hdf(name, version=version)
        # fallback: return the object reconstructed from its metadata
        return self.get_object_as_python(meta, version=version)
def on_success(self, retval, task_id, *args, **kwargs):
om = self.om
args, kwargs = args[0:2]
nbfile = args[0]
meta = om.jobs.metadata(nbfile)
attrs = meta.attributes
attrs['state'] = 'SUCCESS'
attrs['task_id'] = task_id
meta.kind = MDREGISTRY.OMEGAML_JOBS
if not kwargs:
pass
else:
attrs['last_run_time'] = kwargs.get('run_at')
attrs['next_run_time'] = kwargs.get('next_run_time')
meta.attributes = attrs
meta.save()
        similar to put_dataframe_as_documents no data will be replaced by
        default. that is, obj is appended as new documents into the objects'
        mongo collection. to replace the data, specify append=False.
        """
        # NOTE(review): the docstring tail above has no enclosing `def` in
        # this chunk -- this looks like a detached fragment of a
        # put_pyobj_as_document-style method. Kept byte-identical.
        collection = self.collection(name)
        if append is False:
            # explicit replace: drop all previously stored documents
            collection.drop()
        elif append is None and collection.count(limit=1):
            # append not specified and data already exists: warn, then append
            # NOTE(review): Collection.count and Collection.insert were removed
            # in pymongo 4.x (count_documents/insert_one) -- confirm the pinned
            # pymongo version before upgrading
            from warnings import warn
            warn('%s already exists, will append rows' % name)
        objid = collection.insert({'data': obj})
        # record the stored document's objid so it can be retrieved later
        return self._make_metadata(name=name,
                                   prefix=self.prefix,
                                   bucket=self.bucket,
                                   kind=MDREGISTRY.PYTHON_DATA,
                                   collection=collection.name,
                                   attributes=attributes,
                                   objid=objid).save()
        # NOTE(review): detached fragment (no enclosing `def` visible); it
        # duplicates the put_dataframe_as_documents tail that also appears at
        # the end of this file. Kept byte-identical.
        # create mongo indexes for dataframe index columns (prefixed '_idx')
        df_idxcols = [col for col in obj.columns if col.startswith('_idx')]
        if df_idxcols:
            keys, idx_kwargs = MongoQueryOps().make_index(df_idxcols)
            collection.create_index(keys, **idx_kwargs)
        # create index on row id
        keys, idx_kwargs = MongoQueryOps().make_index(['_om#rowid'])
        collection.create_index(keys, **idx_kwargs)
        # bulk insert
        # -- get native objects
        # -- seems to be required since pymongo 3.3.x. if not converted
        #    pymongo raises Cannot Encode object for int64 types
        obj = obj.astype('O')
        fast_insert(obj, self, name)
        # series are stored the same way as dataframes, flagged via kind
        kind = (MDREGISTRY.PANDAS_SEROWS
                if store_series
                else MDREGISTRY.PANDAS_DFROWS)
        meta = self._make_metadata(name=name,
                                   prefix=self.prefix,
                                   bucket=self.bucket,
                                   kind=kind,
                                   kind_meta=kind_meta,
                                   attributes=attributes,
                                   collection=collection.name)
        return meta.save()
        :param force_python: Return as a python object
        :param kwargs: kwargs depending on object kind
        :return: an object, estimator, pipelines, data array or pandas dataframe
            previously stored with put()
        """
        # NOTE(review): this docstring tail has no enclosing `def` in this
        # chunk -- it appears to be the body of an OmegaStore.get()-style
        # method; it references `kind` and `version` which are not visible as
        # parameters here. Kept byte-identical.
        meta = self.metadata(name, version=version)
        if meta is None:
            # no such object
            return None
        if not force_python:
            # resolve a backend by the object's name, or by explicit kind
            backend = self.get_backend(name) if not kind else self.get_backend_bykind(kind)
            if backend is not None:
                return backend.get(name, **kwargs)
            # no backend matched: dispatch on the stored object's kind
            if meta.kind == MDREGISTRY.SKLEARN_JOBLIB:
                backend = self.get_backend(name)
                return backend.get_model(name)
            elif meta.kind == MDREGISTRY.SPARK_MLLIB:
                backend = self.get_backend(name)
                return backend.get_model(name, version)
            elif meta.kind == MDREGISTRY.PANDAS_DFROWS:
                return self.get_dataframe_documents(name, version=version,
                                                    **kwargs)
            elif meta.kind == MDREGISTRY.PANDAS_SEROWS:
                # same storage as PANDAS_DFROWS, returned as a Series
                return self.get_dataframe_documents(name, version=version,
                                                    is_series=True,
                                                    **kwargs)
            elif meta.kind == MDREGISTRY.PANDAS_DFGROUP:
                return self.get_dataframe_dfgroup(
                    name, version=version, **kwargs)
            elif meta.kind == MDREGISTRY.PYTHON_DATA:
                return self.get_python_data(name, version=version)
            elif meta.kind == MDREGISTRY.PANDAS_HDF:
                return self.get_dataframe_hdf(name, version=version)
def __init__(self, prefix=None, store=None, defaults=None):
self.defaults = defaults or omega_settings()
prefix = prefix or 'jobs'
self.store = store or OmegaStore(prefix=prefix)
self.kind = MDREGISTRY.OMEGAML_JOBS
self._include_dir_placeholder = True
# convenience so you can do om.jobs.schedule(..., run_at=om.jobs.Schedule(....))
self.Schedule = JobSchedule
        # NOTE(review): detached fragment -- no enclosing `def` visible, it
        # starts mid-body (`stored_columns`, `collection`, `store_series`,
        # `kind_meta`, `attributes` and `name` are all defined outside this
        # view) and duplicates the put_dataframe_as_documents tail seen
        # earlier in this file. Kept byte-identical.
        obj.columns = stored_columns
        # create mongo indices for data frame index columns
        df_idxcols = [col for col in obj.columns if col.startswith('_idx')]
        if df_idxcols:
            keys, idx_kwargs = MongoQueryOps().make_index(df_idxcols)
            collection.create_index(keys, **idx_kwargs)
        # create index on row id
        keys, idx_kwargs = MongoQueryOps().make_index(['_om#rowid'])
        collection.create_index(keys, **idx_kwargs)
        # bulk insert
        # -- get native objects
        # -- seems to be required since pymongo 3.3.x. if not converted
        #    pymongo raises Cannot Encode object for int64 types
        obj = obj.astype('O')
        fast_insert(obj, self, name)
        # series are stored the same way as dataframes, flagged via kind
        kind = (MDREGISTRY.PANDAS_SEROWS
                if store_series
                else MDREGISTRY.PANDAS_DFROWS)
        meta = self._make_metadata(name=name,
                                   prefix=self.prefix,
                                   bucket=self.bucket,
                                   kind=kind,
                                   kind_meta=kind_meta,
                                   attributes=attributes,
                                   collection=collection.name)
        return meta.save()