import logging
from datetime import datetime, timedelta
from typing import List
from urllib.parse import quote

from epg2xml.providers import EPGProgram, EPGProvider, no_endtime
from epg2xml.utils import ParserBeautifulSoup as BeautifulSoup

# Logger named after this provider module, uppercased (e.g. "DAUM").
log = logging.getLogger(__name__.rsplit(".", maxsplit=1)[-1].upper())

# Channel categories used as Daum search keywords
# (terrestrial, general programming, cable, Skylife, overseas satellite, radio).
CH_CATE = ["지상파", "종합편성", "케이블", "스카이라이프", "해외위성", "라디오"]


class DAUM(EPGProvider):
    """EPGProvider for DAUM.

    Data source: raw HTML (Daum search result pages)
    Request count: #channels
    Notes:
    - Up to 7 days of schedule returned in a single request
    - Only program start times are provided (hence @no_endtime)
    """

    referer = None
    # Splits a raw program title into named groups:
    #   title    - base program title
    #   part     - "N부" part number (optionally wrapped in <> or ())
    #   subname1 - subtitle given as <...> before the episode number
    #   epnum    - "N회" episode number
    #   subname2 - subtitle given as <...> after the episode number
    # FIX: the pattern previously began with "(?P.*?)" — the <title> group
    # name had been lost, which is a regex compile error and breaks
    # m.group("title") in __epgs_of_days. Restored to "(?P<title>.*?)".
    title_regex = r"^(?P<title>.*?)\s?([\<\(]?(?P<part>\d{1})부[\>\)]?)?\s?(<(?P<subname1>.*)>)?\s?((?P<epnum>\d+)회)?\s?(<(?P<subname2>.*)>)?$"

    def get_svc_channels(self) -> List[dict]:
        """Discover available channels by searching Daum for each category.

        Returns:
            A list of dicts with "Name", "ServiceId" and "Category" keys.
        """
        svc_channels = []
        url = "https://search.daum.net/search?DA=B3T&w=tot&rtmaxcoll=B3T&q={}"
        channelsel1 = '#channelNaviLayer > div[class^="layer_tv layer_all"] ul > li'
        channelsel2 = 'div[class="wrap_sub"] > span > a'
        for c in CH_CATE:
            search_url = url.format(f"{c} 편성표")
            data = self.request(search_url)
            soup = BeautifulSoup(data)
            # disp-attr="B3T" marks a schedule search result; skip categories
            # that did not return one.
            if not soup.find_all(attrs={"disp-attr": "B3T"}):
                continue
            all_channels = [str(x.text.strip()) for x in soup.select(channelsel1)]
            if not all_channels:
                # Fallback selector for the alternate page layout.
                all_channels += [str(x.text.strip()) for x in soup.select(channelsel2)]
            svc_cate = c.replace("스카이라이프", "SKYLIFE")
            for x in all_channels:
                svc_channels.append(
                    {
                        "Name": x,
                        "ServiceId": f"{svc_cate} {x}",
                        "Category": c,
                    }
                )
        return svc_channels

    @no_endtime
    def get_programs(self) -> None:
        """Fetch and parse the schedule page for every requested channel.

        Parsed programs are appended to each channel's ``programs`` list.
        """
        url = "https://search.daum.net/search?DA=B3T&w=tot&rtmaxcoll=B3T&q={}"
        for idx, _ch in enumerate(self.req_channels):
            log.info("%03d/%03d %s", idx + 1, len(self.req_channels), _ch)
            search_url = url.format(quote(_ch.svcid + " 편성표"))
            data = self.request(search_url)
            try:
                _epgs = self.__epgs_of_days(_ch.id, data)
            except AssertionError as e:
                # Raised when the page carries no EPG section for this channel.
                log.warning("%s: %s", e, _ch)
            except Exception:
                log.exception("프로그램 파싱 중 예외: %s", _ch)
            else:
                _ch.programs.extend(_epgs)

    def __epgs_of_days(self, channelid: str, data: str) -> List[EPGProgram]:
        """Parse one channel's schedule page (up to 7 days) into EPGPrograms.

        Args:
            channelid: channel identifier stored on each EPGProgram.
            data: raw HTML of the Daum schedule search result.

        Raises:
            AssertionError: if the page has no EPG table, or a day column
                does not contain the expected 24 hour rows.
        """
        soup = BeautifulSoup(data)
        assert soup.find_all(attrs={"disp-attr": "B3T"}), "EPG 정보가 없거나 없는 채널입니다"
        days = soup.select('div[class="tbl_head head_type2"] > span > span[class="date"]')

        # The page shows only month/day ("%m.%d"); infer the year from today.
        currdate = datetime.now()  # always later than (or equal to) basedate
        basedate = datetime.strptime(days[0].text.strip(), "%m.%d").replace(year=currdate.year)
        if (basedate - currdate).days > 0:
            # Schedule actually started late last year (e.g. around New Year).
            basedate = basedate.replace(year=basedate.year - 1)

        _epgs = []
        for nd, _ in enumerate(days):
            # One <td> per day column; each row of the table is one hour.
            hours = soup.select(f'[id="tvProgramListWrap"] > table > tbody > tr > td:nth-of-type({nd+1})')
            assert len(hours) == 24, f"24개의 시간 행이 있어야 합니다: 현재: {len(hours):d}"
            for nh, hour in enumerate(hours):
                for dl in hour.select("dl"):
                    _epg = EPGProgram(channelid)
                    # <dt> holds the minute within the hour.
                    nm = int(dl.select("dt")[0].text.strip())
                    _epg.stime = basedate + timedelta(days=nd, hours=nh, minutes=nm)
                    for atag in dl.select("dd > a"):
                        _epg.title = atag.text.strip()
                    for span in dl.select("dd > span"):
                        class_val = " ".join(span["class"])
                        if class_val == "":
                            # Unlinked programs carry the title in a bare <span>.
                            _epg.title = span.text.strip()
                        elif "ico_re" in class_val:
                            _epg.rebroadcast = True
                        elif "ico_rate" in class_val:
                            # e.g. class "ico_rate15" -> rating 15
                            _epg.rating = int(class_val.split("ico_rate")[1].strip())
                        else:
                            # ico_live ico_hd ico_subtitle ico_hand ico_uhd ico_talk ico_st
                            _epg.extras = (_epg.extras or []) + [span.text.strip()]
                    # title_regex is compiled by the provider base class.
                    if m := self.title_regex.search(_epg.title):
                        _epg.title = m.group("title")
                        _epg.part_num = m.group("part")
                        _epg.ep_num = m.group("epnum")
                        _epg.title_sub = m.group("subname2") or m.group("subname1")
                        if _epg.part_num:
                            # Keep the "N부" part marker visible in the title.
                            _epg.title += f" {_epg.part_num}부"
                    _epgs.append(_epg)
        return _epgs