Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_send_call_with_timeout(connection):
    """Check that a timed-out call raises and leaves the call lock free.

    A ChargePoint is built with ``response_timeout=0.1`` and a hard Reset
    request is sent through it; the call is expected to raise
    ``asyncio.TimeoutError``.
    """
    charge_point = ChargePoint(
        id=1234,
        connection=connection,
        response_timeout=0.1,
    )
    reset_request = call.ResetPayload(type="Hard")

    with pytest.raises(asyncio.TimeoutError):
        await charge_point.call(reset_request)

    # Verify that the lock is released if call() crashes. Not releasing the
    # lock in case of an exception could lead to a deadlock. See
    # https://github.com/mobilityhouse/ocpp/issues/46
    assert charge_point._call_lock.locked() is False
# Import the optional 'websockets' dependency. If it is missing, print
# installation instructions and exit with status 1 instead of showing a
# traceback.
try:
    import websockets
except ModuleNotFoundError:
    print("This example relies on the 'websockets' package.")
    print("Please install it by running: ")
    print()
    print(" $ pip install websockets")
    import sys
    sys.exit(1)
from ocpp.routing import on
from ocpp.v16 import ChargePoint as cp
from ocpp.v16.enums import Action, RegistrationStatus
from ocpp.v16 import call_result
class ChargePoint(cp):
    """Charge point handler that answers BootNotification requests."""

    @on(Action.BootNotification)
    def on_boot_notitication(self, charge_point_vendor, charge_point_model, **kwargs):
        """Accept a BootNotification request.

        The vendor, model and any extra keyword arguments are received but
        not inspected. The reply carries the current ``datetime.utcnow()``
        value in ISO format, an interval of 10 and the ``accepted``
        registration status.
        """
        timestamp = datetime.utcnow().isoformat()
        reply = call_result.BootNotificationPayload(
            current_time=timestamp,
            interval=10,
            status=RegistrationStatus.accepted,
        )
        return reply
async def on_connect(websocket, path):
    """Build a ChargePoint instance for a newly connected charge point.

    The charge point id is the request ``path`` with leading and trailing
    slashes removed.

    NOTE(review): the visible body only constructs the instance; nothing
    here starts listening for messages -- confirm against the full example.
    """
    cp_id = path.strip('/')
    charge_point = ChargePoint(cp_id, websocket)
# Import the optional 'websockets' dependency. If it is missing, print
# installation instructions and exit with status 1 instead of showing a
# traceback.
try:
    import websockets
except ModuleNotFoundError:
    print("This example relies on the 'websockets' package.")
    print("Please install it by running: ")
    print()
    print(" $ pip install websockets")
    import sys
    sys.exit(1)
from ocpp.v16 import call
from ocpp.v16 import ChargePoint as cp
from ocpp.v16.enums import RegistrationStatus
class ChargePoint(cp):
    """Charge point client that can announce itself with a BootNotification."""

    async def send_boot_notification(self):
        """Send a BootNotification request and report when it is accepted.

        The request uses the model "Optimus" and the vendor
        "The Mobility House". A confirmation line is printed only when the
        response status equals ``RegistrationStatus.accepted``.
        """
        boot_request = call.BootNotificationPayload(
            charge_point_model="Optimus",
            charge_point_vendor="The Mobility House",
        )
        reply = await self.call(boot_request)

        accepted = reply.status == RegistrationStatus.accepted
        if accepted:
            print("Connected to central system.")
async def main():
async with websockets.connect(
'ws://localhost:9000/CP_1',
subprotocols=['ocpp1.6']