# -*- coding: utf-8 -*-
"""Used only by the `database` and `github_utils` submodules."""
import re
from collections.abc import Iterator
from zipfile import ZipFile
from pathlib import Path
from shutil import get_terminal_size
import httpx
import bs4
from tqdm import tqdm
# from tqdm.contrib.logging import logging_redirect_tqdm
# NOTE: The import above is intentionally disabled. To use it, wrap the tqdm
# calls in ``with logging_redirect_tqdm(loggers=all_loggers):``.
# So far, things seem to work without it.
from ..utils import get_logger
# Module-level logger, obtained through the package's ``get_logger`` helper.
logger = get_logger(__name__)
class ServerError(Exception):
    """Some error with the server or connection to the server.

    The download helpers in this module (``send_get``, ``handle_download``)
    re-raise the exceptions they catch as this type, chained to the original
    error via ``raise ... from err``.
    """
def _make_tqdm_kwargs(desc: str = ""):
width, _ = get_terminal_size((50, 20))
bar_width = max(int(.8 * width) - 30 - len(desc), 10)
tqdm_kwargs = {
"bar_format": f"{{l_bar}}{{bar:{bar_width}}}{{r_bar}}{{bar:-{bar_width}b}}",
"colour": "green",
"desc": desc
}
return tqdm_kwargs
def create_client(base_url, cached: bool = False, cache_name: str = ""):
    """Create httpx Client instance, should support cache at some point.

    Parameters
    ----------
    base_url
        Base URL passed on to ``httpx.Client``.
    cached : bool, optional
        Must currently be False; caching is not implemented. Default False.
    cache_name : str, optional
        Currently unused; kept as part of the signature.

    Returns
    -------
    httpx.Client
        Client using a transport with 5 retries and a timeout of 2.

    Raises
    ------
    NotImplementedError
        If `cached` is True.
    """
    if cached:
        raise NotImplementedError("Caching not yet implemented with httpx.")

    transport = httpx.HTTPTransport(retries=5)
    client = httpx.Client(base_url=base_url, timeout=2, transport=transport)
    return client
def handle_download(client, pkg_url: str,
                    save_path: Path, pkg_name: str,
                    padlen: int, chunk_size: int = 128,
                    disable_bar=False) -> None:
    """Perform a streamed download and write the content to disk.

    Parameters
    ----------
    client
        Client handed to ``send_get`` to open the streamed request.
    pkg_url : str
        URL of the file to download, forwarded unchanged to ``send_get``.
    save_path : Path
        File the downloaded content is written to (opened in ``"wb"`` mode).
    pkg_name : str
        Name shown in the progress bar description.
    padlen : int
        Width to which `pkg_name` is left-aligned in the description.
    chunk_size : int, optional
        Chunk size passed to ``response.iter_bytes``. Default is 128.
    disable_bar : bool, optional
        Passed to tqdm as ``disable``. Default is False.

    Raises
    ------
    ServerError
        If the response has an error status, or if any other exception is
        raised while reading the response or writing the file.
    """
    tqdm_kwargs = _make_tqdm_kwargs(f"Downloading {pkg_name:<{padlen}}")

    # Deliberately outside the ``try``: ``send_get`` already turns its own
    # failures into ``ServerError``.
    stream = send_get(client, pkg_url, stream=True)

    try:
        with stream as response:
            # The status can only be checked once the stream is entered.
            response.raise_for_status()
            # A missing Content-Length header yields a total of 0.
            total = int(response.headers.get("Content-Length", 0))

            # ``wrapattr`` wraps the file's ``write`` method so that every
            # chunk written advances the progress bar.
            with save_path.open("wb") as file_outer, \
                    tqdm.wrapattr(file_outer, "write", miniters=1,
                                  total=total, **tqdm_kwargs,
                                  disable=disable_bar) as file_inner:
                for chunk in response.iter_bytes(chunk_size=chunk_size):
                    file_inner.write(chunk)

    except httpx.HTTPStatusError as err:
        logger.error("Error response %s while requesting %s.",
                     err.response.status_code, err.request.url)
        raise ServerError("Cannot connect to server.") from err
    except Exception as err:
        logger.exception("Unhandled exception while accessing server.")
        raise ServerError("Cannot connect to server.") from err
def handle_unzipping(save_path: Path, save_dir: Path,
                     pkg_name: str, padlen: int) -> None:
    """Unpack a zipped folder, usually called right after downloading.

    Parameters
    ----------
    save_path : Path
        Path of the zip archive to read.
    save_dir : Path
        Directory into which every member of the archive is extracted.
    pkg_name : str
        Name shown in the progress bar description.
    padlen : int
        Width to which `pkg_name` is left-aligned in the description.
    """
    with ZipFile(save_path, "r") as zip_ref:
        namelist = zip_ref.namelist()
        tqdm_kwargs = _make_tqdm_kwargs(f"Extracting {pkg_name:<{padlen}}")

        # Extract member by member (rather than ``extractall``) so that the
        # progress bar advances once per archive entry.
        for member in tqdm(iterable=namelist, total=len(namelist),
                           **tqdm_kwargs):
            zip_ref.extract(member, save_dir)
def send_get(client, sub_url, stream: bool = False):
    """Send a GET request (streamed or not) using an existing client.

    The point of this function is mostly elaborate exception handling.

    Parameters
    ----------
    client
        Object providing ``get(url)`` and ``stream("GET", url)``, e.g. the
        client returned by ``create_client``.
    sub_url
        URL passed on unchanged to the client.
    stream : bool, optional
        If True, return the result of ``client.stream("GET", sub_url)``
        without checking the status. If False (default), perform a regular
        GET and check the response status before returning it.

    Returns
    -------
    The response of ``client.get`` or the object returned by
    ``client.stream``, depending on `stream`.

    Raises
    ------
    ServerError
        If the request fails, the (non-streamed) response has an error
        status, or any other exception occurs.
    """
    try:
        if stream:
            # ``client.stream`` hands back a context manager, not a response,
            # so there is no status to check yet; the caller does that after
            # entering the stream (see ``handle_download``).
            response = client.stream("GET", sub_url)
        else:
            response = client.get(sub_url)
            response.raise_for_status()
    except httpx.RequestError as err:
        logger.exception("An error occurred while requesting %s.",
                         err.request.url)
        raise ServerError("Cannot connect to server.") from err
    except httpx.HTTPStatusError as err:
        logger.error("Error response %s while requesting %s.",
                     err.response.status_code, err.request.url)
        raise ServerError("Cannot connect to server.") from err
    except Exception as err:
        logger.exception("Unhandled exception while accessing server.")
        raise ServerError("Cannot connect to server.") from err

    return response
def get_server_folder_contents(client, dir_name: str,
                               unique_str: str = ".zip$") -> Iterator[str]:
    """Find all zip files in a given server folder.

    Parameters
    ----------
    client
        Client passed on to ``send_get``.
    dir_name : str
        Folder to list; a trailing ``"/"`` is appended if missing.
    unique_str : str, optional
        Regular expression that the text of a link must match to be included.
        Note that the ``.`` in the default ``".zip$"`` is an unescaped regex
        wildcard, so it matches any character followed by ``zip`` at the end.

    Returns
    -------
    Iterator[str]
        Lazily evaluated generator over the text of each matching ``<a>``
        element that has an ``href`` attribute.

    Raises
    ------
    ServerError
        Propagated from ``send_get`` if the folder cannot be retrieved.
    """
    if not dir_name.endswith("/"):
        dir_name = dir_name + "/"

    response = send_get(client, dir_name)
    soup = bs4.BeautifulSoup(response.content, features="lxml")

    # Only anchors that carry an href and whose text matches the pattern.
    hrefs = soup.find_all("a", href=True, string=re.compile(unique_str))
    pkgs = (href.string for href in hrefs)
    return pkgs