Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _convert_object_to_feed_entry(obj, rootName, content_writer):
updated_str = datetime.utcnow().isoformat()
if datetime.utcnow().utcoffset() is None:
updated_str += '+00:00'
writer = _XmlWriter()
writer.preprocessor('')
writer.start('entry', [
('xmlns:d', _XmlSchemas.DataServices, None),
('xmlns:m', _XmlSchemas.DataServicesMetadata, None),
('xmlns', _XmlSchemas.Atom, None),
])
writer.element('title', '')
writer.element('updated', updated_str)
writer.start('author')
writer.element('name', '')
writer.end('author')
writer.element('id', '')
writer.start('content', [('type', 'application/xml', None)])
writer.start(rootName, [
('xmlns:i', _XmlSchemas.SchemaInstance, None),
('xmlns', _XmlSchemas.ServiceBus, None),
])
if message_type == None:
message = Message(
respbody, service_instance, message_location, custom_properties,
'application/atom+xml;type=entry;charset=utf-8', broker_properties)
else:
message = Message(respbody, service_instance, message_location,
custom_properties, message_type, broker_properties)
return message
# convert functions
# Maps the short prefixes 'atom', 'i', 'sb' and 'arrays' to the corresponding
# _XmlSchemas namespace constants.
# NOTE(review): presumably used as the prefix-to-namespace map for ElementTree
# lookups on Service Bus feed XML -- no use site is visible here; confirm.
_etree_sb_feed_namespaces = {
'atom': _XmlSchemas.Atom,
'i': _XmlSchemas.SchemaInstance,
'sb': _XmlSchemas.ServiceBus,
'arrays': _XmlSchemas.SerializationArrays,
}
def _convert_response_to_rule(response):
    ''' Converts a response to a rule object.

    Parses response.body as XML and hands the resulting root element to
    _convert_etree_element_to_rule, returning whatever that call returns.
    '''
    return _convert_etree_element_to_rule(ETree.fromstring(response.body))
def _convert_etree_element_to_rule(entry_element):
''' Converts entry element to rule object.
The format of xml for rule:
<content type="application/xml">
</content>
writer.start('entry', [
('xmlns:d', _XmlSchemas.DataServices, None),
('xmlns:m', _XmlSchemas.DataServicesMetadata, None),
('xmlns', _XmlSchemas.Atom, None),
])
writer.element('title', '')
writer.element('updated', updated_str)
writer.start('author')
writer.element('name', '')
writer.end('author')
writer.element('id', '')
writer.start('content', [('type', 'application/xml', None)])
writer.start(rootName, [
('xmlns:i', _XmlSchemas.SchemaInstance, None),
('xmlns', _XmlSchemas.ServiceBus, None),
])
if obj:
content_writer(writer, obj)
writer.end(rootName)
writer.end('content')
writer.end('entry')
xml = writer.xml()
writer.close()
return xml
custom_properties[name] = float(value)
if message_type == None:
message = Message(
respbody, service_instance, message_location, custom_properties,
'application/atom+xml;type=entry;charset=utf-8', broker_properties)
else:
message = Message(respbody, service_instance, message_location,
custom_properties, message_type, broker_properties)
return message
# convert functions
# Maps the short prefixes 'atom', 'i', 'sb' and 'arrays' to the corresponding
# _XmlSchemas namespace constants.
# NOTE(review): presumably used as the prefix-to-namespace map for ElementTree
# lookups on Service Bus feed XML -- no use site is visible here; confirm.
_etree_sb_feed_namespaces = {
'atom': _XmlSchemas.Atom,
'i': _XmlSchemas.SchemaInstance,
'sb': _XmlSchemas.ServiceBus,
'arrays': _XmlSchemas.SerializationArrays,
}
def _convert_response_to_rule(response):
    ''' Converts a response to a rule object.

    Parses response.body as XML and hands the resulting root element to
    _convert_etree_element_to_rule, returning whatever that call returns.
    '''
    return _convert_etree_element_to_rule(ETree.fromstring(response.body))
def _convert_etree_element_to_rule(entry_element):
''' Converts entry element to rule object.
The format of xml for rule:
<content type="application/xml"></content>
def _convert_object_to_feed_entry(obj, rootName, content_writer):
updated_str = datetime.utcnow().isoformat()
if datetime.utcnow().utcoffset() is None:
updated_str += '+00:00'
writer = _XmlWriter()
writer.preprocessor('')
writer.start('entry', [
('xmlns:d', _XmlSchemas.DataServices, None),
('xmlns:m', _XmlSchemas.DataServicesMetadata, None),
('xmlns', _XmlSchemas.Atom, None),
])
writer.element('title', '')
writer.element('updated', updated_str)
writer.start('author')
writer.element('name', '')
writer.end('author')
writer.element('id', '')
writer.start('content', [('type', 'application/xml', None)])
writer.start(rootName, [
('xmlns:i', _XmlSchemas.SchemaInstance, None),
('xmlns', _XmlSchemas.ServiceBus, None),
])
if obj:
content_writer(writer, obj)
def _convert_object_to_feed_entry(obj, rootName, content_writer):
updated_str = datetime.utcnow().isoformat()
if datetime.utcnow().utcoffset() is None:
updated_str += '+00:00'
writer = _XmlWriter()
writer.preprocessor('')
writer.start('entry', [
('xmlns:d', _XmlSchemas.DataServices, None),
('xmlns:m', _XmlSchemas.DataServicesMetadata, None),
('xmlns', _XmlSchemas.Atom, None),
])
writer.element('title', '')
writer.element('updated', updated_str)
writer.start('author')
writer.element('name', '')
writer.end('author')
writer.element('id', '')
writer.start('content', [('type', 'application/xml', None)])
writer.start(rootName, [
('xmlns:i', _XmlSchemas.SchemaInstance, None),
('xmlns', _XmlSchemas.ServiceBus, None),
])
if obj: