# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Fragment of a supervised-training routine: the enclosing `def` is outside
# this chunk and the original indentation has been flattened to column 0, so
# the code is kept byte-identical and only comments are added.
# Turn off the online-training flag while supervised training runs.
self.train = False
# NOTE(review): the file handle from open() is never closed — a `with` block
# would be safer; left unchanged here because this is a documentation pass.
data = json.load(open(data_path))
cui_counts = {}
# With test_size == 0 the whole dataset serves as both the train and the
# test split; otherwise delegate the split to make_mc_train_test.
if test_size == 0:
test_set = data
train_set = data
else:
train_set, test_set, _, _ = make_mc_train_test(data, self.cdb, test_size=test_size)
# Optionally report metrics on the test split before training starts.
if print_stats:
self._print_stats(test_set, use_filters=use_filters, use_cui_doc_limit=use_cui_doc_limit, use_overlaps=use_overlaps,
use_groups=use_groups)
# Start from a fresh concept database and rewire the spacy pipeline
# components to point at it.
if reset_cdb:
self.cdb = CDB()
self.spacy_cat.cdb = self.cdb
self.spacy_cat.cat_ann.cdb = self.cdb
if reset_cui_count:
# Get all CUIs
cuis = []
for project in train_set['projects']:
for doc in project['documents']:
for ann in doc['annotations']:
cuis.append(ann['cui'])
# Reset the count of every annotated CUI to a small fixed value —
# presumably so count-based annealing restarts for them; TODO confirm.
for cui in set(cuis):
if cui in self.cdb.cui_count:
self.cdb.cui_count[cui] = 10
# Remove entities that were terminated
# NOTE(review): the fragment ends mid-statement — the body of this `if`
# is not part of this chunk.
if not never_terminate:
from flask import Flask
from medcat.cdb import CDB
from medcat.utils.spacy_pipe import SpacyPipe
from medcat.utils.vocab import Vocab
from medcat.cat import CAT
from flask import request
import os
import json
from spacy import displacy
# Module-level setup for the demo Flask service: load the concept database
# and vocabulary, build an inference-only CAT annotator, create the app.
vocab = Vocab()
cdb = CDB()
# Model file paths are overridable via environment variables; the literal
# paths are the defaults used when the variables are unset.
cdb.load_dict(os.getenv("CDB_MODEL", '/cat/models/med_ann_norm.dat'))
vocab.load_dict(path=os.getenv("VOCAB_MODEL", '/cat/models/med_ann_norm_dict.dat'))
cat = CAT(cdb, vocab=vocab)
# Inference only: disable training on the spacy_cat component.
cat.spacy_cat.train = False
app = Flask(__name__)
@app.route('/api_test', methods=['GET', 'POST'])
def api_test():
    """Demo endpoint: POST annotates the submitted `text` form field and
    returns the JSON annotations; GET serves the test HTML page.

    The body's indentation was flattened in the original and has been
    reconstructed here.
    """
    if request.method == 'POST':
        return cat.get_json(request.form.get('text'))
    # NOTE(review): `get_file` is not defined in this chunk — presumably a
    # helper that reads the template from disk; confirm against the caller.
    content = get_file('api_test.html')
    return content


