Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_add_new_pump(sdb, manufacturer, model):
    '''Adding a pump must make it retrievable by its manufacturer and model.'''
    sdb.add_pump(manufacturer=manufacturer, model=model)

    # Look the pump up again using both fields it was registered with.
    pump = Query()
    same_manufacturer = pump.manufacturer == manufacturer
    same_model = pump.model == model
    matches = sdb.pumps.search(same_manufacturer & same_model)

    # The first hit has to be exactly the two fields that were passed in.
    assert matches[0] == {'manufacturer': manufacturer, 'model': model}
Name of the phase to get data for
prop : str
Property to get data for
datasets : espei.utils.PickleableTinyDB
Datasets to search for data
additional_query : tinydb.Query
A TinyDB Query object to search for. If None, a Query() will be created that does nothing.
Returns
-------
list
List of dictionary datasets that match the criteria
"""
if additional_query is None:
additional_query = tinydb.Query()
# TODO: we should only search and get phases that have the same sublattice_site_ratios as the phase in the database
desired_data = datasets.search(
(tinydb.where('output').test(lambda x: x in prop)) &
(tinydb.where('components').test(lambda x: set(x).issubset(comps))) &
(tinydb.where('phases') == [phase_name]) &
additional_query
)
return desired_data
def __init__(self, db_name, domain, raise_if_table_doesnt_exist=True, only_enabled=False, only_users=False):
    """Open the database at ``db_name`` and select the table named ``domain``.

    Args:
        db_name: Handed to ``TinyDB`` as the database location.
        domain: Name of the table to work on.
        raise_if_table_doesnt_exist: When true, raise ``DomainDoesntExist``
            if ``domain`` is not among the tables already in the database.
        only_enabled: When true, ``self.only_enabled`` matches records whose
            ``enabled`` field exists and equals ``True``; otherwise it only
            requires ``ntlmhash`` to exist.
        only_users: When true, ``self.only_users`` matches records whose
            ``username`` exists and does not end with ``"$"``; otherwise it
            only requires ``ntlmhash`` to exist.

    Raises:
        DomainDoesntExist: See ``raise_if_table_doesnt_exist``.
    """
    self.db = None
    self.table = None

    # Pre-built query fragments stored on the instance.
    if only_enabled:
        # ``== True`` is deliberate: it builds a TinyDB condition, not a bool.
        self.only_enabled = Query().enabled.exists() & (Query().enabled == True)
    else:
        self.only_enabled = Query().ntlmhash.exists() & Query().ntlmhash.exists()

    if only_users:
        self.only_users = Query().username.exists() & Query().username.test(lambda v: not v.endswith("$"))
    else:
        self.only_users = Query().ntlmhash.exists() & Query().ntlmhash.exists()

    # Storage stack: datetime-aware serialization wrapped in a caching layer.
    serializer_layer = SerializationMiddleware()
    serializer_layer.register_serializer(DateTimeSerializer(), "datetime")
    self.db = TinyDB(db_name, storage=CachingMiddleware(serializer_layer))

    known_tables = list(self.db.tables())
    if raise_if_table_doesnt_exist and domain not in known_tables:
        raise DomainDoesntExist("Hashes for domain '{}' do not exist in database.".format(domain), known_tables)

    self.table = self.db.table(domain)
def user_by_deviceid(deviceid):
    """Look up the user whose ``devices`` field contains ``deviceid``.

    Returns whatever ``get`` on the "users" table yields for that condition
    (per TinyDB's documentation: one matching document, or ``None``).
    """
    user_table = db_get().table("users")
    owns_device = Query().devices.any([deviceid])
    return user_table.get(owns_device)
def _getpk(self):
    """Fetch the record whose ``name`` equals ``self.pk``.

    Returns:
        The record's ``'data'`` entry when it has one, otherwise the whole
        record.

    Raises:
        IOError: If no record with that name is found.
    """
    record = self.db.get(Query().name == self.pk)

    if record is None:
        # todo: return 404 not found error. on GET and _('pipe').post()
        raise IOError('{} Not found'.format(self.pk))

    # Unwrap the payload when the record carries one.
    return record['data'] if 'data' in record.keys() else record
def _to_condition(self, filter):
    """Translate the ``filter`` mapping into a single TinyDB condition.

    The result starts as "``_id`` exists" and is AND-ed with one equality
    test per entry of ``filter``. Entries whose name begins with ``"$"``
    are skipped.
    """
    import tinydb

    combined = tinydb.Query()._id.exists()
    for field, wanted in legacy.iteritems(filter):
        if not field.startswith("$"):
            # A fresh Query per field; the explicit __eq__ call builds the
            # equality condition for that field.
            equals_wanted = getattr(tinydb.Query(), field).__eq__(wanted)
            combined = combined & equals_wanted
    return combined
def searchsvchostname(cls, hostname):
    """Search on the ``infos.service_hostname`` field for ``hostname``.

    Delegates to ``cls._searchstring_re`` and returns its result unchanged.
    """
    hostname_field = Query().infos.service_hostname
    return cls._searchstring_re(hostname_field, hostname)
def searchOrCreateChyron(text):
    """Find the chyron with exactly this ``text``, creating it when absent.

    Returns the ``eid`` of the first match in ``chyron_table``; if nothing
    matches, returns the result of ``createChyron(text)`` (presumably the
    new chyron's id -- confirm against ``createChyron``).
    """
    has_text = Query().text == text
    matches = chyron_table.search(has_text)

    if matches:
        return matches[0].eid

    # Chyron doesn't exist creating now.
    return createChyron(text)
raise PullError('Modified files will be overwritten by pull, specify merge mode '+str(filesmod))
elif merge_mode == 'keep':
filespull = list(set(filespull)-set(filesmod))
elif merge_mode == 'overwrite':
pass
else:
raise ValueError('invalid merge mode')
filessync = self._pullpush_luigi(filespull, 'get')
# scan local files after pull
fileslocal, _ = self.scan_local(files=filessync, attributes=True)
# update db
_tinydb_insert(self.dbfiles, filessync, filesremote, fileslocal)
self.dbconfig.upsert({'name': self.name, 'remote': self.settings, 'pipe': self.settings}, Query().name == self.name)
# print README.md
if 'README.md' in filessync:
print('############### README ###############')
with open(self.dirpath/'README.md', 'r') as fhandle:
print(fhandle.read())
print('############### README ###############')
if 'LICENSE.md' in filessync:
print('############### LICENSE ###############')
with open(self.dirpath/'LICENSE.md', 'r') as fhandle:
print(fhandle.read())
print('############### LICENSE ###############')
return filessync
def __db_upsert(self, force_insert=False):
    """Store this record in the table, inserting or (on request) updating it.

    Args:
        force_insert (bool): When a record with the same ``id`` is already
            stored, overwrite its fields instead of refusing.

    Returns:
        str: ``"Already Exist"`` when a record with this ``id`` is present
        and ``force_insert`` is false; an empty string otherwise.
    """
    same_id = Query().id == self.id
    already_stored = self.table.search(same_id)

    # An existing record is only touched when the caller forces it.
    if already_stored and not force_insert:
        return "Already Exist"

    record = {
        'id': self.id,
        'name': self.name,
        'url': self.url,
        'server': self.server,
        'to_be_used': self.to_be_used,
        'stream_ids': self.stream_ids,
        'active_stream_id': self.active_stream_id,
    }

    if already_stored:
        self.table.update(record, same_id)
    else:
        self.table.insert(record)  # insert the given data

    return ""