Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Excerpt from a daily-schedule creation test. The enclosing `def` and the
# `with open(...)` statement that binds `f` lie outside this excerpt.
# NOTE(review): indentation is absent here, so the extent of the `with`
# block below cannot be determined from this excerpt alone.
response_xml = f.read().decode("utf-8")
with requests_mock.mock() as m:
# Register the decoded XML as the mocked response to a POST on the base URL.
m.post(self.baseurl, text=response_xml)
daily_interval = TSC.DailyInterval(time(4, 50))
new_schedule = TSC.ScheduleItem("daily-schedule-1", 90, TSC.ScheduleItem.Type.Subscription,
TSC.ScheduleItem.ExecutionOrder.Serial, daily_interval)
# Rebind to the item returned by the create call; the assertions below inspect it.
new_schedule = self.server.schedules.create(new_schedule)
self.assertEqual("907cae38-72fd-417c-892a-95540c4664cd", new_schedule.id)
self.assertEqual("daily-schedule-1", new_schedule.name)
self.assertEqual("Active", new_schedule.state)
self.assertEqual(90, new_schedule.priority)
self.assertEqual("2016-09-15T21:01:09Z", format_datetime(new_schedule.created_at))
self.assertEqual("2016-09-15T21:01:09Z", format_datetime(new_schedule.updated_at))
self.assertEqual(TSC.ScheduleItem.Type.Subscription, new_schedule.schedule_type)
self.assertEqual("2016-09-16T11:45:00Z", format_datetime(new_schedule.next_run_at))
self.assertEqual(TSC.ScheduleItem.ExecutionOrder.Serial, new_schedule.execution_order)
# NOTE(review): the interval above is built with 04:50 but 04:45 is expected
# here -- presumably the value comes from the canned response; confirm
# against the fixture.
self.assertEqual(time(4, 45), new_schedule.interval_item.start_time)
# Excerpt from a monthly-schedule creation test. The enclosing `def` and the
# `with open(...)` statement that binds `f` lie outside this excerpt.
# NOTE(review): indentation is absent here, so the extent of the `with`
# block below cannot be determined from this excerpt alone.
response_xml = f.read().decode("utf-8")
with requests_mock.mock() as m:
# Register the decoded XML as the mocked response to a POST on the base URL.
m.post(self.baseurl, text=response_xml)
monthly_interval = TSC.MonthlyInterval(time(7), 12)
new_schedule = TSC.ScheduleItem("monthly-schedule-1", 20, TSC.ScheduleItem.Type.Extract,
TSC.ScheduleItem.ExecutionOrder.Serial, monthly_interval)
# Rebind to the item returned by the create call; the assertions below inspect it.
new_schedule = self.server.schedules.create(new_schedule)
self.assertEqual("e06a7c75-5576-4f68-882d-8909d0219326", new_schedule.id)
self.assertEqual("monthly-schedule-1", new_schedule.name)
self.assertEqual("Active", new_schedule.state)
self.assertEqual(20, new_schedule.priority)
self.assertEqual("2016-09-15T21:16:56Z", format_datetime(new_schedule.created_at))
self.assertEqual("2016-09-15T21:16:56Z", format_datetime(new_schedule.updated_at))
self.assertEqual(TSC.ScheduleItem.Type.Extract, new_schedule.schedule_type)
self.assertEqual("2016-10-12T14:00:00Z", format_datetime(new_schedule.next_run_at))
self.assertEqual(TSC.ScheduleItem.ExecutionOrder.Serial, new_schedule.execution_order)
self.assertEqual(time(7), new_schedule.interval_item.start_time)
# NOTE(review): the interval was constructed with the int 12 but the string
# "12" is expected here -- presumably the returned item carries the value as
# text; confirm against the library.
self.assertEqual("12", new_schedule.interval_item.interval)
# Excerpt from a datasource publish test; the enclosing `def` lies outside
# this excerpt.
# NOTE(review): indentation is absent here, so the extent of the `with`
# block below cannot be determined from this excerpt alone.
response_xml = read_xml_asset(PUBLISH_XML)
with requests_mock.mock() as m:
# Register the XML as the mocked response to a POST on the base URL.
m.post(self.baseurl, text=response_xml)
new_datasource = TSC.DatasourceItem('SampleDS', 'ee8c6e70-43b6-11e6-af4f-f7b0d8e20760')
publish_mode = self.server.PublishMode.CreateNew
# Rebind to the item returned by the publish call; the assertions below inspect it.
new_datasource = self.server.datasources.publish(new_datasource,
asset('SampleDS.tds'),
mode=publish_mode)
self.assertEqual('e76a1461-3b1d-4588-bf1b-17551a879ad9', new_datasource.id)
self.assertEqual('SampleDS', new_datasource.name)
self.assertEqual('SampleDS', new_datasource.content_url)
self.assertEqual('dataengine', new_datasource.datasource_type)
self.assertEqual('2016-08-11T21:22:40Z', format_datetime(new_datasource.created_at))
self.assertEqual('2016-08-17T23:37:08Z', format_datetime(new_datasource.updated_at))
self.assertEqual('ee8c6e70-43b6-11e6-af4f-f7b0d8e20760', new_datasource.project_id)
self.assertEqual('default', new_datasource.project_name)
self.assertEqual('5de011f8-5aa9-4d5b-b991-f462c8dd6bb7', new_datasource.owner_id)
def test_create_daily(self):
    """Create a daily subscription schedule through a mocked POST endpoint.

    The XML read from CREATE_DAILY_XML is served for a POST on
    ``self.baseurl``; the item returned by ``schedules.create`` is then
    verified field by field.
    """
    with open(CREATE_DAILY_XML, "rb") as xml_file:
        canned_xml = xml_file.read().decode("utf-8")

    subscription_type = TSC.ScheduleItem.Type.Subscription
    serial_order = TSC.ScheduleItem.ExecutionOrder.Serial

    with requests_mock.mock() as mocker:
        mocker.post(self.baseurl, text=canned_xml)

        interval = TSC.DailyInterval(time(4, 50))
        requested = TSC.ScheduleItem(
            "daily-schedule-1",
            90,
            subscription_type,
            serial_order,
            interval,
        )
        created = self.server.schedules.create(requested)

        # Identity and basic attributes.
        self.assertEqual("907cae38-72fd-417c-892a-95540c4664cd", created.id)
        self.assertEqual("daily-schedule-1", created.name)
        self.assertEqual("Active", created.state)
        self.assertEqual(90, created.priority)

        # Timestamps are compared in their formatted string form.
        created_stamp = format_datetime(created.created_at)
        self.assertEqual("2016-09-15T21:01:09Z", created_stamp)
        updated_stamp = format_datetime(created.updated_at)
        self.assertEqual("2016-09-15T21:01:09Z", updated_stamp)

        self.assertEqual(subscription_type, created.schedule_type)
        next_run_stamp = format_datetime(created.next_run_at)
        self.assertEqual("2016-09-16T11:45:00Z", next_run_stamp)
        self.assertEqual(serial_order, created.execution_order)

        # NOTE(review): the request uses 04:50 while 04:45 is expected --
        # presumably taken from the canned response; confirm against the
        # fixture.
        self.assertEqual(time(4, 45), created.interval_item.start_time)
