from typing import Any, Dict, List, Optional, Tuple, Union from flask import Response, jsonify, render_template from loguru import logger from support import SupportYaml from tool import ToolUtil from .model import ModelYoutubeDlItem from .my_youtube_dl import MyYoutubeDL, Status from .setup import * import platform import os import subprocess import traceback import sys from datetime import datetime try: from gommi_downloader_manager.mod_queue import ModuleQueue except ImportError as e: # 디버깅용 로그 - 실제 운영 시에는 logger를 써야 하지만, 초기화 전일 수 있음 print(f"GDM Import Error: {e}") try: from framework import app, logger logger.error(f"GDM Import Error in youtube-dl: {e}") except: pass ModuleQueue = None class ModuleBasic(PluginModuleBase): """유튜브 다운로더의 기본 기능을 담당하는 모듈""" youtube_dl_list: List[MyYoutubeDL] = [] def __init__(self, P: Any) -> None: super(ModuleBasic, self).__init__( P, name="basic", first_menu="setting", scheduler_desc="유튜브 다운로더" ) self.db_default: Dict[str, str] = { "db_version": "2", "youtube_dl_package": "1", "ffmpeg_path": "" if platform.system() != "Windows" else os.path.join("PATH_APP_ROOT", "bin", "Windows", "ffmpeg.exe"), "temp_path": os.path.join("{PATH_DATA}", "download_tmp"), "save_path": os.path.join("{PATH_DATA}", "download"), "default_filename": "", "proxy": "", } self.web_list_model: Any = ModelYoutubeDlItem def process_menu(self, sub: str, req: Any) -> Union[Response, str]: """메뉴별 템플릿 렌더링""" logger.debug(f"sub: {sub}") arg: Dict[str, Any] = P.ModelSetting.to_dict() arg["package_name"] = P.package_name arg["package_version"] = P.plugin_info["version"] arg["template_name"] = f"{P.package_name}_{sub}" # JS 파일명 하위 호환성 logger.debug(f"arg:: {arg}") if sub == "setting": arg["is_include"] = F.scheduler.is_include(self.get_scheduler_name()) arg["is_running"] = F.scheduler.is_running(self.get_scheduler_name()) arg["package_list"] = [[x, x] for x in P.youtube_dl_packages] arg["youtube_dl_version"] = self.get_youtube_dl_version() arg["DEFAULT_FILENAME"] = self.get_default_filename() elif sub == "download": default_filename: Optional[str] = P.ModelSetting.get("default_filename") arg["filename"] = ( default_filename if default_filename else self.get_default_filename() ) arg["preset_list"] = self.get_preset_list() arg["postprocessor_list"] = self.get_postprocessor_list() return render_template(f"{P.package_name}_{self.name}_{sub}.html", arg=arg) def plugin_load(self) -> None: """플러그인 로드 시 업그레이드 체크""" try: package_idx: str = P.ModelSetting.get("youtube_dl_package") if not package_idx: package_idx = "1" youtube_dl: str = P.youtube_dl_packages[int(package_idx)] logger.debug(f"{youtube_dl} 업그레이드 체크") # 선택된 패키지(yt-dlp 등)를 최신 버전으로 업데이트 subprocess.check_output( [sys.executable, "-m", "pip", "install", "--upgrade", youtube_dl], universal_newlines=True, ) except Exception as error: logger.error(f"플러그인 로드 중 예외 발생: {error}") logger.error(traceback.format_exc()) def get_youtube_dl_version(self) -> str: """다운로드 라이브러리 버전 획득""" try: return MyYoutubeDL.get_version() except Exception as error: logger.error(f"버전 확인 중 예외 발생: {error}") # logger.error(traceback.format_exc()) return "패키지 임포트 실패" def process_command(self, command: str, arg1: Any, arg2: Any, arg3: Any, req: Any) -> Response: """일반 커맨드 처리 (현재 미사용)""" ret: Dict[str, str] = {"ret": "success"} return jsonify(ret) def process_ajax(self, sub: str, req: Any) -> Response: """UI에서의 AJAX 요청 처리""" try: logger.debug(f"AJAX 요청: {sub}, {req.values}") ret: Dict[str, Any] = {"ret": "success"} if sub == "ffmpeg_version": path: str = req.form["path"] output: bytes = subprocess.check_output([path, "-version"]) ret["data"] = output.decode().replace("\n", "
") elif sub == "download": postprocessor: str = req.form["postprocessor"] video_convertor: List[str] extract_audio: List[str] video_convertor, extract_audio = self.get_postprocessor() preferedformat: Optional[str] = None preferredcodec: Optional[str] = None preferredquality: Optional[int] = None if postprocessor in video_convertor: preferedformat = postprocessor elif postprocessor in extract_audio: preferredcodec = postprocessor preferredquality = 192 # GDM 연동 시도 if ModuleQueue: gdm_options = { 'proxy': P.ModelSetting.get("proxy"), 'ffmpeg_path': P.ModelSetting.get("ffmpeg_path"), } if req.form.get("format"): gdm_options['format'] = req.form["format"] if preferredcodec: gdm_options['extract_audio'] = True gdm_options['audio_format'] = preferredcodec if preferedformat: gdm_options['merge_output_format'] = preferedformat if postprocessor == "60fps": # recode-video를 mkv로 강제하여 무조건 재인코딩(변환) 단계가 실행되도록 함. # 그래야만 VideoConvertor 필터가 적용됨. gdm_options['extra_args'] = gdm_options.get('extra_args', []) + [ '--recode-video', 'mkv', '--postprocessor-args', 'VideoConvertor:-c:v libx264 -crf 23 -vf minterpolate=fps=60' ] # 상세 로그 확인을 위해 verbose 추가 가능 (디버깅용) # gdm_options['extra_args'] = gdm_options.get('extra_args', []) + ['--verbose'] # GDM 큐에 추가 task = ModuleQueue.add_download( url=req.form["url"], save_path=ToolUtil.make_path(P.ModelSetting.get("save_path")), filename=req.form["filename"], source_type='youtube', caller_plugin=P.package_name, **gdm_options ) if task: ret["ret"] = "success" ret["msg"] = "GDM 대기열에 추가되었습니다." return jsonify(ret) # GDM 미설치 시 기존 방식 (직접 다운로드) youtube_dl: Optional[MyYoutubeDL] = self.download( plugin=P.package_name, url=req.form["url"], filename=req.form["filename"], temp_path=ToolUtil.make_path(P.ModelSetting.get("temp_path")), save_path=ToolUtil.make_path(P.ModelSetting.get("save_path")), format=req.form["format"], preferedformat=preferedformat, preferredcodec=preferredcodec, preferredquality=preferredquality, proxy=P.ModelSetting.get("proxy"), ffmpeg_path=P.ModelSetting.get("ffmpeg_path"), ) if youtube_dl: youtube_dl.start() self.socketio_emit("add", youtube_dl) ret["ret"] = "info" ret["msg"] = "분석중..." else: ret["ret"] = "danger" ret["msg"] = "다운로드 실패" elif sub == "list": ret["data"] = [] for i in self.youtube_dl_list: data: Optional[Dict[str, Any]] = self.get_data(i) if data is not None: ret["data"].append(data) elif sub == "all_stop": for i in self.youtube_dl_list: i.stop() elif sub == "stop": index: int = int(req.form["index"]) self.youtube_dl_list[index].stop() elif sub == "thumbnail": youtube_dl = self.download( plugin=P.package_name, url=req.form["url"], filename=req.form["filename"], temp_path=ToolUtil.make_path(P.ModelSetting.get("temp_path")), save_path=ToolUtil.make_path(P.ModelSetting.get("save_path")), format="best", proxy=P.ModelSetting.get("proxy"), ffmpeg_path=P.ModelSetting.get("ffmpeg_path"), type="thumbnail", ) if youtube_dl: youtube_dl.start() self.socketio_emit("add", youtube_dl) ret["ret"] = "info" ret["msg"] = "분석중..." else: ret["ret"] = "danger" ret["msg"] = "다운로드 실패" elif sub == "sub": opts: Dict[str, Any] = {} if req.form.get("all_subs") == "true": opts["allsubtitles"] = True else: opts["writesubtitles"] = True youtube_dl = self.download( plugin=P.package_name, url=req.form["url"], filename=req.form["filename"], temp_path=ToolUtil.make_path(P.ModelSetting.get("temp_path")), save_path=ToolUtil.make_path(P.ModelSetting.get("save_path")), format="best", proxy=P.ModelSetting.get("proxy"), ffmpeg_path=P.ModelSetting.get("ffmpeg_path"), type="sub", opts=opts, ) if youtube_dl: youtube_dl.start() self.socketio_emit("add", youtube_dl) ret["ret"] = "info" ret["msg"] = "분석중..." else: ret["ret"] = "danger" ret["msg"] = "다운로드 실패" elif sub == "preview": url: str = req.form["url"] data: Optional[str] = MyYoutubeDL.get_preview_url(url) if data: ret["data"] = data else: ret["ret"] = "warning" ret["msg"] = "미리보기 URL을 가져올 수 없습니다." return jsonify(ret) except Exception as error: logger.error(f"AJAX 처리 중 예외 발생: {error}") logger.error(traceback.format_exc()) return jsonify({"ret": "danger", "msg": str(error)}) def process_api(self, sub: str, req: Any) -> Response: """외부 모듈(Youtube 플러그인 등)에서의 API 요청 처리""" try: if sub == "info_dict": url: str = req.values.get("url") proxy: Optional[str] = req.values.get("proxy") data: Optional[Dict[str, Any]] = MyYoutubeDL.get_info_dict(url, proxy) return jsonify(data) elif sub == "download": # Gommi Download Manager 연동 if ModuleQueue: url = req.values.get("url") save_path = ToolUtil.make_path(req.values.get("save_path") or P.ModelSetting.get("save_path")) filename = req.values.get("filename") task = ModuleQueue.add_download( url=url, save_path=save_path, filename=filename, source_type='youtube', caller_plugin='youtube-dl' ) if task: return jsonify({'ret': 'success', 'msg': '큐에 추가됨', 'task_id': task.id}) else: return jsonify({'ret': 'fail'}) youtube_dl: Optional[MyYoutubeDL] = self.download( plugin=req.values.get("plugin"), url=req.values.get("url"), filename=req.values.get("filename"), temp_path=ToolUtil.make_path(P.ModelSetting.get("temp_path")), save_path=ToolUtil.make_path(req.values.get("save_path") or P.ModelSetting.get("save_path")), format=req.values.get("format"), preferedformat=req.values.get("preferedformat"), preferredcodec=req.values.get("preferredcodec"), preferredquality=req.values.get("preferredquality"), proxy=req.values.get("proxy") or P.ModelSetting.get("proxy"), ffmpeg_path=req.values.get("ffmpeg_path") or P.ModelSetting.get("ffmpeg_path"), key=req.values.get("key"), ) if youtube_dl: if req.values.get("start") == "True" or req.values.get("start") is None: youtube_dl.start() return jsonify(self.get_data(youtube_dl)) else: return jsonify({"ret": "error"}) elif sub == "status": index: int = int(req.values.get("index")) return jsonify(self.get_data(self.youtube_dl_list[index])) elif sub == "stop": index: int = int(req.values.get("index")) self.youtube_dl_list[index].stop() return jsonify({"ret": "success"}) except Exception as error: logger.error(f"API 처리 중 예외 발생: {error}") logger.error(traceback.format_exc()) return jsonify({"ret": "error", "msg": str(error)}) def download(self, **kwargs: Any) -> Optional[MyYoutubeDL]: """다운로드 객체 생성 및 리스트 관리""" try: logger.debug(kwargs) plugin: str = kwargs["plugin"] url: str = kwargs["url"] filename: str = kwargs["filename"] temp_path: str = kwargs["temp_path"] save_path: str = kwargs["save_path"] type_name: str = kwargs.get("type", "video") opts: Dict[str, Any] = kwargs.get("opts", {}) if "format" in kwargs and kwargs["format"]: opts["format"] = kwargs["format"] if type_name == "thumbnail": opts["writethumbnail"] = True opts["skip_download"] = True elif type_name == "sub": opts["skip_download"] = True postprocessor: List[Dict[str, Any]] = opts.get("postprocessors", []) if "preferedformat" in kwargs and kwargs["preferedformat"]: postprocessor.append( { "key": "FFmpegVideoConvertor", "preferedformat": kwargs["preferedformat"], } ) if "preferredcodec" in kwargs and kwargs["preferredcodec"]: postprocessor.append( { "key": "FFmpegExtractAudio", "preferredcodec": kwargs["preferredcodec"], "preferredquality": str(kwargs.get("preferredquality", 192)), } ) if postprocessor: opts["postprocessors"] = postprocessor if "playlist" in kwargs and kwargs["playlist"]: if kwargs["playlist"] == "reverse": opts["playlistreverse"] = True elif kwargs["playlist"] == "random": opts["playlistrandom"] = True else: opts["playlist_items"] = kwargs["playlist"] if "archive" in kwargs and kwargs["archive"]: opts["download_archive"] = kwargs["archive"] if "proxy" in kwargs and kwargs["proxy"]: opts["proxy"] = kwargs["proxy"] if "ffmpeg_path" in kwargs and kwargs["ffmpeg_path"]: opts["ffmpeg_location"] = kwargs["ffmpeg_path"] if "cookiefile" in kwargs and kwargs["cookiefile"]: opts["cookiefile"] = kwargs["cookiefile"] if "headers" in kwargs and kwargs["headers"]: opts["http_headers"] = kwargs["headers"] dateafter: Optional[str] = kwargs.get("dateafter") youtube_dl: MyYoutubeDL = MyYoutubeDL( plugin, type_name, url, filename, temp_path, save_path, opts, dateafter ) youtube_dl.key = kwargs.get("key") self.youtube_dl_list.append(youtube_dl) return youtube_dl except Exception as error: logger.error(f"다운로드 객체 생성 중 예외 발생: {error}") logger.error(traceback.format_exc()) return None def get_data(self, youtube_dl: MyYoutubeDL) -> Optional[Dict[str, Any]]: """다운로드 객체의 현재 상태 데이터를 딕셔너리로 변환""" try: data: Dict[str, Any] = {} data["plugin"] = youtube_dl.plugin data["url"] = youtube_dl.url data["filename"] = youtube_dl.filename data["temp_path"] = youtube_dl.temp_path data["save_path"] = youtube_dl.save_path data["index"] = youtube_dl.index data["status_str"] = youtube_dl.status.name data["status_ko"] = str(youtube_dl.status) data["end_time"] = "" data["extractor"] = youtube_dl.type + ( " - " + youtube_dl.info_dict["extractor"] if youtube_dl.info_dict["extractor"] is not None else "" ) data["title"] = ( youtube_dl.info_dict["title"] if youtube_dl.info_dict["title"] is not None else youtube_dl.url ) data["uploader"] = ( youtube_dl.info_dict["uploader"] if youtube_dl.info_dict["uploader"] is not None else "" ) data["uploader_url"] = ( youtube_dl.info_dict["uploader_url"] if youtube_dl.info_dict["uploader_url"] is not None else "" ) data["downloaded_bytes_str"] = "" data["total_bytes_str"] = "" data["percent"] = "0" data["eta"] = ( str(youtube_dl.progress_hooks["eta"]) if youtube_dl.progress_hooks["eta"] is not None else "" ) data["speed_str"] = ( self.human_readable_size(youtube_dl.progress_hooks["speed"], "/s") if youtube_dl.progress_hooks["speed"] is not None else "" ) if youtube_dl.status == Status.READY: data["start_time"] = "" data["download_time"] = "" else: if youtube_dl.end_time is None: download_time: Any = datetime.now() - youtube_dl.start_time else: download_time = youtube_dl.end_time - youtube_dl.start_time data["end_time"] = youtube_dl.end_time.strftime("%m-%d %H:%M:%S") if None not in ( youtube_dl.progress_hooks["downloaded_bytes"], youtube_dl.progress_hooks["total_bytes"], ): data["downloaded_bytes_str"] = self.human_readable_size( youtube_dl.progress_hooks["downloaded_bytes"] ) data["total_bytes_str"] = self.human_readable_size( youtube_dl.progress_hooks["total_bytes"] ) data[ "percent" ] = f"{(float(youtube_dl.progress_hooks['downloaded_bytes']) / float(youtube_dl.progress_hooks['total_bytes']) * 100):.2f}" data["start_time"] = youtube_dl.start_time.strftime("%m-%d %H:%M:%S") data[ "download_time" ] = f"{int(download_time.seconds / 60):02d}:{int(download_time.seconds % 60):02d}" return data except Exception as error: logger.error(f"상태 데이터 변환 중 예외 발생: {error}") logger.error(traceback.format_exc()) return None def human_readable_size(self, size: Union[int, float, None], suffix: str = "") -> str: """바이트 단위를 사람이 읽기 쉬운 형식으로 변환""" if size is None: return "" for unit in ("Bytes", "KB", "MB", "GB", "TB", "PB", "EB", "ZB"): if size < 1024.0: return f"{size:3.1f} {unit}{suffix}" size /= 1024.0 return f"{size:.1f} YB{suffix}" def socketio_emit(self, cmd: str, data: MyYoutubeDL) -> None: """Socket.IO 메시지 전송""" F.socketio.emit( cmd, self.get_data(data), namespace=f"/{P.package_name}", ) def get_postprocessor(self) -> Tuple[List[str], List[str]]: """후처리 리스트 분류 (비디오/오디오)""" video_convertor: List[str] = [] extract_audio: List[str] = [] for i in self.get_postprocessor_list(): if i[2] == "비디오 변환": video_convertor.append(i[0]) elif i[2] == "오디오 추출": extract_audio.append(i[0]) return video_convertor, extract_audio # def plugin_load(self): # if ( # os.path.exists( # ToolUtil.make_path(P.ModelSetting.get(f"{self.name}_path_config")) # ) # is False # ): # shutil.copyfile( # os.path.join( # os.path.dirname(__file__), "files", f"config_{self.name}.yaml" # ), # ToolUtil.make_path(P.ModelSetting.get(f"{self.name}_path_config")), # ) def get_default_filename(self) -> str: return MyYoutubeDL.DEFAULT_FILENAME @staticmethod def get_preset_list() -> List[List[str]]: return [ ["bestvideo+bestaudio/best", "최고 화질"], ["bestvideo[height<=1080]+bestaudio/best[height<=1080]", "1080p"], ["worstvideo+worstaudio/worst", "최저 화질"], ["bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]", "최고 화질(mp4)"], [ "bestvideo[ext=mp4][height<=1080]+bestaudio[ext=m4a]/best[ext=mp4][height<=1080]", "1080p(mp4)", ], ["bestvideo[filesize<50M]+bestaudio/best[filesize<50M]", "50MB 미만"], ["bestaudio/best", "오디오만"], ["_custom", "사용자 정의"], ] @staticmethod def get_postprocessor_list() -> List[List[Union[str, None]]]: return [ ["", "후처리 안함", None], ["mp4", "MP4", "비디오 변환"], ["flv", "FLV", "비디오 변환"], ["webm", "WebM", "비디오 변환"], ["ogg", "Ogg", "비디오 변환"], ["mkv", "MKV", "비디오 변환"], ["ts", "TS", "비디오 변환"], ["avi", "AVI", "비디오 변환"], ["wmv", "WMV", "비디오 변환"], ["mov", "MOV", "비디오 변환"], ["gif", "GIF", "비디오 변환"], ["mp3", "MP3", "오디오 추출"], ["aac", "AAC", "오디오 추출"], ["flac", "FLAC", "오디오 추출"], ["m4a", "M4A", "오디오 추출"], ["opus", "Opus", "오디오 추출"], ["vorbis", "Vorbis", "오디오 추출"], ["wav", "WAV", "오디오 추출"], # ["60fps", "60fps 보간 (느림)", "고급 변환"], ]