Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_from_json_error_newline_key(self):
    """Supplying both ``newline=True`` and ``key`` must raise ValueError."""
    with self.assertRaises(ValueError):
        Table.from_json('examples/test.json', newline=True, key='test')  # noqa
def test_from_json_file_like_object(self):
    """A table loaded from an open file handle matches one built directly."""
    expected = Table(self.rows, self.column_names, self.column_types)
    with open('examples/test.json') as handle:
        loaded = Table.from_json(handle)
    self.assertColumnNames(loaded, self.column_names)
    self.assertColumnTypes(loaded, [Number, Text, Boolean, Date, DateTime, TimeDelta])
    self.assertRows(loaded, expected.rows)
def test_from_json_file_like_object_duplicate(self):
    """Duplicate of test_from_json_file_like_object.

    NOTE(review): this method repeated the earlier
    ``test_from_json_file_like_object`` under the same name, so the later
    definition silently shadowed the earlier one and the test body only ran
    once. Renamed so both definitions are discoverable; consider deleting one
    copy outright.
    """
    table1 = Table(self.rows, self.column_names, self.column_types)
    with open('examples/test.json') as f:
        table2 = Table.from_json(f)
    self.assertColumnNames(table2, self.column_names)
    self.assertColumnTypes(table2, [Number, Text, Boolean, Date, DateTime, TimeDelta])
    self.assertRows(table2, table1.rows)
def test_from_json_with_key(self):
    """Row data nested under a top-level key is reachable via ``key=``."""
    reference = Table(self.rows, self.column_names, self.column_types)
    loaded = Table.from_json('examples/test_key.json', key='data')
    self.assertColumnNames(loaded, self.column_names)
    self.assertColumnTypes(loaded, [Number, Text, Boolean, Date, DateTime, TimeDelta])
    self.assertRows(loaded, reference.rows)
def test_from_json_nested(self):
    """Nested objects and arrays flatten into slash-separated column names."""
    table = Table.from_json('examples/test_nested.json')
    flattened_names = ['one', 'two/two_a', 'two/two_b', 'three/0', 'three/1', 'three/2']
    self.assertColumnNames(table, flattened_names)
    self.assertColumnTypes(table, [Number, Text, Text, Text, Number, Text])
    expected_rows = [
        [1, 'a', 'b', 'a', 2, 'c'],
        [2, 'c', 'd', 'd', 2, 'f'],
    ]
    self.assertRows(table, expected_rows)
def test_from_json_mixed_keys(self):
    """Objects with differing key sets yield the union of columns, None-padded."""
    table = Table.from_json('examples/test_mixed.json')
    self.assertColumnNames(table, ['one', 'two', 'three', 'four', 'five'])
    self.assertColumnTypes(table, [Number, Number, Text, Text, Number])
    expected_rows = [
        [1, 4, 'a', None, None],
        [2, 3, 'b', 'd', None],
        [None, 2, u'👍', None, 5],
    ]
    self.assertRows(table, expected_rows)
def test_from_json_no_type_tester(self):
    """A TypeTester with ``limit=0`` leaves every column as Text."""
    no_inference = TypeTester(limit=0)
    table = Table.from_json('examples/test.json', column_types=no_inference)
    self.assertColumnTypes(table, [Text, Text, Text, Text, Text, Text])
def test_from_json_mixed_keys_duplicate(self):
    """Duplicate of test_from_json_mixed_keys.

    NOTE(review): this method repeated the earlier
    ``test_from_json_mixed_keys`` under the same name, so the later
    definition silently shadowed the earlier one and the test body only ran
    once. Renamed so both definitions are discoverable; consider deleting one
    copy outright.
    """
    table = Table.from_json('examples/test_mixed.json')
    self.assertColumnNames(table, ['one', 'two', 'three', 'four', 'five'])
    self.assertColumnTypes(table, [Number, Number, Text, Text, Number])
    self.assertRows(table, [
        [1, 4, 'a', None, None],
        [2, 3, 'b', 'd', None],
        [None, 2, u'👍', None, 5]
    ])
# Convert the file.
# Fast path: a plain CSV with type inference disabled, an intact header row,
# no skipped lines and no dialect sniffing can be streamed row-by-row
# straight from reader to writer without building an agate.Table.
if filetype == 'csv' and self.args.no_inference and not self.args.no_header_row and not self.args.skip_lines and self.args.sniff_limit == 0:
reader = agate.csv.reader(self.input_file, **self.reader_kwargs)
writer = agate.csv.writer(self.output_file, **self.writer_kwargs)
writer.writerows(reader)
elif filetype == 'fixed':
# Fixed-width input is converted by a helper rather than agate.
# NOTE(review): fixed2csv receives output=self.output_file AND its return
# value is written to the same file — confirm the helper does not emit the
# data twice under this call pattern.
self.output_file.write(fixed2csv(self.input_file, schema, output=self.output_file, **kwargs))
elif filetype == 'geojson':
self.output_file.write(geojson2csv(self.input_file, **kwargs))
elif filetype in ('csv', 'dbf', 'json', 'ndjson', 'xls', 'xlsx'):
# Tabular formats: parse into an agate.Table, then serialize as CSV below.
if filetype == 'csv':
table = agate.Table.from_csv(self.input_file, **kwargs)
elif filetype == 'json':
table = agate.Table.from_json(self.input_file, key=self.args.key, **kwargs)
elif filetype == 'ndjson':
# Newline-delimited JSON: one JSON object per line.
table = agate.Table.from_json(self.input_file, key=self.args.key, newline=True, **kwargs)
elif filetype == 'xls':
table = agate.Table.from_xls(self.input_file, sheet=self.args.sheet, encoding_override=self.args.encoding_xls, **kwargs)
elif filetype == 'xlsx':
table = agate.Table.from_xlsx(self.input_file, sheet=self.args.sheet, **kwargs)
elif filetype == 'dbf':
# DBF parsing needs a real path on disk, so a bare stream is rejected.
if not hasattr(self.input_file, 'name'):
raise ValueError('DBF files can not be converted from stdin. You must pass a filename.')
table = agate.Table.from_dbf(self.input_file.name, **kwargs)
table.to_csv(self.output_file, **self.writer_kwargs)
if self.args.write_sheets:
# Close and re-open the file, as the file object has been mutated or closed.
self.input_file.close()
self.input_file = self.open_excel_input_file(path)
"""
Convert a file of a specified format to CSV.
"""
if format == 'fixed':
if not schema:
raise ValueError('schema must not be null when format is "fixed"')
output.write(fixed2csv(f, schema, output=output, **kwargs))
elif format == 'geojson':
output.write(geojson2csv(f, **kwargs))
elif format in ('csv', 'dbf', 'json', 'ndjson', 'xls', 'xlsx'):
if format == 'csv':
table = agate.Table.from_csv(f, **kwargs)
elif format == 'json':
table = agate.Table.from_json(f, key=key, **kwargs)
elif format == 'ndjson':
table = agate.Table.from_json(f, key=key, newline=True, **kwargs)
elif format == 'xls':
table = agate.Table.from_xls(f, sheet=kwargs.get('sheet', None))
elif format == 'xlsx':
table = agate.Table.from_xlsx(f, sheet=kwargs.get('sheet', None))
elif format == 'dbf':
with dbf.Table(f.name) as db:
column_names = db.field_names
table = agate.Table(db, column_names)
table.to_csv(output)
else:
raise ValueError('format "%s" is not supported' % format)