Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_filter_combo(self):
    """Send a mocked GET that carries paging options and two filters.

    The request options use page size 13 and page number 13, filtered on
    last login (greater than or equal) and on site role (equals).
    """
    with requests_mock.mock() as mocker:
        # Any GET is answered by the mock, so no real server is contacted.
        mocker.get(requests_mock.ANY)
        url = "http://test/api/2.3/sites/dad65087-b08b-4603-af4e-2887b8aafc67/users"

        last_login_filter = TSC.Filter(TSC.RequestOptions.Field.LastLogin,
                                       TSC.RequestOptions.Operator.GreaterThanOrEqual,
                                       '2017-01-15T00:00:00:00Z')
        site_role_filter = TSC.Filter(TSC.RequestOptions.Field.SiteRole,
                                      TSC.RequestOptions.Operator.Equals,
                                      'Publisher')

        options = TSC.RequestOptions(pagesize=13, pagenumber=13)
        options.filter.add(last_login_filter)
        options.filter.add(site_role_filter)

        resp = self.server.workbooks._make_request(
            requests.get,
            url,
            content=None,
            request_object=options,
            auth_token='j80k54ll2lfMZ0tv97mlPvvSCRyD0DOM',
            content_type='text/xml',
        )

        # NOTE(review): nothing visible in this excerpt compares `resp`
        # against `expected` -- confirm the assertions exist in the full file.
        expected = 'pagenumber=13&pagesize=13&filter=lastlogin:gte:2017-01-15t00:00:00:00z,siterole:eq:publisher'
def test_filter_equals(self):
    """Check that a name-equals filter returns the two canned workbooks.

    The response body is read from the FILTER_EQUALS fixture file and
    served by a mock for the filtered workbooks URL.
    """
    with open(FILTER_EQUALS, 'rb') as fixture:
        canned_xml = fixture.read().decode('utf-8')

    with requests_mock.mock() as mocker:
        mocker.get(self.baseurl + '/workbooks?filter=name:eq:RESTAPISample', text=canned_xml)

        name_filter = TSC.Filter(TSC.RequestOptions.Field.Name,
                                 TSC.RequestOptions.Operator.Equals,
                                 'RESTAPISample')
        query = TSC.RequestOptions()
        query.filter.add(name_filter)
        found, paging = self.server.workbooks.get(query)

    self.assertEqual(2, paging.total_available)
    # Both returned workbooks must carry the filtered name.
    for index in (0, 1):
        self.assertEqual('RESTAPISample', found[index].name)
def deletesiteusers(self):
    """Remove the users listed in a CSV file from the signed-in site.

    Reads one positional argument, ``users``, from ``sys.argv[2:]``: the
    path of a CSV file whose first column holds one user name per row.
    Each name is looked up with a name-equals filter and the first match
    is removed.

    Removal is best effort: a blank row, an unknown user or a failed
    removal is reported on stderr and the loop moves on to the next row.
    """
    parser = argparse.ArgumentParser(description='Delete site users (from the site you are logged into). The users are read from the given CSV file. The file is a simple list of one user name per line')
    parser.add_argument('users', help='File that contains a list of users (one per line) to remove from the site')
    # now that we're inside a subcommand, ignore the first TWO argvs, ie the command (tabcmd) and the subcommand (deletesiteusers)
    args = parser.parse_args(sys.argv[2:])

    server = self.set_server_from_session_file()

    with open(args.users, newline='') as f:
        for row in csv.reader(f):
            # csv.reader yields an empty list for a blank line; there is
            # no user name to look up in that case.
            if not row:
                continue
            user_name = row[0]

            # Set-up our filter so we get the user id we need
            request_options = TSC.RequestOptions()
            request_options.filter.add(TSC.Filter(TSC.RequestOptions.Field.Name, TSC.RequestOptions.Operator.Equals, user_name))
            users = list(TSC.Pager(server.users, request_options))

            if not users:
                print("User '{}' not found on this site; skipping".format(user_name), file=sys.stderr)
                continue

            try:
                server.users.remove(users[0].id)
            except Exception as exc:
                # Keep going with the remaining users, but say what failed.
                # KeyboardInterrupt/SystemExit are deliberately not caught.
                print("Could not remove user '{}': {}".format(user_name, exc), file=sys.stderr)
