Add type annotations

A few pieces of code were revised because the type checker (mypy) doesn't
allow changing the type of a variable.

PR #20935.
This commit is contained in:
Chocobo1 2024-06-17 13:18:32 +08:00 committed by sledgehammer999
parent 12a4c3fda2
commit 5e5aa8a563
No known key found for this signature in database
GPG key ID: 6E4A2D025B7CC9A2
4 changed files with 122 additions and 72 deletions

View file

@ -53,7 +53,7 @@ jobs:
python-version: '3.7'
- name: Install tools (search engine)
run: pip install bandit pycodestyle pyflakes
run: pip install bandit mypy pycodestyle pyflakes pyright
- name: Gather files (search engine)
run: |
@ -61,6 +61,16 @@ jobs:
echo $PY_FILES
echo "PY_FILES=$PY_FILES" >> "$GITHUB_ENV"
- name: Check typings (search engine)
run: |
MYPYPATH="src/searchengine/nova3" \
mypy \
--follow-imports skip \
--strict \
$PY_FILES
pyright \
$PY_FILES
- name: Lint code (search engine)
run: |
pyflakes $PY_FILES

View file

@ -1,4 +1,4 @@
#VERSION: 1.46
#VERSION: 1.47
# Author:
# Christophe DUMEZ (chris@qbittorrent.org)
@ -39,9 +39,11 @@ import tempfile
import urllib.error
import urllib.parse
import urllib.request
from collections.abc import Mapping
from typing import Any, Dict, Optional
def getBrowserUserAgent():
def getBrowserUserAgent() -> str:
""" Disguise as browser to circumvent website blocking """
# Firefox release calendar
@ -57,7 +59,7 @@ def getBrowserUserAgent():
return f"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:{nowVersion}.0) Gecko/20100101 Firefox/{nowVersion}.0"
headers = {'User-Agent': getBrowserUserAgent()}
headers: Dict[str, Any] = {'User-Agent': getBrowserUserAgent()}
# SOCKS5 Proxy support
if "sock_proxy" in os.environ and len(os.environ["sock_proxy"].strip()) > 0:
@ -67,13 +69,13 @@ if "sock_proxy" in os.environ and len(os.environ["sock_proxy"].strip()) > 0:
if m is not None:
socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, m.group('host'),
int(m.group('port')), True, m.group('username'), m.group('password'))
socket.socket = socks.socksocket
socket.socket = socks.socksocket # type: ignore[misc]
def htmlentitydecode(s):
def htmlentitydecode(s: str) -> str:
# First convert alpha entities (such as é)
# (Inspired from http://mail.python.org/pipermail/python-list/2007-June/443813.html)
def entity2char(m):
def entity2char(m: re.Match[str]) -> str:
entity = m.group(1)
if entity in html.entities.name2codepoint:
return chr(html.entities.name2codepoint[entity])
@ -87,7 +89,7 @@ def htmlentitydecode(s):
return re.sub(r'&#x(\w+);', lambda x: chr(int(x.group(1), 16)), t)
def retrieve_url(url, custom_headers={}):
def retrieve_url(url: str, custom_headers: Mapping[str, Any] = {}) -> str:
""" Return the content of the url page as a string """
req = urllib.request.Request(url, headers={**headers, **custom_headers})
try:
@ -95,7 +97,7 @@ def retrieve_url(url, custom_headers={}):
except urllib.error.URLError as errno:
print(" ".join(("Connection error:", str(errno.reason))))
return ""
dat = response.read()
dat: bytes = response.read()
# Check if it is gzipped
if dat[:2] == b'\x1f\x8b':
# Data is gzip encoded, decode it
@ -109,16 +111,15 @@ def retrieve_url(url, custom_headers={}):
ignore, charset = info['Content-Type'].split('charset=')
except Exception:
pass
dat = dat.decode(charset, 'replace')
dat = htmlentitydecode(dat)
# return dat.encode('utf-8', 'replace')
return dat
datStr = dat.decode(charset, 'replace')
datStr = htmlentitydecode(datStr)
return datStr
def download_file(url, referer=None):
def download_file(url: str, referer: Optional[str] = None) -> str:
""" Download file at url and write it to a file, return the path to the file and the url """
file, path = tempfile.mkstemp()
file = os.fdopen(file, "wb")
fileHandle, path = tempfile.mkstemp()
file = os.fdopen(fileHandle, "wb")
# Download url
req = urllib.request.Request(url, headers=headers)
if referer is not None:

View file

