Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def update(self, site_item):
    """Send the state of ``site_item`` to the server with a PUT request.

    Args:
        site_item: The site to update. Its ``id``, ``admin_mode`` and
            ``user_quota`` attributes are read here.

    Returns:
        The result of calling ``_parse_common_tags`` on a shallow copy of
        ``site_item`` with the server's response content.

    Raises:
        MissingRequiredFieldError: If ``site_item.id`` is falsy.
        ValueError: If ``admin_mode`` is ``ContentOnly`` while a user quota
            is also set.
    """
    if not site_item.id:
        raise MissingRequiredFieldError("Site item missing ID.")

    # A ContentOnly admin mode and a user quota are rejected before any
    # request is sent.
    if (site_item.admin_mode
            and site_item.admin_mode == SiteItem.AdminMode.ContentOnly
            and site_item.user_quota):
        raise ValueError('You cannot set admin_mode to ContentOnly and also set a user quota')

    endpoint = "{0}/{1}".format(self.baseurl, site_item.id)
    payload = RequestFactory.Site.update_req(site_item)
    response = self.put_request(endpoint, payload)
    logger.info('Updated site item (ID: {0})'.format(site_item.id))

    # Work on a copy so the caller's item is not the object being re-parsed.
    refreshed_site = copy.copy(site_item)
    return refreshed_site._parse_common_tags(response.content, self.parent_srv.namespace)
def update(self, group_item, default_site_role=UNLICENSED_USER):
    """Send the state of ``group_item`` to the server with a PUT request.

    Args:
        group_item: The group to update; its ``id`` must be set.
        default_site_role: Forwarded unchanged to the group update request
            builder.

    Returns:
        The first item parsed from the server's response.

    Raises:
        MissingRequiredFieldError: If ``group_item.id`` is falsy.
    """
    if not group_item.id:
        raise MissingRequiredFieldError("Group item missing ID.")

    endpoint = "{0}/{1}".format(self.baseurl, group_item.id)
    payload = RequestFactory.Group.update_req(group_item, default_site_role)
    response = self.put_request(endpoint, payload)
    logger.info('Updated group item (ID: {0})'.format(group_item.id))

    parsed_groups = GroupItem.from_response(response.content, self.parent_srv.namespace)
    return parsed_groups[0]
def update(self, workbook_item):
    """Update a workbook's tags and then the workbook itself on the server.

    Args:
        workbook_item: The workbook to update; its ``id`` must be set.

    Returns:
        The result of calling ``_parse_common_tags`` on a shallow copy of
        ``workbook_item`` with the server's response content.

    Raises:
        MissingRequiredFieldError: If ``workbook_item.id`` is falsy.
    """
    if not workbook_item.id:
        error = "Workbook item missing ID. Workbook must be retrieved from server first."
        raise MissingRequiredFieldError(error)

    # Tags are handled by the resource tagger before the workbook request.
    self._resource_tagger.update_tags(self.baseurl, workbook_item)

    # Update the workbook itself
    url = "{0}/{1}".format(self.baseurl, workbook_item.id)
    update_req = RequestFactory.Workbook.update_req(workbook_item)
    server_response = self.put_request(url, update_req)
    # The message previously lacked its closing parenthesis.
    logger.info('Updated workbook item (ID: {0})'.format(workbook_item.id))

    updated_workbook = copy.copy(workbook_item)
    return updated_workbook._parse_common_tags(server_response.content, self.parent_srv.namespace)
def create(self, site_item):
    """Create a new site on the server from ``site_item`` with a POST request.

    Args:
        site_item: The site to create. Its ``admin_mode`` and ``user_quota``
            attributes are validated before the request is sent.

    Returns:
        The first item parsed from the server's response.

    Raises:
        ValueError: If ``admin_mode`` is ``ContentOnly`` while a user quota
            is also set.
    """
    # A ContentOnly admin mode and a user quota are rejected up front.
    if (site_item.admin_mode
            and site_item.admin_mode == SiteItem.AdminMode.ContentOnly
            and site_item.user_quota):
        raise ValueError('You cannot set admin_mode to ContentOnly and also set a user quota')

    endpoint = self.baseurl
    payload = RequestFactory.Site.create_req(site_item)
    response = self.post_request(endpoint, payload)

    created_site = SiteItem.from_response(response.content, self.parent_srv.namespace)[0]
    logger.info('Created new site (ID: {0})'.format(created_site.id))
    return created_site