def test_get_by_id(self):
    """Fetch a single datasource by id through a mocked GET endpoint.

    The XML obtained from ``read_xml_asset(GET_BY_ID_XML)`` is served for
    the GET on the datasource URL; the returned item is then verified
    field by field.
    """
    datasource_id = '9dbd2263-16b5-46e1-9c43-a76bb8ab65fb'
    canned_xml = read_xml_asset(GET_BY_ID_XML)

    with requests_mock.mock() as mocker:
        datasource_url = self.baseurl + '/9dbd2263-16b5-46e1-9c43-a76bb8ab65fb'
        mocker.get(datasource_url, text=canned_xml)
        fetched = self.server.datasources.get_by_id(datasource_id)

        self.assertEqual(datasource_id, fetched.id)
        self.assertEqual('dataengine', fetched.datasource_type)
        self.assertEqual('Sampledatasource', fetched.content_url)

        # Timestamps are compared in their formatted string form.
        created_stamp = format_datetime(fetched.created_at)
        self.assertEqual('2016-08-04T21:31:55Z', created_stamp)
        updated_stamp = format_datetime(fetched.updated_at)
        self.assertEqual('2016-08-04T21:31:55Z', updated_stamp)

        self.assertEqual('default', fetched.project_name)
        self.assertEqual('Sample datasource', fetched.name)
        self.assertEqual('ee8c6e70-43b6-11e6-af4f-f7b0d8e20760', fetched.project_id)
        self.assertEqual('5de011f8-5aa9-4d5b-b991-f462c8dd6bb7', fetched.owner_id)
        self.assertEqual({'world', 'indicators', 'sample'}, fetched.tags)
