Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def query(self, query):
    """Run a jq *query* against self.data() and wrap the result in a Knead.

    A single-element result list is unwrapped to its sole item. If the jq
    transform fails for any reason, a Knead wrapping None is returned
    instead of raising.
    """
    try:
        result = jq(query).transform(self.data(), multiple_output = True)
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # are no longer swallowed; jq failures still yield Knead(None).
        return Knead(None, is_data = True)
    if isinstance(result, list) and len(result) == 1:
        result = result[0]
    return Knead(result, is_data = True)
# NOTE(review): this excerpt comes from the middle of a larger provisioning
# function; leading indentation was lost in extraction, and names such as
# sg_id, ec2, ec2_client, region, az, filters and debug_mode are defined
# upstream — confirm against the full file.
if not sg_id:
# Create a fresh security group in the target VPC when none was found.
security_group = ec2.create_security_group(
GroupName=security_group_name,
Description='EC2 security group - created by Ec2InstanceMaker',
VpcId=vpc_id
)
# Open the interactive-login port: RDP (3389) for Windows, SSH (22) otherwise.
# NOTE(review): 0.0.0.0/0 exposes the port to the entire Internet — confirm
# this is intended rather than a narrower admin CIDR.
if base_os == 'windows2019':
add_inbound_security_group_rule(region, security_group, "tcp", "0.0.0.0/0", 3389, 3389)
else:
add_inbound_security_group_rule(region, security_group, "tcp", "0.0.0.0/0", 22, 22)
# Re-resolve the group by filter; presumably `filters` targets the group
# created above — TODO confirm.
sg_id = list(ec2.security_groups.filter(Filters=filters))
# EFS requires NFS (port 2049) reachable from the subnet's CIDR block.
if enable_efs == 'true':
subnet_filter = [ { 'Name': 'availability-zone', 'Values': [az] } ]
subnet_cidr_raw = ec2_client.describe_subnets(Filters=[({'Name': 'availability-zone', 'Values': [az]})])
subnet_cidr_json = json.dumps(subnet_cidr_raw)
# jq strips the ResponseMetadata wrapper and pulls the subnet CIDR;
# text_output returns a quoted string, so the quotes are removed below.
subnet_cidr_block = jq('del(.ResponseMetadata) | .Subnets[].CidrBlock').transform(text=subnet_cidr_json, text_output=True)
subnet_cidr_block = subnet_cidr_block.replace('"', '')
add_inbound_security_group_rule(region, security_group, "tcp", subnet_cidr_block, 2049, 2049)
# FSx for Lustre requires port 988 reachable from the subnet's CIDR block.
if enable_fsx == 'true':
subnet_filter = [ { 'Name': 'availability-zone', 'Values': [az] } ]
subnet_cidr_raw = ec2_client.describe_subnets(Filters=[({'Name': 'availability-zone', 'Values': [az]})])
subnet_cidr_json = json.dumps(subnet_cidr_raw)
subnet_cidr_block = jq('del(.ResponseMetadata) | .Subnets[].CidrBlock').transform(text=subnet_cidr_json, text_output=True)
subnet_cidr_block = subnet_cidr_block.replace('"', '')
add_inbound_security_group_rule(region, security_group, "tcp", subnet_cidr_block, 988, 988)
p_val('security_group', debug_mode)
# Parse and validate the selected security group (vpc_security_group_ids).
# NOTE(review): str(*sg_id).split("'")[1] appears to scrape the group id out
# of the boto3 object's repr — fragile; verify there is no direct attribute.
v_sg_id = str(*sg_id).split("'")
vpc_security_group_ids = v_sg_id[1]
p_val('vpc_security_group_ids', debug_mode)
# NOTE(review): excerpt starts mid-statement — the opening of this dict
# literal (and the `if` branch paired with the `else` below) lies above the
# visible region.
fsx_s3_import_bucket: fsx_s3_import_path,
fsx_s3_export_bucket: fsx_s3_export_path
}
else:
# Map each FSx-linked S3 bucket to its import/export prefix.
fsx_s3_bucket_paths = {
fsx_s3_import_bucket: fsx_s3_import_path,
fsx_s3_export_bucket: fsx_s3_export_path
}
print('Setting S3-Lustre import path to: s3://' + fsx_s3_import_bucket + '/' + fsx_s3_import_path)
print('Setting Lustre-S3 export path to: s3://' + fsx_s3_export_bucket + '/' + fsx_s3_export_path)
# Abort early if any referenced S3 prefix matches no objects.
for fsx_s3_bucket, fsx_s3_path in fsx_s3_bucket_paths.items():
check_fsx_s3_path = s3_client.list_objects_v2(
Bucket=fsx_s3_bucket,
Prefix=fsx_s3_path
)
# KeyCount == 0 means list_objects_v2 found nothing under the prefix.
check_fsx_s3_object_count = jq('.KeyCount').transform(text=json.dumps(check_fsx_s3_path, default=str), text_output=True)
if int(check_fsx_s3_object_count) == 0:
error_msg = 'Please ensure s3://' + fsx_s3_bucket + '/' + fsx_s3_path + ' exists!'
refer_to_docs_and_quit(error_msg)
else:
p_val('fsx_s3_bucket', debug_mode)
p_val('fsx_s3_path', debug_mode)
# Check to ensure external NFS support has been properly enabled.
if (enable_external_nfs == 'true') and (external_nfs_server == ''):
error_msg='Missing: valid setting for "--external_nfs_server"'
refer_to_docs_and_quit(error_msg)
else:
p_val('enable_external_nfs', debug_mode)
p_val('external_nfs_server', debug_mode)
# NOTE(review): excerpt from the middle of an IRC command handler — the
# enclosing def and its parameter-count check sit above this view.
return IRCResponse(ResponseType.Say, u"Not enough parameters, usage: {}".format(self.help), message.ReplyTo)
# First parameter is the URL; the remaining parameters form the jq filter.
url, jqfilter = (message.ParameterList[0], u" ".join(message.ParameterList[1:]))
# Default to http:// when no scheme was given.
if not re.match(r'^\w+://', url):
url = u"http://{}".format(url)
# Reuse a previously fetched page from per-message metadata when available.
if 'jq' in message.Metadata and url in message.Metadata['jq']:
page = message.Metadata['jq'][url]
else:
page = web.fetchURL(url)
if page is None:
return IRCResponse(ResponseType.Say, u"Problem fetching {}".format(url), message.ReplyTo)
try:
value = jq(jqfilter).transform(text=page.body)
except ValueError as e:
# Collapse multi-line jq errors into a single IRC-safe line.
# NOTE(review): `e.message` is Python 2-only; on Python 3 this raises
# AttributeError — confirm the project's interpreter version.
response = re.sub(r'[\r\n]+', u' ', e.message)
return IRCResponse(ResponseType.Say, response, message.ReplyTo)
# None/dict/list results get a description; scalar handling is presumably
# below the visible region.
if value is None:
return IRCResponse(ResponseType.Say,
u"{} does not match a value".format(jqfilter),
message.ReplyTo)
if isinstance(value, dict):
return IRCResponse(ResponseType.Say,
u"{} matches a dict".format(jqfilter),
message.ReplyTo)
if isinstance(value, list):
return IRCResponse(ResponseType.Say,
u"{} matches a list".format(jqfilter),
message.ReplyTo)
# NOTE(review): the two closers below terminate a call opened above the
# visible excerpt.
]
)
try:
# Explicit config value wins; otherwise fall back to parsing the
# OPERATOR_UI_MAINTENANCE_CHECK default as JSON.
kubernetes_maintenance_check = (
config.get('kubernetes_maintenance_check') or
loads(OPERATOR_UI_MAINTENANCE_CHECK)
)
# Both 'url' and 'query' keys must be present before probing.
if (
kubernetes_maintenance_check and
{'url', 'query'} <= kubernetes_maintenance_check.keys()
):
# Fetch the status endpoint and reduce its JSON with the configured
# jq query to an "in maintenance" value.
config['kubernetes_in_maintenance'] = (
jq(kubernetes_maintenance_check['query']).transform(
requests.get(
kubernetes_maintenance_check['url'],
headers=service_auth_header(),
).json(),
)
)
except ValueError:
# Bad JSON (from loads() or the HTTP response body) — log and continue.
exception('Could not determine Kubernetes cluster status')
return ok(config)
def format_jq(output, fmt):
try:
import jq
except ImportError:
if fmt == '.':
return output
raise ImportError(
'To use `-f jq`, you must install the optional jq dependency.\n'
'`pip install jq`\n',
'Note that some platforms may require additional programs to '
'build jq from source (like `libtool`).\n'
'See https://pypi.org/project/jq/ for instructions.'
)
results = []
for x in jq.jq(fmt).transform(output, multiple_output=True):
if x not in (None, ''):
if isinstance(x, str):
results.append(x)
else:
results.append(json.dumps(x))
return '\n'.join(results)
# NOTE(review): excerpt starts inside an FSx create_file_system(...) call
# whose opening line is above the visible region.
ClientRequestToken=instance_serial_number,
FileSystemType='LUSTRE',
StorageCapacity=fsx_size,
SubnetIds=[subnet_id,],
SecurityGroupIds=[vpc_security_group_ids,],
Tags=LustreTags,
LustreConfiguration=Lustre_Fsx_Configuration
)
# Print some interesting information to the console for operator convenience.
# Add a 60-second timer to ensure the instance(s) don't become available prior
# to the Lustre file system reaching a mountable state.
make_lustre_file_system_json = json.dumps(make_lustre_file_system, default=str)
# jq drops ResponseMetadata and extracts the DNS name / file system id; the
# text_output form returns quoted strings, hence the replace('"','') below.
lustre_file_system_dnsName = jq('del(.ResponseMetadata) | .FileSystem.DNSName').transform(text=make_lustre_file_system_json, text_output=True)
lustre_file_system_id = jq('del(.ResponseMetadata) | .FileSystem.FileSystemId').transform(text=make_lustre_file_system_json, text_output=True)
print('Lustre file system ID: ' + lustre_file_system_id.replace('\"', '').strip())
print('Lustre file system size (TB): ' + str(fsx_size))
print('Building the Lustre file system:')
time_waiter(60, 3)
print('')
print('Finished building: ' + lustre_file_system_dnsName.replace('\"', '').strip())
# Create the new EC2 instance(s) with Terraform.
print('Invoking Terraform to build ' + instance_name + '...')
# NOTE(review): shell=True with string commands; the commands here are
# static, but prefer an argument list (shell=False) where possible.
if debug_mode == 'true':
subprocess.run('TF_LOG=DEBUG terraform init -input=false', shell=True, cwd=instance_data_dir)
subprocess.run('TF_LOG=DEBUG terraform plan -out terraform_environment', shell=True, cwd=instance_data_dir)
subprocess.run('TF_LOG=DEBUG terraform apply \"terraform_environment\"', shell=True, cwd=instance_data_dir)
else:
subprocess.run('terraform init -input=false', shell=True, cwd=instance_data_dir)
def on_status(self, status):
    """Project each key in self._keys out of the status' raw JSON with jq
    and hand the resulting row dict to the loader's event hook."""
    payload = status._json
    row = {key: jq.jq(key).transform(payload) for key in self._keys}
    self._loader._on_event(row)
