Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_json_serialize(self):
    """
    A MapAttribute subclass holding a JSONAttribute survives a
    serialize/deserialize round trip
    """
    class JSONMapAttribute(MapAttribute):
        arbitrary_data = JSONAttribute()

        def __eq__(self, other):
            # Equality is decided by the wrapped JSON payload alone.
            return self.arbitrary_data == other.arbitrary_data

    payload = {'foo': 'bar', 'bool': True, 'number': 3.141}
    original = JSONMapAttribute(arbitrary_data=payload)

    wire_form = original.serialize(original)
    restored = original.deserialize(wire_form)

    assert isinstance(restored, JSONMapAttribute)
    assert restored == original
    assert restored.arbitrary_data == payload
def test_control_chars(self):
    """
    JSONAttribute deserializes JSON text whose key and value contain
    control characters
    """
    expected = {'foo\t': 'bar\n', 'bool': True, 'number': 3.141}
    json_attr = JSONAttribute()

    raw_json = json.dumps(expected)
    unicode_json = six.u(raw_json)

    assert json_attr.deserialize(unicode_json) == expected
def test_quoted_json(self):
    """
    JSONAttribute round-trips strings containing a backslash or a quote
    """
    json_attr = JSONAttribute()
    # Same check for each awkward string, in the original order.
    for text in ('\\t', '"'):
        wire_form = json_attr.serialize(text)
        assert json_attr.deserialize(wire_form) == text
def test_json_attribute(self):
    """
    JSONAttribute construction: attribute type and default value
    """
    plain = JSONAttribute()
    assert plain is not None
    assert plain.attr_type == STRING

    with_default = JSONAttribute(default={})
    assert with_default.default == {}
def test_json_attribute(self):
    """
    JSONAttribute construction: attribute type and default value
    """
    # NOTE(review): a test with this exact name and body also appears earlier
    # in this file; if both live in the same class, this definition shadows it.
    plain = JSONAttribute()
    assert plain is not None
    assert plain.attr_type == STRING

    with_default = JSONAttribute(default={})
    assert with_default.default == {}
from esser.constants import AGGREGATE_KEY_DELIMITER
class DynamoDBEventModel(Model):
    """
    A DynamoDB Event model

    Rows are keyed by ``aggregate_name`` (hash key) and ``aggregate_key``
    (range key). ``aggregate_key`` is split on ``AGGREGATE_KEY_DELIMITER``:
    its first field is exposed as ``aggregate_id`` and its second field,
    converted to ``int``, as ``version``.
    """

    class Meta:
        # Connection settings come from the environment, with fallbacks for
        # region and table name; host is None when DYNAMODB_HOST is unset.
        region = os.environ.get('AWS_REGION', 'ap-southeast-2')
        table_name = os.environ.get("EVENT_TABLE", "events")
        host = os.environ.get('DYNAMODB_HOST')

    aggregate_name = UnicodeAttribute(hash_key=True)
    aggregate_key = UnicodeAttribute(range_key=True)
    event_type = UnicodeAttribute()
    created_at = UTCDateTimeAttribute()
    event_data = JSONAttribute()

    @classmethod
    def _conditional_operator_check(cls, conditional_operator):
        """
        Accept any ``conditional_operator`` without checking it.

        NOTE(review): presumably overrides a check on the base model -
        confirm against the base class.
        """

    @property
    def aggregate_id(self):
        """First delimiter-separated field of ``aggregate_key``."""
        fields = self.aggregate_key.split(AGGREGATE_KEY_DELIMITER)
        return fields[0]

    @property
    def version(self):
        """Second delimiter-separated field of ``aggregate_key``, as an int."""
        fields = self.aggregate_key.split(AGGREGATE_KEY_DELIMITER)
        return int(fields[1])
@convert_pynamo_attribute.register(attributes.JSONAttribute)
def convert_json_to_string(type, attribute, registry=None):
    """
    Build a ``JSONString`` for a ``JSONAttribute``.

    The result's description is the attribute's ``attr_name`` and it is
    required exactly when ``attribute.null`` is falsy. ``type`` and
    ``registry`` are accepted but not used here.
    """
    description = attribute.attr_name
    is_required = not attribute.null
    return JSONString(description=description, required=is_required)
