Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# or an error occurred
if vl == VL.TOLERANT and (validation_level != VL.TOLERANT and not error_occurred):
continue
file_base_name = os.path.basename(hl7_file.name)
try:
msg_start = time.time()
msg = parse_message(msg_str, vl, find_groups=find_groups, message_profile=mp)
msg_end = time.time()
msg_per_versions[vl][msg.version] += 1
msg_per_type[vl][msg.msh.msh_9.to_er7()] += 1
parsing_time[vl].append((file_base_name, len(msg.children),
msg.msh.msh_9.to_er7(), msg_end - msg_start))
except MessageProfileNotFound:
continue
except InvalidEncodingChars:
if mp is None:
exceptions[vl].append({'type': 'parsing', 'ex': e, 'file_name': f, 'msg': msg_str})
if vl == VL.STRICT:
error_occurred = True
else:
continue
except Exception as e:
exceptions[vl].append({'type': 'parsing', 'ex': e, 'file_name': f, 'msg': msg_str})
if vl == VL.STRICT:
error_occurred = True
n_messages[vl] += 1
try:
encoding_start = time.time()
msg.to_er7()
def test_create_invalid_encoding_chars_message(self):
    # Creating a Message with invalid encoding chars must raise InvalidEncodingChars,
    # both for an unnamed message and for a named one built at STRICT validation level.
    # The arguments are built outside the guarded calls so that only Message() itself
    # is expected to raise.
    unnamed_chars = _get_invalid_encoding_chars()
    with self.assertRaises(InvalidEncodingChars):
        Message(encoding_chars=unnamed_chars)

    named_chars = _get_invalid_encoding_chars()
    strict_level = VALIDATION_LEVEL.STRICT
    with self.assertRaises(InvalidEncodingChars):
        Message('ADT_A01', encoding_chars=named_chars, validation_level=strict_level)
def _split_msh(content):
m = re.match("^MSH(?P\S)", content)
if m is not None: # if the regular expression matches, it is an HL7 message
field_sep = m.group('field_sep') # get the field separator (first char after MSH)
msh = content.split("\r", 1)[0] # get the first segment
fields = msh.split(field_sep)
seps = fields[1] # get the remaining encoding chars (MSH.2)
if len(seps) > len(set(seps)):
raise InvalidEncodingChars("Found duplicate encoding chars")
try:
comp_sep, rep_sep, escape, sub_sep = seps
trunc_sep = None
except ValueError:
if len(seps) < N_SEPS:
raise InvalidEncodingChars('Missing required encoding chars')
elif len(seps) == N_SEPS_27 and fields[11] >= '2.7':
comp_sep, rep_sep, escape, sub_sep, trunc_sep = seps
else:
raise InvalidEncodingChars('Found {0} encoding chars'.format(len(seps)))
encoding_chars = {
'FIELD': field_sep,
'COMPONENT': comp_sep,
'SUBCOMPONENT': sub_sep,
m = re.match("^MSH(?P\S)", content)
if m is not None: # if the regular expression matches, it is an HL7 message
field_sep = m.group('field_sep') # get the field separator (first char after MSH)
msh = content.split("\r", 1)[0] # get the first segment
fields = msh.split(field_sep)
seps = fields[1] # get the remaining encoding chars (MSH.2)
if len(seps) > len(set(seps)):
raise InvalidEncodingChars("Found duplicate encoding chars")
try:
comp_sep, rep_sep, escape, sub_sep = seps
trunc_sep = None
except ValueError:
if len(seps) < N_SEPS:
raise InvalidEncodingChars('Missing required encoding chars')
elif len(seps) == N_SEPS_27 and fields[11] >= '2.7':
comp_sep, rep_sep, escape, sub_sep, trunc_sep = seps
else:
raise InvalidEncodingChars('Found {0} encoding chars'.format(len(seps)))
encoding_chars = {
'FIELD': field_sep,
'COMPONENT': comp_sep,
'SUBCOMPONENT': sub_sep,
'REPETITION': rep_sep,
'ESCAPE': escape,
'SEGMENT': '\r',
'GROUP': '\r',
}
if trunc_sep:
encoding_chars.update({'TRUNCATION': trunc_sep})
fields = msh.split(field_sep)
seps = fields[1] # get the remaining encoding chars (MSH.2)
if len(seps) > len(set(seps)):
raise InvalidEncodingChars("Found duplicate encoding chars")
try:
comp_sep, rep_sep, escape, sub_sep = seps
trunc_sep = None
except ValueError:
if len(seps) < N_SEPS:
raise InvalidEncodingChars('Missing required encoding chars')
elif len(seps) == N_SEPS_27 and fields[11] >= '2.7':
comp_sep, rep_sep, escape, sub_sep, trunc_sep = seps
else:
raise InvalidEncodingChars('Found {0} encoding chars'.format(len(seps)))
encoding_chars = {
'FIELD': field_sep,
'COMPONENT': comp_sep,
'SUBCOMPONENT': sub_sep,
'REPETITION': rep_sep,
'ESCAPE': escape,
'SEGMENT': '\r',
'GROUP': '\r',
}
if trunc_sep:
encoding_chars.update({'TRUNCATION': trunc_sep})
else:
raise ParserError("Invalid message")
def check_encoding_chars(encoding_chars):
    """
    Check that a set of encoding chars is usable

    The mapping has to provide every mandatory separator, and no two mandatory separators may
    share the same value. Keys other than the mandatory ones are ignored.

    :type encoding_chars: ``dict``
    :param encoding_chars: the encoding chars to check (see :func:`hl7apy.set_default_encoding_chars`)
    :raises: :exc:`hl7apy.exceptions.InvalidEncodingChars` if ``encoding_chars`` is not a mutable
        mapping, lacks a mandatory key, or gives two mandatory keys the same value
    """
    if not isinstance(encoding_chars, MutableMapping):
        raise InvalidEncodingChars

    mandatory_keys = {'FIELD', 'COMPONENT', 'SUBCOMPONENT', 'REPETITION', 'ESCAPE'}
    if not mandatory_keys.issubset(encoding_chars.keys()):
        raise InvalidEncodingChars('Missing required encoding chars')

    # Only the mandatory separators take part in the uniqueness check.
    separators = [char for name, char in encoding_chars.items() if name in mandatory_keys]
    if len(set(separators)) < len(separators):
        raise InvalidEncodingChars('Found duplicate encoding chars')
Validate the given encoding chars
:type encoding_chars: ``dict``
:param encoding_chars: the encoding chars (see :func:`hl7apy.set_default_encoding_chars`)
:raises: :exc:`hl7apy.exceptions.InvalidEncodingChars` if the given encoding chars are not valid
"""
if not isinstance(encoding_chars, MutableMapping):
raise InvalidEncodingChars
required = {'FIELD', 'COMPONENT', 'SUBCOMPONENT', 'REPETITION', 'ESCAPE'}
missing = required - set(encoding_chars.keys())
if missing:
raise InvalidEncodingChars('Missing required encoding chars')
values = [v for k, v in encoding_chars.items() if k in required]
if len(values) > len(set(values)):
raise InvalidEncodingChars('Found duplicate encoding chars')