# NOTE(review): a stray `@app.route('/doc', methods=['POST'])` decorator stood
# here with no function attached (a syntax error); it was removed because it
# decorated nothing.
import json
import pandas
import spacy
from time import sleep
from functools import partial
from multiprocessing import Process, Manager, Queue, Pool, Array
from medcat.cdb import CDB
from medcat.spacy_cat import SpacyCat
from medcat.preprocessing.tokenizers import spacy_split_all
from medcat.utils.spelling import CustomSpellChecker
from medcat.utils.spacy_pipe import SpacyPipe
from medcat.preprocessing.cleaners import spacy_tag_punct
from medcat.utils.helpers import get_all_from_name, tkn_inds_from_doc
from medcat.utils.loggers import basic_logger
# Module-level logger for this file.
log = basic_logger("CAT")
# Check scispacy models
from medcat.utils.helpers import check_scispacy
# Runs at import time — verifies the required scispacy models are available.
check_scispacy()
class CAT(object):
""" Annotate a dataset
"""
# NOTE(review): the class body's indentation has been flattened to column 0
# in this chunk and the `__init__` body is truncated; code kept byte-identical.
# Purpose unclear from this chunk — used as a separator string somewhere; TODO confirm.
SEPARATOR = ""
# Env toggle: "true" (case-insensitive) enables nested-entity handling.
NESTED_ENTITIES = os.getenv("NESTED_ENTITIES", 'false').lower() == 'true'
# Punctuation characters to keep, read from env as a '|'-separated list
# (defaults to ':' and '.').
KEEP_PUNCT = os.getenv("KEEP_PUNCT", ":|.").split("|")
# NOTE(review): `meta_cats=[]` is a mutable default argument — safe only if
# never mutated; candidate for `meta_cats=None` in a behavior-changing pass.
def __init__(self, cdb, vocab=None, skip_stopwords=True, meta_cats=[]):
self.cdb = cdb
self.vocab = vocab
# Build the required spacy pipeline
from flask import Flask
from medcat.cdb import CDB
from medcat.utils.spacy_pipe import SpacyPipe
from medcat.utils.vocab import Vocab
from medcat.cat import CAT
from flask import request
import os
import json
from spacy import displacy
# Module-level setup for the demo Flask service: load the concept database
# and vocabulary, build an inference-only CAT annotator, create the app.
vocab = Vocab()
cdb = CDB()
# Model file paths come from env vars, with literal fallback defaults.
cdb.load_dict(os.getenv("CDB_MODEL", '/cat/models/med_ann_norm.dat'))
vocab.load_dict(path=os.getenv("VOCAB_MODEL", '/cat/models/med_ann_norm_dict.dat'))
cat = CAT(cdb, vocab=vocab)
# Inference only: disable training on the spacy_cat component.
cat.spacy_cat.train = False
app = Flask(__name__)
@app.route('/api_test', methods=['GET', 'POST'])
def api_test():
    """Demo endpoint: POST annotates the submitted `text` form field and
    returns the JSON annotations; GET serves the test HTML page.

    The body's indentation was flattened in the original and has been
    reconstructed here.
    """
    if request.method == 'POST':
        return cat.get_json(request.form.get('text'))
    # NOTE(review): `get_file` is not defined in this chunk — TODO confirm.
    content = get_file('api_test.html')
    return content
from flask import Flask
from medcat.cdb import CDB
from medcat.utils.spacy_pipe import SpacyPipe
from medcat.utils.vocab import Vocab
from medcat.cat import CAT
from flask import request
import os
import json
from spacy import displacy
# Module-level setup for the demo Flask service: load the concept database
# and vocabulary, build an inference-only CAT annotator, create the app.
vocab = Vocab()
cdb = CDB()
# Model file paths come from env vars, with literal fallback defaults.
cdb.load_dict(os.getenv("CDB_MODEL", '/cat/models/med_ann_norm.dat'))
vocab.load_dict(path=os.getenv("VOCAB_MODEL", '/cat/models/med_ann_norm_dict.dat'))
cat = CAT(cdb, vocab=vocab)
# Inference only: disable training on the spacy_cat component.
cat.spacy_cat.train = False
app = Flask(__name__)
@app.route('/api_test', methods=['GET', 'POST'])
def api_test():
    """Demo endpoint: POST annotates the submitted `text` form field and
    returns the JSON annotations; GET serves the test HTML page.

    The body's indentation was flattened in the original and has been
    reconstructed here.
    """
    if request.method == 'POST':
        return cat.get_json(request.form.get('text'))
    # NOTE(review): `get_file` is not defined in this chunk — TODO confirm.
    content = get_file('api_test.html')
    return content
@app.route('/doc', methods=['POST'])
def show_annotated_document():
    """Annotate the POSTed `text` form field with `cat` and return the
    entities rendered as HTML via spaCy displacy.

    The body's indentation was flattened in the original and has been
    reconstructed here.
    """
    doc = cat(request.form.get('text'))
    return displacy.render(doc, style='ent')
