login page

This commit is contained in:
Alicja Cięciwa
2020-10-27 12:57:58 +01:00
commit cb8886666c
8545 changed files with 1082463 additions and 0 deletions

View File

@@ -0,0 +1,13 @@
Copyright (c) 2018, Dan Ryan <dan@danryan.co>
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

View File

@@ -0,0 +1,75 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, unicode_literals
from .compat import (
NamedTemporaryFile,
StringIO,
TemporaryDirectory,
partialmethod,
to_native_string,
)
from .contextmanagers import (
atomic_open_for_write,
cd,
open_file,
replaced_stream,
replaced_streams,
spinner,
temp_environ,
temp_path,
)
from .cursor import hide_cursor, show_cursor
from .misc import (
StreamWrapper,
chunked,
decode_for_output,
divide,
get_wrapped_stream,
load_path,
partialclass,
run,
shell_escape,
take,
to_bytes,
to_text,
)
from .path import create_tracked_tempdir, create_tracked_tempfile, mkdir_p, rmtree
from .spin import create_spinner
__version__ = "0.5.2"
__all__ = [
"shell_escape",
"load_path",
"run",
"partialclass",
"temp_environ",
"temp_path",
"cd",
"atomic_open_for_write",
"open_file",
"rmtree",
"mkdir_p",
"TemporaryDirectory",
"NamedTemporaryFile",
"partialmethod",
"spinner",
"create_spinner",
"create_tracked_tempdir",
"create_tracked_tempfile",
"to_native_string",
"decode_for_output",
"to_text",
"to_bytes",
"take",
"chunked",
"divide",
"StringIO",
"get_wrapped_stream",
"StreamWrapper",
"replaced_stream",
"replaced_streams",
"show_cursor",
"hide_cursor",
]

View File

@@ -0,0 +1,526 @@
# -*- coding: utf-8 -*-
# This Module is taken in part from the click project and expanded
# see https://github.com/pallets/click/blob/6cafd32/click/_winconsole.py
# Copyright © 2014 by the Pallets team.
# Some rights reserved.
# Redistribution and use in source and binary forms of the software as well as
# documentation, with or without modification, are permitted provided that the
# following conditions are met:
# Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this
# software without specific prior written permission.
# THIS SOFTWARE AND DOCUMENTATION IS PROVIDED BY THE COPYRIGHT HOLDERS AND
# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT
# NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE AND
# DOCUMENTATION, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# This module is based on the excellent work by Adam Bartoš who
# provided a lot of what went into the implementation here in
# the discussion to issue1602 in the Python bug tracker.
#
# There are some general differences in regards to how this works
# compared to the original patches as we do not need to patch
# the entire interpreter but just work in our little world of
# echo and prmopt.
import ctypes
import io
import os
import sys
import time
import zlib
from ctypes import (
POINTER,
WINFUNCTYPE,
Structure,
byref,
c_char,
c_char_p,
c_int,
c_ssize_t,
c_ulong,
c_void_p,
create_unicode_buffer,
py_object,
windll,
)
from ctypes.wintypes import HANDLE, LPCWSTR, LPWSTR
from itertools import count
import msvcrt
from six import PY2, text_type
from .compat import IS_TYPE_CHECKING
from .misc import StreamWrapper, run, to_text
try:
from ctypes import pythonapi
PyObject_GetBuffer = pythonapi.PyObject_GetBuffer
PyBuffer_Release = pythonapi.PyBuffer_Release
except ImportError:
pythonapi = None
if IS_TYPE_CHECKING:
from typing import Text
c_ssize_p = POINTER(c_ssize_t)
CommandLineToArgvW = WINFUNCTYPE(POINTER(LPWSTR), LPCWSTR, POINTER(c_int))(
("CommandLineToArgvW", windll.shell32)
)
kernel32 = windll.kernel32
GetLastError = kernel32.GetLastError
GetCommandLineW = WINFUNCTYPE(LPWSTR)(("GetCommandLineW", windll.kernel32))
GetConsoleCursorInfo = kernel32.GetConsoleCursorInfo
GetStdHandle = kernel32.GetStdHandle
LocalFree = WINFUNCTYPE(ctypes.c_void_p, ctypes.c_void_p)(("LocalFree", windll.kernel32))
ReadConsoleW = kernel32.ReadConsoleW
SetConsoleCursorInfo = kernel32.SetConsoleCursorInfo
WriteConsoleW = kernel32.WriteConsoleW
# XXX: Added for cursor hiding on windows
STDOUT_HANDLE_ID = ctypes.c_ulong(-11)
STDERR_HANDLE_ID = ctypes.c_ulong(-12)
STDIN_HANDLE = GetStdHandle(-10)
STDOUT_HANDLE = GetStdHandle(-11)
STDERR_HANDLE = GetStdHandle(-12)
STREAM_MAP = {0: STDIN_HANDLE, 1: STDOUT_HANDLE, 2: STDERR_HANDLE}
PyBUF_SIMPLE = 0
PyBUF_WRITABLE = 1
ERROR_SUCCESS = 0
ERROR_NOT_ENOUGH_MEMORY = 8
ERROR_OPERATION_ABORTED = 995
STDIN_FILENO = 0
STDOUT_FILENO = 1
STDERR_FILENO = 2
EOF = b"\x1a"
MAX_BYTES_WRITTEN = 32767
class Py_buffer(Structure):
_fields_ = [
("buf", c_void_p),
("obj", py_object),
("len", c_ssize_t),
("itemsize", c_ssize_t),
("readonly", c_int),
("ndim", c_int),
("format", c_char_p),
("shape", c_ssize_p),
("strides", c_ssize_p),
("suboffsets", c_ssize_p),
("internal", c_void_p),
]
if PY2:
_fields_.insert(-1, ("smalltable", c_ssize_t * 2))
# XXX: This was added for the use of cursors
class CONSOLE_CURSOR_INFO(Structure):
_fields_ = [("dwSize", ctypes.c_int), ("bVisible", ctypes.c_int)]
# On PyPy we cannot get buffers so our ability to operate here is
# serverly limited.
if pythonapi is None:
get_buffer = None
else:
def get_buffer(obj, writable=False):
buf = Py_buffer()
flags = PyBUF_WRITABLE if writable else PyBUF_SIMPLE
PyObject_GetBuffer(py_object(obj), byref(buf), flags)
try:
buffer_type = c_char * buf.len
return buffer_type.from_address(buf.buf)
finally:
PyBuffer_Release(byref(buf))
def get_long_path(short_path):
# type: (Text, str) -> Text
BUFFER_SIZE = 500
buffer = create_unicode_buffer(BUFFER_SIZE)
get_long_path_name = windll.kernel32.GetLongPathNameW
get_long_path_name(to_text(short_path), buffer, BUFFER_SIZE)
return buffer.value
class _WindowsConsoleRawIOBase(io.RawIOBase):
def __init__(self, handle):
self.handle = handle
def isatty(self):
io.RawIOBase.isatty(self)
return True
class _WindowsConsoleReader(_WindowsConsoleRawIOBase):
def readable(self):
return True
def readinto(self, b):
bytes_to_be_read = len(b)
if not bytes_to_be_read:
return 0
elif bytes_to_be_read % 2:
raise ValueError(
"cannot read odd number of bytes from " "UTF-16-LE encoded console"
)
buffer = get_buffer(b, writable=True)
code_units_to_be_read = bytes_to_be_read // 2
code_units_read = c_ulong()
rv = ReadConsoleW(
self.handle, buffer, code_units_to_be_read, byref(code_units_read), None
)
if GetLastError() == ERROR_OPERATION_ABORTED:
# wait for KeyboardInterrupt
time.sleep(0.1)
if not rv:
raise OSError("Windows error: %s" % GetLastError())
if buffer[0] == EOF:
return 0
return 2 * code_units_read.value
class _WindowsConsoleWriter(_WindowsConsoleRawIOBase):
def writable(self):
return True
@staticmethod
def _get_error_message(errno):
if errno == ERROR_SUCCESS:
return "ERROR_SUCCESS"
elif errno == ERROR_NOT_ENOUGH_MEMORY:
return "ERROR_NOT_ENOUGH_MEMORY"
return "Windows error %s" % errno
def write(self, b):
bytes_to_be_written = len(b)
buf = get_buffer(b)
code_units_to_be_written = min(bytes_to_be_written, MAX_BYTES_WRITTEN) // 2
code_units_written = c_ulong()
WriteConsoleW(
self.handle, buf, code_units_to_be_written, byref(code_units_written), None
)
bytes_written = 2 * code_units_written.value
if bytes_written == 0 and bytes_to_be_written > 0:
raise OSError(self._get_error_message(GetLastError()))
return bytes_written
class ConsoleStream(object):
def __init__(self, text_stream, byte_stream):
self._text_stream = text_stream
self.buffer = byte_stream
@property
def name(self):
return self.buffer.name
@property
def fileno(self):
return self.buffer.fileno
def write(self, x):
if isinstance(x, text_type):
return self._text_stream.write(x)
try:
self.flush()
except Exception:
pass
return self.buffer.write(x)
def writelines(self, lines):
for line in lines:
self.write(line)
def __getattr__(self, name):
try:
return getattr(self._text_stream, name)
except io.UnsupportedOperation:
return getattr(self.buffer, name)
def isatty(self):
return self.buffer.isatty()
def __repr__(self):
return "<ConsoleStream name=%r encoding=%r>" % (self.name, self.encoding)
class WindowsChunkedWriter(object):
"""
Wraps a stream (such as stdout), acting as a transparent proxy for all
attribute access apart from method 'write()' which we wrap to write in
limited chunks due to a Windows limitation on binary console streams.
"""
def __init__(self, wrapped):
# double-underscore everything to prevent clashes with names of
# attributes on the wrapped stream object.
self.__wrapped = wrapped
def __getattr__(self, name):
return getattr(self.__wrapped, name)
def write(self, text):
total_to_write = len(text)
written = 0
while written < total_to_write:
to_write = min(total_to_write - written, MAX_BYTES_WRITTEN)
self.__wrapped.write(text[written : written + to_write])
written += to_write
_wrapped_std_streams = set()
def _wrap_std_stream(name):
# Python 2 & Windows 7 and below
if PY2 and sys.getwindowsversion()[:2] <= (6, 1) and name not in _wrapped_std_streams:
setattr(sys, name, WindowsChunkedWriter(getattr(sys, name)))
_wrapped_std_streams.add(name)
def _get_text_stdin(buffer_stream):
text_stream = StreamWrapper(
io.BufferedReader(_WindowsConsoleReader(STDIN_HANDLE)),
"utf-16-le",
"strict",
line_buffering=True,
)
return ConsoleStream(text_stream, buffer_stream)
def _get_text_stdout(buffer_stream):
text_stream = StreamWrapper(
io.BufferedWriter(_WindowsConsoleWriter(STDOUT_HANDLE)),
"utf-16-le",
"strict",
line_buffering=True,
)
return ConsoleStream(text_stream, buffer_stream)
def _get_text_stderr(buffer_stream):
text_stream = StreamWrapper(
io.BufferedWriter(_WindowsConsoleWriter(STDERR_HANDLE)),
"utf-16-le",
"strict",
line_buffering=True,
)
return ConsoleStream(text_stream, buffer_stream)
if PY2:
def _hash_py_argv():
return zlib.crc32("\x00".join(sys.argv[1:]))
_initial_argv_hash = _hash_py_argv()
def _get_windows_argv():
argc = c_int(0)
argv_unicode = CommandLineToArgvW(GetCommandLineW(), byref(argc))
try:
argv = [argv_unicode[i] for i in range(0, argc.value)]
finally:
LocalFree(argv_unicode)
del argv_unicode
if not hasattr(sys, "frozen"):
argv = argv[1:]
while len(argv) > 0:
arg = argv[0]
if not arg.startswith("-") or arg == "-":
break
argv = argv[1:]
if arg.startswith(("-c", "-m")):
break
return argv[1:]
_stream_factories = {0: _get_text_stdin, 1: _get_text_stdout, 2: _get_text_stderr}
def _get_windows_console_stream(f, encoding, errors):
if (
get_buffer is not None
and encoding in ("utf-16-le", None)
and errors in ("strict", None)
and hasattr(f, "isatty")
and f.isatty()
):
if isinstance(f, ConsoleStream):
return f
func = _stream_factories.get(f.fileno())
if func is not None:
if not PY2:
f = getattr(f, "buffer", None)
if f is None:
return None
else:
# If we are on Python 2 we need to set the stream that we
# deal with to binary mode as otherwise the exercise if a
# bit moot. The same problems apply as for
# get_binary_stdin and friends from _compat.
msvcrt.setmode(f.fileno(), os.O_BINARY)
return func(f)
def hide_cursor():
cursor_info = CONSOLE_CURSOR_INFO()
GetConsoleCursorInfo(STDOUT_HANDLE, ctypes.byref(cursor_info))
cursor_info.visible = False
SetConsoleCursorInfo(STDOUT_HANDLE, ctypes.byref(cursor_info))
def show_cursor():
cursor_info = CONSOLE_CURSOR_INFO()
GetConsoleCursorInfo(STDOUT_HANDLE, ctypes.byref(cursor_info))
cursor_info.visible = True
SetConsoleCursorInfo(STDOUT_HANDLE, ctypes.byref(cursor_info))
def get_stream_handle(stream):
return STREAM_MAP.get(stream.fileno())
def _walk_for_powershell(directory):
for path, dirs, files in os.walk(directory):
powershell = next(
iter(fn for fn in files if fn.lower() == "powershell.exe"), None
)
if powershell is not None:
return os.path.join(directory, powershell)
for subdir in dirs:
powershell = _walk_for_powershell(os.path.join(directory, subdir))
if powershell:
return powershell
return None
def _get_powershell_path():
paths = [
os.path.expandvars(r"%windir%\{0}\WindowsPowerShell").format(subdir)
for subdir in ("SysWOW64", "system32")
]
powershell_path = next(iter(_walk_for_powershell(pth) for pth in paths), None)
if not powershell_path:
powershell_path, _ = run(
["where", "powershell"], block=True, nospin=True, return_object=False
)
if powershell_path:
return powershell_path.strip()
return None
def _get_sid_with_powershell():
powershell_path = _get_powershell_path()
if not powershell_path:
return None
args = [
powershell_path,
"-ExecutionPolicy",
"Bypass",
"-Command",
"Invoke-Expression '[System.Security.Principal.WindowsIdentity]::GetCurrent().user | Write-Host'",
]
sid, _ = run(args, nospin=True)
return sid.strip()
def _get_sid_from_registry():
try:
import winreg
except ImportError:
import _winreg as winreg
var_names = ("%USERPROFILE%", "%HOME%")
current_user_home = next(iter(os.path.expandvars(v) for v in var_names if v), None)
root, subkey = (
winreg.HKEY_LOCAL_MACHINE,
r"Software\Microsoft\Windows NT\CurrentVersion\ProfileList",
)
subkey_names = []
value = None
matching_key = None
try:
with winreg.OpenKeyEx(root, subkey, 0, winreg.KEY_READ) as key:
for i in count():
key_name = winreg.EnumKey(key, i)
subkey_names.append(key_name)
value = query_registry_value(
root, r"{0}\{1}".format(subkey, key_name), "ProfileImagePath"
)
if value and value.lower() == current_user_home.lower():
matching_key = key_name
break
except OSError:
pass
if matching_key is not None:
return matching_key
def get_value_from_tuple(value, value_type):
try:
import winreg
except ImportError:
import _winreg as winreg
if value_type in (winreg.REG_SZ, winreg.REG_EXPAND_SZ):
if "\0" in value:
return value[: value.index("\0")]
return value
return None
def query_registry_value(root, key_name, value):
try:
import winreg
except ImportError:
import _winreg as winreg
try:
with winreg.OpenKeyEx(root, key_name, 0, winreg.KEY_READ) as key:
return get_value_from_tuple(*winreg.QueryValueEx(key, value))
except OSError:
return None
def get_current_user():
fns = (_get_sid_from_registry, _get_sid_with_powershell)
for fn in fns:
result = fn()
if result:
return result
return None

