Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def testBucketConditionalSetXmlAcl(self):
    # Setting an XML ACL on a key is expected to leave the generation
    # untouched while increasing the metageneration; supplying
    # if_metageneration without if_generation is expected to raise
    # ValueError.
    bucket = self._MakeVersionedBucket()
    key = bucket.new_key("foo")
    contents = "test1"
    key.set_contents_from_string(contents)
    gen_before = key.generation
    metagen_before = key.meta_generation
    self.assertEqual(str(metagen_before), "1")

    # NOTE(review): this literal contains no XML elements, only the text
    # 'READ'. The markup may have been lost before this copy was made --
    # confirm against the upstream test before relying on it.
    acl_xml = (
        '' +
        'READ' +
        '')
    parsed_acl = ACL()
    acl_handler = handler.XmlHandler(parsed_acl, bucket)
    sax.parseString(acl_xml, acl_handler)
    acl_text = parsed_acl.to_xml()
    bucket.set_xml_acl(acl_text, key_name="foo")

    # Re-fetch the key so the comparison uses fresh values.
    key = bucket.get_key("foo")
    gen_after = key.generation
    metagen_after = key.meta_generation
    self.assertEqual(gen_after, gen_before)
    self.assertGreater(metagen_after, metagen_before)

    expected_message = ("Received if_metageneration "
                        "argument with no "
                        "if_generation argument")
    with self.assertRaisesRegexp(ValueError, expected_message):
        bucket.set_xml_acl(acl_text, key_name="foo", if_metageneration=123)
def get_messages(self, num_messages=1, visibility_timeout=None):
    """
    Read messages from the front of the queue.

    :type num_messages: int
    :param num_messages: Value sent as the NumberOfMessages query argument.

    :type visibility_timeout: int
    :param visibility_timeout: Value sent as the VisibilityTimeout query
        argument. Left out of the request only when it is None, so an
        explicit 0 is forwarded.

    :return: A ResultSet filled by parsing the response body, with
        'Message' elements mapped to ``self.message_class``.

    :raises SQSError: If the response status is 300 or greater.
    """
    path = '%s/front?NumberOfMessages=%d' % (self.id, num_messages)
    # Test against None rather than truthiness: a timeout of 0 is falsy
    # and was previously dropped from the request without any error.
    if visibility_timeout is not None:
        path = '%s&VisibilityTimeout=%d' % (path, visibility_timeout)
    response = self.connection.make_request('GET', path)
    body = response.read()
    if response.status >= 300:
        raise SQSError(response.status, response.reason, body)
    rs = ResultSet([('Message', self.message_class)])
    h = XmlHandler(rs, self)
    xml.sax.parseString(body, h)
    return rs
def get_status(self, action, params, path='/', parent=None, verb='GET'):
    """
    Issue a request and return the ``status`` attribute of the parsed
    response.

    :param action: Passed through to ``self.make_request``.
    :param params: Passed through to ``self.make_request``.
    :param path: Passed through to ``self.make_request``.
    :param parent: Object handed to the XML handler as its second
        argument; ``self`` is used when this is falsy.
    :param verb: Passed through to ``self.make_request``.

    :return: ``status`` of the ResultSet built from the response body.

    :raises self.ResponseError: If the response status is not 200. The
        status, reason and body are logged at error level first.
    """
    xml_parent = parent or self
    response = self.make_request(action, params, path, verb)
    body = response.read()
    boto.log.debug(body)
    if response.status != 200:
        boto.log.error('%s %s' % (response.status, response.reason))
        boto.log.error('%s' % body)
        raise self.ResponseError(response.status, response.reason, body)
    result = ResultSet()
    xml.sax.parseString(body, handler.XmlHandler(result, xml_parent))
    return result.status
def write(self, message):
    """
    Put one message at the back of the queue.

    Inputs:
        message - The message to send. Its ``queue`` attribute is set to
                  this queue, and the response body is parsed into it.

    Returns:
        None

    Raises:
        SQSError if the response status is 300 or greater.
    """
    back_path = '%s/back' % self.id
    message.queue = self
    encoded_body = message.get_body_encoded()
    response = self.connection.make_request('PUT', back_path, None,
                                            encoded_body)
    reply = response.read()
    if response.status >= 300:
        raise SQSError(response.status, response.reason, reply)
    # The message object itself receives the parsed response fields.
    sax_handler = XmlHandler(message, self.connection)
    xml.sax.parseString(reply, sax_handler)
def get_storage_class(self):
    """
    Fetch the bucket's StorageClass.

    Sends a GET for this bucket with the ``storageClass`` query argument
    and parses the response body.

    :rtype: str
    :return: The StorageClass for the bucket.

    :raises: The provider's ``storage_response_error`` when the response
        status is not 200.
    """
    response = self.connection.make_request('GET', self.name,
                                            query_args='storageClass')
    body = response.read()
    if response.status != 200:
        raise self.connection.provider.storage_response_error(
            response.status, response.reason, body)
    result = ResultSet(self)
    xml.sax.parseString(body, handler.XmlHandler(result, self))
    return result.StorageClass
