Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@on_method_ready('start')
def get_ready(self):
"""
Intentionally empty method: the body does nothing, so any effect comes from the
``on_method_ready('start')`` decorator applied to it.
NOTE(review): presumably the decorator makes sure ``start`` has been run before this
body executes - confirm against the decorator's definition.
"""
pass
@on_method_ready('start')
def get_ready(self):
"""
Intentionally empty method: the body does nothing, so any effect comes from the
``on_method_ready('start')`` decorator applied to it.
NOTE(review): presumably the decorator makes sure ``start`` has been run before this
body executes - confirm against the decorator's definition.
"""
pass
@on_method_ready('install_or_upgrade')
def _setup_stream_server(self):
"""
Setup stream server: forward a local port to a ``localabstract:javacap_*`` socket on the
device and launch ``self.SCREENCAP_SERVICE`` through ``app_process`` in an adb shell.
Returns:
adb shell process, non-blocking stream reader and local port
NOTE(review): the return statement lies outside the visible excerpt - confirm the tuple above.
"""
# The bound ``str.format`` is passed uncalled; presumably ``setup_forward`` calls it
# to fill the ``{}`` placeholder with a port/identifier of its choosing - TODO confirm.
localport, deviceport = self.adb.setup_forward("localabstract:javacap_{}".format)
# keep only the socket name: strip the leading "localabstract:" prefix
deviceport = deviceport[len("localabstract:"):]
# setup agent proc
apkpath = self.adb.path_app(self.APP_PKG)
# run the service class from the installed APK (via CLASSPATH) and merge stderr into stdout (2>&1)
cmds = ["CLASSPATH=" + apkpath, 'exec', 'app_process', '/system/bin', self.SCREENCAP_SERVICE,
"--scale", "100", "--socket", "%s" % deviceport, "-lazy", "2>&1"]
proc = self.adb.start_shell(cmds)
# check proc output
@on_method_ready('install_or_upgrade')
def start_recording(self, max_time=1800, bit_rate=None, vertical=None):
"""
Start screen recording
Args:
max_time: maximum recording time (presumably in seconds - confirm), default is 1800
bit_rate: bit rate value, default is None
vertical: vertical parameters, default is None
Raises:
RuntimeError: if any error occurs while setting up the recording
Returns:
None if recording did not start, otherwise True
"""
@on_method_ready('install_and_setup')
def operate(self, args):
"""
Perform down, up and move actions
Args:
args: action arguments, dictionary containing type and x, y coordinates, e.g.::
{
"type" : "down",
"x" : 10,
"y" : 10
}
Raises:
RuntimeError: if invalid arguments are provided
@on_method_ready('install_or_upgrade')
def _get_stream(self, lazy=True):
# Starts the stream server, connects to the forwarded local port and reads the
# 24-byte global header before any frame data.
# NOTE(review): the rest of the body (after ``else:``) is outside this excerpt.
proc, nbsp, localport = self._setup_stream_server(lazy=lazy)
s = SafeSocket()
s.connect((self.adb.host, localport))
# "<2B5I2B" = little-endian: 2 unsigned bytes, 5 unsigned 32-bit ints, 2 unsigned bytes -> 24 bytes
t = s.recv(24)
# minicap header
global_headers = struct.unpack("<2B5I2B", t)
LOGGING.debug(global_headers)
# check quirk-bitflags, reference: https://github.com/openstf/minicap#quirk-bitflags
# the last two header bytes are unpacked as ``ori`` (presumably display orientation - confirm) and the quirk flags
ori, self.quirk_flag = global_headers[-2:]
# bit value 2 set in the quirk flags together with ``ori`` being 1 or 3 triggers a re-setup
if self.quirk_flag & 2 and ori in (1, 3):
# resetup
LOGGING.debug("quirk_flag found, going to resetup")
stopping = True
else:
@on_method_ready('install_or_upgrade')
def get_stream(self, lazy=True):
"""
Get stream; it uses `adb forward` and socket communication. Use minicap ``lazy`` mode (provided by gzmaruijie)
for long connections - returns one latest frame from the server
Args:
lazy: True or False, passed through to ``_get_stream``
Returns:
not documented in the original; the remainder of the body is outside this excerpt
"""
gen = self._get_stream(lazy)
# if quirk error, restart server and client once
# the first value produced by the generator is read as a "stopped" flag
stopped = next(gen)
@on_method_ready('install_and_setup')
def swipe(self, tuple_from_xy, tuple_to_xy, duration=0.8, steps=5):
"""
Perform swipe event.
Args:
tuple_from_xy: start point
tuple_to_xy: end point
duration: time interval for swipe duration, default is 0.8
steps: size of swipe step, default is 5
Returns:
None
"""
# the event list opens with a touch-down at the start point followed by a SleepEvent(0.1);
# NOTE(review): the remaining events are built outside this excerpt
swipe_events = [DownEvent(tuple_from_xy), SleepEvent(0.1)]
@on_method_ready('install_and_setup')
def pinch(self, center=None, percent=0.5, duration=0.5, steps=5, in_or_out='in'):
"""
Perform pinch action
minitouch protocol example::
d 0 0 100 50
d 1 100 0 50
c
m 0 10 90 50
m 1 90 10 50
c
m 0 20 80 50
m 1 80 20 50
c
m 0 20 80 50
@on_method_ready('install_and_setup')
def touch(self, tuple_xy, duration=0.01):
"""
Perform touch event
minitouch protocol example::
d 0 10 10 50
c
u 0
c
Args:
tuple_xy: coordinates (x, y)
duration: time interval for touch event, default is 0.01