import os
import sys
import logging

import faiss
import h5py
def convert_to_tsv(relation_df):
"""Converts the Dataframe to tsv for PBG to read.
each row is in the triplet format that defines one edge/relationship in the graph
columns: start,label,end
- start: id of the 'from' node
- end: id of the 'to' node
- label: type of the relationship
Arguments:
relation_df {[Dataframe]} -- Dataframe in above mentioned format
"""
try:
tsv_path = os.path.join(
os.getcwd(),
GLOBAL_CONFIG["PROJECT_NAME"],
GLOBAL_CONFIG["DATA_DIRECTORY"],
GLOBAL_CONFIG["TSV_FILE_NAME"] + ".tsv",
) # default myproject/data/graph.tsv
logging.info(f"WRITING TSV FILE TO {tsv_path}")
relation_df[["start", "label", "end"]].to_csv(
tsv_path, sep="\t", header=False, index=False
)
    except Exception as e:
        logging.error("error in converting dataframe to tsv")
        logging.error(e, exc_info=True)
        sys.exit(e)
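# Usage sketch (hypothetical data; "user_1" etc. are made-up ids): build a
# three-column edge DataFrame in the start/label/end triplet format and write it.
def _example_convert_to_tsv():
    import pandas as pd

    edges = pd.DataFrame(
        [
            ("user_1", "FOLLOWS", "user_2"),
            ("user_2", "LIKES", "post_9"),
        ],
        columns=["start", "label", "end"],
    )
    convert_to_tsv(edges)  # writes e.g. myproject/data/graph.tsv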
def get_checkpoint_version():
"""returns the latest version of the embeddings
Returns:
[int] -- version of the embeddings
"""
try:
GLOBAL_CONFIG = load_config("GLOBAL_CONFIG")
checkpoint_version_file = os.path.join(
GLOBAL_CONFIG["PROJECT_NAME"],
GLOBAL_CONFIG["CHECKPOINT_DIRECTORY"],
"checkpoint_version.txt",
)
        with open(checkpoint_version_file, "r") as f:
            version = f.read()
version = int(version.split()[0].strip())
logging.info(f"Latest checkpoint version: {version}")
return version
except Exception as e:
logging.error(f"Could locate checkpoint version file: {e}", exc_info=True)
"""exports the graph database as a json file
"""
try:
export_file_name = GLOBAL_CONFIG["JSON_EXPORT_FILE"] + ".json"
graph_file_path = os.path.abspath(
os.path.join(
DATA_DIRECTORY, export_file_name
) # default: myproject/data/graph.json
)
logging.info(f"""EXPORTING GRAPH DATABASE TO {graph_file_path}...... """)
query = (
f"""CALL apoc.export.json.all('{graph_file_path}'"""
+ """,{batchSize:500})"""
)
graph_connection.run(query)
if os.path.exists(graph_file_path):
logging.info("Done...")
else:
logging.info("export failed! try again!")
except Exception as e:
logging.info(
"""error in exporting data.
Possible problemas may include incorrect url and credentials.
Or absence of apoc procedures.
Also make sure apoc settings are configured in neo4j.conf"""
)
logging.info(e, exc_info=True)
sys.exit(e)
def read_embeddings(entity_type, partition_number):
"""Reads embeddings (.h5) files
Arguments:
entity_type {[str]} -- label of node
partition_number {[int]} -- partition in which the node lies
    Returns:
        [numpy.ndarray] -- embeddings for the given entity type and partition
    """
try:
version = get_checkpoint_version()
embedding_file = f"embeddings_{entity_type}_{partition_number}.v{version}.h5"
logging.info(f"embedding file name: {embedding_file}")
embeddings_path = os.path.join(
GLOBAL_CONFIG["PROJECT_NAME"],
GLOBAL_CONFIG["CHECKPOINT_DIRECTORY"],
embedding_file,
)
        with h5py.File(embeddings_path, "r") as hf:
            embeddings = hf["embeddings"][...]
return embeddings
    except Exception as e:
        logging.error(f"error in reading embedding h5 file: {e}", exc_info=True)
def export_graph_to_json():
"""exports the graph database as a json file
"""
try:
export_file_name = GLOBAL_CONFIG["JSON_EXPORT_FILE"] + ".json"
graph_file_path = os.path.abspath(
os.path.join(
DATA_DIRECTORY, export_file_name
) # default: myproject/data/graph.json
)
logging.info(f"""EXPORTING GRAPH DATABASE TO {graph_file_path}...... """)
query = (
f"""CALL apoc.export.json.all('{graph_file_path}'"""
+ """,{batchSize:500})"""
)
graph_connection.run(query)
if os.path.exists(graph_file_path):
logging.info("Done...")
        else:
            logging.error("export failed! try again!")
    except Exception as e:
        logging.error(
            """error in exporting data.
            Possible problems may include incorrect url and credentials.
            Or absence of apoc procedures.
            Also make sure apoc settings are configured in neo4j.conf"""
        )
        logging.error(e, exc_info=True)
        sys.exit(e)
def create_pbg_config():
    """Creates the training configuration for PyTorch-BigGraph
    Returns:
        [dict] -- config in PBG format
    """
try:
from embeoj.utils import load_config
logging.info(f"""CREATING CONFIGURATION FILE FOR TRAINING...... """)
default_config = load_config("OPTIONAL_PBG_SETTINGS")
pbg_config = export_meta_data()
pbg_config["num_epochs"] = GLOBAL_CONFIG["EPOCHS"]
pbg_config["dimension"] = GLOBAL_CONFIG["EMBEDDING_DIMENSIONS"]
pbg_config["entity_path"] = DATA_DIRECTORY
# change if num of partitions > 1
pbg_config["edge_paths"] = [
os.path.join(
DATA_DIRECTORY, GLOBAL_CONFIG["TSV_FILE_NAME"] + "_partitioned"
)
]
pbg_config["checkpoint_path"] = CHECKPOINT_DIRECTORY
operator = default_config["operator"]
for relation in pbg_config["relations"]:
relation["operator"] = operator # adds operator for each relation
pbg_config = {
**pbg_config,
**default_config,
} # merge the default and added config
del pbg_config["operator"] # removes extra key from config to avoid error
return pbg_config
    except Exception as e:
        logging.error("Could not create pbg config")
        logging.error(e, exc_info=True)
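# Shape of the returned dict (illustrative values only; the real entries come from
# export_meta_data(), GLOBAL_CONFIG and OPTIONAL_PBG_SETTINGS):
# {
#     "entity_path": "myproject/data",
#     "edge_paths": ["myproject/data/graph_partitioned"],
#     "checkpoint_path": "myproject/model",
#     "num_epochs": 10,
#     "dimension": 100,
#     "entities": {...},  # from export_meta_data()
#     "relations": [{"name": ..., "lhs": ..., "rhs": ..., "operator": "translation"}],
#     ...                 # plus the merged OPTIONAL_PBG_SETTINGS defaults
# }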
def save_index(entity_type, partition_number):
"""Saves the index file
Arguments:
entity_file {[str]} -- Name of the entity files
Returns:
[type] -- created index
"""
try:
index_filename = f"index_{entity_type}_{partition_number}.index"
index_path = os.path.join(CHECKPOINT_DIRECTORY, "index", index_filename)
embeddings = read_embeddings(entity_type, partition_number)
        if not os.path.exists(index_path):
            logging.info(f"creating new index file {index_filename}")
            os.makedirs(os.path.dirname(index_path), exist_ok=True)  # ensure the index directory exists
            index = create_faiss_index()
if FAISS_INDEX_NAME == "IndexIVFFlat":
index.train(embeddings)
index.add(embeddings)
faiss.write_index(index, index_path)
        else:
            logging.info("index already exists")
    except Exception as e:
        logging.error(f"error in index creation: {e}", exc_info=True)
def initialise_config():
from embeoj.utils import load_config
global GLOBAL_CONFIG
global json_path
GLOBAL_CONFIG = load_config("GLOBAL_CONFIG")
json_path = os.path.join(
os.getcwd(),
GLOBAL_CONFIG["PROJECT_NAME"],
GLOBAL_CONFIG["DATA_DIRECTORY"],
GLOBAL_CONFIG["JSON_EXPORT_FILE"] + ".json",
) # path to the json dump of the graph db
def initialise_config():
from embeoj.utils import load_config
global GLOBAL_CONFIG
global DATA_DIRECTORY
global CHECKPOINT_DIRECTORY
global FILENAMES
GLOBAL_CONFIG = load_config("GLOBAL_CONFIG")
FILENAMES = {
"train": os.path.join(
os.getcwd(),
GLOBAL_CONFIG["PROJECT_NAME"],
GLOBAL_CONFIG["DATA_DIRECTORY"],
GLOBAL_CONFIG["TSV_FILE_NAME"] + ".tsv",
)
} # path to tsv file with train data
DATA_DIRECTORY = os.path.join(
os.getcwd(), GLOBAL_CONFIG["PROJECT_NAME"], GLOBAL_CONFIG["DATA_DIRECTORY"]
)
CHECKPOINT_DIRECTORY = os.path.join(
os.getcwd(),
GLOBAL_CONFIG["PROJECT_NAME"],
GLOBAL_CONFIG["CHECKPOINT_DIRECTORY"],
)
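# Resulting on-disk layout with the defaults mentioned in the comments above
# ("myproject" and the directory names are whatever GLOBAL_CONFIG specifies):
#   myproject/
#       data/              graph.tsv, graph.json, graph_partitioned/
#       <checkpoints>/     embeddings_*.v<N>.h5, checkpoint_version.txt, index/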