Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from .rest import RESTRobot
class AbstractServer(object):
    """Base class holding what every robot server needs.

    The robot is wrapped in a ``RESTRobot`` and kept alongside the host and
    port given at construction. Subclasses provide the serving loop by
    overriding ``run``.
    """

    def __init__(self, robot, host, port):
        """Wrap *robot* in a ``RESTRobot`` and remember *host* and *port*."""
        self.restful_robot = RESTRobot(robot)
        self.host = host
        self.port = port

    def run(self):
        """Start serving; the base implementation always raises.

        Raises:
            NotImplementedError: unconditionally, subclasses must override.
        """
        raise NotImplementedError
try:
    import zerorpc
except ImportError:
    # zerorpc is optional: when it cannot be imported, RemoteRobotServer is
    # simply left undefined.
    pass
else:
    class RemoteRobotServer(AbstractServer):
        """Serve the REST robot wrapper through a zerorpc server."""

        def run(self):
            """Bind a zerorpc server to ``tcp://host:port`` and run it."""
            rpc_server = zerorpc.Server(self.restful_robot)
            endpoint = 'tcp://{}:{}'.format(self.host, self.port)
            rpc_server.bind(endpoint)
            rpc_server.run()
def __init__(self, robot, host, port):
    """ Build a ZMQ server giving remote access to a robot instance.

    The server uses the REQ/REP zmq pattern: always send a request first, then read the answer.

    """
    AbstractServer.__init__(self, robot, host, port)

    # A REP socket from a fresh context, bound to the configured address.
    context = zmq.Context()
    self.socket = context.socket(zmq.REP)
    endpoint = 'tcp://{}:{}'.format(self.host, self.port)
    self.socket.bind(endpoint)

    logger.info('Starting ZMQServer on tcp://%s:%s', self.host, self.port)
def __init__(self, robot, host, port, debug=False):
""" Serving a robot REST API using flask.
The documented API can be found on http://docs.pypot.apiary.io/
All URLs are described, with example of requests and responses.
"""
AbstractServer.__init__(self, robot, host, port)
self.robot = self.restfull_robot
self.app = Flask(__name__) # TODO: use something better than __name__ here
self.app.secret_key = os.urandom(24)
self.app.debug = debug
CORS(self.app)
# Robot's Heartbeat
@self.app.route('/')
def index():
return Response(status=204)
# Devices
@self.app.route('/devices')
def devices():
def __init__(self, robot, host='0.0.0.0', port='9009', quiet=True):
    """Initialise the server and hand *robot* and *quiet* to the socket handler.

    Both values are stored as class attributes on ``WsSocketHandler``.
    """
    AbstractServer.__init__(self, robot=robot, host=host, port=port)
    WsSocketHandler.robot, WsSocketHandler.quiet = robot, quiet
def __init__(self, robot, host='0.0.0.0', port='6969', quiet=True):
AbstractServer.__init__(self, robot, host, port)
self.quiet = quiet
self.app = bottle.Bottle()
self.app.install(EnableCors())
rr = self.restfull_robot
# Copy Snap files from system directory to user directory. It avoids
# right issue while PyPot is installed from pip in an admin directory
snap_system_projects_directory = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'snap_projects')
xml_files = [os.path.join(snap_system_projects_directory, f)
for f in os.listdir(snap_system_projects_directory) if f.endswith('.xml')]
for xml_file in xml_files:
dst = os.path.join(get_snap_user_projects_directory(), os.path.basename(xml_file))
logger.info('Copy snap project from {}, to {}'.format(xml_file, dst))
shutil.copyfile(xml_file, dst)
else:
os.chdir(path)
xml_files = [f for f in os.listdir('.') if f.endswith(snap_extension)]
for filename in xml_files:
with open(filename, 'r') as xf:
xml = xf.read()
with open(filename, 'w') as xf:
xml = re.sub(r'''[\s\S]*?<\/l><\/variable>''',
'''{}'''.format(host), xml)
xml = re.sub(r'''[\s\S]*?<\/l><\/variable>''',
'''{}'''.format(port), xml)
xf.write(xml)
os.chdir(localdir)
class SnapRobotServer(AbstractServer):
def __init__(self, robot, host='0.0.0.0', port='6969', quiet=True):
AbstractServer.__init__(self, robot, host, port)
self.quiet = quiet
self.app = bottle.Bottle()
self.app.install(EnableCors())
rr = self.restfull_robot
# Copy Snap files from system directory to user directory. It avoids
# right issue while PyPot is installed from pip in an admin directory
snap_system_projects_directory = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'snap_projects')
xml_files = [os.path.join(snap_system_projects_directory, f)
for f in os.listdir(snap_system_projects_directory) if f.endswith('.xml')]
for xml_file in xml_files:
dst = os.path.join(get_snap_user_projects_directory(), os.path.basename(xml_file))
class MotorsRegistersHandler(PoppyRequestHandler):
    """Handler reporting one register's value for each listed motor."""

    def get(self, register_name):
        """Send, via ``write_json``, a mapping of motor name to ``{register_name: value}``.

        The motors are those returned by ``get_motors_list('motors')`` on the
        REST robot wrapper.
        """
        values_by_motor = {
            name: {
                register_name: self.restful_robot.get_motor_register_value(
                    name, register_name)
            }
            for name in self.restful_robot.get_motors_list('motors')
        }
        self.write_json(values_by_motor)