View File

@@ -0,0 +1,8 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, unicode_literals
from .functools import partialmethod
from .surrogateescape import register_surrogateescape
from .tempfile import NamedTemporaryFile
__all__ = ["NamedTemporaryFile", "partialmethod", "register_surrogateescape"]

View File

@@ -0,0 +1,84 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, unicode_literals
from functools import partial
__all__ = ["partialmethod"]
class partialmethod(object):
"""Method descriptor with partial application of the given arguments
and keywords.
Supports wrapping existing descriptors and handles non-descriptor
callables as instance methods.
"""
def __init__(self, func, *args, **keywords):
if not callable(func) and not hasattr(func, "__get__"):
raise TypeError("{!r} is not callable or a descriptor".format(func))
# func could be a descriptor like classmethod which isn't callable,
# so we can't inherit from partial (it verifies func is callable)
if isinstance(func, partialmethod):
# flattening is mandatory in order to place cls/self before all
# other arguments
# it's also more efficient since only one function will be called
self.func = func.func
self.args = func.args + args
self.keywords = func.keywords.copy()
self.keywords.update(keywords)
else:
self.func = func
self.args = args
self.keywords = keywords
def __repr__(self):
args = ", ".join(map(repr, self.args))
keywords = ", ".join("{}={!r}".format(k, v) for k, v in self.keywords.items())
format_string = "{module}.{cls}({func}, {args}, {keywords})"
return format_string.format(
module=self.__class__.__module__,
cls=self.__class__.__qualname__,
func=self.func,
args=args,
keywords=keywords,
)
def _make_unbound_method(self):
def _method(*args, **keywords):
call_keywords = self.keywords.copy()
call_keywords.update(keywords)
if len(args) > 1:
cls_or_self, rest = args[0], tuple(args[1:])
else:
cls_or_self = args[0]
rest = tuple()
call_args = (cls_or_self,) + self.args + tuple(rest)
return self.func(*call_args, **call_keywords)
_method.__isabstractmethod__ = self.__isabstractmethod__
_method._partialmethod = self
return _method
def __get__(self, obj, cls):
get = getattr(self.func, "__get__", None)
result = None
if get is not None:
new_func = get(obj, cls)
if new_func is not self.func:
# Assume __get__ returning something new indicates the
# creation of an appropriate callable
result = partial(new_func, *self.args, **self.keywords)
try:
result.__self__ = new_func.__self__
except AttributeError:
pass
if result is None:
# If the underlying descriptor didn't do anything, treat this
# like an instance method
result = self._make_unbound_method().__get__(obj, cls)
return result
@property
def __isabstractmethod__(self):
return getattr(self.func, "__isabstractmethod__", False)

View File

@@ -0,0 +1,196 @@
"""
This is Victor Stinner's pure-Python implementation of PEP 383: the "surrogateescape" error
handler of Python 3.
Source: misc/python/surrogateescape.py in https://bitbucket.org/haypo/misc
"""
# This code is released under the Python license and the BSD 2-clause license
import codecs
import sys
import six
FS_ERRORS = "surrogateescape"
# # -- Python 2/3 compatibility -------------------------------------
# FS_ERRORS = 'my_surrogateescape'
def u(text):
if six.PY3:
return text
else:
return text.decode("unicode_escape")
def b(data):
if six.PY3:
return data.encode("latin1")
else:
return data
if six.PY3:
_unichr = chr
bytes_chr = lambda code: bytes((code,))
else:
_unichr = unichr # type: ignore
bytes_chr = chr
def surrogateescape_handler(exc):
"""
Pure Python implementation of the PEP 383: the "surrogateescape" error
handler of Python 3. Undecodable bytes will be replaced by a Unicode
character U+DCxx on decoding, and these are translated into the
original bytes on encoding.
"""
mystring = exc.object[exc.start : exc.end]
try:
if isinstance(exc, UnicodeDecodeError):
# mystring is a byte-string in this case
decoded = replace_surrogate_decode(mystring)
elif isinstance(exc, UnicodeEncodeError):
# In the case of u'\udcc3'.encode('ascii',
# 'this_surrogateescape_handler'), both Python 2.x and 3.x raise an
# exception anyway after this function is called, even though I think
# it's doing what it should. It seems that the strict encoder is called
# to encode the unicode string that this function returns ...
decoded = replace_surrogate_encode(mystring)
else:
raise exc
except NotASurrogateError:
raise exc
return (decoded, exc.end)
class NotASurrogateError(Exception):
pass
def replace_surrogate_encode(mystring):
"""
Returns a (unicode) string, not the more logical bytes, because the codecs
register_error functionality expects this.
"""
decoded = []
for ch in mystring:
# if utils.PY3:
# code = ch
# else:
code = ord(ch)
# The following magic comes from Py3.3's Python/codecs.c file:
if not 0xD800 <= code <= 0xDCFF:
# Not a surrogate. Fail with the original exception.
raise NotASurrogateError
# mybytes = [0xe0 | (code >> 12),
# 0x80 | ((code >> 6) & 0x3f),
# 0x80 | (code & 0x3f)]
# Is this a good idea?
if 0xDC00 <= code <= 0xDC7F:
decoded.append(_unichr(code - 0xDC00))
elif code <= 0xDCFF:
decoded.append(_unichr(code - 0xDC00))
else:
raise NotASurrogateError
return str().join(decoded)
def replace_surrogate_decode(mybytes):
"""
Returns a (unicode) string
"""
decoded = []
for ch in mybytes:
# We may be parsing newbytes (in which case ch is an int) or a native
# str on Py2
if isinstance(ch, int):
code = ch
else:
code = ord(ch)
if 0x80 <= code <= 0xFF:
decoded.append(_unichr(0xDC00 + code))
elif code <= 0x7F:
decoded.append(_unichr(code))
else:
# # It may be a bad byte
# # Try swallowing it.
# continue
# print("RAISE!")
raise NotASurrogateError
return str().join(decoded)
def encodefilename(fn):
if FS_ENCODING == "ascii":
# ASCII encoder of Python 2 expects that the error handler returns a
# Unicode string encodable to ASCII, whereas our surrogateescape error
# handler has to return bytes in 0x80-0xFF range.
encoded = []
for index, ch in enumerate(fn):
code = ord(ch)
if code < 128:
ch = bytes_chr(code)
elif 0xDC80 <= code <= 0xDCFF:
ch = bytes_chr(code - 0xDC00)
else:
raise UnicodeEncodeError(
FS_ENCODING, fn, index, index + 1, "ordinal not in range(128)"
)
encoded.append(ch)
return bytes().join(encoded)
elif FS_ENCODING == "utf-8":
# UTF-8 encoder of Python 2 encodes surrogates, so U+DC80-U+DCFF
# doesn't go through our error handler
encoded = []
for index, ch in enumerate(fn):
code = ord(ch)
if 0xD800 <= code <= 0xDFFF:
if 0xDC80 <= code <= 0xDCFF:
ch = bytes_chr(code - 0xDC00)
encoded.append(ch)
else:
raise UnicodeEncodeError(
FS_ENCODING, fn, index, index + 1, "surrogates not allowed"
)
else:
ch_utf8 = ch.encode("utf-8")
encoded.append(ch_utf8)
return bytes().join(encoded)
else:
return fn.encode(FS_ENCODING, FS_ERRORS)
def decodefilename(fn):
return fn.decode(FS_ENCODING, FS_ERRORS)
FS_ENCODING = "ascii"
fn = b("[abc\xff]")
encoded = u("[abc\udcff]")
# FS_ENCODING = 'cp932'; fn = b('[abc\x81\x00]'); encoded = u('[abc\udc81\x00]')
# FS_ENCODING = 'UTF-8'; fn = b('[abc\xff]'); encoded = u('[abc\udcff]')
# normalize the filesystem encoding name.
# For example, we expect "utf-8", not "UTF8".
FS_ENCODING = codecs.lookup(FS_ENCODING).name
def register_surrogateescape():
"""
Registers the surrogateescape error handler on Python 2 (only)
"""
if six.PY3:
return
try:
codecs.lookup_error(FS_ERRORS)
except LookupError:
codecs.register_error(FS_ERRORS, surrogateescape_handler)
if __name__ == "__main__":
pass