def test_get_by_id(self):
    """Fetch a single user by id through a mocked GET endpoint.

    The XML read from GET_BY_ID_XML is served for the GET on the user URL;
    the returned item is then verified field by field.
    """
    user_id = 'dd2239f6-ddf1-4107-981a-4cf94e415794'
    with open(GET_BY_ID_XML, 'rb') as xml_file:
        canned_xml = xml_file.read().decode('utf-8')

    with requests_mock.mock() as mocker:
        user_url = self.baseurl + '/dd2239f6-ddf1-4107-981a-4cf94e415794'
        mocker.get(user_url, text=canned_xml)
        fetched = self.server.users.get_by_id(user_id)

        self.assertEqual(user_id, fetched.id)
        self.assertEqual('alice', fetched.name)
        self.assertEqual('Alice', fetched.fullname)
        self.assertEqual('Publisher', fetched.site_role)
        self.assertEqual('ServerDefault', fetched.auth_setting)

        # The last-login timestamp is compared in its formatted string form.
        last_login_stamp = format_datetime(fetched.last_login)
        self.assertEqual('2016-08-16T23:17:06Z', last_login_stamp)
        self.assertEqual('local', fetched.domain_name)
# Excerpt of assertions from a test whose setup is outside this excerpt: the
# code that binds `pagination_item`, `extract`, `subscription` and `flow` is
# not visible here.
self.assertEqual(2, pagination_item.total_available)
# Fields of the item bound to `extract`.
self.assertEqual("c9cff7f9-309c-4361-99ff-d4ba8c9f5467", extract.id)
self.assertEqual("Weekday early mornings", extract.name)
self.assertEqual("Active", extract.state)
self.assertEqual(50, extract.priority)
self.assertEqual("2016-07-06T20:19:00Z", format_datetime(extract.created_at))
self.assertEqual("2016-09-13T11:00:32Z", format_datetime(extract.updated_at))
self.assertEqual("Extract", extract.schedule_type)
self.assertEqual("2016-09-14T11:00:00Z", format_datetime(extract.next_run_at))
# Fields of the item bound to `subscription`.
self.assertEqual("bcb79d07-6e47-472f-8a65-d7f51f40c36c", subscription.id)
self.assertEqual("Saturday night", subscription.name)
self.assertEqual("Active", subscription.state)
self.assertEqual(80, subscription.priority)
self.assertEqual("2016-07-07T20:19:00Z", format_datetime(subscription.created_at))
self.assertEqual("2016-09-12T16:39:38Z", format_datetime(subscription.updated_at))
self.assertEqual("Subscription", subscription.schedule_type)
self.assertEqual("2016-09-18T06:00:00Z", format_datetime(subscription.next_run_at))
# Fields of the item bound to `flow`.
self.assertEqual("f456e8f2-aeb2-4a8e-b823-00b6f08640f0", flow.id)
self.assertEqual("First of the month 1:00AM", flow.name)
self.assertEqual("Active", flow.state)
self.assertEqual(50, flow.priority)
self.assertEqual("2019-02-19T18:52:19Z", format_datetime(flow.created_at))
self.assertEqual("2019-02-19T18:55:51Z", format_datetime(flow.updated_at))
self.assertEqual("Flow", flow.schedule_type)
self.assertEqual("2019-03-01T09:00:00Z", format_datetime(flow.next_run_at))
def test_populate_users(self):
    """Populate a group's users through a mocked GET endpoint.

    The XML read from POPULATE_USERS is served for the GET on the group's
    users URL (matched with ``complete_qs=True``); the single user exposed
    by ``single_group.users`` is then verified.
    """
    group_id = 'e7833b48-c6f7-47b5-a2a7-36e7dd232758'
    with open(POPULATE_USERS, 'rb') as xml_file:
        canned_xml = xml_file.read().decode('utf-8')

    with requests_mock.mock() as mocker:
        users_url = (
            self.baseurl
            + '/e7833b48-c6f7-47b5-a2a7-36e7dd232758/users?pageNumber=1&pageSize=100'
        )
        mocker.get(users_url, text=canned_xml, complete_qs=True)

        group = TSC.GroupItem(name='Test Group')
        # The id is set through the private attribute directly.
        group._id = group_id
        self.server.groups.populate_users(group)

        # Assertions stay inside the mock context because reading
        # `group.users` may itself issue the request -- TODO confirm.
        member_count = len(list(group.users))
        self.assertEqual(1, member_count)
        member = list(group.users).pop()
        self.assertEqual('dd2239f6-ddf1-4107-981a-4cf94e415794', member.id)
        self.assertEqual('alice', member.name)
        self.assertEqual('Publisher', member.site_role)
        last_login_stamp = format_datetime(member.last_login)
        self.assertEqual('2016-08-16T23:17:06Z', last_login_stamp)
