Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
config.args = parser.parse_args()
config.args.checkdir = os.path.abspath(os.path.expanduser(config.args.checkdir))
identifier = config.args.identifier[0]
files = config.args.files
if config.args.offline:
config.args.local = True
if not config.args.local:
try:
# Submit to check50 repo.
import submit50
except ImportError:
raise InternalError(
"submit50 is not installed. Install submit50 and run check50 again.")
else:
submit50.handler.type = "check"
signal.signal(signal.SIGINT, submit50.handler)
submit50.run.verbose = config.args.verbose
username, commit_hash = submit50.submit("check50", identifier)
# Wait until payload comes back with check data.
print("Checking...", end="")
sys.stdout.flush()
pings = 0
while True:
# Terminate if no response.
if pings > 45:
try:
# Import module from file path directly.
module = imp.load_source(slug, os.path.join(config.check_dir, "__init__.py"))
# Ensure that there is exactly one class decending from Checks defined in this package.
checks, = (cls for _, cls in inspect.getmembers(module, inspect.isclass)
if hasattr(cls, "_Checks__sentinel")
and cls.__module__.startswith(slug))
except (OSError, IOError) as e:
if e.errno != errno.ENOENT:
raise
except ValueError:
pass
else:
return checks
raise InternalError("invalid identifier")
config.check_dir = os.path.join(checks_root, slug.replace("/", os.sep), "check50")
if not config.args.offline:
if os.path.exists(checks_root):
command = ["git", "-C", checks_root, "pull", "origin", "master"]
else:
command = ["git", "clone", "https://github.com/{}/{}".format(org, repo), checks_root]
# Can't use subprocess.DEVNULL because it requires python 3.3.
stdout = stderr = None if config.args.verbose else open(os.devnull, "wb")
# Update checks via git.
try:
subprocess.check_call(command, stdout=stdout, stderr=stderr)
except subprocess.CalledProcessError:
raise InternalError("failed to clone checks")
# Install any dependencies from requirements.txt either in the root of the
# repository or in the directory of the specific check.
for dir in [checks_root, os.path.dirname(config.check_dir)]:
requirements = os.path.join(dir, "requirements.txt")
if os.path.exists(requirements):
args = ["install", "--upgrade", "-r", requirements]
# If we are not in a virtualenv, we need --user
if not hasattr(sys, "real_prefix"):
args.append("--user")
if not config.args.verbose:
args += ["--quiet"] * 3
try:
subprocess.check_call([sys.executable, "-m", "pip"] + args)
# repository or in the directory of the specific check.
for dir in [checks_root, os.path.dirname(config.check_dir)]:
requirements = os.path.join(dir, "requirements.txt")
if os.path.exists(requirements):
args = ["install", "--upgrade", "-r", requirements]
# If we are not in a virtualenv, we need --user
if not hasattr(sys, "real_prefix"):
args.append("--user")
if not config.args.verbose:
args += ["--quiet"] * 3
try:
subprocess.check_call([sys.executable, "-m", "pip"] + args)
except subprocess.CalledProcessError:
raise InternalError("failed to install dependencies in ({})".format(
requirements[len(config.args.checkdir) + 1:]))
# Refresh sys.path to look for newly installed dependencies
reload(site)
try:
# Import module from file path directly.
module = imp.load_source(slug, os.path.join(config.check_dir, "__init__.py"))
# Ensure that there is exactly one class decending from Checks defined in this package.
checks, = (cls for _, cls in inspect.getmembers(module, inspect.isclass)
if hasattr(cls, "_Checks__sentinel")
and cls.__module__.startswith(slug))
except (OSError, IOError) as e:
if e.errno != errno.ENOENT:
raise
def valgrind(func):
if config.test_cases[-1] == func.__name__:
frame = traceback.extract_stack(limit=2)[0]
raise InternalError("invalid check in {} on line {} of {}:\n"
"@valgrind must be placed below @check"
.format(frame.name, frame.lineno, frame.filename))
@wraps(func)
def wrapper(self):
if not which("valgrind"):
raise Error("valgrind not installed", result=Checks.SKIP)
self._valgrind = True
try:
func(self)
self._check_valgrind()
finally:
self._valgrind = False
return wrapper
def excepthook(cls, exc, tb):
cleanup()
# Class is a BaseException, better just quit.
if not issubclass(cls, Exception):
print()
return
if cls is InternalError:
cprint(exc.msg, "red", file=sys.stderr)
elif any(issubclass(cls, err) for err in [IOError, OSError]) and exc.errno == errno.ENOENT:
cprint("{} not found".format(exc.filename), "red", file=sys.stderr)
else:
cprint("Sorry, something's wrong! Let sysadmins@cs50.harvard.edu know!", "red", file=sys.stderr)
if config.args.verbose:
traceback.print_exception(cls, exc, tb)