def discover_dialect(sample, encoding=None, delimiters=(b",", b";", b"\t", b"|")):
    """Discover a CSV dialect from a sample of the data.

    `encoding` is accepted for API compatibility but is not used (Python 2).
    """
    try:
        # `sniffer` is presumably a module-level unicodecsv.Sniffer() instance.
        dialect = sniffer.sniff(sample, delimiters=delimiters)
    except unicodecsv.Error:  # Couldn't detect: fall back to 'excel'
        dialect = unicodecsv.excel
    fix_dialect(dialect)
    return dialect
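
A minimal usage sketch (Python 2, matching the snippet above), assuming sniffer is a module-level unicodecsv.Sniffer() instance and that fix_dialect applies the project's own dialect clean-up; the stub below and the file name are hypothetical:

import unicodecsv

sniffer = unicodecsv.Sniffer()

def fix_dialect(dialect):
    # Stand-in for the project's real clean-up helper (assumed behaviour).
    dialect.skipinitialspace = True

with open('dns_records.csv', 'rb') as f:        # hypothetical input file
    dialect = discover_dialect(f.read(4096))    # sniff the first 4 KB, fall back to 'excel'
    f.seek(0)
    for row in unicodecsv.reader(f, dialect):
        print row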
        try:
            for row in dnsreader:
                while self._pause:
                    if self._shutdown:
                        LOGGER.debug("Shutdown received while paused")
                        break
                    time.sleep(.5)
                if self._shutdown:
                    LOGGER.debug("Shutdown received")
                    break
                if row is None or not row:
                    LOGGER.warning("Skipping empty row in file %s"
                                   % (filename))
                    continue
                self.data_queue.put({'header': header, 'row': row})
        except unicodecsv.Error as e:
            LOGGER.exception("CSV Parse Error in file %s - line %i\n"
                             % (os.path.basename(filename),
                                dnsreader.line_num))
        except Exception as e:
            LOGGER.exception("Unable to process file %s" % (filename))
def _csv_data_from_file(csv_file, preview_limit=10):
    try:
        dialect = unicodecsv.Sniffer().sniff(csv_file.read(1024))
        csv_file.seek(0)
        csv_reader = unicodecsv.reader(csv_file, dialect)
        csv_values = itertools.islice(csv_reader, preview_limit)
        csv_values = zip(*csv_values)
        return {'success': True, 'data': csv_values}
    except unicodecsv.Error as exc:
        return {'success': False, 'error': exc.message}
    except UnicodeDecodeError as exc:
        return {'success': False, 'error': exc}
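
A minimal usage sketch (Python 2, matching the exc.message access above), assuming the function is called with a UTF-8 file opened in binary mode; the file name is hypothetical. Note that zip(*csv_values) transposes the previewed rows into per-column tuples:

import itertools
import unicodecsv

with open('contacts.csv', 'rb') as f:           # hypothetical input file
    preview = _csv_data_from_file(f, preview_limit=5)

if preview['success']:
    for column in preview['data']:              # one tuple per CSV column
        print column[0], column[1:]             # header cell, then its values
else:
    print 'Preview failed:', preview['error']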
def parse_csv(work_queue, collection, filename, options):
    if options.verbose:
        print "Processing file: %s" % filename
    csvfile = open(filename, 'rb')
    dnsreader = unicodecsv.reader(csvfile, strict=True, skipinitialspace=True)
    try:
        header = dnsreader.next()
        if not check_header(header):
            raise unicodecsv.Error('CSV header not found')
        for row in dnsreader:
            work_queue.put({'header': header, 'row': row})
    except unicodecsv.Error, e:
        sys.stderr.write("CSV Parse Error in file %s - line %i\n\t%s\n"
                         % (os.path.basename(filename), dnsreader.line_num, str(e)))
        if write_header:
            writer.writeheader()
        for row in data:
            # Not ready to send these data via CSV attachment as they break
            # across multiple columns.
            row.pop('inventory_data', None)
            writer.writerow(normalize_nested_dicts(row))
        # This must be closed before returned for loading.
        csv_file.close()
        yield csv_file
        # Remove the csv file after loading.
        os.remove(csv_file.name)
    except (OSError, csv.Error) as e:
        raise CSVFileError(resource_name, e)
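
A self-contained sketch of the same close-then-yield-then-remove pattern, using only the standard library; the generator name, field names, and error type are illustrative assumptions, not the original project's code:

import csv
import os
import tempfile

def export_rows(data, fieldnames):
    """Write `data` to a temporary CSV file and yield the closed file object."""
    csv_file = tempfile.NamedTemporaryFile(mode='w', suffix='.csv',
                                           newline='', delete=False)
    try:
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        writer.writeheader()
        for row in data:
            writer.writerow(row)
        # Close before yielding so the consumer can reopen and load the file.
        csv_file.close()
        yield csv_file
        # Remove the temp file once the consumer resumes the generator.
        os.remove(csv_file.name)
    except (OSError, csv.Error) as e:
        raise RuntimeError('CSV export failed: %s' % e)

# The file exists only while the loop body runs.
for f in export_rows([{'id': 1, 'name': 'a'}], ['id', 'name']):
    with open(f.name) as fh:
        print(fh.read())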
        print('{0} Assets converted from source file'.format(len(woosmap_assets)))

        woosmap_api_helper = Woosmap()
        # /!\ deleting existing assets before posting new ones /!\
        woosmap_api_helper.delete()

        count_imported_assets = 0
        for chunk in batch(woosmap_assets, BATCH_SIZE):
            imported_success = import_assets(chunk, woosmap_api_helper)
            if imported_success:
                count_imported_assets += len(chunk)

        woosmap_api_helper.end()
        print("{0} Assets successfully imported".format(count_imported_assets))

    except csv.Error as csv_error:
        print('Error in CSV file found: {0}'.format(csv_error))
    except Exception as exception:
        print("Script Failed! {0}".format(exception))
    finally:
        end = time.time()
        print('...Script ended in {0} seconds'.format(end - start))
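
The import loop above relies on a batch() chunking helper that is not shown; a minimal sketch of such a helper, assuming it simply slices the asset list into fixed-size chunks:

def batch(items, size):
    """Yield successive chunks of at most `size` items from a list."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

# e.g. list(batch(['a', 'b', 'c', 'd', 'e'], 2)) -> [['a', 'b'], ['c', 'd'], ['e']]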
def render_csv_response(self, filename, headers, rows):
    response = HttpResponse(content_type='text/csv')
    response['Content-Disposition'] = \
        'attachment; filename=' + filename + '.csv'
    writer = csv.writer(response)
    writer.writerow(headers)
    for row in rows:
        try:
            writer.writerow(row)
        except csv.Error:
            pass
        except UnicodeEncodeError:
            pass
    return response
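
For context, a self-contained function-based sketch of the same pattern, where Django's HttpResponse doubles as the writable object handed to csv.writer; the view name and sample rows are illustrative assumptions:

import csv

from django.http import HttpResponse


def download_report(request):
    """Hypothetical view returning a small report as a CSV attachment."""
    response = HttpResponse(content_type='text/csv')
    response['Content-Disposition'] = 'attachment; filename=report.csv'
    writer = csv.writer(response)
    writer.writerow(('id', 'name'))
    writer.writerow((1, 'alice'))       # assumed sample data
    writer.writerow((2, 'bob'))
    return response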
        try:
            dnsreader = unicodecsv.reader(csvfile, strict=True,
                                          skipinitialspace=True)
        except Exception as e:
            LOGGER.exception("Unable to setup csv reader for file %s"
                             % (filename))
            return

        try:
            header = next(dnsreader)
        except Exception as e:
            LOGGER.exception("Unable to iterate through csv file %s"
                             % (filename))
            return

        try:
            if not self.check_header(header):
                raise unicodecsv.Error('CSV header not found')
            for row in dnsreader:
                while self._pause:
                    if self._shutdown:
                        LOGGER.debug("Shutdown received while paused")
                        break
                    time.sleep(.5)
                if self._shutdown:
                    LOGGER.debug("Shutdown received")
                    break
                if row is None or not row:
                    LOGGER.warning("Skipping empty row in file %s"
                                   % (filename))
                    continue
                self.data_queue.put({'header': header, 'row': row})
        except unicodecsv.Error as e:
            LOGGER.exception("CSV Parse Error in file %s - line %i\n"
                             % (os.path.basename(filename),
                                dnsreader.line_num))
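
A hedged sketch of how a worker might consume the {'header': ..., 'row': ...} items the reader pushes onto data_queue; the queue wiring, sentinel, and handler are assumptions about the rest of the pipeline, not the original project's code:

try:                                     # stdlib queue module, Python 2 or 3
    import queue
except ImportError:
    import Queue as queue


def handle_record(record):
    """Stub for the real downstream handler (e.g. a DB insert)."""
    print(record)


def consume_rows(data_queue):
    """Hypothetical worker: pair each row with its header and hand it on."""
    while True:
        item = data_queue.get()
        if item is None:                 # producer signals completion
            data_queue.task_done()
            break
        handle_record(dict(zip(item['header'], item['row'])))
        data_queue.task_done()


q = queue.Queue()
q.put({'header': ['name', 'type', 'value'],
       'row': ['example.com', 'A', '93.184.216.34']})
q.put(None)
consume_rows(q)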
def as_table(file, limit=None):
    try:
        sio = StringIO(file.data)
        reader = DictReader(sio)
        data = {'headers': None, 'rows': [], 'total': 0}
        for i, row in enumerate(reader):
            if data['headers'] is None:
                data['headers'] = row.keys()
            if limit is None or i < limit:
                rd = [row.get(k) for k in data['headers']]
                data['rows'].append(rd)
            data['total'] = i
        return data
    except CSVError, e:
        return {'status': 'error', 'error': unicode(e)}
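
A minimal usage sketch (Python 2, matching the unicode() call above), assuming file is any object exposing the raw CSV text as a .data attribute and that CSVError is csv.Error imported under that name; the stand-in class and sample data are hypothetical:

from StringIO import StringIO
from csv import DictReader, Error as CSVError


class UploadedFile(object):             # hypothetical stand-in for the real file object
    def __init__(self, data):
        self.data = data


table = as_table(UploadedFile("name,age\nalice,30\nbob,41\n"), limit=1)
print table['headers']                  # CSV field names (dict key order in Python 2)
print table['rows']                     # one list per previewed row, aligned with headers
print table['total']                    # 1, the index of the last row read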