# Determine and use the highest api version for the server
server.use_server_version()

# Try to create the two example groups that the filters below look for.
# `group_name` is left holding the last name, as the code further down
# may overwrite it with the name of the group that was found.
for group_name in ('SALES NORTHWEST', 'SALES ROMANIA'):
    create_example_group(group_name, server)

# URL Encode the name of the group that we want to filter on
# i.e. turn spaces into plus signs
filter_group_name = 'SALES+ROMANIA'
options = TSC.RequestOptions()
options.filter.add(TSC.Filter(TSC.RequestOptions.Field.Name,
                              TSC.RequestOptions.Operator.Equals,
                              filter_group_name))

filtered_groups, _ = server.groups.get(req_options=options)
# Result can either be a matching group or an empty list
if filtered_groups:
    group_name = filtered_groups.pop().name
    print(group_name)
else:
    # This query targets groups, so the message names a group.
    error = "No group named '{}' found".format(filter_group_name)
    print(error)

# Second filter: match any of several (URL-encoded) group names.
# NOTE(review): the request that uses these options is not visible in
# this excerpt.
options = TSC.RequestOptions()
options.filter.add(TSC.Filter(TSC.RequestOptions.Field.Name,
                              TSC.RequestOptions.Operator.In,
                              ['SALES+NORTHWEST', 'SALES+ROMANIA', 'this_group']))
# Iterate thru all sites, deleting each listed user from each site
for site in all_sites:
    # Login to the site; the site named 'Default' is passed as an empty name
    sn = "" if site.name == 'Default' else site.name
    self.login_and_save_session_file(server._server_address, sn, args.username, args.password)
    server = self.set_server_from_session_file()

    with open(args.users, newline='') as f:
        for row in csv.reader(f):
            # csv.reader yields an empty list for a blank line; there is
            # no user name to look up in that case.
            if not row:
                continue
            user_name = row[0]

            # Set-up our filter so we get the user id we need
            request_options = TSC.RequestOptions()
            request_options.filter.add(TSC.Filter(TSC.RequestOptions.Field.Name, TSC.RequestOptions.Operator.Equals, user_name))
            users = list(TSC.Pager(server.users, request_options))

            if not users:
                # The user is not on this site; nothing to remove here.
                continue

            try:
                server.users.remove(users[0].id)
            except Exception as exc:
                # Best effort: report the failure and carry on with the
                # next user. KeyboardInterrupt/SystemExit still propagate.
                print("Could not remove user '{}' from site '{}': {}".format(user_name, site.name, exc), file=sys.stderr)

# Sign in again to original site
osn = "" if current_site.name == 'Default' else current_site.name
self.login_and_save_session_file(server._server_address, osn, args.username, args.password)
def update_workbooks_by_paths(all_projects, materialized_views_config, server, workbook_path_mapping):
    """Apply a materialized-views config to workbooks chosen by name and path.

    For each ``workbook_name -> workbook_paths`` entry, workbooks with that
    name are fetched from the server. A workbook is updated when the path
    computed by ``find_project_path`` for its project is one of the
    requested paths. Requested paths with no matching workbook are
    reported on stdout.
    """
    for workbook_name, workbook_paths in workbook_path_mapping.items():
        query = TSC.RequestOptions()
        name_filter = TSC.Filter(TSC.RequestOptions.Field.Name,
                                 TSC.RequestOptions.Operator.Equals,
                                 workbook_name)
        query.filter.add(name_filter)
        candidates = list(TSC.Pager(server.workbooks, query))

        # Paths still waiting for a matching workbook.
        unmatched_paths = set(workbook_paths[:])

        for candidate in candidates:
            project = all_projects[candidate.project_id]
            project_path = find_project_path(project, all_projects, "")
            if project_path not in workbook_paths:
                continue

            unmatched_paths.remove(project_path)
            candidate.materialized_views_config = materialized_views_config
            server.workbooks.update(candidate)
            full_name = project_path + '/' + candidate.name
            print("Updated materialized views settings for workbook: {}".format(full_name))

        for missing_path in unmatched_paths:
            print("Cannot find workbook path: {}, each line should only contain one workbook path"
                  .format(missing_path + '/' + workbook_name))
    print('\n')