def refresh(self, workbook_id):
    """POST an empty request to the workbook's ``refresh`` URL.

    Args:
        workbook_id: Identifier interpolated into the refresh URL.

    Returns:
        The first job item parsed from the server's response.
    """
    endpoint = "{0}/{1}/refresh".format(self.baseurl, workbook_id)
    payload = RequestFactory.Empty.empty_req()
    response = self.post_request(endpoint, payload)

    parsed_jobs = JobItem.from_response(response.content, self.parent_srv.namespace)
    return parsed_jobs[0]
def update(self, project_item):
    """Send the state of ``project_item`` to the server with a PUT request.

    Args:
        project_item: The project to update; its ``id`` must be set.

    Returns:
        The first item parsed from the server's response.

    Raises:
        MissingRequiredFieldError: If ``project_item.id`` is falsy.
    """
    if not project_item.id:
        raise MissingRequiredFieldError("Project item missing ID.")

    endpoint = "{0}/{1}".format(self.baseurl, project_item.id)
    payload = RequestFactory.Project.update_req(project_item)
    response = self.put_request(endpoint, payload)
    logger.info('Updated project item (ID: {0})'.format(project_item.id))

    parsed_projects = ProjectItem.from_response(response.content, self.parent_srv.namespace)
    return parsed_projects[0]
url = "{0}?flowType={1}".format(self.baseurl, file_extension)
if mode == self.parent_srv.PublishMode.Overwrite or mode == self.parent_srv.PublishMode.Append:
url += '&{0}=true'.format(mode.lower())
# Determine if chunking is required (64MB is the limit for single upload method)
if os.path.getsize(file_path) >= FILESIZE_LIMIT:
logger.info('Publishing {0} to server with chunking method (flow over 64MB)'.format(filename))
upload_session_id = Fileuploads.upload_chunks(self.parent_srv, file_path)
url = "{0}&uploadSessionId={1}".format(url, upload_session_id)
xml_request, content_type = RequestFactory.Flow.publish_req_chunked(flow_item,
connections)
else:
logger.info('Publishing {0} to server'.format(filename))
with open(file_path, 'rb') as f:
file_contents = f.read()
xml_request, content_type = RequestFactory.Flow.publish_req(flow_item,
filename,
file_contents,
connections)
# Send the publishing request to server
try:
server_response = self.post_request(url, xml_request, content_type)
except InternalServerError as err:
if err.code == 504:
err.content = "Timeout error while publishing. Please use asynchronous publishing to avoid timeouts."
raise err
else:
new_flow = FlowItem.from_response(server_response.content, self.parent_srv.namespace)[0]
logger.info('Published {0} (ID: {1})'.format(filename, new_flow.id))
return new_flow
def update_connection(self, workbook_item, connection_item):
    """Update one connection of a workbook on the server with a PUT request.

    Args:
        workbook_item: The workbook that owns the connection; its ``id`` is
            used in the request URL.
        connection_item: The connection to update; its ``id`` is used in the
            request URL and the item is used to build the request body.

    Returns:
        The first connection item parsed from the server's response.
    """
    url = "{0}/{1}/connections/{2}".format(self.baseurl, workbook_item.id, connection_item.id)
    update_req = RequestFactory.Connection.update_req(connection_item)
    server_response = self.put_request(url, update_req)
    connection = ConnectionItem.from_response(server_response.content, self.parent_srv.namespace)[0]

    # The message previously lacked its closing parenthesis.
    logger.info('Updated workbook item (ID: {0} & connection item {1})'.format(workbook_item.id,
                                                                              connection_item.id))
    return connection
def add(self, user_item):
    """Add a user to the server with a POST request.

    Args:
        user_item: The user to add; used to build the request body.

    Returns:
        The last user item parsed from the server's response.
    """
    url = self.baseurl
    add_req = RequestFactory.User.add_req(user_item)
    server_response = self.post_request(url, add_req)
    new_user = UserItem.from_response(server_response.content, self.parent_srv.namespace).pop()

    # Log the ID of the item parsed from the response rather than the ID of
    # the caller-supplied item, which is the input to the add request.
    logger.info('Added new user (ID: {0})'.format(new_user.id))
    return new_user
def create(self, webhook_item):
    """Create a webhook on the server from ``webhook_item`` with a POST request.

    Args:
        webhook_item: The webhook to create; used to build the request body.

    Returns:
        The first webhook item parsed from the server's response.
    """
    endpoint = self.baseurl
    payload = RequestFactory.Webhook.create_req(webhook_item)
    response = self.post_request(endpoint, payload)

    created_webhook = WebhookItem.from_response(response.content, self.parent_srv.namespace)[0]
    logger.info('Created new webhook (ID: {0})'.format(created_webhook.id))
    return created_webhook