Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_search():
search_query = {"filter_class": "Document"}
search_query["filter_date_begin"] = "2010"
search_query["filter_date_end"] = "2013"
search_query["filter_languages"] = ["en", "fr"]
search_query["filter_types"] = ["Book", "BookPart"]
search_query["rec_class"] = "SearchQuery"
search_query["rec_metajson"] = 1
search_query["result_batch_size"] = 100
search_query["result_bibliographic_styles"] = ["mla"]
search_query["result_offset"] = 0
search_query["result_sorts"] = [{"field": "rec_type", "order": "asc"}]
search_query["search_terms"] = [{"index": "title", "operator": "and", "value": "Cheyenne"}, {"index": "title", "operator": "or", "value": "technique"}]
print "search_query:"
print jsonbson.dumps_json(search_query, True)
search_result = repository_service.search(None, search_query)
print "search_result:"
print jsonbson.dumps_bson(search_result, True)
document["keywords"] = [x.strip() for x in csv_row["Sujet(s) de l'enquete"].split("\n") if x.strip()]
if "Nom Auteur 1" in csv_row:
name_familly = csv_row["Nom Auteur 1"]
name_given = affiliation = ""
if "Prenom Auteur 1" in csv_row:
name_given = csv_row["Prenom Auteur 1"]
if "Affiliation Auteur 1" in csv_row:
affiliation = csv_row["Affiliation Auteur 1"]
document["creators"] = creators
else:
logging.error("Unknown input_format: {}".format(input_format))
logging.info(jsonbson.dumps_json(document, True))
return document
def load_metajson_file(file_path):
with open(file_path) as metajson_file:
metajson = jsonbson.load_json_file(metajson_file)
if "records" in metajson:
for record in metajson["records"]:
if record:
yield metajson_service.load_dict(record)
def conf_fields(corpus, folder):
fields_dir = os.path.abspath(os.path.join(config_service.config_path, "corpus", folder, "fields"))
if os.path.exists(fields_dir):
files = os.listdir(fields_dir)
if files:
results = []
for file_name in os.listdir(fields_dir):
if file_name.endswith(".json"):
with open(os.path.join(fields_dir, file_name), 'r') as field_file:
try:
json_field = jsonbson.load_json_file(field_file)
results.append(repository_service.save_field(corpus, json_field))
except ValueError as e:
logging.error("ERROR: Field file is not valid JSON : {} {} {}".format(folder, file_name, e))
return results
def authenticate(username, password):
#logging.debug(AUTHENTICATE)
data = {"username": username, "password": password}
data_encoded = urllib.urlencode(data)
url = config["endpoint"] + AUTHENTICATE
#logging.debug(url)
request = urllib2.Request(url, data_encoded)
response = urllib2.urlopen(request)
result = jsonbson.load_json_str(response.read())
logging.debug(result)
auth_dict = {"token": result["token"], "user_id": username}
return auth_dict
#logging.debug(DOCUMENTS)
url = config["endpoint"] + DOCUMENTS
#logging.debug(url)
#logging.debug(item)
if "tags" in item:
tags = jsonbson.dumps_json(item["tags"])
del item["tags"]
logging.debug(tags)
item["tags"] = tags
params_encoded = urllib.urlencode(item)
logging.debug(params_encoded)
request = urllib2.Request(url, params_encoded)
response = urllib2.urlopen(request)
result = response.read()
#logging.debug(result)
return jsonbson.load_json_str(result)
try:
pass
except:
logging.error("*** Error uploading item to sven : {}".format(item["title"]))
return {"status": "error"}
def authenticate(username, password):
#logging.debug(AUTHENTICATE)
auth = {"username": username, "password": password}
auth_encoded = urllib.urlencode(auth)
url = config["endpoint"] + AUTHENTICATE
logging.debug(url)
request = urllib2.Request(url, auth_encoded)
response = urllib2.urlopen(request)
result = jsonbson.load_json_str(response.read())
logging.debug(result)
anta_auth = {"token": result["token"], "user_id": username}
return anta_auth