feat: Convert camoufox_anilife.py to an asynchronous implementation and adjust yt-dlp browser impersonation based on the operating system.

This commit is contained in:
2025-12-28 23:51:21 +09:00
parent 34c6d628a2
commit 5dc7a307fc
2 changed files with 60 additions and 46 deletions

View File

@@ -1,32 +1,29 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Camoufox 기반 Anilife 비디오 URL 추출 스크립트 Camoufox 기반 Anilife 비디오 URL 추출 스크립트 (비동기 버전)
강력한 봇 감지 우회 기능이 있는 스텔스 Firefox 강력한 봇 감지 우회 기능이 있는 스텔스 Firefox
사용법:
python camoufox_anilife.py <detail_url> <episode_num>
""" """
import sys import sys
import json import json
import time import asyncio
import re import re
import os import os
def _run_browser(browser, detail_url, episode_num, result): async def _run_browser(browser, detail_url, episode_num, result):
"""실제 브라우저 작업을 수행하는 내부 함수""" """실제 브라우저 작업을 수행하는 내부 비동기 함수"""
page = browser.new_page() page = await browser.new_page()
try: try:
# 1. Detail 페이지로 이동 # 1. Detail 페이지로 이동
print(f"1. Navigating to detail page: {detail_url}", file=sys.stderr) print(f"1. Navigating to detail page: {detail_url}", file=sys.stderr)
page.goto(detail_url, wait_until="domcontentloaded", timeout=30000) await page.goto(detail_url, wait_until="domcontentloaded", timeout=30000)
time.sleep(2) await asyncio.sleep(2)
print(f" Current URL: {page.url}", file=sys.stderr) print(f" Current URL: {page.url}", file=sys.stderr)
# 2. 에피소드 목록으로 스크롤 # 2. 에피소드 목록으로 스크롤
page.mouse.wheel(0, 800) await page.mouse.wheel(0, 800)
time.sleep(1) await asyncio.sleep(1)
# 3. 해당 에피소드 찾아서 클릭 # 3. 해당 에피소드 찾아서 클릭
print(f"2. Looking for episode {episode_num}", file=sys.stderr) print(f"2. Looking for episode {episode_num}", file=sys.stderr)
@@ -35,33 +32,33 @@ def _run_browser(browser, detail_url, episode_num, result):
try: try:
# epl-num 클래스의 div에서 에피소드 번호 찾기 # epl-num 클래스의 div에서 에피소드 번호 찾기
episode_link = page.locator(f'a:has(.epl-num:text("{episode_num}"))').first episode_link = page.locator(f'a:has(.epl-num:text("{episode_num}"))').first
if episode_link.is_visible(timeout=5000): if await episode_link.is_visible(timeout=5000):
href = episode_link.get_attribute("href") href = await episode_link.get_attribute("href")
print(f" Found episode link: {href}", file=sys.stderr) print(f" Found episode link: {href}", file=sys.stderr)
episode_link.click() await episode_link.click()
episode_clicked = True episode_clicked = True
time.sleep(3) await asyncio.sleep(3)
except Exception as e: except Exception as e:
print(f" Method 1 failed: {e}", file=sys.stderr) print(f" Method 1 failed: {e}", file=sys.stderr)
if not episode_clicked: if not episode_clicked:
try: try:
# provider 링크들 중에서 에피소드 번호가 포함된 것 클릭 # provider 링크들 중에서 에피소드 번호가 포함된 것 클릭
links = page.locator('a[href*="/ani/provider/"]').all() links = await page.locator('a[href*="/ani/provider/"]').all()
for link in links: for link in links:
text = link.inner_text() text = await link.inner_text()
if episode_num in text: if episode_num in text:
print(f" Found: {text}", file=sys.stderr) print(f" Found: {text}", file=sys.stderr)
link.click() await link.click()
episode_clicked = True episode_clicked = True
time.sleep(3) await asyncio.sleep(3)
break break
except Exception as e: except Exception as e:
print(f" Method 2 failed: {e}", file=sys.stderr) print(f" Method 2 failed: {e}", file=sys.stderr)
if not episode_clicked: if not episode_clicked:
result["error"] = f"Episode {episode_num} not found" result["error"] = f"Episode {episode_num} not found"
result["html"] = page.content() result["html"] = await page.content()
return result return result
# 4. Provider 페이지에서 _aldata 추출 # 4. Provider 페이지에서 _aldata 추출
@@ -71,12 +68,12 @@ def _run_browser(browser, detail_url, episode_num, result):
# 리다이렉트 확인 # 리다이렉트 확인
if "/ani/provider/" not in page.url: if "/ani/provider/" not in page.url:
result["error"] = f"Redirected to {page.url}" result["error"] = f"Redirected to {page.url}"
result["html"] = page.content() result["html"] = await page.content()
return result return result
# _aldata 추출 시도 # _aldata 추출 시도
try: try:
aldata_value = page.evaluate("typeof _aldata !== 'undefined' ? _aldata : null") aldata_value = await page.evaluate("typeof _aldata !== 'undefined' ? _aldata : null")
if aldata_value: if aldata_value:
result["aldata"] = aldata_value result["aldata"] = aldata_value
result["success"] = True result["success"] = True
@@ -86,8 +83,8 @@ def _run_browser(browser, detail_url, episode_num, result):
print(f" JS error: {js_err}", file=sys.stderr) print(f" JS error: {js_err}", file=sys.stderr)
# HTML에서 _aldata 패턴 추출 시도 # HTML에서 _aldata 패턴 추출 시도
html = page.content() html_content = await page.content()
aldata_match = re.search(r'_aldata\s*=\s*["\']([A-Za-z0-9+/=]+)["\']', html) aldata_match = re.search(r'_aldata\s*=\s*["\']([A-Za-z0-9+/=]+)["\']', html_content)
if aldata_match: if aldata_match:
result["aldata"] = aldata_match.group(1) result["aldata"] = aldata_match.group(1)
result["success"] = True result["success"] = True
@@ -97,13 +94,13 @@ def _run_browser(browser, detail_url, episode_num, result):
# 5. CloudVideo 버튼 클릭 시도 # 5. CloudVideo 버튼 클릭 시도
print("4. Trying CloudVideo button click...", file=sys.stderr) print("4. Trying CloudVideo button click...", file=sys.stderr)
try: try:
page.mouse.wheel(0, 500) await page.mouse.wheel(0, 500)
time.sleep(1) await asyncio.sleep(1)
cloudvideo_btn = page.locator('a[onclick*="moveCloudvideo"], a[onclick*="moveJawcloud"]').first cloudvideo_btn = page.locator('a[onclick*="moveCloudvideo"], a[onclick*="moveJawcloud"]').first
if cloudvideo_btn.is_visible(timeout=3000): if await cloudvideo_btn.is_visible(timeout=3000):
cloudvideo_btn.click() await cloudvideo_btn.click()
time.sleep(3) await asyncio.sleep(3)
result["current_url"] = page.url result["current_url"] = page.url
print(f" After click URL: {page.url}", file=sys.stderr) print(f" After click URL: {page.url}", file=sys.stderr)
@@ -115,7 +112,7 @@ def _run_browser(browser, detail_url, episode_num, result):
# 플레이어 페이지에서 _aldata 추출 # 플레이어 페이지에서 _aldata 추출
try: try:
aldata_value = page.evaluate("typeof _aldata !== 'undefined' ? _aldata : null") aldata_value = await page.evaluate("typeof _aldata !== 'undefined' ? _aldata : null")
if aldata_value: if aldata_value:
result["aldata"] = aldata_value result["aldata"] = aldata_value
result["success"] = True result["success"] = True
@@ -125,28 +122,28 @@ def _run_browser(browser, detail_url, episode_num, result):
pass pass
# HTML에서 추출 # HTML에서 추출
html = page.content() html_content = await page.content()
aldata_match = re.search(r'_aldata\s*=\s*["\']([A-Za-z0-9+/=]+)["\']', html) aldata_match = re.search(r'_aldata\s*=\s*["\']([A-Za-z0-9+/=]+)["\']', html_content)
if aldata_match: if aldata_match:
result["aldata"] = aldata_match.group(1) result["aldata"] = aldata_match.group(1)
result["success"] = True result["success"] = True
return result return result
result["html"] = html result["html"] = html_content
except Exception as click_err: except Exception as click_err:
print(f" Click error: {click_err}", file=sys.stderr) print(f" Click error: {click_err}", file=sys.stderr)
result["html"] = page.content() result["html"] = await page.content()
finally: finally:
page.close() await page.close()
return result return result
def extract_aldata(detail_url: str, episode_num: str) -> dict: async def extract_aldata(detail_url: str, episode_num: str) -> dict:
"""Camoufox로 Detail 페이지에서 _aldata 추출""" """AsyncCamoufox로 Detail 페이지에서 _aldata 추출"""
try: try:
from camoufox.sync_api import Camoufox from camoufox.async_api import AsyncCamoufox
except ImportError as e: except ImportError as e:
return {"error": f"Camoufox not installed: {e}"} return {"error": f"Camoufox not installed: {e}"}
@@ -167,13 +164,13 @@ def extract_aldata(detail_url: str, episode_num: str) -> dict:
# xvfb 인자 지원 여부에 따른 안전한 실행 (Try-Except Fallback) # xvfb 인자 지원 여부에 따른 안전한 실행 (Try-Except Fallback)
try: try:
with Camoufox(**camou_args) as browser: async with AsyncCamoufox(**camou_args) as browser:
return _run_browser(browser, detail_url, episode_num, result) return await _run_browser(browser, detail_url, episode_num, result)
except TypeError as e: except TypeError as e:
if "xvfb" in str(e): if "xvfb" in str(e):
print(f" Warning: Local Camoufox version too old for 'xvfb'. Falling back to headless.", file=sys.stderr) print(f" Warning: Local Camoufox version too old for 'xvfb'. Falling back to headless.", file=sys.stderr)
with Camoufox(headless=True) as browser: async with AsyncCamoufox(headless=True) as browser:
return _run_browser(browser, detail_url, episode_num, result) return await _run_browser(browser, detail_url, episode_num, result)
raise e raise e
except Exception as e: except Exception as e:
@@ -190,5 +187,10 @@ if __name__ == "__main__":
detail_url = sys.argv[1] detail_url = sys.argv[1]
episode_num = sys.argv[2] episode_num = sys.argv[2]
result = extract_aldata(detail_url, episode_num)
print(json.dumps(result, ensure_ascii=False)) # 비동기 실행 루프 시작
try:
res = asyncio.run(extract_aldata(detail_url, episode_num))
print(json.dumps(res, ensure_ascii=False))
except Exception as e:
print(json.dumps({"error": str(e), "success": False}))

