Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_check_pio_upgrade(clirunner, isolated_pio_home, validate_cliresult):
    """Check the upgrade notice printed when the core version looks outdated.

    The reported core version is patched first to a development release and
    then to a stable release. Before each ``platform list`` invocation the
    "last_check" state item is rewound past the configured check interval.
    The patched version is restored afterwards, even when an assertion
    fails, so the fake version does not leak into other tests.
    """

    def _patch_pio_version(version):
        # Keep both module-level version holders in sync.
        maintenance.__version__ = version
        cmd_upgrade.VERSION = version.split(".", 3)

    # The setting is scaled by 3600 * 24 (presumably days -> seconds); the
    # timestamp is placed one second beyond that interval.
    interval = int(app.get_setting("check_platformio_interval")) * 3600 * 24
    last_check = {"platformio_upgrade": time() - interval - 1}
    origin_version = maintenance.__version__

    try:
        # check development version
        _patch_pio_version("3.0.0-a1")
        app.set_state_item("last_check", last_check)
        result = clirunner.invoke(cli_pio, ["platform", "list"])
        validate_cliresult(result)
        assert "There is a new version" in result.output
        assert "Please upgrade" in result.output

        # check stable version
        _patch_pio_version("2.11.0")
        app.set_state_item("last_check", last_check)
        result = clirunner.invoke(cli_pio, ["platform", "list"])
        validate_cliresult(result)
    finally:
        # restore original version
        _patch_pio_version(origin_version)
def test_settings_check(clirunner, validate_cliresult):
    """Verify that ``get`` prints output naming every default setting."""
    outcome = clirunner.invoke(cli, ["get"])
    validate_cliresult(outcome)
    assert outcome.output
    for setting_name, _ in app.DEFAULT_SETTINGS.items():
        assert setting_name in outcome.output