class HTTPRobotServer(AbstractServer):
"""Refer to the REST API for an exhaustive list of the possible routes."""
def __init__(self, robot, host='0.0.0.0', port='8080', cross_domain_origin='*', **kwargs):
    """Initialise the server through ``AbstractServer``.

    ``cross_domain_origin`` and any extra keyword arguments are accepted
    but not used by this constructor.
    """
    AbstractServer.__init__(self, robot=robot, host=host, port=port)
def make_app(self):
PoppyRequestHandler.restful_robot = self.restful_robot
return Application([
(r'/(robot\.json)?', IndexHandler),
(r'/motor/alias/list\.json', MotorsAliasesListHandler),
(r'/motor/(?P[a-zA-Z0-9_]+)/?list\.json', MotorsListHandler),
(r'/sensor/list\.json', SensorsListHandler),
(r'/motor/(?P[a-zA-Z0-9_]+)/register/list\.json', MotorRegistersListHandler),
(r'/sensor/(?P[a-zA-Z0-9_]+)/register/list\.json', MotorRegistersListHandler),
(r'/motor/(?P[a-zA-Z0-9_]+)/register/(?P[a-zA-Z0-9_]+)/list\.json', MotorRegisterHandler),
'led': m.led,
'present_temperature': m.present_temperature,
}
for m in self.robot.motors
}
self.write_message(json.dumps(state))
def handle_command(self, command):
    """Apply a batch of register writes to the robot.

    *command* maps a motor name to a mapping of register name -> value.
    Each motor is looked up as an attribute of ``self.robot`` and every
    register value is assigned on it with ``setattr``.
    """
    # NOTE(review): names and values are applied as-is, with no validation;
    # confirm that callers only forward trusted commands.
    for motor_name, register_values in command.items():
        target = getattr(self.robot, motor_name)
        for register_name, new_value in register_values.items():
            setattr(target, register_name, new_value)
class WsRobotServer(AbstractServer):
    """Server routing ``/`` to ``WsSocketHandler``."""

    def __init__(self, robot, host='0.0.0.0', port='9009', quiet=True):
        """Initialise the server and hand *robot* and *quiet* to the handler.

        Both values are stored as class attributes on ``WsSocketHandler``.
        """
        AbstractServer.__init__(self, robot=robot, host=host, port=port)
        WsSocketHandler.robot, WsSocketHandler.quiet = robot, quiet

    def run(self):
        """Create an IOLoop, listen on ``self.port`` and start the loop."""
        # NOTE(review): only the port is passed to listen(); self.host is
        # not used here.
        io_loop = IOLoop()
        routes = [(r'/', WsSocketHandler)]
        Application(routes).listen(self.port)
        io_loop.start()
def __init__(self, robot, host='0.0.0.0', port='8080', cross_domain_origin='*', **kwargs):
    """Delegate construction to ``AbstractServer``.

    Neither ``cross_domain_origin`` nor the extra keyword arguments are
    used by this constructor.
    """
    AbstractServer.__init__(self, robot=robot, host=host, port=port)
import zmq
import json
import logging
from .server import AbstractServer
logger = logging.getLogger(__name__)
class ZMQRobotServer(AbstractServer):
def __init__(self, robot, host, port):
    """ Create a ZMQServer allowing remote access to a robot instance.

    The server uses the REQ/REP zmq pattern. You should always send a request first and then read the answer.

    """
    AbstractServer.__init__(self, robot, host, port)

    # REP socket taken from a new context and bound to tcp://host:port.
    zmq_context = zmq.Context()
    self.socket = zmq_context.socket(zmq.REP)
    address = 'tcp://{}:{}'.format(self.host, self.port)
    self.socket.bind(address)

    logger.info('Starting ZMQServer on tcp://%s:%s', self.host, self.port)
def run(self):
""" Run an infinite REQ/REP loop. """