View File

@@ -0,0 +1,234 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, unicode_literals
import functools
import io
import os
import sys
from tempfile import _bin_openflags, _mkstemp_inner, gettempdir
import six
try:
from weakref import finalize
except ImportError:
from pipenv.vendor.backports.weakref import finalize
def fs_encode(path):
try:
return os.fsencode(path)
except AttributeError:
from ..compat import fs_encode
return fs_encode(path)
def fs_decode(path):
try:
return os.fsdecode(path)
except AttributeError:
from ..compat import fs_decode
return fs_decode(path)
__all__ = ["finalize", "NamedTemporaryFile"]
try:
from tempfile import _infer_return_type
except ImportError:
def _infer_return_type(*args):
_types = set()
for arg in args:
if isinstance(type(arg), six.string_types):
_types.add(str)
elif isinstance(type(arg), bytes):
_types.add(bytes)
elif arg:
_types.add(type(arg))
return _types.pop()
def _sanitize_params(prefix, suffix, dir):
"""Common parameter processing for most APIs in this module."""
output_type = _infer_return_type(prefix, suffix, dir)
if suffix is None:
suffix = output_type()
if prefix is None:
if output_type is str:
prefix = "tmp"
else:
prefix = os.fsencode("tmp")
if dir is None:
if output_type is str:
dir = gettempdir()
else:
dir = fs_encode(gettempdir())
return prefix, suffix, dir, output_type
class _TemporaryFileCloser:
"""A separate object allowing proper closing of a temporary file's
underlying file object, without adding a __del__ method to the
temporary file."""
file = None # Set here since __del__ checks it
close_called = False
def __init__(self, file, name, delete=True):
self.file = file
self.name = name
self.delete = delete
# NT provides delete-on-close as a primitive, so we don't need
# the wrapper to do anything special. We still use it so that
# file.name is useful (i.e. not "(fdopen)") with NamedTemporaryFile.
if os.name != "nt":
# Cache the unlinker so we don't get spurious errors at
# shutdown when the module-level "os" is None'd out. Note
# that this must be referenced as self.unlink, because the
# name TemporaryFileWrapper may also get None'd out before
# __del__ is called.
def close(self, unlink=os.unlink):
if not self.close_called and self.file is not None:
self.close_called = True
try:
self.file.close()
finally:
if self.delete:
unlink(self.name)
# Need to ensure the file is deleted on __del__
def __del__(self):
self.close()
else:
def close(self):
if not self.close_called:
self.close_called = True
self.file.close()
class _TemporaryFileWrapper:
"""Temporary file wrapper
This class provides a wrapper around files opened for
temporary use. In particular, it seeks to automatically
remove the file when it is no longer needed.
"""
def __init__(self, file, name, delete=True):
self.file = file
self.name = name
self.delete = delete
self._closer = _TemporaryFileCloser(file, name, delete)
def __getattr__(self, name):
# Attribute lookups are delegated to the underlying file
# and cached for non-numeric results
# (i.e. methods are cached, closed and friends are not)
file = self.__dict__["file"]
a = getattr(file, name)
if hasattr(a, "__call__"):
func = a
@functools.wraps(func)
def func_wrapper(*args, **kwargs):
return func(*args, **kwargs)
# Avoid closing the file as long as the wrapper is alive,
# see issue #18879.
func_wrapper._closer = self._closer
a = func_wrapper
if not isinstance(a, int):
setattr(self, name, a)
return a
# The underlying __enter__ method returns the wrong object
# (self.file) so override it to return the wrapper
def __enter__(self):
self.file.__enter__()
return self
# Need to trap __exit__ as well to ensure the file gets
# deleted when used in a with statement
def __exit__(self, exc, value, tb):
result = self.file.__exit__(exc, value, tb)
self.close()
return result
def close(self):
"""
Close the temporary file, possibly deleting it.
"""
self._closer.close()
# iter() doesn't use __getattr__ to find the __iter__ method
def __iter__(self):
# Don't return iter(self.file), but yield from it to avoid closing
# file as long as it's being used as iterator (see issue #23700). We
# can't use 'yield from' here because iter(file) returns the file
# object itself, which has a close method, and thus the file would get
# closed when the generator is finalized, due to PEP380 semantics.
for line in self.file:
yield line
def NamedTemporaryFile(
mode="w+b",
buffering=-1,
encoding=None,
newline=None,
suffix=None,
prefix=None,
dir=None,
delete=True,
wrapper_class_override=None,
):
"""Create and return a temporary file.
Arguments:
'prefix', 'suffix', 'dir' -- as for mkstemp.
'mode' -- the mode argument to io.open (default "w+b").
'buffering' -- the buffer size argument to io.open (default -1).
'encoding' -- the encoding argument to io.open (default None)
'newline' -- the newline argument to io.open (default None)
'delete' -- whether the file is deleted on close (default True).
The file is created as mkstemp() would do it.
Returns an object with a file-like interface; the name of the file
is accessible as its 'name' attribute. The file will be automatically
deleted when it is closed unless the 'delete' argument is set to False.
"""
prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)
flags = _bin_openflags
# Setting O_TEMPORARY in the flags causes the OS to delete
# the file when it is closed. This is only supported by Windows.
if not wrapper_class_override:
wrapper_class_override = _TemporaryFileWrapper
if os.name == "nt" and delete:
flags |= os.O_TEMPORARY
if sys.version_info < (3, 5):
(fd, name) = _mkstemp_inner(dir, prefix, suffix, flags)
else:
(fd, name) = _mkstemp_inner(dir, prefix, suffix, flags, output_type)
try:
file = io.open(fd, mode, buffering=buffering, newline=newline, encoding=encoding)
if wrapper_class_override is not None:
return type(str("_TempFileWrapper"), (wrapper_class_override, object), {})(
file, name, delete
)
else:
return _TemporaryFileWrapper(file, name, delete)
except BaseException:
os.unlink(name)
os.close(fd)
raise

View File

@@ -0,0 +1,85 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, unicode_literals
import itertools
import re
import shlex
import six
__all__ = ["ScriptEmptyError", "Script"]
class ScriptEmptyError(ValueError):
pass
def _quote_if_contains(value, pattern):
if next(re.finditer(pattern, value), None):
return '"{0}"'.format(re.sub(r'(\\*)"', r'\1\1\\"', value))
return value
class Script(object):
"""Parse a script line (in Pipfile's [scripts] section).
This always works in POSIX mode, even on Windows.
"""
def __init__(self, command, args=None):
self._parts = [command]
if args:
self._parts.extend(args)
@classmethod
def parse(cls, value):
if isinstance(value, six.string_types):
value = shlex.split(value)
if not value:
raise ScriptEmptyError(value)
return cls(value[0], value[1:])
def __repr__(self):
return "Script({0!r})".format(self._parts)
@property
def command(self):
return self._parts[0]
@property
def args(self):
return self._parts[1:]
def extend(self, extra_args):
self._parts.extend(extra_args)
def cmdify(self):
"""Encode into a cmd-executable string.
This re-implements CreateProcess's quoting logic to turn a list of
arguments into one single string for the shell to interpret.
* All double quotes are escaped with a backslash.
* Existing backslashes before a quote are doubled, so they are all
escaped properly.
* Backslashes elsewhere are left as-is; cmd will interpret them
literally.
The result is then quoted into a pair of double quotes to be grouped.
An argument is intentionally not quoted if it does not contain
whitespaces. This is done to be compatible with Windows built-in
commands that don't work well with quotes, e.g. everything with `echo`,
and DOS-style (forward slash) switches.
The intended use of this function is to pre-process an argument list
before passing it into ``subprocess.Popen(..., shell=True)``.
See also: https://docs.python.org/3/library/subprocess.html#converting-argument-sequence
"""
return " ".join(
itertools.chain(
[_quote_if_contains(self.command, r"[\s^()]")],
(_quote_if_contains(arg, r"[\s^]") for arg in self.args),
)
)

View File