show_tabs=False,
project_id='ee8c6e70-43b6-11e6-af4f-f7b0d8e20760')
# Excerpt from a workbook publish test. The enclosing `def` and the statement
# that first binds `new_workbook` lie outside this excerpt.
sample_workbook = os.path.join(TEST_ASSET_DIR, 'SampleWB.twbx')
publish_mode = self.server.PublishMode.CreateNew
# Rebind to the item returned by the publish call; the assertions below inspect it.
new_workbook = self.server.workbooks.publish(new_workbook,
sample_workbook,
publish_mode)
self.assertEqual('a8076ca1-e9d8-495e-bae6-c684dbb55836', new_workbook.id)
self.assertEqual('RESTAPISample', new_workbook.name)
self.assertEqual('RESTAPISample_0', new_workbook.content_url)
self.assertEqual(False, new_workbook.show_tabs)
self.assertEqual(1, new_workbook.size)
self.assertEqual('2016-08-18T18:33:24Z', format_datetime(new_workbook.created_at))
self.assertEqual('2016-08-18T20:31:34Z', format_datetime(new_workbook.updated_at))
self.assertEqual('ee8c6e70-43b6-11e6-af4f-f7b0d8e20760', new_workbook.project_id)
self.assertEqual('default', new_workbook.project_name)
self.assertEqual('5de011f8-5aa9-4d5b-b991-f462c8dd6bb7', new_workbook.owner_id)
# The first entry of `views` is checked as well.
self.assertEqual('fe0b4e89-73f4-435e-952d-3a263fbfa56c', new_workbook.views[0].id)
self.assertEqual('GDP per capita', new_workbook.views[0].name)
self.assertEqual('RESTAPISample_0/sheets/GDPpercapita', new_workbook.views[0].content_url)
# Excerpt from a weekly-schedule creation test; the enclosing `def` lies
# outside this excerpt.
# NOTE(review): indentation is absent here, so the extent of the `with`
# blocks below cannot be determined from this excerpt alone.
with open(CREATE_WEEKLY_XML, "rb") as f:
response_xml = f.read().decode("utf-8")
with requests_mock.mock() as m:
# Register the decoded XML as the mocked response to a POST on the base URL.
m.post(self.baseurl, text=response_xml)
# Weekly interval starting at 09:15 on Monday, Wednesday and Friday.
weekly_interval = TSC.WeeklyInterval(time(9, 15), TSC.IntervalItem.Day.Monday,
TSC.IntervalItem.Day.Wednesday,
TSC.IntervalItem.Day.Friday)
new_schedule = TSC.ScheduleItem("weekly-schedule-1", 80, TSC.ScheduleItem.Type.Extract,
TSC.ScheduleItem.ExecutionOrder.Parallel, weekly_interval)
# Rebind to the item returned by the create call; the assertions below inspect it.
new_schedule = self.server.schedules.create(new_schedule)
self.assertEqual("1adff386-6be0-4958-9f81-a35e676932bf", new_schedule.id)
self.assertEqual("weekly-schedule-1", new_schedule.name)
self.assertEqual("Active", new_schedule.state)
self.assertEqual(80, new_schedule.priority)
self.assertEqual("2016-09-15T21:12:50Z", format_datetime(new_schedule.created_at))
self.assertEqual("2016-09-15T21:12:50Z", format_datetime(new_schedule.updated_at))
self.assertEqual(TSC.ScheduleItem.Type.Extract, new_schedule.schedule_type)
self.assertEqual("2016-09-16T16:15:00Z", format_datetime(new_schedule.next_run_at))
self.assertEqual(TSC.ScheduleItem.ExecutionOrder.Parallel, new_schedule.execution_order)
self.assertEqual(time(9, 15), new_schedule.interval_item.start_time)
# The interval is expected as a tuple of day-name strings.
self.assertEqual(("Monday", "Wednesday", "Friday"),
new_schedule.interval_item.interval)