Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def StateOne(self, axis):
    """Report the state of the motor attached to ``axis``.

    ``axis`` is 1-based; the motor object is looked up in ``self.m`` at
    position ``axis - 1``.

    Returns a ``(state, status, switchstate)`` tuple where ``switchstate``
    is a bit mask built from ``MotorController.LowerLimitSwitch`` and
    ``MotorController.UpperLimitSwitch``.
    """
    motor = self.m[axis - 1]

    hw_state, hw_status = State.On, "Motor HW is ON"
    if motor.isInMotion():
        hw_state, hw_status = State.Moving, "Motor HW is MOVING"

    # The returned position is discarded; presumably the call refreshes the
    # motor object's internal bookkeeping before the limits are checked
    # -- TODO confirm against the motor class.
    motor.getCurrentUserPosition()

    limit_switches = 0
    if motor.hitLowerLimit():
        limit_switches = limit_switches | MotorController.LowerLimitSwitch
        hw_state = State.Alarm
        hw_status = "Motor HW is in ALARM. Hit hardware lower limit switch"
    if motor.hitUpperLimit():
        limit_switches = limit_switches | MotorController.UpperLimitSwitch
        hw_state = State.Alarm
        # When both limits are hit, the upper-limit message wins.
        hw_status = "Motor HW is in ALARM. Hit hardware upper limit switch"

    # An alarm takes precedence over the powered-off report; power is only
    # queried when no limit switch is active.
    if hw_state != State.Alarm and not motor.hasPower():
        hw_state, hw_status = State.Off, "Motor is powered off"

    return hw_state, hw_status, limit_switches
def StateOne(self, axis):
    """Get the dummy trigger/gate state.

    ``axis`` is 1-based; the trigger/gate object is looked up in
    ``self.tg`` at position ``axis - 1``.

    Returns a ``(state, status)`` tuple. This is a best-effort query: if
    anything fails while the device is being inspected, the error is
    logged and the values determined so far are returned (the defaults
    being ``State.On`` / ``"Stopped"``).
    """
    # Bind the defaults before the try block so that the return statement
    # can never hit an unbound local, even if the very first call fails.
    sta = State.On
    status = "Stopped"
    try:
        self._log.debug('StateOne(%d): entering...' % axis)
        idx = axis - 1
        tg = self.tg[idx]
        if tg.is_started():
            sta = State.Moving
            status = "Started"
        if tg.is_running():
            status = "Running"
        self._log.debug('StateOne(%d): returning (%s, %s)' %
                        (axis, sta, status))
    except Exception as e:
        # "except ... as" is valid on Python 2.6+ and Python 3. The error is
        # reported through the controller logger instead of stdout; %s is
        # used for the axis so the report itself cannot fail on a bad axis.
        self._log.error('StateOne(%s): %s' % (axis, e))
    return sta, status
if name == "state":
value = self.calculate_tango_state(event_value)
elif name == "status":
value = self.calculate_tango_status(event_value)
else:
if isinstance(event_value, SardanaAttribute):
if event_value.error:
error = Except.to_dev_failed(*event_value.exc_info)
else:
value = event_value.value
timestamp = event_value.timestamp
if name == "value":
state = self.twod.get_state()
if state == State.Moving:
quality = AttrQuality.ATTR_CHANGING
self.set_attribute(attr, value=value, w_value=w_value,
timestamp=timestamp, quality=quality,
priority=priority, error=error, synch=False)
elif u_state is State.Unknown:
unknown.add(elem)
elif u_state is None:
none.add(elem)
status.append(u_status)
state = State.On
if none or unknown:
state = State.Unknown
if fault:
state = State.Fault
elif alarm:
state = State.Alarm
elif moving:
state = State.Moving
self._state_statistics = {State.On: on, State.Fault: fault,
State.Alarm: alarm, State.Moving: moving,
State.Unknown: unknown, None: none}
status = "\n".join(status)
return state, status
value = self.calculate_tango_status(event_value)
elif name == "valuebuffer":
value = self._encode_value_chunk(event_value)
elif name == "value":
if isinstance(event_value, SardanaAttribute):
if event_value.error:
error = Except.to_dev_failed(*event_value.exc_info)
else:
value = event_value.value
timestamp = event_value.timestamp
else:
value = event_value
if name == "value":
state = self.zerod.get_state()
if state == State.Moving:
quality = AttrQuality.ATTR_CHANGING
self.set_attribute(attr, value=value, timestamp=timestamp,
quality=quality, priority=priority, error=error,
synch=False)
def StateOne(self, axis):
    """Get the specified motor state.

    Queries ``self.springfield.getState(axis)`` and translates the numeric
    code into a ``(state, status)`` tuple:

    * ``1`` -> ``State.On``,     "Motor is stopped"
    * ``2`` -> ``State.Moving``, "Motor is moving"
    * ``3`` -> ``State.Fault``,  "Motor has an error"

    Any other code yields ``None``.
    """
    hw_code = self.springfield.getState(axis)
    if hw_code == 1:
        return State.On, "Motor is stopped"
    if hw_code == 2:
        return State.Moving, "Motor is moving"
    if hw_code == 3:
        return State.Fault, "Motor has an error"
    # Unrecognised code: no state information is produced.
    return None
