Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_default_provider(self):
    """Fetch credentials through the default provider chain and verify them.

    AWS_SHARED_CREDENTIALS_FILE is pointed at 'test/resources/credentials_test'
    for the duration of the test; the credentials returned by the default
    chain are then compared against the values expected from that file.
    """
    # Use environment variable to force specific credentials file
    scoped_env = ScopedEnvironmentVariable('AWS_SHARED_CREDENTIALS_FILE', 'test/resources/credentials_test')
    try:
        event_loop_group = awscrt.io.EventLoopGroup()
        host_resolver = awscrt.io.DefaultHostResolver(event_loop_group)
        bootstrap = awscrt.io.ClientBootstrap(event_loop_group, host_resolver)
        provider = awscrt.auth.AwsCredentialsProvider.new_default_chain(bootstrap)

        # Block until the credentials arrive (or TIMEOUT elapses).
        future = provider.get_credentials()
        credentials = future.result(TIMEOUT)

        self.assertEqual('credentials_test_access_key_id', credentials.access_key_id)
        self.assertEqual('credentials_test_secret_access_key', credentials.secret_access_key)
        self.assertIsNone(credentials.session_token)
    finally:
        # Release the override even when the future or an assertion fails, so
        # the variable cannot leak into later tests.
        # NOTE(review): presumably ScopedEnvironmentVariable restores the
        # previous value when the object is destroyed - confirm.
        del scoped_env
def test_default_provider(self):
    """Fetch credentials through the default provider chain and verify them.

    AWS_SHARED_CREDENTIALS_FILE is pointed at 'test/resources/credentials_test'
    for the duration of the test; the credentials returned by the default
    chain are then compared against the values expected from that file.
    """
    # Use environment variable to force specific credentials file
    scoped_env = ScopedEnvironmentVariable('AWS_SHARED_CREDENTIALS_FILE', 'test/resources/credentials_test')
    try:
        event_loop_group = awscrt.io.EventLoopGroup()
        host_resolver = awscrt.io.DefaultHostResolver(event_loop_group)
        bootstrap = awscrt.io.ClientBootstrap(event_loop_group, host_resolver)
        provider = awscrt.auth.AwsCredentialsProvider.new_default_chain(bootstrap)

        # Block until the credentials arrive (or TIMEOUT elapses).
        future = provider.get_credentials()
        credentials = future.result(TIMEOUT)

        self.assertEqual('credentials_test_access_key_id', credentials.access_key_id)
        self.assertEqual('credentials_test_secret_access_key', credentials.secret_access_key)
        self.assertIsNone(credentials.session_token)
    finally:
        # Release the override even when the future or an assertion fails, so
        # the variable cannot leak into later tests.
        # NOTE(review): presumably ScopedEnvironmentVariable restores the
        # previous value when the object is destroyed - confirm.
        del scoped_env
else:
print('{} unsupported value for the verbose option'.format(args.verbose))
exit(-1)
# Send log output to args.trace when it is set, otherwise to stderr.
log_output = args.trace if args.trace else 'stderr'
io.init_logging(log_level, log_output)

# IO operations need an event loop group. One is enough unless you are a
# server, or a client handling hundreds of connections.
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)

# The client bootstrap knows how to connect all the pieces; here it also has
# the default DNS resolver baked in.
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

url = urlparse(args.url)

# HTTPS on port 443 is the default; only an explicit 'http' scheme in the URL
# switches the scheme.
port = 443
scheme = 'http' if url.scheme == 'http' else 'https'
if url.port is not None:
port = url.port
else:
if scheme == 'http':
# Command-line options for the sample. Adjacent string literals are joined by
# the parser at compile time, so multi-line help texts need no '+' operator.
parser.add_argument('--key', help="File path to your private key file, in PEM format")
parser.add_argument('--root-ca', help="File path to root certificate authority, in PEM format. "
                                      "Necessary if MQTT server uses a certificate that's not already in "
                                      "your trust store")
# Each run gets a fresh random client id unless one is supplied explicitly.
parser.add_argument('--client-id', default="test-" + str(uuid4()), help="Client ID for MQTT connection.")
parser.add_argument('--thing-name', required=True, help="The name assigned to your IoT Thing")
parser.add_argument('--job-time', default=5, type=float,
                    help="Emulate working on job by sleeping this many seconds.")
parser.add_argument('--use-websocket', default=False, action='store_true',
                    help="To use a websocket instead of raw mqtt. If you "
                         "specify this option you must specify a region for signing, you can also enable proxy mode.")
# The help text refers to the flag by its real name, --use-websocket.
parser.add_argument('--signing-region', default='us-east-1',
                    help="If you specify --use-websocket, this "
                         "is the region that will be used for computing the Sigv4 signature")
parser.add_argument('--proxy-host', help="Hostname for proxy to connect to. Note: if you use this feature, "
                                         "you will likely need to set --root-ca to the ca for your proxy.")
parser.add_argument('--proxy-port', type=int, default=8080, help="Port for proxy to connect to.")
# Accepted values are the member names of io.LogLevel; logging is off by default.
parser.add_argument('--verbosity', choices=[x.name for x in io.LogLevel], default=io.LogLevel.NoLogs.name,
                    help='Logging level')

# Using globals to simplify sample code
is_sample_done = threading.Event()
mqtt_connection = None
jobs_client = None
thing_name = ""
class LockedData(object):
    """Bundle of sample state flags together with a lock.

    NOTE(review): presumably callers hold ``lock`` while reading or writing
    the flags - confirm against the code that uses this class.
    """

    def __init__(self):
        """Create the lock and start with every flag cleared."""
        self.lock = threading.Lock()
        # Chained assignment binds the targets left to right, so the
        # attributes are created in the order they are listed.
        self.disconnect_called = self.is_working_on_job = self.is_next_job_waiting = False