Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import six
import unicodecsv
from rows.plugins.utils import (
create_table,
ipartition,
serialize,
)
from rows.utils import Source
sniffer = unicodecsv.Sniffer()
# Some CSV files have more than 128kB of data in a cell, so we force this value
# to be greater (16MB).
# TODO: check if it impacts in memory usage.
# TODO: may add option to change it by passing a parameter to import/export.
unicodecsv.field_size_limit(16777216)
def fix_dialect(dialect):
if not dialect.doublequote and dialect.escapechar is None:
dialect.doublequote = True
if dialect.quoting == unicodecsv.QUOTE_MINIMAL and dialect.quotechar == "'":
# Python csv's Sniffer seems to detect a wrong quotechar when
# quoting is minimal
dialect.quotechar = '"'
class excel_semicolon(unicodecsv.excel):
delimiter = ';'
unicodecsv.register_dialect("excel-semicolon", excel_semicolon)
def export_version_csv(index, version_list):
assert isinstance(index, AbstractIndex)
assert isinstance(version_list, list) or isinstance(version_list, VersionSet)
assert all(isinstance(v, Version) for v in version_list)
csv.field_size_limit(sys.maxsize)
output = io.BytesIO()
writer = csv.writer(output)
# write header data
writer.writerow(["Index Title"] + [index.title for _ in version_list])
writer.writerow(["Version Title"] + [v.versionTitle for v in version_list])
writer.writerow(["Language"] + [v.language for v in version_list])
writer.writerow(["Version Source"] + [v.versionSource for v in version_list])
writer.writerow(["Version Notes"] + [getattr(v, "versionNotes", "") for v in version_list])
section_refs = index.all_section_refs()
for section_ref in section_refs:
segment_refs = section_ref.all_subrefs()
seg_vers = {}
def import_versions_from_file(csv_filename, columns):
"""
Import the versions in the columns listed in `columns`
:param columns: zero-based list of column numbers with a new version in them
:return:
"""
csv.field_size_limit(sys.maxsize)
with open(csv_filename, 'rb') as csvfile:
reader = csv.reader(csvfile)
rows = [row for row in reader]
return _import_versions_from_csv(rows, columns)
def import_versions_from_stream(csv_stream, columns, user_id):
csv.field_size_limit(sys.maxsize)
reader = csv.reader(csv_stream)
rows = [row for row in reader]
return _import_versions_from_csv(rows, columns, user_id)
import sys
import gzip
import io
PY2 = (sys.version_info[0] == 2)
if PY2:
from pipes import quote
import unicodecsv, subprocess32, uritemplate
unicodecsv.field_size_limit(sys.maxsize)
check_output = subprocess32.check_output
CalledProcessError = subprocess32.CalledProcessError
TimeoutExpired = subprocess32.TimeoutExpired
csvIO = io.BytesIO
def csvreader(file, encoding=None, **kwargs):
''' Pass encoding to unicodecsv
'''
if encoding is not None:
kwargs['encoding'] = encoding
if 'delimiter' in kwargs:
kwargs['delimiter'] = str(kwargs['delimiter'])
def export_merged_csv(index, lang=None):
assert isinstance(index, Index)
assert lang in ["en", "he"]
csv.field_size_limit(sys.maxsize)
output = io.BytesIO()
writer = csv.writer(output)
# write header data
writer.writerow(["Index Title"] + [index.title])
writer.writerow(["Version Title"] + ["merged"])
writer.writerow(["Language"] + [lang])
writer.writerow(["Version Source"] + ["-"])
writer.writerow(["Version Notes"] + ["-"])
section_refs = index.all_section_refs()
for section_ref in section_refs:
segment_refs = section_ref.all_subrefs()
seg_vers = {}