# Fetch the content behind `uri`, consulting the content cache first when
# `cache_valid` is truthy.
#   uri         -- address to request
#   data        -- when truthy, the request is a POST carrying it; otherwise a GET
#   headers     -- request headers; a Safari-like User-Agent is used when empty
#   cache_valid -- when falsy, no cache key is built and the cache is not read
# The body is a generator: HTTP calls are `yield`ed and a cache hit is handed
# back through `defer.returnValue`.
# NOTE(review): presumably wrapped by Twisted's `inlineCallbacks`; the
# decorator is not visible in this excerpt -- confirm.
def fetch_content(uri, data=None, headers=None, cache_valid=None):
if not headers:
headers = {
"User-Agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) "
"AppleWebKit/603.3.8 (KHTML, like Gecko) Version/10.1.2 "
"Safari/603.3.8"
)
}
# A cache key exists only when caching was requested.
cache_key = app.ContentCache.key_from_args(uri, data) if cache_valid else None
with app.ContentCache() as cc:
if cache_key:
result = cc.get(cache_key)
# Cache hit: hand the stored content back without touching the network.
if result is not None:
defer.returnValue(result)
# check internet before and resolve issue with 60 seconds timeout
util.internet_on(raise_exception=True)
session = helpers.requests_session()
if data:
r = yield session.post(uri, data=data, headers=headers)
else:
r = yield session.get(uri, headers=headers)
# Presumably raises on an HTTP error status (requests-style API) -- verify.
r.raise_for_status()
# Command body (presumably a click command; its decorators are not visible in
# this excerpt, and the excerpt ends inside the `any([...])` expression).
# Visible behaviour: stores `project_conf` as the "custom_project_conf" session
# variable, resolves `project_dir`, loads and validates the project
# configuration, then walks the configured environments deciding which to skip.
def cli(
environment,
project_dir,
project_conf,
pattern,
flags,
severity,
silent,
verbose,
json_output,
fail_on_defect,
):
app.set_session_var("custom_project_conf", project_conf)
# find project directory on upper level
# (only done when `project_dir` points at a file rather than a directory)
if isfile(project_dir):
project_dir = find_project_dir_above(project_dir)
results = []
with fs.cd(project_dir):
config = ProjectConfig.get_instance(project_conf)
config.validate(environment)
default_envs = config.default_envs()
for envname in config.envs():
# An environment is skipped when any condition listed below holds.
skipenv = any(
[
# explicit selection given and this environment is not part of it
environment and envname not in environment,
# no explicit selection, defaults configured, and this one is not a default
not environment and default_envs and envname not in default_envs,
def resend_backuped_reports():
    """Re-send the telemetry reports kept under the "backup" key of the state.

    Returns:
        False when no backed-up reports exist; True once every backed-up
        report has been sent and the backup list has been emptied.
    """
    telemetry = app.get_state_item("telemetry", {})
    has_backup = "backup" in telemetry and telemetry["backup"]
    if not has_backup:
        return False

    for report in telemetry["backup"]:
        protocol = MeasurementProtocol()
        for field, payload in report.items():
            protocol[field] = payload
        # the report's "t" entry is what gets passed to send()
        protocol.send(report["t"])

    # clean
    telemetry["backup"] = []
    app.set_state_item("telemetry", telemetry)
    return True
def unpack(source_path, dest_dir):
    """Unpack the archive at ``source_path`` into ``dest_dir``.

    The first attempt uses a progress bar unless progress bars are disabled.
    If that attempt fails with ``IOError`` while the progress bar was on, the
    unpack is retried once without it.

    Returns:
        Whatever ``FileUnpacker.unpack`` returns.

    Raises:
        IOError: when unpacking without a progress bar fails.
    """
    with_progress = not app.is_disabled_progressbar()
    try:
        with FileUnpacker(source_path) as fu:
            return fu.unpack(dest_dir, with_progress=with_progress)
    except IOError:
        if not with_progress:
            # Nothing left to fall back to; a bare raise keeps the original
            # traceback intact.
            raise
        # Retry once with the progress bar switched off.
        with FileUnpacker(source_path) as fu:
            return fu.unpack(dest_dir, with_progress=False)
def load_state():
    """Fill the "storage" mapping of the application state with core data.

    Opens the state file at ``AppRPC.APPSTATE_PATH`` with locking, takes its
    "storage" entry (an empty dict when absent) and sets the client id, core
    version, system type, caller, a snapshot of every default setting with
    its current value, the home directory and the projects directory.
    """
    with app.State(AppRPC.APPSTATE_PATH, lock=True) as state:
        storage = state.get("storage", {})

        # base data
        caller_id = app.get_session_var("caller_id")
        storage["cid"] = app.get_cid()
        storage["coreVersion"] = __version__
        storage["coreSystype"] = util.get_systype()
        if caller_id:
            storage["coreCaller"] = str(caller_id).lower()
        else:
            storage["coreCaller"] = None

        # one entry per default setting: description, default and current value
        core_settings = {}
        for name, data in app.DEFAULT_SETTINGS.items():
            core_settings[name] = {
                "description": data["description"],
                "default_value": data["value"],
                "value": app.get_setting(name),
            }
        storage["coreSettings"] = core_settings

        storage["homeDir"] = expanduser("~")
        storage["projectsDir"] = storage["coreSettings"]["projects_dir"]["value"]
# NOTE(review): excerpt from the body of a larger function; the enclosing
# `def` and the loop header these `continue` statements belong to are not
# visible here.
# Skip entries for which no package directory is available.
if not pkg_dir:
continue
latest = pm.outdated(pkg_dir, requirements)
# Nothing to report when neither `latest` is set nor the platform reports
# outdated packages.
if (not latest and not PlatformFactory.newPlatform(
pkg_dir).are_outdated_packages()):
continue
data = _get_installed_platform_data(pkg_dir,
with_boards=False,
expose_packages=False)
# The newer version is recorded only when one was found.
if latest:
data['versionLatest'] = latest
result.append(data)
# Print the collected entries as JSON and return.
return click.echo(dump_json_to_unicode(result))
# cleanup cached board and platform lists
app.clean_cache()
# Print a header per platform, then run the update (or check) for it.
for platform in platforms:
click.echo(
"Platform %s" %
click.style(pkg_dir_to_name.get(platform, platform), fg="cyan"))
click.echo("--------")
pm.update(platform, only_packages=only_packages, only_check=only_check)
click.echo()
return True
# "show" sub-command: prints details about an installed platform.
# When `platform` is not among the installed platforms it is installed first:
# automatically if the "enable_prompts" setting is off, otherwise only after
# the user confirms. Declining raises PlatformNotInstalledYet.
# NOTE(review): the excerpt appears to end inside the package loop
# (`pkgalias` is assigned but not used in the visible lines).
@cli.command("show", short_help="Show details about installed platform")
@click.argument("platform")
@click.pass_context
def platforms_show(ctx, platform):
installed_platforms = PlatformFactory.get_platforms(
installed=True).keys()
if platform not in installed_platforms:
# With prompts disabled the confirmation is short-circuited and the
# install proceeds without asking.
if (not app.get_setting("enable_prompts") or
click.confirm("The platform '%s' has not been installed yet. "
"Would you like to install it now?" % platform)):
ctx.invoke(platforms_install, platforms=[platform])
else:
raise PlatformNotInstalledYet(platform)
p = PlatformFactory.newPlatform(platform)
# Summary line: type (left-aligned, padded to 20 columns), description, vendor URL.
click.echo("{name:<20} - {description} [ {url} ]".format(
name=click.style(p.get_type(), fg="cyan"),
description=p.get_description(), url=p.get_vendor_url()))
installed_packages = PackageManager.get_installed()
# One section per package installed for this platform.
for name in p.get_installed_packages():
data = installed_packages[name]
pkgalias = p.get_package_alias(name)
click.echo("----------")