Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test():
    """Query Crossref with a sample OpenURL and export whatever comes back.

    A fixed OpenURL (a 1997 article in *Science*) is handed to
    ``crossref.query_openurl_and_retrieve_metadata``. When the call returns a
    non-empty result, it is passed to ``io_service.export_metajson_collection``
    together with the output path ``<cwd>/data/result/result_crossref_metajon.json``
    and the value returned by the export call is logged.

    Raises:
        AssertionError: if the Crossref query returns an empty/falsy result.
    """
    base_dir = os.path.join(os.getcwd(), "data")
    # The original message was '"base_dir: ".format(base_dir)': with no
    # placeholder in the format string the directory was never logged.
    logging.info("base_dir: %s", base_dir)

    openurl = (
        "url_ver=Z39.88-2004"
        "&rft_val_fmt=info:ofi/fmt:kev:mtx:journal"
        "&rft.atitle=Isolation of a common receptor for coxsackie B"
        "&rft.jtitle=Science"
        "&rft.aulast=Bergelson"
        "&rft.auinit=J"
        "&rft.date=1997"
        "&rft.volume=275"
        "&rft.spage=1320"
        "&rft.epage=1323"
    )
    metajson_list = crossref.query_openurl_and_retrieve_metadata(openurl, True)

    # Raised explicitly (instead of ``assert False``) so the failure is still
    # reported when Python runs with -O, which strips assert statements.
    if not metajson_list:
        raise AssertionError("Crossref returned no metadata for the test OpenURL")

    output_path = os.path.join(base_dir, "result", "result_crossref_metajon.json")
    logging.info(io_service.export_metajson_collection("test_crossref", "Crossref import test", metajson_list, output_path))
def test_search_mongo():
    """Send a raw MongoDB query to the repository service and print the result.

    The query requires a title matching "Cheyenne" or "technique" AND a
    publisher matching "press"; all three regexes carry the "i" option.
    The result is printed after being serialised with ``jsonbson.dumps_bson``.
    """
    # The original assigned the bare title "$or" clause to ``mongo_query`` and
    # then immediately overwrote it with the "$and" query below (a dead
    # store). The clause is now built once and reused.
    title_clause = {"$or": [{"title": {"$options": "i", "$regex": "Cheyenne"}}, {"title": {"$options": "i", "$regex": "technique"}}]}
    publisher_clause = {"publishers": {"$regex": "press", "$options": 'i'}}
    mongo_query = {"$and": [title_clause, publisher_clause]}

    search_result = repository_service.search_mongo(None, mongo_query)

    # Single-argument print(...) produces the same output as the Python 2
    # print statement and is also valid Python 3 syntax.
    print("search_result:")
    print(jsonbson.dumps_bson(search_result, True))
def test_search():
    """Build a sample SearchQuery, print it, run it and print the result.

    The query dict is serialised with ``jsonbson.dumps_json`` for display,
    passed to ``repository_service.search`` and the returned value is printed
    via ``jsonbson.dumps_bson``.
    """
    search_query = {"filter_class": "Document"}

    # Remaining criteria, stored one key at a time in exactly this order.
    criteria = (
        ("filter_date_begin", "2010"),
        ("filter_date_end", "2013"),
        ("filter_languages", ["en", "fr"]),
        ("filter_types", ["Book", "BookPart"]),
        ("rec_class", "SearchQuery"),
        ("rec_metajson", 1),
        ("result_batch_size", 100),
        ("result_bibliographic_styles", ["mla"]),
        ("result_offset", 0),
        ("result_sorts", [{"field": "rec_type", "order": "asc"}]),
        ("search_terms", [{"index": "title", "operator": "and", "value": "Cheyenne"}, {"index": "title", "operator": "or", "value": "technique"}]),
    )
    for key, value in criteria:
        search_query[key] = value

    print("search_query:")
    print(jsonbson.dumps_json(search_query, True))

    search_result = repository_service.search(None, search_query)

    print("search_result:")
    print(jsonbson.dumps_bson(search_result, True))
