Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def checkFlakesPath(filename, reporter):
    """Check the given path, reporting any warnings detected.

    The file is read as UTF-8 text, a trailing newline is appended, and the
    result is handed to ``checkFlakes`` re-encoded as UTF-8 bytes.  Failures
    to read or decode the file are not raised; they are reported through
    ``reporter.unexpectedError(filename, message)`` and the function returns.

    :param filename: path of the source file to check.
    :param reporter: object providing ``unexpectedError(filename, message)``;
        it is also passed through to ``checkFlakes``.
    """
    try:
        with open(filename, encoding='utf-8') as f:
            codestr = f.read() + '\n'
    except UnicodeError:
        reporter.unexpectedError(filename, 'problem decoding source')
        return
    except IOError as err:
        # ``args`` is normally (errno, strerror, ...), but an IOError built
        # from a single message has no second element; fall back to the
        # exception text instead of failing with IndexError while reporting.
        detail = err.args[1] if len(err.args) > 1 else str(err)
        reporter.unexpectedError(filename, detail)
        return
    checkFlakes(codestr.encode('utf-8'), filename, reporter)
class CustomReport(pep8.StandardReport):
    """Report that collects findings as dictionaries instead of printing.

    Findings are appended to ``results``, which is defined on the class and
    is therefore shared by every instance.
    """

    results = []

    def get_file_results(self):
        """Move the deferred findings for the current file into ``results``.

        Nothing happens when there are no deferred findings.  Otherwise they
        are sorted in place and one dictionary per finding (path, row, col,
        code, text) is appended to ``results``.
        """
        pending = self._deferred_print
        if not pending:
            return
        pending.sort()
        for row, column, code, text, _ in pending:
            entry = {
                'path': self.filename,
                'row': self.line_offset + row,
                'col': column + 1,
                'code': code,
                'text': text,
            }
            self.results.append(entry)
parser = kwargs.pop('parser', None)
# build options from dict
options_dict = dict(*args, **kwargs)
arglist = None if parse_argv else options_dict.get('paths', None)
options, self.paths = process_options(
arglist, parse_argv, config_file, parser)
if options_dict:
options.__dict__.update(options_dict)
if 'paths' in options_dict:
self.paths = options_dict['paths']
self.runner = self.input_file
self.options = options
if not options.reporter:
options.reporter = BaseReport if options.quiet else StandardReport
options.select = tuple(options.select or ())
if not (options.select or options.ignore or
options.testsuite or options.doctest) and DEFAULT_IGNORE:
# The default choice: ignore controversial checks
options.ignore = tuple(DEFAULT_IGNORE.split(','))
else:
# Ignore all checks which are not explicitly selected
options.ignore = ('',) if options.select else tuple(options.ignore)
options.benchmark_keys = BENCHMARK_KEYS[:]
options.ignore_code = self.ignore_code
options.physical_checks = self.get_checks('physical_line')
options.logical_checks = self.get_checks('logical_line')
options.ast_checks = self.get_checks('tree')
self.init_report()
parser = kwargs.pop('parser', None)
# build options from dict
options_dict = dict(*args, **kwargs)
arglist = None if parse_argv else options_dict.get('paths', None)
options, self.paths = process_options(
arglist, parse_argv, config_file, parser)
if options_dict:
options.__dict__.update(options_dict)
if 'paths' in options_dict:
self.paths = options_dict['paths']
self.runner = self.input_file
self.options = options
if not options.reporter:
options.reporter = BaseReport if options.quiet else StandardReport
options.select = tuple(options.select or ())
if not (options.select or options.ignore or
options.testsuite or options.doctest) and DEFAULT_IGNORE:
# The default choice: ignore controversial checks
options.ignore = tuple(DEFAULT_IGNORE.split(','))
else:
# Ignore all checks which are not explicitly selected
options.ignore = ('',) if options.select else tuple(options.ignore)
options.benchmark_keys = BENCHMARK_KEYS[:]
options.ignore_code = self.ignore_code
options.physical_checks = self.get_checks('physical_line')
options.logical_checks = self.get_checks('logical_line')
options.ast_checks = self.get_checks('tree')
self.init_report()
'counters': self.counters,
'messages': self.messages}
def update_state(self, state):
    """Merge a saved report ``state`` into this report.

    ``state`` is a mapping with the keys ``'total_errors'``, ``'counters'``
    and ``'messages'``.  The error total and every counter are added to the
    current values; the messages mapping is merged with ``update``.
    """
    self.total_errors += state['total_errors']
    incoming = state['counters']
    for name in incoming:
        self.counters[name] += incoming[name]
    extra_messages = state['messages']
    self.messages.update(extra_messages)
class FileQReport(BaseQReport):
    """File Queue Report: a ``BaseQReport`` with ``print_filename`` enabled."""

    print_filename = True
class QueueReport(pep8.StandardReport, BaseQReport):
    """Standard Queue Report."""

    def get_file_results(self):
        """Print the result and return the overall count for this file.

        The deferred findings are sorted in place and printed one per line
        using ``self._fmt``.  Standard output is flushed after every line.

        :returns: ``self.file_errors``.
        """
        # Imported here so the method does not depend on a module-level
        # ``import sys`` that is outside this block.
        import sys

        self._deferred_print.sort()
        for line_number, offset, code, text, doc in self._deferred_print:
            print(self._fmt % {
                'path': self.filename,
                'row': self.line_offset + line_number, 'col': offset + 1,
                'code': code, 'text': text,
            })
            # stdout is block buffered when not stdout.isatty().
            # line can be broken where buffer boundary since other processes
            # write to same file.
            # flush() after print() to avoid buffer boundary.
            sys.stdout.flush()
        return self.file_errors
