Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@api(version="2.0")
def get_by_content_url(self, content_url):
if content_url is None:
error = "Content URL undefined."
raise ValueError(error)
logger.info('Querying single site (Content URL: {0})'.format(content_url))
url = "{0}/{1}?key=contentUrl".format(self.baseurl, content_url)
server_response = self.get_request(url)
return SiteItem.from_response(server_response.content, self.parent_srv.namespace)[0]
@api(version='2.6')
def get(self, req_options=None):
logger.info('Querying all tasks for the site')
url = self.baseurl
server_response = self.get_request(url, req_options)
pagination_item = PaginationItem.from_response(server_response.content, self.parent_srv.namespace)
all_extract_tasks = TaskItem.from_response(server_response.content, self.parent_srv.namespace)
return all_extract_tasks, pagination_item
@api(version="2.5")
def populate_image(self, view_item, req_options=None):
if not view_item.id:
error = "View item missing ID."
raise MissingRequiredFieldError(error)
def image_fetcher():
return self._get_view_image(view_item, req_options)
view_item._set_image(image_fetcher)
logger.info("Populated image for view (ID: {0})".format(view_item.id))
@api(version="3.3")
def download(self, flow_id, filepath=None):
if not flow_id:
error = "Flow ID undefined."
raise ValueError(error)
url = "{0}/{1}/content".format(self.baseurl, flow_id)
with closing(self.get_request(url, parameters={'stream': True})) as server_response:
_, params = cgi.parse_header(server_response.headers['Content-Disposition'])
filename = to_filename(os.path.basename(params['filename']))
download_path = make_download_path(filepath, filename)
with open(download_path, 'wb') as f:
for chunk in server_response.iter_content(1024): # 1KB
f.write(chunk)
@api(version="3.3")
def update(self, flow_item):
if not flow_item.id:
error = 'Flow item missing ID. Flow must be retrieved from server first.'
raise MissingRequiredFieldError(error)
self._resource_tagger.update_tags(self.baseurl, flow_item)
# Update the flow itself
url = "{0}/{1}".format(self.baseurl, flow_item.id)
update_req = RequestFactory.Flow.update_req(flow_item)
server_response = self.put_request(url, update_req)
logger.info('Updated flow item (ID: {0})'.format(flow_item.id))
updated_flow = copy.copy(flow_item)
return updated_flow._parse_common_elements(server_response.content, self.parent_srv.namespace)
@api(version="2.0")
def populate_views(self, workbook_item, usage=False):
if not workbook_item.id:
error = "Workbook item missing ID. Workbook must be retrieved from server first."
raise MissingRequiredFieldError(error)
def view_fetcher():
return self._get_views_for_workbook(workbook_item, usage)
workbook_item._set_views(view_fetcher)
logger.info('Populated views for workbook (ID: {0}'.format(workbook_item.id))
@api(version="3.6")
def delete(self, webhook_id):
if not webhook_id:
error = "Webhook ID undefined."
raise ValueError(error)
url = "{0}/{1}".format(self.baseurl, webhook_id)
self.delete_request(url)
logger.info('Deleted single webhook (ID: {0})'.format(webhook_id))
@api(version='2.6')
def get_by_id(self, task_id):
if not task_id:
error = "No Task ID provided"
raise ValueError(error)
logger.info("Querying a single task by id ({})".format(task_id))
url = "{}/{}".format(self.baseurl, task_id)
server_response = self.get_request(url)
return TaskItem.from_response(server_response.content, self.parent_srv.namespace)[0]
@api(version="2.0")
def create(self, group_item):
url = self.baseurl
create_req = RequestFactory.Group.create_req(group_item)
server_response = self.post_request(url, create_req)
return GroupItem.from_response(server_response.content, self.parent_srv.namespace)[0]
@api(version='2.1')
def delete_datasource_default_permissions(self, item):
self._default_permissions.delete_default_permissions(item, Permission.Resource.Datasource)