# -*- coding: utf-8 -*-
#########################################################
# python (stdlib)
import os
import sys
import traceback
import logging
import threading
import time
import re
import random
import urllib
import asyncio
import importlib
from urllib.parse import urlparse
import json

import aiohttp

# Ensure required third-party packages are importable before the hard imports
# below.  NOTE: the pip distribution name and the importable module name can
# differ (beautifulsoup4 -> bs4), so map between them; the previous code did
# `import package`, which imported a literal module named "package", always
# failed, and therefore ran `pip install` on every module load.
_REQUIRED_PACKAGES = {"beautifulsoup4": "bs4"}
for _pip_name, _module_name in _REQUIRED_PACKAGES.items():
    try:
        importlib.import_module(_module_name)
    except ImportError:
        os.system(f"pip install {_pip_name}")

# third-party
import requests
from lxml import html, etree
from bs4 import BeautifulSoup
from urllib import parse

from .debugger1 import timerun, yommi_logger

# sjva shared framework
from framework import db, scheduler, path_data
from framework.job import Job
from framework.util import Util
from framework.logger import get_logger

# package-local
from .model import ModelSetting, ModelInflearn, ModelInflearnProgram
from .logic_queue import LogicQueue

#########################################################
package_name = __name__.split(".")[0]
logger = get_logger(package_name)


