from __future__ import unicode_literals import os import traceback import tempfile from glob import glob from datetime import datetime from threading import Thread from enum import Enum from typing import Optional, Dict, List, Any, Tuple, Union import shutil as celery_shutil from .setup import P logger = P.logger ModelSetting = P.ModelSetting # yt-dlp 패키지 설정 (기본값 index 1) package_idx: str = ModelSetting.get("youtube_dl_package") if not package_idx: package_idx = "1" youtube_dl_package: str = P.youtube_dl_packages[int(package_idx)].replace("-", "_") class Status(Enum): READY = 0 START = 1 DOWNLOADING = 2 ERROR = 3 FINISHED = 4 STOP = 5 COMPLETED = 6 def __str__(self) -> str: str_list: List[str] = ["준비", "분석중", "다운로드중", "실패", "변환중", "중지", "완료"] return str_list[self.value] class MyYoutubeDL: DEFAULT_FILENAME: str = "%(title)s-%(id)s.%(ext)s" _index: int = 0 def __init__( self, plugin: str, type_name: str, url: str, filename: str, temp_path: str, save_path: Optional[str] = None, opts: Optional[Dict[str, Any]] = None, dateafter: Optional[str] = None, datebefore: Optional[str] = None, ): # yt-dlp/youtube-dl의 utils 모듈에서 DateRange 임포트 DateRange = __import__( f"{youtube_dl_package}.utils", fromlist=["DateRange"] ).DateRange if save_path is None: save_path = temp_path if opts is None: opts = {} self.plugin: str = plugin self.type: str = type_name self.url: str = url self.filename: str = filename # 임시 폴더 생성 if not os.path.isdir(temp_path): os.makedirs(temp_path) self.temp_path: str = tempfile.mkdtemp(prefix="youtube-dl_", dir=temp_path) # 저장 폴더 생성 if not os.path.isdir(save_path): os.makedirs(save_path) self.save_path: str = save_path self.opts: Dict[str, Any] = opts if dateafter or datebefore: self.opts["daterange"] = DateRange(start=dateafter, end=datebefore) self.index: int = MyYoutubeDL._index MyYoutubeDL._index += 1 self._status: Status = Status.READY self._thread: Optional[Thread] = None self.key: Optional[str] = None self.start_time: Optional[datetime] = None self.end_time: Optional[datetime] = None # 비디오 정보 self.info_dict: Dict[str, Optional[str]] = { "extractor": None, "title": None, "uploader": None, "uploader_url": None, } # 진행률 정보 self.progress_hooks: Dict[str, Optional[Union[int, float, str]]] = { "downloaded_bytes": None, "total_bytes": None, "eta": None, "speed": None, } def start(self) -> bool: """다운로드 스레드 시작""" if self.status != Status.READY: return False self._thread = Thread(target=self.run) self._thread.start() return True def run(self) -> None: """다운로드 실행 본체""" youtube_dl = __import__(youtube_dl_package) try: self.start_time = datetime.now() self.status = Status.START # 정보 추출 info_dict = MyYoutubeDL.get_info_dict( self.url, self.opts.get("proxy"), self.opts.get("cookiefile"), self.opts.get("http_headers"), self.opts.get("cookiesfrombrowser"), ) if info_dict is None: self.status = Status.ERROR return self.info_dict["extractor"] = info_dict.get("extractor", "unknown") self.info_dict["title"] = info_dict.get("title", info_dict.get("id", "unknown")) self.info_dict["uploader"] = info_dict.get("uploader", "") self.info_dict["uploader_url"] = info_dict.get("uploader_url", "") ydl_opts: Dict[str, Any] = { "logger": MyLogger(), "progress_hooks": [self.my_hook], "outtmpl": os.path.join(self.temp_path, self.filename), "ignoreerrors": True, "cachedir": False, "nocheckcertificate": True, } # yt-dlp 전용 성능 향상 옵션 if youtube_dl_package == "yt_dlp": ydl_opts.update({ "concurrent_fragment_downloads": 5, "retries": 10, }) ydl_opts.update(self.opts) with youtube_dl.YoutubeDL(ydl_opts) as ydl: logger.debug(f"다운로드 시작: {self.url}") error_code: int = ydl.download([self.url]) logger.debug(f"다운로드 종료 (코드: {error_code})") if self.status in (Status.START, Status.FINISHED, Status.DOWNLOADING): # 임시 폴더의 파일을 실제 저장 경로로 이동 for i in glob(self.temp_path + "/**/*", recursive=True): path: str = i.replace(self.temp_path, self.save_path, 1) if os.path.isdir(i): if not os.path.isdir(path): os.mkdir(path) continue celery_shutil.move(i, path) self.status = Status.COMPLETED except Exception as error: self.status = Status.ERROR logger.error(f"실행 중 예외 발생: {error}") logger.error(traceback.format_exc()) finally: if os.path.exists(self.temp_path): celery_shutil.rmtree(self.temp_path) if self.status != Status.STOP: self.end_time = datetime.now() def stop(self) -> bool: """다운로드 중지""" if self.status in (Status.ERROR, Status.STOP, Status.COMPLETED): return False self.status = Status.STOP self.end_time = datetime.now() return True @staticmethod def get_preview_url(url: str) -> Optional[str]: """미리보기용 직접 재생 가능한 URL 추출""" youtube_dl = __import__(youtube_dl_package) try: # 미리보기를 위해 다양한 포맷 시도 (mp4, hls 등) ydl_opts: Dict[str, Any] = { "format": "best[ext=mp4]/bestvideo[ext=mp4]+bestaudio[ext=m4a]/best", "logger": MyLogger(), "nocheckcertificate": True, "quiet": True, "js_runtimes": {"node": {"path": "/Users/yommi/.local/state/fnm_multishells/53824_1769161399333/bin/node"}}, } with youtube_dl.YoutubeDL(ydl_opts) as ydl: info: Dict[str, Any] = ydl.extract_info(url, download=False) # 1. HLS 매니페스트 우선 (가장 안정적으로 오디오+비디오 제공) if info.get("manifest_url"): return info["manifest_url"] # 2. 직접 URL (ydl_opts에서 지정한 best[ext=mp4] 결과) if info.get("url"): return info["url"] # 3. 포맷 목록에서 적절한 것 찾기 formats = info.get("formats", []) # 오디오와 비디오가 모두 있는 포맷 찾기 combined_formats = [f for f in formats if f.get("vcodec") != "none" and f.get("acodec") != "none"] if combined_formats: # 가장 좋은 화질의 결합 포맷 선택 return combined_formats[-1].get("url") return None except Exception as error: logger.error(f"미리보기 URL 추출 중 예외 발생: {error}") return None @staticmethod def get_version() -> str: """라이브러리 버전 확인""" __version__: str = __import__( f"{youtube_dl_package}.version", fromlist=["__version__"] ).__version__ return __version__ @staticmethod def get_info_dict( url: str, proxy: Optional[str] = None, cookiefile: Optional[str] = None, http_headers: Optional[Dict[str, str]] = None, cookiesfrombrowser: Optional[str] = None, **extra_opts ) -> Optional[Dict[str, Any]]: """비디오 메타데이터 정보 추출""" youtube_dl = __import__(youtube_dl_package) try: ydl_opts: Dict[str, Any] = { "logger": MyLogger(), "nocheckcertificate": True, "quiet": True, # JS 런타임 수동 지정 (유저 시스템 환경 반영) "js_runtimes": {"node": {"path": "/Users/yommi/.local/state/fnm_multishells/53824_1769161399333/bin/node"}}, } # 기본값으로 extract_flat 적용 (명시적으로 override 가능) if "extract_flat" not in extra_opts: ydl_opts["extract_flat"] = True # True = 모든 추출기에 적용 if proxy: ydl_opts["proxy"] = proxy if cookiefile: ydl_opts["cookiefile"] = cookiefile if http_headers: ydl_opts["http_headers"] = http_headers if cookiesfrombrowser: ydl_opts["cookiesfrombrowser"] = (cookiesfrombrowser, None, None, None) # 추가 옵션 반영 (playliststart, playlistend 등) ydl_opts.update(extra_opts) with youtube_dl.YoutubeDL(ydl_opts) as ydl: info: Dict[str, Any] = ydl.extract_info(url, download=False) except Exception as error: logger.error(f"정보 추출 중 예외 발생: {error}") logger.error(traceback.format_exc()) return None return ydl.sanitize_info(info) def my_hook(self, data: Dict[str, Any]) -> None: """진행률 업데이트 훅""" if self.status != Status.STOP: if data["status"] == "downloading": self.status = Status.DOWNLOADING elif data["status"] == "error": self.status = Status.ERROR elif data["status"] == "finished": self.status = Status.FINISHED if data["status"] != "error": self.filename = os.path.basename(data.get("filename", self.filename)) self.progress_hooks["downloaded_bytes"] = data.get("downloaded_bytes") self.progress_hooks["total_bytes"] = data.get("total_bytes") or data.get("total_bytes_estimate") self.progress_hooks["eta"] = data.get("eta") self.progress_hooks["speed"] = data.get("speed") @property def status(self) -> Status: return self._status @status.setter def status(self, value: Status) -> None: self._status = value # Socket.IO를 통한 상태 업데이트 전송 try: basic_module = P.get_module('basic') if basic_module: basic_module.socketio_emit("status", self) except Exception as e: logger.error(f"SocketIO 전송 에러: {e}") class MyLogger: """yt-dlp의 로그를 가로채서 처리하는 클래성""" def debug(self, msg: str) -> None: # 진행 상황 관련 로그는 걸러냄 if " ETA " in msg or "at" in msg and "B/s" in msg: return logger.debug(msg) def warning(self, msg: str) -> None: logger.warning(msg) def error(self, msg: str) -> None: logger.error(msg)