Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def is_atx_agent_outdated(self):
agent_version = self._device.shell("/data/local/tmp/atx-agent version").strip()
if agent_version == "dev":
self.logger.info("skip version check for atx-agent dev")
return False
# semver major.minor.patch
try:
real_ver = list(map(int, agent_version.split(".")))
want_ver = list(map(int, __atx_agent_version__.split(".")))
except ValueError:
return True
self.logger.debug("Real version: %s, Expect version: %s", real_ver,
want_ver)
if real_ver[:2] != want_ver[:2]:
return True
return real_ver[2] < want_ver[2]
'armeabi-v7a': 'atx-agent_{v}_linux_armv7.tar.gz',
'arm64-v8a': 'atx-agent_{v}_linux_armv7.tar.gz',
'armeabi': 'atx-agent_{v}_linux_armv6.tar.gz',
'x86': 'atx-agent_{v}_linux_386.tar.gz',
}
name = None
for abi in self.abis:
name = files.get(abi)
if name:
break
if not name:
raise Exception(
"arch(%s) need to be supported yet, please report an issue in github"
% self.abis)
return GITHUB_BASEURL + '/atx-agent/releases/download/%s/%s' % (
__atx_agent_version__, name.format(v=__atx_agent_version__))
def _atx_agent_check(self):
""" check atx-agent health status and version """
try:
version = self._reqsess.get(self.path2url('/version'),
timeout=5).text
if version != __atx_agent_version__:
warnings.warn('Version dismatch, expect "%s" actually "%s"' %
(__atx_agent_version__, version),
Warning,
stacklevel=2)
# Cancel bellow code to make connect() return faster.
# launch service to prevent uiautomator killed by Android system
# self.adb_shell('am', 'startservice', '-n', 'com.github.uiautomator/.Service')
except (requests.ConnectionError, ) as e:
raise EnvironmentError(
"atx-agent is not responding, need to init device first")
def _install_atx_agent(self, server_addr=None):
self.logger.info("Install atx-agent %s", __atx_agent_version__)
self.push_url(self.atx_agent_url,
tgz=True,
extract_name="atx-agent")
args = ["/data/local/tmp/atx-agent", "server", "--nouia", "-d"]
if server_addr:
args.extend(['-t', server_addr])
self.shell("/data/local/tmp/atx-agent", "server", "--stop")
self.shell(*args)
async def async_main():
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
# yapf: disable
parser.add_argument('-s', '--server', default='localhost:4000', help='server address')
parser.add_argument("--allow-remote", action="store_true", help="allow remote connect device")
parser.add_argument('-t', '--test', action="store_true", help="run test code")
parser.add_argument('-p', '--port', type=int, default=3500, help='listen port')
parser.add_argument("--atx-agent-version", default=u2.version.__atx_agent_version__, help="set atx-agent version")
parser.add_argument("--owner", type=str, help="provider owner email")
parser.add_argument("--owner-file", type=argparse.FileType("r"), help="provider owner email from file")
args = parser.parse_args()
# yapf: enable
settings.atx_agent_version = args.atx_agent_version
owner_email = args.owner
if args.owner_file:
with args.owner_file as file:
owner_email = file.read().strip()
logger.info("Owner: %s", owner_email)
if args.test:
for apk_name in ("cloudmusic.apk", ): # , "apkinfo.exe"):
apk_path = "testdata/" + apk_name
for arch in ("386", "amd64", "armv6", "armv7"):
storepath = tmpdir + "/atx-agent-%s.tar.gz" % arch
url = get_binary_url(version, arch)
mirror_download(url, storepath)
with tarfile.open(storepath, "r:gz") as t:
t.extract("atx-agent", path=tmpdir+"/"+arch)
z.write(
"/".join([tmpdir, arch, "atx-agent"]), "atx-agent-"+arch)
shutil.move(tmp_target_zip, target_zip)
print(">>> Zip created", target_zip)
if __name__ == "__main__":
from uiautomator2.version import __atx_agent_version__
create_bundle(__atx_agent_version__)