markup = ""
offset = 0
text = doc.text
for span in list(doc.ents):
start = span.start_char
end = span.end_char
fragments = text[offset:start].split("\n")
for i, fragment in enumerate(fragments):
markup += html.escape(fragment)
if len(fragments) > 1 and i != len(fragments) - 1:
markup += "<br>"
ent = {'label': '', 'id': span._.id, 'bg': "rgb(74, 154, 239, {})".format(span._.acc * span._.acc + 0.12), 'text': html.escape(span.text)}
# Add the entity
markup += TPL_ENT.format(**ent)
offset = end
markup += html.escape(text[offset:])
out = TPL_ENTS.format(content=markup, dir='ltr')
return out
markup = ""
offset = 0
text = doc['text']
for span in list(doc['entities']):
start = span['start']
end = span['end']
fragments = text[offset:start].split("\n")
for i, fragment in enumerate(fragments):
markup += html.escape(fragment)
if len(fragments) > 1 and i != len(fragments) - 1:
markup += "<br>"
ent = {'label': '', 'id': span['id'], 'bg': "rgb(74, 154, 239, {})".format(1 * 1 + 0.12), 'text': html.escape(span['str'])}
# Add the entity
markup += TPL_ENT.format(**ent)
offset = end
markup += html.escape(text[offset:])
out = TPL_ENTS.format(content=markup, dir='ltr')
return out
for span in list(doc.ents):
start = span.start_char
end = span.end_char
fragments = text[offset:start].split("\n")
for i, fragment in enumerate(fragments):
markup += html.escape(fragment)
if len(fragments) > 1 and i != len(fragments) - 1:
markup += "<br>"
ent = {'label': '', 'id': span._.id, 'bg': "rgb(74, 154, 239, {})".format(span._.acc * span._.acc + 0.12), 'text': html.escape(span.text)}
# Add the entity
markup += TPL_ENT.format(**ent)
offset = end
markup += html.escape(text[offset:])
out = TPL_ENTS.format(content=markup, dir='ltr')
return out
for span in list(doc['entities']):
start = span['start']
end = span['end']
fragments = text[offset:start].split("\n")
for i, fragment in enumerate(fragments):
markup += html.escape(fragment)
if len(fragments) > 1 and i != len(fragments) - 1:
markup += "<br>"
ent = {'label': '', 'id': span['id'], 'bg': "rgb(74, 154, 239, {})".format(1 * 1 + 0.12), 'text': html.escape(span['str'])}
# Add the entity
markup += TPL_ENT.format(**ent)
offset = end
markup += html.escape(text[offset:])
out = TPL_ENTS.format(content=markup, dir='ltr')
return out
# Fragment of a context-vector training method (enclosing `def` not visible;
# flattened indentation reconstructed from the control flow).
# Negative sampling: draw random vocabulary words (skipping punctuation,
# numbers and stopwords), average their embeddings, and push this concept's
# context vector away from that average.
negs = self.vocab.get_negative_samples(n=self.CNTX_SPAN * 2, ignore_punct_and_num=True, stopwords=STOP_WORDS)
neg_cntx_vecs = [self.vocab.vec(self.vocab.index2word[x]) for x in negs]
neg_cntx = np.average(neg_cntx_vecs, axis=0)
self.cdb.add_context_vec(cui, neg_cntx, negative=True, cntx_type='MED',
                         inc_cui_count=False, lr=lr, anneal=True)
#### DEBUG ONLY ####
if self.DEBUG:
    # Log when the freshly computed context is nearly orthogonal (< 0.01
    # cosine) to the stored vector for this concept.
    if cui in self.cdb.cui2context_vec and len(cntx_vecs) > 0:
        if np.dot(unitvec(cntx), unitvec(self.cdb.cui2context_vec[cui])) < 0.01:
            log.debug("SIMILARITY MED::::::::::::::::::::")
            log.debug(words)
            log.debug(cui)
            log.debug(tkns)
            log.debug(np.dot(unitvec(cntx),
                             unitvec(self.cdb.cui2context_vec[cui])))
            log.debug(":::::::::::::::::::::::::::::::::::\n")
    if cui in self.cdb.cui2context_vec_short and len(cntx_vecs_short) > 0:
        if np.dot(unitvec(cntx_short), unitvec(self.cdb.cui2context_vec_short[cui])) < 0.01:
            log.debug("SIMILARITY SHORT::::::::::::::::::::")
            log.debug(words_short)
            log.debug(cui)
            log.debug(tkns)
            # Fixed: log the dot product with the SHORT context vector —
            # the original logged against the long (MED) vector here even
            # though this branch's condition tested the short one.
            log.debug(np.dot(unitvec(cntx_short),
                             unitvec(self.cdb.cui2context_vec_short[cui])))
            log.debug(":::::::::::::::::::::::::::::::::::\n")