How to use the hl7apy.parser.parse_message function in hl7apy

To help you get started, we’ve selected a few hl7apy examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github crs4 / hl7apy / tests / test_parser.py View on Github external
def test_parse_version_27_message(self):
        m = parse_message(self.rsp_k21_27)
        self.assertEqual(m.msh.msh_2.to_er7(), '^~\\&#')
        self.assertEqual(m.encoding_chars['TRUNCATION'], '#')
        self.assertEqual(m.to_er7(), self.rsp_k21_27)
github crs4 / hl7apy / tests / test_validation.py View on Github external
def _create_message(self, msg_str):
        return parse_message(msg_str, message_profile=self.rsp_k21_mp)
github crs4 / hl7apy / tests / test_validation.py View on Github external
def _create_message(self, msg_str):
        return parse_message(msg_str)
github crs4 / hl7apy / tests / test_parser.py View on Github external
def test_parse_message_missing_version(self):
        m = parse_message('MSH|^~\\&|SEND APP|SEND FAC|REC APP|REC FAC|20080115153000||ADT^A01^ADT_A01||')
github crs4 / hl7apy / tests / test_parser.py View on Github external
def test_parse_message_ignoring_groups(self):
        msh = 'MSH|^~\\&|SEND APP|SEND FAC|REC APP|REC FAC|20080115153000||ADT^A01^ADT_A01|0123456789|P|2.5||||AL\r'
        evn = 'EVN||20080115153000||AAA|AAA|20080114003000\r'
        pid = 'PID|1||123-456-789^^^HOSPITAL^MR||SURNAME^NAME^A|||M|||1111 SOMEWHERE^^SOMEWHERE^^^USA||555~444|||M\r'
        nk1 = 'NK1|1|WOMAN^WIFE|SPO|1111 SOMEWHERE^^SOMEWHERE^^^USA\r'
        pv1 = 'PV1|1|I|PATIENT WARD|U||||^REFER^DOCTOR^^MD|^CONSUL^DOC|CAR||||2|A0|||||||||||||||||||||||||||||2013\r'
        in1 = 'IN1|1|INSURANCE PLAN ID^PLAN DESC|COMPANY ID|INS CNY, INC.|5555 INSURERS STREET^^SOMEWHERE^^^USA'

        str_message = msh + evn + pid + nk1 + pv1 + in1
        message = parse_message(str_message, find_groups=False)

        self.assertEqual(message.children[0].name, 'MSH')
        self.assertEqual(message.children[1].name, 'EVN')
        self.assertEqual(message.children[2].name, 'PID')
        self.assertEqual(message.children[3].name, 'NK1')
        self.assertEqual(message.children[4].name, 'PV1')
        self.assertEqual(message.children[5].name, 'IN1')
        self.assertEqual(message.pid.pid_5.pid_5_1.fn_1.to_er7(), 'SURNAME')
        self.assertEqual(message.evn.evn_5.xcn_1.to_er7(), message.evn.evn_5.evn_5_1.to_er7())
        m_string = message.to_er7()
        self.assertEqual(m_string, str_message)
github crs4 / hl7apy / tests / test_core.py View on Github external
def test_assign_value(self):
        msg = _get_test_msg()
        a = Message('OML_O33', validation_level=VALIDATION_LEVEL.TOLERANT)
        parsed_a = parse_message(msg, validation_level=VALIDATION_LEVEL.TOLERANT)
        a.value = msg
        self.assertEqual(a.to_er7(), parsed_a.to_er7())

        b = Message('OML_O33', validation_level=VALIDATION_LEVEL.STRICT)
        b.value = msg
        parsed_b = parse_message(msg, validation_level=VALIDATION_LEVEL.STRICT)
        self.assertEqual(b.to_er7(), parsed_b.to_er7())
        self.assertEqual(list(b.children.indexes.keys()), list(parsed_b.children.indexes.keys()))

        c = Message('ADT_A01', validation_level=VALIDATION_LEVEL.TOLERANT)
        with self.assertRaises(OperationNotAllowed):
            c.value = msg

        msg = msg.replace('^', 'x')
        with self.assertRaises(OperationNotAllowed):
            a.value = msg
github crs4 / hl7apy / examples / lab62 / actor.py View on Github external
def receive(message):
        # print to stdout the data received
        print("Received by LB")
        try:
            # parse the incoming HL7 message
            m = parse_message(message, find_groups=False)
        except:
            print('parsing failed', repr(message))
        else:
            print("Message type:", m.MSH.message_type.to_er7())
            print("Message content:", repr(m.to_er7()))
            surname, name = m.PID.PID_5.family_name.to_er7(), m.PID.PID_5.given_name.to_er7()
            print("Patient data:", surname, name)
github crs4 / hl7apy / paper_examples / pdq_example / client.py View on Github external
def query(host, port):
    msg = \
        'MSH|^~\&|REC APP|REC FAC|SENDING APP|SENTING FAC|20110708163513||QBP^Q22^QBP_Q21|111069|D|2.5|||||ITA||EN\r' \
        'QPD|IHE PDQ Query|111069|@PID.5.1.1^PATIENT_SURNAME||||\r' \
        'RCP|I|'
    # establish the connection
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.connect((host, port))
        # send the message
        sock.sendall(parse_message(msg).to_mllp())
        # receive the answer
        received = sock.recv(1024*1024)
        return received
    finally:
        sock.close()
github crs4 / hl7apy / examples / iti_21 / server.py View on Github external
def __init__(self, message):
        msg = parse_message(message, message_profile=self.REQ_MP)
        super(PDQSupplier, self).__init__(msg)
github crs4 / hl7apy / examples / iti_21 / server.py View on Github external
def reply(self):

        if isinstance(self.exc, UnsupportedMessageType):
            err_code, err_msg = 101, 'Unsupported message'
        elif isinstance(self.exc, InvalidHL7Message):
            err_code, err_msg = 102, 'Incoming message is not an HL7 valid message'
        else:
            err_code, err_msg = 100, 'Unknown error occurred'

        parsed_message = parse_message(self.incoming_message)

        m = Message("ACK")
        m.MSH.MSH_9 = "ACK^ACK"
        m.MSA.MSA_1 = "AR"
        m.MSA.MSA_2 = parsed_message.MSH.MSH_10
        m.ERR.ERR_1 = "%s" % err_code
        m.ERR.ERR_2 = "%s" % err_msg

        return m.to_mllp()