Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
state = State.On
status = "Motor HW is ON"
if m.isInMotion():
state = State.Moving
status = "Motor HW is MOVING"
m.getCurrentUserPosition()
switchstate = 0
if m.hitLowerLimit():
switchstate |= MotorController.LowerLimitSwitch
state = State.Alarm
status = "Motor HW is in ALARM. Hit hardware lower limit switch"
if m.hitUpperLimit():
switchstate |= MotorController.UpperLimitSwitch
state = State.Alarm
status = "Motor HW is in ALARM. Hit hardware upper limit switch"
if state != State.Alarm and not m.hasPower():
state = State.Off
status = "Motor is powered off"
#self._log.info("StateOne(%s) = %s", axis, (state, status, switchstate))
return state, status, switchstate
def re_init(self):
self.set_state(State.Init, propagate=2)
status = "{0} is Initializing (temporarily unavailable)".format(self.name)
self.set_status(status, propagate=2)
manager = self.pool.ctrl_manager
old_e_ids = self._element_ids
old_p_e_ids = self._pending_element_ids
elem_axis = dict(self._element_axis)
for axis in elem_axis:
self.remove_axis(axis, propagate=0)
if self._lib_info is None:
mod_name = self.get_library_name()
else:
mod_name = self._lib_info.name
if self._ctrl_info is None:
state_info[elem] = si
for elem, elem_state_info in list(state_info.items()):
elem_type = elem.get_type()
if elem_type == ElementType.External:
continue
u_state, u_status = self._calculate_element_state(
elem, elem_state_info)
if u_state == State.Moving:
moving.add(elem)
elif u_state == State.On:
on.add(elem)
elif u_state == State.Fault:
fault.add(elem)
elif u_state == State.Alarm:
alarm.add(elem)
elif u_state is State.Unknown:
unknown.add(elem)
elif u_state is None:
none.add(elem)
status.append(u_status)
state = State.On
if none or unknown:
state = State.Unknown
if fault:
state = State.Fault
elif alarm:
state = State.Alarm
elif moving:
state = State.Moving
self._state_statistics = {State.On: on, State.Fault: fault,
State.Alarm: alarm, State.Moving: moving,
State.Unknown: unknown, None: none}
def calculate_state_info(self, status_info=None):
    """Build the (state, status) pair reported for this element.

    On first use, the position object's ``on_change`` callback is added as
    a listener to every physical position attribute; the
    ``_listeners_configured`` flag keeps that from being repeated.

    :param status_info: ``(state, status)`` pair to format. When ``None``,
        the element's own ``_state`` and ``_status`` are used.
    :return: ``(state, new_status)``, where ``new_status`` is
        ``_STD_STATUS`` filled in with the element name, a readable state
        description and the incoming status text.
    """
    # Refer to Position.__init__ method for an explanation on this 'hack'
    position = self._position
    if not position._listeners_configured:
        for phys_attr in self.get_physical_position_attribute_iterator():
            phys_attr.add_listener(position.on_change)
        position._listeners_configured = True

    if status_info is None:
        status_info = self._state, self._status
    state, ctrl_status = status_info

    # State.On is worded as "Stopped"; any other state is looked up in State.
    readable = "Stopped" if state == State.On else "in " + State[state]
    formatted = self._STD_STATUS.format(
        name=self.name, state=readable, ctrl_status=ctrl_status)
    return status_info[0], formatted
def on_door_changed(self, event_source, event_type, event_value):
# during server startup and shutdown avoid processing element
# creation events
if SardanaServer.server_state != State.Running:
return
timestamp = time.time()
name = event_type.name.lower()
multi_attr = self.get_device_attr()
try:
attr = multi_attr.get_attr_by_name(name)
except DevFailed:
return
if name == "state":
event_value = self.calculate_tango_state(event_value)
elif name == "status":
event_value = self.calculate_tango_status(event_value)
inst, props, *args, **kwargs)
# initialize hardware communication
self.springfield = springfieldlib.SpringfieldMotorHW()
# do some initialization
self._motors = {}
def AddDevice(self, axis):
    """Register *axis* by storing ``True`` under it in ``self._motors``."""
    motors = self._motors
    motors[axis] = True
def DeleteDevice(self, axis):
    """Forget *axis* by removing its entry from ``self._motors``.

    Raises ``KeyError`` when *axis* has no entry.
    """
    self._motors.pop(axis)
# Lookup table from the integer code returned by the hardware library's
# getState(axis) call to the matching State value; StateOne indexes it
# directly, so a code missing from this table raises KeyError there.
StateMap = {
1: State.On,
2: State.Moving,
3: State.Fault,
}
def StateOne(self, axis):
springfield = self.springfield
state = self.StateMap[springfield.getState(axis)]
status = springfield.getStatus(axis)
limit_switches = MotorController.NoLimitSwitch
hw_limit_switches = springfield.getLimits(axis)
if hw_limit_switches[0]:
limit_switches |= MotorController.HomeLimitSwitch
if hw_limit_switches[1]:
limit_switches |= MotorController.UpperLimitSwitch
if hw_limit_switches[2]:
def StateOne(self, axis):
    """Report the state of the dummy trigger/gate on *axis*.

    :param axis: 1-based axis number; ``self.tg`` is indexed with
        ``axis - 1``.
    :return: ``(State.Moving, "Moving")`` while the axis' entry in
        ``self.tg`` reports it is generating, ``(State.On, "Stopped")``
        otherwise.
    """
    self._log.debug('StateOne(%d): entering...' % axis)
    sta, status = State.On, "Stopped"
    generator = self.tg[axis - 1]
    if generator.isGenerating():
        sta, status = State.Moving, "Moving"
    self._log.debug('StateOne(%d): returning (%s, %s)' % (axis, sta, status))
    return sta, status
self._first_read_cache = True
else:
if isinstance(event_value, SardanaAttribute):
if event_value.error:
error = Except.to_dev_failed(*event_value.exc_info)
else:
value = event_value.value
timestamp = event_value.timestamp
else:
value = event_value
if name == "timer" and value is None:
value = "None"
elif name == "value":
w_value = event_source.get_value_attribute().w_value
state = self.oned.get_state()
if state == State.Moving:
quality = AttrQuality.ATTR_CHANGING
self.set_attribute(attr, value=value, w_value=w_value,
timestamp=timestamp, quality=quality,
priority=priority, error=error, synch=False)
def StateOne(self, axis):
    """Report the state of the channel on *axis*.

    :param axis: 1-based axis number; ``self.channels`` is indexed with
        ``axis - 1``.
    :return: ``(state, status)``:

        * ``(State.Moving, "Armed")`` while ``self._armed`` is set;
        * ``(State.Moving, "Acquiring")`` when the axis is listed in
          ``self.counting_channels`` and its channel is still counting
          after being refreshed with the elapsed time;
        * ``(State.On, "Stopped")`` otherwise.
    """
    self._log.debug('StateOne(%d): entering...' % axis)
    sta, status = State.On, "Stopped"
    if self._armed:
        sta, status = State.Moving, "Armed"
    elif axis in self.counting_channels:
        # Fetch the channel before refreshing it, then read its flag.
        channel = self.channels[axis - 1]
        elapsed_time = time.time() - self.start_time
        self._updateChannelState(axis, elapsed_time)
        if channel.is_counting:
            sta, status = State.Moving, "Acquiring"
    result = (sta, status)
    self._log.debug('StateOne(%d): returning %s' % (axis, repr(result)))
    return result