Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from awscli.testutils import unittest
from botocore.compat import OrderedDict, json
from awscli.customizations.datapipeline import translator
# Throughout these tests, 'df' refers to the condensed JSON definition format
# that the user provides and API refers to the format expected by the API.
class TestTranslatePipelineDefinitions(unittest.TestCase):
maxDiff = None
def load_def(self, json_string):
    """Decode ``json_string``, building every JSON object as an ``OrderedDict``."""
    decoded = json.loads(json_string, object_pairs_hook=OrderedDict)
    return decoded
def test_convert_schedule_df_to_api(self):
definition = self.load_def("""{"objects": [
{
"id" : "S3ToS3Copy",
"type" : "CopyActivity",
"schedule" : { "ref" : "CopyPeriod" },
"input" : { "ref" : "InputData" },
"output" : { "ref" : "OutputData" }
}
]}""")
actual = translator.definition_to_api(definition)
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import datetime
from awscli.customizations.s3.filegenerator import FileStat
from awscli.customizations.s3.syncstrategy.exacttimestamps import \
ExactTimestampsSync
from awscli.testutils import unittest
class TestExactTimestampsSync(unittest.TestCase):
def setUp(self):
self.sync_strategy = ExactTimestampsSync()
def test_compare_exact_timestamps_dest_older(self):
"""
Confirm that same-sized files are synced when
the destination is older than the source and
`exact_timestamps` is set.
"""
time_src = datetime.datetime.now()
time_dst = time_src - datetime.timedelta(days=1)
src_file = FileStat(src='', dest='',
compare_key='test.py', size=10,
last_update=time_src, src_type='s3',
dest_type='local', operation_name='download')
'type': 's3'},
'dir_op': True, 'use_src_name': True}
file_stats = FileGenerator(self.client, '', True).call(input_local_dir)
all_filenames = self.filenames + self.symlink_files
all_filenames.sort()
result_list = []
for file_stat in file_stats:
result_list.append(getattr(file_stat, 'src'))
self.assertEqual(len(result_list), len(all_filenames))
# Just check to make sure the right local files are generated.
for i in range(len(result_list)):
filename = six.text_type(os.path.abspath(all_filenames[i]))
self.assertEqual(result_list[i], filename)
class TestListFilesLocally(unittest.TestCase):
maxDiff = None
def setUp(self):
self.directory = six.text_type(tempfile.mkdtemp())
def tearDown(self):
shutil.rmtree(self.directory)
@mock.patch('os.listdir')
def test_error_raised_on_decoding_error(self, listdir_mock):
# On Python3, sys.getdefaultencoding
file_generator = FileGenerator(None, None, None)
# utf-8 encoding for U+2713.
listdir_mock.return_value = [b'\xe2\x9c\x93']
list(file_generator.list_files(self.directory, dir_op=True))
# Ensure the message was added to the result queue and is
class FakeError(Exception):
    """Placeholder exception type that adds nothing to ``Exception``."""
class RecordingFormatter(Formatter):
    """Formatter subclass that stores every record handed to ``_display``."""

    def __init__(self, output=None, include=None, exclude=None):
        """Forward all arguments to ``Formatter`` and start with an empty history."""
        base_kwargs = {
            'output': output,
            'include': include,
            'exclude': exclude,
        }
        super(RecordingFormatter, self).__init__(**base_kwargs)
        # Records are kept in the order ``_display`` receives them.
        self.history = []

    def _display(self, event_record):
        """Append ``event_record`` to ``self.history``."""
        self.history.append(event_record)
class TestFormatter(unittest.TestCase):
def test_display_no_filters(self):
    """With no include/exclude filters, both displayed records are recorded in order."""
    first_record = {'event_type': 'my_event'}
    second_record = {'event_type': 'my_other_event'}
    formatter = RecordingFormatter()
    for record in (first_record, second_record):
        formatter.display(record)
    self.assertEqual(formatter.history, [first_record, second_record])
def test_include_specified_event_type(self):
formatter = RecordingFormatter(include=['my_event'])
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from mock import Mock
from awscli.testutils import unittest, BaseAWSCommandParamsTest
from awscli.customizations.s3.s3 import awscli_initialize, add_s3
class AWSInitializeTest(unittest.TestCase):
    """
    This test ensures that all events are correctly registered such that
    all of the commands can be run.
    """

    def setUp(self):
        """Use a ``Mock`` as the CLI object so ``register`` calls are captured."""
        self.cli = Mock()

    def test_initialize(self):
        """Every event name passed to ``cli.register`` must be a known one."""
        awscli_initialize(self.cli)
        expected_events = [
            "building-command-table.main",
            "building-command-table.sync",
        ]
        # NOTE(review): if ``register`` is never called the loop body does
        # not run and the test passes without asserting anything.
        for registration in self.cli.register.call_args_list:
            # registration[0] holds the positional args; the first is the
            # event name.
            event_name = registration[0][0]
            self.assertIn(event_name, expected_events)
import awscli
from argparse import Namespace
from mock import MagicMock, patch, call
from six import StringIO
from botocore.session import Session
from botocore.credentials import Credentials
from awscli.customizations.codecommit import CodeCommitGetCommand
from awscli.customizations.codecommit import CodeCommitCommand
from awscli.testutils import unittest, StringIOWithFileNo
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
class TestCodeCommitCredentialHelper(unittest.TestCase):
PROTOCOL_HOST_PATH = ('protocol=https\n'
'host=git-codecommit.us-east-1.amazonaws.com\n'
'path=/v1/repos/myrepo')
FIPS_PROTOCOL_HOST_PATH = ('protocol=https\n'
'host=git-codecommit-fips.us-east-1.amazonaws.com\n'
'path=/v1/repos/myrepo')
VPC_1_PROTOCOL_HOST_PATH = ('protocol=https\n'
'host=vpce-0b47ea360adebf88a-jkl88hez.git-codecommit.us-east-1.vpce.amazonaws.com\n'
'path=/v1/repos/myrepo')
VPC_2_PROTOCOL_HOST_PATH = ('protocol=https\n'
'host=vpce-0b47ea360adebf88a-jkl88hez-us-east-1a.git-codecommit.us-east-1.vpce.amazonaws.com\n'
'path=/v1/repos/myrepo')
size=self.threshold+1,
operation_name='upload',
client=self.client,
))
self.s3_handler_multi.call(tasks)
# Confirm UploadPart was called
self.assertIn("Completed 4 of 4 part(s)", self.output.getvalue())
# Confirm the files were uploaded.
response = self.client.list_objects(Bucket=self.bucket)
self.assertEqual(len(response.get('Contents', [])), 2)
class S3HandlerTestUnicodeMove(unittest.TestCase):
def setUp(self):
    """Build the session, S3 clients, handler and bucket/key fixtures."""
    region = 'us-west-2'
    session = botocore.session.get_session(EnvironmentVariables)
    self.session = session
    self.client = session.create_client('s3', region)
    self.source_client = session.create_client('s3', region)
    handler_params = {'region': region, 'acl': 'private', 'quiet': True}
    self.s3_handler = S3Handler(session, handler_params)
    # The same single-character unicode key is used for both buckets.
    unicode_key = u'\u2713'
    self.bucket = make_s3_files(session, key1=unicode_key)
    self.bucket2 = create_bucket(session)
    self.s3_files = [self.bucket + '/' + unicode_key]
    self.s3_files2 = [self.bucket2 + '/' + unicode_key]
def tearDown(self):
    """Run ``s3_cleanup`` on ``self.bucket`` and then ``self.bucket2``."""
    for bucket in (self.bucket, self.bucket2):
        s3_cleanup(bucket, self.session)
def test_move_unicode(self):
import botocore.client
class FakeCompletionLookup(object):
    """Completion lookup backed by an in-memory mapping.

    ``completion_data`` is indexed by
    ``(tuple(lineage), command_name, param_name)``.
    """

    def __init__(self, completion_data):
        """Keep a reference to ``completion_data``; no copy is made."""
        self.completion_data = completion_data

    def get_server_completion_data(self, lineage, command_name, param_name):
        """Return the entry stored for the given lineage, command and
        parameter, or ``None`` when the mapping raises ``KeyError``."""
        lookup_key = (tuple(lineage), command_name, param_name)
        try:
            found = self.completion_data[lookup_key]
        except KeyError:
            found = None
        return found
class TestServerSideAutocompleter(unittest.TestCase):
def setUp(self):
self.index = InMemoryIndex({
'command_names': {
'': ['aws'],
'aws': ['ec2', 'iam', 's3'],
'aws.iam': ['delete-user-policy', 'delete-user'],
},
'arg_names': {
'': {
'aws': ['region', 'endpoint-url'],
},
'aws.iam': {
'delete-user-policy': ['policy-name'],
'delete-user': ['user-name'],
},
},
from contextlib import contextmanager
from mock import Mock, patch
from awscli.testutils import unittest, skip_if_windows
from awscli.testutils import capture_output
from awscli.compat import StringIO
from awscli.table import COLORAMA_KWARGS
import colorama
from colorama import Fore
from colorama import Back
@skip_if_windows('Posix color code tests')
class TestPosix(unittest.TestCase):
_ANSI_RESET_ALL = '\033[0m'
_ANSI_FORE_RED = '\033[31m'
_ANSI_FORE_BLUE = '\033[34m'
def test_fore_attributes(self):
self.assertEqual(Fore.BLACK, '\033[30m')
self.assertEqual(Fore.RED, '\033[31m')
self.assertEqual(Fore.GREEN, '\033[32m')
self.assertEqual(Fore.YELLOW, '\033[33m')
self.assertEqual(Fore.BLUE, '\033[34m')
self.assertEqual(Fore.MAGENTA, '\033[35m')
self.assertEqual(Fore.CYAN, '\033[36m')
self.assertEqual(Fore.WHITE, '\033[37m')
self.assertEqual(Fore.RESET, '\033[39m')
# Check the light, extended versions.
profile='profile-does-not-exist')
self.configure = configure.ConfigureCommand(session,
prompter=self.precanned,
config_writer=self.writer)
self.configure(args=[], parsed_globals=self.global_args)
self.assert_credentials_file_updated_with(
{'aws_access_key_id': 'new_value',
'aws_secret_access_key': 'new_value',
'__section__': 'profile-does-not-exist'})
self.writer.update_config.assert_called_with(
{'__section__': 'profile profile-does-not-exist',
'region': 'new_value',
'output': 'new_value'}, 'myconfigfile')
class TestInteractivePrompter(unittest.TestCase):
def setUp(self):
    """Patch ``awscli.compat.raw_input`` and replace ``sys.stdout`` with
    an in-memory buffer."""
    input_patcher = mock.patch('awscli.compat.raw_input')
    self.input_patch = input_patcher
    self.mock_raw_input = input_patcher.start()

    captured_stdout = six.StringIO()
    self.stdout = captured_stdout
    stdout_patcher = mock.patch('sys.stdout', captured_stdout)
    self.stdout_patch = stdout_patcher
    stdout_patcher.start()
def tearDown(self):
    """Stop the input patch and then the stdout patch."""
    for patcher in (self.input_patch, self.stdout_patch):
        patcher.stop()
def test_access_key_is_masked(self):
self.mock_raw_input.return_value = 'foo'
prompter = configure.InteractivePrompter()
response = prompter.get_value(