# NOTE(review): the thirteen statements below repeat, token for token, the
# last thirteen statements of test_search() directly above (same keys, same
# values, same calls). They look like an extraction/paste artifact rather than
# an intentional second run -- confirm against the original file before
# keeping or deleting them.
search_query["filter_types"] = ["Book", "BookPart"]
search_query["rec_class"] = "SearchQuery"
search_query["rec_metajson"] = 1
search_query["result_batch_size"] = 100
search_query["result_bibliographic_styles"] = ["mla"]
search_query["result_offset"] = 0
search_query["result_sorts"] = [{"field": "rec_type", "order": "asc"}]
search_query["search_terms"] = [{"index": "title", "operator": "and", "value": "Cheyenne"}, {"index": "title", "operator": "or", "value": "technique"}]
# Show the query, run it, then show the result.
print "search_query:"
print jsonbson.dumps_json(search_query, True)
search_result = repository_service.search(None, search_query)
print "search_result:"
print jsonbson.dumps_bson(search_result, True)
# NOTE(review): fragment -- the enclosing ``def`` line is not visible here, the
# original indentation has been lost, and the code stops right after the date
# parsing. It reads ``formatted_name``, ``rec_class`` and ``role``, so it is
# presumably the body of ``formatted_name_to_creator`` (called elsewhere in
# this file) -- confirm.
creator = Creator()
# Store the role, when one was given, as a single-element list.
if role:
creator["roles"] = [role]
# Build the agent according to the requested record class.
if rec_class == constants.REC_CLASS_EVENT:
# Event: the formatted name is stored under "title".
event = Event()
event["title"] = formatted_name
creator["agent"] = event
elif rec_class == constants.REC_CLASS_ORGUNIT:
# Orgunit: the formatted name is stored under "name".
orgunit = Orgunit()
orgunit["name"] = formatted_name
creator["agent"] = orgunit
elif rec_class == constants.REC_CLASS_PERSON or rec_class == constants.REC_CLASS_FAMILY:
# class is "Person" or "Family"
# Name parts and life dates all start out as empty strings.
name_given = ""
name_middle = ""
name_family = ""
name_prefix = ""
name_terms_of_address = ""
date_birth = ""
date_death = ""
# Position of the last "(" in the name, or -1 when there is none.
parenthesis_index = formatted_name.rfind("(")
if parenthesis_index != -1:
#may be like: name (date_birth-date_death)
# Text after the last "(" with the final character of the name dropped
# (presumably the closing ")"), then stripped of surrounding whitespace.
dates_part = formatted_name[parenthesis_index + 1:-1].strip()
# First four characters -> birth date; everything from index 5 onwards
# (i.e. skipping one separator character at index 4) -> death date.
date_birth = dates_part[:4]
date_death = dates_part[5:]
def load_dict(meta_dict):
    """Wrap a metadata dict in the class named by its ``rec_class`` entry.

    A dict without a ``rec_class`` key is wrapped in ``Common``. Otherwise the
    value is compared against the known class names in turn and the dict is
    passed to the matching constructor. A value that matches none of them
    yields ``None``.
    """
    if "rec_class" not in meta_dict:
        return Common(meta_dict)

    rec_class = meta_dict["rec_class"]
    if rec_class == "Document":
        return Document(meta_dict)
    if rec_class == "Person":
        return Person(meta_dict)
    if rec_class == "Orgunit":
        return Orgunit(meta_dict)
    if rec_class == "Project":
        return Project(meta_dict)
    if rec_class == "Event":
        return Event(meta_dict)
    if rec_class == "Family":
        return Family(meta_dict)
    if rec_class == "Field":
        return Field(meta_dict)
    if rec_class == "Resource":
        return Resource(meta_dict)
    if rec_class == "Target":
        return Target(meta_dict)

    # Unrecognised rec_class: nothing to build.
    return None
