Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
else:
filter_str = ""
query = """
PREFIX wwdt:
PREFIX wwd:
SELECT ?localitem ?ext_id WHERE {{
SERVICE {{
?wditem wwdt:{prop} ?ext_id .
{filter_str}
}}
?localitem wdt:P3 ?wditem
}}
""".format(prop=prop, filter_str=filter_str)
results = wdi_core.WDItemEngine.execute_sparql_query(query, endpoint=self.sparql_endpoint_url)['results']['bindings']
results = [{k: v['value'] for k, v in x.items()} for x in results]
for r in results:
r['localitem'] = r['localitem'].split('/')[-1]
if not results:
return None
id_qid = defaultdict(set)
for r in results:
id_qid[r['ext_id']].add(r['localitem'])
if return_as_set:
return dict(id_qid)
else:
return {x['ext_id']: x['localitem'] for x in results}
?psv wikibase:geoLatitude ?geoLatitude .
?psv wikibase:geoLongitude ?geoLongitude .
?psv wikibase:geoGlobe ?geoGlobe .
?psv wikibase:geoPrecision ?geoPrecision .
OPTIONAL {
?s2 ?pr ?q .
FILTER(STRSTARTS(STR(?pr), "http://www.wikidata.org/prop/qualifier/"))
}
}
'''
if not __debug__:
print(query)
r = wdi_core.WDItemEngine.execute_sparql_query(query=query, prefix=prefix)
for i in r['results']['bindings']:
i['p'] = i['p']['value'].split('/')[-1]
if 's2' in i:
i['s2'] = i['s2']['value'].split('/')[-1]
if 'q' in i:
i['q'] = i['q']['value'].split('/')[-1]
if 'pr' in i:
i['pr'] = i['pr']['value'].split('/')[-1]
if 'v' in i:
if i['v']['type'] == 'literal':
i['v'] = i['v']['value']
elif i['v']['type'] == 'uri':
if 'www.wikidata.org/entity/' in i['v']['value']:
i['v'] = i['v']['value'].split('/')[-1]
def list_formats(cls, lang=None):
    """Query Wikidata for file formats and build one ``cls`` instance per row.

    Args:
        lang: Language code handed to the label service. Any falsy value
            falls back to the module-level ``LANG``.

    Returns:
        list: ``cls(qid, label, media_types)`` for each result binding, in
        the order returned by the endpoint (the query orders by label).
        ``media_types`` is the ``|``-separated GROUP_CONCAT value split into
        a list, or an empty list when that value is empty.
    """
    label_lang = lang or LANG

    # The fragments are joined with single spaces into one SPARQL string.
    sparql = " ".join((
        "SELECT ?idFileFormat ?idFileFormatLabel",
        "(GROUP_CONCAT(DISTINCT ?mediaType; SEPARATOR='|') AS ?mediaTypes)",
        "WHERE {",
        "?idFileFormat wdt:P31 wd:Q235557.",
        "OPTIONAL { ?idFileFormat wdt:P1163 ?mediaType }",
        "SERVICE wikibase:label {{ bd:serviceParam wikibase:language '{}' }}".format(label_lang),
        "}",
        "GROUP BY ?idFileFormat ?idFileFormatLabel",
        "ORDER BY ?idFileFormatLabel",
    ))
    response = wdi_core.WDItemEngine.execute_sparql_query(sparql)

    entity_prefix = 'http://www.wikidata.org/entity/'
    formats = []
    for row in response['results']['bindings']:
        # Strip the entity URI prefix so only the bare QID remains.
        qid = row['idFileFormat']['value'].replace(entity_prefix, '')
        label = row['idFileFormatLabel']['value']
        joined_types = row['mediaTypes']['value']
        # ''.split('|') would yield [''], so map the empty string to [].
        media_types = joined_types.split('|') if joined_types else []
        formats.append(cls(qid, label, media_types))
    return formats
# only use this statement if mapping relation type is exact, or mrt is not specified
mrt_qualifiers = [q for q in statement.get_qualifiers() if q.get_prop_nr() == mrt_pid]
if (len(mrt_qualifiers) == 1) and (mrt_qualifiers[0].get_value() != int(exact_qid[1:])):
continue
# TODO: implement special treatment when searching for date/coordinate values
data_point = statement.get_value()
if isinstance(data_point, tuple):
data_point = data_point[0]
core_props = self.core_props
if wd_property in core_props:
tmp_qids = set()
# if mrt_pid is "PXXX", this is fine, because the part of the SPARQL query using it is optional
query = statement.sparql_query.format(mrt_pid=mrt_pid, pid=wd_property, value=data_point)
results = WDItemEngine.execute_sparql_query(query=query, endpoint=self.sparql_endpoint_url)
for i in results['results']['bindings']:
qid = i['item_id']['value'].split('/')[-1]
if ('mrt' not in i) or ('mrt' in i and i['mrt']['value'].split('/')[-1] == exact_qid):
tmp_qids.add(qid)
qid_list.update(tmp_qids)
# Protocol in what property the conflict arises
if wd_property in conflict_source:
conflict_source[wd_property].append(tmp_qids)
else:
conflict_source[wd_property] = [tmp_qids]
if len(tmp_qids) > 1:
raise ManualInterventionReqException(
If `raise_on_duplicate` is False and `return_as_set` is True, the following can be returned:
{ 'A0KH68': {'Q23429083'}, 'B023F44': {'Q237623', 'Q839742'} }
:return: dict
"""
query = "SELECT ?id ?item ?mrt WHERE {"
query += "?item p:{} ?s .\n?s ps:{} ?id .\n".format(prop, prop)
query += "OPTIONAL {?s pq:P4390 ?mrt}\n"
if filters:
for f in filters:
query += "?item wdt:{} wd:{} .\n".format(f[0], f[1])
query = query + "}"
results = WDItemEngine.execute_sparql_query(query, endpoint=endpoint)['results']['bindings']
results = [{k: v['value'] for k, v in x.items()} for x in results]
for r in results:
r['item'] = r['item'].split('/')[-1]
if 'mrt' in r:
r['mrt'] = r['mrt'].split('/')[-1]
if not results:
return None
if prefer_exact_match:
df = pd.DataFrame(results)
if 'mrt' not in df:
df['mrt'] = ''
df.mrt = df.mrt.fillna('')
df['keep'] = True
# check if a QID has more than one extID
def process_query_string(query):
    """Run a SPARQL query through WDItemEngine and format its bindings.

    Args:
        query: SPARQL query string passed unchanged to
            ``WDItemEngine.execute_sparql_query``.

    Returns:
        Whatever ``_format_wikidata_bindings`` returns for the ``bindings``
        entry of the response's ``results`` object (``None`` is passed on
        when that key is absent).
    """
    response = WDItemEngine.execute_sparql_query(query)
    return _format_wikidata_bindings(response['results'].get('bindings'))
def process_query_string(query):
    """Execute ``query`` via WDItemEngine and return the formatted bindings.

    Args:
        query: SPARQL query string, forwarded as-is to
            ``WDItemEngine.execute_sparql_query``.

    Returns:
        The result of ``format_wikidata_bindings`` applied to the response's
        ``results['bindings']`` value; ``None`` is forwarded if the key is
        missing.
    """
    results_section = WDItemEngine.execute_sparql_query(query)['results']
    # .get() rather than [] so a response without 'bindings' yields None.
    rows = results_section.get('bindings')
    formatted = format_wikidata_bindings(rows)
    return formatted
def search(cls, search_string, lang="en"):
    """
    Query Wikidata to get search results.

    The search string is normalised before the query is built: every "."
    is removed and the remainder is lower-cased.

    Args:
        search_string (str): text to search for.
        lang (str): language code forwarded to ``cls._build_query``.

    Returns (List[FileFormatExtSearchResult]):
        whatever ``cls._assemble_results`` builds from the raw SPARQL
        response.
    """
    normalised = search_string.replace('.', "").lower()
    sparql = cls._build_query(normalised, lang)
    raw_response = wdi_core.WDItemEngine.execute_sparql_query(sparql)
    return cls._assemble_results(raw_response)