diff --git a/lib/camoufox_anilife.py b/lib/camoufox_anilife.py index 996d3d0..e3f7f79 100644 --- a/lib/camoufox_anilife.py +++ b/lib/camoufox_anilife.py @@ -1,32 +1,29 @@ #!/usr/bin/env python3 """ -Camoufox 기반 Anilife 비디오 URL 추출 스크립트 +Camoufox 기반 Anilife 비디오 URL 추출 스크립트 (비동기 버전) 강력한 봇 감지 우회 기능이 있는 스텔스 Firefox - -사용법: - python camoufox_anilife.py """ import sys import json -import time +import asyncio import re import os -def _run_browser(browser, detail_url, episode_num, result): - """실제 브라우저 작업을 수행하는 내부 함수""" - page = browser.new_page() +async def _run_browser(browser, detail_url, episode_num, result): + """실제 브라우저 작업을 수행하는 내부 비동기 함수""" + page = await browser.new_page() try: # 1. Detail 페이지로 이동 print(f"1. Navigating to detail page: {detail_url}", file=sys.stderr) - page.goto(detail_url, wait_until="domcontentloaded", timeout=30000) - time.sleep(2) + await page.goto(detail_url, wait_until="domcontentloaded", timeout=30000) + await asyncio.sleep(2) print(f" Current URL: {page.url}", file=sys.stderr) # 2. 에피소드 목록으로 스크롤 - page.mouse.wheel(0, 800) - time.sleep(1) + await page.mouse.wheel(0, 800) + await asyncio.sleep(1) # 3. 해당 에피소드 찾아서 클릭 print(f"2. Looking for episode {episode_num}", file=sys.stderr) @@ -35,33 +32,33 @@ def _run_browser(browser, detail_url, episode_num, result): try: # epl-num 클래스의 div에서 에피소드 번호 찾기 episode_link = page.locator(f'a:has(.epl-num:text("{episode_num}"))').first - if episode_link.is_visible(timeout=5000): - href = episode_link.get_attribute("href") + if await episode_link.is_visible(timeout=5000): + href = await episode_link.get_attribute("href") print(f" Found episode link: {href}", file=sys.stderr) - episode_link.click() + await episode_link.click() episode_clicked = True - time.sleep(3) + await asyncio.sleep(3) except Exception as e: print(f" Method 1 failed: {e}", file=sys.stderr) if not episode_clicked: try: # provider 링크들 중에서 에피소드 번호가 포함된 것 클릭 - links = page.locator('a[href*="/ani/provider/"]').all() + links = await page.locator('a[href*="/ani/provider/"]').all() for link in links: - text = link.inner_text() + text = await link.inner_text() if episode_num in text: print(f" Found: {text}", file=sys.stderr) - link.click() + await link.click() episode_clicked = True - time.sleep(3) + await asyncio.sleep(3) break except Exception as e: print(f" Method 2 failed: {e}", file=sys.stderr) if not episode_clicked: result["error"] = f"Episode {episode_num} not found" - result["html"] = page.content() + result["html"] = await page.content() return result # 4. Provider 페이지에서 _aldata 추출 @@ -71,12 +68,12 @@ def _run_browser(browser, detail_url, episode_num, result): # 리다이렉트 확인 if "/ani/provider/" not in page.url: result["error"] = f"Redirected to {page.url}" - result["html"] = page.content() + result["html"] = await page.content() return result # _aldata 추출 시도 try: - aldata_value = page.evaluate("typeof _aldata !== 'undefined' ? _aldata : null") + aldata_value = await page.evaluate("typeof _aldata !== 'undefined' ? _aldata : null") if aldata_value: result["aldata"] = aldata_value result["success"] = True @@ -86,8 +83,8 @@ def _run_browser(browser, detail_url, episode_num, result): print(f" JS error: {js_err}", file=sys.stderr) # HTML에서 _aldata 패턴 추출 시도 - html = page.content() - aldata_match = re.search(r'_aldata\s*=\s*["\']([A-Za-z0-9+/=]+)["\']', html) + html_content = await page.content() + aldata_match = re.search(r'_aldata\s*=\s*["\']([A-Za-z0-9+/=]+)["\']', html_content) if aldata_match: result["aldata"] = aldata_match.group(1) result["success"] = True @@ -97,13 +94,13 @@ def _run_browser(browser, detail_url, episode_num, result): # 5. CloudVideo 버튼 클릭 시도 print("4. Trying CloudVideo button click...", file=sys.stderr) try: - page.mouse.wheel(0, 500) - time.sleep(1) + await page.mouse.wheel(0, 500) + await asyncio.sleep(1) cloudvideo_btn = page.locator('a[onclick*="moveCloudvideo"], a[onclick*="moveJawcloud"]').first - if cloudvideo_btn.is_visible(timeout=3000): - cloudvideo_btn.click() - time.sleep(3) + if await cloudvideo_btn.is_visible(timeout=3000): + await cloudvideo_btn.click() + await asyncio.sleep(3) result["current_url"] = page.url print(f" After click URL: {page.url}", file=sys.stderr) @@ -115,7 +112,7 @@ def _run_browser(browser, detail_url, episode_num, result): # 플레이어 페이지에서 _aldata 추출 try: - aldata_value = page.evaluate("typeof _aldata !== 'undefined' ? _aldata : null") + aldata_value = await page.evaluate("typeof _aldata !== 'undefined' ? _aldata : null") if aldata_value: result["aldata"] = aldata_value result["success"] = True @@ -125,28 +122,28 @@ def _run_browser(browser, detail_url, episode_num, result): pass # HTML에서 추출 - html = page.content() - aldata_match = re.search(r'_aldata\s*=\s*["\']([A-Za-z0-9+/=]+)["\']', html) + html_content = await page.content() + aldata_match = re.search(r'_aldata\s*=\s*["\']([A-Za-z0-9+/=]+)["\']', html_content) if aldata_match: result["aldata"] = aldata_match.group(1) result["success"] = True return result - result["html"] = html + result["html"] = html_content except Exception as click_err: print(f" Click error: {click_err}", file=sys.stderr) - result["html"] = page.content() + result["html"] = await page.content() finally: - page.close() + await page.close() return result -def extract_aldata(detail_url: str, episode_num: str) -> dict: - """Camoufox로 Detail 페이지에서 _aldata 추출""" +async def extract_aldata(detail_url: str, episode_num: str) -> dict: + """AsyncCamoufox로 Detail 페이지에서 _aldata 추출""" try: - from camoufox.sync_api import Camoufox + from camoufox.async_api import AsyncCamoufox except ImportError as e: return {"error": f"Camoufox not installed: {e}"} @@ -167,13 +164,13 @@ def extract_aldata(detail_url: str, episode_num: str) -> dict: # xvfb 인자 지원 여부에 따른 안전한 실행 (Try-Except Fallback) try: - with Camoufox(**camou_args) as browser: - return _run_browser(browser, detail_url, episode_num, result) + async with AsyncCamoufox(**camou_args) as browser: + return await _run_browser(browser, detail_url, episode_num, result) except TypeError as e: if "xvfb" in str(e): print(f" Warning: Local Camoufox version too old for 'xvfb'. Falling back to headless.", file=sys.stderr) - with Camoufox(headless=True) as browser: - return _run_browser(browser, detail_url, episode_num, result) + async with AsyncCamoufox(headless=True) as browser: + return await _run_browser(browser, detail_url, episode_num, result) raise e except Exception as e: @@ -190,5 +187,10 @@ if __name__ == "__main__": detail_url = sys.argv[1] episode_num = sys.argv[2] - result = extract_aldata(detail_url, episode_num) - print(json.dumps(result, ensure_ascii=False)) + + # 비동기 실행 루프 시작 + try: + res = asyncio.run(extract_aldata(detail_url, episode_num)) + print(json.dumps(res, ensure_ascii=False)) + except Exception as e: + print(json.dumps({"error": str(e), "success": False})) diff --git a/lib/ytdlp_downloader.py b/lib/ytdlp_downloader.py index aa4104f..1264fb0 100644 --- a/lib/ytdlp_downloader.py +++ b/lib/ytdlp_downloader.py @@ -9,6 +9,7 @@ import sys import time import re import logging +import platform logger = logging.getLogger(__name__) @@ -144,10 +145,21 @@ class YtdlpDownloader: '--no-check-certificate', '--progress', '--verbose', # 디버깅용 상세 로그 - '--impersonate', 'chrome-120', # 정밀한 크롬-120 지문 사용 '--extractor-args', 'generic:force_hls', # HLS 강제 추출 '-o', self.output_path, ] + + # 1.5 환경별 브라우저 위장 설정 (Impersonate) + # macOS에서는 고급 위장 기능을 사용하되, 종속성 문제가 잦은 Linux/Docker에서는 UA 수동 지정 + is_mac = platform.system() == 'Darwin' + if is_mac: + cmd += ['--impersonate', 'chrome-120'] + logger.debug("Using yt-dlp --impersonate chrome-120 (macOS detected)") + else: + # Docker/Linux: impersonate 라이브러리 부재 가능하므로 UA 수동 설정 + user_agent = self.headers.get('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36') + cmd += ['--user-agent', user_agent] + logger.debug(f"Using manual User-Agent on {platform.system()}: {user_agent}") # 2. 프록시 설정 if self.proxy: