# initialize config in this order: register defaults first, then the user's settings
default_objs = []
default_objs.append(configtypes["Main"](default_settings["Main"], default=True))
default_objs.append(configtypes["Target"](default_settings["Target"], "target", default=True))
order = ["Software", "TraceMethod", "HardwareClass", "HostConfig", "TargetStage", "PostProcess"]
for i in order:
    for (name, v) in default_settings[i].items():
        if not (i == "TraceMethod" and name == "run"):
            default_objs.append(configtypes[i](v, name, default=True))

# parse the user's TOML config and expose it as an attribute-accessible Munch
with open(config, 'r') as f:
    settings = toml.loads(f.read())
Main.raw = munchify(settings)
setup_special_fields(Main.raw)
configtypes["Main"](settings["Main"])
configtypes["Target"](settings["Target"], "target")
for i in order:
    if i in settings:
        for (name, v) in settings[i].items():
            if not (i == "TraceMethod" and name == "run"):
                configtypes[i](v, name)

# reject unknown section names
for (k, v) in settings.items():
    if k not in configtypes:
        raise ConfigException("%s is not a valid config section group name" % k)
if "Main" not in registry.keys():
"""
try:
    validate(definition, self._get_schema_from_file(schema_file))
except ValidationError:
    raise ConfigException(
        'Failed to validate interface with schema: {}'.format(
            traceback.format_exc()))

try:
    global_settings = self._load_global_setting(
        definition.get('global_settings'), context
    )
    requests = [self._load_request(item) for item in definition['requests']]
    return munchify({
        'meta': munchify(definition['meta']),
        'tokens': definition['tokens'],
        'global_settings': global_settings,
        'requests': requests,
    })
except Exception as ex:
    error = 'Unable to load configuration: %s' % str(ex)
    _logger.exception(error)
    raise ConfigException(error)
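
# Hedged sketch (not from the loader above) of the Munch it returns: the top-level
# keys mirror the snippet ('meta', 'tokens', 'global_settings', 'requests'), but all
# values here are invented for illustration.
from munch import munchify

cfg = munchify({
    'meta': {'apiVersion': '1.0'},
    'tokens': ['name', 'region'],
    'global_settings': {'proxy': {'enabled': False}, 'logging': {'level': 'INFO'}},
    'requests': [],
})
assert cfg.meta.apiVersion == '1.0'
assert cfg.global_settings.logging.level == 'INFO'
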
try:
    # requests exposes status_code as an int, so compare against 204, not '204'
    if response.status_code in (204,):
        # "204 No Content"
        retdata = {}
    else:
        # Decode the expected JSON
        retdata = response.json()
except Exception as e:
    # Invalid JSON payload.
    msg = (u'HTTP Status Code: [{0}]; API response data for end-point [{1}] does not '
           u'appear to be valid JSON. Cause: {2}.')
    msg = msg.format(response.status_code, endpoint, e)
    if debug_response:
        utils.log_with_debug_info(logging.ERROR, msg + u' Data: [' + str(response.text) + u']')
    raise InvalidJSONError(msg)
retdata = munch.munchify(retdata) if munchify else retdata
return (retdata[u'response'] if u'response' in retdata else retdata), response
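
# Hedged illustration of the munchify flag above: when it is truthy the caller gets
# attribute access on the decoded payload. The payload below is invented.
import munch

retdata = munch.munchify({'response': {'status': 'ok', 'records': [{'id': 1}]}})
body = retdata['response'] if 'response' in retdata else retdata
print(body.status, body.records[0].id)   # -> ok 1
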
def _spatialReference(self):
    """Gets the spatial reference dict."""
    resp_d = {}
    if SPATIAL_REFERENCE in self.json:
        resp_d = self.json[SPATIAL_REFERENCE]
    elif self.json.get(EXTENT) and SPATIAL_REFERENCE in self.json[EXTENT]:
        resp_d = self.json[EXTENT][SPATIAL_REFERENCE]
    elif GEOMETRIES in self.json:
        try:
            first = self.json.get(GEOMETRIES, [])[0]
            resp_d = first.get(SPATIAL_REFERENCE) or {}
        except IndexError:
            pass
    elif CRS in self.json:
        resp_d = self.json.get(CRS, {})
    return munch.munchify(resp_d)
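
# Hedged usage sketch: with real data the constants above resolve to ArcGIS REST
# JSON keys, and the returned Munch reads like this (the wkid value is invented).
import munch

sr = munch.munchify({'wkid': 4326, 'latestWkid': 4326})
print(sr.wkid)               # 4326, via attribute access
print(sr.get('latestWkid'))  # plain dict methods still work
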
codetreeFile = executedir + "/" + docname + ".codetree"
execution_count = 0
count = 0
if os.path.exists(codetreeFile):
    with open(codetreeFile, "r", encoding="UTF-8") as f:
        json_obj = json.load(f)
    for cell in nb.cells:
        if cell['cell_type'] == "code":
            execution_count += 1
            cellcopy = normalize_cell(cell.copy())
            hashcode = create_hashcode(cellcopy)
            if hashcode in json_obj:
                output = json_obj[hashcode]['outputs']
                cell['execution_count'] = execution_count
                cell['outputs'] = munchify(output)
                if 'hide-output' in cell['metadata']:
                    cell['outputs'] = []
return nb
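
# Hedged sketch of munchify(output) above: each cached output dict becomes a Munch,
# so later code can write output.output_type instead of output['output_type'].
# The output content below is invented.
from munch import munchify

outputs = munchify([{'output_type': 'stream', 'name': 'stdout', 'text': 'hello\n'}])
print(outputs[0].output_type, outputs[0].text.strip())
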
def eval_feature_expression(feature, expression):
    # expose the GeoJSON feature as 'f' with attribute access inside the expression
    safe_dict = {'f': munchify(feature)}
    safe_dict.update({
        'sum': sum,
        'pow': pow,
        'min': min,
        'max': max,
        'math': math,
        'bool': bool,
        'int': partial(nullable, int),
        'str': partial(nullable, str),
        'float': partial(nullable, float),
        'len': partial(nullable, len),
    })
    try:
        from shapely.geometry import shape
        safe_dict['shape'] = shape
    except ImportError:
        # shapely is optional; without it expressions simply have no shape()
        pass
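
# Hedged illustration of the 'f' binding above (the eval step itself is not shown in
# the snippet): munchify lets an expression reach feature properties as attributes.
# The feature content is invented.
from munch import munchify

feature = {'type': 'Feature', 'properties': {'population': 1200}, 'geometry': None}
f = munchify(feature)
print(f.properties.population > 1000)   # True -- the kind of test an expression runs
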
if EDITOR_TRACKING_INFO not in self.json:  # assumed condition: tracking not yet enabled at the service
    result['enabled_at_feature_service'] = self.updateDefinition(
        {CAPABILITIES: capabilities, HAS_STATIC_DATA: False, EDITOR_TRACKING_INFO: editorInfo})
else:
    result['enabled_at_feature_service'] = {'status': 'already enabled'}
# loop through layers and enable editor tracking
editFields = {"editFieldsInfo": {"creationDateField": "", "creatorField": "",
                                 "editDateField": "", "editorField": ""}}
for lyrDef in self.layers:
    url = '/'.join([self.url, str(lyrDef.id)])
    lyr = AGOLFeatureLayer(url, token=self.token)
    status = lyr.addToDefinition(editFields)
    result['layers'].append({
        'id': lyr.id,
        'name': lyr.name,
        'result': status
    })
return munch.munchify(result)

def _load_global_setting(self, candidate, variables):
    """
    Load and render global setting with variables.

    :param candidate: Global setting as a `dict`
    :param variables: variables from context to render setting
    :return: A `Munch` object
    """
    candidate = candidate or {}
    proxy_setting = self._load_proxy(candidate.get('proxy'), variables)
    log_setting = self._load_logging(candidate.get('logging'), variables)
    return munchify({'proxy': proxy_setting, 'logging': log_setting})
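
# Hedged sketch: munchify is recursive, so both the returned object and the nested
# proxy/logging settings come back as Munch instances. Values below are invented.
from munch import Munch, munchify

gs = munchify({'proxy': {'enabled': False}, 'logging': {'level': 'DEBUG'}})
assert isinstance(gs.proxy, Munch)
print(gs.logging.level)   # DEBUG
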
parser.add_argument("--just-bootstrap", default=False,
                    help="bootstrap and do not run main bot loop",
                    action='store_true')
args = parser.parse_args()
config = {}
for i, path in enumerate(iter_paths(iter_identify_paths(args.config))):
    if i == 0:
        print("Loading configuration from '%s'" % path, file=sys.stderr)
    else:
        print("Loading + merging configuration"
              " from '%s'" % path, file=sys.stderr)
    tmp_config = load_yaml_or_secret_yaml(path, force_secrets=False)
    config = utils.merge_dict(config, tmp_config)
config = munch.munchify(config)
print("Configuration: %s" %
      json.dumps(utils.mask_dict_password(config),
                 indent=4, sort_keys=True), file=sys.stderr)
secrets = {}
for i, path in enumerate(iter_paths(iter_identify_paths(args.secrets))):
    if i == 0:
        print("Loading secrets from '%s'" % path, file=sys.stderr)
    else:
        print("Loading + merging secrets"
              " from '%s'" % path, file=sys.stderr)
    tmp_secrets = load_yaml_or_secret_yaml(path, force_secrets=True)
    secrets = utils.merge_dict(secrets, tmp_secrets)
secrets = munch.munchify(secrets)
tz = config.get("tz")
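
# Hedged illustration of the merged, munchified config above: after munch.munchify,
# both config.get("tz") and config.tz work. Keys and values below are invented.
import munch

config = munch.munchify({'tz': 'UTC', 'bot': {'name': 'demo-bot'}})
print(config.get('tz'), config.bot.name)   # UTC demo-bot
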
self._cookie = {AGS_TOKEN: self.token.token if isinstance(self.token, Token) else self.token}
if (not self.token or not self._cookie) and not self._proxy:
    if self.url in ID_MANAGER.proxies:
        self._proxy = ID_MANAGER.proxies[self.url]
# fetch url if this is a portal item
# if portalId:
# make sure token is passed in query string if agol or portal
if isinstance(self.token, Token):
    if self.token.get(IS_AGOL) or self.token.get(IS_PORTAL):
        params[TOKEN] = str(self.token)
self.raw_response = do_post(self.url, params, ret_json=False, token=self.token,
                            cookies=self._cookie, proxy=self._proxy, referer=self._referer)
self.elapsed = self.raw_response.elapsed
self.response = self.raw_response.json()
self.json = munch.munchify(self.response)
RequestError(self.json)  # expected to raise if the munchified payload contains an error
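
# Hedged sketch of why the response is munchified: error details (or features, fields,
# etc.) can then be read as attributes. The payload below is invented.
import munch

resp = munch.munchify({'error': {'code': 498, 'message': 'Invalid token.'}})
if 'error' in resp:
    print(resp.error.code, resp.error.message)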