Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def get_eventhub_info_async(self):
"""
Get details on the specified EventHub async.

Opens an ``AMQPClientAsync`` against ``self.mgmt_target``, sends a READ
management request for ``self.eh_name`` and copies selected fields of the
byte-keyed response into a dict with the keys ``name``, ``type``,
``created_at``, ``partition_count`` and ``partition_ids``.

:rtype: dict
"""
# Credentials handed to `_create_auth` for the management connection, read
# from the `iot_username` / `iot_password` entries of the auth config.
alt_creds = {
"username": self._auth_config.get("iot_username"),
"password":self._auth_config.get("iot_password")}
# NOTE(review): `mgmt_client` is first bound inside this `try`. If the
# (not visible) `finally` body references it, a failure in `_create_auth`
# or the client constructor would surface there as a NameError - confirm.
try:
mgmt_auth = self._create_auth(**alt_creds)
mgmt_client = AMQPClientAsync(self.mgmt_target, auth=mgmt_auth, debug=self.debug)
await mgmt_client.open_async()
mgmt_msg = Message(application_properties={'name': self.eh_name})
response = await mgmt_client.mgmt_request_async(
mgmt_msg,
constants.READ_OPERATION,
op_type=b'com.microsoft:eventhub',
status_code_field=b'status-code',
description_fields=b'status-description')
eh_info = response.get_data()
output = {}
if eh_info:
# The response is keyed by bytes; text values are decoded as UTF-8.
output['name'] = eh_info[b'name'].decode('utf-8')
output['type'] = eh_info[b'type'].decode('utf-8')
# The division by 1000 suggests the value is in milliseconds - TODO confirm.
# `fromtimestamp` without a tz argument yields a naive local-time datetime.
output['created_at'] = datetime.datetime.fromtimestamp(float(eh_info[b'created_at'])/1000)
output['partition_count'] = eh_info[b'partition_count']
output['partition_ids'] = [p.decode('utf-8') for p in eh_info[b'partition_ids']]
return output
# NOTE(review): the body of this `finally` clause is missing from this
# excerpt; presumably it closes `mgmt_client` - verify against the full file.
finally:
def __init__(self, body=None):
    # type: (Union[str, bytes, List[AnyStr]]) -> None
    """Initialise the event and wrap ``body`` in an AMQP ``Message``.

    :param body: Payload of the event. A truthy ``list`` is treated as a
     multi-part body: its first item seeds the message and every remaining
     item is appended to the message body in order. Any other value that
     is not None (including an empty list) is handed to ``Message`` as is.
    :raises ValueError: If ``body`` is None.
    """
    self._last_enqueued_event_properties = {}  # type: Dict[str, Any]

    # Reject a missing payload up front; everything below may assume a body.
    if body is None:
        raise ValueError("EventData cannot be None.")

    if body and isinstance(body, list):
        self.message = Message(body[0])
        remaining_parts = body[1:]
        for part in remaining_parts:
            self.message._body.append(part)  # pylint: disable=protected-access
    else:
        self.message = Message(body)

    # Start every event with empty annotation / application-property maps.
    self.message.annotations = {}
    self.message.application_properties = {}
async def _query_meta_data(self, endpoint, path, auth):
    """Send a READ management request for ``path`` and return its payload.

    A ``uamqp.ReceiveClientAsync`` is created for ``endpoint``, opened, used
    for a single ``com.microsoft:eventhub`` management request and closed
    again, whether or not the request succeeds.

    :param endpoint: Address used to build the receive client's source.
    :param path: Value sent as the ``name`` application property.
    :param auth: Authentication object handed to the receive client.
    :returns: Whatever ``get_data()`` of the management response yields.
    """
    receive_client = uamqp.ReceiveClientAsync(
        uamqp.address.Source(endpoint),
        auth=auth,
        timeout=30000,
        debug=DEBUG,
    )
    try:
        await receive_client.open_async()
        request = uamqp.Message(application_properties={'name': path})
        reply = await receive_client.mgmt_request_async(
            request,
            b'READ',
            op_type=b'com.microsoft:eventhub',
            status_code_field=b'status-code',
            description_fields=b'status-description',
            timeout=30000,
        )
        return reply.get_data()
    finally:
        # Always release the connection, even when the request failed.
        await receive_client.close_async()