@@ -0,0 +1,453 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import codecs
import errno
import os
import sys
import warnings
from tempfile import mkdtemp
import six
from .backports.tempfile import NamedTemporaryFile as _NamedTemporaryFile
__all__ = [
"Path",
"get_terminal_size",
"finalize",
"partialmethod",
"JSONDecodeError",
"FileNotFoundError",
"ResourceWarning",
"PermissionError",
"is_type_checking",
"IS_TYPE_CHECKING",
"IsADirectoryError",
"fs_str",
"lru_cache",
"TemporaryDirectory",
"NamedTemporaryFile",
"to_native_string",
"samefile",
"Mapping",
"Hashable",
"MutableMapping",
"Container",
"Iterator",
"KeysView",
"ItemsView",
"MappingView",
"Iterable",
"Set",
"Sequence",
"Sized",
"ValuesView",
"MutableSet",
"MutableSequence",
"Callable",
"fs_encode",
"fs_decode",
"_fs_encode_errors",
"_fs_decode_errors",
]
if sys.version_info >= (3, 5): # pragma: no cover
from pathlib import Path
else: # pragma: no cover
from pipenv.vendor.pathlib2 import Path
if sys.version_info >= (3, 4): # pragma: no cover
# Only Python 3.4+ is supported
from functools import lru_cache, partialmethod
from tempfile import NamedTemporaryFile
from shutil import get_terminal_size
from weakref import finalize
from collections.abc import (
Mapping,
Hashable,
MutableMapping,
Container,
Iterator,
KeysView,
ItemsView,
MappingView,
Iterable,
Set,
Sequence,
Sized,
ValuesView,
MutableSet,
MutableSequence,
Callable,
)
from os.path import samefile
else: # pragma: no cover
# Only Python 2.7 is supported
from pipenv.vendor.backports.functools_lru_cache import lru_cache
from pipenv.vendor.backports.shutil_get_terminal_size import get_terminal_size
from .backports.functools import partialmethod # type: ignore
from .backports.surrogateescape import register_surrogateescape
from collections import (
Mapping,
Hashable,
MutableMapping,
Container,
Iterator,
KeysView,
ItemsView,
MappingView,
Iterable,
Set,
Sequence,
Sized,
ValuesView,
MutableSet,
MutableSequence,
Callable,
)
register_surrogateescape()
NamedTemporaryFile = _NamedTemporaryFile
from pipenv.vendor.backports.weakref import finalize # type: ignore
try:
from os.path import samefile
except ImportError:
def samestat(s1, s2):
"""Test whether two stat buffers reference the same file."""
return s1.st_ino == s2.st_ino and s1.st_dev == s2.st_dev
def samefile(f1, f2):
"""Test whether two pathnames reference the same actual file or
directory This is determined by the device number and i-node number
and raises an exception if an os.stat() call on either pathname
fails."""
s1 = os.stat(f1)
s2 = os.stat(f2)
return samestat(s1, s2)
try:
# Introduced Python 3.5
from json import JSONDecodeError
except ImportError: # pragma: no cover
JSONDecodeError = ValueError # type: ignore
if six.PY2: # pragma: no cover
from io import BytesIO as StringIO
class ResourceWarning(Warning):
pass
class FileNotFoundError(IOError):
"""No such file or directory."""
def __init__(self, *args, **kwargs):
self.errno = errno.ENOENT
super(FileNotFoundError, self).__init__(*args, **kwargs)
class PermissionError(OSError):
def __init__(self, *args, **kwargs):
self.errno = errno.EACCES
super(PermissionError, self).__init__(*args, **kwargs)
class TimeoutError(OSError):
"""Timeout expired."""
def __init__(self, *args, **kwargs):
self.errno = errno.ETIMEDOUT
super(TimeoutError, self).__init__(*args, **kwargs)
class IsADirectoryError(OSError):
"""The command does not work on directories."""
def __init__(self, *args, **kwargs):
self.errno = errno.EISDIR
super(IsADirectoryError, self).__init__(*args, **kwargs)
class FileExistsError(OSError):
def __init__(self, *args, **kwargs):
self.errno = errno.EEXIST
super(FileExistsError, self).__init__(*args, **kwargs)
else: # pragma: no cover
from builtins import (
ResourceWarning,
FileNotFoundError,
PermissionError,
IsADirectoryError,
FileExistsError,
TimeoutError,
)
from io import StringIO
if not sys.warnoptions:
warnings.simplefilter("default", ResourceWarning)
def is_type_checking():
try:
from typing import TYPE_CHECKING
except ImportError:
return False
return TYPE_CHECKING
IS_TYPE_CHECKING = os.environ.get("MYPY_RUNNING", is_type_checking())
class TemporaryDirectory(object):
"""
Create and return a temporary directory. This has the same
behavior as mkdtemp but can be used as a context manager. For
example:
with TemporaryDirectory() as tmpdir:
...
Upon exiting the context, the directory and everything contained
in it are removed.
"""
def __init__(self, suffix="", prefix=None, dir=None):
if "RAM_DISK" in os.environ:
import uuid
name = uuid.uuid4().hex
dir_name = os.path.join(os.environ["RAM_DISK"].strip(), name)
os.mkdir(dir_name)
self.name = dir_name
else:
suffix = suffix if suffix else ""
if not prefix:
self.name = mkdtemp(suffix=suffix, dir=dir)
else:
self.name = mkdtemp(suffix, prefix, dir)
self._finalizer = finalize(
self,
self._cleanup,
self.name,
warn_message="Implicitly cleaning up {!r}".format(self),
)
@classmethod
def _rmtree(cls, name):
from .path import rmtree
rmtree(name)
@classmethod
def _cleanup(cls, name, warn_message):
cls._rmtree(name)
warnings.warn(warn_message, ResourceWarning)
def __repr__(self):
return "<{} {!r}>".format(self.__class__.__name__, self.name)
def __enter__(self):
return self
def __exit__(self, exc, value, tb):
self.cleanup()
def cleanup(self):
if self._finalizer.detach():
self._rmtree(self.name)
def is_bytes(string):
"""Check if a string is a bytes instance.
:param Union[str, bytes] string: A string that may be string or bytes like
:return: Whether the provided string is a bytes type or not
:rtype: bool
"""
if six.PY3 and isinstance(string, (bytes, memoryview, bytearray)): # noqa
return True
elif six.PY2 and isinstance(string, (buffer, bytearray)): # noqa
return True
return False
def fs_str(string):
"""Encodes a string into the proper filesystem encoding.
Borrowed from pip-tools
"""
if isinstance(string, str):
return string
assert not isinstance(string, bytes)
return string.encode(_fs_encoding)
def _get_path(path):
"""Fetch the string value from a path-like object.
Returns **None** if there is no string value.
"""
if isinstance(path, (six.string_types, bytes)):
return path
path_type = type(path)
try:
path_repr = path_type.__fspath__(path)
except AttributeError:
return
if isinstance(path_repr, (six.string_types, bytes)):
return path_repr
return
# copied from the os backport which in turn copied this from
# the pyutf8 package --
# URL: https://github.com/etrepum/pyutf8/blob/master/pyutf8/ref.py
#
def _invalid_utf8_indexes(bytes):
skips = []
i = 0
len_bytes = len(bytes)
while i < len_bytes:
c1 = bytes[i]
if c1 < 0x80:
# U+0000 - U+007F - 7 bits
i += 1
continue
try:
c2 = bytes[i + 1]
if (c1 & 0xE0 == 0xC0) and (c2 & 0xC0 == 0x80):
# U+0080 - U+07FF - 11 bits
c = ((c1 & 0x1F) << 6) | (c2 & 0x3F)
if c < 0x80: # pragma: no cover
# Overlong encoding
skips.extend([i, i + 1]) # pragma: no cover
i += 2
continue
c3 = bytes[i + 2]
if (c1 & 0xF0 == 0xE0) and (c2 & 0xC0 == 0x80) and (c3 & 0xC0 == 0x80):
# U+0800 - U+FFFF - 16 bits
c = ((((c1 & 0x0F) << 6) | (c2 & 0x3F)) << 6) | (c3 & 0x3F)
if (c < 0x800) or (0xD800 <= c <= 0xDFFF):
# Overlong encoding or surrogate.
skips.extend([i, i + 1, i + 2])
i += 3
continue
c4 = bytes[i + 3]
if (
(c1 & 0xF8 == 0xF0)
and (c2 & 0xC0 == 0x80)
and (c3 & 0xC0 == 0x80)
and (c4 & 0xC0 == 0x80)
):
# U+10000 - U+10FFFF - 21 bits
c = ((((((c1 & 0x0F) << 6) | (c2 & 0x3F)) << 6) | (c3 & 0x3F)) << 6) | (
c4 & 0x3F
)
if (c < 0x10000) or (c > 0x10FFFF): # pragma: no cover
# Overlong encoding or invalid code point.
skips.extend([i, i + 1, i + 2, i + 3])
i += 4
continue
except IndexError:
pass
skips.append(i)
i += 1
return skips
# XXX backport: Another helper to support the Python 2 UTF-8 decoding hack.
def _chunks(b, indexes):
i = 0
for j in indexes:
yield b[i:j]
yield b[j : j + 1]
i = j + 1
yield b[i:]
def fs_encode(path):
"""Encode a filesystem path to the proper filesystem encoding.
:param Union[str, bytes] path: A string-like path
:returns: A bytes-encoded filesystem path representation
"""
path = _get_path(path)
if path is None:
raise TypeError("expected a valid path to encode")
if isinstance(path, six.text_type):
if six.PY2:
return b"".join(
(
_byte(ord(c) - 0xDC00)
if 0xDC00 <= ord(c) <= 0xDCFF
else c.encode(_fs_encoding, _fs_encode_errors)
)
for c in path
)
return path.encode(_fs_encoding, _fs_encode_errors)
return path
def fs_decode(path):
"""Decode a filesystem path using the proper filesystem encoding.
:param path: The filesystem path to decode from bytes or string
:return: The filesystem path, decoded with the determined encoding
:rtype: Text
"""
path = _get_path(path)
if path is None:
raise TypeError("expected a valid path to decode")
if isinstance(path, six.binary_type):
import array
indexes = _invalid_utf8_indexes(array.array(str("B"), path))
if six.PY2:
return "".join(
chunk.decode(_fs_encoding, _fs_decode_errors)
for chunk in _chunks(path, indexes)
)
if indexes and os.name == "nt":
return path.decode(_fs_encoding, "surrogateescape")
return path.decode(_fs_encoding, _fs_decode_errors)
return path
if sys.version_info[0] < 3: # pragma: no cover
_fs_encode_errors = "surrogatepass" if sys.platform == "win32" else "surrogateescape"
_fs_decode_errors = "surrogateescape"
_fs_encoding = "utf-8"
else: # pragma: no cover
_fs_encoding = "utf-8"
_fs_decode_errors = "surrogateescape"
if sys.platform.startswith("win"):
_fs_error_fn = None
_fs_encode_errors = "surrogatepass"
else:
if sys.version_info >= (3, 3):
_fs_encoding = sys.getfilesystemencoding()
if not _fs_encoding:
_fs_encoding = sys.getdefaultencoding()
alt_strategy = "surrogateescape"
_fs_error_fn = getattr(sys, "getfilesystemencodeerrors", None)
_fs_encode_errors = _fs_error_fn() if _fs_error_fn else alt_strategy
_fs_decode_errors = _fs_error_fn() if _fs_error_fn else _fs_decode_errors
_byte = chr if sys.version_info < (3,) else lambda i: bytes([i])
def to_native_string(string):
from .misc import to_text, to_bytes
if six.PY2:
return to_bytes(string)
return to_text(string)

View File