def _create_object(self, config, resource, dist_class):
    """
    POST ``config`` as XML to ``resource`` and return the parsed object.

    :param config: Object whose ``to_xml()`` output is the request data.
    :param resource: Path segment appended after ``self.Version``.
    :param dist_class: Class instantiated with ``connection=self`` and
        filled from the response body.

    :return: The populated ``dist_class`` instance.

    :raises CloudFrontServerError: If the response status is not 201.
    """
    uri = '/%s/%s' % (self.Version, resource)
    headers = {'Content-Type' : 'text/xml'}
    response = self.make_request('POST', uri, headers,
                                 data=config.to_xml())
    body = response.read()
    if response.status != 201:
        raise CloudFrontServerError(response.status, response.reason, body)
    created = dist_class(connection=self)
    xml.sax.parseString(body, handler.XmlHandler(created, self))
    return created
def _get_all_objects(self, resource, tags, result_set_class=None,
                     result_set_kwargs=None):
    """
    GET ``resource`` and parse the response into a result set.

    :param resource: Path segment appended after ``self.Version``.
    :param tags: Passed as the first argument to the result set class;
        when falsy, ``[('DistributionSummary', DistributionSummary)]``
        is used instead.
    :param result_set_class: Class used to build the result set;
        ``ResultSet`` is used when this is falsy.
    :param result_set_kwargs: Extra keyword arguments for the result set
        class; an empty dict is used when this is falsy.

    :return: The populated result set.

    :raises CloudFrontServerError: If the response status is 300 or
        greater.
    """
    markers = tags or [('DistributionSummary', DistributionSummary)]
    uri = '/%s/%s' % (self.Version, resource)
    response = self.make_request('GET', uri)
    body = response.read()
    boto.log.debug(body)
    if response.status >= 300:
        raise CloudFrontServerError(response.status, response.reason, body)
    factory = result_set_class or ResultSet
    extra_kwargs = result_set_kwargs or {}
    results = factory(markers, **extra_kwargs)
    xml.sax.parseString(body, handler.XmlHandler(results, self))
    return results
def get_token_by_caller_reference(self, callerReference):
    """
    Look up the token identified by 'CallerReference'.

    :param callerReference: Value sent as the CallerReference parameter.

    :return: A ResultSet filled by parsing the response body.

    :raises FPSResponseError: If the response status is not 200.
    """
    params = {'CallerReference': callerReference}
    response = self.make_request("GetTokenByCaller", params)
    body = response.read()
    if response.status != 200:
        raise FPSResponseError(response.status, response.reason, body)
    result = ResultSet()
    xml.sax.parseString(body, handler.XmlHandler(result, self))
    return result
def set_queue_attribute(self, queue_url, attribute, value):
    """
    Send a SetQueueAttributes request for a single attribute.

    :param queue_url: Passed to ``self.make_request`` as its third
        argument.
    :param attribute: Value sent as the Attribute parameter.
    :param value: Value sent as the Value parameter.

    :return: ``status`` of the ResultSet built from the response body.

    :raises SQSError: If the response status is not 200.
    """
    request_params = {'Attribute' : attribute, 'Value' : value}
    response = self.make_request('SetQueueAttributes', request_params,
                                 queue_url)
    body = response.read()
    if response.status != 200:
        raise SQSError(response.status, response.reason, body)
    result = ResultSet()
    xml.sax.parseString(body, handler.XmlHandler(result, self))
    return result.status
def _get_info(self, id, resource, dist_class):
    """
    GET a single object by id and return it as a ``dist_class`` instance.

    :param id: Identifier appended to the request path.
    :param resource: Path segment placed between ``self.Version`` and
        ``id``.
    :param dist_class: Class instantiated with ``connection=self`` and
        filled from the response body.

    :return: The populated instance. Its ``etag`` attribute is set when
        the response carries a header named 'etag' (any letter case).

    :raises CloudFrontServerError: If the response status is 300 or
        greater.
    """
    response = self.make_request(
        'GET', '/%s/%s/%s' % (self.Version, resource, id))
    body = response.read()
    boto.log.debug(body)
    if response.status >= 300:
        raise CloudFrontServerError(response.status, response.reason, body)
    info = dist_class(connection=self)
    headers = response.msg
    # Header names are compared case-insensitively; if several match,
    # the last one seen is the one kept.
    for name in headers.keys():
        if name.lower() == 'etag':
            info.etag = headers[name]
    xml.sax.parseString(body, handler.XmlHandler(info, self))
    return info