# NOTE(review): this excerpt starts in the middle of a function. `msg_props`,
# `content_type`, `content_encoding`, `expiry_time_utc`, `data`, `app_props`,
# `target` and `target_msg_id` are all bound by code that is not visible here.
msg_props.content_type = content_type
# Ensures valid json when content_type is application/json
# The property above keeps the caller's casing; within this excerpt the
# lowercased copy is only used for the comparison that follows.
content_type = content_type.lower()
if content_type == "application/json":
data = json.dumps(process_json_arg(data, "data"))
# Optional message properties are only set when a truthy value was supplied.
if content_encoding:
msg_props.content_encoding = content_encoding
if expiry_time_utc:
msg_props.absolute_expiry_time = int(expiry_time_utc)
# `str.encode` with no explicit codec uses UTF-8.
msg_body = str.encode(data)
message = uamqp.Message(
body=msg_body, properties=msg_props, application_properties=app_props
)
# Build the send target by appending the operation path to the endpoint
# derived from `target`.
operation = "/messages/devicebound"
endpoint = AmqpBuilder.build_iothub_amqp_endpoint_from_target(target)
endpoint_with_op = endpoint + operation
client = uamqp.SendClient(
target="amqps://" + endpoint_with_op, client_name=_get_container_id(), debug=DEBUG
)
client.queue_message(message)
result = client.send_all_messages()
# Keep only the entries of the send result that report a failed send.
errors = [m for m in result if m == uamqp.constants.MessageState.SendFailed]
return target_msg_id, errors
def _build_message(self, body):
    """Create ``self.message`` from ``body`` using the stored properties and header.

    :param body: Payload for the message. A non-empty ``list`` becomes a
     multi-part body: the first item is passed to ``uamqp.Message`` and the
     remaining items are appended to the message body in order. Any other
     value that is not None is passed to ``uamqp.Message`` unchanged.
    :raises ValueError: If ``body`` is None.
    """
    if body is None:
        raise ValueError("Message body cannot be None.")

    # TODO: This only works for a list of bytes/strings
    multi_part = isinstance(body, list) and bool(body)
    initial_payload = body[0] if multi_part else body
    self.message = uamqp.Message(initial_payload, properties=self.properties, header=self.header)

    if multi_part:
        for part in body[1:]:
            self.message._body.append(part)  # pylint: disable=protected-access
def __init__(self, body=None):
    # type: (Union[str, bytes, List[AnyStr]]) -> None
    """Initialise the event and wrap ``body`` in an AMQP ``Message``.

    :param body: Payload of the event. A truthy ``list`` is treated as a
     multi-part body: its first item seeds the message and every remaining
     item is appended to the message body in order. Any other value that
     is not None (including an empty list) is handed to ``Message`` as is.
    :raises ValueError: If ``body`` is None.
    """
    self._last_enqueued_event_properties = {}  # type: Dict[str, Any]

    # Reject a missing payload up front; everything below may assume a body.
    if body is None:
        raise ValueError("EventData cannot be None.")

    if body and isinstance(body, list):
        self.message = Message(body[0])
        remaining_parts = body[1:]
        for part in remaining_parts:
            self.message._body.append(part)  # pylint: disable=protected-access
    else:
        self.message = Message(body)

    # Start every event with empty annotation / application-property maps.
    self.message.annotations = {}
    self.message.application_properties = {}
def _build_message(self, body):
    """Create ``self.message`` from ``body`` using the stored properties and header.

    :param body: Payload for the message. A non-empty ``list`` becomes a
     multi-part body: the first item is passed to ``uamqp.Message`` and the
     remaining items are appended to the message body in order. Any other
     value that is not None is passed to ``uamqp.Message`` unchanged.
    :raises ValueError: If ``body`` is None.
    """
    if body is None:
        raise ValueError("Message body cannot be None.")

    # TODO: This only works for a list of bytes/strings
    multi_part = isinstance(body, list) and bool(body)
    initial_payload = body[0] if multi_part else body
    self.message = uamqp.Message(initial_payload, properties=self.properties, header=self.header)

    if multi_part:
        for part in body[1:]:
            self.message._body.append(part)  # pylint: disable=protected-access
def get_properties(self):
    # type:() -> Dict[str, Any]
    """
    Fetch the properties of the EventHub named by ``self.eh_name``.

    A management request is issued through ``self._management_request``; when
    the response carries data, the returned dictionary has these keys:

        -'path'
        -'created_at'
        -'partition_ids'

    When the response carries no data an empty dictionary is returned.

    :rtype: dict
    :raises: ~azure.eventhub.ConnectError
    """
    request = Message(application_properties={'name': self.eh_name})
    reply = self._management_request(request, op_type=b'com.microsoft:eventhub')
    eh_info = reply.get_data()
    if not eh_info:
        return {}

    # The response is keyed by bytes; text values are decoded as UTF-8 and
    # the creation timestamp is scaled down by 1000 before conversion.
    return {
        'path': eh_info[b'name'].decode('utf-8'),
        'created_at': datetime.datetime.utcfromtimestamp(float(eh_info[b'created_at']) / 1000),
        'partition_ids': [partition.decode('utf-8') for partition in eh_info[b'partition_ids']],
    }
# NOTE(review): this excerpt is the tail of a larger initializer whose header
# is not visible; `batch`, `message`, `body` and `self.msg_properties` are
# bound by code outside this excerpt.
# Three ways to populate `self.message`, checked in this order:
#   1. `batch`   - wrap the batch in a `BatchMessage`;
#   2. `message` - adopt an existing message and mirror its properties,
#                  annotations and application properties onto `self`;
#   3. otherwise - build a new `Message` from `body`.
if batch:
self.message = BatchMessage(data=batch, multi_messages=True, properties=self.msg_properties)
elif message:
self.message = message
self.msg_properties = message.properties
self._annotations = message.annotations
self._app_properties = message.application_properties
else:
# A non-empty list becomes a multi-part body: the first item seeds the
# message and the remaining items are appended in order.
if isinstance(body, list) and body:
self.message = Message(body[0], properties=self.msg_properties)
for more in body[1:]:
self.message._body.append(more) # pylint: disable=protected-access
elif body is None:
raise ValueError("EventData cannot be None.")
else:
self.message = Message(body, properties=self.msg_properties)