# -*- coding: utf-8 -*- ######################################################### # python import os import sys import traceback import logging import threading import time import re import random import urllib # import pip # import urlparse from urllib.parse import urlparse import json packages = ["beautifulsoup4"] for package in packages: try: import package except ImportError: # main(["install", package]) os.system(f"pip install {package}") # third-party import requests from lxml import html, etree from bs4 import BeautifulSoup # import snoop # from snoop import spy # sjva 공용 from framework import db, scheduler, path_data from framework.job import Job from framework.util import Util from framework.logger import get_logger # 패키지 # from .plugin import package_name, logger # from anime_downloader.logic_ohli24 import ModelOhli24Item from .model import ModelSetting, ModelLinkkf, ModelLinkkfProgram from .logic_queue import LogicQueue ######################################################### package_name = __name__.split(".")[0] logger = get_logger(package_name) class LogicInflearn(object): headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98" "Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8", "Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7", } session = None referer = None current_data = None @staticmethod def get_html(url): try: if LogicInflearn.session is None: LogicInflearn.session = requests.Session() LogicInflearn.headers["referer"] = LogicInflearn.referer LogicInflearn.referer = url # logger.debug( # f"get_html()::LogicLinkkfYommi.referer = {LogicLinkkfYommi.referer}" # ) page = LogicInflearn.session.get(url, headers=LogicInflearn.headers) # logger.info("page", page) return page.content.decode("utf8", errors="replace") # return page.text # return page.content except Exception as e: logger.error("Exception:%s", e) logger.error(traceback.format_exc()) @staticmethod def get_video_url_from_url(url, url2): video_url = None referer_url = None vtt_url = None LogicInflearn.referer = url2 # logger.info("dx download url : %s , url2 : %s" % (url, url2)) # logger.debug(LogicLinkkfYommi.referer) try: if "kfani" in url2: # kfani 계열 처리 => 방문해서 m3u8을 받아온다. logger.debug("kfani routine") LogicInflearn.referer = url2 # logger.debug(f"url2: {url2}") data = LogicInflearn.get_html(url2) # logger.info("dx: data", data) regex2 = r'"([^\"]*m3u8)"|]+src=\"([^"]+)' temp_url = re.findall(regex2, data)[0] video_url = "" ref = "https://kfani.me" for i in temp_url: if i is None: continue video_url = i # video_url = '{1} -headers \'Referer: "{0}"\' -user_agent "Mozilla/5.0 (Windows NT 10.0; Win64; # x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3554.0 Safari/537.36"'.format(ref, # video_url) match = re.compile(r".*?.vtt)").search(data) # logger.info("match group: %s", match.group('vtt_url')) vtt_url = match.group("vtt_url") # logger.info("vtt_url: %s", vtt_url) # logger.debug(f"LogicLinkkfYommi.referer: {LogicLinkkfYommi.referer}") referer_url = url2 else: logger.error("새로운 유형의 url 발생! %s %s" % (url, url2)) except Exception as e: logger.error("Exception:%s", e) logger.error(traceback.format_exc()) # logger.debug(f"referer_url: {referer_url}") # logger.debug(f"LogicLinkkfYommi.referer: {LogicLinkkfYommi.referer}") return [video_url, referer_url, vtt_url] @staticmethod def get_video_url(episode_url): try: # url = urlparse.urljoin(ModelSetting.get('inflearn_url'), episode_id) url = episode_url # logger.info("url: %s" % url) data = LogicInflearn.get_html(url) # logger.info(data) tree = html.fromstring(data) url2s = [ tag.attrib["value"] for tag in tree.xpath('//*[@id="body"]/div/span/center/select/option') ] # logger.info('dx: url', url) # logger.info('dx: urls2', url2s) video_url = None referer_url = None # dx for url2 in url2s: try: if video_url is not None: continue logger.debug(f"url: {url}, url2: {url2}") ret = LogicInflearn.get_video_url_from_url(url, url2) print(f"ret::::> {ret}") if ret is not None: video_url = ret referer_url = url2 except Exception as e: logger.error("Exception:%s", e) logger.error(traceback.format_exc()) # logger.info(video_url) # return [video_url, referer_url] return video_url except Exception as e: logger.error("Exception:%s", e) logger.error(traceback.format_exc()) @staticmethod def apply_new_title(new_title): try: ret = {} if LogicInflearn.current_data is not None: program = ( db.session.query(ModelLinkkfProgram) .filter_by(programcode=LogicInflearn.current_data["code"]) .first() ) new_title = Util.change_text_for_use_filename(new_title) LogicInflearn.current_data["save_folder"] = new_title program.save_folder = new_title db.session.commit() for entity in LogicInflearn.current_data["episode"]: entity["save_folder"] = new_title entity["filename"] = LogicInflearn.get_filename( LogicInflearn.current_data["save_folder"], LogicInflearn.current_data["season"], entity["title"], ) # tmp = data['filename'].split('.') # tmp[0] = new_title # data['filename'] = '.'.join(tmp) return LogicInflearn.current_data else: ret["ret"] = False ret["log"] = "No current data!!" except Exception as e: logger.error("Exception:%s", e) logger.error(traceback.format_exc()) ret["ret"] = False ret["log"] = str(e) return ret @staticmethod def apply_new_season(new_season): try: ret = {} season = int(new_season) if LogicInflearn.current_data is not None: program = ( db.session.query(ModelLinkkfProgram) .filter_by(programcode=LogicInflearn.current_data["code"]) .first() ) LogicInflearn.current_data["season"] = season program.season = season db.session.commit() for entity in LogicInflearn.current_data["episode"]: entity["filename"] = LogicInflearn.get_filename( LogicInflearn.current_data["save_folder"], LogicInflearn.current_data["season"], entity["title"], ) return LogicInflearn.current_data else: ret["ret"] = False ret["log"] = "No current data!!" except Exception as e: logger.error("Exception:%s", e) logger.error(traceback.format_exc()) ret["ret"] = False ret["log"] = str(e) return ret @staticmethod def add_whitelist(*args): ret = {} logger.debug(f"args: {args}") try: if len(args) == 0: code = str(LogicInflearn.current_data["code"]) else: code = str(args[0]) whitelist_program = ModelSetting.get("whitelist_program") whitelist_programs = [ str(x.strip().replace(" ", "")) for x in whitelist_program.replace("\n", ",").split(",") ] if code not in whitelist_programs: whitelist_programs.append(code) whitelist_programs = filter( lambda x: x != "", whitelist_programs ) # remove blank code whitelist_program = ",".join(whitelist_programs) entity = ( db.session.query(ModelSetting) .filter_by(key="whitelist_program") .with_for_update() .first() ) entity.value = whitelist_program db.session.commit() ret["ret"] = True ret["code"] = code if len(args) == 0: return LogicInflearn.current_data else: return ret else: ret["ret"] = False ret["log"] = "이미 추가되어 있습니다." except Exception as e: logger.error("Exception:%s", e) logger.error(traceback.format_exc()) ret["ret"] = False ret["log"] = str(e) return ret @staticmethod def get_airing_info(): try: url = f"{ModelSetting.get('inflearn_url')}/airing" html_content = LogicInflearn.get_html(url) download_path = ModelSetting.get("download_path") tree = html.fromstring(html_content) tmp_items = tree.xpath('//div[@class="item"]') # logger.info('tmp_items:::', tmp_items) data = {"ret": "success"} # logger.debug(tree.xpath('//*[@id="wp_page"]//text()')) if tree.xpath('//*[@id="wp_page"]//text()'): data["total_page"] = tree.xpath('//*[@id="wp_page"]//text()')[-1] else: data["total_page"] = 0 data["episode_count"] = len(tmp_items) data["episode"] = [] for item in tmp_items: entity = {} entity["link"] = item.xpath(".//a/@href")[0] entity["code"] = re.search(r"[0-9]+", entity["link"]).group() entity["title"] = item.xpath('.//span[@class="name-film"]//text()')[ 0 ].strip() entity["image_link"] = item.xpath( './/img[@class="photo"]/@data-lazy-src' )[0] entity["chapter"] = item.xpath(".//a/button/span//text()")[0] # logger.info('entity:::', entity['title']) data["episode"].append(entity) json_file_path = os.path.join(download_path, "airing_list.json") logger.debug("json_file_path:: %s", json_file_path) with open(json_file_path, "w") as outfile: json.dump(data, outfile) return data except Exception as e: logger.error("Exception:%s", e) logger.error(traceback.format_exc()) @staticmethod def get_search_result(query): try: # query = query.encode("utf-8") _query = urllib.parse.quote(query) url = f"{ModelSetting.get('inflearn_url')}/?s={_query}" logger.debug("search url::> %s", url) html_content = LogicInflearn.get_html(url) download_path = ModelSetting.get("download_path") tree = html.fromstring(html_content) tmp_items = tree.xpath('//div[@class="item"]') # logger.info('tmp_items:::', tmp_items) data = {"ret": "success", "query": query} # data["total_page"] = tree.xpath('//*[@id="wp_page"]//text()')[-1] if tree.xpath('//*[@id="wp_page"]//text()'): data["total_page"] = tree.xpath('//*[@id="wp_page"]//text()')[-1] else: data["total_page"] = 0 data["episode_count"] = len(tmp_items) data["episode"] = [] for item in tmp_items: entity = {} entity["link"] = item.xpath(".//a/@href")[0] entity["code"] = re.search(r"[0-9]+", entity["link"]).group() entity["title"] = item.xpath('.//span[@class="name-film"]//text()')[ 0 ].strip() entity["image_link"] = item.xpath('.//img[@class="photo"]/@src')[0] # logger.info('entity:::', entity['title']) data["episode"].append(entity) json_file_path = os.path.join(download_path, "airing_list.json") logger.debug("json_file_path:: %s", json_file_path) with open(json_file_path, "w") as outfile: json.dump(data, outfile) return data except Exception as e: logger.error("Exception:%s", e) logger.error(traceback.format_exc()) @staticmethod def get_anime_list_info(cate, page): try: if cate == "ing": url = f"{ModelSetting.get('inflearn_url')}/airing/page/{page}" elif cate == "complete": url = f"{ModelSetting.get('inflearn_url')}/anime-list/page/{page}" logger.debug(f"get_anime_list_info():url >> {url}") html_content = LogicInflearn.get_html(url) download_path = ModelSetting.get("download_path") tree = html.fromstring(html_content) tmp_items = tree.xpath('//div[@class="item"]') # logger.info('tmp_items:::', tmp_items) data = {"ret": "success", "page": page} data["total_page"] = tree.xpath('//*[@id="wp_page"]//text()')[-1] data["episode_count"] = len(tmp_items) data["episode"] = [] for item in tmp_items: entity = {} entity["link"] = item.xpath(".//a/@href")[0] entity["code"] = re.search(r"[0-9]+", entity["link"]).group() entity["title"] = item.xpath('.//span[@class="name-film"]//text()')[ 0 ].strip() entity["image_link"] = item.xpath( './/img[@class="photo"]/@data-lazy-src' )[0] entity["chapter"] = item.xpath(".//a/button/span//text()")[0] # logger.info('entity:::', entity['title']) data["episode"].append(entity) json_file_path = os.path.join(download_path, "airing_list.json") logger.debug("json_file_path:: %s", json_file_path) with open(json_file_path, "w") as outfile: json.dump(data, outfile) return data except Exception as e: logger.error("Exception:%s", e) logger.error(traceback.format_exc()) @staticmethod def get_screen_movie_info(page): try: url = f"{ModelSetting.get('inflearn_url')}/ani/page/{page}" html_content = LogicInflearn.get_html(url) download_path = ModelSetting.get("download_path") tree = html.fromstring(html_content) tmp_items = tree.xpath('//div[@class="item"]') # logger.info('tmp_items:::', tmp_items) data = {"ret": "success", "page": page} data["episode_count"] = len(tmp_items) data["episode"] = [] for item in tmp_items: entity = {} entity["link"] = item.xpath(".//a/@href")[0] entity["code"] = re.search(r"[0-9]+", entity["link"]).group() entity["title"] = item.xpath('.//span[@class="name-film"]//text()')[ 0 ].strip() entity["image_link"] = item.xpath( './/img[@class="photo"]/@data-lazy-src' )[0] # logger.info('entity:::', entity['title']) data["episode"].append(entity) json_file_path = os.path.join(download_path, "airing_list.json") logger.debug("json_file_path:: %s", json_file_path) with open(json_file_path, "w") as outfile: json.dump(data, outfile) return data except Exception as e: logger.error("Exception:%s", e) logger.error(traceback.format_exc()) @staticmethod def get_complete_anilist_info(page): try: url = f"{ModelSetting.get('inflearn_url')}/anime-list/page/{page}" html_content = LogicInflearn.get_html(url) download_path = ModelSetting.get("download_path") tree = html.fromstring(html_content) tmp_items = tree.xpath('//div[@class="item"]') # logger.info('tmp_items:::', tmp_items) data = {"ret": "success", "page": page} data["episode_count"] = len(tmp_items) data["episode"] = [] if tree.xpath('//*[@id="wp_page"]//text()'): data["total_page"] = tree.xpath('//*[@id="wp_page"]//text()')[-1] else: data["total_page"] = 0 for item in tmp_items: entity = {} entity["link"] = item.xpath(".//a/@href")[0] entity["code"] = re.search(r"[0-9]+", entity["link"]).group() entity["title"] = item.xpath('.//span[@class="name-film"]//text()')[ 0 ].strip() entity["image_link"] = item.xpath( './/img[@class="photo"]/@data-lazy-src' )[0] # logger.info('entity:::', entity['title']) data["episode"].append(entity) json_file_path = os.path.join(download_path, "airing_list.json") logger.debug("json_file_path:: %s", json_file_path) with open(json_file_path, "w") as outfile: json.dump(data, outfile) return data except Exception as e: logger.error("Exception:%s", e) logger.error(traceback.format_exc()) @staticmethod def get_title_info(code): try: if ( LogicInflearn.current_data is not None and LogicInflearn.current_data["code"] == code and LogicInflearn.current_data["ret"] ): return LogicInflearn.current_data url = "%s/%s" % (ModelSetting.get("inflearn_url"), code) # logger.info(url) html_content = LogicInflearn.get_html(url) sys.setrecursionlimit(10**7) # logger.info(html_content) tree = html.fromstring(html_content) # tree = etree.fromstring( # html_content, parser=etree.XMLParser(huge_tree=True) # ) # tree1 = BeautifulSoup(html_content, "lxml") soup = BeautifulSoup(html_content, "html.parser") # tree = etree.HTML(str(soup)) # logger.info(tree) data = {"code": code, "ret": False} # //*[@id="body"]/div/div[1]/article/center/strong # tmp = tree.xpath('/html/body/div[2]/div/div/article/center/strong' # )[0].text_content().strip().encode('utf8') # tmp = tree.xpath('//*[@id="body"]/div/div[1]/article/center/strong')[0].text_content().strip() # logger.info('tmp::>', tree.xpath('//div[@class="hrecipe"]/article/center/strong')) # tmp1 = tree.xpath("//div[contains(@id, 'related')]/ul/a") # tmp = tree1.find_element(By.Xpath, "//ul/a") tmp = soup.select("ul > a") # logger.debug(f"tmp1 size:=> {str(len(tmp))}") try: tmp = ( tree.xpath('//div[@class="hrecipe"]/article/center/strong')[0] .text_content() .strip() ) except IndexError: tmp = tree.xpath("//article/center/strong")[0].text_content().strip() # print(tmp) # logger.info(tmp) match = re.compile(r"(?P\d+)기").search(tmp) if match: data["season"] = match.group("season") else: data["season"] = "1" # replace_str = f'({data["season"]}기)' # logger.info(replace_str) data["_id"] = str(code) data["title"] = tmp.replace(data["season"] + "기", "").strip() data["title"] = data["title"].replace("()", "").strip() data["title"] = ( Util.change_text_for_use_filename(data["title"]) .replace("OVA", "") .strip() ) # logger.info(f"title:: {data['title']}") try: # data['poster_url'] = tree.xpath( # '//*[@id="body"]/div/div/div[1]/center/img' # )[0].attrib['data-src'] data["poster_url"] = tree.xpath( '//*[@id="body"]/div/div[1]/div[1]/center/img' )[0].attrib["data-lazy-src"] data["detail"] = [ { "info": tree.xpath("/html/body/div[2]/div/div[1]/div[1]")[0] .text_content() .strip() } ] except Exception as e: data["detail"] = [{"정보없음": ""}] data["poster_url"] = None data["rate"] = tree.xpath('span[@class="tag-score"]') # tag_score = tree.xpath('//span[@class="taq-score"]').text_content().strip() tag_score = tree.xpath('//span[@class="taq-score"]')[0].text_content() # logger.debug(tag_score) tag_count = ( tree.xpath('//span[contains(@class, "taq-count")]')[0] .text_content() .strip() ) data_rate = tree.xpath('//div[@class="rating"]/div/@data-rate') # logger.debug("data_rate::> %s", data_rate) # tmp = tree.xpath('//*[@id="relatedpost"]/ul/li') # tmp = tree.xpath('//article/a') # 수정된 # tmp = tree.xpath("//ul/a") tmp = soup.select("ul > a") # logger.debug(f"tmp size:=> {str(len(tmp))}") # logger.info(tmp) if tmp is not None: data["episode_count"] = str(len(tmp)) else: data["episode_count"] = "0" data["episode"] = [] # tags = tree.xpath( # '//*[@id="syno-nsc-ext-gen3"]/article/div[1]/article/a') # tags = tree.xpath("//ul/a") tags = soup.select("ul > a") # logger.info("tags", tags) # re1 = re.compile(r'\/(?P\d+)') re1 = re.compile(r"\-([^-])+\.") data["save_folder"] = data["title"] # logger.debug(f"save_folder::> {data['save_folder']}") program = ( db.session.query(ModelLinkkfProgram).filter_by(programcode=code).first() ) if program is None: program = ModelLinkkfProgram(data) db.session.add(program) db.session.commit() else: data["save_folder"] = program.save_folder data["season"] = program.season idx = 1 for t in tags: entity = { "_id": data["code"], "program_code": data["code"], "program_title": data["title"], "save_folder": Util.change_text_for_use_filename( data["save_folder"] ), "title": t.text.strip(), # "title": t.text_content().strip(), } # entity['code'] = re1.search(t.attrib['href']).group('code') # logger.debug(f"title ::>{entity['title']}") # 고유id임을 알수 없는 말도 안됨.. # 에피소드 코드가 고유해야 상태값 갱신이 제대로 된 값에 넣어짐 p = re.compile(r"([0-9]+)화?") m_obj = p.match(entity["title"]) # logger.info(m_obj.group()) # entity['code'] = data['code'] + '_' +str(idx) episode_code = None # logger.debug(f"m_obj::> {m_obj}") if m_obj is not None: episode_code = m_obj.group(1) entity["code"] = data["code"] + episode_code.zfill(4) else: entity["code"] = data["code"] # logger.info('episode_code', episode_code) # entity["url"] = t.attrib["href"] entity["url"] = t["href"] entity["season"] = data["season"] # 저장경로 저장 tmp_save_path = ModelSetting.get("download_path") if ModelSetting.get("auto_make_folder") == "True": program_path = os.path.join(tmp_save_path, entity["save_folder"]) entity["save_path"] = program_path if ModelSetting.get("inflearn_auto_make_season_folder"): entity["save_path"] = os.path.join( entity["save_path"], "Season %s" % int(entity["season"]) ) data["episode"].append(entity) entity["image"] = data["poster_url"] # entity['title'] = t.text_content().strip().encode('utf8') # entity['season'] = data['season'] # logger.debug(f"save_folder::2> {data['save_folder']}") entity["filename"] = LogicInflearn.get_filename( data["save_folder"], data["season"], entity["title"] ) idx = idx + 1 data["ret"] = True # logger.info('data', data) LogicInflearn.current_data = data # srt 파일 처리 return data except Exception as e: logger.error("Exception:%s", e) logger.error(traceback.format_exc()) data["log"] = str(e) data["ret"] = "error" return data except IndexError as e: logger.error("Exception:%s", e) logger.error(traceback.format_exc()) data["log"] = str(e) data["ret"] = "error" return data @staticmethod def get_filename(maintitle, season, title): try: # logger.debug("get_filename()===") # logger.info("title:: %s", title) # logger.info("maintitle:: %s", maintitle) match = re.compile( r"(?P.*?)\s?((?P<season>\d+)기)?\s?((?P<epi_no>\d+)화?)" ).search(title) if match: epi_no = int(match.group("epi_no")) if epi_no < 10: epi_no = "0%s" % epi_no else: epi_no = "%s" % epi_no if int(season) < 10: season = "0%s" % season else: season = "%s" % season # title_part = match.group('title').strip() # ret = '%s.S%sE%s%s.720p-SA.mp4' % (maintitle, season, epi_no, date_str) ret = "%s.S%sE%s.720p-LK.mp4" % (maintitle, season, epi_no) else: logger.debug("NOT MATCH") ret = "%s.720p-SA.mp4" % maintitle return Util.change_text_for_use_filename(ret) except Exception as e: logger.error("Exception:%s", e) logger.error(traceback.format_exc()) @staticmethod def get_info_by_code(code): logger.error("get_info_by_code: %s", code) try: if LogicInflearn.current_data is not None: for t in LogicInflearn.current_data["episode"]: if t["code"] == code: return t except Exception as e: logger.error("Exception:%s", e) logger.error(traceback.format_exc()) @staticmethod def scheduler_function(): try: logger.debug("Linkkf scheduler_function start..") whitelist_program = ModelSetting.get("whitelist_program") whitelist_programs = [ x.strip().replace(" ", "") for x in whitelist_program.replace("\n", ",").split(",") ] logger.debug(f"whitelist_programs: {whitelist_programs}") for code in whitelist_programs: logger.info("auto download start : %s", code) downloaded = ( db.session.query(ModelLinkkf) .filter(ModelLinkkf.completed.is_(True)) .filter_by(programcode=code) .with_for_update() .all() ) # logger.debug(f"downloaded:: {downloaded}") dl_codes = [dl.episodecode for dl in downloaded] # logger.debug('dl_codes:: ', dl_codes) logger.info("downloaded codes :%s", dl_codes) # if len(dl_codes) > 0: data = LogicInflearn.get_title_info(code) for episode in data["episode"]: e_code = episode["code"] if e_code not in dl_codes: logger.info("Logic Queue added :%s", e_code) LogicQueue.add_queue(episode) logger.debug("========================================") except Exception as e: logger.error("Exception:%s", e) logger.error(traceback.format_exc()) @staticmethod def reset_db() -> bool: db.session.query(ModelLinkkf).delete() db.session.commit() return True