class StoreModel(Model):
"""
Represents a key-value store in a DynamoDB.
"""
# NOTE(review): indentation was lost in this excerpt, so which statements
# below sit inside Meta (and inside the ``if``) must be confirmed against the
# original file before re-indenting.
class Meta:
"""Meta class for Store."""
# The table name embeds the deployment stage.
table_name = "cla-{}-store".format(stage)
# A "local" stage points at a DynamoDB endpoint on localhost.
if stage == "local":
host = "http://localhost:8000"
# Write/read capacity units are taken from cla.conf and converted to int.
write_capacity_units = int(cla.conf["DYNAMO_WRITE_UNITS"])
read_capacity_units = int(cla.conf["DYNAMO_READ_UNITS"])
# ``key`` is the hash key; ``value`` is stored as JSON; ``expire`` is numeric.
key = UnicodeAttribute(hash_key=True)
value = JSONAttribute()
expire = NumberAttribute()
class Store(key_value_store_interface.KeyValueStore):
    """
    ORM-agnostic wrapper for the DynamoDB key-value store model.
    """

    def __init__(self):
        # ``super(Store)`` without an instance yields an unbound super object,
        # so calling ``__init__`` on it never reached the parent class.
        # Binding it to ``self`` runs the inherited initializer as intended.
        super(Store, self).__init__()

    def set(self, key, value):
        """
        Fill a fresh ``StoreModel`` with ``key``, ``value`` and an expiry.

        The expiry comes from ``self.get_expire_timestamp()``.

        NOTE(review): nothing after the ``expire`` assignment is visible in
        this excerpt (no save call) - confirm against the full file.
        """
        model = StoreModel()
        model.key = key
        model.value = value
        model.expire = self.get_expire_timestamp()
signature_user_ccla_company_id = UnicodeAttribute(null=True)
# Pass the ``set`` callable rather than a ``set()`` instance so the default is
# built per model instance instead of being one mutable object shared by all.
signature_acl = UnicodeSetAttribute(default=set)
signature_project_index = ProjectSignatureIndex()
signature_reference_index = ReferenceSignatureIndex()
signature_envelope_id = UnicodeAttribute(null=True)
# Callback type refers to either Gerrit or GitHub
signature_return_url_type = UnicodeAttribute(null=True)
note = UnicodeAttribute(null=True)
signature_project_external_id = UnicodeAttribute(null=True)
# Optional company signatory and manager details.
signature_company_signatory_id = UnicodeAttribute(null=True)
signature_company_signatory_name = UnicodeAttribute(null=True)
signature_company_signatory_email = UnicodeAttribute(null=True)
signature_company_initial_manager_id = UnicodeAttribute(null=True)
signature_company_initial_manager_name = UnicodeAttribute(null=True)
signature_company_initial_manager_email = UnicodeAttribute(null=True)
# The secondary manager list is kept as a JSON attribute.
signature_company_secondary_manager_list = JSONAttribute(null=True)
signature_company_signatory_index = SignatureCompanySignatoryIndex()
signature_company_initial_manager_index = SignatureCompanyInitialManagerIndex()
project_signature_external_id_index = SignatureProjectExternalIndex()
# whitelists are only used by CCLAs
domain_whitelist = ListAttribute(null=True)
email_whitelist = ListAttribute(null=True)
github_whitelist = ListAttribute(null=True)
github_org_whitelist = ListAttribute(null=True)
class Signature(model_interfaces.Signature): # pylint: disable=too-many-public-methods
"""
ORM-agnostic wrapper for the DynamoDB Signature model.
"""
# NOTE(review): only the class header and docstring are visible in this
# excerpt; the class body is not shown here.
"""
class Meta:
# Table name is read from application settings.
table_name = settings.value.DYNAMODB_TABLE_HOSTS
# In the 'development' environment, use the configured DynamoDB URL as host.
if settings.value.APPLICATION_ENV == 'development':
host = settings.value.DYNAMODB_URL
# NOTE(review): indentation was lost in this excerpt, so it is unclear whether
# session_cls is set only in the development branch or unconditionally - confirm.
session_cls = CustomPynamoSession
# Key schema: ``service`` is the hash key and ``ip_address`` the range key.
service = UnicodeAttribute(hash_key=True)
ip_address = UnicodeAttribute(range_key=True)
service_repo_name_index = ServiceRepoNameIndex()
# ``service_repo_name`` is the only attribute here declared with null=True.
service_repo_name = UnicodeAttribute(null=True)
port = NumberAttribute()
last_check_in = UTCDateTimeAttribute()
revision = UnicodeAttribute()
# ``tags`` is stored as a JSON attribute.
tags = JSONAttribute()