Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_basic_graph(self):
    """Check graph bookkeeping after adding one proxy and then ``None``.

    A single proxy built from ``ENTITY`` is added to a graph whose edge
    types are ``registry.pivots``. The test expects more than one node,
    exactly one stored proxy and an empty queue, and expects those proxy
    and queue counts to be unchanged after ``graph.add(None)``.
    """
    proxy = model.get_proxy(ENTITY, cleaned=False)
    graph = Graph(edge_types=registry.pivots)
    graph.add(proxy)
    assert len(graph.iternodes()) > 1, graph.to_dict()
    assert len(graph.proxies) == 1, graph.proxies
    # Report the queue itself on failure so the diagnostic matches the check.
    assert len(graph.queued) == 0, graph.queued
    # Adding None is expected to leave the counts as they were.
    graph.add(None)
    assert len(graph.proxies) == 1, graph.proxies
    assert len(graph.queued) == 0, graph.queued
def ingest(self, file_path, entity):
    """Fill ``entity`` with audio metadata parsed from ``file_path``.

    The entity's schema is set to ``model.get('Audio')`` and every track
    reported by ``MediaInfo.parse`` contributes title, generator,
    authoring/modification timestamps, sampling rate and duration values.

    Raises:
        ProcessingException: if anything in the parsing or population
            steps raises; the original exception is chained as the cause.
    """
    try:
        entity.schema = model.get('Audio')
        metadata = MediaInfo.parse(file_path)
        for track in metadata.tracks:
            entity.add('title', track.title)
            entity.add('generator', track.writing_application)
            entity.add('generator', track.writing_library)
            entity.add('generator', track.publisher)
            authored_dates = (
                track.recorded_date,
                track.tagged_date,
                track.encoded_date,
            )
            for raw_date in authored_dates:
                entity.add('authoredAt', self.parse_timestamp(raw_date))
            modified_at = self.parse_timestamp(
                track.file_last_modification_date
            )
            entity.add('modifiedAt', modified_at)
            if track.sampling_rate:
                entity.add('samplingRate', track.sampling_rate)
            entity.add('duration', track.duration)
    except Exception as ex:
        # Interpolate the message here: exception constructors do not
        # apply %-formatting to extra positional arguments the way the
        # logging functions do.
        raise ProcessingException("Could not read audio: %r" % (ex,)) from ex
def add_schemata(self):
    """Pass every item obtained by iterating ``model`` to ``add_class``."""
    schemata = iter(model)
    for entry in schemata:
        self.add_class(entry)
def validate(infile, outfile):
    """Re-build each entity read from ``infile`` and write it to ``outfile``.

    Entities are read with ``read_entity`` until it returns ``None``. Each
    one is copied into a fresh entity of the same schema and id by
    re-adding every ``(prop, value)`` pair, then written out with
    ``write_object``. A ``BrokenPipeError`` is converted to ``click.Abort``.
    """
    try:
        entity = read_entity(infile)
        while entity is not None:
            rebuilt = model.make_entity(entity.schema)
            rebuilt.id = entity.id
            for name, item in entity.itervalues():
                rebuilt.add(name, item)
            write_object(outfile, rebuilt)
            # Fetch the next entity; None ends the loop.
            entity = read_entity(infile)
    except BrokenPipeError:
        raise click.Abort()
def score(self):
    """Return a similarity score for ``self.subject`` and ``self.candidate``.

    Returns ``0.0`` when either side is ``None``, ``1.0`` when both share
    the same ``id``, and otherwise the result of
    ``compare(model, subject, candidate)``.
    """
    incomplete = self.subject is None or self.candidate is None
    if not incomplete:
        if self.subject.id == self.candidate.id:
            return 1.0
        return compare(model, self.subject, self.candidate)
    return 0.0
def check_schema(value):
    """Return ``True`` if ``model.get(value)`` finds a schema.

    Raises:
        ValueError: with a ``gettext``-translated message when
            ``model.get(value)`` returns ``None``.
    """
    if model.get(value) is not None:
        return True
    template = gettext('Invalid schema name: %s')
    raise ValueError(template % value)
def model(self):
    """Return the result of ``model.get`` for this object's ``schema``.

    NOTE(review): presumably this is a method on a class, so ``model`` in
    the body resolves to the module-level name rather than to this
    function - confirm against the enclosing class.
    """
    schema_name = self.schema
    return model.get(schema_name)
def ingest(self, file_path, entity):
    """Ingest an Excel workbook, emitting one ``Table`` entity per sheet.

    The entity's schema is set to ``model.get('Workbook')`` and OLE
    metadata is extracted first. Each sheet then becomes a child ``Table``
    entity that is emitted only if it has a ``csvHash`` value after its
    rows were processed.

    Raises:
        ProcessingException: if opening the workbook raises any exception,
            or if an ``XLRDError`` occurs while processing the sheets.
    """
    entity.schema = model.get('Workbook')
    self.extract_ole_metadata(file_path, entity)

    try:
        workbook = xlrd.open_workbook(file_path, formatting_info=False)
    except Exception as exc:
        raise ProcessingException('Invalid Excel file: %s' % exc) from exc

    try:
        for worksheet in workbook.sheets():
            child = self.manager.make_entity('Table', parent=entity)
            sheet_name = worksheet.name
            child.make_id(entity.id, sheet_name)
            child.set('title', sheet_name)
            rows = self.generate_csv(worksheet)
            self.emit_row_tuples(child, rows)
            if not child.has('csvHash'):
                # Sheets without a 'csvHash' value are not emitted.
                continue
            self.manager.emit_entity(child)
    except XLRDError as exc:
        raise ProcessingException('Invalid Excel file: %s' % exc) from exc
def add_property(self, prop):
    """Add the RDF triples describing ``prop`` to ``self.graph``.

    Always adds type, isDefinedBy, label and domain triples. Adds a
    comment, a schema range, an inverse and an ``XSD.dateTime`` range only
    when the corresponding attribute of ``prop`` calls for it.
    """
    graph = self.graph
    subject = prop.uri

    graph.add((subject, RDF.type, RDF.Property))
    graph.add((subject, RDFS.isDefinedBy, self.uri))
    graph.add((subject, RDFS.label, Literal(prop.label)))

    description = prop.description
    if description is not None:
        graph.add((subject, RDFS.comment, Literal(description)))

    graph.add((subject, RDFS.domain, prop.schema.uri))

    if prop.range is not None:
        # The range is looked up through ``model`` to obtain its URI.
        target = model.get(prop.range)
        graph.add((subject, RDFS.range, target.uri))

    inverse = prop.reverse
    if inverse is not None:
        graph.add((subject, OWL.inverseOf, inverse.uri))

    if prop.type == registry.date:
        graph.add((subject, RDFS.range, XSD.dateTime))