| # mypy: allow-untyped-defs |
| |
| import errno |
| import logging |
| import os |
| import shutil |
| import stat |
| import subprocess |
| import sys |
| import tarfile |
| import time |
| import zipfile |
| from io import BytesIO |
| from socket import error as SocketError # NOQA: N812 |
| from urllib.request import urlopen |
| |
| logger = logging.getLogger(__name__) |
| |
| |
def call(*args):
    """Run a command as a subprocess, logging the command line first.

    The positional arguments form the argument vector of the command.
    On success the captured standard output is returned, decoded as UTF-8
    text. When the process exits with a non-zero status, the command, its
    return code and its captured output are logged at critical level and
    the ``subprocess.CalledProcessError`` is re-raised to the caller.
    """
    command_line = " ".join(args)
    logger.debug(command_line)
    try:
        raw_output = subprocess.check_output(args)
    except subprocess.CalledProcessError as failure:
        message = "%s exited with return code %i" % (failure.cmd, failure.returncode)
        logger.critical(message)
        logger.critical(failure.output)
        raise
    return raw_output.decode('utf8')
| |
| |
def seekable(fileobj):
    """Return a file object that supports seeking.

    The given object is probed by seeking to its own current position.
    If that works it is returned unchanged; if the probe raises, the
    remaining content is read into memory and an in-memory ``BytesIO``
    holding it is returned instead.
    """
    try:
        current_position = fileobj.tell()
        fileobj.seek(current_position)
    except Exception:
        # Not seekable (or no tell/seek at all): buffer the content in memory.
        buffered_copy = BytesIO(fileobj.read())
        return buffered_copy
    return fileobj
| |
| |
def untar(fileobj, dest="."):
    """Extract a tar archive read from ``fileobj`` into ``dest``.

    ``fileobj`` is first passed through ``seekable()`` so that streams
    without seek support are buffered before being handed to ``tarfile``.
    ``dest`` is the directory to extract into and defaults to the current
    directory.
    """
    logger.debug("untar")
    fileobj = seekable(fileobj)
    kwargs = {}
    # Compare the version as a tuple: testing major and minor separately
    # would wrongly evaluate to False for e.g. a future X.0 release.
    if sys.version_info >= (3, 12):
        # Python 3.12 deprecates extracting without an explicit filter.
        kwargs["filter"] = "tar"
    with tarfile.open(fileobj=fileobj) as tar_data:
        tar_data.extractall(path=dest, **kwargs)
| |
| |
def unzip(fileobj, dest=None, limit=None):
    """Extract a zip archive, restoring Unix permissions and symlinks when possible.

    ``fileobj`` is a file-like object holding the archive; it is passed
    through ``seekable()`` first. ``dest`` is the destination directory;
    ``None`` (the default) means the current working directory, matching
    ``ZipFile.extract()``. ``limit``, when not ``None``, is a container of
    member names: only members whose filename is in it are extracted.
    """
    logger.debug("unzip")
    fileobj = seekable(fileobj)
    # ZipFile.extract() treats path=None as the current working directory;
    # mirror that for the paths we build ourselves, otherwise
    # os.path.join(None, ...) raises TypeError with the default dest.
    dest_dir = os.getcwd() if dest is None else dest
    with zipfile.ZipFile(fileobj) as zip_data:
        for info in zip_data.infolist():
            if limit is not None and info.filename not in limit:
                continue
            # external_attr is 4 bytes wide and its content depends on the system where the ZIP file was created.
            # - If the ZIP file was created on a UNIX environment, the 2 highest bytes hold the UNIX permissions and
            #   file type bits (the st_mode entry of struct stat in sys/stat.h) and the lowest byte holds DOS FAT
            #   compatibility attributes (used mainly to store the directory bit).
            # - If the ZIP file was created on a WIN/DOS environment, the lowest byte holds the DOS FAT file
            #   attributes (directory bit, hidden bit, read-only bit, system-file bit, etc).
            # More info at https://unix.stackexchange.com/a/14727 and https://forensicswiki.xyz/page/ZIP
            # The DOS FAT attributes can be ignored: ZipFile.extract() already creates directories as needed (both
            # on Windows and *nix) and the remaining attributes (hidden/read-only/system-file/etc) are not
            # interesting for our use case, not even on Windows.
            # So we do this:
            # 1. When uncompressing on a Windows system we just call extract().
            # 2. When uncompressing on a Unix-like system we only handle the attributes if the ZIP file was created
            #    on a Unix-like system; otherwise the only information available is the DOS FAT attributes, which
            #    are useless here, so we call extract() without setting any specific file permission.
            if info.create_system == 0 or sys.platform == 'win32':
                zip_data.extract(info, path=dest)
                continue

            stat_st_mode = info.external_attr >> 16
            if stat.S_ISLNK(stat_st_mode):
                # Symlinks are stored in the ZIP file as text files whose content is the symlink target.
                # Recreate the symlink instead of calling extract() when an entry with stat.S_IFLNK is detected.
                info_dst_path = os.path.join(dest_dir, info.filename)
                link_src_path = zip_data.read(info)
                link_dst_dir = os.path.dirname(info_dst_path)
                if not os.path.isdir(link_dst_dir):
                    os.makedirs(link_dst_dir)

                # Remove existing link if exists.
                if os.path.islink(info_dst_path):
                    os.unlink(info_dst_path)
                os.symlink(link_src_path, info_dst_path)
            else:
                # extract() returns the normalized path it actually created, so chmod that path
                # rather than one rebuilt from the raw (unsanitized) member name.
                extracted_path = zip_data.extract(info, path=dest)
                # Preserve bits 0-8 only: rwxrwxrwx (no sticky/setuid/setgid bits).
                perm = stat_st_mode & 0x1FF
                os.chmod(extracted_path, perm)
