def test_build_files_scan_with_non_default_relpath_ignore(self):
  self.assertEqual(OrderedSet([
    self.create_buildfile('grandparent/parent/BUILD'),
    self.create_buildfile('grandparent/parent/BUILD.twitter'),
    self.create_buildfile('grandparent/parent/child2/child3/BUILD'),
    self.create_buildfile('grandparent/parent/child5/BUILD'),
  ]), self.scan_buildfiles('grandparent/parent', build_ignore_patterns=['**/parent/child1']))
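
# A minimal sketch (not Pants code) of the OrderedSet behavior the assertion above
# relies on: first-insertion order is preserved, duplicates are dropped, and equality
# against another OrderedSet is order-sensitive. The import path varies by Pants
# version: older releases used `from twitter.common.collections import OrderedSet`,
# newer ones vendor it as below.
from pants.util.ordered_set import OrderedSet

paths = OrderedSet(['a/BUILD', 'b/BUILD', 'a/BUILD'])  # duplicate is dropped
assert list(paths) == ['a/BUILD', 'b/BUILD']
assert paths == OrderedSet(['a/BUILD', 'b/BUILD'])     # same elements, same order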
# (Fragment: `.with_binaries` here continues a `setup_py(...)` provides declaration
# on a `foo` target defined just above this excerpt.)
  ).with_binaries(
    foo_binary = pants(':foo_bin')
  )
)

foo_bin = python_binary(
  name = 'foo_bin',
  entry_point = 'foo.bin.foo',
  dependencies = [ pants(':foo_bin_dep') ]
)

foo_bin_dep = python_library(
  name = 'foo_bin_dep'
)

assert SetupPy.minified_dependencies(foo) == OrderedSet([foo_bin, foo_bin_dep])

entry_points = dict(SetupPy.iter_entry_points(foo))
assert entry_points == {'foo_binary': 'foo.bin.foo'}

with self.run_execute(foo, recursive=False) as setup_py_command:
  setup_py_command.run_one.assert_called_with(foo)

with self.run_execute(foo, recursive=True) as setup_py_command:
  setup_py_command.run_one.assert_called_with(foo)
def collector(dep):
  return OrderedSet([dep])
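
# Hedged sketch of how a collector like the one above is typically consumed: each
# element yields a small OrderedSet and the caller unions them, so the combined
# result is deduplicated while keeping a deterministic first-seen order.
def union_collected(deps, collect):
  result = OrderedSet()
  for dep in deps:
    result.update(collect(dep))
  return result

assert list(union_collected(['a', 'b', 'a'], collector)) == ['a', 'b']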
def _filter_existing_dirs(self, dir_candidates, compiler_exe):
  real_dirs = OrderedSet()
  for maybe_existing_dir in dir_candidates:
    # Could use a `seen_dir_paths` set if we want to avoid pinging the fs for duplicate entries.
    if is_readable_dir(maybe_existing_dir):
      real_dirs.add(os.path.realpath(maybe_existing_dir))
    else:
      logger.debug("non-existent or non-accessible directory at {} while "
                   "parsing directories from {}"
                   .format(maybe_existing_dir, compiler_exe))
  return list(real_dirs)
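
# Standalone sketch of the same pattern using only the stdlib for the checks:
# os.path.realpath collapses duplicate spellings of one directory (symlinks,
# trailing slashes), and the OrderedSet keeps the surviving entries in a stable
# order. `candidate_dirs` is an illustrative stand-in for `dir_candidates`.
import os

def filter_existing_dirs(candidate_dirs):
  real_dirs = OrderedSet()
  for d in candidate_dirs:
    if os.path.isdir(d) and os.access(d, os.R_OK):  # readable directory?
      real_dirs.add(os.path.realpath(d))
  return list(real_dirs)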
  This is an additive operation: any existing connections involving these nodes are preserved.
  """
  all_addresses = set()
  new_targets = list()

  # Index the ProductGraph.
  for hydrated_target in hydrated_targets:
    target_adaptor = hydrated_target.adaptor
    address = target_adaptor.address
    all_addresses.add(address)
    if address not in self._target_by_address:
      new_targets.append(self._index_target(target_adaptor))

  # Once the declared dependencies of all targets are indexed, inject their
  # additional "traversable_(dependency_)?specs".
  deps_to_inject = OrderedSet()
  addresses_to_inject = set()

  def inject(target, dep_spec, is_dependency):
    address = Address.parse(dep_spec, relative_to=target.address.spec_path)
    if not any(address == t.address for t in target.dependencies):
      addresses_to_inject.add(address)
      if is_dependency:
        deps_to_inject.add((target.address, address))

  self.apply_injectables(new_targets)

  for target in new_targets:
    for spec in target.compute_dependency_specs(payload=target.payload):
      inject(target, spec, is_dependency=True)
    for spec in target.compute_injectable_specs(payload=target.payload):
      inject(target, spec, is_dependency=False)
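
# Hedged illustration of why deps_to_inject is an OrderedSet of (dependent,
# dependency) address pairs: re-walking a target cannot record the same edge
# twice, and the eventual injection order stays deterministic. Plain strings
# stand in for Address objects here.
edges = OrderedSet()
edges.add(('src/a', '3rdparty:dep'))
edges.add(('src/a', '3rdparty:dep'))  # duplicate edge is dropped
assert list(edges) == [('src/a', '3rdparty:dep')]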
def _gather_dex_entries(self, target):
  """Gather relevant dex inputs from a walk of AndroidBinary's dependency graph.

  The dx tool accepts 1) directories, 2) jars/zips, or 3) loose classfiles. The return value
  will contain any or all of those.
  """
  classpath_products = self.context.products.get_data('runtime_classpath')
  unpacked_archives = self.context.products.get('unpacked_libraries')

  gathered_entries = OrderedSet()
  class_files = {}

  def get_entries(tgt):
    # We gather just internal classpath elements here. Unpacked external dependency classpath
    # elements are gathered just below.
    cp_entries = ClasspathUtil.internal_classpath((tgt,), classpath_products)
    gathered_entries.update(cp_entries)

    # Gather classes from the contents of unpacked libraries.
    unpacked = unpacked_archives.get(tgt)
    if unpacked:
      # If there are unpacked_archives then we know this target is an AndroidLibrary.
      for archives in unpacked.values():
        for unpacked_dir in archives:
          try:
            gathered_entries.update(self._filter_unpacked_dir(tgt, unpacked_dir, class_files))
          except Exception:
            # The excerpt is truncated here; the upstream handler is not shown, so this
            # bare re-raise is a placeholder, not the original code.
            raise
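
# Hedged sketch of the role the `class_files` dict appears to play (the real
# `_filter_unpacked_dir` is not shown in this excerpt): remember where each
# relative .class path was first seen so collisions across unpacked archives
# can be flagged.
def note_class_file(class_files, rel_path, origin):
  first_seen = class_files.setdefault(rel_path, origin)
  return first_seen == origin  # False means a colliding duplicate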
def iter_transitive_jars(jar_lib):
  """
  :type jar_lib: :class:`pants.backend.jvm.targets.jar_library.JarLibrary`
  :rtype: :class:`collections.Iterator` of
          :class:`pants.java.jar.M2Coordinate`
  """
  if classpath_products:
    jar_products = classpath_products.get_artifact_classpath_entries_for_targets((jar_lib,))
    for _, jar_entry in jar_products:
      coordinate = jar_entry.coordinate
      # We drop classifier and type_ since those fields are represented in the global
      # libraries dict and here we just want the key into that dict (see `_jar_id`).
      yield M2Coordinate(org=coordinate.org, name=coordinate.name, rev=coordinate.rev)

target_libraries = OrderedSet()
if isinstance(current_target, JarLibrary):
  target_libraries = OrderedSet(iter_transitive_jars(current_target))
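
# Hedged illustration of the dedup that OrderedSet(iter_transitive_jars(...))
# performs: M2Coordinate instances compare by value, so coordinates that differ
# only in the dropped classifier/type_ fields collapse to a single org:name:rev
# key. The import path follows the module named in the docstring above.
from pants.java.jar import M2Coordinate

jars = OrderedSet()
jars.add(M2Coordinate(org='com.example', name='lib', rev='1.0'))
jars.add(M2Coordinate(org='com.example', name='lib', rev='1.0'))  # dedupes to one entry
assert len(jars) == 1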
classes_dir = self.context.products.get_data('bloop_classes_dir').get(current_target, None)
if classes_dir is not None:
  info['classes_dir'] = classes_dir.path

dep_classpath = self.context.products.get_data('bloop_dep_classpath').get(current_target, None)
if dep_classpath is not None:
  info['dependency_classpath'] = dep_classpath

zinc_analysis = self.context.products.get_data('zinc_analysis').get(current_target, None)
if zinc_analysis is not None:
  # TODO: what is z.jar used for? classes dir?
  _classes_dir, _z_jar_file, analysis_file = zinc_analysis
  info['zinc_analysis'] = analysis_file
  Filters and adds paths from extra_classpath_tuples to the end of the resulting list.

  :param targets: The targets to generate a classpath for.
  :param ClasspathProducts classpath_products: Product containing classpath elements.
  :param extra_classpath_tuples: Additional classpath entries as tuples of
                                 (string, ClasspathEntry).
  :param confs: The list of confs for use by this classpath.
  :returns: The classpath entries as a list of path elements.
  :rtype: list of ClasspathEntry
  """
  classpath_iter = cls._classpath_iter(
    classpath_products.get_classpath_entries_for_targets(targets),
    confs=confs,
  )
  total_classpath = OrderedSet(classpath_iter)

  filtered_extra_classpath_iter = cls._filtered_classpath_by_confs_iter(
    extra_classpath_tuples,
    confs,
  )
  extra_classpath_iter = cls._entries_iter(filtered_extra_classpath_iter)
  total_classpath.update(extra_classpath_iter)
  return list(total_classpath)
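
# The OrderedSet(...) / update(...) pattern above is a stable, de-duplicating
# concatenation: extra entries land after the main classpath, and an extra that
# already appeared keeps its earlier position. Plain strings stand in for
# ClasspathEntry values.
main = ['a.jar', 'b.jar']
extras = ['b.jar', 'c.jar']
merged = OrderedSet(main)
merged.update(extras)
assert list(merged) == ['a.jar', 'b.jar', 'c.jar']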
def _run_lint(self, target, args):
  chroot = PythonChroot(target, self.root_dir, extra_targets=[
    Target.get(Address.parse(self.root_dir, '3rdparty/python:pylint'))],
    conn_timeout=self._conn_timeout)
  chroot.builder.info.ignore_errors = True
  builder = chroot.dump()
  builder.info.entry_point = 'pylint.lint'
  builder.freeze()

  interpreter_args = [
    '--rcfile=%s' % os.path.join(self.root_dir, 'build-support', 'pylint', 'pylint.rc')]
  interpreter_args.extend(args or [])

  sources = OrderedSet()
  target.walk(lambda trg: sources.update(
    trg.sources if hasattr(trg, 'sources') and trg.sources is not None else []))

  pex = PEX(builder.path())
  pex.run(args=interpreter_args + list(sources), with_chroot=True)
def _resolve_distributions_by_platform(self, reqs, platforms):
  deduped_reqs = OrderedSet(reqs)
  find_links = OrderedSet()
  for req in deduped_reqs:
    self._log.debug(f'  Dumping requirement: {req}')
    self._builder.add_requirement(str(req.requirement))
    if req.repository:
      find_links.add(req.repository)

  # Resolve the requirements into distributions.
  distributions = self._resolve_multi(self._builder.interpreter, deduped_reqs, platforms,
                                      find_links)
  return distributions
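
# Why OrderedSet(reqs) rather than set(reqs): duplicates are dropped but the
# declaration order survives, so both the debug log above and the resolve are
# deterministic from run to run. Plain strings stand in for requirement objects.
reqs = ['requests==2.31.0', 'six', 'requests==2.31.0']
assert list(OrderedSet(reqs)) == ['requests==2.31.0', 'six']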