197 lines
6.3 KiB
Python
197 lines
6.3 KiB
Python
import json
|
|
import logging
|
|
import re
|
|
import sys
|
|
import threading
|
|
import time
|
|
import xml.etree.ElementTree as ET
|
|
from functools import wraps
|
|
from math import floor
|
|
from typing import Callable
|
|
|
|
from bs4 import BeautifulSoup, FeatureNotFound
|
|
|
|
log = logging.getLogger("UTILS")
|
|
|
|
|
|
def dump_json(file_path, data) -> int:
|
|
with open(file_path, "w", encoding="utf-8") as f:
|
|
txt = json.dumps(data, ensure_ascii=False, indent=2)
|
|
# for compact form of channellist in json files
|
|
txt = re.sub(r",\n\s{8}\"", ', "', txt)
|
|
txt = re.sub(r"\s{6}{\s+(.*)\s+}", r" { \g<1> }", txt)
|
|
return f.write(txt)
|
|
|
|
|
|
# https://stackoverflow.com/a/22273639
|
|
_illegal_unichrs = [
|
|
(0x00, 0x08),
|
|
(0x0B, 0x0C),
|
|
(0x0E, 0x1F),
|
|
(0x7F, 0x84),
|
|
(0x86, 0x9F),
|
|
(0xFDD0, 0xFDDF),
|
|
(0xFFFE, 0xFFFF),
|
|
]
|
|
if sys.maxunicode >= 0x10000: # not narrow build
|
|
_illegal_unichrs.extend(
|
|
[
|
|
(0x1FFFE, 0x1FFFF),
|
|
(0x2FFFE, 0x2FFFF),
|
|
(0x3FFFE, 0x3FFFF),
|
|
(0x4FFFE, 0x4FFFF),
|
|
(0x5FFFE, 0x5FFFF),
|
|
(0x6FFFE, 0x6FFFF),
|
|
(0x7FFFE, 0x7FFFF),
|
|
(0x8FFFE, 0x8FFFF),
|
|
(0x9FFFE, 0x9FFFF),
|
|
(0xAFFFE, 0xAFFFF),
|
|
(0xBFFFE, 0xBFFFF),
|
|
(0xCFFFE, 0xCFFFF),
|
|
(0xDFFFE, 0xDFFFF),
|
|
(0xEFFFE, 0xEFFFF),
|
|
(0xFFFFE, 0xFFFFF),
|
|
(0x10FFFE, 0x10FFFF),
|
|
]
|
|
)
|
|
_illegal_ranges = [rf"{chr(low)}-{chr(high)}" for (low, high) in _illegal_unichrs]
|
|
_illegal_xml_chars_RE = re.compile("[" + "".join(_illegal_ranges) + "]")
|
|
|
|
|
|
class Element(ET.Element):
|
|
def __init__(self, *args, **kwargs):
|
|
attrib = kwargs.pop("attrib", {})
|
|
super().__init__(args[0], attrib=attrib, **kwargs)
|
|
if len(args) > 1:
|
|
self.text = args[1]
|
|
|
|
def indent(self, space=" ", level=0):
|
|
if level < 0:
|
|
raise ValueError(f"Initial indentation level must be >= 0, got {level}")
|
|
if len(self) == 0:
|
|
return
|
|
|
|
# Reduce the memory consumption by reusing indentation strings.
|
|
indentations = ["\n" + level * space]
|
|
|
|
def _indent_children(elem, level):
|
|
# Start a new indentation level for the first child.
|
|
child_level = level + 1
|
|
try:
|
|
child_indentation = indentations[child_level]
|
|
except IndexError:
|
|
child_indentation = indentations[level] + space
|
|
indentations.append(child_indentation)
|
|
|
|
if not elem.text or not elem.text.strip():
|
|
elem.text = child_indentation
|
|
|
|
for child in elem:
|
|
if len(child):
|
|
_indent_children(child, child_level)
|
|
if not child.tail or not child.tail.strip():
|
|
child.tail = child_indentation
|
|
|
|
# Dedent after the last child by overwriting the previous indentation.
|
|
if not child.tail.strip(): # pylint: disable=undefined-loop-variable
|
|
child.tail = indentations[level] # pylint: disable=undefined-loop-variable
|
|
|
|
_indent_children(self, 0)
|
|
|
|
def tostring(self, space=" ", level=0):
|
|
self.indent(space=space, level=level)
|
|
return _illegal_xml_chars_RE.sub("", space * level + ET.tostring(self, encoding="unicode"))
|
|
|
|
|
|
class PrefixLogger(logging.LoggerAdapter):
|
|
def __init__(self, logger, prefix):
|
|
super().__init__(logger, {})
|
|
self.prefix = prefix
|
|
|
|
def process(self, msg, kwargs):
|
|
return f"{self.prefix} {msg}", kwargs
|
|
|
|
|
|
class ParserBeautifulSoup(BeautifulSoup):
|
|
"""A ``bs4.BeautifulSoup`` that picks the first available parser."""
|
|
|
|
def insert_before(self, *args):
|
|
pass
|
|
|
|
def insert_after(self, *args):
|
|
pass
|
|
|
|
def __init__(self, markup, **kwargs):
|
|
# pick the first parser available
|
|
for parser in ["lxml", "html.parser"]:
|
|
try:
|
|
super().__init__(markup, parser, **kwargs)
|
|
return
|
|
except FeatureNotFound:
|
|
pass
|
|
|
|
raise FeatureNotFound
|
|
|
|
|
|
class RateLimiter:
|
|
"""original implementation by tomasbasham/ratelimit"""
|
|
|
|
try:
|
|
now: Callable = time.monotonic # Use monotonic time if available
|
|
except AttributeError:
|
|
now: Callable = time.time # otherwise fall back to the system clock
|
|
|
|
def __init__(self, calls: int = 15, period: float = 900.0, tps: float = None):
|
|
if tps is not None:
|
|
if tps <= 0.0:
|
|
raise ValueError("tps must be positive")
|
|
calls, period = 1, 1 / tps
|
|
self.max_calls = max(1, min(sys.maxsize, floor(calls)))
|
|
self.period = period
|
|
|
|
# Initialise the decorator state.
|
|
self.last_reset = self.now()
|
|
self.num_calls = 0
|
|
|
|
# Add thread safety.
|
|
self.lock = threading.RLock()
|
|
|
|
def __call__(self, func: Callable) -> Callable:
|
|
"""
|
|
Return a wrapped function that prevents further function invocations if
|
|
previously called within a specified period of time.
|
|
"""
|
|
|
|
@wraps(func)
|
|
def wrapper(*args, **kargs):
|
|
"""
|
|
Extend the behaviour of the decorated function, forwarding function
|
|
invocations previously called no sooner than a specified period of
|
|
time. The decorator will raise an exception if the function cannot
|
|
be called so the caller may implement a retry strategy such as an
|
|
exponential backoff.
|
|
"""
|
|
with self.lock:
|
|
period_remaining = self.__period_remaining()
|
|
|
|
# If the time window has elapsed then reset.
|
|
if period_remaining <= 0:
|
|
self.num_calls = 0
|
|
self.last_reset = self.now()
|
|
|
|
# Increase the number of attempts to call the function.
|
|
self.num_calls += 1
|
|
|
|
# If the number of attempts to call the function exceeds the maximum
|
|
if self.num_calls > self.max_calls:
|
|
self.last_reset = self.now() + period_remaining # for future call
|
|
time.sleep(period_remaining)
|
|
return func(*args, **kargs)
|
|
return func(*args, **kargs)
|
|
|
|
return wrapper
|
|
|
|
def __period_remaining(self) -> float:
|
|
elapsed = self.now() - self.last_reset
|
|
return self.period - elapsed
|