| |
| |
def get(url):
    """Perform an HTTP GET on ``url`` and hand back the streamed response.

    The request is made with ``requests`` in streaming mode. An HTTP error
    status makes ``raise_for_status()`` raise; otherwise the response
    object is returned to the caller.
    """
    import requests

    logger.debug("GET %s" % url)
    response = requests.get(url, stream=True)
    response.raise_for_status()
    return response
| |
| |
def get_download_to_descriptor(fd, url, max_retries=5):
    """Download a URL in chunks and save it to an open file object.

    ``fd`` must support ``seek``, ``truncate``, ``write`` and ``flush``; it
    is truncated before each attempt, flushed on success and never closed
    here. The download is retried on ECONNRESET, up to ``max_retries``
    attempts (values below 1 are treated as 1), with the wait between
    attempts doubling each time starting at 2 seconds. Any other socket
    error, or ECONNRESET on the last attempt, is re-raised.

    This function is meant to download big files directly to disk without
    caching the whole file in memory.
    """
    if max_retries < 1:
        max_retries = 1
    wait = 2
    for current_retry in range(1, max_retries + 1):
        try:
            logger.info("Downloading %s Try %d/%d" % (url, current_retry, max_retries))
            # Use the response as a context manager so the connection is closed
            # on every path, including failed attempts that are then retried.
            with urlopen(url) as resp:
                # We may come here in a retry, ensure to truncate fd before start writing.
                fd.seek(0)
                fd.truncate(0)
                while True:
                    chunk = resp.read(16 * 1024)
                    if not chunk:
                        break  # Download finished
                    fd.write(chunk)
            fd.flush()
            # Success
            return
        except SocketError as e:
            if current_retry < max_retries and e.errno == errno.ECONNRESET:
                # Retry
                logger.error("Connection reset by peer. Retrying after %ds..." % wait)
                time.sleep(wait)
                wait *= 2
            else:
                # Maximum retries or unknown error
                raise
| |
def rmtree(path):
    """Recursively delete ``path``, retrying entries that fail with EACCES.

    This works around two issues:
    1. Cannot delete read-only files owned by us (e.g. files extracted from tarballs)
    2. On Windows, we sometimes just need to retry in case the file handler
       hasn't been fully released (a common issue).

    When a removal call fails with EACCES, the entry is made
    readable/writable/executable for everyone and the removal is attempted
    once more. Any other failure is re-raised.
    """
    def handle_remove_readonly(func, path, exc):
        # ``onerror`` passes a (type, value, traceback) tuple while ``onexc``
        # passes the exception instance itself; accept both.
        excvalue = exc[1] if isinstance(exc, tuple) else exc
        if func in (os.rmdir, os.remove, os.unlink) and excvalue.errno == errno.EACCES:
            os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)  # 0777
            func(path)
        else:
            # Raise explicitly instead of relying on being called from
            # inside shutil's ``except`` block for a bare ``raise`` to work.
            raise excvalue

    # ``onerror`` is deprecated since Python 3.12 in favour of ``onexc``.
    if sys.version_info >= (3, 12):
        return shutil.rmtree(path, onexc=handle_remove_readonly)
    return shutil.rmtree(path, onerror=handle_remove_readonly)
| |
| |
def sha256sum(file_path):
    """Return the hexadecimal SHA-256 digest of the file at ``file_path``.

    The file is opened in binary mode and fed to the hash in 4096-byte
    reads, so it is never loaded into memory as a whole.
    """
    import hashlib

    digest = hashlib.sha256()
    with open(file_path, 'rb') as stream:
        while True:
            block = stream.read(4096)
            if block == b'':
                break
            digest.update(block)
    return digest.hexdigest()
| |
| |
| # see https://docs.python.org/3/whatsnew/3.12.html#imp |
def load_source(modname, filename):
    """Load the Python source file ``filename`` as a module named ``modname``.

    The module is built from a ``SourceFileLoader`` spec, registered in
    ``sys.modules`` under its own name, executed, and then returned.
    """
    from importlib.machinery import SourceFileLoader
    from importlib.util import module_from_spec, spec_from_file_location

    source_loader = SourceFileLoader(modname, filename)
    module_spec = spec_from_file_location(modname, filename, loader=source_loader)
    loaded = module_from_spec(module_spec)
    # Register the module before running its code.
    sys.modules[loaded.__name__] = loaded
    source_loader.exec_module(loaded)
    return loaded