def csv_dict_reader_to_metasjon(csv_row, input_format, source, rec_id_prefix):
# Fill a new Document from one CSV row (``csv_row`` is indexed by column name).
# NOTE(review): the indentation of this function has been lost and the body
# looks incomplete -- no ``return`` is visible before the next ``def`` and
# ``rec_id_prefix`` is not used in the visible lines. Treat the comments below
# as a reading aid only.
document = Document()
# Record where the row came from, when a source was given.
if source:
document["rec_source"] = source
if input_format == constants.FORMAT_CSV_SITPOL:
#logging.debug("csv_dict_reader_to_metasjon type(csv_row): {}".format(type(csv_row)))
#print csv_row
document["title"] = csv_row["title"]
# Multi-valued cells are split on ";"; each part is stripped and empty parts
# are dropped. A key is only set when at least one value remains.
classifications_sitpol = [x.strip() for x in csv_row["classifications_sitpol"].split(";") if x.strip()]
if classifications_sitpol:
document["classifications_sitpol"] = classifications_sitpol
classifications_ddc = [x.strip() for x in csv_row["classifications_ddc"].split(";") if x.strip()]
if classifications_ddc:
document["classifications_ddc"] = classifications_ddc
formatted_names = [x.strip() for x in csv_row["creators@role=pbl"].split(";") if x.strip()]
if formatted_names:
# NOTE(review): the next eleven lines repeat the eleven lines directly above
# verbatim; this looks like an extraction/paste artifact -- confirm against
# the original file.
#logging.debug("csv_dict_reader_to_metasjon type(csv_row): {}".format(type(csv_row)))
#print csv_row
document["title"] = csv_row["title"]
classifications_sitpol = [x.strip() for x in csv_row["classifications_sitpol"].split(";") if x.strip()]
if classifications_sitpol:
document["classifications_sitpol"] = classifications_sitpol
classifications_ddc = [x.strip() for x in csv_row["classifications_ddc"].split(";") if x.strip()]
if classifications_ddc:
document["classifications_ddc"] = classifications_ddc
formatted_names = [x.strip() for x in csv_row["creators@role=pbl"].split(";") if x.strip()]
if formatted_names:
#logging.debug("formatted_names: {}".format(formatted_names))
# Turn each name from the "creators@role=pbl" column into a creator with
# role "pbl"; falsy results from the creator service are skipped.
creators = []
for formatted_name in formatted_names:
if formatted_name:
creator = creator_service.formatted_name_to_creator(formatted_name, None, "pbl")
if creator:
creators.append(creator)
if creators:
document["creators"] = creators
# Timestamp taken at conversion time, in ISO 8601 form.
document["date_last_accessed"] = datetime.now().isoformat()
# Single description entry tagged as French, taken from the
# "descriptions@lang=fr" column.
document["descriptions"] = [{"language":"fr", "value":csv_row["descriptions@lang=fr"]}]
# Keywords are grouped by language code; a language is only added when it
# has at least one keyword, and "keywords" only when some language has.
keywords_fr = [x.strip() for x in csv_row["keywords@lang=fr"].split(";") if x.strip()]
keywords_en = [x.strip() for x in csv_row["keywords@lang=en"].split(";") if x.strip()]
keywords = {}
if keywords_fr:
keywords["fr"] = keywords_fr
if keywords_en:
keywords["en"] = keywords_en
if keywords:
document["keywords"] = keywords
# Unlike the keys above, "languages" is set even when the list is empty.
document["languages"] = [x.strip() for x in csv_row["languages"].split(";") if x.strip()]
def get_rml_call(rml):
""" call -> call """
# NOTE(review): the indentation has been lost and the body appears truncated:
# in the visible lines ``call`` and ``funding`` are built but never stored in
# ``result``, and no ``return`` is visible. Comments below are a reading aid.
result = {}
# Look up the rml "call" child element; nothing else happens without it.
rml_call = rml.find(xmletree.prefixtag("rml", "call"))
if rml_call is not None:
call = Call()
# funding -> funding
rml_funding = rml_call.find(xmletree.prefixtag("rml", "funding"))
if rml_funding is not None:
# name -> agent.name
name = get_rml_element_text(rml_funding, "name")
funding = creator_service.formatted_name_to_creator(name, constants.REC_CLASS_ORGUNIT, None)
# Fall back to an empty Creator wrapping an empty Orgunit when the creator
# service returned None, so the updates below have something to fill.
if funding is None:
funding = Creator()
funding["agent"] = Orgunit()
# identifier -> agent.rec_id & agent.identifiers
funding["agent"].update(get_rml_identifiers(rml_funding))
# programme -> programme
funding.update(get_rml_element_text_and_set_key(rml_funding, "programme", "programme"))
# scheme -> scheme
funding.update(get_rml_element_text_and_set_key(rml_funding, "scheme", "scheme"))
# contribution -> contribution
funding.update(get_rml_money_and_set_key(rml_funding, "contribution", "contribution"))
# NOTE(review): fragment -- the enclosing ``def`` line and the statements that
# create ``result``, ``teachings``, ``teaching`` and ``rml_teaching`` are not
# visible here, and the original indentation has been lost. Presumably the
# tail of a per-teaching loop inside a ``get_rml_teachings``-style function --
# confirm.
# dateEnd -> date_end
teaching.update(get_rml_element_text_and_set_key(rml_teaching, "dateEnd", "date_end"))
# description -> descriptions[i]
teaching.update(get_rml_textlangs_and_set_key(rml_teaching, "description", "descriptions"))
# level -> level
teaching.update(get_rml_element_text_and_set_key(rml_teaching, "level", "level"))
# title -> title
teaching.update(get_rml_element_text_and_set_key(rml_teaching, "title", "title"))
# creators
# name -> creators[0].agent.name
name = get_rml_element_text(rml_teaching, "name")
creator = creator_service.formatted_name_to_creator(name, constants.REC_CLASS_ORGUNIT, "dgg")
# Fall back to an empty Creator wrapping an empty Orgunit when the creator
# service returned None.
if creator is None:
creator = Creator()
creator["agent"] = Orgunit()
# NOTE(review): roles is assigned a bare string here, whereas the Creator
# built elsewhere in this file stores roles as a list
# (creator["roles"] = [role]) -- confirm which shape is intended.
creator["roles"] = "dgg"
# identifiers -> creators[0].agent.rec_id or creators[0].agent.identifiers
creator["agent"].update(get_rml_identifiers(rml_teaching))
# Only attach the creator when its agent carries a name, a rec_id or identifiers.
if "name" in creator["agent"] or "rec_id" in creator["agent"] or "identifiers" in creator["agent"]:
teaching["creators"] = [creator]
if teaching is not None:
teachings.append(teaching)
# "teachings" is only added to the result when the list is non-empty.
if teachings:
result["teachings"] = teachings
return result