View File

@@ -9,6 +9,7 @@ import sys
import time import time
import re import re
import logging import logging
import platform
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -144,10 +145,21 @@ class YtdlpDownloader:
'--no-check-certificate', '--no-check-certificate',
'--progress', '--progress',
'--verbose', # 디버깅용 상세 로그 '--verbose', # 디버깅용 상세 로그
'--impersonate', 'chrome-120', # 정밀한 크롬-120 지문 사용
'--extractor-args', 'generic:force_hls', # HLS 강제 추출 '--extractor-args', 'generic:force_hls', # HLS 강제 추출
'-o', self.output_path, '-o', self.output_path,
] ]
# 1.5 환경별 브라우저 위장 설정 (Impersonate)
# macOS에서는 고급 위장 기능을 사용하되, 종속성 문제가 잦은 Linux/Docker에서는 UA 수동 지정
is_mac = platform.system() == 'Darwin'
if is_mac:
cmd += ['--impersonate', 'chrome-120']
logger.debug("Using yt-dlp --impersonate chrome-120 (macOS detected)")
else:
# Docker/Linux: impersonate 라이브러리 부재 가능하므로 UA 수동 설정
user_agent = self.headers.get('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
cmd += ['--user-agent', user_agent]
logger.debug(f"Using manual User-Agent on {platform.system()}: {user_agent}")
# 2. 프록시 설정 # 2. 프록시 설정
if self.proxy: if self.proxy: