Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.log.info(msg)
# scroll informational message
msg = u' '.join(msgs)
yield self._display.scroll_message(msg)
# write the ZOLLHOF logo
self._display.set_raw_digit(0, 0b0001001)
self._display.set_raw_digit(1, 0b0111111)
self._display.set_raw_digit(2, 0b0110110)
self._display.set_raw_digit(3, 0b1110000)
self._display.set_raw_digit(4, 0b0111111)
self._display.set_raw_digit(5, 0b1110001)
self._display.write_display()
yield sleep(5)
def onJoin(self, details):
    """
    Publish an ever-increasing counter to ``com.myapp.topic1``.

    This is a generator that never finishes on its own: every pass prints
    the current value, yields the result of publishing it, bumps the value
    by one and then yields a one-second ``sleep``. ``details`` is accepted
    but not used in this body.
    """
    topic = u'com.myapp.topic1'
    tick = 0
    while True:
        print("publish: com.myapp.topic1", tick)
        yield self.publish(topic, tick)
        tick += 1
        # pause before the next publication
        yield sleep(1)
counter=counter)
counter += 1
# CALL a remote procedure
#
try:
res = yield self.call('com.example.mul2', counter, 3)
self.log.info("mul2() called with result: {result}",
result=res)
except ApplicationError as e:
# ignore errors due to the frontend not yet having
# registered the procedure we would like to call
if e.error != 'wamp.error.no_such_procedure':
raise e
yield sleep(1)
try:
res = yield self.call('com.myapp.add2', counter, 7)
print("Got call result: {}".format(res))
except Exception as e:
print("Call failed: {}".format(e))
try:
publication = yield self.publish('com.myapp.topic1', counter,
options=PublishOptions(acknowledge=True, discloseMe=True, excludeMe=False))
print("Event published with publication ID {}".format(publication.id))
except Exception as e:
print("Publication failed: {}".format(e))
counter += 1
yield sleep(1)
def rainbow_cycle(self, wait_ms=20, iterations=5):
    """
    Draw rainbow that uniformly distributes itself across all pixels.

    Generator: renders ``256 * iterations`` frames. For every frame each
    pixel index ``i`` is given the colour returned by ``self.wheel`` for a
    wheel position offset by the frame number, and a ``sleep`` is yielded
    after the frame has been written.

    :param wait_ms: pause after each frame, in milliseconds.
    :param iterations: multiplier for the 256 frames rendered per pass.
    """
    self.log.info('rainbow-cycle animation starting ..')
    # the per-frame pause does not change while the animation runs
    delay = wait_ms / 1000.0
    for j in range(256 * iterations):
        # ask the strip for its length once per frame rather than once
        # per pixel; the inner range() already fixes it for the frame
        num_pixels = self._leds.numPixels()
        for i in range(num_pixels):
            # spread the 256 wheel positions over the strip, shift by the
            # frame number and wrap into 0..255
            r, g, b = self.wheel(int((i * 256 / num_pixels) + j) & 255)
            self.set_color(r, g, b, i)
        yield sleep(delay)
yield self.subscribe(on_event, u'com.myapp.topic1')
counter = 0
while True:
print("publish: com.myapp.topic1", counter)
pub_options = PublishOptions(
acknowledge=True,
exclude_me=False
)
publication = yield self.publish(
u'com.myapp.topic1', counter,
options=pub_options,
)
print("Published with publication ID {}".format(publication.id))
counter += 1
yield sleep(1)
raise TypeError('"off" must be an integer')
if self._is_beeping:
raise ApplicationError(u'{}.already-beeping'.format(self._prefix), 'currently already in a beeping sequence')
# run the beeping sequence ..
self.log.info('start beeping sequence: count={count}, on={on}, off={off}', count=count, on=on, off=off)
self._is_beeping = True
self.publish(u'{}.on_beep_started'.format(self._prefix), count=count, on=on, off=off)
for i in range(count):
GPIO.output(self._buzzer_pin, 1)
yield sleep(float(on) / 1000.)
GPIO.output(self._buzzer_pin, 0)
yield sleep(float(off) / 1000.)
self._is_beeping = False
self.publish(u'{}.on_beep_ended'.format(self._prefix))
workflow = read_json('lie_workflow_spec.json')
self.wf = WorkflowSpec(workflow=workflow)
self.wf.task_runner = self
# Update metadata
self.wf.workflow.nodes[self.wf.workflow.root].update(
self.session_config())
# Run the workflow
self.wf.workflow.nodes[1]['active'] = True
self.wf.next()
# Non-blocking wait: poll once per second until the workflow completes, giving up after maxitr polls
maxitr = 5
while not self.wf.is_completed and maxitr > 0:
yield sleep(1)
maxitr -= 1
with open('project.json', 'w') as pf:
pf.write(write_json(self.wf.workflow))
print('LEAVING')
return
# Create project
project = {}
# Get protein structure from structure database
protein = yield self.task(
u'liestudio.structures.get_structure', 'protein')
project['protein'] = protein
print(protein)
"""button pressed handler"""
self.log.info('Button pressed handler on thread {thread_id}', thread_id=current_thread().ident)
""" if the button is pressed during the progress is running this Error will be fired"""
if self._is_pressed:
raise ApplicationError(u'{}.already-pressed'.format(self._prefix), 'Button is already pressed ')
self._is_pressed = True
self.log.info("Pressed")
""" publish event button_pressed"""
self.publish(u'{}.button_pressed'.format(self._prefix))
"""wait for a short time"""
yield sleep(1000 / 1000.)
self._is_pressed = False
""" publish event button_released"""
self.publish(u'{}.button_released'.format(self._prefix))
self.log.info("released")
def welcome(self):
    """
    Play the greeting beep pattern.

    Generator that logs the start of the sequence, then yields, in order:
    ``self.beep(7)`` with its default timing, a half-second ``sleep``, and
    ``self.beep(3, on=200, off=200)``.
    """
    self.log.info('start welcome beep sequence ..')

    # first burst: seven beeps using beep()'s default on/off timing
    yield self.beep(7)

    # half a second of silence between the two bursts
    yield sleep(.5)

    # second burst: three beeps with explicit on/off values of 200
    yield self.beep(3, on=200, off=200)