diff --git a/.github/workflows/ci_python.yaml b/.github/workflows/ci_python.yaml index f08b382b4..fe525c8c6 100644 --- a/.github/workflows/ci_python.yaml +++ b/.github/workflows/ci_python.yaml @@ -53,7 +53,7 @@ jobs: python-version: '3.7' - name: Install tools (search engine) - run: pip install bandit pycodestyle pyflakes + run: pip install bandit mypy pycodestyle pyflakes pyright - name: Gather files (search engine) run: | @@ -61,6 +61,16 @@ jobs: echo $PY_FILES echo "PY_FILES=$PY_FILES" >> "$GITHUB_ENV" + - name: Check typings (search engine) + run: | + MYPYPATH="src/searchengine/nova3" \ + mypy \ + --follow-imports skip \ + --strict \ + $PY_FILES + pyright \ + $PY_FILES + - name: Lint code (search engine) run: | pyflakes $PY_FILES diff --git a/src/searchengine/nova3/helpers.py b/src/searchengine/nova3/helpers.py index 0aaf281b2..f0206e383 100644 --- a/src/searchengine/nova3/helpers.py +++ b/src/searchengine/nova3/helpers.py @@ -1,4 +1,4 @@ -#VERSION: 1.46 +#VERSION: 1.47 # Author: # Christophe DUMEZ (chris@qbittorrent.org) @@ -39,9 +39,11 @@ import tempfile import urllib.error import urllib.parse import urllib.request +from collections.abc import Mapping +from typing import Any, Dict, Optional -def getBrowserUserAgent(): +def getBrowserUserAgent() -> str: """ Disguise as browser to circumvent website blocking """ # Firefox release calendar @@ -57,7 +59,7 @@ def getBrowserUserAgent(): return f"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:{nowVersion}.0) Gecko/20100101 Firefox/{nowVersion}.0" -headers = {'User-Agent': getBrowserUserAgent()} +headers: Dict[str, Any] = {'User-Agent': getBrowserUserAgent()} # SOCKS5 Proxy support if "sock_proxy" in os.environ and len(os.environ["sock_proxy"].strip()) > 0: @@ -67,13 +69,13 @@ if "sock_proxy" in os.environ and len(os.environ["sock_proxy"].strip()) > 0: if m is not None: socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, m.group('host'), int(m.group('port')), True, m.group('username'), m.group('password')) - socket.socket = 
socks.socksocket + socket.socket = socks.socksocket # type: ignore[misc] -def htmlentitydecode(s): +def htmlentitydecode(s: str) -> str: # First convert alpha entities (such as é) # (Inspired from http://mail.python.org/pipermail/python-list/2007-June/443813.html) - def entity2char(m): + def entity2char(m: re.Match[str]) -> str: entity = m.group(1) if entity in html.entities.name2codepoint: return chr(html.entities.name2codepoint[entity]) @@ -87,7 +89,7 @@ def htmlentitydecode(s): return re.sub(r'&#x(\w+);', lambda x: chr(int(x.group(1), 16)), t) -def retrieve_url(url, custom_headers={}): +def retrieve_url(url: str, custom_headers: Mapping[str, Any] = {}) -> str: """ Return the content of the url page as a string """ req = urllib.request.Request(url, headers={**headers, **custom_headers}) try: @@ -95,7 +97,7 @@ def retrieve_url(url, custom_headers={}): except urllib.error.URLError as errno: print(" ".join(("Connection error:", str(errno.reason)))) return "" - dat = response.read() + dat: bytes = response.read() # Check if it is gzipped if dat[:2] == b'\x1f\x8b': # Data is gzip encoded, decode it @@ -109,16 +111,15 @@ def retrieve_url(url, custom_headers={}): ignore, charset = info['Content-Type'].split('charset=') except Exception: pass - dat = dat.decode(charset, 'replace') - dat = htmlentitydecode(dat) - # return dat.encode('utf-8', 'replace') - return dat + datStr = dat.decode(charset, 'replace') + datStr = htmlentitydecode(datStr) + return datStr -def download_file(url, referer=None): +def download_file(url: str, referer: Optional[str] = None) -> str: """ Download file at url and write it to a file, return the path to the file and the url """ - file, path = tempfile.mkstemp() - file = os.fdopen(file, "wb") + fileHandle, path = tempfile.mkstemp() + file = os.fdopen(fileHandle, "wb") # Download url req = urllib.request.Request(url, headers=headers) if referer is not None: diff --git a/src/searchengine/nova3/nova2.py b/src/searchengine/nova3/nova2.py index 
2c5963beb..9db438b96 100644 --- a/src/searchengine/nova3/nova2.py +++ b/src/searchengine/nova3/nova2.py @@ -1,4 +1,4 @@ -#VERSION: 1.45 +#VERSION: 1.46 # Author: # Fabien Devaux @@ -37,17 +37,21 @@ import importlib import pathlib import sys import urllib.parse +from collections.abc import Iterable, Iterator, Sequence +from enum import Enum from glob import glob from multiprocessing import Pool, cpu_count from os import path +from typing import Dict, List, Optional, Set, Tuple, Type -THREADED = True +THREADED: bool = True try: - MAX_THREADS = cpu_count() + MAX_THREADS: int = cpu_count() except NotImplementedError: MAX_THREADS = 1 -CATEGORIES = {'all', 'movies', 'tv', 'music', 'games', 'anime', 'software', 'pictures', 'books'} +Category = Enum('Category', ['all', 'movies', 'tv', 'music', 'games', 'anime', 'software', 'pictures', 'books']) + ################################################################################ # Every engine should have a "search" method taking @@ -58,11 +62,29 @@ CATEGORIES = {'all', 'movies', 'tv', 'music', 'games', 'anime', 'software', 'pic ################################################################################ +EngineName = str + + +class Engine: + url: str + name: EngineName + supported_categories: Dict[str, str] + + def __init__(self) -> None: + pass + + def search(self, what: str, cat: str = Category.all.name) -> None: + pass + + def download_torrent(self, info: str) -> None: + pass + + # global state -engine_dict = dict() +engine_dict: Dict[EngineName, Optional[Type[Engine]]] = {} -def list_engines(): +def list_engines() -> List[EngineName]: """ List all engines, including broken engines that fail on import @@ -81,10 +103,10 @@ def list_engines(): return found_engines -def get_engine(engine_name): - #global engine_dict +def get_engine(engine_name: EngineName) -> Optional[Type[Engine]]: if engine_name in engine_dict: return engine_dict[engine_name] + # when import fails, engine is None engine = None try: @@ -97,35 +119,37 @@ 
def get_engine(engine_name): return engine -def initialize_engines(found_engines): +def initialize_engines(found_engines: Iterable[EngineName]) -> Set[EngineName]: """ Import available engines - Return list of available engines + Return set of available engines """ - supported_engines = [] + supported_engines = set() for engine_name in found_engines: # import engine engine = get_engine(engine_name) if engine is None: continue - supported_engines.append(engine_name) + supported_engines.add(engine_name) return supported_engines -def engines_to_xml(supported_engines): +def engines_to_xml(supported_engines: Iterable[EngineName]) -> Iterator[str]: """ Generates xml for supported engines """ tab = " " * 4 for engine_name in supported_engines: search_engine = get_engine(engine_name) + if search_engine is None: + continue supported_categories = "" if hasattr(search_engine, "supported_categories"): supported_categories = " ".join((key for key in search_engine.supported_categories.keys() - if key != "all")) + if key != Category.all.name)) yield "".join((tab, "<", engine_name, ">\n", tab, tab, "<name>", search_engine.name, "</name>\n", @@ -134,7 +158,7 @@ def engines_to_xml(supported_engines): tab, "</", engine_name, ">\n")) -def displayCapabilities(supported_engines): +def displayCapabilities(supported_engines: Iterable[EngineName]) -> None: """ Display capabilities in XML format @@ -151,21 +175,24 @@ def displayCapabilities(supported_engines): print(xml) -def run_search(engine_list): +def run_search(engine_list: Tuple[Optional[Type[Engine]], str, Category]) -> bool: """ Run search in engine - @param engine_list List with engine, query and category + @param engine_list Tuple with engine, query and category @retval False if any exceptions occurred @retval True otherwise """ - engine, what, cat = engine_list + engine_class, what, cat = engine_list + if engine_class is None: + return False + try: - engine = engine() + engine = engine_class() # avoid exceptions due to invalid category if hasattr(engine, 
'supported_categories'): - if cat in engine.supported_categories: - engine.search(what, cat) + if cat.name in engine.supported_categories: + engine.search(what, cat.name) else: engine.search(what) @@ -174,7 +201,7 @@ def run_search(engine_list): return False -def main(args): +def main(args: Sequence[str]) -> None: # qbt tend to run this script in 'isolate mode' so append the current path manually current_path = str(pathlib.Path(__file__).parent.resolve()) if current_path not in sys.path: @@ -182,7 +209,7 @@ def main(args): found_engines = list_engines() - def show_usage(): + def show_usage() -> None: print("./nova2.py all|engine1[,engine2]* <category> <keywords>", file=sys.stderr) print("found engines: " + ','.join(found_engines), file=sys.stderr) print("to list available engines: ./nova2.py --capabilities [--names]", file=sys.stderr) @@ -190,7 +217,6 @@ def main(args): if not args: show_usage() sys.exit(1) - elif args[0] == "--capabilities": supported_engines = initialize_engines(found_engines) if "--names" in args: @@ -198,14 +224,14 @@ def main(args): return displayCapabilities(supported_engines) return - elif len(args) < 3: show_usage() sys.exit(1) cat = args[1].lower() - - if cat not in CATEGORIES: + try: + category = Category[cat] + except KeyError: print(" - ".join(('Invalid category', cat)), file=sys.stderr) sys.exit(1) @@ -223,16 +249,18 @@ def main(args): engines_list = initialize_engines(found_engines) else: # discard not-found engines - engines_list = [engine for engine in engines_list if engine in found_engines] + engines_list = {engine for engine in engines_list if engine in found_engines} what = urllib.parse.quote(' '.join(args[2:])) + params = ((get_engine(engine_name), what, category) for engine_name in engines_list) + if THREADED: # child process spawning is controlled min(number of searches, number of cpu) with Pool(min(len(engines_list), MAX_THREADS)) as pool: - pool.map(run_search, ([get_engine(engine_name), what, cat] for engine_name in engines_list)) + 
pool.map(run_search, params) else: # py3 note: map is needed to be evaluated for content to be executed - all(map(run_search, ([get_engine(engine_name), what, cat] for engine_name in engines_list))) + all(map(run_search, params)) if __name__ == "__main__": diff --git a/src/searchengine/nova3/novaprinter.py b/src/searchengine/nova3/novaprinter.py index 80f73aae3..66afcdc72 100644 --- a/src/searchengine/nova3/novaprinter.py +++ b/src/searchengine/nova3/novaprinter.py @@ -1,4 +1,4 @@ -#VERSION: 1.48 +#VERSION: 1.49 # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: @@ -24,8 +24,25 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. +import re +from collections.abc import Mapping +from typing import Any -def prettyPrinter(dictionary): +# TODO: enable this when using Python >= 3.8 +#SearchResults = TypedDict('SearchResults', { +# 'link': str, +# 'name': str, +# 'size': str, +# 'seeds': int, +# 'leech': int, +# 'engine_url': str, +# 'desc_link': str, # Optional +# 'pub_date': int # Optional +#}) +SearchResults = Mapping[str, Any] + + +def prettyPrinter(dictionary: SearchResults) -> None: outtext = "|".join(( dictionary["link"], dictionary["name"].replace("|", " "), @@ -34,7 +51,7 @@ def prettyPrinter(dictionary): str(dictionary["leech"]), dictionary["engine_url"], dictionary.get("desc_link", ""), # Optional - str(dictionary.get("pub_date", -1)), # Optional + str(dictionary.get("pub_date", -1)) # Optional )) # fd 1 is stdout @@ -42,30 +59,24 @@ def prettyPrinter(dictionary): print(outtext, file=utf8stdout) -def anySizeToBytes(size_string): +sizeUnitRegex: re.Pattern[str] = re.compile(r"^(?P<size>\d*\.?\d+) *(?P<unit>[a-z]+)?", re.IGNORECASE) + + +def anySizeToBytes(size_string: str) -> int: """ Convert a string like '1 KB' to '1024' (bytes) """ - # separate integer from unit - try: - size, unit = size_string.split() - except 
Exception: - try: - size = size_string.strip() - unit = ''.join([c for c in size if c.isalpha()]) - if len(unit) > 0: - size = size[:-len(unit)] - except Exception: - return -1 - if len(size) == 0: - return -1 - size = float(size) - if len(unit) == 0: - return int(size) - short_unit = unit.upper()[0] - # convert - units_dict = {'T': 40, 'G': 30, 'M': 20, 'K': 10} - if short_unit in units_dict: - size = size * 2**units_dict[short_unit] - return int(size) + match = sizeUnitRegex.match(size_string.strip()) + if match is None: + return -1 + + size = float(match.group('size')) # need to match decimals + unit = match.group('unit') + + if unit is not None: + units_exponents = {'T': 40, 'G': 30, 'M': 20, 'K': 10} + exponent = units_exponents.get(unit[0].upper(), 0) + size *= 2**exponent + + return round(size)