Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _get_workbook_connections(self, workbook_item, req_options=None):
    """Fetch the connections of a workbook.

    Issues a GET request to ``<baseurl>/<workbook id>/connections`` and parses
    the response body with ``ConnectionItem.from_response``.

    :param workbook_item: object whose ``id`` attribute is placed in the URL.
    :param req_options: optional request options, handed to ``get_request``
        unchanged.
    :return: the result of ``ConnectionItem.from_response`` for the response
        content and the parent server's namespace.
    """
    endpoint = "{base}/{item_id}/connections".format(base=self.baseurl, item_id=workbook_item.id)
    response = self.get_request(endpoint, req_options)
    return ConnectionItem.from_response(response.content, self.parent_srv.namespace)
def update_connection(self, workbook_item, connection_item):
    """Update a single connection of a workbook.

    Sends a PUT request to
    ``<baseurl>/<workbook id>/connections/<connection id>`` with the body
    built by ``RequestFactory.Connection.update_req`` and returns the first
    item parsed from the response.

    :param workbook_item: object whose ``id`` attribute is placed in the URL.
    :param connection_item: connection to update; its ``id`` is placed in the
        URL and the item itself is used to build the request body.
    :return: the first element of the ``ConnectionItem.from_response`` result.
    """
    url = "{0}/{1}/connections/{2}".format(self.baseurl, workbook_item.id, connection_item.id)
    update_req = RequestFactory.Connection.update_req(connection_item)
    server_response = self.put_request(url, update_req)
    # NOTE(review): the [0] index assumes the parsed response is non-empty;
    # confirm the server always returns the updated connection.
    connection = ConnectionItem.from_response(server_response.content, self.parent_srv.namespace)[0]
    # The parenthesis after the workbook ID is now closed (the original message
    # left it open); %-args defer formatting to the logging module.
    logger.info('Updated workbook item (ID: %s) & connection item %s', workbook_item.id, connection_item.id)
    return connection
def _get_flow_connections(self, flow_item, req_options=None):
    """Fetch the connections of a flow.

    Issues a GET request to ``<baseurl>/<flow id>/connections`` and parses
    the response body with ``ConnectionItem.from_response``.

    :param flow_item: object whose ``id`` attribute is placed in the URL.
    :param req_options: optional request options, handed to ``get_request``
        unchanged.
    :return: the result of ``ConnectionItem.from_response`` for the response
        content and the parent server's namespace.
    """
    endpoint = "{base}/{item_id}/connections".format(base=self.baseurl, item_id=flow_item.id)
    response = self.get_request(endpoint, req_options)
    return ConnectionItem.from_response(response.content, self.parent_srv.namespace)
def update_connection(self, flow_item, connection_item):
    """Update a single connection of a flow.

    Sends a PUT request to
    ``<baseurl>/<flow id>/connections/<connection id>`` with the body built
    by ``RequestFactory.Connection.update_req`` and returns the first item
    parsed from the response.

    :param flow_item: object whose ``id`` attribute is placed in the URL.
    :param connection_item: connection to update; its ``id`` is placed in the
        URL and the item itself is used to build the request body.
    :return: the first element of the ``ConnectionItem.from_response`` result.
    """
    url = "{0}/{1}/connections/{2}".format(self.baseurl, flow_item.id, connection_item.id)
    update_req = RequestFactory.Connection.update_req(connection_item)
    server_response = self.put_request(url, update_req)
    # NOTE(review): the [0] index assumes the parsed response is non-empty;
    # confirm the server always returns the updated connection.
    connection = ConnectionItem.from_response(server_response.content, self.parent_srv.namespace)[0]
    # The parenthesis after the flow ID is now closed (the original message
    # left it open); %-args defer formatting to the logging module.
    logger.info('Updated flow item (ID: %s) & connection item %s', flow_item.id, connection_item.id)
    return connection
def _get_workbook_connections(self, workbook_item, req_options=None):
    """Retrieve the connection list for one workbook.

    Builds ``<baseurl>/<workbook id>/connections``, performs a GET request
    with the supplied options, and returns the parsed result.

    :param workbook_item: object whose ``id`` attribute is placed in the URL.
    :param req_options: optional request options, forwarded as-is to
        ``get_request``.
    :return: the value returned by ``ConnectionItem.from_response`` when
        given the response content and the parent server's namespace.
    """
    connections_url = "{root}/{wb_id}/connections".format(root=self.baseurl, wb_id=workbook_item.id)
    reply = self.get_request(connections_url, req_options)
    return ConnectionItem.from_response(reply.content, self.parent_srv.namespace)
def update_connection(self, datasource_item, connection_item):
    """Update a single connection of a datasource.

    Sends a PUT request to
    ``<baseurl>/<datasource id>/connections/<connection id>`` with the body
    built by ``RequestFactory.Connection.update_req`` and returns the first
    item parsed from the response.

    :param datasource_item: object whose ``id`` attribute is placed in the URL.
    :param connection_item: connection to update; its ``id`` is placed in the
        URL and the item itself is used to build the request body.
    :return: the first element of the ``ConnectionItem.from_response`` result.
    """
    url = "{0}/{1}/connections/{2}".format(self.baseurl, datasource_item.id, connection_item.id)
    update_req = RequestFactory.Connection.update_req(connection_item)
    server_response = self.put_request(url, update_req)
    # NOTE(review): the [0] index assumes the parsed response is non-empty;
    # confirm the server always returns the updated connection.
    connection = ConnectionItem.from_response(server_response.content, self.parent_srv.namespace)[0]
    # The parenthesis after the datasource ID is now closed (the original
    # message left it open); %-args defer formatting to the logging module.
    logger.info('Updated datasource item (ID: %s) & connection item %s', datasource_item.id, connection_item.id)
    return connection
def update_connection(self, workbook_item, connection_item):
    """Update a single connection of a workbook.

    Sends a PUT request to
    ``<baseurl>/<workbook id>/connections/<connection id>`` with the body
    built by ``RequestFactory.Connection.update_req`` and returns the first
    item parsed from the response.

    :param workbook_item: object whose ``id`` attribute is placed in the URL.
    :param connection_item: connection to update; its ``id`` is placed in the
        URL and the item itself is used to build the request body.
    :return: the first element of the ``ConnectionItem.from_response`` result.
    """
    url = "{0}/{1}/connections/{2}".format(self.baseurl, workbook_item.id, connection_item.id)
    update_req = RequestFactory.Connection.update_req(connection_item)
    server_response = self.put_request(url, update_req)
    # NOTE(review): the [0] index assumes the parsed response is non-empty;
    # confirm the server always returns the updated connection.
    connection = ConnectionItem.from_response(server_response.content, self.parent_srv.namespace)[0]
    # The parenthesis after the workbook ID is now closed (the original message
    # left it open); %-args defer formatting to the logging module.
    logger.info('Updated workbook item (ID: %s) & connection item %s', workbook_item.id, connection_item.id)
    return connection