Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_schema(message_type_id, action, ocpp_version, parse_float=float):
"""
Read schema from disk and return in. Reads will be cached for performance
reasons.
The `parse_float` argument can be used to set the conversion method that
is used to parse floats. It must be a callable taking 1 argument. By
default it is `float()`, but certain schema's require `decimal.Decimal()`.
"""
if ocpp_version not in ["1.6", "2.0"]:
raise ValueError
schemas_dir = 'v' + ocpp_version.replace('.', '')
schema_name = action
if message_type_id == MessageType.CallResult:
schema_name += 'Response'
elif message_type_id == MessageType.Call:
if ocpp_version == "2.0":
schema_name += 'Request'
if ocpp_version == "2.0":
schema_name += '_v1p0'
dir, _ = os.path.split(os.path.realpath(__file__))
relative_path = f'{schemas_dir}/schemas/{schema_name}.json'
path = os.path.join(dir, relative_path)
if relative_path in _schemas:
return _schemas[relative_path]
# The JSON schemas for OCPP 2.0 start with a byte order mark (BOM)
If the message is a of type Call the corresponding hooks are executed.
If the message is of type CallResult or CallError the message is passed
to the call() function via the response_queue.
"""
try:
msg = unpack(raw_msg)
except OCPPError as e:
LOGGER.exception("Unable to parse message: '%s', it doesn't seem "
"to be valid OCPP: %s", raw_msg, e)
return
if msg.message_type_id == MessageType.Call:
await self._handle_call(msg)
elif msg.message_type_id in \
[MessageType.CallResult, MessageType.CallError]:
self._response_queue.put_nowait(msg)