def init_file(self, filename, lines, expected, line_offset):
    """Signal a new file.

    Resets the deferred-print buffer to an empty list, then delegates to the
    ``init_file`` that follows ``StandardReport`` in the method resolution
    order and returns whatever it returns.
    """
    self._deferred_print = []
    parent = super(StandardReport, self)
    return parent.init_file(filename, lines, expected, line_offset)
def _retry_serial(self, func, *args, **kwargs):
    """Call ``func`` and retry it once in serial if necessary.

    When ``func`` raises an OSError whose ``errno`` is listed in
    :attr:`serial_retry_errors`, the report is re-initialised with pep8's
    default ``StandardReport`` class, which operates in serial, and ``func``
    is called a second time with the same arguments.  Any other OSError is
    re-raised unchanged.
    """
    try:
        return func(*args, **kwargs)
    except OSError as exc:
        if exc.errno not in self.serial_retry_errors:
            raise
        self.init_report(pep8.StandardReport)
        return func(*args, **kwargs)
# thanks pinocchio nose plugin for this code
color_end = '\x1b[1;0m'
colors = {
    'green': '\x1b[1;32m',
    'red': '\x1b[1;31m',
    'yellow': '\x1b[1;33m',
}


def in_color(color, text):
    """Wrap every line of ``text`` in the escape codes for ``color``.

    Each line, line ending included, is prefixed with the sequence looked up
    in ``colors`` and followed by ``color_end``, so that the color shows up
    correctly with ``less -R`` as well as ``more`` and a normal shell.
    """
    return ''.join(
        '{0}{1}{2}'.format(colors[color], chunk, color_end)
        for chunk in text.splitlines(True)
    )
class TissueReport(pep8.StandardReport):
    """
    pep8's Standard report with a get_file_results that accepts an optional
    stream arg to output to.
    """

    def get_file_results(self, stream=None):
        """
        Write the result to stream and return the overall count for this file.

        :param stream: object with a ``write`` method.  When omitted, the
            current ``sys.stdout`` is looked up at call time, so a redirected
            standard output is honoured.
        :returns: ``self.file_errors``.
        """
        if stream is None:
            # Resolved per call: a ``stream=sys.stdout`` default would be
            # bound once at definition time and ignore later redirection.
            stream = sys.stdout
        self._deferred_print.sort()
        for line_number, offset, code, text, doc in self._deferred_print:
            stream.write(self._fmt % {
                'path': self.filename,
                'row': self.line_offset + line_number, 'col': offset + 1,
                'code': code, 'text': text,
            } + '\n')
        return self.file_errors
def init_file(self, filename, lines, expected, line_offset):
    """Signal a new file.

    Resets the deferred-print buffer to an empty list, then delegates to the
    ``init_file`` that follows ``StandardReport`` in the method resolution
    order and returns whatever it returns.
    """
    self._deferred_print = []
    parent = super(StandardReport, self)
    return parent.init_file(filename, lines, expected, line_offset)
'row': self.line_offset + line_number, 'col': offset + 1,
'code': code, 'text': text,
})
if self._show_source:
if line_number > len(self.lines):
line = ''
else:
line = self.lines[line_number - 1]
print(line.rstrip())
print(re.sub(r'\S', ' ', line[:offset]) + '^')
if self._show_pep8 and doc:
print(' ' + doc.strip())
return self.file_errors
class DiffReport(StandardReport):
    """Collect and print the results for the changed lines only."""

    def __init__(self, options):
        """Initialise the base report and keep ``options.selected_lines``."""
        super(DiffReport, self).__init__(options)
        self._selected = options.selected_lines

    def error(self, line_number, offset, text, check):
        """Forward the error to the base report only for selected lines.

        Returns the base class result when ``line_number`` is among the
        selected lines of the current file, otherwise ``None``.
        """
        wanted_lines = self._selected[self.filename]
        if line_number in wanted_lines:
            parent = super(DiffReport, self)
            return parent.error(line_number, offset, text, check)
        return None
class StyleGuide(object):
"""Initialize a PEP-8 instance with few options."""
def __init__(self, *args, **kwargs):
'counters': self.counters,
'messages': self.messages}
def update_state(self, state):
    """Merge a saved report ``state`` into this report.

    ``state`` is a mapping with the keys ``'total_errors'``, ``'counters'``
    and ``'messages'``.  The error total and every counter are added to the
    current values; the messages mapping is merged with ``update``.
    """
    self.total_errors += state['total_errors']
    incoming = state['counters']
    for name in incoming:
        self.counters[name] += incoming[name]
    extra_messages = state['messages']
    self.messages.update(extra_messages)
class FileQReport(BaseQReport):
    """File Queue Report: a ``BaseQReport`` with ``print_filename`` enabled."""

    print_filename = True
class QueueReport(pep8.StandardReport, BaseQReport):
"""Standard Queue Report."""
def get_file_results(self):
"""Print the result and return the overall count for this file."""
self._deferred_print.sort()
for line_number, offset, code, text, doc in self._deferred_print:
print(self._fmt % {
'path': self.filename,
'row': self.line_offset + line_number, 'col': offset + 1,
'code': code, 'text': text,
})
# stdout is block buffered when not stdout.isatty().
# line can be broken where buffer boundary since other processes
# write to same file.
# flush() after print() to avoid buffer boundary.