Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
response = self.client.get(import_url)
self.assertRedirect(response, reverse('users.user_login'))
self.client.login(username='author', password='author')
response = self.client.get(import_url)
self.assertTrue(200, response.status_code)
self.assertEquals(response.request['PATH_INFO'], import_url)
csv_file = open('test_runner/blog/test_files/posts.csv', 'rb')
post_data = dict(csv_file=csv_file)
response = self.client.post(import_url, post_data, follow=True)
self.assertEqual(200, response.status_code)
task = ImportTask.objects.get()
self.assertEqual(json.loads(task.import_results), dict(records=4, errors=0, error_messages=[]))
# new posts should all have a new tag
self.assertEqual(Post.objects.filter(tags="new").count(), 4)
ImportTask.objects.all().delete()
csv_file = open('test_runner/blog/test_files/posts.xls', 'rb')
post_data = dict(csv_file=csv_file)
response = self.client.post(import_url, post_data, follow=True)
self.assertEqual(200, response.status_code)
task = ImportTask.objects.get()
self.assertEqual(json.loads(task.import_results), dict(records=4, errors=0, error_messages=[]))
task = ImportTask.objects.get()
self.assertEqual(json.loads(task.import_results), dict(records=0, errors=4,
error_messages=[dict(line=2, error='foo'),
dict(line=3, error='foo'),
dict(line=4, error='foo'),
dict(line=5, error='foo')]))
ImportTask.objects.all().delete()
csv_file = open('test_runner/blog/test_files/posts.xls', 'rb')
post_data = dict(csv_file=csv_file)
response = self.client.post(import_url, post_data, follow=True)
self.assertEqual(200, response.status_code)
task = ImportTask.objects.get()
self.assertEqual(json.loads(task.import_results), dict(records=0, errors=4,
error_messages=[dict(line=2, error='foo'),
dict(line=3, error='foo'),
dict(line=4, error='foo'),
dict(line=5, error='foo')]))
# Run the import described by a single ImportTask record.
#
# task_id: primary key of the ImportTask row to process.
#
# Steps visible here: load the task, mark it RUNNING and write a start banner
# to its log, then resolve the target model class and delegate the actual work
# to that class's import_csv(), finally marking the task SUCCESS and copying
# the import output into the task log.
#
# NOTE(review): this excerpt has lost its indentation and stops inside the
# `try:` statement, so the except/finally handling and the exact extent of the
# `with transaction.atomic():` body are not visible from here.
def csv_import(task_id): # pragma: no cover
task_obj = ImportTask.objects.get(pk=task_id)
# In-memory text buffer passed to import_csv(); its contents are appended to
# the task log at the end via log.getvalue().
log = StringIO()
# Store the id of the currently executing task request on the record
# (presumably a Celery task id, given the `.request.id` access -- confirm).
task_obj.task_id = csv_import.request.id
task_obj.task_status = ImportTask.RUNNING
task_obj.log("Started import at %s" % timezone.now())
task_obj.log("--------------------------------")
# Persist the RUNNING status and task id before the import work starts.
task_obj.save()
try:
# The import is wrapped in transaction.atomic().
with transaction.atomic():
# task_obj.model_class is resolved to a class at runtime
# (presumably a dotted import path -- verify against import_string).
model = import_string(task_obj.model_class)
# NOTE(review): `records` is not used in the lines visible in this excerpt.
records = model.import_csv(task_obj, log)
task_obj.task_status = ImportTask.SUCCESS
task_obj.save()
# Copy whatever import_csv() wrote into the buffer onto the task's own log.
task_obj.log(log.getvalue())
def faq_csv_import(org_id, task_id): # pragma: no cover
task = ImportTask.objects.get(id=task_id)
org = Org.objects.get(id=org_id)
task.task_id = faq_csv_import.request.id
task.log("Started import at %s" % timezone.now())
task.log("--------------------------------")
task.save()
try:
with transaction.atomic() and open(task.csv_file.path) as csv_file: # transaction prevents partial csv import
# Load csv into Dict
records = csv.DictReader(csv_file)
lines = 0
for line in records:
lines += 1