Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""Loads an entity mapping file.
Args:
entity_type (str): The name of the entity
"""
file_path = path.get_entity_map_path(self.app_path, entity_type)
logger.debug("Loading entity map from file '%s'", file_path)
if not os.path.isfile(file_path):
logger.warning("Entity map file not found at %s", file_path)
json_data = {}
else:
try:
with open(file_path, "r") as json_file:
json_data = json.load(json_file)
except json.JSONDecodeError:
raise MindMeldError(
"Could not load entity map (Invalid JSON): {!r}".format(file_path)
)
self._entity_files[entity_type]["mapping"]["data"] = json_data
self._entity_files[entity_type]["mapping"]["loaded"] = time.time()
pass
class ProcessorError(MindMeldError):
    """Signals that a processor ran into an error.

    Adds no attributes or methods beyond those of MindMeldError.
    """
class ParserTimeout(MindMeldError):
    """Signals that parsing ran for an unexpectedly long time.

    Adds no attributes or methods beyond those of MindMeldError.
    """
class MarkupError(MindMeldError):
    """MindMeldError subclass that adds no attributes or methods of its own.

    Serves as the base class of SystemEntityMarkupError.
    NOTE(review): the name suggests a problem with markup; confirm at the
    raise sites.
    """
class SystemEntityMarkupError(MarkupError):
    """MarkupError subclass that adds no attributes or methods of its own.

    NOTE(review): the name suggests a markup problem involving a system
    entity; confirm at the raise sites.
    """
class SystemEntityResolutionError(MindMeldError):
    """Signals an error encountered while resolving a system entity.

    Adds no attributes or methods beyond those of MindMeldError.
    """
class KnowledgeBaseError(MindMeldError):
    """Signals an unexpected error coming from the knowledge base.

    Adds no attributes or methods beyond those of MindMeldError.
    """
class MarkupError(MindMeldError):
    """MindMeldError subclass that adds no attributes or methods of its own.

    Serves as the base class of SystemEntityMarkupError.
    NOTE(review): the name suggests a problem with markup; confirm at the
    raise sites.
    """
class SystemEntityMarkupError(MarkupError):
    """MarkupError subclass that adds no attributes or methods of its own.

    NOTE(review): the name suggests a markup problem involving a system
    entity; confirm at the raise sites.
    """
class SystemEntityResolutionError(MindMeldError):
    """Signals an error encountered while resolving a system entity.

    Adds no attributes or methods beyond those of MindMeldError.
    """
class KnowledgeBaseError(MindMeldError):
    """Signals an unexpected error coming from the knowledge base.

    Serves as the base class of KnowledgeBaseConnectionError and adds no
    attributes or methods beyond those of MindMeldError.
    """
# Exception type for knowledge-base connection problems. The visible
# initializer stores the host argument and, when that argument is empty,
# a fixed message pointing at localhost.
# NOTE(review): only the empty-host branch is visible in this excerpt; no
# message is assigned here when es_host is non-empty and no base-class
# initializer call appears. The definition looks truncated -- confirm
# against the complete source before relying on this description.
class KnowledgeBaseConnectionError(KnowledgeBaseError):
"""An exception for problem connecting to knowledge base."""
def __init__(self, es_host):
# Keep the host argument available on the instance.
self.es_host = es_host
# Taken when es_host is falsy or its first element is falsy.
if (not es_host) or (not es_host[0]):
self.message = (
"Unable to connect to Elasticsearch for knowledge base. Please"
" verify your connection to localhost."
)
def _check_query_entities(self, queries):
    """Ensure every entity annotated in ``queries`` has a recognized type.

    An entity is accepted when its type is among the types returned by
    ``path.get_entity_types`` for ``self.app_path`` or when it is flagged as
    a system entity.

    Args:
        queries: Iterable of objects exposing ``entities`` and ``query.text``.

    Raises:
        MindMeldError: On the first entity that is neither of a known type
            nor a system entity.
    """
    known_types = path.get_entity_types(self.app_path)
    template = "Unknown entity {!r} found in query {!r}"
    for annotated_query in queries:
        for query_entity in annotated_query.entities:
            # Guard clause: skip entities that are acceptable.
            if (
                query_entity.entity.type in known_types
                or query_entity.entity.is_system_entity
            ):
                continue
            raise MindMeldError(
                template.format(
                    query_entity.entity.type, annotated_query.query.text
                )
            )
self.message = message
if status_code is not None:
self.status_code = status_code
self.payload = payload
def to_dict(self):
    """Build a new dict from the payload with the message under ``"error"``.

    Returns:
        dict: A copy of ``self.payload`` (empty when the payload is falsy)
        whose ``"error"`` key is set to ``self.message``, overriding any
        ``"error"`` entry the payload already carried.
    """
    serialized = {}
    # A falsy payload contributes nothing; the payload itself is not mutated.
    serialized.update(self.payload or ())
    serialized["error"] = self.message
    return serialized
class EmbeddingDownloadError(MindMeldError):
    """MindMeldError subclass that adds no attributes or methods of its own.

    NOTE(review): the name suggests a failed embedding download; confirm at
    the raise sites.
    """
class AllowedNlpClassesKeyError(MindMeldError):
    """MindMeldError subclass that adds no attributes or methods of its own.

    NOTE(review): the name suggests a bad key in an allowed-NLP-classes
    structure; confirm at the raise sites.
    """
class ClassifierLoadError(MindMeldError):
    """MindMeldError subclass that adds no attributes or methods of its own.

    NOTE(review): the name suggests a classifier that failed to load;
    confirm at the raise sites.
    """
class ProcessorError(MindMeldError):
    """Signals that a processor ran into an error.

    Adds no attributes or methods beyond those of MindMeldError.
    """
class ParserTimeout(MindMeldError):
    """Signals that parsing ran for an unexpectedly long time.

    Adds no attributes or methods beyond those of MindMeldError.
    """
"Unable to connect to Elasticsearch for entity resolution. "
"Please verify your connection to: {hosts}.".format(
hosts=", ".join(es_host)
)
)
class AuthNotFoundError(MindMeldError):
    """MindMeldError subclass that adds no attributes or methods of its own.

    NOTE(review): the name suggests missing authentication information;
    confirm at the raise sites.
    """
class MindMeldVersionError(MindMeldError):
    """MindMeldError subclass that adds no attributes or methods of its own.

    NOTE(review): the name suggests a version mismatch; confirm at the
    raise sites.
    """
class MindMeldImportError(MindMeldError):
    """MindMeldError subclass that adds no attributes or methods of its own.

    NOTE(review): the name suggests a failed import; confirm at the raise
    sites.
    """
def to_dict(self):
    """Build a new dict from the payload with the message under ``"error"``.

    Returns:
        dict: A copy of ``self.payload`` (empty when the payload is falsy)
        whose ``"error"`` key is set to ``self.message``, overriding any
        ``"error"`` entry the payload already carried.
    """
    serialized = {}
    # A falsy payload contributes nothing; the payload itself is not mutated.
    serialized.update(self.payload or ())
    serialized["error"] = self.message
    return serialized
class EmbeddingDownloadError(MindMeldError):
    """MindMeldError subclass that adds no attributes or methods of its own.

    NOTE(review): the name suggests a failed embedding download; confirm at
    the raise sites.
    """
class AllowedNlpClassesKeyError(MindMeldError):
    """MindMeldError subclass that adds no attributes or methods of its own.

    NOTE(review): the name suggests a bad key in an allowed-NLP-classes
    structure; confirm at the raise sites.
    """
class ClassifierLoadError(MindMeldError):
    """MindMeldError subclass that adds no attributes or methods of its own.

    NOTE(review): the name suggests a classifier that failed to load;
    confirm at the raise sites.
    """
class ProcessorError(MindMeldError):
    """Signals that a processor ran into an error.

    Adds no attributes or methods beyond those of MindMeldError.
    """
class ParserTimeout(MindMeldError):
    """Signals that parsing ran for an unexpectedly long time.

    Adds no attributes or methods beyond those of MindMeldError.
    """
class MarkupError(MindMeldError):
class ParserTimeout(MindMeldError):
    """Signals that parsing ran for an unexpectedly long time.

    Adds no attributes or methods beyond those of MindMeldError.
    """
class MarkupError(MindMeldError):
    """MindMeldError subclass that adds no attributes or methods of its own.

    Serves as the base class of SystemEntityMarkupError.
    NOTE(review): the name suggests a problem with markup; confirm at the
    raise sites.
    """
class SystemEntityMarkupError(MarkupError):
    """MarkupError subclass that adds no attributes or methods of its own.

    NOTE(review): the name suggests a markup problem involving a system
    entity; confirm at the raise sites.
    """
class SystemEntityResolutionError(MindMeldError):
    """Signals an error encountered while resolving a system entity.

    Adds no attributes or methods beyond those of MindMeldError.
    """
class KnowledgeBaseError(MindMeldError):
    """Signals an unexpected error coming from the knowledge base.

    Serves as the base class of KnowledgeBaseConnectionError and adds no
    attributes or methods beyond those of MindMeldError.
    """
class KnowledgeBaseConnectionError(KnowledgeBaseError):
"""An exception for problem connecting to knowledge base."""
def __init__(self, es_host):