def test_Source():
    from clld.db.models.common import Source

    d = Source(id='abc')
    assert d.gbs_identifier is None
    d = Source(id='abc', jsondata={'gbs': {'volumeInfo': {}}})
    assert d.gbs_identifier is None
    d = Source(
        id='abc',
        jsondata={
            'gbs': {
                'volumeInfo': {
                    'industryIdentifiers': [{'type': 'x', 'identifier': 'y'}]}}})
    assert d.gbs_identifier == 'y'
    d = Source(
        id='abc',
        jsondata={
            'gbs': {
                'volumeInfo': {
                    'industryIdentifiers': [{'type': 'ISBN_10', 'identifier': ''}]}}})
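# Note: the `gbs_identifier` property exercised by this test is not part of the
# snippet. The following is a minimal, hypothetical sketch that satisfies the
# assertions above; it is an assumption, not the actual clld implementation
# (which may, for instance, normalize ISBNs differently).
class SourceSketch:
    """Hypothetical stand-in showing how `gbs_identifier` could be derived."""

    def __init__(self, jsondata=None):
        self.jsondata = jsondata or {}

    @property
    def gbs_identifier(self):
        volume_info = self.jsondata.get('gbs', {}).get('volumeInfo', {})
        identifiers = volume_info.get('industryIdentifiers') or []
        if not identifiers:
            return None
        # Prefer a non-empty ISBN-typed identifier, else fall back to the last entry.
        for identifier in identifiers:
            if identifier['type'].startswith('ISBN') and identifier['identifier']:
                return identifier['identifier']
        return identifiers[-1]['identifier']


assert SourceSketch().gbs_identifier is None
assert SourceSketch({'gbs': {'volumeInfo': {
    'industryIdentifiers': [{'type': 'x', 'identifier': 'y'}]}}}).gbs_identifier == 'y'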
def ia_func(command, args, sources=None):  # pragma: no cover
    def words(s):
        return set(slug(s.strip(), remove_whitespace=False).split())

    log = args.log
    count = 0

    if not sources:
        sources = DBSession.query(common.Source)\
            .order_by(common.Source.id)\
            .options(joinedload(common.Source.data))
    else:
        if callable(sources):
            sources = sources()

    i = 0
    for i, source in enumerate(page_query(sources, verbose=True, commit=True)):
        filepath = args.data_file('ia', 'source%s.json' % source.id)

        if command in ['verify', 'update']:
            if filepath.exists():
                with open(filepath) as fp:
                    try:
                        data = json.load(fp)
                    except ValueError:
                        continue
                if not data['response']['numFound']:

if model == Contribution:
    return {
        'ID': item.id,
        'Name': item.name,
        'Description': item.description,
        'Contributors': item.formatted_contributors(),
    }
if model == Sentence:
    return {
        'ID': item.id,
        'Language_ID': self.pk2id['Language'][item.language_pk],
        'Primary_Text': item.name,
        'Analyzed_Word': item.analyzed.split('\t') if item.analyzed else [],
        'Gloss': item.gloss.split('\t') if item.gloss else [],
        'Translated_Text': item.description,
        'Comment': item.comment,
    }
if model == Source:
    return source2source(req, item)
if model == Value:
    res = {
        'ID': item.id,
        'Language_ID': self.pk2id['Language'][item.valueset.language_pk],
        'Parameter_ID': self.pk2id['Parameter'][item.valueset.parameter_pk],
        'Contribution_ID': self.pk2id['Contribution'][item.valueset.contribution_pk],
        'Value': (item.domainelement.name if item.domainelement else item.name) or '-',
        'Source': [
            '{0}{1}'.format(self.pk2id['Source'][spk], d) for spk, d in iterrefs(item)],
    }
    if item.domainelement_pk:
        res['Code_ID'] = self.pk2id['DomainElement'][item.domainelement_pk]
    if self.module == 'Wordlist':
        res['Form'] = res['Value']
    return res
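# The `self.pk2id` lookups above presuppose a mapping from database primary keys to
# CLDF row IDs built beforehand. A hypothetical sketch of how such a mapping could be
# assembled (the helper name and query details are assumptions, not code from clld):
from collections import defaultdict

from clld.db.meta import DBSession
from clld.db.models import common


def build_pk2id():
    """Map model name -> primary key -> public id for the tables referenced above."""
    pk2id = defaultdict(dict)
    for name, model in [
            ('Language', common.Language),
            ('Parameter', common.Parameter),
            ('Contribution', common.Contribution),
            ('Source', common.Source),
            ('DomainElement', common.DomainElement)]:
        for pk, id_ in DBSession.query(model.pk, model.id):
            pk2id[name][pk] = id_
    return pk2id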
def order(self):
    # Sort this datatable column by the source's BibTeX entry type.
    return Source.bibtex_type
def __new__(cls, name, model, interface, with_index=True, with_rdfdump=True):
    return super(Resource, cls).__new__(
        cls, name, model, interface, with_index, with_rdfdump)

@property
def plural(self):
    return self.name + 's'


RESOURCES = [
    Resource('dataset', common.Dataset, interfaces.IDataset, with_index=False),
    Resource('contribution', common.Contribution, interfaces.IContribution),
    Resource('parameter', common.Parameter, interfaces.IParameter),
    Resource('language', common.Language, interfaces.ILanguage),
    Resource('contributor', common.Contributor, interfaces.IContributor),
    Resource('source', common.Source, interfaces.ISource),
    Resource('sentence', common.Sentence, interfaces.ISentence),
    Resource('valueset', common.ValueSet, interfaces.IValueSet),
    Resource('value', common.Value, interfaces.IValue),
    Resource('unitparameter', common.UnitParameter, interfaces.IUnitParameter),
    Resource('unit', common.Unit, interfaces.IUnit),
    Resource('unitvalue', common.UnitValue, interfaces.IUnitValue),
    Resource(
        'combination',
        common.Combination,
        interfaces.ICombination,
        with_index=False,
        with_rdfdump=False),
]
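# The positional `super().__new__` call above suggests that `Resource` subclasses a
# namedtuple. A minimal sketch of such a declaration (field names inferred from the
# constructor arguments, not copied from the clld sources):
from collections import namedtuple


class Resource(namedtuple('Resource', 'name model interface with_index with_rdfdump')):

    def __new__(cls, name, model, interface, with_index=True, with_rdfdump=True):
        # Apply the keyword defaults before delegating to the namedtuple constructor.
        return super(Resource, cls).__new__(
            cls, name, model, interface, with_index, with_rdfdump)

    @property
    def plural(self):
        # 'language' -> 'languages', e.g. when naming index routes.
        return self.name + 's'


assert Resource('language', None, None).plural == 'languages'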

    languoid_map[l.id] = l

for i, rec in enumerate(get_bib(args)):
    if i and i % 1000 == 0:
        print(i, 'records done', stats['updated'] + stats['new'], 'changed')

    if len(rec.keys()) < 6:
        # not enough information!
        stats.update(['skipped'])
        continue

    changed = False
    assert rec.get('glottolog_ref_id')
    id_ = int(rec.get('glottolog_ref_id'))
    ref = DBSession.query(Source).get(id_)
    update = bool(ref)
    kw = {
        'pk': id_,
        'bibtex_type': rec.genre,
        'id': str(id_),
        'jsondata': {'bibtexkey': rec.id},
    }
    for source, target in FIELD_MAP.items():
        if target is None:
            continue
        value = rec.get(source)
        if value:
            value = unescape(value)
            if target:

count = 0
api_url = "https://www.googleapis.com/books/v1/volumes?"

if command == 'cleanup':
    for fname in args.data_file('gbs').glob('*.json'):
        try:
            fname = Path(fname)
            data = jsonlib.load(fname)
            if data.get('totalItems') == 0:
                fname.unlink()
        except ValueError:
            # Remove cache files that do not contain valid JSON.
            fname.unlink()
    return

if not sources:
    sources = DBSession.query(common.Source)\
        .order_by(common.Source.id)\
        .options(joinedload(common.Source.data))
if callable(sources):
    sources = sources()

for i, source in enumerate(page_query(sources, verbose=True, commit=True)):
    filepath = args.data_file('gbs', 'source%s.json' % source.id)

    if command == 'update':
        source.google_book_search_id = None
        source.update_jsondata(gbs={})

    if command in ['verify', 'update']:
        if filepath.exists():
            try:
                data = jsonlib.load(filepath)

    'ms': {}
}

for name, cls, kw in [
    ('languoids', LanguoidsMultiSelect, dict(
        url=request.route_url('glottolog.childnodes'))),
    ('macroareas', MultiSelect, dict(collection=res['macroareas'])),
    ('doctypes', MultiSelect, dict(collection=res['doctypes'])),
]:
    res['ms'][name] = cls(request, name, 'ms' + name, **kw)

res['params'], reqparams = get_params(request.params, **res)
res['refs'] = getRefs(res['params'])

if res['refs']:
    res['dt'] = Refs(request, Source, cq=1, **reqparams)

fmt = request.params.get('format')
if fmt:
    db = bibtex.Database([ref.bibtex() for ref in res['refs']])
    for name, adapter in request.registry.getAdapters([db], IRepresentation):
        if name == fmt:
            return adapter.render_to_response(db, request)
    return HTTPNotAcceptable()

return res

def bibtex2source(rec, cls=common.Source, lowercase_id=False):
    year = bibtex.unescape(rec.get('year', 'nd'))
    fields = {}
    jsondata = {}
    for field in bibtex.FIELDS:
        if field in rec:
            value = bibtex.unescape(rec[field])
            container = fields if hasattr(cls, field) else jsondata
            container[field] = value

    etal = ''
    eds = ''
    authors = rec.get('author')
    if not authors:
        authors = rec.get('editor', '')
        if authors:
            eds = ' (eds.)'
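# A hedged usage sketch for `bibtex2source`: the `Record` constructor call and the
# import locations reflect one reading of clld and may not match the exact API.
from clld.db.models import common
from clld.lib import bibtex
from clld.scripts.util import bibtex2source  # import location assumed

# Build an in-memory BibTeX record and convert it to an (unsaved) Source instance.
rec = bibtex.Record(
    'book', 'meier2001',
    author='Meier, Hans',
    year='2001',
    title='The Book',
    publisher='The Press')
source = bibtex2source(rec, cls=common.Source)
print(source.name, source.bibtex_type)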