# NOTE(review): `cb` closes over `path` and `callback` from an enclosing
# (async) subscribe method that is only partially visible here.
def cb(msg):
uri = msg.message.tbs.uri
namespace = ensure_b64encode(msg.message.tbs.namespace)
sent_timestamp = msg.message.timestamps
# Fall back to "now" when the message carries no timestamps; otherwise take
# the first timestamp as seconds since the epoch.
# NOTE(review): datetime.now() is local time while utcfromtimestamp() is
# UTC, both naive — the two branches disagree on timezone; confirm intent.
if len(sent_timestamp) == 0:
sent_timestamp = datetime.now()
else:
sent_timestamp = datetime.utcfromtimestamp(sent_timestamp[0])
values = []
# Decode each payload protobuf to a dict and apply the jq `path` to it.
for po in msg.message.tbs.payload:
x = xbos_pb2.XBOS.FromString(po.content)
x = MessageToDict(x)
values.append(jq.jq(path).transform(x))
callback(Response(namespace, uri, sent_timestamp, values))
await self.subscribe_msg(namespace, resource, cb, name=name)
def _compile_jq(jq_expr):
    """Compile a JQ expression into a reusable JQ program object.

    See: https://pypi.python.org/pypi/jq
    """
    # Imported lazily so the jq dependency stays optional for callers that
    # never compile an expression.
    import jq
    program = jq.jq(jq_expr)
    return program