Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_no_header(self):
    """Filter ``tab1`` with ``header=False`` and pattern ``'only'`` on key 2.

    Expects rows 2 and 3 of ``tab1`` to be yielded, in that order, and the
    reader to be exhausted afterwards.
    """
    filtered = FilteringCSVReader(iter(self.tab1), patterns={2: 'only'}, header=False)

    for row_index in (2, 3):
        self.assertEqual(self.tab1[row_index], next(filtered))

    # One more read must raise StopIteration; a returned row is a failure.
    try:
        next(filtered)
    except StopIteration:
        pass
    else:
        self.fail("Should be no more rows left.")
def test_index_out_of_range(self):
    """Filter ``tab2`` with pattern ``'0'`` keyed on column index 3.

    Expects rows 0 and 4 of ``tab2`` to be yielded, in that order, and the
    reader to be exhausted afterwards.
    """
    filtered = FilteringCSVReader(iter(self.tab2), patterns={3: '0'})

    for row_index in (0, 4):
        self.assertEqual(self.tab2[row_index], next(filtered))

    # One more read must raise StopIteration; a returned row is a failure.
    try:
        next(filtered)
    except StopIteration:
        pass
    else:
        self.fail("Should be no more rows left.")
def test_any_match(self):
    """Filter ``tab2`` with two patterns and ``any_match=True``.

    Patterns are ``'only'`` keyed by ``'age'`` and ``'2'`` keyed by 0.
    Expects rows 0, 2 and 4 of ``tab2`` to be yielded, in that order, and
    the reader to be exhausted afterwards.
    """
    filtered = FilteringCSVReader(iter(self.tab2), patterns={'age': 'only', 0: '2'}, any_match=True)

    for row_index in (0, 2, 4):
        self.assertEqual(self.tab2[row_index], next(filtered))

    # One more read must raise StopIteration; a returned row is a failure.
    try:
        next(filtered)
    except StopIteration:
        pass
    else:
        self.fail("Should be no more rows left.")
def test_inverse(self):
    """Filter ``tab2`` with the list pattern ``['1']`` and ``inverse=True``.

    Expects rows 0, 2 and 4 of ``tab2`` to be yielded, in that order, and
    the reader to be exhausted afterwards.
    """
    filtered = FilteringCSVReader(iter(self.tab2), patterns=['1'], inverse=True)

    for row_index in (0, 2, 4):
        self.assertEqual(self.tab2[row_index], next(filtered))

    # One more read must raise StopIteration; a returned row is a failure.
    try:
        next(filtered)
    except StopIteration:
        pass
    else:
        self.fail("Should be no more rows left.")
def test_regex(self):
    """Filter ``tab1`` with a compiled regular expression keyed on index 1.

    Expects rows 0, 1, 3 and 4 of ``tab1`` to be yielded, in that order,
    and the reader to be exhausted afterwards.
    """
    reader_or_tribune = re.compile(".*(Reader|Tribune).*")
    filtered = FilteringCSVReader(iter(self.tab1), patterns={1: reader_or_tribune})

    for row_index in (0, 1, 3, 4):
        self.assertEqual(self.tab1[row_index], next(filtered))

    # One more read must raise StopIteration; a returned row is a failure.
    try:
        next(filtered)
    except StopIteration:
        pass
    else:
        self.fail("Should be no more rows left.")
def test_multiline(self):
    """Filter a table whose ``'b'`` cell contains an embedded newline.

    The compiled pattern ``bar`` is keyed on column ``'b'``; both rows of
    the table are expected back, in order, followed by exhaustion.
    """
    table = [
        ['a', 'b'],
        ['1', 'foo\nbar'],
    ]
    filtered = FilteringCSVReader(iter(table), patterns={'b': re.compile('bar')})

    # The table holds exactly the two rows expected back (indices 0 and 1).
    for expected_row in table:
        self.assertEqual(expected_row, next(filtered))

    # One more read must raise StopIteration; a returned row is a failure.
    try:
        next(filtered)
    except StopIteration:
        pass
    else:
        self.fail("Should be no more rows left.")
def test_any_match_and_inverse(self):
    """Filter ``tab2`` with ``any_match=True`` and ``inverse=True`` together.

    Patterns are ``'only'`` keyed by ``'age'`` and ``'2'`` keyed by 0.
    Expects rows 0, 1 and 3 of ``tab2`` to be yielded, in that order, and
    the reader to be exhausted afterwards.
    """
    filtered = FilteringCSVReader(
        iter(self.tab2),
        patterns={'age': 'only', 0: '2'},
        any_match=True,
        inverse=True,
    )

    for row_index in (0, 1, 3):
        self.assertEqual(self.tab2[row_index], next(filtered))

    # One more read must raise StopIteration; a returned row is a failure.
    try:
        next(filtered)
    except StopIteration:
        pass
    else:
        self.fail("Should be no more rows left.")
def test_pattern(self):
    """Filter ``tab1`` with the list pattern ``['1']``.

    Expects rows 0, 1 and 4 of ``tab1`` to be yielded, in that order, and
    the reader to be exhausted afterwards.
    """
    filtered = FilteringCSVReader(iter(self.tab1), patterns=['1'])

    for row_index in (0, 1, 4):
        self.assertEqual(self.tab1[row_index], next(filtered))

    # One more read must raise StopIteration; a returned row is a failure.
    try:
        next(filtered)
    except StopIteration:
        pass
    else:
        self.fail("Should be no more rows left.")
def __init__(self, reader, patterns, header=True, any_match=False, inverse=False):
    """Store the filter configuration and normalise the patterns.

    Args:
        reader: Iterator of rows. When ``header`` is truthy, its first
            item is consumed here and kept as ``self.column_names``.
        patterns: Pattern specification, handed to
            ``standardize_patterns`` together with ``self.column_names``.
        header: Whether to read the first item of ``reader`` as the
            column names.
        any_match: Stored unchanged on the instance.
        inverse: Stored unchanged on the instance.

    NOTE(review): when ``header`` is falsy this method never assigns
    ``self.column_names``, so the lookup below relies on the attribute
    being provided elsewhere (presumably a class-level default) — confirm.
    """
    super(FilteringCSVReader, self).__init__()

    self.reader, self.header = reader, header

    # Consume the header row before any filtering takes place.
    if self.header:
        self.column_names = next(reader)

    self.any_match, self.inverse = any_match, inverse
    self.patterns = standardize_patterns(self.column_names, patterns)
# Turn on the 'line_numbers' reader option before fetching the rows.
reader_kwargs['line_numbers'] = True
rows, column_names, column_ids = self.get_rows_and_column_names_and_column_ids(**reader_kwargs)

# Choose the matcher, in priority order: a compiled regex, a membership
# test against the right-stripped lines of the match file, or the plain
# pattern argument.
if self.args.regex:
    pattern = re.compile(self.args.regex)
elif self.args.matchfile:
    lines = {line.rstrip() for line in self.args.matchfile}

    def pattern(x):
        return x in lines
else:
    pattern = self.args.pattern

# The same matcher is registered for every selected column id.
patterns = {column_id: pattern for column_id in column_ids}
filter_reader = FilteringCSVReader(rows, header=False, patterns=patterns, inverse=self.args.inverse)

# Write the column names first, then every row the filter yields.
output = agate.csv.writer(self.output_file, **writer_kwargs)
output.writerow(column_names)

for row in filter_reader:
    output.writerow(row)