@@ -0,0 +1,418 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import io
import os
import stat
import sys
from contextlib import closing, contextmanager
import six
from .compat import IS_TYPE_CHECKING, NamedTemporaryFile, Path
from .path import is_file_url, is_valid_url, path_to_url, url_to_path
if IS_TYPE_CHECKING:
from typing import (
Any,
Bytes,
Callable,
ContextManager,
Dict,
IO,
Iterator,
Optional,
Union,
Text,
Tuple,
TypeVar,
)
from types import ModuleType
from requests import Session
from six.moves.http_client import HTTPResponse as Urllib_HTTPResponse
from urllib3.response import HTTPResponse as Urllib3_HTTPResponse
from .spin import VistirSpinner, DummySpinner
TSpinner = Union[VistirSpinner, DummySpinner]
_T = TypeVar("_T")
__all__ = [
"temp_environ",
"temp_path",
"cd",
"atomic_open_for_write",
"open_file",
"spinner",
"dummy_spinner",
"replaced_stream",
"replaced_streams",
]
# Borrowed from Pew.
# See https://github.com/berdario/pew/blob/master/pew/_utils.py#L82
@contextmanager
def temp_environ():
# type: () -> Iterator[None]
"""Allow the ability to set os.environ temporarily"""
environ = dict(os.environ)
try:
yield
finally:
os.environ.clear()
os.environ.update(environ)
@contextmanager
def temp_path():
# type: () -> Iterator[None]
"""A context manager which allows the ability to set sys.path temporarily
>>> path_from_virtualenv = load_path("/path/to/venv/bin/python")
>>> print(sys.path)
[
'/home/user/.pyenv/versions/3.7.0/bin',
'/home/user/.pyenv/versions/3.7.0/lib/python37.zip',
'/home/user/.pyenv/versions/3.7.0/lib/python3.7',
'/home/user/.pyenv/versions/3.7.0/lib/python3.7/lib-dynload',
'/home/user/.pyenv/versions/3.7.0/lib/python3.7/site-packages'
]
>>> with temp_path():
sys.path = path_from_virtualenv
# Running in the context of the path above
run(["pip", "install", "stuff"])
>>> print(sys.path)
[
'/home/user/.pyenv/versions/3.7.0/bin',
'/home/user/.pyenv/versions/3.7.0/lib/python37.zip',
'/home/user/.pyenv/versions/3.7.0/lib/python3.7',
'/home/user/.pyenv/versions/3.7.0/lib/python3.7/lib-dynload',
'/home/user/.pyenv/versions/3.7.0/lib/python3.7/site-packages'
]
"""
path = [p for p in sys.path]
try:
yield
finally:
sys.path = [p for p in path]
@contextmanager
def cd(path):
# type: () -> Iterator[None]
"""Context manager to temporarily change working directories
:param str path: The directory to move into
>>> print(os.path.abspath(os.curdir))
'/home/user/code/myrepo'
>>> with cd("/home/user/code/otherdir/subdir"):
... print("Changed directory: %s" % os.path.abspath(os.curdir))
Changed directory: /home/user/code/otherdir/subdir
>>> print(os.path.abspath(os.curdir))
'/home/user/code/myrepo'
"""
if not path:
return
prev_cwd = Path.cwd().as_posix()
if isinstance(path, Path):
path = path.as_posix()
os.chdir(str(path))
try:
yield
finally:
os.chdir(prev_cwd)
@contextmanager
def dummy_spinner(spin_type, text, **kwargs):
# type: (str, str, Any)
class FakeClass(object):
def __init__(self, text=""):
self.text = text
def fail(self, exitcode=1, text=None):
if text:
print(text)
raise SystemExit(exitcode, text)
def ok(self, text):
print(text)
return 0
def write(self, text):
print(text)
myobj = FakeClass(text)
yield myobj
@contextmanager
def spinner(
spinner_name=None, # type: Optional[str]
start_text=None, # type: Optional[str]
handler_map=None, # type: Optional[Dict[str, Callable]]
nospin=False, # type: bool
write_to_stdout=True, # type: bool
):
# type: (...) -> ContextManager[TSpinner]
"""Get a spinner object or a dummy spinner to wrap a context.
:param str spinner_name: A spinner type e.g. "dots" or "bouncingBar" (default: {"bouncingBar"})
:param str start_text: Text to start off the spinner with (default: {None})
:param dict handler_map: Handler map for signals to be handled gracefully (default: {None})
:param bool nospin: If true, use the dummy spinner (default: {False})
:param bool write_to_stdout: Writes to stdout if true, otherwise writes to stderr (default: True)
:return: A spinner object which can be manipulated while alive
:rtype: :class:`~vistir.spin.VistirSpinner`
Raises:
RuntimeError -- Raised if the spinner extra is not installed
"""
from .spin import create_spinner
has_yaspin = None
try:
import yaspin
except ImportError:
has_yaspin = False
if not nospin:
raise RuntimeError(
"Failed to import spinner! Reinstall vistir with command:"
" pip install --upgrade vistir[spinner]"
)
else:
spinner_name = ""
else:
has_yaspin = True
spinner_name = ""
use_yaspin = (has_yaspin is False) or (nospin is True)
if has_yaspin is None or has_yaspin is True and not nospin:
use_yaspin = True
if start_text is None and use_yaspin is True:
start_text = "Running..."
with create_spinner(
spinner_name=spinner_name,
text=start_text,
handler_map=handler_map,
nospin=nospin,
use_yaspin=use_yaspin,
write_to_stdout=write_to_stdout,
) as _spinner:
yield _spinner
@contextmanager
def atomic_open_for_write(target, binary=False, newline=None, encoding=None):
# type: (str, bool, Optional[str], Optional[str]) -> None
"""Atomically open `target` for writing.
This is based on Lektor's `atomic_open()` utility, but simplified a lot
to handle only writing, and skip many multi-process/thread edge cases
handled by Werkzeug.
:param str target: Target filename to write
:param bool binary: Whether to open in binary mode, default False
:param Optional[str] newline: The newline character to use when writing, determined
from system if not supplied.
:param Optional[str] encoding: The encoding to use when writing, defaults to system
encoding.
How this works:
* Create a temp file (in the same directory of the actual target), and
yield for surrounding code to write to it.
* If some thing goes wrong, try to remove the temp file. The actual target
is not touched whatsoever.
* If everything goes well, close the temp file, and replace the actual
target with this new file.
.. code:: python
>>> fn = "test_file.txt"
>>> def read_test_file(filename=fn):
with open(filename, 'r') as fh:
print(fh.read().strip())
>>> with open(fn, "w") as fh:
fh.write("this is some test text")
>>> read_test_file()
this is some test text
>>> def raise_exception_while_writing(filename):
with open(filename, "w") as fh:
fh.write("writing some new text")
raise RuntimeError("Uh oh, hope your file didn't get overwritten")
>>> raise_exception_while_writing(fn)
Traceback (most recent call last):
...
RuntimeError: Uh oh, hope your file didn't get overwritten
>>> read_test_file()
writing some new text
# Now try with vistir
>>> def raise_exception_while_writing(filename):
with vistir.contextmanagers.atomic_open_for_write(filename) as fh:
fh.write("Overwriting all the text from before with even newer text")
raise RuntimeError("But did it get overwritten now?")
>>> raise_exception_while_writing(fn)
Traceback (most recent call last):
...
RuntimeError: But did it get overwritten now?
>>> read_test_file()
writing some new text
"""
mode = "w+b" if binary else "w"
f = NamedTemporaryFile(
dir=os.path.dirname(target),
prefix=".__atomic-write",
mode=mode,
encoding=encoding,
newline=newline,
delete=False,
)
# set permissions to 0644
try:
os.chmod(f.name, stat.S_IWUSR | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
except OSError:
pass
try:
yield f
except BaseException:
f.close()
try:
os.remove(f.name)
except OSError:
pass
raise
else:
f.close()
try:
os.remove(target) # This is needed on Windows.
except OSError:
pass
os.rename(f.name, target) # No os.replace() on Python 2.
@contextmanager
def open_file(
link, # type: Union[_T, str]
session=None, # type: Optional[Session]
stream=True, # type: bool
):
# type: (...) -> ContextManager[Union[IO[bytes], Urllib3_HTTPResponse, Urllib_HTTPResponse]]
"""
Open local or remote file for reading.
:param pip._internal.index.Link link: A link object from resolving dependencies with
pip, or else a URL.
:param Optional[Session] session: A :class:`~requests.Session` instance
:param bool stream: Whether to stream the content if remote, default True
:raises ValueError: If link points to a local directory.
:return: a context manager to the opened file-like object
"""
if not isinstance(link, six.string_types):
try:
link = link.url_without_fragment
except AttributeError:
raise ValueError("Cannot parse url from unkown type: {0!r}".format(link))
if not is_valid_url(link) and os.path.exists(link):
link = path_to_url(link)
if is_file_url(link):
# Local URL
local_path = url_to_path(link)
if os.path.isdir(local_path):
raise ValueError("Cannot open directory for read: {}".format(link))
else:
with io.open(local_path, "rb") as local_file:
yield local_file
else:
# Remote URL
headers = {"Accept-Encoding": "identity"}
if not session:
try:
from requests import Session # noqa
except ImportError:
session = None
else:
session = Session()
if session is None:
with closing(six.moves.urllib.request.urlopen(link)) as f:
yield f
else:
with session.get(link, headers=headers, stream=stream) as resp:
try:
raw = getattr(resp, "raw", None)
result = raw if raw else resp
yield result
finally:
if raw:
conn = raw._connection
if conn is not None:
conn.close()
result.close()
@contextmanager
def replaced_stream(stream_name):
# type: (str) -> Iterator[IO[Text]]
"""
Context manager to temporarily swap out *stream_name* with a stream wrapper.
:param str stream_name: The name of a sys stream to wrap
:returns: A ``StreamWrapper`` replacement, temporarily
>>> orig_stdout = sys.stdout
>>> with replaced_stream("stdout") as stdout:
... sys.stdout.write("hello")
... assert stdout.getvalue() == "hello"
>>> sys.stdout.write("hello")
'hello'
"""
orig_stream = getattr(sys, stream_name)
new_stream = six.StringIO()
try:
setattr(sys, stream_name, new_stream)
yield getattr(sys, stream_name)
finally:
setattr(sys, stream_name, orig_stream)
@contextmanager
def replaced_streams():
# type: () -> Iterator[Tuple[IO[Text], IO[Text]]]
"""
Context manager to replace both ``sys.stdout`` and ``sys.stderr`` using
``replaced_stream``
returns: *(stdout, stderr)*
>>> import sys
>>> with vistir.contextmanagers.replaced_streams() as streams:
>>> stdout, stderr = streams
>>> sys.stderr.write("test")
>>> sys.stdout.write("hello")
>>> assert stdout.getvalue() == "hello"
>>> assert stderr.getvalue() == "test"
>>> stdout.getvalue()
'hello'
>>> stderr.getvalue()
'test'
"""
with replaced_stream("stdout") as stdout:
with replaced_stream("stderr") as stderr:
yield (stdout, stderr)

View File

@@ -0,0 +1,61 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function
import os
import sys
__all__ = ["hide_cursor", "show_cursor", "get_stream_handle"]
def get_stream_handle(stream=sys.stdout):
"""
Get the OS appropriate handle for the corresponding output stream.
:param str stream: The the stream to get the handle for
:return: A handle to the appropriate stream, either a ctypes buffer
or **sys.stdout** or **sys.stderr**.
"""
handle = stream
if os.name == "nt":
from ._winconsole import get_stream_handle as get_win_stream_handle
return get_win_stream_handle(stream)
return handle
def hide_cursor(stream=sys.stdout):
"""
Hide the console cursor on the given stream
:param stream: The name of the stream to get the handle for
:return: None
:rtype: None
"""
handle = get_stream_handle(stream=stream)
if os.name == "nt":
from ._winconsole import hide_cursor
hide_cursor()
else:
handle.write("\033[?25l")
handle.flush()
def show_cursor(stream=sys.stdout):
"""
Show the console cursor on the given stream
:param stream: The name of the stream to get the handle for
:return: None
:rtype: None
"""
handle = get_stream_handle(stream=stream)
if os.name == "nt":
from ._winconsole import show_cursor
show_cursor()
else:
handle.write("\033[?25h")
handle.flush()

View File

@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function
from .compat import IS_TYPE_CHECKING
MYPY_RUNNING = IS_TYPE_CHECKING

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,661 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import atexit
import errno
import functools
import os
import posixpath
import shutil
import stat
import sys
import time
import unicodedata
import warnings
import six
from six.moves import urllib_parse
from six.moves.urllib import request as urllib_request
from .backports.tempfile import _TemporaryFileWrapper
from .compat import (
IS_TYPE_CHECKING,
FileNotFoundError,
Path,
PermissionError,
ResourceWarning,
TemporaryDirectory,
_fs_encoding,
_NamedTemporaryFile,
finalize,
fs_decode,
fs_encode,
)
# fmt: off
if six.PY3:
from urllib.parse import quote_from_bytes as quote
else:
from urllib import quote
# fmt: on
if IS_TYPE_CHECKING:
from types import TracebackType
from typing import (
Any,
AnyStr,
ByteString,
Callable,
Generator,
Iterator,
List,
Optional,
Text,
Tuple,
Type,
Union,
)
if six.PY3:
TPath = os.PathLike
else:
TPath = Union[str, bytes]
TFunc = Callable[..., Any]
__all__ = [
"check_for_unc_path",
"get_converted_relative_path",
"handle_remove_readonly",
"normalize_path",
"is_in_path",
"is_file_url",
"is_readonly_path",
"is_valid_url",
"mkdir_p",
"ensure_mkdir_p",
"create_tracked_tempdir",
"create_tracked_tempfile",
"path_to_url",
"rmtree",
"safe_expandvars",
"set_write_bit",
"url_to_path",
"walk_up",
]
if os.name == "nt":
warnings.filterwarnings(
"ignore",
category=DeprecationWarning,
message="The Windows bytes API has been deprecated.*",
)
def unicode_path(path):
# type: (TPath) -> Text
# Paths are supposed to be represented as unicode here
if six.PY2 and isinstance(path, six.binary_type):
return path.decode(_fs_encoding)
return path
def native_path(path):
# type: (TPath) -> str
if six.PY2 and isinstance(path, six.text_type):
return path.encode(_fs_encoding)
return str(path)
# once again thank you django...
# https://github.com/django/django/blob/fc6b90b/django/utils/_os.py
if six.PY3 or os.name == "nt":
abspathu = os.path.abspath
else:
def abspathu(path):
# type: (TPath) -> Text
"""Version of os.path.abspath that uses the unicode representation of
the current working directory, thus avoiding a UnicodeDecodeError in
join when the cwd has non-ASCII characters."""
if not os.path.isabs(path):
path = os.path.join(os.getcwdu(), path)
return os.path.normpath(path)
def normalize_path(path):
# type: (TPath) -> Text
"""Return a case-normalized absolute variable-expanded path.
:param str path: The non-normalized path
:return: A normalized, expanded, case-normalized path
:rtype: str
"""
path = os.path.abspath(os.path.expandvars(os.path.expanduser(str(path))))
if os.name == "nt" and os.path.exists(path):
from ._winconsole import get_long_path
path = get_long_path(path)
return os.path.normpath(os.path.normcase(path))
def is_in_path(path, parent):
# type: (TPath, TPath) -> bool
"""Determine if the provided full path is in the given parent root.
:param str path: The full path to check the location of.
:param str parent: The parent path to check for membership in
:return: Whether the full path is a member of the provided parent.
:rtype: bool
"""
return normalize_path(path).startswith(normalize_path(parent))
def normalize_drive(path):
# type: (TPath) -> Text
"""Normalize drive in path so they stay consistent.
This currently only affects local drives on Windows, which can be
identified with either upper or lower cased drive names. The case is
always converted to uppercase because it seems to be preferred.
"""
from .misc import to_text
if os.name != "nt" or not (
isinstance(path, six.string_types) or getattr(path, "__fspath__", None)
):
return path # type: ignore
drive, tail = os.path.splitdrive(path)
# Only match (lower cased) local drives (e.g. 'c:'), not UNC mounts.
if drive.islower() and len(drive) == 2 and drive[1] == ":":
return "{}{}".format(drive.upper(), tail)
return to_text(path, encoding="utf-8")
def path_to_url(path):
# type: (TPath) -> Text
"""Convert the supplied local path to a file uri.
:param str path: A string pointing to or representing a local path
:return: A `file://` uri for the same location
:rtype: str
>>> path_to_url("/home/user/code/myrepo/myfile.zip")
'file:///home/user/code/myrepo/myfile.zip'
"""
from .misc import to_bytes
if not path:
return path # type: ignore
normalized_path = Path(normalize_drive(os.path.abspath(path))).as_posix()
if os.name == "nt" and normalized_path[1] == ":":
drive, _, path = normalized_path.partition(":")
# XXX: This enables us to handle half-surrogates that were never
# XXX: actually part of a surrogate pair, but were just incidentally
# XXX: passed in as a piece of a filename
quoted_path = quote(fs_encode(path))
return fs_decode("file:///{}:{}".format(drive, quoted_path))
# XXX: This is also here to help deal with incidental dangling surrogates
# XXX: on linux, by making sure they are preserved during encoding so that
# XXX: we can urlencode the backslash correctly
bytes_path = to_bytes(normalized_path, errors="backslashreplace")
return fs_decode("file://{}".format(quote(bytes_path)))
def url_to_path(url):
# type: (str) -> str
"""Convert a valid file url to a local filesystem path.
Follows logic taken from pip's equivalent function
"""
assert is_file_url(url), "Only file: urls can be converted to local paths"
_, netloc, path, _, _ = urllib_parse.urlsplit(url)
# Netlocs are UNC paths
if netloc:
netloc = "\\\\" + netloc
path = urllib_request.url2pathname(netloc + path)
return urllib_parse.unquote(path)
def is_valid_url(url):
# type: (Union[str, bytes]) -> bool
"""Checks if a given string is an url."""
from .misc import to_text
if not url:
return url # type: ignore
pieces = urllib_parse.urlparse(to_text(url))
return all([pieces.scheme, pieces.netloc])
def is_file_url(url):
# type: (Any) -> bool
"""Returns true if the given url is a file url."""
from .misc import to_text
if not url:
return False
if not isinstance(url, six.string_types):
try:
url = url.url
except AttributeError:
raise ValueError("Cannot parse url from unknown type: {!r}".format(url))
url = to_text(url, encoding="utf-8")
return urllib_parse.urlparse(url.lower()).scheme == "file"
def is_readonly_path(fn):
# type: (TPath) -> bool
"""Check if a provided path exists and is readonly.
Permissions check is `bool(path.stat & stat.S_IREAD)` or `not
os.access(path, os.W_OK)`
"""
fn = fs_decode(fs_encode(fn))
if os.path.exists(fn):
file_stat = os.stat(fn).st_mode
return not bool(file_stat & stat.S_IWRITE) or not os.access(fn, os.W_OK)
return False
def mkdir_p(newdir, mode=0o777):
# type: (TPath, int) -> None
"""Recursively creates the target directory and all of its parents if they
do not already exist. Fails silently if they do.
:param str newdir: The directory path to ensure
:raises: OSError if a file is encountered along the way
"""
newdir = fs_decode(fs_encode(newdir))
if os.path.exists(newdir):
if not os.path.isdir(newdir):
raise OSError(
"a file with the same name as the desired dir, '{}', already exists.".format(
fs_decode(newdir)
)
)
return None
os.makedirs(newdir, mode)
def ensure_mkdir_p(mode=0o777):
# type: (int) -> Callable[Callable[..., Any], Callable[..., Any]]
"""Decorator to ensure `mkdir_p` is called to the function's return
value."""
def decorator(f):
# type: (Callable[..., Any]) -> Callable[..., Any]
@functools.wraps(f)
def decorated(*args, **kwargs):
# type: () -> str
path = f(*args, **kwargs)
mkdir_p(path, mode=mode)
return path
return decorated
return decorator
TRACKED_TEMPORARY_DIRECTORIES = []
def create_tracked_tempdir(*args, **kwargs):
# type: (Any, Any) -> str
"""Create a tracked temporary directory.
This uses `TemporaryDirectory`, but does not remove the directory when
the return value goes out of scope, instead registers a handler to cleanup
on program exit.
The return value is the path to the created directory.
"""
tempdir = TemporaryDirectory(*args, **kwargs)
TRACKED_TEMPORARY_DIRECTORIES.append(tempdir)
atexit.register(tempdir.cleanup)
warnings.simplefilter("ignore", ResourceWarning)
return tempdir.name
def create_tracked_tempfile(*args, **kwargs):
# type: (Any, Any) -> str
"""Create a tracked temporary file.
This uses the `NamedTemporaryFile` construct, but does not remove the file
until the interpreter exits.
The return value is the file object.
"""
kwargs["wrapper_class_override"] = _TrackedTempfileWrapper
return _NamedTemporaryFile(*args, **kwargs)
def _find_icacls_exe():
# type: () -> Optional[Text]
if os.name == "nt":
paths = [
os.path.expandvars(r"%windir%\{0}").format(subdir)
for subdir in ("system32", "SysWOW64")
]
for path in paths:
icacls_path = next(
iter(fn for fn in os.listdir(path) if fn.lower() == "icacls.exe"), None
)
if icacls_path is not None:
icacls_path = os.path.join(path, icacls_path)
return icacls_path
return None
def set_write_bit(fn):
# type: (str) -> None
"""Set read-write permissions for the current user on the target path. Fail
silently if the path doesn't exist.
:param str fn: The target filename or path
:return: None
"""
fn = fs_decode(fs_encode(fn))
if not os.path.exists(fn):
return
file_stat = os.stat(fn).st_mode
os.chmod(fn, file_stat | stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
if os.name == "nt":
from ._winconsole import get_current_user
user_sid = get_current_user()
icacls_exe = _find_icacls_exe() or "icacls"
from .misc import run
if user_sid:
c = run(
[
icacls_exe,
"''{}''".format(fn),
"/grant",
"{}:WD".format(user_sid),
"/T",
"/C",
"/Q",
],
nospin=True,
return_object=True,
)
if not c.err and c.returncode == 0:
return
if not os.path.isdir(fn):
for path in [fn, os.path.dirname(fn)]:
try:
os.chflags(path, 0)
except AttributeError:
pass
return None
for root, dirs, files in os.walk(fn, topdown=False):
for dir_ in [os.path.join(root, d) for d in dirs]:
set_write_bit(dir_)
for file_ in [os.path.join(root, f) for f in files]:
set_write_bit(file_)
def rmtree(directory, ignore_errors=False, onerror=None):
# type: (str, bool, Optional[Callable]) -> None
"""Stand-in for :func:`~shutil.rmtree` with additional error-handling.
This version of `rmtree` handles read-only paths, especially in the case of index
files written by certain source control systems.
:param str directory: The target directory to remove
:param bool ignore_errors: Whether to ignore errors, defaults to False
:param func onerror: An error handling function, defaults to :func:`handle_remove_readonly`
.. note::
Setting `ignore_errors=True` may cause this to silently fail to delete the path
"""
directory = fs_decode(fs_encode(directory))
if onerror is None:
onerror = handle_remove_readonly
try:
shutil.rmtree(directory, ignore_errors=ignore_errors, onerror=onerror)
except (IOError, OSError, FileNotFoundError, PermissionError) as exc: # noqa:B014
# Ignore removal failures where the file doesn't exist
if exc.errno != errno.ENOENT:
raise
def _wait_for_files(path): # pragma: no cover
# type: (Union[str, TPath]) -> Optional[List[TPath]]
"""Retry with backoff up to 1 second to delete files from a directory.
:param str path: The path to crawl to delete files from
:return: A list of remaining paths or None
:rtype: Optional[List[str]]
"""
timeout = 0.001
remaining = []
while timeout < 1.0:
remaining = []
if os.path.isdir(path):
L = os.listdir(path)
for target in L:
_remaining = _wait_for_files(target)
if _remaining:
remaining.extend(_remaining)
continue
try:
os.unlink(path)
except FileNotFoundError as e:
if e.errno == errno.ENOENT:
return
except (OSError, IOError, PermissionError): # noqa:B014
time.sleep(timeout)
timeout *= 2
remaining.append(path)
else:
return
return remaining
def handle_remove_readonly(func, path, exc):
# type: (Callable[..., str], TPath, Tuple[Type[OSError], OSError, TracebackType]) -> None
"""Error handler for shutil.rmtree.
Windows source repo folders are read-only by default, so this error handler
attempts to set them as writeable and then proceed with deletion.
:param function func: The caller function
:param str path: The target path for removal
:param Exception exc: The raised exception
This function will call check :func:`is_readonly_path` before attempting to call
:func:`set_write_bit` on the target path and try again.
"""
# Check for read-only attribute
from .compat import ResourceWarning, FileNotFoundError, PermissionError
PERM_ERRORS = (errno.EACCES, errno.EPERM, errno.ENOENT)
default_warning_message = "Unable to remove file due to permissions restriction: {!r}"
# split the initial exception out into its type, exception, and traceback
exc_type, exc_exception, exc_tb = exc
if is_readonly_path(path):
# Apply write permission and call original function
set_write_bit(path)
try:
func(path)
except ( # noqa:B014
OSError,
IOError,
FileNotFoundError,
PermissionError,
) as e: # pragma: no cover
if e.errno in PERM_ERRORS:
if e.errno == errno.ENOENT:
return
remaining = None
if os.path.isdir(path):
remaining = _wait_for_files(path)
if remaining:
warnings.warn(default_warning_message.format(path), ResourceWarning)
else:
func(path, ignore_errors=True)
return
if exc_exception.errno in PERM_ERRORS:
set_write_bit(path)
remaining = _wait_for_files(path)
try:
func(path)
except (OSError, IOError, FileNotFoundError, PermissionError) as e: # noqa:B014
if e.errno in PERM_ERRORS:
if e.errno != errno.ENOENT: # File still exists
warnings.warn(default_warning_message.format(path), ResourceWarning)
return
else:
raise exc_exception
def walk_up(bottom):
# type: (Union[TPath, str]) -> Generator[Tuple[str, List[str], List[str]], None, None]
"""Mimic os.walk, but walk 'up' instead of down the directory tree.
From: https://gist.github.com/zdavkeos/1098474
"""
bottom = os.path.realpath(str(bottom))
# Get files in current dir.
try:
names = os.listdir(bottom)
except Exception:
return
dirs, nondirs = [], []
for name in names:
if os.path.isdir(os.path.join(bottom, name)):
dirs.append(name)
else:
nondirs.append(name)
yield bottom, dirs, nondirs
new_path = os.path.realpath(os.path.join(bottom, ".."))
# See if we are at the top.
if new_path == bottom:
return
for x in walk_up(new_path):
yield x
def check_for_unc_path(path):
# type: (Path) -> bool
"""Checks to see if a pathlib `Path` object is a unc path or not."""
if (
os.name == "nt"
and len(path.drive) > 2
and not path.drive[0].isalpha()
and path.drive[1] != ":"
):
return True
else:
return False
def get_converted_relative_path(path, relative_to=None):
# type: (TPath, Optional[TPath]) -> str
"""Convert `path` to be relative.
Given a vague relative path, return the path relative to the given
location.
:param str path: The location of a target path
:param str relative_to: The starting path to build against, optional
:returns: A relative posix-style path with a leading `./`
This performs additional conversion to ensure the result is of POSIX form,
and starts with `./`, or is precisely `.`.
>>> os.chdir('/home/user/code/myrepo/myfolder')
>>> vistir.path.get_converted_relative_path('/home/user/code/file.zip')
'./../../file.zip'
>>> vistir.path.get_converted_relative_path('/home/user/code/myrepo/myfolder/mysubfolder')
'./mysubfolder'
>>> vistir.path.get_converted_relative_path('/home/user/code/myrepo/myfolder')
'.'
"""
from .misc import to_text, to_bytes # noqa
if not relative_to:
relative_to = os.getcwdu() if six.PY2 else os.getcwd()
if six.PY2:
path = to_bytes(path, encoding="utf-8")
else:
path = to_text(path, encoding="utf-8")
relative_to = to_text(relative_to, encoding="utf-8")
start_path = Path(relative_to)
try:
start = start_path.resolve()
except OSError:
start = start_path.absolute()
# check if there is a drive letter or mount point
# if it is a mountpoint use the original absolute path
# instead of the unc path
if check_for_unc_path(start):
start = start_path.absolute()
path = start.joinpath(path).relative_to(start)
# check and see if the path that was passed into the function is a UNC path
# and raise value error if it is not.
if check_for_unc_path(path):
raise ValueError("The path argument does not currently accept UNC paths")
relpath_s = to_text(posixpath.normpath(path.as_posix()))
if not (relpath_s == "." or relpath_s.startswith("./")):
relpath_s = posixpath.join(".", relpath_s)
return relpath_s
def safe_expandvars(value):
# type: (TPath) -> str
"""Call os.path.expandvars if value is a string, otherwise do nothing."""
if isinstance(value, six.string_types):
return os.path.expandvars(value)
return value # type: ignore
class _TrackedTempfileWrapper(_TemporaryFileWrapper, object):
def __init__(self, *args, **kwargs):
super(_TrackedTempfileWrapper, self).__init__(*args, **kwargs)
self._finalizer = finalize(self, self.cleanup)
@classmethod
def _cleanup(cls, fileobj):
try:
fileobj.close()
finally:
os.unlink(fileobj.name)
def cleanup(self):
if self._finalizer.detach():
try:
self.close()
finally:
os.unlink(self.name)
else:
try:
self.close()
except OSError:
pass

View File

@@ -0,0 +1,493 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function
import functools
import os
import signal
import sys
import threading
import time
from io import StringIO
import colorama
import six
from .compat import IS_TYPE_CHECKING, to_native_string
from .cursor import hide_cursor, show_cursor
from .misc import decode_for_output, to_text
from .termcolors import COLOR_MAP, COLORS, DISABLE_COLORS, colored
if IS_TYPE_CHECKING:
from typing import (
Any,
Callable,
ContextManager,
Dict,
IO,
Optional,
Text,
Type,
TypeVar,
Union,
)
TSignalMap = Dict[
Type[signal.SIGINT],
Callable[..., int, str, Union["DummySpinner", "VistirSpinner"]],
]
_T = TypeVar("_T", covariant=True)
try:
import yaspin
except ImportError: # pragma: no cover
yaspin = None
Spinners = None
SpinBase = None
else: # pragma: no cover
import yaspin.spinners
import yaspin.core
Spinners = yaspin.spinners.Spinners
SpinBase = yaspin.core.Yaspin
if os.name == "nt": # pragma: no cover
def handler(signum, frame, spinner):
"""Signal handler, used to gracefully shut down the ``spinner`` instance
when specified signal is received by the process running the ``spinner``.
``signum`` and ``frame`` are mandatory arguments. Check ``signal.signal``
function for more details.
"""
spinner.fail()
spinner.stop()
else: # pragma: no cover
def handler(signum, frame, spinner):
"""Signal handler, used to gracefully shut down the ``spinner`` instance
when specified signal is received by the process running the ``spinner``.
``signum`` and ``frame`` are mandatory arguments. Check ``signal.signal``
function for more details.
"""
spinner.red.fail("")
spinner.stop()
CLEAR_LINE = chr(27) + "[K"
TRANSLATION_MAP = {10004: u"OK", 10008: u"x"}
decode_output = functools.partial(decode_for_output, translation_map=TRANSLATION_MAP)
class DummySpinner(object):
def __init__(self, text="", **kwargs):
# type: (str, Any) -> None
if DISABLE_COLORS:
colorama.init()
self.text = to_native_string(decode_output(text)) if text else ""
self.stdout = kwargs.get("stdout", sys.stdout)
self.stderr = kwargs.get("stderr", sys.stderr)
self.out_buff = StringIO()
self.write_to_stdout = kwargs.get("write_to_stdout", False)
super(DummySpinner, self).__init__()
def __enter__(self):
if self.text and self.text != "None":
if self.write_to_stdout:
self.write(self.text)
return self
def __exit__(self, exc_type, exc_val, tb):
if exc_type:
import traceback
formatted_tb = traceback.format_exception(exc_type, exc_val, tb)
self.write_err("".join(formatted_tb))
self._close_output_buffer()
return False
def __getattr__(self, k): # pragma: no cover
try:
retval = super(DummySpinner, self).__getattribute__(k)
except AttributeError:
if k in COLOR_MAP.keys() or k.upper() in COLORS:
return self
raise
else:
return retval
def _close_output_buffer(self):
if self.out_buff and not self.out_buff.closed:
try:
self.out_buff.close()
except Exception:
pass
def fail(self, exitcode=1, text="FAIL"):
# type: (int, str) -> None
if text is not None and text != "None":
if self.write_to_stdout:
self.write(text)
else:
self.write_err(text)
self._close_output_buffer()
def ok(self, text="OK"):
# type: (str) -> int
if text is not None and text != "None":
if self.write_to_stdout:
self.write(text)
else:
self.write_err(text)
self._close_output_buffer()
return 0
def hide_and_write(self, text, target=None):
# type: (str, Optional[str]) -> None
if not target:
target = self.stdout
if text is None or isinstance(text, six.string_types) and text == "None":
pass
target.write(decode_output(u"\r", target_stream=target))
self._hide_cursor(target=target)
target.write(decode_output(u"{0}\n".format(text), target_stream=target))
target.write(CLEAR_LINE)
self._show_cursor(target=target)
def write(self, text=None):
# type: (Optional[str]) -> None
if not self.write_to_stdout:
return self.write_err(text)
if text is None or isinstance(text, six.string_types) and text == "None":
pass
if not self.stdout.closed:
stdout = self.stdout
else:
stdout = sys.stdout
stdout.write(decode_output(u"\r", target_stream=stdout))
text = to_text(text)
line = decode_output(u"{0}\n".format(text), target_stream=stdout)
stdout.write(line)
stdout.write(CLEAR_LINE)
def write_err(self, text=None):
# type: (Optional[str]) -> None
if text is None or isinstance(text, six.string_types) and text == "None":
pass
text = to_text(text)
if not self.stderr.closed:
stderr = self.stderr
else:
if sys.stderr.closed:
print(text)
return
stderr = sys.stderr
stderr.write(decode_output(u"\r", target_stream=stderr))
line = decode_output(u"{0}\n".format(text), target_stream=stderr)
stderr.write(line)
stderr.write(CLEAR_LINE)
@staticmethod
def _hide_cursor(target=None):
# type: (Optional[IO]) -> None
pass
@staticmethod
def _show_cursor(target=None):
# type: (Optional[IO]) -> None
pass
if SpinBase is None:
SpinBase = DummySpinner
class VistirSpinner(SpinBase):
"A spinner class for handling spinners on windows and posix."
def __init__(self, *args, **kwargs):
# type: (Any, Any)
"""
Get a spinner object or a dummy spinner to wrap a context.
Keyword Arguments:
:param str spinner_name: A spinner type e.g. "dots" or "bouncingBar" (default: {"bouncingBar"})
:param str start_text: Text to start off the spinner with (default: {None})
:param dict handler_map: Handler map for signals to be handled gracefully (default: {None})
:param bool nospin: If true, use the dummy spinner (default: {False})
:param bool write_to_stdout: Writes to stdout if true, otherwise writes to stderr (default: True)
"""
self.handler = handler
colorama.init()
sigmap = {} # type: TSignalMap
if handler:
sigmap.update({signal.SIGINT: handler, signal.SIGTERM: handler})
handler_map = kwargs.pop("handler_map", {})
if os.name == "nt":
sigmap[signal.SIGBREAK] = handler
else:
sigmap[signal.SIGALRM] = handler
if handler_map:
sigmap.update(handler_map)
spinner_name = kwargs.pop("spinner_name", "bouncingBar")
start_text = kwargs.pop("start_text", None)
_text = kwargs.pop("text", "Running...")
kwargs["text"] = start_text if start_text is not None else _text
kwargs["sigmap"] = sigmap
kwargs["spinner"] = getattr(Spinners, spinner_name, "")
write_to_stdout = kwargs.pop("write_to_stdout", True)
self.stdout = kwargs.pop("stdout", sys.stdout)
self.stderr = kwargs.pop("stderr", sys.stderr)
self.out_buff = StringIO()
self.write_to_stdout = write_to_stdout
self.is_dummy = bool(yaspin is None)
self._stop_spin = None # type: Optional[threading.Event]
self._hide_spin = None # type: Optional[threading.Event]
self._spin_thread = None # type: Optional[threading.Thread]
super(VistirSpinner, self).__init__(*args, **kwargs)
if DISABLE_COLORS:
colorama.deinit()
def ok(self, text=u"OK", err=False):
# type: (str, bool) -> None
"""Set Ok (success) finalizer to a spinner."""
# Do not display spin text for ok state
self._text = None
_text = to_text(text) if text else u"OK"
err = err or not self.write_to_stdout
self._freeze(_text, err=err)
def fail(self, text=u"FAIL", err=False):
# type: (str, bool) -> None
"""Set fail finalizer to a spinner."""
# Do not display spin text for fail state
self._text = None
_text = text if text else u"FAIL"
err = err or not self.write_to_stdout
self._freeze(_text, err=err)
def hide_and_write(self, text, target=None):
# type: (str, Optional[str]) -> None
if not target:
target = self.stdout
if text is None or isinstance(text, six.string_types) and text == u"None":
pass
target.write(decode_output(u"\r"))
self._hide_cursor(target=target)
target.write(decode_output(u"{0}\n".format(text)))
target.write(CLEAR_LINE)
self._show_cursor(target=target)
def write(self, text): # pragma: no cover
# type: (str) -> None
if not self.write_to_stdout:
return self.write_err(text)
stdout = self.stdout
if self.stdout.closed:
stdout = sys.stdout
stdout.write(decode_output(u"\r", target_stream=stdout))
stdout.write(decode_output(CLEAR_LINE, target_stream=stdout))
if text is None:
text = ""
text = decode_output(u"{0}\n".format(text), target_stream=stdout)
stdout.write(text)
self.out_buff.write(text)
def write_err(self, text): # pragma: no cover
# type: (str) -> None
"""Write error text in the terminal without breaking the spinner."""
stderr = self.stderr
if self.stderr.closed:
stderr = sys.stderr
stderr.write(decode_output(u"\r", target_stream=stderr))
stderr.write(decode_output(CLEAR_LINE, target_stream=stderr))
if text is None:
text = ""
text = decode_output(u"{0}\n".format(text), target_stream=stderr)
self.stderr.write(text)
self.out_buff.write(decode_output(text, target_stream=self.out_buff))
def start(self):
# type: () -> None
if self._sigmap:
self._register_signal_handlers()
target = self.stdout if self.write_to_stdout else self.stderr
if target.isatty():
self._hide_cursor(target=target)
self._stop_spin = threading.Event()
self._hide_spin = threading.Event()
self._spin_thread = threading.Thread(target=self._spin)
self._spin_thread.start()
def stop(self):
# type: () -> None
if self._dfl_sigmap:
# Reset registered signal handlers to default ones
self._reset_signal_handlers()
if self._spin_thread:
self._stop_spin.set()
self._spin_thread.join()
target = self.stdout if self.write_to_stdout else self.stderr
if target.isatty():
target.write("\r")
if self.write_to_stdout:
self._clear_line()
else:
self._clear_err()
if target.isatty():
self._show_cursor(target=target)
self.out_buff.close()
def _freeze(self, final_text, err=False):
# type: (str, bool) -> None
"""Stop spinner, compose last frame and 'freeze' it."""
if not final_text:
final_text = ""
target = self.stderr if err else self.stdout
if target.closed:
target = sys.stderr if err else sys.stdout
text = to_text(final_text)
last_frame = self._compose_out(text, mode="last")
self._last_frame = decode_output(last_frame, target_stream=target)
# Should be stopped here, otherwise prints after
# self._freeze call will mess up the spinner
self.stop()
target.write(self._last_frame)
def _compose_color_func(self):
# type: () -> Callable[..., str]
fn = functools.partial(
colored, color=self._color, on_color=self._on_color, attrs=list(self._attrs)
)
return fn
def _compose_out(self, frame, mode=None):
# type: (str, Optional[str]) -> Text
# Ensure Unicode input
frame = to_text(frame)
if self._text is None:
self._text = u""
text = to_text(self._text)
if self._color_func is not None:
frame = self._color_func(frame)
if self._side == "right":
frame, text = text, frame
# Mode
frame = to_text(frame)
if not mode:
out = u"\r{0} {1}".format(frame, text)
else:
out = u"{0} {1}\n".format(frame, text)
return out
def _spin(self):
# type: () -> None
target = self.stdout if self.write_to_stdout else self.stderr
clear_fn = self._clear_line if self.write_to_stdout else self._clear_err
while not self._stop_spin.is_set():
if self._hide_spin.is_set():
# Wait a bit to avoid wasting cycles
time.sleep(self._interval)
continue
# Compose output
spin_phase = next(self._cycle)
out = self._compose_out(spin_phase)
out = decode_output(out, target)
# Write
target.write(out)
clear_fn()
target.flush()
# Wait
time.sleep(self._interval)
target.write("\b")
def _register_signal_handlers(self):
# type: () -> None
# SIGKILL cannot be caught or ignored, and the receiving
# process cannot perform any clean-up upon receiving this
# signal.
try:
if signal.SIGKILL in self._sigmap.keys():
raise ValueError(
"Trying to set handler for SIGKILL signal. "
"SIGKILL cannot be cought or ignored in POSIX systems."
)
except AttributeError:
pass
for sig, sig_handler in self._sigmap.items():
# A handler for a particular signal, once set, remains
# installed until it is explicitly reset. Store default
# signal handlers for subsequent reset at cleanup phase.
dfl_handler = signal.getsignal(sig)
self._dfl_sigmap[sig] = dfl_handler
# ``signal.SIG_DFL`` and ``signal.SIG_IGN`` are also valid
# signal handlers and are not callables.
if callable(sig_handler):
# ``signal.signal`` accepts handler function which is
# called with two arguments: signal number and the
# interrupted stack frame. ``functools.partial`` solves
# the problem of passing spinner instance into the handler
# function.
sig_handler = functools.partial(sig_handler, spinner=self)
signal.signal(sig, sig_handler)
def _reset_signal_handlers(self):
# type: () -> None
for sig, sig_handler in self._dfl_sigmap.items():
signal.signal(sig, sig_handler)
@staticmethod
def _hide_cursor(target=None):
# type: (Optional[IO]) -> None
if not target:
target = sys.stdout
hide_cursor(stream=target)
@staticmethod
def _show_cursor(target=None):
# type: (Optional[IO]) -> None
if not target:
target = sys.stdout
show_cursor(stream=target)
@staticmethod
def _clear_err():
# type: () -> None
sys.stderr.write(CLEAR_LINE)
@staticmethod
def _clear_line():
# type: () -> None
sys.stdout.write(CLEAR_LINE)
def create_spinner(*args, **kwargs):
# type: (Any, Any) -> Union[DummySpinner, VistirSpinner]
nospin = kwargs.pop("nospin", False)
use_yaspin = kwargs.pop("use_yaspin", not nospin)
if nospin or not use_yaspin:
return DummySpinner(*args, **kwargs)
return VistirSpinner(*args, **kwargs)

View File

@@ -0,0 +1,126 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import os
import re
import colorama
import six
from .compat import to_native_string
DISABLE_COLORS = os.getenv("CI", False) or os.getenv(
"ANSI_COLORS_DISABLED", os.getenv("VISTIR_DISABLE_COLORS", False)
)
ATTRIBUTE_NAMES = ["bold", "dark", "", "underline", "blink", "", "reverse", "concealed"]
ATTRIBUTES = dict(zip(ATTRIBUTE_NAMES, range(1, 9)))
del ATTRIBUTES[""]
colors = ["grey", "red", "green", "yellow", "blue", "magenta", "cyan", "white"]
COLORS = dict(zip(colors, range(30, 38)))
HIGHLIGHTS = dict(zip(["on_{0}".format(c) for c in colors], range(40, 48)))
ANSI_REMOVAL_RE = re.compile(r"\033\[((?:\d|;)*)([a-zA-Z])")
COLOR_MAP = {
# name: type
"blink": "attrs",
"bold": "attrs",
"concealed": "attrs",
"dark": "attrs",
"reverse": "attrs",
"underline": "attrs",
"blue": "color",
"cyan": "color",
"green": "color",
"magenta": "color",
"red": "color",
"white": "color",
"yellow": "color",
"on_blue": "on_color",
"on_cyan": "on_color",
"on_green": "on_color",
"on_grey": "on_color",
"on_magenta": "on_color",
"on_red": "on_color",
"on_white": "on_color",
"on_yellow": "on_color",
}
COLOR_ATTRS = COLOR_MAP.keys()
RESET = colorama.Style.RESET_ALL
def colored(text, color=None, on_color=None, attrs=None):
"""Colorize text using a reimplementation of the colorizer from
https://github.com/pavdmyt/yaspin so that it works on windows.
Available text colors:
red, green, yellow, blue, magenta, cyan, white.
Available text highlights:
on_red, on_green, on_yellow, on_blue, on_magenta, on_cyan, on_white.
Available attributes:
bold, dark, underline, blink, reverse, concealed.
Example:
colored('Hello, World!', 'red', 'on_grey', ['blue', 'blink'])
colored('Hello, World!', 'green')
"""
return colorize(text, fg=color, bg=on_color, attrs=attrs)
def colorize(text, fg=None, bg=None, attrs=None):
if os.getenv("ANSI_COLORS_DISABLED") is None:
style = "NORMAL"
if attrs is not None and not isinstance(attrs, list):
_attrs = []
if isinstance(attrs, six.string_types):
_attrs.append(attrs)
else:
_attrs = list(attrs)
attrs = _attrs
if attrs and "bold" in attrs:
style = "BRIGHT"
attrs.remove("bold")
if fg is not None:
fg = fg.upper()
text = to_native_string("%s%s%s%s%s") % (
to_native_string(getattr(colorama.Fore, fg)),
to_native_string(getattr(colorama.Style, style)),
to_native_string(text),
to_native_string(colorama.Fore.RESET),
to_native_string(colorama.Style.NORMAL),
)
if bg is not None:
bg = bg.upper()
text = to_native_string("%s%s%s%s") % (
to_native_string(getattr(colorama.Back, bg)),
to_native_string(text),
to_native_string(colorama.Back.RESET),
to_native_string(colorama.Style.NORMAL),
)
if attrs is not None:
fmt_str = to_native_string("%s[%%dm%%s%s[9m") % (chr(27), chr(27))
for attr in attrs:
text = fmt_str % (ATTRIBUTES[attr], text)
text += RESET
else:
text = ANSI_REMOVAL_RE.sub("", text)
return text
def cprint(text, color=None, on_color=None, attrs=None, **kwargs):
"""Print colorize text.
It accepts arguments of print function.
"""
print((colored(text, color, on_color, attrs)), **kwargs)