Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _read_etree_element(parent_element, child_element_name, target_object, target_field_name, converter):
child_element = parent_element.find('./sb:{0}'.format(child_element_name), _etree_sb_feed_namespaces)
if child_element is not None:
field_value = _get_etree_text(child_element)
if converter is not None:
field_value = converter(field_value)
setattr(target_object, target_field_name, field_value)
return True
return False
('ClaimType', 'claim_type', None),
('ClaimValue', 'claim_value', None),
('ModifiedTime', 'modified_time', None),
('CreatedTime', 'created_time', None),
('KeyName', 'key_name', None),
('PrimaryKey', 'primary_key', None),
('SecondaryKey', 'secondary_key', None),
]
for map in mappings:
_read_etree_element(rule_node, map[0], rule, map[1], map[2])
rights_nodes = rule_node.find('./sb:Rights', _etree_sb_feed_namespaces)
if rights_nodes is not None:
for access_rights_node in rights_nodes.findall('./sb:AccessRights', _etree_sb_feed_namespaces):
node_value = _get_etree_text(access_rights_node)
if node_value:
rule.rights.append(node_value)
hub.authorization_rules.append(rule)
if invalid_event_hub:
raise WindowsAzureError(_ERROR_EVENT_HUB_NOT_FOUND)
# extract id, updated and name value from feed entry and set them of queue.
for name, value in _ETreeXmlToObject.get_entry_properties_from_element(
entry_element, True).items():
if name == 'name':
value = value.partition('?')[0]
setattr(hub, name, value)
return hub
('SizeInBytes', 'size_in_bytes', int),
('MessageRetentionInDays', 'message_retention_in_days', int),
('Status', 'status', None),
('UserMetadata', 'user_metadata', None),
('PartitionCount', 'partition_count', int),
('EntityAvailableStatus', 'entity_available_status', None),
]
for map in mappings:
if _read_etree_element(hub_element, map[0], hub, map[1], map[2]):
invalid_event_hub = False
ids = hub_element.find('./sb:PartitionIds', _etree_sb_feed_namespaces)
if ids is not None:
for id_node in ids.findall('./arrays:string', _etree_sb_feed_namespaces):
value = _get_etree_text(id_node)
if value:
hub.partition_ids.append(value)
rules_nodes = hub_element.find('./sb:AuthorizationRules', _etree_sb_feed_namespaces)
if rules_nodes is not None:
invalid_event_hub = False
for rule_node in rules_nodes.findall('./sb:AuthorizationRule', _etree_sb_feed_namespaces):
rule = AuthorizationRule()
mappings = [
('ClaimType', 'claim_type', None),
('ClaimValue', 'claim_value', None),
('ModifiedTime', 'modified_time', None),
('CreatedTime', 'created_time', None),
('KeyName', 'key_name', None),
('PrimaryKey', 'primary_key', None),