def read_Value(self, attr):
    """Read the 2D channel value and publish it on ``attr``.

    The cached value is used only while the channel is in operation and
    ``self.Force_HW_Read`` is not set. If the value carries an error, its
    exception info is handed to ``Except.throw_python_exception``. The
    attribute quality is ``ATTR_CHANGING`` while the channel state is
    ``State.Moving`` and ``None`` otherwise.
    """
    channel = self.twod
    from_cache = channel.is_in_operation() and not self.Force_HW_Read

    reading = channel.get_value(cache=from_cache, propagate=0)
    if reading.error:
        Except.throw_python_exception(*reading.exc_info)

    # The state is queried with the same cache policy as the value.
    is_moving = channel.get_state(cache=from_cache, propagate=0) == State.Moving
    quality = AttrQuality.ATTR_CHANGING if is_moving else None

    self.set_attribute(
        attr,
        value=reading.value,
        quality=quality,
        timestamp=reading.timestamp,
        priority=0,
    )
def StateOne(self, axis):
    """Report the state of the counter channel attached to ``axis``.

    ``axis`` is 1-based; the channel object lives in ``self.channels`` at
    position ``axis - 1``. Channels that are not listed in
    ``self.counting_channels`` are reported as ``State.On`` / "Stopped"
    without being inspected.

    Returns a ``(state, status)`` tuple.
    """
    channel_index = axis - 1
    idle = State.On, "Stopped"

    if axis not in self.counting_channels:
        return idle

    channel = self.channels[channel_index]
    # Bring the channel up to date with the time elapsed since the start
    # before looking at its counting flag.
    elapsed = time.time() - self.start_time
    self._updateChannelState(axis, elapsed)

    if channel.is_counting:
        return State.Moving, "Acquiring"
    return idle
else:
if isinstance(event_value, SardanaAttribute):
if event_value.error:
error = Except.to_dev_failed(*event_value.exc_info)
else:
value = event_value.value
timestamp = event_value.timestamp
else:
value = event_value
state = self.motor.get_state(propagate=0)
if name == "position":
w_value = event_source.get_position_attribute().w_value
if state == State.Moving:
quality = AttrQuality.ATTR_CHANGING
elif name == "dialposition" and state == State.Moving:
quality = AttrQuality.ATTR_CHANGING
self.set_attribute(attr, value=value, w_value=w_value,
timestamp=timestamp, quality=quality,
priority=priority, error=error, synch=False)
fault, alarm, on, moving = set(), set(), set(), set()
status = []
if state_info is None:
state_info = {}
for elem in user_elements:
if elem.get_type() == ElementType.External:
continue
# cannot call get_state(us) here since it may lead to dead lock!
si = elem.inspect_state(), elem.inspect_status()
state_info[elem] = si
for elem, elem_state_info in state_info.items():
elem_type = elem.get_type()
if elem_type == ElementType.External:
continue
u_state, u_status = self._calculate_element_state(elem, elem_state_info)
if u_state == State.Moving:
moving.add(elem)
elif u_state == State.On:
on.add(elem)
elif u_state == State.Fault:
fault.add(elem)
elif u_state == State.Alarm:
alarm.add(elem)
elif u_state is State.Unknown:
unknown.add(elem)
elif u_state is None:
none.add(elem)
status.append(u_status)
state = State.On
if none or unknown:
state = State.Unknown
if fault: