Add type annotations

Some code had to be revised because the type checker (mypy) doesn't allow
changing the type of an existing variable.

PR #20935.
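The restriction mentioned above is that mypy infers a variable's type from its first assignment and then rejects a reassignment with a different type. A minimal sketch of the problem and of the rename workaround used in this commit (the `dat`/`datStr` pair mirrors the change in helpers.py; the literal values are illustrative only):

```python
dat: bytes = b"search results page"

# Rejected by mypy: reassigning `dat` would change its type from bytes to str.
# dat = dat.decode("utf-8", "replace")

# Workaround used throughout the commit: give the decoded text its own name.
datStr: str = dat.decode("utf-8", "replace")
print(datStr)
```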
Author: Chocobo1, 2024-06-17 13:18:32 +08:00 (committed by GitHub)
parent 2000be12ba
commit d71086e400
4 changed files with 122 additions and 72 deletions

GitHub Actions CI workflow

@@ -53,7 +53,7 @@ jobs:
         python-version: '3.7'
     - name: Install tools (search engine)
-      run: pip install bandit pycodestyle pyflakes
+      run: pip install bandit mypy pycodestyle pyflakes pyright
     - name: Gather files (search engine)
       run: |
@@ -61,6 +61,16 @@ jobs:
         echo $PY_FILES
         echo "PY_FILES=$PY_FILES" >> "$GITHUB_ENV"
+    - name: Check typings (search engine)
+      run: |
+        MYPYPATH="src/searchengine/nova3" \
+        mypy \
+          --follow-imports skip \
+          --strict \
+          $PY_FILES
+        pyright \
+          $PY_FILES
     - name: Lint code (search engine)
       run: |
         pyflakes $PY_FILES
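The new `--strict` run is what drives most of the annotation work in the files below: strict mode enables, among other checks, `--disallow-untyped-defs`, so any `def` without parameter and return annotations is reported. The smallest instance in this commit is `getBrowserUserAgent()` in helpers.py; a shortened sketch (the returned string here is a placeholder, not the real user-agent logic):

```python
# Before: `def getBrowserUserAgent():` carried no annotations, so `mypy --strict`
# reported the function as untyped.
# After: the annotated signature below satisfies strict mode.
def getBrowserUserAgent() -> str:
    # Placeholder body; the real helper derives a recent Firefox version string.
    return "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0"


print(getBrowserUserAgent())
```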

src/searchengine/nova3/helpers.py

@@ -1,4 +1,4 @@
-#VERSION: 1.46
+#VERSION: 1.47
 # Author:
 #  Christophe DUMEZ (chris@qbittorrent.org)
@@ -39,9 +39,11 @@ import tempfile
 import urllib.error
 import urllib.parse
 import urllib.request
+from collections.abc import Mapping
+from typing import Any, Dict, Optional
 
-def getBrowserUserAgent():
+def getBrowserUserAgent() -> str:
     """ Disguise as browser to circumvent website blocking """
     # Firefox release calendar
@@ -57,7 +59,7 @@ def getBrowserUserAgent():
     return f"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:{nowVersion}.0) Gecko/20100101 Firefox/{nowVersion}.0"
 
-headers = {'User-Agent': getBrowserUserAgent()}
+headers: Dict[str, Any] = {'User-Agent': getBrowserUserAgent()}
 
 # SOCKS5 Proxy support
 if "sock_proxy" in os.environ and len(os.environ["sock_proxy"].strip()) > 0:
@@ -67,13 +69,13 @@ if "sock_proxy" in os.environ and len(os.environ["sock_proxy"].strip()) > 0:
     if m is not None:
         socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, m.group('host'),
                               int(m.group('port')), True, m.group('username'), m.group('password'))
-        socket.socket = socks.socksocket
+        socket.socket = socks.socksocket  # type: ignore[misc]
 
-def htmlentitydecode(s):
+def htmlentitydecode(s: str) -> str:
     # First convert alpha entities (such as é)
     # (Inspired from http://mail.python.org/pipermail/python-list/2007-June/443813.html)
-    def entity2char(m):
+    def entity2char(m: re.Match[str]) -> str:
         entity = m.group(1)
         if entity in html.entities.name2codepoint:
             return chr(html.entities.name2codepoint[entity])
@@ -87,7 +89,7 @@ def htmlentitydecode(s):
     return re.sub(r'&#x(\w+);', lambda x: chr(int(x.group(1), 16)), t)
 
-def retrieve_url(url, custom_headers={}):
+def retrieve_url(url: str, custom_headers: Mapping[str, Any] = {}) -> str:
     """ Return the content of the url page as a string """
     req = urllib.request.Request(url, headers={**headers, **custom_headers})
     try:
@@ -95,7 +97,7 @@ def retrieve_url(url, custom_headers={}):
     except urllib.error.URLError as errno:
         print(" ".join(("Connection error:", str(errno.reason))))
         return ""
-    dat = response.read()
+    dat: bytes = response.read()
     # Check if it is gzipped
     if dat[:2] == b'\x1f\x8b':
         # Data is gzip encoded, decode it
@@ -109,16 +111,15 @@ def retrieve_url(url, custom_headers={}):
         ignore, charset = info['Content-Type'].split('charset=')
     except Exception:
         pass
-    dat = dat.decode(charset, 'replace')
-    dat = htmlentitydecode(dat)
-    # return dat.encode('utf-8', 'replace')
-    return dat
+    datStr = dat.decode(charset, 'replace')
+    datStr = htmlentitydecode(datStr)
+    return datStr
 
-def download_file(url, referer=None):
+def download_file(url: str, referer: Optional[str] = None) -> str:
     """ Download file at url and write it to a file, return the path to the file and the url """
-    file, path = tempfile.mkstemp()
-    file = os.fdopen(file, "wb")
+    fileHandle, path = tempfile.mkstemp()
+    file = os.fdopen(fileHandle, "wb")
     # Download url
     req = urllib.request.Request(url, headers=headers)
     if referer is not None:

src/searchengine/nova3/nova2.py

@@ -1,4 +1,4 @@
-#VERSION: 1.45
+#VERSION: 1.46
 # Author:
 #  Fabien Devaux <fab AT gnux DOT info>
@@ -37,17 +37,21 @@ import importlib
 import pathlib
 import sys
 import urllib.parse
+from collections.abc import Iterable, Iterator, Sequence
+from enum import Enum
 from glob import glob
 from multiprocessing import Pool, cpu_count
 from os import path
+from typing import Dict, List, Optional, Set, Tuple, Type
 
-THREADED = True
+THREADED: bool = True
 try:
-    MAX_THREADS = cpu_count()
+    MAX_THREADS: int = cpu_count()
 except NotImplementedError:
     MAX_THREADS = 1
 
-CATEGORIES = {'all', 'movies', 'tv', 'music', 'games', 'anime', 'software', 'pictures', 'books'}
+Category = Enum('Category', ['all', 'movies', 'tv', 'music', 'games', 'anime', 'software', 'pictures', 'books'])
 
 ################################################################################
 # Every engine should have a "search" method taking
@@ -58,11 +62,29 @@ CATEGORIES = {'all', 'movies', 'tv', 'music', 'games', 'anime', 'software', 'pictures', 'books'}
 ################################################################################
 
+EngineName = str
+
+
+class Engine:
+    url: str
+    name: EngineName
+    supported_categories: Dict[str, str]
+
+    def __init__(self) -> None:
+        pass
+
+    def search(self, what: str, cat: str = Category.all.name) -> None:
+        pass
+
+    def download_torrent(self, info: str) -> None:
+        pass
+
+
 # global state
-engine_dict = dict()
+engine_dict: Dict[EngineName, Optional[Type[Engine]]] = {}
 
-def list_engines():
+def list_engines() -> List[EngineName]:
     """ List all engines,
         including broken engines that fail on import
@@ -81,10 +103,10 @@ def list_engines():
     return found_engines
 
-def get_engine(engine_name):
-    #global engine_dict
+def get_engine(engine_name: EngineName) -> Optional[Type[Engine]]:
     if engine_name in engine_dict:
         return engine_dict[engine_name]
 
     # when import fails, engine is None
     engine = None
     try:
@@ -97,35 +119,37 @@ def get_engine(engine_name):
     return engine
 
-def initialize_engines(found_engines):
+def initialize_engines(found_engines: Iterable[EngineName]) -> Set[EngineName]:
     """ Import available engines
-        Return list of available engines
+        Return set of available engines
     """
-    supported_engines = []
+    supported_engines = set()
 
     for engine_name in found_engines:
         # import engine
         engine = get_engine(engine_name)
         if engine is None:
             continue
-        supported_engines.append(engine_name)
+        supported_engines.add(engine_name)
 
     return supported_engines
 
-def engines_to_xml(supported_engines):
+def engines_to_xml(supported_engines: Iterable[EngineName]) -> Iterator[str]:
     """ Generates xml for supported engines """
     tab = " " * 4
 
     for engine_name in supported_engines:
         search_engine = get_engine(engine_name)
+        if search_engine is None:
+            continue
 
         supported_categories = ""
         if hasattr(search_engine, "supported_categories"):
             supported_categories = " ".join((key
                                              for key in search_engine.supported_categories.keys()
-                                             if key != "all"))
+                                             if key != Category.all.name))
 
         yield "".join((tab, "<", engine_name, ">\n",
                        tab, tab, "<name>", search_engine.name, "</name>\n",
@@ -134,7 +158,7 @@ def engines_to_xml(supported_engines):
                        tab, "</", engine_name, ">\n"))
 
-def displayCapabilities(supported_engines):
+def displayCapabilities(supported_engines: Iterable[EngineName]) -> None:
     """
     Display capabilities in XML format
     <capabilities>
@@ -151,21 +175,24 @@ def displayCapabilities(supported_engines):
     print(xml)
 
-def run_search(engine_list):
+def run_search(engine_list: Tuple[Optional[Type[Engine]], str, Category]) -> bool:
     """ Run search in engine
-        @param engine_list List with engine, query and category
+        @param engine_list Tuple with engine, query and category
 
         @retval False if any exceptions occurred
        @retval True otherwise
     """
-    engine, what, cat = engine_list
+    engine_class, what, cat = engine_list
+    if engine_class is None:
+        return False
+
     try:
-        engine = engine()
+        engine = engine_class()
        # avoid exceptions due to invalid category
         if hasattr(engine, 'supported_categories'):
-            if cat in engine.supported_categories:
-                engine.search(what, cat)
+            if cat.name in engine.supported_categories:
+                engine.search(what, cat.name)
         else:
             engine.search(what)
@@ -174,7 +201,7 @@ def run_search(engine_list):
     return False
 
-def main(args):
+def main(args: Sequence[str]) -> None:
     # qbt tend to run this script in 'isolate mode' so append the current path manually
     current_path = str(pathlib.Path(__file__).parent.resolve())
     if current_path not in sys.path:
@@ -182,7 +209,7 @@ def main(args):
     found_engines = list_engines()
 
-    def show_usage():
+    def show_usage() -> None:
         print("./nova2.py all|engine1[,engine2]* <category> <keywords>", file=sys.stderr)
         print("found engines: " + ','.join(found_engines), file=sys.stderr)
         print("to list available engines: ./nova2.py --capabilities [--names]", file=sys.stderr)
@@ -190,7 +217,6 @@ def main(args):
     if not args:
         show_usage()
         sys.exit(1)
-
     elif args[0] == "--capabilities":
         supported_engines = initialize_engines(found_engines)
         if "--names" in args:
@@ -198,14 +224,14 @@ def main(args):
             return
         displayCapabilities(supported_engines)
         return
 
     elif len(args) < 3:
         show_usage()
         sys.exit(1)
 
     cat = args[1].lower()
-    if cat not in CATEGORIES:
+    try:
+        category = Category[cat]
+    except KeyError:
         print(" - ".join(('Invalid category', cat)), file=sys.stderr)
         sys.exit(1)
@@ -223,16 +249,18 @@ def main(args):
         engines_list = initialize_engines(found_engines)
     else:
         # discard not-found engines
-        engines_list = [engine for engine in engines_list if engine in found_engines]
+        engines_list = {engine for engine in engines_list if engine in found_engines}
 
     what = urllib.parse.quote(' '.join(args[2:]))
+    params = ((get_engine(engine_name), what, category) for engine_name in engines_list)
     if THREADED:
         # child process spawning is controlled min(number of searches, number of cpu)
         with Pool(min(len(engines_list), MAX_THREADS)) as pool:
-            pool.map(run_search, ([get_engine(engine_name), what, cat] for engine_name in engines_list))
+            pool.map(run_search, params)
     else:
         # py3 note: map is needed to be evaluated for content to be executed
-        all(map(run_search, ([get_engine(engine_name), what, cat] for engine_name in engines_list)))
+        all(map(run_search, params))
 
 if __name__ == "__main__":
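The most visible behavioural change in nova2.py is that the `CATEGORIES` set becomes a `Category` enum: `main()` now validates the category with an indexing lookup that raises `KeyError` instead of a membership test, and `run_search()` receives the enum member and passes `cat.name` to the engine. A small standalone sketch of that lookup behaviour, using the same functional `Enum` declaration as the diff:

```python
from enum import Enum

Category = Enum('Category', ['all', 'movies', 'tv', 'music', 'games', 'anime',
                             'software', 'pictures', 'books'])

# Valid category: the lookup succeeds and .name round-trips the string,
# which is what ends up being passed to Engine.search().
category = Category['movies']
print(category.name)         # movies
print(Category.all.name)     # all (the default category in the Engine stub)

# Invalid category: indexing raises KeyError, which main() catches in order to
# print "Invalid category" and exit.
try:
    Category['cartoons']
except KeyError:
    print(" - ".join(('Invalid category', 'cartoons')))
```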

src/searchengine/nova3/novaprinter.py

@@ -1,4 +1,4 @@
-#VERSION: 1.48
+#VERSION: 1.49
 
 # Redistribution and use in source and binary forms, with or without
 # modification, are permitted provided that the following conditions are met:
@@ -24,8 +24,25 @@
 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 # POSSIBILITY OF SUCH DAMAGE.
 
+import re
+from collections.abc import Mapping
+from typing import Any
+
+# TODO: enable this when using Python >= 3.8
+#SearchResults = TypedDict('SearchResults', {
+#    'link': str,
+#    'name': str,
+#    'size': str,
+#    'seeds': int,
+#    'leech': int,
+#    'engine_url': str,
+#    'desc_link': str,  # Optional
+#    'pub_date': int  # Optional
+#})
+SearchResults = Mapping[str, Any]
+
-def prettyPrinter(dictionary):
+def prettyPrinter(dictionary: SearchResults) -> None:
     outtext = "|".join((
         dictionary["link"],
         dictionary["name"].replace("|", " "),
@@ -34,7 +51,7 @@ def prettyPrinter(dictionary):
         str(dictionary["leech"]),
         dictionary["engine_url"],
         dictionary.get("desc_link", ""),  # Optional
-        str(dictionary.get("pub_date", -1)),  # Optional
+        str(dictionary.get("pub_date", -1))  # Optional
     ))
 
     # fd 1 is stdout
@@ -42,30 +59,24 @@ def prettyPrinter(dictionary):
     print(outtext, file=utf8stdout)
 
-def anySizeToBytes(size_string):
+sizeUnitRegex: re.Pattern[str] = re.compile(r"^(?P<size>\d*\.?\d+) *(?P<unit>[a-z]+)?", re.IGNORECASE)
+
+
+def anySizeToBytes(size_string: str) -> int:
     """
     Convert a string like '1 KB' to '1024' (bytes)
     """
-    # separate integer from unit
-    try:
-        size, unit = size_string.split()
-    except Exception:
-        try:
-            size = size_string.strip()
-            unit = ''.join([c for c in size if c.isalpha()])
-            if len(unit) > 0:
-                size = size[:-len(unit)]
-        except Exception:
-            return -1
-    if len(size) == 0:
-        return -1
-    size = float(size)
-    if len(unit) == 0:
-        return int(size)
-    short_unit = unit.upper()[0]
-
-    # convert
-    units_dict = {'T': 40, 'G': 30, 'M': 20, 'K': 10}
-    if short_unit in units_dict:
-        size = size * 2**units_dict[short_unit]
-    return int(size)
+    match = sizeUnitRegex.match(size_string.strip())
+    if match is None:
+        return -1
+
+    size = float(match.group('size'))  # need to match decimals
+    unit = match.group('unit')
+    if unit is not None:
+        units_exponents = {'T': 40, 'G': 30, 'M': 20, 'K': 10}
+        exponent = units_exponents.get(unit[0].upper(), 0)
+        size *= 2**exponent
+
+    return round(size)
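The rewritten `anySizeToBytes()` in novaprinter.py replaces the split-and-slice parsing with a single regex. A standalone sketch of the new behaviour follows; the function body mirrors the diff, and the expected outputs in the comments follow from the regex and the power-of-two exponents:

```python
import re

sizeUnitRegex = re.compile(r"^(?P<size>\d*\.?\d+) *(?P<unit>[a-z]+)?", re.IGNORECASE)


def anySizeToBytes(size_string: str) -> int:
    """ Convert a string like '1 KB' to 1024 (bytes) """
    match = sizeUnitRegex.match(size_string.strip())
    if match is None:
        return -1
    size = float(match.group('size'))
    unit = match.group('unit')
    if unit is not None:
        # Only the first letter of the unit matters: K, M, G, T.
        size *= 2 ** {'T': 40, 'G': 30, 'M': 20, 'K': 10}.get(unit[0].upper(), 0)
    return round(size)


print(anySizeToBytes("1 KB"))     # 1024
print(anySizeToBytes("1.5 MiB"))  # 1572864
print(anySizeToBytes("512"))      # 512 (no unit is treated as bytes)
print(anySizeToBytes("n/a"))      # -1 (no leading number)
```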