390 lines
12 KiB
Python
390 lines
12 KiB
Python
import argparse
|
|
import errno
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
from copy import copy
|
|
from logging.handlers import RotatingFileHandler
|
|
from pathlib import Path
|
|
from typing import Union
|
|
|
|
from epg2xml import __description__, __title__, __url__, __version__
|
|
from epg2xml.utils import dump_json
|
|
|
|
# Third-party HTTP libraries are noisy; only let their ERROR (and above) records through.
for _noisy_logger in ("requests", "urllib3.connectionpool"):
    logging.getLogger(_noisy_logger).setLevel(logging.ERROR)

# Logger shared by everything defined in this module.
logger = logging.getLogger("CONFIG")
|
|
|
|
|
|
def setup_root_logger(
    *,
    handler: logging.Handler = None,
    formatter: logging.Formatter = None,
    level: Union[int, str] = None,
) -> None:
    """Attach a handler to the root logger and set the root log level.

    Args:
        handler: Handler to install. When omitted, a ``logging.StreamHandler``
            (console, stderr by default) is created.
        formatter: Formatter applied to ``handler``. When omitted, a timestamped
            ``asctime level name lineno: message`` layout is used.
        level: Level for the root logger. ``logging.INFO`` when omitted.
    """
    root = logging.getLogger()

    if handler is None:
        handler = logging.StreamHandler()

    if formatter is None:
        formatter = logging.Formatter(
            "%(asctime)-15s %(levelname)-8s %(name)-7s %(lineno)4d: %(message)s",
            datefmt="%Y/%m/%d %H:%M:%S",
        )

    # The formatter is always (re)applied, even to a caller-supplied handler.
    handler.setFormatter(formatter)
    root.addHandler(handler)
    root.setLevel(logging.INFO if level is None else level)
|
|
|
|
|
|
class Singleton(type):
    """Metaclass that constructs each of its classes once and reuses that instance afterwards."""

    # Maps a class to the single instance created for it.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        """Return the cached instance of ``cls``, building it from the arguments on first use only."""
        if cls in cls._instances:
            return cls._instances[cls]

        instance = super().__call__(*args, **kwargs)
        cls._instances[cls] = instance
        return instance