@ -1,4 +1,4 @@
#VERSION: 1.45
#VERSION: 1.46
# Author:
# Fabien Devaux <fab AT gnux DOT info>
@ -37,17 +37,21 @@ import importlib
import pathlib
import sys
import urllib.parse
from collections.abc import Iterable, Iterator, Sequence
from enum import Enum
from glob import glob
from multiprocessing import Pool, cpu_count
from os import path
from typing import Dict, List, Optional, Set, Tuple, Type
THREADED = True
THREADED: bool = True
try:
MAX_THREADS = cpu_count()
MAX_THREADS: int = cpu_count()
except NotImplementedError:
MAX_THREADS = 1
CATEGORIES = {'all', 'movies', 'tv', 'music', 'games', 'anime', 'software', 'pictures', 'books'}
Category = Enum('Category', ['all', 'movies', 'tv', 'music', 'games', 'anime', 'software', 'pictures', 'books'])
################################################################################
# Every engine should have a "search" method taking
@ -58,11 +62,29 @@ CATEGORIES = {'all', 'movies', 'tv', 'music', 'games', 'anime', 'software', 'pic
################################################################################
EngineName = str
class Engine:
    """ Typing stub describing the members this script uses on a search engine class

    The methods do nothing here; they only declare the expected signatures.
    """

    url: str
    # name printed inside the <name> element of the capabilities XML
    name: EngineName
    # only the keys are read by this script (category names);
    # callers guard access with hasattr(), so an engine may not define it
    supported_categories: Dict[str, str]

    def __init__(self) -> None:
        """ Engines are instantiated without arguments """

    def search(self, what: str, cat: str = Category.all.name) -> None:
        """ Run a search for `what`, restricted to category name `cat` """

    def download_torrent(self, info: str) -> None:
        """ Not called in the visible code - presumably downloads the torrent described by `info` (TODO confirm) """
# global state
engine_dict = dict()
engine_dict: Dict[EngineName, Optional[Type[Engine]]] = {}
def list_engines():
def list_engines() -> List[EngineName]:
""" List all engines,
including broken engines that fail on import
@ -81,10 +103,10 @@ def list_engines():
return found_engines
def get_engine(engine_name):
#global engine_dict
def get_engine(engine_name: EngineName) -> Optional[Type[Engine]]:
if engine_name in engine_dict:
return engine_dict[engine_name]
# when import fails, engine is None
engine = None
try:
@ -97,35 +119,37 @@ def get_engine(engine_name):
return engine
def initialize_engines(found_engines: Iterable[EngineName]) -> Set[EngineName]:
    """ Import every engine named in `found_engines`

    Return the set of engine names that could be loaded
    """
    # get_engine() imports the engine and gives None when the import fails,
    # so broken engines are filtered out here
    return {name for name in found_engines if get_engine(name) is not None}
def engines_to_xml(supported_engines):
def engines_to_xml(supported_engines: Iterable[EngineName]) -> Iterator[str]:
""" Generates xml for supported engines """
tab = " " * 4
for engine_name in supported_engines:
search_engine = get_engine(engine_name)
if search_engine is None:
continue
supported_categories = ""
if hasattr(search_engine, "supported_categories"):
supported_categories = " ".join((key
for key in search_engine.supported_categories.keys()
if key != "all"))
if key != Category.all.name))
yield "".join((tab, "<", engine_name, ">\n",
tab, tab, "<name>", search_engine.name, "</name>\n",
@ -134,7 +158,7 @@ def engines_to_xml(supported_engines):
tab, "</", engine_name, ">\n"))
def displayCapabilities(supported_engines):
def displayCapabilities(supported_engines: Iterable[EngineName]) -> None:
"""
Display capabilities in XML format
<capabilities>
@ -151,21 +175,24 @@ def displayCapabilities(supported_engines):
print(xml)
def run_search(engine_list):
def run_search(engine_list: Tuple[Optional[Type[Engine]], str, Category]) -> bool:
""" Run search in engine
@param engine_list List with engine, query and category
@param engine_list Tuple with engine, query and category
@retval False if any exceptions occurred
@retval True otherwise
"""
engine, what, cat = engine_list
engine_class, what, cat = engine_list
if engine_class is None:
return False
try:
engine = engine()
engine = engine_class()
# avoid exceptions due to invalid category
if hasattr(engine, 'supported_categories'):
if cat in engine.supported_categories:
engine.search(what, cat)
if cat.name in engine.supported_categories:
engine.search(what, cat.name)
else:
engine.search(what)
@ -174,7 +201,7 @@ def run_search(engine_list):
return False
def main(args):
def main(args: Sequence[str]) -> None:
# qbt tend to run this script in 'isolate mode' so append the current path manually
current_path = str(pathlib.Path(__file__).parent.resolve())
if current_path not in sys.path:
@ -182,7 +209,7 @@ def main(args):
found_engines = list_engines()
def show_usage():
def show_usage() -> None:
print("./nova2.py all|engine1[,engine2]* <category> <keywords>", file=sys.stderr)
print("found engines: " + ','.join(found_engines), file=sys.stderr)
print("to list available engines: ./nova2.py --capabilities [--names]", file=sys.stderr)
@ -190,7 +217,6 @@ def main(args):
if not args:
show_usage()
sys.exit(1)
elif args[0] == "--capabilities":
supported_engines = initialize_engines(found_engines)
if "--names" in args:
@ -198,14 +224,14 @@ def main(args):
return
displayCapabilities(supported_engines)
return
elif len(args) < 3:
show_usage()
sys.exit(1)
cat = args[1].lower()
if cat not in CATEGORIES:
try:
category = Category[cat]
except KeyError:
print(" - ".join(('Invalid category', cat)), file=sys.stderr)
sys.exit(1)
@ -223,16 +249,18 @@ def main(args):
engines_list = initialize_engines(found_engines)
else:
# discard not-found engines
engines_list = [engine for engine in engines_list if engine in found_engines]
engines_list = {engine for engine in engines_list if engine in found_engines}
what = urllib.parse.quote(' '.join(args[2:]))
params = ((get_engine(engine_name), what, category) for engine_name in engines_list)
if THREADED:
# child process spawning is controlled min(number of searches, number of cpu)
with Pool(min(len(engines_list), MAX_THREADS)) as pool:
pool.map(run_search, ([get_engine(engine_name), what, cat] for engine_name in engines_list))
pool.map(run_search, params)
else:
# py3 note: map is needed to be evaluated for content to be executed
all(map(run_search, ([get_engine(engine_name), what, cat] for engine_name in engines_list)))
all(map(run_search, params))
if __name__ == "__main__":

View file

@ -1,4 +1,4 @@
#VERSION: 1.48
#VERSION: 1.49
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
@ -24,8 +24,25 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import re
from collections.abc import Mapping
from typing import Any
def prettyPrinter(dictionary):
# TODO: enable this when using Python >= 3.8
#SearchResults = TypedDict('SearchResults', {
# 'link': str,
# 'name': str,
# 'size': str,
# 'seeds': int,
# 'leech': int,
# 'engine_url': str,
# 'desc_link': str, # Optional
# 'pub_date': int # Optional
#})
SearchResults = Mapping[str, Any]
def prettyPrinter(dictionary: SearchResults) -> None:
outtext = "|".join((
dictionary["link"],
dictionary["name"].replace("|", " "),
@ -34,7 +51,7 @@ def prettyPrinter(dictionary):
str(dictionary["leech"]),
dictionary["engine_url"],
dictionary.get("desc_link", ""), # Optional
str(dictionary.get("pub_date", -1)), # Optional
str(dictionary.get("pub_date", -1)) # Optional
))
# fd 1 is stdout
@ -42,30 +59,24 @@ def prettyPrinter(dictionary):
print(outtext, file=utf8stdout)
# Leading number (integer or decimal), optionally followed by spaces and an alphabetic unit.
# NOTE: intentionally not annotated as `re.Pattern[str]`: module-level annotations are
# evaluated at import time and `re.Pattern` is only subscriptable on Python >= 3.9,
# so the annotation would raise TypeError on older interpreters.
# Type checkers infer the pattern type from `re.compile()` anyway.
sizeUnitRegex = re.compile(r"^(?P<size>\d*\.?\d+) *(?P<unit>[a-z]+)?", re.IGNORECASE)


def anySizeToBytes(size_string: str) -> int:
    """
    Convert a string like '1 KB' to '1024' (bytes)

    Only the first letter of the unit is considered (case-insensitive):
    K, M, G and T scale the number by 2**10, 2**20, 2**30 and 2**40.
    A missing or unrecognized unit leaves the number unscaled.

    Returns the size rounded to the nearest integer, or -1 when
    `size_string` does not start with a number.
    """

    match = sizeUnitRegex.match(size_string.strip())
    if match is None:
        return -1

    size = float(match.group('size'))  # need to match decimals
    unit = match.group('unit')

    if unit is not None:
        units_exponents = {'T': 40, 'G': 30, 'M': 20, 'K': 10}
        # unknown units (e.g. plain 'B') map to exponent 0, i.e. no scaling
        exponent = units_exponents.get(unit[0].upper(), 0)
        size *= 2**exponent

    return round(size)