class LogicInflearn(object):
    """Scraping/download logic for inflearn.com lectures (SJVA plugin).

    All state is class-level: a shared requests session, the last visited
    URL (used as the next request's Referer) and the most recently scraped
    program data (``current_data``).
    """

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98"
        "Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
        "Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7",
    }
    session = None        # lazily created requests.Session shared by all calls
    referer = None        # previous URL; sent as the Referer header
    current_data = None   # last scraped program dict (see get_title_info)
    season = "1"

    @staticmethod
    def get_html(url):
        """GET *url* through the shared session, carrying the previously
        visited URL as the Referer header.

        Returns the body decoded as UTF-8 (bad bytes replaced), or None if
        the request raised (the error is logged).
        """
        try:
            if LogicInflearn.session is None:
                LogicInflearn.session = requests.Session()
            LogicInflearn.headers["referer"] = LogicInflearn.referer
            LogicInflearn.referer = url
            page = LogicInflearn.session.get(url, headers=LogicInflearn.headers)
            return page.content.decode("utf8", errors="replace")
        except Exception as e:
            logger.error("Exception:%s", e)
            logger.error(traceback.format_exc())

    @staticmethod
    def get_video_url_from_url(url, url2):
        """Resolve the real m3u8 video URL (and .vtt subtitle URL) behind
        the player page *url2* (*url* is the episode page, kept for logging).

        Returns ``[video_url, referer_url, vtt_url]``; elements stay None on
        failure or for unknown host types.
        """
        video_url = None
        referer_url = None
        vtt_url = None
        LogicInflearn.referer = url2
        try:
            if "kfani" in url2:
                # kfani family: fetch the page and pull the m3u8 out of it.
                logger.debug("kfani routine")
                LogicInflearn.referer = url2
                data = LogicInflearn.get_html(url2)
                # FIX: the stored pattern had its HTML-like fragment stripped
                # (']+src=' cannot match anything useful); restored the
                # <source ... src="..."> alternative.  TODO confirm against a
                # live kfani page.
                regex2 = r'"([^\"]*m3u8)"|<source[^>]+src=\"([^"]+)'
                temp_url = re.findall(regex2, data)[0]
                video_url = ""
                ref = "https://kfani.me"
                for i in temp_url:
                    if i is None:
                        continue
                    video_url = i
                # FIX: previous pattern r".*?.vtt)" was an invalid regex
                # (unbalanced ')') and had no "vtt_url" group, so this branch
                # always raised; capture the subtitle URL with a named group.
                match = re.compile(r"(?P<vtt_url>.*?\.vtt)").search(data)
                vtt_url = match.group("vtt_url")
                referer_url = url2
            else:
                logger.error("새로운 유형의 url 발생! %s %s" % (url, url2))
        except Exception as e:
            logger.error("Exception:%s", e)
            logger.error(traceback.format_exc())
        return [video_url, referer_url, vtt_url]

    @staticmethod
    def get_video_url(episode_url):
        """Collect candidate player URLs from *episode_url*'s host-select
        dropdown and resolve the first one that yields a video URL.

        Returns the ``[video_url, referer_url, vtt_url]`` list produced by
        get_video_url_from_url (None on outer failure).
        """
        try:
            url = episode_url
            data = LogicInflearn.get_html(url)
            tree = html.fromstring(data)
            url2s = [
                tag.attrib["value"]
                for tag in tree.xpath('//*[@id="body"]/div/span/center/select/option')
            ]
            video_url = None
            referer_url = None
            for url2 in url2s:
                try:
                    # NOTE: once a result is found the remaining candidates
                    # are skipped (the list returned is always non-None, so
                    # only the first candidate is ever resolved).
                    if video_url is not None:
                        continue
                    logger.debug(f"url: {url}, url2: {url2}")
                    ret = LogicInflearn.get_video_url_from_url(url, url2)
                    print(f"ret::::> {ret}")
                    if ret is not None:
                        video_url = ret
                        referer_url = url2
                except Exception as e:
                    logger.error("Exception:%s", e)
                    logger.error(traceback.format_exc())
            return video_url
        except Exception as e:
            logger.error("Exception:%s", e)
            logger.error(traceback.format_exc())

    @staticmethod
    def apply_new_title(new_title):
        """Rename the save folder of the currently loaded program and refresh
        every episode's save_folder/filename accordingly.

        Returns the updated current_data on success, or an error dict
        ``{"ret": False, "log": ...}``.
        """
        try:
            ret = {}
            if LogicInflearn.current_data is not None:
                program = (
                    db.session.query(ModelInflearnProgram)
                    .filter_by(programcode=LogicInflearn.current_data["code"])
                    .first()
                )
                new_title = Util.change_text_for_use_filename(new_title)
                LogicInflearn.current_data["save_folder"] = new_title
                program.save_folder = new_title
                db.session.commit()
                for entity in LogicInflearn.current_data["episode"]:
                    entity["save_folder"] = new_title
                    entity["filename"] = LogicInflearn.get_filename(
                        LogicInflearn.current_data["save_folder"],
                        LogicInflearn.current_data["season"],
                        entity["title"],
                    )
                return LogicInflearn.current_data
            else:
                ret["ret"] = False
                ret["log"] = "No current data!!"
        except Exception as e:
            logger.error("Exception:%s", e)
            logger.error(traceback.format_exc())
            ret["ret"] = False
            ret["log"] = str(e)
        return ret

    @staticmethod
    def apply_new_season(new_season):
        """Change the season number of the currently loaded program and
        refresh every episode's filename.

        Returns the updated current_data on success, or an error dict.
        """
        try:
            ret = {}
            season = int(new_season)
            if LogicInflearn.current_data is not None:
                program = (
                    db.session.query(ModelInflearnProgram)
                    .filter_by(programcode=LogicInflearn.current_data["code"])
                    .first()
                )
                LogicInflearn.current_data["season"] = season
                program.season = season
                db.session.commit()
                for entity in LogicInflearn.current_data["episode"]:
                    entity["filename"] = LogicInflearn.get_filename(
                        LogicInflearn.current_data["save_folder"],
                        LogicInflearn.current_data["season"],
                        entity["title"],
                    )
                return LogicInflearn.current_data
            else:
                ret["ret"] = False
                ret["log"] = "No current data!!"
        except Exception as e:
            logger.error("Exception:%s", e)
            logger.error(traceback.format_exc())
            ret["ret"] = False
            ret["log"] = str(e)
        return ret

    @staticmethod
    def add_whitelist(*args):
        """Add a program code to the ``whitelist_program`` setting.

        With no args the code comes from current_data; otherwise args[0].
        Returns current_data (no-arg form) or a status dict.
        """
        ret = {}
        logger.debug(f"args: {args}")
        try:
            if len(args) == 0:
                code = str(LogicInflearn.current_data["code"])
            else:
                code = str(args[0])
            whitelist_program = ModelSetting.get("whitelist_program")
            whitelist_programs = [
                str(x.strip().replace(" ", ""))
                for x in whitelist_program.replace("\n", ",").split(",")
            ]
            if code not in whitelist_programs:
                whitelist_programs.append(code)
                whitelist_programs = filter(
                    lambda x: x != "", whitelist_programs
                )  # remove blank code
                whitelist_program = ",".join(whitelist_programs)
                entity = (
                    db.session.query(ModelSetting)
                    .filter_by(key="whitelist_program")
                    .with_for_update()
                    .first()
                )
                entity.value = whitelist_program
                db.session.commit()
                ret["ret"] = True
                ret["code"] = code
                if len(args) == 0:
                    return LogicInflearn.current_data
                else:
                    return ret
            else:
                ret["ret"] = False
                ret["log"] = "이미 추가되어 있습니다."
        except Exception as e:
            logger.error("Exception:%s", e)
            logger.error(traceback.format_exc())
            ret["ret"] = False
            ret["log"] = str(e)
        return ret

    @staticmethod
    def get_lecture_list():
        """Scrape page 1 of the latest-curation list, cache it to
        ``airing_list.json`` under the download path, and return the data dict.
        """
        try:
            url = f"{ModelSetting.get('inflearn_url')}/curation/latest?page=1"
            html_content = LogicInflearn.get_html(url)
            download_path = ModelSetting.get("download_path")
            tree = html.fromstring(html_content)
            tmp_items = tree.xpath('//div[@class="item"]')
            data = {"ret": "success"}
            if tree.xpath('//*[@id="wp_page"]//text()'):
                data["total_page"] = tree.xpath('//*[@id="wp_page"]//text()')[-1]
            else:
                data["total_page"] = 0
            data["episode_count"] = len(tmp_items)
            data["episode"] = []
            for item in tmp_items:
                entity = {}
                entity["link"] = item.xpath(".//a/@href")[0]
                entity["code"] = re.search(r"[0-9]+", entity["link"]).group()
                entity["title"] = item.xpath('.//span[@class="name-film"]//text()')[
                    0
                ].strip()
                entity["image_link"] = item.xpath(
                    './/img[@class="photo"]/@data-lazy-src'
                )[0]
                entity["chapter"] = item.xpath(".//a/button/span//text()")[0]
                data["episode"].append(entity)
            json_file_path = os.path.join(download_path, "airing_list.json")
            logger.debug("json_file_path:: %s", json_file_path)
            with open(json_file_path, "w") as outfile:
                json.dump(data, outfile)
            return data
        except Exception as e:
            logger.error("Exception:%s", e)
            logger.error(traceback.format_exc())

    @staticmethod
    def get_search_result(query):
        """Run a site search for *query*, cache results to airing_list.json,
        and return the data dict.
        """
        try:
            _query = urllib.parse.quote(query)
            url = f"{ModelSetting.get('inflearn_url')}/?s={_query}"
            logger.debug("search url::> %s", url)
            html_content = LogicInflearn.get_html(url)
            download_path = ModelSetting.get("download_path")
            tree = html.fromstring(html_content)
            tmp_items = tree.xpath('//div[@class="item"]')
            data = {"ret": "success", "query": query}
            if tree.xpath('//*[@id="wp_page"]//text()'):
                data["total_page"] = tree.xpath('//*[@id="wp_page"]//text()')[-1]
            else:
                data["total_page"] = 0
            data["episode_count"] = len(tmp_items)
            data["episode"] = []
            for item in tmp_items:
                entity = {}
                entity["link"] = item.xpath(".//a/@href")[0]
                entity["code"] = re.search(r"[0-9]+", entity["link"]).group()
                entity["title"] = item.xpath('.//span[@class="name-film"]//text()')[
                    0
                ].strip()
                entity["image_link"] = item.xpath('.//img[@class="photo"]/@src')[0]
                data["episode"].append(entity)
            json_file_path = os.path.join(download_path, "airing_list.json")
            logger.debug("json_file_path:: %s", json_file_path)
            with open(json_file_path, "w") as outfile:
                json.dump(data, outfile)
            return data
        except Exception as e:
            logger.error("Exception:%s", e)
            logger.error(traceback.format_exc())

    @staticmethod
    def get_lecture_list_info(cate, page):
        """Scrape one page of the /courses listing for category *cate*
        ("recent" | "rating" | "popular" | "seq") and return a data dict.

        total_page is hard-coded to 100 (the site paginates lazily).
        """
        try:
            url = ""
            if cate == "recent":
                url = f"{ModelSetting.get('inflearn_url')}/courses?page={page}&order={cate}"
            elif cate == "rating":
                url = f"{ModelSetting.get('inflearn_url')}/courses?page={page}&order={cate}"
            elif cate == "popular":
                url = f"{ModelSetting.get('inflearn_url')}/courses?page={page}&order={cate}"
            elif cate == "seq":
                url = f"{ModelSetting.get('inflearn_url')}/courses?page={page}&order={cate}"
            logger.debug(f"get_lecture_list_info():url >> {url}")
            html_content = LogicInflearn.get_html(url)
            tree = html.fromstring(html_content)
            tmp_items = tree.xpath(
                '//div[contains(@class, "courses_card_list_body")]/div'
            )
            data = {
                "ret": "success",
                "page": page,
                "total_page": 100,
                "episode_count": len(tmp_items),
                "episode": [],
            }
            for item in tmp_items:
                entity = {}
                entity["link"] = item.xpath(".//a/@href")[0]
                entity["code"] = entity["link"].split("/")[-1]
                entity["_code"] = item.xpath("/div/@data-productid")
                entity["title"] = item.xpath('.//p[@class="course_title"]/text()')[
                    0
                ].strip()
                entity["teacher"] = item.xpath('.//div[@class="instructor"]/text()')[
                    0
                ].strip()
                entity["price"] = item.xpath('.//div[@class="price"]/text()')
                entity["image_link"] = item.xpath('.//img[@class="swiper-lazy"]/@src')
                entity["chapter"] = entity["price"]
                data["episode"].append(entity)
            return data
        except Exception as e:
            logger.error("Exception:%s", e)
            logger.error(traceback.format_exc())

    @staticmethod
    def get_screen_movie_info(page):
        """Scrape one page of the /ani listing, cache it to airing_list.json,
        and return the data dict.
        """
        try:
            url = f"{ModelSetting.get('inflearn_url')}/ani/page/{page}"
            html_content = LogicInflearn.get_html(url)
            download_path = ModelSetting.get("download_path")
            tree = html.fromstring(html_content)
            tmp_items = tree.xpath('//div[@class="item"]')
            data = {"ret": "success", "page": page}
            data["episode_count"] = len(tmp_items)
            data["episode"] = []
            for item in tmp_items:
                entity = {}
                entity["link"] = item.xpath(".//a/@href")[0]
                entity["code"] = re.search(r"[0-9]+", entity["link"]).group()
                entity["title"] = item.xpath('.//span[@class="name-film"]//text()')[
                    0
                ].strip()
                entity["image_link"] = item.xpath(
                    './/img[@class="photo"]/@data-lazy-src'
                )[0]
                data["episode"].append(entity)
            json_file_path = os.path.join(download_path, "airing_list.json")
            logger.debug("json_file_path:: %s", json_file_path)
            with open(json_file_path, "w") as outfile:
                json.dump(data, outfile)
            return data
        except Exception as e:
            logger.error("Exception:%s", e)
            logger.error(traceback.format_exc())

    @staticmethod
    @yommi_logger(logging_type="debug")
    def get_title_info_old(code):
        """Older variant of get_title_info (kept for compatibility): scrape a
        course page, persist/refresh its ModelInflearnProgram row, resolve
        the m3u8 info of every curriculum unit and return the program dict.
        """
        # Initialise up-front so the except handler can always annotate it.
        data = {"code": code, "ret": False}
        try:
            url = "%s/course/%s" % (ModelSetting.get("inflearn_url"), parse.quote(code))
            logger.info(url)
            html_content = LogicInflearn.get_html(url)
            sys.setrecursionlimit(10**7)
            soup = BeautifulSoup(html_content, "html.parser")
            data["poster_url"] = soup.select_one(
                "div.cd-header__thumbnail-cover div img"
            )["src"]
            data["title"] = soup.select_one("div.cd-header__title").text
            main_title = soup.select_one("div.cd-header__title").text
            data["save_folder"] = data["title"]
            data["season"] = "1"
            curriculum_content = soup.find_all("a", {"class": "cd-accordion__unit"})
            preview_path = []
            for i, elem in enumerate(curriculum_content):
                preview_path.append(elem["href"])
            # Jump to the first previewable unit's view page.
            base_url = "https://www.inflearn.com"
            url = base_url + parse.quote(preview_path[0])
            logger.debug(f"url::::: {url}")
            resData = requests.get(url, timeout=20)
            if resData.url != url:
                # redirect occurred; likely the course cannot be found.
                raise requests.TooManyRedirects()
            resData.raise_for_status()
            soup = BeautifulSoup(resData.text, "html.parser")
            items = soup.find_all("div", attrs={"class": "unit-el"})
            lecture_list = []
            temp = []
            program = (
                db.session.query(ModelInflearnProgram)
                .filter_by(programcode=code)
                .first()
            )
            if program is None:
                program = ModelInflearnProgram(data)
                db.session.add(program)
                db.session.commit()
            else:
                data["save_folder"] = program.save_folder
                data["season"] = program.season
            for idx, item in enumerate(items):
                # FIX: this dict creation was commented out, so the loop
                # raised NameError on first use (and would otherwise have
                # aliased one shared dict across all episodes).
                temp1 = {}
                print("idx::", idx)
                data_id = item["data-id"]
                run_time = ""
                title = item.find("div", attrs={"class": "title"}).get_text()
                if item.find("span", {"class": "runtime"}) is not None:
                    run_time = item.find("span", {"class": "runtime"}).get_text()
                api_url = f"{base_url}/api/course/{code}/lecture/{data_id}"
                temp1["season"] = "1"
                LogicInflearn.season = "1"
                m3u8_info = LogicInflearn.getM3u8_info(
                    api_url, LogicInflearn.season, idx
                )
                logger.debug(m3u8_info)
                temp1["save_folder"] = Util.change_text_for_use_filename(
                    data["save_folder"]
                )
                tmp_save_path = ModelSetting.get("download_path")
                if ModelSetting.get("auto_make_folder") == "True":
                    program_path = os.path.join(tmp_save_path, temp1["save_folder"])
                    temp1["save_path"] = program_path
                    # NOTE(review): season-folder nesting assumed to apply only
                    # when auto_make_folder is on (save_path is only set here).
                    if ModelSetting.get("inflearn_auto_make_season_folder"):
                        temp1["save_path"] = os.path.join(
                            temp1["save_path"], "Season %s" % int(temp1["season"])
                        )
                temp1["title"] = title
                temp1["data_id"] = data_id
                temp1["item_id"] = m3u8_info["data_id"]
                temp1["code"] = temp1["item_id"]
                temp1["run_time"] = run_time
                temp1["api_url"] = api_url
                temp1["name"] = m3u8_info["name"]
                temp1["filename"] = m3u8_info["filename"]
                temp1["url"] = m3u8_info["hlsUrl"]
                temp1["size"] = m3u8_info["size"]
                temp.append(temp1)
            data["episode"] = temp
            LogicInflearn.current_data = data
            return data
        except Exception as e:
            # A trailing `except IndexError` clause existed but was
            # unreachable after `except Exception`; merged here.
            logger.error("Exception:%s", e)
            logger.error(traceback.format_exc())
            data["log"] = str(e)
            data["ret"] = "error"
            return data

    @staticmethod
    @yommi_logger(logging_type="debug")
    def get_title_info(code):
        """Scrape a course page, persist/refresh its ModelInflearnProgram
        row, resolve the m3u8 info of every curriculum unit and return the
        program data dict (also cached in LogicInflearn.current_data).
        """
        # Initialise up-front so the except handler can always annotate it.
        data = {"code": code, "ret": False}
        try:
            url = "%s/course/%s" % (ModelSetting.get("inflearn_url"), parse.quote(code))
            logger.info(url)
            html_content = LogicInflearn.get_html(url)
            soup = BeautifulSoup(html_content, "html.parser")
            data["poster_url"] = soup.select_one(
                "div.cd-header__thumbnail-cover div img"
            )["src"]
            data["title"] = soup.select_one("div.cd-header__title").text
            main_title = soup.select_one("div.cd-header__title").text
            data["save_folder"] = data["title"]
            data["season"] = "1"
            curriculum_content = soup.find_all("a", {"class": "cd-accordion__unit"})
            preview_path = []
            first_item = curriculum_content[0]["href"]
            # Jump to the first previewable unit's view page.
            base_url = "https://www.inflearn.com"
            url = base_url + parse.quote(first_item)
            logger.debug(f"url::::: {url}")
            res_data = requests.get(url, timeout=20)
            if res_data.url != url:
                # redirect occurred; likely the course cannot be found.
                raise requests.TooManyRedirects()
            res_data.raise_for_status()
            soup = BeautifulSoup(res_data.text, "html.parser")
            items = soup.find_all("div", attrs={"class": "unit-el"})
            lecture_list = []
            temp = []
            program = (
                db.session.query(ModelInflearnProgram)
                .filter_by(programcode=code)
                .first()
            )
            if program is None:
                program = ModelInflearnProgram(data)
                db.session.add(program)
                db.session.commit()
            else:
                data["save_folder"] = program.save_folder
                data["season"] = program.season
            for idx, item in enumerate(items):
                # FIX: this dict creation was commented out, so the loop
                # raised NameError on first use (and would otherwise have
                # aliased one shared dict across all episodes).
                temp1 = {}
                print("idx::", idx)
                data_id = item["data-id"]
                run_time = ""
                title = item.find("div", attrs={"class": "title"}).get_text()
                if item.find("span", {"class": "runtime"}) is not None:
                    run_time = item.find("span", {"class": "runtime"}).get_text()
                api_url = f"{base_url}/api/course/{code}/lecture/{data_id}"
                temp1["season"] = "1"
                LogicInflearn.season = "1"
                m3u8_info = LogicInflearn.getM3u8_info(
                    api_url, LogicInflearn.season, idx
                )
                logger.debug(m3u8_info)
                temp1["save_folder"] = Util.change_text_for_use_filename(
                    data["save_folder"]
                )
                tmp_save_path = ModelSetting.get("download_path")
                if ModelSetting.get("auto_make_folder") == "True":
                    program_path = os.path.join(tmp_save_path, temp1["save_folder"])
                    temp1["save_path"] = program_path
                    # NOTE(review): season-folder nesting assumed to apply only
                    # when auto_make_folder is on (save_path is only set here).
                    if ModelSetting.get("inflearn_auto_make_season_folder"):
                        temp1["save_path"] = os.path.join(
                            temp1["save_path"], "Season %s" % int(temp1["season"])
                        )
                temp1["title"] = title
                temp1["data_id"] = data_id
                temp1["item_id"] = m3u8_info["data_id"]
                temp1["code"] = temp1["item_id"]
                temp1["run_time"] = run_time
                temp1["api_url"] = api_url
                temp1["name"] = m3u8_info["name"]
                temp1["filename"] = m3u8_info["filename"]
                temp1["url"] = m3u8_info["hlsUrl"]
                temp1["size"] = m3u8_info["size"]
                temp.append(temp1)
            data["episode"] = temp
            LogicInflearn.current_data = data
            return data
        except Exception as e:
            # A trailing `except IndexError` clause existed but was
            # unreachable after `except Exception`; merged here.
            logger.error("Exception:%s", e)
            logger.error(traceback.format_exc())
            data["log"] = str(e)
            data["ret"] = "error"
            return data

    @staticmethod
    def getM3u8_info(url, season, idx):
        """Query the lecture API at *url* and return stream metadata for one
        unit: data_id, title, name, hlsUrl, size, duration and the computed
        episode filename.  Missing keys leave fields at their "" defaults.
        """
        data_id = ""
        m3u8_url = ""
        name = ""
        size = ""
        duration = ""
        filename = ""
        title = ""
        res_data = LogicInflearn.getHtml(url, "json").json()
        logger.info(f"getM3u8_info::url => {url}")
        try:
            if res_data["course"]["id"] is not None:
                data_id = res_data["course"]["id"]
            if res_data["course"]["_"]["current_unit"]["title"] is not None:
                title = res_data["course"]["_"]["current_unit"]["title"]
            if res_data["newBOX"]["video"]["name"] is not None:
                name = res_data["newBOX"]["video"]["name"]
                filename = f"{title}.{name.split('.')[0]}.S{season.zfill(2)}.E{str(idx).zfill(3)}.{name.split('.')[-1]}"
            if res_data["newBOX"]["video"]["vod_info"]["hlsUrl"] is not None:
                m3u8_url = res_data["newBOX"]["video"]["vod_info"]["hlsUrl"]
                size = res_data["newBOX"]["video"]["vod_info"]["size"]
                duration = res_data["newBOX"]["video"]["vod_info"]["duration"]
        except KeyError:
            # Best effort: absent keys simply leave the defaults in place.
            pass
        return {
            "data_id": data_id,
            "title": title,
            "name": name,
            "hlsUrl": m3u8_url,
            "size": size,
            "duration": duration,
            "filename": filename,
        }

    @staticmethod
    def getHtml(url, header):
        """GET *url* with its path percent-quoted; raise on redirect or HTTP
        error and return the Response object.

        NOTE(review): the *header* argument is currently unused (a 'json'
        switch was abandoned) — kept for call-site compatibility.
        """
        o = parse.urlparse(url)
        tmp_url = f"{o.scheme}://{o.netloc}{parse.quote(o.path)}"
        resData = requests.get(tmp_url)
        if (
            resData.url != tmp_url
        ):  # redirect occurred; likely the resource cannot be found.
            raise requests.TooManyRedirects()
        resData.raise_for_status()
        return resData

    @staticmethod
    def get_filename(maintitle, season, title):
        """Build a sanitized 'Title.SxxEyy.720p-LK.mp4' filename from an
        episode title, falling back to 'Title.720p-SA.mp4' when the episode
        number cannot be parsed.
        """
        try:
            # FIX: the stored pattern read "(?P.*?)" — the "<title>" group
            # name had been stripped, making the regex invalid (re.error on
            # every call).  Restored the named group.
            match = re.compile(
                r"(?P<title>.*?)\s?((?P<season>\d+)기)?\s?((?P<epi_no>\d+)화?)"
            ).search(title)
            if match:
                epi_no = int(match.group("epi_no"))
                if epi_no < 10:
                    epi_no = "0%s" % epi_no
                else:
                    epi_no = "%s" % epi_no
                if int(season) < 10:
                    season = "0%s" % season
                else:
                    season = "%s" % season
                ret = "%s.S%sE%s.720p-LK.mp4" % (maintitle, season, epi_no)
            else:
                logger.debug("NOT MATCH")
                ret = "%s.720p-SA.mp4" % maintitle
            return Util.change_text_for_use_filename(ret)
        except Exception as e:
            logger.error("Exception:%s", e)
            logger.error(traceback.format_exc())

    @staticmethod
    def get_info_by_code(code):
        """Return the cached episode dict whose data_id equals *code*, or
        None when nothing is cached / matches.
        """
        logger.info(f"get_info_by_code: {code}")
        try:
            if LogicInflearn.current_data is not None:
                for t in LogicInflearn.current_data["episode"]:
                    if t["data_id"] == code:
                        return t
        except Exception as e:
            logger.error("Exception:%s", e)
            logger.error(traceback.format_exc())

    @staticmethod
    def scheduler_function():
        """Scheduler entry point: for every whitelisted program code, queue
        any episode not yet marked completed in the DB.
        """
        try:
            logger.debug("Linkkf scheduler_function start..")
            whitelist_program = ModelSetting.get("whitelist_program")
            whitelist_programs = [
                x.strip().replace(" ", "")
                for x in whitelist_program.replace("\n", ",").split(",")
            ]
            logger.debug(f"whitelist_programs: {whitelist_programs}")
            for code in whitelist_programs:
                logger.info("auto download start : %s", code)
                downloaded = (
                    db.session.query(ModelInflearn)
                    .filter(ModelInflearn.completed.is_(True))
                    .filter_by(programcode=code)
                    .with_for_update()
                    .all()
                )
                dl_codes = [dl.episodecode for dl in downloaded]
                logger.info("downloaded codes :%s", dl_codes)
                data = LogicInflearn.get_title_info(code)
                for episode in data["episode"]:
                    e_code = episode["code"]
                    if e_code not in dl_codes:
                        logger.info("Logic Queue added :%s", e_code)
                        LogicQueue.add_queue(episode)
                logger.debug("========================================")
        except Exception as e:
            logger.error("Exception:%s", e)
            logger.error(traceback.format_exc())

    @staticmethod
    def download(form):
        """Enqueue a download described by the submitted *form*; returns a
        status dict whose "ret" field is the queue result.
        """
        try:
            ret = {}
            logger.debug("download call")
            logger.debug(form)
            tmp = LogicQueue.add_youtube_queue(form.to_dict())
            logger.debug("add_queue : tmp >> %s", tmp)
            ret["ret"] = tmp
            return ret
        except Exception as e:
            logger.error("Exception:%s", e)
            logger.error(traceback.format_exc())

    @staticmethod
    def reset_db() -> bool:
        """Delete every ModelInflearn row and commit.  Always returns True."""
        db.session.query(ModelInflearn).delete()
        db.session.commit()
        return True

    @staticmethod
    def get_excel_info():
        """List the files in the hard-coded local xlsx directory.

        NOTE(review): the absolute path is developer-specific; consider
        moving it into ModelSetting.
        """
        _path_dir = "/WD/Users/yommi/Work/fastapi/app/inflearn_xlsx"
        file_list = os.listdir(_path_dir)
        return file_list