|
|
|
|
|
|
class Config(metaclass=Singleton):
    """Process-wide configuration assembled from the command line, the environment and a JSON file.

    Two layers are kept:

    * ``settings`` - runtime options (file paths, log level, ...) resolved per
      entry of ``base_settings`` from the command line, then the environment,
      then the built-in default.
    * ``configs`` - per-provider options read from the JSON config file, filled
      by ``load()``.

    The class uses the ``Singleton`` metaclass (Python 3 ``metaclass=`` syntax;
    a ``__metaclass__`` class attribute is ignored by Python 3), so repeated
    ``Config()`` calls return the same instance and the command line is parsed
    only once. Instantiating may terminate the process: help is printed when no
    arguments are given, and a missing parent directory is fatal.
    """

    # Defaults for the JSON config file. "GLOBAL" holds options that are pushed
    # into every provider section as defaults by ``load_with_hidden``.
    base_config = {
        "GLOBAL": {
            "ENABLED": True,
            "FETCH_LIMIT": 2,
            "ID_FORMAT": "{ServiceId}.{Source.lower()}",
            "ADD_REBROADCAST_TO_TITLE": False,
            "ADD_EPNUM_TO_TITLE": True,
            "ADD_DESCRIPTION": True,
            "ADD_XMLTV_NS": False,
            "GET_MORE_DETAILS": False,
            "ADD_CHANNEL_ICON": True,
            "HTTP_PROXY": None,
        },
        "KT": {
            "MY_CHANNELS": [],
        },
        "LG": {
            "MY_CHANNELS": [],
        },
        "SK": {
            "MY_CHANNELS": [],
        },
        "DAUM": {
            "MY_CHANNELS": [],
        },
        "NAVER": {
            "MY_CHANNELS": [],
        },
        "WAVVE": {
            "MY_CHANNELS": [],
        },
        "TVING": {
            "MY_CHANNELS": [],
        },
        "SPOTV": {
            "MY_CHANNELS": [],
        },
    }

    # Runtime settings: command-line flag, environment variable and default for each.
    base_settings = {
        "config": {
            "argv": "--config",
            "env": "EPG2XML_CONFIG",
            "default": str(Path.cwd().joinpath("epg2xml.json")),
        },
        "logfile": {
            "argv": "--logfile",
            "env": "EPG2XML_LOGFILE",
            "default": None,
        },
        "loglevel": {
            "argv": "--loglevel",
            "env": "EPG2XML_LOGLEVEL",
            "default": "INFO",
        },
        "channelfile": {
            "argv": "--channelfile",
            "env": "EPG2XML_CHANNELFILE",
            "default": str(Path.cwd().joinpath("Channel.json")),
        },
        "xmlfile": {
            "argv": "--xmlfile",
            "env": "EPG2XML_XMLFILE",
            "default": None,
        },
        "xmlsock": {
            "argv": "--xmlsock",
            "env": "EPG2XML_XMLSOCK",
            "default": None,
        },
        "parallel": {
            "argv": "--parallel",
            "env": "EPG2XML_PARALLEL",
            "default": False,
        },
        "dbfile": {
            "argv": "--dbfile",
            "env": "EPG2XML_DBFILE",
            "default": None,
        },
    }

    def __init__(self):
        """Parse the command line and resolve runtime settings; ``configs`` stays ``None`` until ``load()``."""
        # Args and settings
        self.args = self.parse_args()
        self.settings = self.get_settings()
        # Configs
        self.configs = None

    @property
    def default_config(self):
        """Return an independent copy of ``base_config`` (reserved for adding extra fields).

        A deep copy is used so that changes to the returned mapping, including
        its nested dicts and lists, never leak into the class-level defaults.
        """
        # Local import: the module itself only imports ``copy`` from ``copy``.
        from copy import deepcopy

        return deepcopy(self.base_config)

    def __inner_upgrade(self, settings1, settings2, key=None, overwrite=False):
        """Recursively merge the defaults ``settings1`` into the current values ``settings2``.

        Args:
            settings1: Default values (dict or list).
            settings2: Current values of the same shape; not modified.
            key: Name of the option being merged; ``None`` at the top level.
            overwrite: When true, scalar values that differ from the default
                are replaced by the default.

        Returns:
            Tuple ``(merged, upgraded)`` where ``merged`` is a copy of
            ``settings2`` with missing entries added and ``upgraded`` tells
            whether anything was added or overwritten.
        """
        sub_upgraded = False
        merged = copy(settings2)

        if isinstance(settings1, dict):
            for k, v in settings1.items():
                # missing k
                if k not in settings2:
                    merged[k] = v
                    sub_upgraded = True
                    if not key:
                        logger.info("Added %r config option: %s", str(k), str(v))
                    else:
                        logger.info("Added %r to config option %r: %s", str(k), str(key), str(v))
                    continue

                # iterate children
                if isinstance(v, (dict, list)):
                    merged[k], did_upgrade = self.__inner_upgrade(v, settings2[k], key=k, overwrite=overwrite)
                    sub_upgraded = did_upgrade or sub_upgraded
                elif v != settings2[k] and overwrite:
                    # Replace only this entry; rebinding ``merged`` to the whole
                    # defaults mapping would drop every user-only key.
                    merged[k] = v
                    sub_upgraded = True
        elif isinstance(settings1, list) and key:
            # Lists are only merged below a named option, never at the top level.
            for v in settings1:
                if v not in settings2:
                    merged.append(v)
                    sub_upgraded = True
                    logger.info("Added to config option %r: %s", str(key), str(v))

        return merged, sub_upgraded

    def upgrade_configs(self, currents):
        """Apply environment overrides to ``currents`` and add any missing default options.

        Every top-level name of ``base_config`` found in the environment is
        decoded as JSON and replaces that section in ``currents`` (which is
        updated in place).

        Args:
            currents: Config mapping as read from the JSON file.

        Returns:
            Tuple ``(upgraded_configs, upgraded)`` as produced by the inner merge.

        Raises:
            json.JSONDecodeError: If an environment value is not valid JSON.
        """
        fields_env = {}

        # ENV gets priority: ENV > config.json
        for name in self.base_config:
            if name in os.environ:
                # Use JSON decoder to get same behaviour as config file
                fields_env[name] = json.JSONDecoder().decode(os.environ[name])
                logger.debug("setting from ENV --%s=%s", name, fields_env[name])

        # Update in-memory config with environment settings
        currents.update(fields_env)

        # Do inner upgrade
        return self.__inner_upgrade(self.base_config, currents)

    def load_with_hidden(self, cfg_old):
        """Store per-provider configs in ``self.configs`` with ``GLOBAL`` values filled in as defaults.

        Every section other than ``"GLOBAL"`` receives each ``GLOBAL`` option it
        does not define itself; the ``"GLOBAL"`` section is left out of the
        result. ``cfg_old`` and its sections are not modified.

        Raises:
            KeyError: If ``cfg_old`` has no ``"GLOBAL"`` section.
        """
        global_defaults = cfg_old["GLOBAL"]
        cfg_new = {}
        for provider, section in cfg_old.items():
            if provider == "GLOBAL":
                continue
            # Copy first so the caller's section is untouched; ``setdefault``
            # keeps the section's own keys and appends the missing globals.
            filled = dict(section)
            for k, v in global_defaults.items():
                filled.setdefault(k, v)
            cfg_new[provider] = filled
        self.configs = cfg_new

    def load(self):
        """Read the JSON config file into ``self.configs``.

        The process exits with status 0 after writing a default file when none
        exists, and after rewriting the file when missing options were added.
        It exits with status 1 when the file (or an environment override)
        cannot be decoded.
        """
        logger.debug("Loading config...")
        config_path = self.settings["config"]

        if not Path(config_path).exists():
            logger.info("No config file found. Creating a default one...")
            # ``save`` terminates the process when ``exitOnSave`` is true.
            self.save(self.default_config, exitOnSave=True)

        try:
            with open(config_path, "r", encoding="utf-8") as fp:
                cfg, upgraded = self.upgrade_configs(json.load(fp))

            # Save config if upgraded; ``save`` exits so the user can review the changes.
            if upgraded:
                self.save(cfg, exitOnSave=True)

            self.load_with_hidden(cfg)
        except ValueError:
            # json.JSONDecodeError is a subclass of ValueError.
            logger.exception("Please check your config here: %s", config_path)
            sys.exit(1)

    def save(self, cfg, exitOnSave=True):
        """Write ``cfg`` to the configured config file.

        Args:
            cfg: Config mapping to dump as JSON.
            exitOnSave: When true, log where the file was written and exit the
                process with status 0.
        """
        dump_json(self.settings["config"], cfg)

        if exitOnSave:
            logger.info("Your config was upgraded. You may check the changes here: %r", self.settings["config"])
            sys.exit(0)

    def get_settings(self):
        """Resolve every entry of ``base_settings`` and apply the logging-related ones.

        Side effects: exits with status 1 when the parent directory of the
        config, log, channel or db file does not exist; installs a rotating
        file handler on the root logger when a log file is configured; sets the
        root logger level.

        Returns:
            Dict mapping each setting name to its resolved value.
        """
        setts = {}
        for name, data in self.base_settings.items():
            # Argument priority: command line > environment > default
            arg_value = self.args.get(name)

            # Command line argument
            if arg_value:
                value = arg_value
                logger.debug("setting from ARG --%s=%s", name, value)

            # Environment variable
            elif data["env"] in os.environ:
                value = os.environ[data["env"]]
                logger.debug("setting from ENV --%s=%s", data["env"], value)

            # Default
            else:
                value = data["default"]
                logger.debug("setting by default %s=%s", data["argv"], value)

            setts[name] = value

        # checking existence of important files' dir
        for argname in ["config", "logfile", "channelfile", "dbfile"]:
            filepath = setts[argname]
            if filepath is not None and not Path(filepath).parent.exists():
                logger.error(FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), filepath))
                sys.exit(1)

        # handling of boolean args (environment values arrive as strings)
        for argname in ["parallel"]:
            if isinstance(setts[argname], str):
                setts[argname] = setts[argname].lower() in ("y", "yes", "t", "true", "on", "1")

        # logging to file
        if setts["logfile"] is not None:
            fileHandler = RotatingFileHandler(setts["logfile"], maxBytes=2 * 1024**2, backupCount=5, encoding="utf-8")
            setup_root_logger(handler=fileHandler)

        # set configured log level; level names are upper-case in ``logging``,
        # so normalise values such as "debug" coming from the environment
        if isinstance(setts["loglevel"], str):
            setts["loglevel"] = setts["loglevel"].upper()
        logging.getLogger().setLevel(setts["loglevel"])

        return setts

    # Parse command line arguments
    def parse_args(self):
        """Build the argument parser and return the parsed command line as a dict.

        Prints the help text and exits with status 0 when the program is
        started without any argument.
        """
        opts = self.base_settings
        parser = argparse.ArgumentParser(
            prog=__title__,
            description=__description__,
            epilog=f"Online help: <{__url__}>",
            formatter_class=argparse.RawTextHelpFormatter,
        )

        # Mode
        parser.add_argument(
            "cmd",
            metavar="command",
            choices=("run", "fromdb", "update_channels"),
            help=('"run": XML 형식으로 출력\n' '"fromdb": dbfile로부터 불러오기\n' '"update_channels": 채널 정보 업데이트'),
        )

        # Display version info
        parser.add_argument(
            "-v",
            "--version",
            action="version",
            version=f"{__title__} v{__version__}",
        )

        # Config file
        parser.add_argument(
            opts["config"]["argv"],
            nargs="?",
            const=None,
            help=f"config file path (default: {opts['config']['default']})",
        )

        # Log file
        parser.add_argument(
            opts["logfile"]["argv"],
            nargs="?",
            const=None,
            help=f"log file path (default: {opts['logfile']['default']})",
        )

        # Log level
        parser.add_argument(
            opts["loglevel"]["argv"],
            choices=("DEBUG", "INFO", "WARNING", "ERROR"),
            help=f"loglevel (default: {opts['loglevel']['default']})",
        )

        # Channel file
        parser.add_argument(
            opts["channelfile"]["argv"],
            nargs="?",
            const=None,
            help=f"channel file path (default: {opts['channelfile']['default']})",
        )

        # XML file
        parser.add_argument(
            opts["xmlfile"]["argv"],
            nargs="?",
            const=None,
            help="write output to file if specified",
        )

        # XML socket
        parser.add_argument(
            opts["xmlsock"]["argv"],
            nargs="?",
            const=None,
            help="send output to unix socket if specified",
        )

        # Run in Parallel
        parser.add_argument(
            opts["parallel"]["argv"],
            action="store_true",
            help="run in parallel",
        )

        # DB file
        parser.add_argument(
            opts["dbfile"]["argv"],
            nargs="?",
            const=None,
            help="export/import data to/from db",
        )

        # Print help by default if no arguments
        if len(sys.argv) == 1:
            parser.print_help()
            sys.exit(0)

        return vars(parser.parse_args())
|
|
|
|
|
|
# logging
|
|
# Runs at import time: with no arguments this installs the default console
# handler on the root logger at INFO level.
setup_root_logger()
|