Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Entry point: reads connection settings from get_args(), creates a Pusher
# publisher and a pusherclient subscriber, then opens a serial driver wrapped
# in an SBP Framer/Handler.
# NOTE(review): indentation was lost in this excerpt and the function is cut
# off after the `for` header at the end; the loop body and the handlers that
# close the `try` are not visible here.
def main():
args = get_args()
# Each option is indexed with [0] to take its first value.
app = args.app[0]
key = args.key[0]
secret = args.secret[0]
port = args.port[0]
baud = args.baud[0]
# Both options must split on ':' into exactly two parts (channel, event);
# any other number of parts makes the tuple unpacking raise ValueError.
rx_channel, rx_event = args.rx_channel_event[0].split(':')
tx_channel, tx_event = args.tx_channel_event[0].split(':')
# Publisher side, used by push_it() below.
push = pusher.Pusher(
app_id=app, key=key, secret=secret, ssl=True, port=443)
# Subscriber side, used by connect_it() below.
push_client = pusherclient.Pusher(key, secret=secret)
with PySerialDriver(port, baud) as driver:
with Handler(Framer(driver.read, driver.write)) as handler:
# Triggers tx_event on tx_channel with the message's JSON dict.
# NOTE(review): push_it is not called in the visible part of main;
# presumably the truncated loop body uses it -- confirm.
def push_it(sbp_msg):
push.trigger(tx_channel, tx_event, sbp_msg.to_json_dict())
# Rebuilds an SBP message from the received JSON data and passes it to
# the handler.
def pull_it(data):
handler(SBP.from_json(data))
# Subscribes to rx_channel and binds rx_event to pull_it; the `data`
# argument is unused.
def connect_it(data):
push_client.subscribe(rx_channel).bind(rx_event, pull_it)
# connect_it is bound to the 'pusher:connection_established' event, and the
# binding is registered before connect() is called.
push_client.connection.bind('pusher:connection_established',
connect_it)
push_client.connect()
try:
# Iterates (msg, metadata) pairs from the handler filtered by OBS_MSG_LIST.
for msg, metadata in handler.filter(OBS_MSG_LIST):
def to_json_dict(self):
    """Return this message as a JSON-style dict.

    Starts from the dict produced by the parent class's ``to_json_dict`` and
    updates it with ``walk_json_dict`` applied to the non-excluded fields.
    """
    # Called before the parent serialization; presumably this refreshes
    # self.payload from the current field values -- confirm against the
    # class's to_binary implementation.
    self.to_binary()
    result = super(MsgPosECEFGnss, self).to_json_dict()
    field_values = walk_json_dict(exclude_fields(self))
    result.update(field_values)
    return result
def to_binary(self):
    """Produce a framed/packed SBP message.

    Builds ``self.payload`` from the containerized, non-excluded fields using
    the MsgAlmanacGloDep parser, then returns the result of ``self.pack()``.
    """
    fields = exclude_fields(self)
    container = containerize(fields)
    parser = MsgAlmanacGloDep._parser
    self.payload = parser.build(container)
    return self.pack()
def to_binary(self):
    """Produce a framed/packed SBP message.

    The non-excluded fields are containerized and built into ``self.payload``
    by the MsgAlmanacGPS parser; the return value is ``self.pack()``.
    """
    container = containerize(exclude_fields(self))
    parser = MsgAlmanacGPS._parser
    payload = parser.build(container)
    self.payload = payload
    return self.pack()
def to_binary(self):
    """Produce a framed/packed SBP message.

    Stores the MsgDopsDepA parser's build of the containerized, non-excluded
    fields in ``self.payload`` and returns whatever ``self.pack()`` yields.
    """
    included = exclude_fields(self)
    built = MsgDopsDepA._parser.build(containerize(included))
    self.payload = built
    return self.pack()
def into_buffer(self, buf, offset):
    """Produce a framed/packed SBP message into the provided buffer and offset.

    Sets ``self.payload`` to the containerized, non-excluded fields and
    ``self.parser`` to the MsgGPSTime parser, resets ``self.stream_payload``
    to ``buf``/``offset``, and returns the result of ``self.pack_into``.
    """
    fields = exclude_fields(self)
    self.payload = containerize(fields)
    self.parser = MsgGPSTime._parser
    # Reset the payload stream onto the caller's buffer before packing.
    stream = self.stream_payload
    stream.reset(buf, offset)
    return self.pack_into(buf, offset, self._build_payload)
def to_binary(self):
    """Produce a framed/packed SBP message.

    Containerizes the non-excluded fields, builds them with the
    MsgThreadState parser into ``self.payload``, and returns ``self.pack()``.
    """
    fields = exclude_fields(self)
    container = containerize(fields)
    self.payload = MsgThreadState._parser.build(container)
    packed = self.pack()
    return packed
def to_binary(self):
    """Produce a framed/packed SBP message.

    ``self.payload`` is rebuilt by the MsgEphemerisGPS parser from the
    containerized, non-excluded fields before ``self.pack()`` is returned.
    """
    container = containerize(exclude_fields(self))
    build_payload = MsgEphemerisGPS._parser.build
    self.payload = build_payload(container)
    return self.pack()
def to_binary(self):
    """Produce a framed/packed SBP message.

    Uses the MsgEphemerisGloDepA parser to build ``self.payload`` from the
    containerized, non-excluded fields, then returns ``self.pack()``.
    """
    included_fields = exclude_fields(self)
    container = containerize(included_fields)
    parser = MsgEphemerisGloDepA._parser
    self.payload = parser.build(container)
    return self.pack()
def into_buffer(self, buf, offset):
    """Produce a framed/packed SBP message into the provided buffer and offset.

    Assigns the containerized, non-excluded fields to ``self.payload``, selects
    the MsgTrackingState parser as ``self.parser``, resets
    ``self.stream_payload`` to ``buf``/``offset``, and returns the value of
    ``self.pack_into``.
    """
    container = containerize(exclude_fields(self))
    self.payload = container
    self.parser = MsgTrackingState._parser
    # The payload stream must target the caller's buffer before packing.
    self.stream_payload.reset(buf, offset)
    result = self.pack_into(buf, offset, self._build_payload)
    return result