publish_mode = TSC.Server.PublishMode.Overwrite
extract_item = TSC.DatasourceItem(target_project.id, name)
server.datasources.publish(extract_item, our_file, publish_mode)
##### Workbook
elif our_file_type == self.TableauFileType.Workbook:
print('Processing a workbook...')
workbook_item = TSC.WorkbookItem(target_project.id, name, tabbed)
if publish_mode == TSC.Server.PublishMode.Overwrite:
# Use a filter to get the existing workbook
request_options = TSC.RequestOptions()
request_options.filter.add(TSC.Filter(TSC.RequestOptions.Field.Name, TSC.RequestOptions.Operator.Equals, name))
all_workbooks = list(TSC.Pager(server.workbooks, request_options))
workbook_item = all_workbooks[0]
workbook_item.show_tabs = tabbed
server.workbooks.publish(workbook_item, our_file, publish_mode, connection_credentials)
else:
print('Invalid file type. Must be one of [".tde", ".tds(x)", ".twb(x)"]')
help='desired logging level (set to error by default)')
args = parser.parse_args()

password = getpass.getpass("Password: ")

# Set logging level based on user input, or error by default
logging_level = getattr(logging, args.logging_level.upper())
logging.basicConfig(level=logging_level)

# Step 1: Sign in to server
tableau_auth = TSC.TableauAuth(args.username, password)
server = TSC.Server(args.server)

with server.auth.sign_in(tableau_auth):
    # Step 2: Query workbook to move
    req_option = TSC.RequestOptions()
    name_filter = TSC.Filter(TSC.RequestOptions.Field.Name,
                             TSC.RequestOptions.Operator.Equals,
                             args.workbook_name)
    req_option.filter.add(name_filter)
    all_workbooks, pagination_item = server.workbooks.get(req_option)

    # Step 3: Find destination project (first project whose name matches)
    all_projects, pagination_item = server.projects.get()
    dest_project = next(
        (project for project in all_projects if project.name == args.destination_project),
        None,
    )

    if dest_project is not None:
        # Step 4: Update workbook with new project id
        if not all_workbooks:
            # NOTE(review): this excerpt ends here; what is done with
            # `error` is not visible.
            error = "No workbook named {} found.".format(args.workbook_name)
        else:
            workbook_to_move = all_workbooks[0]
            print("Old project: {}".format(workbook_to_move.project_name))
            workbook_to_move.project_id = dest_project.id
            target_workbook = server.workbooks.update(workbook_to_move)
            print("New project: {}".format(target_workbook.project_name))
def update_workbooks_by_names(name_list, server, materialized_views_config):
    """Apply a materialized-views config to every workbook with a listed name.

    ``name_list`` is first passed through ``sanitize_workbook_list``. Each
    resulting name, with trailing whitespace stripped, is used in a
    name-equals filter. Every workbook returned is updated, and names with
    no match are reported on stdout.
    """
    for listed_name in sanitize_workbook_list(name_list, "name"):
        query = TSC.RequestOptions()
        name_filter = TSC.Filter(TSC.RequestOptions.Field.Name,
                                 TSC.RequestOptions.Operator.Equals,
                                 listed_name.rstrip())
        query.filter.add(name_filter)
        matches = list(TSC.Pager(server.workbooks, query))

        if not matches:
            print("Cannot find workbook name: {}, each line should only contain one workbook name"
                  .format(listed_name))

        for match in matches:
            match.materialized_views_config = materialized_views_config
            server.workbooks.update(match)
            print("Updated materialized views settings for workbook: {}".format(match.name))
    print('\n')