808 lines
32 KiB
Python
808 lines
32 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
from __future__ import absolute_import, print_function
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
|
|
|
|
os.environ["PIP_PYTHON_PATH"] = str(sys.executable)
|
|
|
|
|
|
def find_site_path(pkg, site_dir=None):
|
|
import pkg_resources
|
|
if site_dir is not None:
|
|
site_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
working_set = pkg_resources.WorkingSet([site_dir] + sys.path[:])
|
|
for dist in working_set:
|
|
root = dist.location
|
|
base_name = dist.project_name if dist.project_name else dist.key
|
|
name = None
|
|
if "top_level.txt" in dist.metadata_listdir(""):
|
|
name = next(iter([l.strip() for l in dist.get_metadata_lines("top_level.txt") if l is not None]), None)
|
|
if name is None:
|
|
name = pkg_resources.safe_name(base_name).replace("-", "_")
|
|
if not any(pkg == _ for _ in [base_name, name]):
|
|
continue
|
|
path_options = [name, "{0}.py".format(name)]
|
|
path_options = [os.path.join(root, p) for p in path_options if p is not None]
|
|
path = next(iter(p for p in path_options if os.path.exists(p)), None)
|
|
if path is not None:
|
|
return (dist, path)
|
|
return (None, None)
|
|
|
|
|
|
def _patch_path(pipenv_site=None):
|
|
import site
|
|
pipenv_libdir = os.path.dirname(os.path.abspath(__file__))
|
|
pipenv_site_dir = os.path.dirname(pipenv_libdir)
|
|
pipenv_dist = None
|
|
if pipenv_site is not None:
|
|
pipenv_dist, pipenv_path = find_site_path("pipenv", site_dir=pipenv_site)
|
|
else:
|
|
pipenv_dist, pipenv_path = find_site_path("pipenv", site_dir=pipenv_site_dir)
|
|
if pipenv_dist is not None:
|
|
pipenv_dist.activate()
|
|
else:
|
|
site.addsitedir(next(iter(
|
|
sitedir for sitedir in (pipenv_site, pipenv_site_dir)
|
|
if sitedir is not None
|
|
), None))
|
|
if pipenv_path is not None:
|
|
pipenv_libdir = pipenv_path
|
|
for _dir in ("vendor", "patched", pipenv_libdir):
|
|
sys.path.insert(0, os.path.join(pipenv_libdir, _dir))
|
|
|
|
|
|
def get_parser():
|
|
from argparse import ArgumentParser
|
|
parser = ArgumentParser("pipenv-resolver")
|
|
parser.add_argument("--pre", action="store_true", default=False)
|
|
parser.add_argument("--clear", action="store_true", default=False)
|
|
parser.add_argument("--verbose", "-v", action="count", default=False)
|
|
parser.add_argument("--dev", action="store_true", default=False)
|
|
parser.add_argument("--debug", action="store_true", default=False)
|
|
parser.add_argument("--system", action="store_true", default=False)
|
|
parser.add_argument("--parse-only", action="store_true", default=False)
|
|
parser.add_argument("--pipenv-site", metavar="pipenv_site_dir", action="store",
|
|
default=os.environ.get("PIPENV_SITE_DIR"))
|
|
parser.add_argument("--requirements-dir", metavar="requirements_dir", action="store",
|
|
default=os.environ.get("PIPENV_REQ_DIR"))
|
|
parser.add_argument("--write", metavar="write", action="store",
|
|
default=os.environ.get("PIPENV_RESOLVER_FILE"))
|
|
parser.add_argument("packages", nargs="*")
|
|
return parser
|
|
|
|
|
|
def which(*args, **kwargs):
|
|
return sys.executable
|
|
|
|
|
|
def handle_parsed_args(parsed):
|
|
if parsed.debug:
|
|
parsed.verbose = max(parsed.verbose, 2)
|
|
if parsed.verbose > 1:
|
|
logging.getLogger("notpip").setLevel(logging.DEBUG)
|
|
elif parsed.verbose > 0:
|
|
logging.getLogger("notpip").setLevel(logging.INFO)
|
|
os.environ["PIPENV_VERBOSITY"] = str(parsed.verbose)
|
|
if "PIPENV_PACKAGES" in os.environ:
|
|
parsed.packages += os.environ.get("PIPENV_PACKAGES", "").strip().split("\n")
|
|
return parsed
|
|
|
|
|
|
class Entry(object):
|
|
"""A resolved entry from a resolver run"""
|
|
|
|
def __init__(self, name, entry_dict, project, resolver, reverse_deps=None, dev=False):
|
|
super(Entry, self).__init__()
|
|
from pipenv.vendor.requirementslib.models.utils import tomlkit_value_to_python
|
|
self.name = name
|
|
if isinstance(entry_dict, dict):
|
|
self.entry_dict = self.clean_initial_dict(entry_dict)
|
|
else:
|
|
self.entry_dict = entry_dict
|
|
self.project = project
|
|
section = "develop" if dev else "default"
|
|
pipfile_section = "dev-packages" if dev else "packages"
|
|
self.dev = dev
|
|
self.pipfile = tomlkit_value_to_python(
|
|
project.parsed_pipfile.get(pipfile_section, {})
|
|
)
|
|
self.lockfile = project.lockfile_content.get(section, {})
|
|
self.pipfile_dict = self.pipfile.get(self.pipfile_name, {})
|
|
if self.dev and self.name in project.lockfile_content.get("default", {}):
|
|
self.lockfile_dict = project.lockfile_content["default"][name]
|
|
else:
|
|
self.lockfile_dict = self.lockfile.get(name, entry_dict)
|
|
self.resolver = resolver
|
|
self.reverse_deps = reverse_deps
|
|
self._original_markers = None
|
|
self._markers = None
|
|
self._entry = None
|
|
self._lockfile_entry = None
|
|
self._pipfile_entry = None
|
|
self._parent_deps = []
|
|
self._flattened_parents = []
|
|
self._requires = None
|
|
self._deptree = None
|
|
self._parents_in_pipfile = []
|
|
|
|
@staticmethod
|
|
def make_requirement(name=None, entry=None, from_ireq=False):
|
|
from pipenv.vendor.requirementslib.models.requirements import Requirement
|
|
if from_ireq:
|
|
return Requirement.from_ireq(entry)
|
|
return Requirement.from_pipfile(name, entry)
|
|
|
|
@classmethod
|
|
def clean_initial_dict(cls, entry_dict):
|
|
if not entry_dict.get("version", "").startswith("=="):
|
|
entry_dict["version"] = cls.clean_specifier(entry_dict.get("version", ""))
|
|
if "name" in entry_dict:
|
|
del entry_dict["name"]
|
|
return entry_dict
|
|
|
|
@classmethod
|
|
def parse_pyparsing_exprs(cls, expr_iterable):
|
|
from pipenv.vendor.pyparsing import Literal, MatchFirst
|
|
keys = []
|
|
expr_list = []
|
|
expr = expr_iterable.copy()
|
|
if isinstance(expr, Literal) or (
|
|
expr.__class__.__name__ == Literal.__name__
|
|
):
|
|
keys.append(expr.match)
|
|
elif isinstance(expr, MatchFirst) or (
|
|
expr.__class__.__name__ == MatchFirst.__name__
|
|
):
|
|
expr_list = expr.exprs
|
|
elif isinstance(expr, list):
|
|
expr_list = expr
|
|
if expr_list:
|
|
for part in expr_list:
|
|
keys.extend(cls.parse_pyparsing_exprs(part))
|
|
return keys
|
|
|
|
@classmethod
|
|
def get_markers_from_dict(cls, entry_dict):
|
|
from pipenv.vendor.packaging import markers as packaging_markers
|
|
from pipenv.vendor.requirementslib.models.markers import normalize_marker_str
|
|
marker_keys = cls.parse_pyparsing_exprs(packaging_markers.VARIABLE)
|
|
markers = set()
|
|
keys_in_dict = [k for k in marker_keys if k in entry_dict]
|
|
markers = {
|
|
normalize_marker_str("{k} {v}".format(k=k, v=entry_dict.pop(k)))
|
|
for k in keys_in_dict
|
|
}
|
|
if "markers" in entry_dict:
|
|
markers.add(normalize_marker_str(entry_dict["markers"]))
|
|
if None in markers:
|
|
markers.remove(None)
|
|
if markers:
|
|
entry_dict["markers"] = " and ".join(list(markers))
|
|
else:
|
|
markers = None
|
|
return markers, entry_dict
|
|
|
|
@property
|
|
def markers(self):
|
|
self._markers, self.entry_dict = self.get_markers_from_dict(self.entry_dict)
|
|
return self._markers
|
|
|
|
@markers.setter
|
|
def markers(self, markers):
|
|
if not markers:
|
|
marker_str = self.marker_to_str(markers)
|
|
if marker_str:
|
|
self._entry = self.entry.merge_markers(marker_str)
|
|
self._markers = self.marker_to_str(self._entry.markers)
|
|
entry_dict = self.entry_dict.copy()
|
|
entry_dict["markers"] = self.marker_to_str(self._entry.markers)
|
|
self.entry_dict = entry_dict
|
|
|
|
@property
|
|
def original_markers(self):
|
|
original_markers, lockfile_dict = self.get_markers_from_dict(
|
|
self.lockfile_dict
|
|
)
|
|
self.lockfile_dict = lockfile_dict
|
|
self._original_markers = self.marker_to_str(original_markers)
|
|
return self._original_markers
|
|
|
|
@staticmethod
|
|
def marker_to_str(marker):
|
|
from pipenv.vendor.requirementslib.models.markers import normalize_marker_str
|
|
if not marker:
|
|
return None
|
|
from pipenv.vendor import six
|
|
from pipenv.vendor.vistir.compat import Mapping
|
|
marker_str = None
|
|
if isinstance(marker, Mapping):
|
|
marker_dict, _ = Entry.get_markers_from_dict(marker)
|
|
if marker_dict:
|
|
marker_str = "{0}".format(marker_dict.popitem()[1])
|
|
elif isinstance(marker, (list, set, tuple)):
|
|
marker_str = " and ".join([normalize_marker_str(m) for m in marker if m])
|
|
elif isinstance(marker, six.string_types):
|
|
marker_str = "{0}".format(normalize_marker_str(marker))
|
|
if isinstance(marker_str, six.string_types):
|
|
return marker_str
|
|
return None
|
|
|
|
def get_cleaned_dict(self, keep_outdated=False):
|
|
if keep_outdated and self.is_updated:
|
|
self.validate_constraints()
|
|
self.ensure_least_updates_possible()
|
|
elif not keep_outdated:
|
|
self.validate_constraints()
|
|
if self.entry.extras != self.lockfile_entry.extras:
|
|
entry_extras = list(self.entry.extras)
|
|
if self.lockfile_entry.extras:
|
|
entry_extras.extend(list(self.lockfile_entry.extras))
|
|
self._entry.req.extras = entry_extras
|
|
self.entry_dict["extras"] = self.entry.extras
|
|
if self.original_markers and not self.markers:
|
|
original_markers = self.marker_to_str(self.original_markers)
|
|
self.markers = original_markers
|
|
self.entry_dict["markers"] = self.marker_to_str(original_markers)
|
|
entry_hashes = set(self.entry.hashes)
|
|
locked_hashes = set(self.lockfile_entry.hashes)
|
|
if entry_hashes != locked_hashes and not self.is_updated:
|
|
self.entry_dict["hashes"] = list(entry_hashes | locked_hashes)
|
|
self.entry_dict["name"] = self.name
|
|
if "version" in self.entry_dict:
|
|
self.entry_dict["version"] = self.strip_version(self.entry_dict["version"])
|
|
_, self.entry_dict = self.get_markers_from_dict(self.entry_dict)
|
|
return self.entry_dict
|
|
|
|
@property
|
|
def lockfile_entry(self):
|
|
if self._lockfile_entry is None:
|
|
self._lockfile_entry = self.make_requirement(self.name, self.lockfile_dict)
|
|
return self._lockfile_entry
|
|
|
|
@lockfile_entry.setter
|
|
def lockfile_entry(self, entry):
|
|
self._lockfile_entry = entry
|
|
|
|
@property
|
|
def pipfile_entry(self):
|
|
if self._pipfile_entry is None:
|
|
self._pipfile_entry = self.make_requirement(self.pipfile_name, self.pipfile_dict)
|
|
return self._pipfile_entry
|
|
|
|
@property
|
|
def entry(self):
|
|
if self._entry is None:
|
|
self._entry = self.make_requirement(self.name, self.entry_dict)
|
|
return self._entry
|
|
|
|
@property
|
|
def normalized_name(self):
|
|
return self.entry.normalized_name
|
|
|
|
@property
|
|
def pipfile_name(self):
|
|
return self.project.get_package_name_in_pipfile(self.name, dev=self.dev)
|
|
|
|
@property
|
|
def is_in_pipfile(self):
|
|
return True if self.pipfile_name else False
|
|
|
|
@property
|
|
def pipfile_packages(self):
|
|
return self.project.pipfile_package_names["dev" if self.dev else "default"]
|
|
|
|
def create_parent(self, name, specifier="*"):
|
|
parent = self.create(name, specifier, self.project, self.resolver,
|
|
self.reverse_deps, self.dev)
|
|
parent._deptree = self.deptree
|
|
return parent
|
|
|
|
@property
|
|
def deptree(self):
|
|
if not self._deptree:
|
|
self._deptree = self.project.environment.get_package_requirements()
|
|
return self._deptree
|
|
|
|
@classmethod
|
|
def create(cls, name, entry_dict, project, resolver, reverse_deps=None, dev=False):
|
|
return cls(name, entry_dict, project, resolver, reverse_deps, dev)
|
|
|
|
@staticmethod
|
|
def clean_specifier(specifier):
|
|
from pipenv.vendor.packaging.specifiers import Specifier
|
|
if not any(specifier.startswith(k) for k in Specifier._operators.keys()):
|
|
if specifier.strip().lower() in ["any", "<any>", "*"]:
|
|
return "*"
|
|
specifier = "=={0}".format(specifier)
|
|
elif specifier.startswith("==") and specifier.count("=") > 3:
|
|
specifier = "=={0}".format(specifier.lstrip("="))
|
|
return specifier
|
|
|
|
@staticmethod
|
|
def strip_version(specifier):
|
|
from pipenv.vendor.packaging.specifiers import Specifier
|
|
op = next(iter(
|
|
k for k in Specifier._operators.keys() if specifier.startswith(k)
|
|
), None)
|
|
if op:
|
|
specifier = specifier[len(op):]
|
|
while op:
|
|
op = next(iter(
|
|
k for k in Specifier._operators.keys() if specifier.startswith(k)
|
|
), None)
|
|
if op:
|
|
specifier = specifier[len(op):]
|
|
return specifier
|
|
|
|
@property
|
|
def parent_deps(self):
|
|
if not self._parent_deps:
|
|
self._parent_deps = self.get_parent_deps(unnest=False)
|
|
return self._parent_deps
|
|
|
|
@property
|
|
def flattened_parents(self):
|
|
if not self._flattened_parents:
|
|
self._flattened_parents = self.get_parent_deps(unnest=True)
|
|
return self._flattened_parents
|
|
|
|
@property
|
|
def parents_in_pipfile(self):
|
|
if not self._parents_in_pipfile:
|
|
self._parents_in_pipfile = [
|
|
p for p in self.flattened_parents
|
|
if p.normalized_name in self.pipfile_packages
|
|
]
|
|
return self._parents_in_pipfile
|
|
|
|
@property
|
|
def is_updated(self):
|
|
return self.entry.specifiers != self.lockfile_entry.specifiers
|
|
|
|
@property
|
|
def requirements(self):
|
|
if not self._requires:
|
|
self._requires = next(iter(
|
|
self.project.environment.get_package_requirements(self.name)
|
|
), {})
|
|
return self._requires
|
|
|
|
@property
|
|
def updated_version(self):
|
|
version = self.entry.specifiers
|
|
return self.strip_version(version)
|
|
|
|
@property
|
|
def updated_specifier(self):
|
|
# type: () -> str
|
|
return self.entry.specifiers
|
|
|
|
@property
|
|
def original_specifier(self):
|
|
# type: () -> str
|
|
return self.lockfile_entry.specifiers
|
|
|
|
@property
|
|
def original_version(self):
|
|
if self.original_specifier:
|
|
return self.strip_version(self.original_specifier)
|
|
return None
|
|
|
|
def validate_specifiers(self):
|
|
if self.is_in_pipfile and not self.pipfile_entry.editable:
|
|
return self.pipfile_entry.requirement.specifier.contains(self.updated_version)
|
|
return True
|
|
|
|
def get_dependency(self, name):
|
|
if self.requirements:
|
|
return next(iter(
|
|
dep for dep in self.requirements.get("dependencies", [])
|
|
if dep and dep.get("package_name", "") == name
|
|
), {})
|
|
return {}
|
|
|
|
def get_parent_deps(self, unnest=False):
|
|
from pipenv.vendor.packaging.specifiers import Specifier
|
|
parents = []
|
|
for spec in self.reverse_deps.get(self.normalized_name, {}).get("parents", set()):
|
|
spec_match = next(iter(c for c in Specifier._operators if c in spec), None)
|
|
name = spec
|
|
parent = None
|
|
if spec_match is not None:
|
|
spec_index = spec.index(spec_match)
|
|
specifier = self.clean_specifier(spec[spec_index:len(spec_match)]).strip()
|
|
name_start = spec_index + len(spec_match)
|
|
name = spec[name_start:].strip()
|
|
parent = self.create_parent(name, specifier)
|
|
else:
|
|
name = spec
|
|
parent = self.create_parent(name)
|
|
if parent is not None:
|
|
parents.append(parent)
|
|
if not unnest or parent.pipfile_name is not None:
|
|
continue
|
|
if self.reverse_deps.get(parent.normalized_name, {}).get("parents", set()):
|
|
parents.extend(parent.flattened_parents)
|
|
return parents
|
|
|
|
def ensure_least_updates_possible(self):
|
|
"""
|
|
Mutate the current entry to ensure that we are making the smallest amount of
|
|
changes possible to the existing lockfile -- this will keep the old locked
|
|
versions of packages if they satisfy new constraints.
|
|
|
|
:return: None
|
|
"""
|
|
constraints = self.get_constraints()
|
|
can_use_original = True
|
|
can_use_updated = True
|
|
satisfied_by_versions = set()
|
|
for constraint in constraints:
|
|
if not constraint.specifier.contains(self.original_version):
|
|
self.can_use_original = False
|
|
if not constraint.specifier.contains(self.updated_version):
|
|
self.can_use_updated = False
|
|
satisfied_by_value = getattr(constraint, "satisfied_by", None)
|
|
if satisfied_by_value:
|
|
satisfied_by = "{0}".format(
|
|
self.clean_specifier(str(satisfied_by_value.version))
|
|
)
|
|
satisfied_by_versions.add(satisfied_by)
|
|
if can_use_original:
|
|
self.entry_dict = self.lockfile_dict.copy()
|
|
elif can_use_updated:
|
|
if len(satisfied_by_versions) == 1:
|
|
self.entry_dict["version"] = next(iter(
|
|
sat_by for sat_by in satisfied_by_versions if sat_by
|
|
), None)
|
|
hashes = None
|
|
if self.lockfile_entry.specifiers == satisfied_by:
|
|
ireq = self.lockfile_entry.as_ireq()
|
|
if not self.lockfile_entry.hashes and self.resolver._should_include_hash(ireq):
|
|
hashes = self.resolver.get_hash(ireq)
|
|
else:
|
|
hashes = self.lockfile_entry.hashes
|
|
else:
|
|
if self.resolver._should_include_hash(constraint):
|
|
hashes = self.resolver.get_hash(constraint)
|
|
if hashes:
|
|
self.entry_dict["hashes"] = list(hashes)
|
|
self._entry.hashes = frozenset(hashes)
|
|
else:
|
|
# check for any parents, since they depend on this and the current
|
|
# installed versions are not compatible with the new version, so
|
|
# we will need to update the top level dependency if possible
|
|
self.check_flattened_parents()
|
|
|
|
def get_constraints(self):
|
|
"""
|
|
Retrieve all of the relevant constraints, aggregated from the pipfile, resolver,
|
|
and parent dependencies and their respective conflict resolution where possible.
|
|
|
|
:return: A set of **InstallRequirement** instances representing constraints
|
|
:rtype: Set
|
|
"""
|
|
constraints = {
|
|
c for c in self.resolver.parsed_constraints
|
|
if c and c.name == self.entry.name
|
|
}
|
|
pipfile_constraint = self.get_pipfile_constraint()
|
|
if pipfile_constraint and not (self.pipfile_entry.editable or pipfile_constraint.editable):
|
|
constraints.add(pipfile_constraint)
|
|
return constraints
|
|
|
|
def get_pipfile_constraint(self):
|
|
"""
|
|
Retrieve the version constraint from the pipfile if it is specified there,
|
|
otherwise check the constraints of the parent dependencies and their conflicts.
|
|
|
|
:return: An **InstallRequirement** instance representing a version constraint
|
|
"""
|
|
if self.is_in_pipfile:
|
|
return self.pipfile_entry.as_ireq()
|
|
return self.constraint_from_parent_conflicts()
|
|
|
|
def constraint_from_parent_conflicts(self):
|
|
"""
|
|
Given a resolved entry with multiple parent dependencies with different
|
|
constraints, searches for the resolution that satisfies all of the parent
|
|
constraints.
|
|
|
|
:return: A new **InstallRequirement** satisfying all parent constraints
|
|
:raises: :exc:`~pipenv.exceptions.DependencyConflict` if resolution is impossible
|
|
"""
|
|
# ensure that we satisfy the parent dependencies of this dep
|
|
parent_dependencies = set()
|
|
has_mismatch = False
|
|
can_use_original = True
|
|
for p in self.parent_deps:
|
|
# updated dependencies should be satisfied since they were resolved already
|
|
if p.is_updated:
|
|
continue
|
|
# parents with no requirements can't conflict
|
|
if not p.requirements:
|
|
continue
|
|
entry_ref = p.get_dependency(self.name)
|
|
required = entry_ref.get("required_version", "*")
|
|
required = self.clean_specifier(required)
|
|
parent_requires = self.make_requirement(self.name, required)
|
|
parent_dependencies.add("{0} => {1} ({2})".format(p.name, self.name, required))
|
|
# use pre=True here or else prereleases dont satisfy constraints
|
|
if parent_requires.requirement.specifier and (
|
|
not parent_requires.requirement.specifier.contains(self.original_version, prereleases=True)
|
|
):
|
|
can_use_original = False
|
|
if parent_requires.requirement.specifier and (
|
|
not parent_requires.requirement.specifier.contains(self.updated_version, prereleases=True)
|
|
):
|
|
if not self.entry.editable and self.updated_version != self.original_version:
|
|
has_mismatch = True
|
|
if has_mismatch and not can_use_original:
|
|
from pipenv.exceptions import DependencyConflict
|
|
msg = (
|
|
"Cannot resolve {0} ({1}) due to conflicting parent dependencies: "
|
|
"\n\t{2}".format(
|
|
self.name, self.updated_version, "\n\t".join(parent_dependencies)
|
|
)
|
|
)
|
|
raise DependencyConflict(msg)
|
|
elif can_use_original:
|
|
return self.lockfile_entry.as_ireq()
|
|
return self.entry.as_ireq()
|
|
|
|
def validate_constraints(self):
|
|
"""
|
|
Retrieves the full set of available constraints and iterate over them, validating
|
|
that they exist and that they are not causing unresolvable conflicts.
|
|
|
|
:return: True if the constraints are satisfied by the resolution provided
|
|
:raises: :exc:`pipenv.exceptions.DependencyConflict` if the constraints dont exist
|
|
"""
|
|
constraints = self.get_constraints()
|
|
for constraint in constraints:
|
|
try:
|
|
constraint.check_if_exists(False)
|
|
except Exception:
|
|
from pipenv.exceptions import DependencyConflict
|
|
from pipenv.environments import is_verbose
|
|
if is_verbose():
|
|
print("Tried constraint: {0!r}".format(constraint), file=sys.stderr)
|
|
msg = (
|
|
"Cannot resolve conflicting version {0}{1} while {2}{3} is "
|
|
"locked.".format(
|
|
self.name, self.updated_specifier, self.old_name, self.old_specifiers
|
|
)
|
|
)
|
|
raise DependencyConflict(msg)
|
|
return True
|
|
|
|
def check_flattened_parents(self):
|
|
for parent in self.parents_in_pipfile:
|
|
if not parent.updated_specifier:
|
|
continue
|
|
if not parent.validate_specifiers():
|
|
from pipenv.exceptions import DependencyConflict
|
|
msg = (
|
|
"Cannot resolve conflicting versions: (Root: {0}) {1}{2} (Pipfile) "
|
|
"Incompatible with {3}{4} (resolved)\n".format(
|
|
self.name, parent.pipfile_name,
|
|
parent.pipfile_entry.requirement.specifiers, parent.name,
|
|
parent.updated_specifiers
|
|
)
|
|
)
|
|
raise DependencyConflict(msg)
|
|
|
|
def __getattribute__(self, key):
|
|
result = None
|
|
old_version = ["was_", "had_", "old_"]
|
|
new_version = ["is_", "has_", "new_"]
|
|
if any(key.startswith(v) for v in new_version):
|
|
entry = Entry.__getattribute__(self, "entry")
|
|
try:
|
|
keystart = key.index("_") + 1
|
|
try:
|
|
result = getattr(entry, key[keystart:])
|
|
except AttributeError:
|
|
result = getattr(entry, key)
|
|
except AttributeError:
|
|
result = super(Entry, self).__getattribute__(key)
|
|
return result
|
|
if any(key.startswith(v) for v in old_version):
|
|
lockfile_entry = Entry.__getattribute__(self, "lockfile_entry")
|
|
try:
|
|
keystart = key.index("_") + 1
|
|
try:
|
|
result = getattr(lockfile_entry, key[keystart:])
|
|
except AttributeError:
|
|
result = getattr(lockfile_entry, key)
|
|
except AttributeError:
|
|
result = super(Entry, self).__getattribute__(key)
|
|
return result
|
|
return super(Entry, self).__getattribute__(key)
|
|
|
|
|
|
def clean_results(results, resolver, project, dev=False):
|
|
from pipenv.utils import translate_markers
|
|
if not project.lockfile_exists:
|
|
return results
|
|
lockfile = project.lockfile_content
|
|
section = "develop" if dev else "default"
|
|
reverse_deps = project.environment.reverse_dependencies()
|
|
new_results = [r for r in results if r["name"] not in lockfile[section]]
|
|
for result in results:
|
|
name = result.get("name")
|
|
entry_dict = result.copy()
|
|
entry = Entry(name, entry_dict, project, resolver, reverse_deps=reverse_deps, dev=dev)
|
|
entry_dict = translate_markers(entry.get_cleaned_dict(keep_outdated=False))
|
|
new_results.append(entry_dict)
|
|
return new_results
|
|
|
|
|
|
def clean_outdated(results, resolver, project, dev=False):
|
|
if not project.lockfile_exists:
|
|
return results
|
|
lockfile = project.lockfile_content
|
|
section = "develop" if dev else "default"
|
|
reverse_deps = project.environment.reverse_dependencies()
|
|
new_results = [r for r in results if r["name"] not in lockfile[section]]
|
|
for result in results:
|
|
name = result.get("name")
|
|
entry_dict = result.copy()
|
|
entry = Entry(name, entry_dict, project, resolver, reverse_deps=reverse_deps, dev=dev)
|
|
# The old entry was editable but this one isnt; prefer the old one
|
|
# TODO: Should this be the case for all locking?
|
|
if entry.was_editable and not entry.is_editable:
|
|
continue
|
|
lockfile_entry = lockfile[section].get(name, None)
|
|
if not lockfile_entry:
|
|
alternate_section = "develop" if not dev else "default"
|
|
if name in lockfile[alternate_section]:
|
|
lockfile_entry = lockfile[alternate_section][name]
|
|
if lockfile_entry and not entry.is_updated:
|
|
old_markers = next(iter(m for m in (
|
|
entry.lockfile_entry.markers, lockfile_entry.get("markers", None)
|
|
) if m is not None), None)
|
|
new_markers = entry_dict.get("markers", None)
|
|
if old_markers:
|
|
old_markers = Entry.marker_to_str(old_markers)
|
|
if old_markers and not new_markers:
|
|
entry.markers = old_markers
|
|
elif new_markers and not old_markers:
|
|
del entry.entry_dict["markers"]
|
|
entry._entry.req.req.marker = None
|
|
entry._entry.markers = None
|
|
# if the entry has not changed versions since the previous lock,
|
|
# don't introduce new markers since that is more restrictive
|
|
# if entry.has_markers and not entry.had_markers and not entry.is_updated:
|
|
# do make sure we retain the original markers for entries that are not changed
|
|
entry_dict = entry.get_cleaned_dict(keep_outdated=True)
|
|
new_results.append(entry_dict)
|
|
return new_results
|
|
|
|
|
|
def parse_packages(packages, pre, clear, system, requirements_dir=None):
|
|
from pipenv.vendor.requirementslib.models.requirements import Requirement
|
|
from pipenv.vendor.vistir.contextmanagers import cd, temp_path
|
|
from pipenv.utils import parse_indexes
|
|
parsed_packages = []
|
|
for package in packages:
|
|
indexes, trusted_hosts, line = parse_indexes(package)
|
|
line = " ".join(line)
|
|
pf = dict()
|
|
req = Requirement.from_line(line)
|
|
if not req.name:
|
|
with temp_path(), cd(req.req.setup_info.base_dir):
|
|
sys.path.insert(0, req.req.setup_info.base_dir)
|
|
req.req._setup_info.get_info()
|
|
req.update_name_from_path(req.req.setup_info.base_dir)
|
|
try:
|
|
name, entry = req.pipfile_entry
|
|
except Exception:
|
|
continue
|
|
else:
|
|
if name is not None and entry is not None:
|
|
pf[name] = entry
|
|
parsed_packages.append(pf)
|
|
print("RESULTS:")
|
|
if parsed_packages:
|
|
print(json.dumps(parsed_packages))
|
|
else:
|
|
print(json.dumps([]))
|
|
|
|
|
|
def resolve_packages(pre, clear, verbose, system, write, requirements_dir, packages):
|
|
from pipenv.utils import create_mirror_source, resolve_deps, replace_pypi_sources
|
|
pypi_mirror_source = (
|
|
create_mirror_source(os.environ["PIPENV_PYPI_MIRROR"])
|
|
if "PIPENV_PYPI_MIRROR" in os.environ
|
|
else None
|
|
)
|
|
|
|
def resolve(packages, pre, project, sources, clear, system, requirements_dir=None):
|
|
from pipenv.patched.piptools import logging as piptools_logging
|
|
piptools_logging.log.verbosity = 1 if verbose else 0
|
|
return resolve_deps(
|
|
packages,
|
|
which,
|
|
project=project,
|
|
pre=pre,
|
|
sources=sources,
|
|
clear=clear,
|
|
allow_global=system,
|
|
req_dir=requirements_dir
|
|
)
|
|
|
|
from pipenv.core import project
|
|
sources = (
|
|
replace_pypi_sources(project.pipfile_sources, pypi_mirror_source)
|
|
if pypi_mirror_source
|
|
else project.pipfile_sources
|
|
)
|
|
keep_outdated = os.environ.get("PIPENV_KEEP_OUTDATED", False)
|
|
results, resolver = resolve(
|
|
packages,
|
|
pre=pre,
|
|
project=project,
|
|
sources=sources,
|
|
clear=clear,
|
|
system=system,
|
|
requirements_dir=requirements_dir,
|
|
)
|
|
if keep_outdated:
|
|
results = clean_outdated(results, resolver, project)
|
|
else:
|
|
results = clean_results(results, resolver, project)
|
|
if write:
|
|
with open(write, "w") as fh:
|
|
if not results:
|
|
json.dump([], fh)
|
|
else:
|
|
json.dump(results, fh)
|
|
else:
|
|
print("RESULTS:")
|
|
if results:
|
|
print(json.dumps(results))
|
|
else:
|
|
print(json.dumps([]))
|
|
|
|
|
|
def _main(pre, clear, verbose, system, write, requirements_dir, packages, parse_only=False):
|
|
os.environ["PIPENV_REQUESTED_PYTHON_VERSION"] = ".".join([str(s) for s in sys.version_info[:3]])
|
|
os.environ["PIP_PYTHON_PATH"] = str(sys.executable)
|
|
if parse_only:
|
|
parse_packages(
|
|
packages,
|
|
pre=pre,
|
|
clear=clear,
|
|
system=system,
|
|
requirements_dir=requirements_dir,
|
|
)
|
|
else:
|
|
resolve_packages(pre, clear, verbose, system, write, requirements_dir, packages)
|
|
|
|
|
|
def main():
|
|
parser = get_parser()
|
|
parsed, remaining = parser.parse_known_args()
|
|
_patch_path(pipenv_site=parsed.pipenv_site)
|
|
import warnings
|
|
from pipenv.vendor.vistir.compat import ResourceWarning
|
|
from pipenv.vendor.vistir.misc import replace_with_text_stream
|
|
warnings.simplefilter("ignore", category=ResourceWarning)
|
|
replace_with_text_stream("stdout")
|
|
replace_with_text_stream("stderr")
|
|
os.environ["PIP_DISABLE_PIP_VERSION_CHECK"] = str("1")
|
|
os.environ["PYTHONIOENCODING"] = str("utf-8")
|
|
os.environ["PYTHONUNBUFFERED"] = str("1")
|
|
parsed = handle_parsed_args(parsed)
|
|
_main(parsed.pre, parsed.clear, parsed.verbose, parsed.system, parsed.write,
|
|
parsed.requirements_dir, parsed.packages, parse_only=parsed.parse_only)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|