Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
import_info = self.visit_ontology_imports(
ontology_path, visited_paths, rec_visited_paths)
if import_info is None:
return
json_file_path, visited_paths, rec_visited_paths = import_info
with open(json_file_path, 'r') as f:
spec_dict = json.load(f)
# Parse imported ontologies. Users can import them via a path relative
# to the PYTHONPATH.
relative_imports: Set[str] = set(
spec_dict.get(SchemaKeywords.imports, []))
for rel_import in relative_imports:
full_pkg_path: str = self.find_import_path(rel_import)
logging.info('Imported ontology at: %s', full_pkg_path)
self.parse_ontology_spec(
full_pkg_path, destination_dir, merged_schema, visited_paths,
rec_visited_paths)
# Once the ontology for all the imported files is generated, generate
# ontology of the current file.
# Print relative json path in the ontology if the current directory is
# the installation directory - example, when running the test cases
curr_forte_dir = utils.get_current_forte_dir()
print_json_file = json_file_path
if self.installed_forte_dir is not None and os.path.samefile(
# NOTE(review): the lines below read like the tail of a SchemaKeywords-style
# constants class whose header is not visible in this chunk — confirm against
# the full file. They map schema-spec JSON keys to symbolic names.
child_type = 'child_type'
member_type = 'member_type'
default_value = 'default'
element_type = 'item_type'
dict_key_type = 'key_type'
dict_value_type = 'value_type'
# Imports every generated module always needs.
REQUIRED_IMPORTS: List[str] = ['typing']
# Root module that generated ontology entries ultimately derive from.
TOP_MOST_MODULE_NAME = 'forte.data.ontology.core'
# Constraint-key -> type-variable-name mapping for the base link/group types.
DEFAULT_CONSTRAINTS_KEYS = {
"BaseLink": {SchemaKeywords.parent_type: "ParentType",
SchemaKeywords.child_type: "ChildType"},
"BaseGroup": {SchemaKeywords.member_type: "MemberType"}
}
# Marker strings written into generated files so they can be recognized
# (and safely deleted/regenerated) later.
AUTO_GEN_SIGNATURE = '***automatically_generated***'
AUTO_GEN_FILENAME = '.generated'
AUTO_DEL_FILENAME = '.deleted'
# Prefix/suffix wrapped around the source json path recorded in generated code.
SOURCE_JSON_PFX = "***source json:"
SOURCE_JSON_SFX = "***"
SOURCE_JSON_TEMP = Template(f"{SOURCE_JSON_PFX}$file_path{SOURCE_JSON_SFX}")
def get_ignore_error_lines(json_filepath: str) -> List[str]:
# Builds the header lines stamped into generated files: the auto-generated
# signature plus the recorded source json path.
source_json_sign = SOURCE_JSON_TEMP.substitute(file_path=json_filepath)
# NOTE(review): the returned list is cut off in this chunk — the remaining
# elements (e.g. linter-suppression lines) are not visible here; confirm
# against the full file before relying on this fragment.
return [
f'# {AUTO_GEN_SIGNATURE}',
f'# {source_json_sign}',
json config. Appends entry code to the corresponding module. Creates a
new module file if module is generated for the first time.
Args:
schema: Ontology dictionary extracted from a json config.
source_json_file: Path of the source json file.
merged_schema: The merged schema is used to remember all
definitions during parsing.
Returns:
Modules to be imported by dependencies of the current ontology.
"""
# NOTE(review): the text above is the tail of a docstring whose def line and
# opening quotes are not visible in this chunk; the code below is a
# parse_schema-style body fragment. Treat as truncated.
# Collect all entry definitions and remember them in the merged schema.
entry_definitions: List[Dict] = schema[SchemaKeywords.definitions]
merged_schema.extend(entry_definitions)
# Allowed package prefixes for entry names, always including the default.
allowed_packages = set(
schema.get(SchemaKeywords.prefixes, []) + [DEFAULT_PREFIX])
sorted_prefixes = analyze_packages(allowed_packages)
# Header text for the generated file (description + ontology name).
file_desc = file_header(
schema.get(SchemaKeywords.description, ""),
schema.get(SchemaKeywords.ontology_name, "")
)
# Validate each entry name and warn when a name is redefined.
for definition in entry_definitions:
raw_entry_name = definition[SchemaKeywords.entry_name]
validate_entry(raw_entry_name, sorted_prefixes)
if raw_entry_name in self.allowed_types_tree:
warnings.warn(
f"Class {raw_entry_name} already present in the "
f"ontology, will be overridden.", DuplicateEntriesWarning)
# Reset the attribute set recorded for this (re)defined entry.
self.allowed_types_tree[raw_entry_name] = set()
Modules to be imported by dependencies of the current ontology.
"""
# NOTE(review): this is a second, duplicated parse_schema-style body fragment
# in this chunk (the docstring's opening and def line are not visible).
# Confirm against the full file which copy is authoritative.
entry_definitions: List[Dict] = schema[SchemaKeywords.definitions]
merged_schema.extend(entry_definitions)
# Allowed package prefixes for entry names, always including the default.
allowed_packages = set(
schema.get(SchemaKeywords.prefixes, []) + [DEFAULT_PREFIX])
sorted_prefixes = analyze_packages(allowed_packages)
# Header text for the generated file (description + ontology name).
file_desc = file_header(
schema.get(SchemaKeywords.description, ""),
schema.get(SchemaKeywords.ontology_name, "")
)
# Validate each entry name; warn-and-override on duplicates.
for definition in entry_definitions:
raw_entry_name = definition[SchemaKeywords.entry_name]
validate_entry(raw_entry_name, sorted_prefixes)
if raw_entry_name in self.allowed_types_tree:
warnings.warn(
f"Class {raw_entry_name} already present in the "
f"ontology, will be overridden.", DuplicateEntriesWarning)
self.allowed_types_tree[raw_entry_name] = set()
# Add the entry definition to the import managers.
# This time adding to the root manager so everyone can access it
# if needed, but they will only appear in the import list when
# requested.
# Entry class should be added to the imports before the attributes
# to be able to used as the attribute type for the same entry.
self.import_managers.root.add_object_to_import(raw_entry_name)
def parse_dict(
self, manager: ImportManager, schema: Dict, entry_name: EntryName,
att_name: str, att_type: str, desc: str):
# Parses a Dict-typed attribute. The schema must declare both a key type
# and a value type; otherwise the attribute is rejected.
if (SchemaKeywords.dict_key_type not in schema
or SchemaKeywords.dict_value_type not in schema):
raise TypeNotDeclaredException(
f"Item type of the attribute {att_name} for the entry "
f" {entry_name.class_name} not declared. This attribute is "
f"a composite type: {att_type}, it should have a "
f"{SchemaKeywords.dict_key_type} and "
f"{SchemaKeywords.dict_value_type}.")
key_type = schema[SchemaKeywords.dict_key_type]
# Only a limited set of key types is supported for composite dicts.
if not valid_composite_key(key_type):
raise UnsupportedTypeException(
f"Key type {key_type} for entry {entry_name.name}'s "
f"attribute {att_name} is not supported, we only support a "
f"limited set of keys.")
value_type = schema[SchemaKeywords.dict_value_type]
# Nested composites (e.g. Dict of List) are rejected.
if is_composite_type(value_type):
# Case of nested.
raise UnsupportedTypeException(
# NOTE(review): the exception message and the rest of this method are cut
# off in this chunk — confirm the remainder against the full file.
def parse_property(self, entry_name: EntryName, schema: Dict) -> Property:
    """
    Parses instance and class properties defined in an entry schema and
    checks for the constraints allowed by the ontology generation system.

    Args:
        entry_name: Entry Name object that contains various form of the
            entry's name.
        schema: Entry definition schema.

    Returns:
        The parsed property object (the annotation says ``Property``;
        the original docstring mentioned ``code_generation_util.FileItem``
        — NOTE(review): confirm which is authoritative).

    Raises:
        TypeNotDeclaredException: If the attribute's type is not declared
            in the ontology.
        UnsupportedTypeException: If the attribute's type is not one of
            the supported composite or non-composite types.
    """
    # Fix: the original body contained an accidental byte-for-byte
    # duplicate of the lookup/validation section below (the same
    # statements appeared twice in a row); one copy suffices.
    att_name = schema[SchemaKeywords.attribute_name]
    att_type = schema[SchemaKeywords.attribute_type]
    manager: ImportManager = self.import_managers.get(
        entry_name.module_name)

    # schema type should be present in the validation tree
    # TODO: Remove this hack
    if not manager.is_known_name(att_type):
        raise TypeNotDeclaredException(
            f"Attribute type '{att_type}' for the entry "
            f"'{entry_name.name}' of the attribute '{att_name}' not "
            f"declared in the ontology")

    desc = schema.get(SchemaKeywords.description, None)
    default_val = schema.get(SchemaKeywords.default_value, None)

    # element type should be present in the validation tree
    if att_type in COMPOSITES:
        # Dispatch composite attributes to their dedicated parsers.
        if att_type == 'List':
            return self.parse_list(
                manager, schema, entry_name, att_name, att_type, desc)
        elif att_type == 'Dict':
            return self.parse_dict(
                manager, schema, entry_name, att_name, att_type, desc)
    elif att_type in NON_COMPOSITES or manager.is_imported(att_type):
        return self.parse_non_composite(
            manager, att_name, att_type, desc, default_val)

    raise UnsupportedTypeException(
        f"{att_type} is not a supported type.")
def parse_schema(self, schema: Dict, source_json_file: str,
merged_schema: List[Dict]):
r""" Generates ontology code for a parsed schema extracted from a
json config. Appends entry code to the corresponding module. Creates a
new module file if module is generated for the first time.
Args:
schema: Ontology dictionary extracted from a json config.
source_json_file: Path of the source json file.
merged_schema: The merged schema is used to remember all
definitions during parsing.
Returns:
Modules to be imported by dependencies of the current ontology.
"""
# Collect all entry definitions and remember them in the merged schema.
entry_definitions: List[Dict] = schema[SchemaKeywords.definitions]
merged_schema.extend(entry_definitions)
# Allowed package prefixes for entry names, always including the default.
allowed_packages = set(
schema.get(SchemaKeywords.prefixes, []) + [DEFAULT_PREFIX])
sorted_prefixes = analyze_packages(allowed_packages)
# Header text for the generated file (description + ontology name).
file_desc = file_header(
schema.get(SchemaKeywords.description, ""),
schema.get(SchemaKeywords.ontology_name, "")
)
# Validate each entry name against the allowed prefixes.
for definition in entry_definitions:
raw_entry_name = definition[SchemaKeywords.entry_name]
validate_entry(raw_entry_name, sorted_prefixes)
if raw_entry_name in self.allowed_types_tree:
# NOTE(review): this method is truncated in this chunk — the body of the
# `if` above and the rest of the loop are not visible here; confirm the
# remainder against the full file.
def parse_entry(self, entry_name: EntryName,
schema: Dict) -> Tuple[EntryDefinition, List[str]]:
"""
Args:
entry_name: Object holds various name form of the entry.
schema: Dictionary containing specifications for an entry.
Returns: extracted entry information: entry package string, entry
filename, entry class entry_name, generated entry code and entry
attribute names.
"""
this_manager = self.import_managers.get(entry_name.module_name)
# Determine the parent entry of this entry.
parent_entry: str = schema[SchemaKeywords.parent_entry]
# Direct inheritance from the top-most core module is disallowed; users
# must inherit from a concrete base type or their own ontology entries.
if parent_entry.startswith(TOP_MOST_MODULE_NAME):
raise ParentEntryNotSupportedException(
f"The parent entry {parent_entry} cannot be directly inherited,"
f" please inherit a type from {top.__name__} or your own"
f" ontology."
)
# The parent entry must already be importable (declared earlier).
if not this_manager.is_imported(parent_entry):
raise ParentEntryNotDeclaredException(
f"The parent entry {parent_entry} is not declared. It is "
f"neither in the base entries nor in custom entries. "
f"Please check them ontology specification, and make sure the "
f"entry is defined before this."
)
# NOTE(review): this method is truncated in this chunk — no return matching
# the Tuple annotation is visible; confirm the remainder in the full file.
# NOTE(review): this constants run duplicates an earlier run in this file
# (with two extra leading keys); the chunk appears to be a garbled
# concatenation — confirm which copy is authoritative. The first group reads
# like SchemaKeywords-style constants mapping schema JSON keys to names.
attribute_type = 'type'
parent_type = 'parent_type'
child_type = 'child_type'
member_type = 'member_type'
default_value = 'default'
element_type = 'item_type'
dict_key_type = 'key_type'
dict_value_type = 'value_type'
# Imports every generated module always needs.
REQUIRED_IMPORTS: List[str] = ['typing']
# Root module that generated ontology entries ultimately derive from.
TOP_MOST_MODULE_NAME = 'forte.data.ontology.core'
# Constraint-key -> type-variable-name mapping for the base link/group types.
DEFAULT_CONSTRAINTS_KEYS = {
"BaseLink": {SchemaKeywords.parent_type: "ParentType",
SchemaKeywords.child_type: "ChildType"},
"BaseGroup": {SchemaKeywords.member_type: "MemberType"}
}
# Marker strings written into generated files so they can be recognized
# (and safely deleted/regenerated) later.
AUTO_GEN_SIGNATURE = '***automatically_generated***'
AUTO_GEN_FILENAME = '.generated'
AUTO_DEL_FILENAME = '.deleted'
# Prefix/suffix wrapped around the source json path recorded in generated code.
SOURCE_JSON_PFX = "***source json:"
SOURCE_JSON_SFX = "***"
SOURCE_JSON_TEMP = Template(f"{SOURCE_JSON_PFX}$file_path{SOURCE_JSON_SFX}")
def get_ignore_error_lines(json_filepath: str) -> List[str]:
source_json_sign = SOURCE_JSON_TEMP.substitute(file_path=json_filepath)
return [