v0.6.23: Fix Linkkf download - CDP Headers wrapper, yt-dlp --add-header support

- Fix zendriver_daemon CDP Headers bug (wrap dict with zd.cdp.network.Headers())
- Fix HTML entity decoding in iframe URLs (use html.unescape())
- Simplify GDM source_type to always use 'linkkf'
This commit is contained in:
2026-01-08 01:29:36 +09:00
parent d1866111c7
commit 24217712a6
7 changed files with 181 additions and 79 deletions

View File

@@ -81,6 +81,13 @@
## 📝 변경 이력 (Changelog) ## 📝 변경 이력 (Changelog)
### v0.6.23 (2026-01-08)
- **Linkkf 다운로드 완전 복구**:
- **Zendriver Daemon CDP 헤더 버그 수정**: `zd.cdp.network.Headers()` 타입 래핑 누락으로 Referer 헤더가 적용되지 않던 문제 해결.
- **HTML 엔티티 디코딩 개선**: iframe URL의 `&amp;` 등 HTML 엔티티를 `html.unescape()`로 올바르게 디코딩.
- **GDM yt-dlp 헤더 전달**: `--add-header` 옵션으로 Referer/User-Agent를 yt-dlp에 전달하여 CDN 리다이렉트 방지.
- **부수 효과**: Ohli24 등 모든 브라우저 기반 추출에서 동일한 헤더 적용 개선.
### v0.6.22 (2026-01-08) ### v0.6.22 (2026-01-08)
- **Linkkf 추출 로직 강화**: Cloudflare 보호가 강화된 Linkkf 도메인(flexora.xyz 등)에 대응하기 위해 브라우저 기반(Zendriver/Camoufox) 추출 엔진을 도입했습니다. - **Linkkf 추출 로직 강화**: Cloudflare 보호가 강화된 Linkkf 도메인(flexora.xyz 등)에 대응하기 위해 브라우저 기반(Zendriver/Camoufox) 추출 엔진을 도입했습니다.
- **오추출 방지**: 광고나 서비스 차단 페이지(Google Cloud 등)의 iframe을 비디오 URL로 오인하는 문제를 수정했습니다. - **오추출 방지**: 광고나 서비스 차단 페이지(Google Cloud 등)의 iframe을 비디오 URL로 오인하는 문제를 수정했습니다.

View File

@@ -1,5 +1,5 @@
title: "애니 다운로더" title: "애니 다운로더"
version: 0.6.22 version: 0.6.23
package_name: "anime_downloader" package_name: "anime_downloader"
developer: "projectdx" developer: "projectdx"
description: "anime downloader" description: "anime downloader"

31
inspect_zendriver_test.py Normal file
View File

@@ -0,0 +1,31 @@
import asyncio
import zendriver as zd
import json
import os
async def test():
    """Smoke-test zendriver: launch headless, set extra HTTP headers via CDP,
    and dump the page object's public API for inspection."""
    try:
        browser = await zd.start(headless=True)
        page = await browser.get("about:blank")

        # Verify that CDP extra-header injection works (requires wrapping the
        # plain dict in the zd.cdp.network.Headers type).
        extra_headers = {"Referer": "https://v2.linkkf.app/"}
        try:
            await page.send(zd.cdp.network.enable())
            cdp_headers = zd.cdp.network.Headers(extra_headers)
            await page.send(zd.cdp.network.set_extra_http_headers(cdp_headers))
            print("Successfully set headers")
        except Exception as exc:
            print(f"Failed to set headers: {exc}")
            import traceback
            traceback.print_exc()

        # Emit the page object's non-private attribute names as JSON.
        public_methods = [name for name in dir(page) if not name.startswith("_")]
        print(json.dumps({"methods": public_methods}))

        await browser.stop()
    except Exception as exc:
        import traceback
        print(json.dumps({"error": str(exc), "traceback": traceback.format_exc()}))

if __name__ == "__main__":
    asyncio.run(test())

View File

@@ -109,6 +109,7 @@ class ZendriverHandler(BaseHTTPRequestHandler):
data: Dict[str, Any] = json.loads(body) data: Dict[str, Any] = json.loads(body)
url: Optional[str] = data.get("url") url: Optional[str] = data.get("url")
headers: Optional[Dict[str, str]] = data.get("headers")
timeout: int = cast(int, data.get("timeout", 30)) timeout: int = cast(int, data.get("timeout", 30))
if not url: if not url:
@@ -118,7 +119,7 @@ class ZendriverHandler(BaseHTTPRequestHandler):
# 비동기 fetch 실행 # 비동기 fetch 실행
if loop: if loop:
future = asyncio.run_coroutine_threadsafe( future = asyncio.run_coroutine_threadsafe(
fetch_with_browser(url, timeout), loop fetch_with_browser(url, timeout, headers), loop
) )
result: Dict[str, Any] = future.result(timeout=timeout + 15) result: Dict[str, Any] = future.result(timeout=timeout + 15)
self._send_json(200, result) self._send_json(200, result)
@@ -254,8 +255,8 @@ async def ensure_browser() -> Any:
return browser return browser
async def fetch_with_browser(url: str, timeout: int = 30) -> Dict[str, Any]: async def fetch_with_browser(url: str, timeout: int = 30, headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
"""상시 대기 브라우저로 HTML 페칭 (탭 유지 방식)""" """상시 대기 브라우저로 HTML 페칭 (탭 유지 방식, 헤더 지원)"""
global browser global browser
result: Dict[str, Any] = {"success": False, "html": "", "elapsed": 0.0} result: Dict[str, Any] = {"success": False, "html": "", "elapsed": 0.0}
@@ -283,7 +284,22 @@ async def fetch_with_browser(url: str, timeout: int = 30) -> Dict[str, Any]:
# 페이지 로드 시도 # 페이지 로드 시도
try: try:
page = await asyncio.wait_for(browser.get(url), timeout=20) # 탭(페이지) 열기 (브라우저가 없으면 생성)
page = await browser.get("about:blank") # 새 탭 열기 대신 기존 탭 재활용 혹은 about:blank 이동
# 헤더 설정 (CDP 사용)
if headers:
try:
log_debug(f"[ZendriverDaemon] Setting headers: {list(headers.keys())}")
await page.send(zd.cdp.network.enable())
# Wrap dict with Headers type for CDP compatibility
cdp_headers = zd.cdp.network.Headers(headers)
await page.send(zd.cdp.network.set_extra_http_headers(cdp_headers))
except Exception as e:
log_debug(f"[ZendriverDaemon] Failed to set headers: {e}")
# 실제 페이지 로드
await asyncio.wait_for(page.get(url), timeout=20)
nav_elapsed = time.time() - nav_start nav_elapsed = time.time() - nav_start
except asyncio.TimeoutError: except asyncio.TimeoutError:
log_debug(f"[ZendriverDaemon] Navigation timeout after 20s") log_debug(f"[ZendriverDaemon] Navigation timeout after 20s")

View File

@@ -543,7 +543,7 @@ class LogicLinkkf(AnimeModuleBase):
try: try:
if LogicOhli24.is_zendriver_daemon_running(): if LogicOhli24.is_zendriver_daemon_running():
logger.info(f"[Linkkf] Trying Zendriver Daemon: {url}") logger.info(f"[Linkkf] Trying Zendriver Daemon: {url}")
daemon_res = LogicOhli24.fetch_via_daemon(url, timeout=30) daemon_res = LogicOhli24.fetch_via_daemon(url, timeout=30, headers=LogicLinkkf.headers)
if daemon_res.get("success") and daemon_res.get("html"): if daemon_res.get("success") and daemon_res.get("html"):
elapsed = time.time() - start_time elapsed = time.time() - start_time
logger.info(f"[Linkkf] Daemon success in {elapsed:.2f}s") logger.info(f"[Linkkf] Daemon success in {elapsed:.2f}s")
@@ -712,9 +712,9 @@ class LogicLinkkf(AnimeModuleBase):
if iframe and iframe.get("src"): if iframe and iframe.get("src"):
iframe_src = iframe.get("src") iframe_src = iframe.get("src")
# HTML entity decoding (&amp; -> &) # HTML entity decoding (&amp; -> &, &#38; -> &, etc.)
if "&amp;" in iframe_src: import html as html_lib
iframe_src = iframe_src.replace("&amp;", "&") iframe_src = html_lib.unescape(iframe_src)
logger.info(f"Found player iframe: {iframe_src}") logger.info(f"Found player iframe: {iframe_src}")
@@ -725,46 +725,67 @@ class LogicLinkkf(AnimeModuleBase):
return None, iframe_src, None return None, iframe_src, None
# m3u8 URL 패턴 찾기 (더 정밀하게) # m3u8 URL 패턴 찾기 (더 정밀하게)
# 패턴 1: url: 'https://...m3u8' # 패턴 1: url: 'https://...m3u8' 또는 url: "https://...m3u8"
m3u8_pattern = re.compile(r"url:\s*['\"]([^'\"]*\.m3u8[^'\"]*)['\"]") m3u8_pattern = re.compile(r"url:\s*['\"]([^'\"]*\.m3u8[^'\"]*)['\"]")
m3u8_match = m3u8_pattern.search(iframe_content) m3u8_match = m3u8_pattern.search(iframe_content)
# 패턴 2: <source src="https://...m3u8"> # 패턴 2: <source src="https://...m3u8">
if not m3u8_match: if not m3u8_match:
source_pattern = re.compile(r"<source[^>]+src=['\"]([^'\"]*\.m3u8[^'\"]*)['\"]") source_pattern = re.compile(r"<source[^>]+src=['\"]([^'\"]*\.m3u8[^'\"]*)['\"]", re.IGNORECASE)
m3u8_match = source_pattern.search(iframe_content) m3u8_match = source_pattern.search(iframe_content)
# 패턴 3: var src = '...m3u8' # 패턴 3: var src = '...m3u8'
if not m3u8_match: if not m3u8_match:
src_pattern = re.compile(r"src\s*=\s*['\"]([^'\"]*\.m3u8[^'\"]*)['\"]") src_pattern = re.compile(r"src\s*=\s*['\"]([^'\"]*\.m3u8[^'\"]*)['\"]", re.IGNORECASE)
m3u8_match = src_pattern.search(iframe_content) m3u8_match = src_pattern.search(iframe_content)
if m3u8_match: # 패턴 4: Artplayer 전용 더 넓은 범위
if not m3u8_match:
art_pattern = re.compile(r"url\s*:\s*['\"]([^'\"]+)['\"]")
matches = art_pattern.findall(iframe_content)
for m in matches:
if ".m3u8" in m:
video_url = m
break
if video_url:
logger.info(f"Extracted m3u8 via Artplayer pattern: {video_url}")
if m3u8_match and not video_url:
video_url = m3u8_match.group(1) video_url = m3u8_match.group(1)
if video_url:
# 상대 경로 처리 (예: cache/...) # 상대 경로 처리 (예: cache/...)
if video_url.startswith('cache/') or video_url.startswith('/cache/'): if video_url.startswith('cache/') or video_url.startswith('/cache/'):
from urllib.parse import urljoin from urllib.parse import urljoin
video_url = urljoin(iframe_src, video_url) video_url = urljoin(iframe_src, video_url)
logger.info(f"Extracted m3u8 URL: {video_url}") logger.info(f"Extracted m3u8 URL: {video_url}")
else: else:
logger.warning(f"m3u8 URL not found in iframe. Content snippet: {iframe_content[:200]}...") logger.warning(f"m3u8 URL not found in iframe for: {playid_url}")
# HTML 내용이 너무 길면 앞부분만 로깅
snippet = iframe_content.replace('\n', ' ')
logger.debug(f"Iframe Content snippet (500 chars): {snippet[:500]}...")
# 'cache/' 가 들어있는지 확인
if 'cache/' in iframe_content:
logger.debug("Found 'cache/' keyword in iframe content but regex failed. Inspection required.")
# VTT 자막 URL 추출 # VTT 자막 URL 추출
vtt_pattern = re.compile(r"['\"]src['\"]?:\s*['\"]([^'\"]*\.vtt)['\"]") # VTT 자막 URL 추출 (패턴 1: generic src)
vtt_pattern = re.compile(r"['\"]src['\"]?:\s*['\"]([^'\"]*\.vtt)['\"]", re.IGNORECASE)
vtt_match = vtt_pattern.search(iframe_content) vtt_match = vtt_pattern.search(iframe_content)
if not vtt_match:
vtt_pattern2 = re.compile(r"url:\s*['\"]([^'\"]*\.vtt)['\"]")
vtt_match = vtt_pattern2.search(iframe_content)
if not vtt_match:
vtt_pattern3 = re.compile(r"<track[^>]+src=['\"]([^'\"]*\.vtt)['\"]")
vtt_match = vtt_pattern3.search(iframe_content)
# 패턴 2: url: '...vtt' (Artplayer 등)
if not vtt_match:
vtt_pattern = re.compile(r"url:\s*['\"]([^'\"]*\.vtt[^'\"]*)['\"]", re.IGNORECASE)
vtt_match = vtt_pattern.search(iframe_content)
if vtt_match: if vtt_match:
vtt_url = vtt_match.group(1) vtt_url = vtt_match.group(1)
if vtt_url.startswith('/'): if vtt_url.startswith('s/') or vtt_url.startswith('/s/'):
from urllib.parse import urljoin from urllib.parse import urljoin
vtt_url = urljoin(iframe_src, vtt_url) vtt_url = urljoin(iframe_src, vtt_url)
logger.info(f"Extracted VTT URL: {vtt_url}") logger.info(f"Extracted VTT URL: {vtt_url}")
else:
logger.debug("VTT URL not found in iframe content.")
referer_url = iframe_src referer_url = iframe_src
else: else:
@@ -1664,9 +1685,8 @@ class LogicLinkkf(AnimeModuleBase):
download_method = P.ModelSetting.get("linkkf_download_method") or "ytdlp" download_method = P.ModelSetting.get("linkkf_download_method") or "ytdlp"
download_threads = P.ModelSetting.get_int("linkkf_download_threads") or 16 download_threads = P.ModelSetting.get_int("linkkf_download_threads") or 16
# Linkkf는 항상 'linkkf' source_type 사용 (GDM에서 YtdlpAria2Downloader로 매핑됨)
gdm_source_type = "linkkf" gdm_source_type = "linkkf"
if download_method in ['ytdlp', 'aria2c']:
gdm_source_type = "general"
# Prepare GDM options # Prepare GDM options
gdm_options = { gdm_options = {

View File

@@ -290,13 +290,17 @@ class LogicOhli24(AnimeModuleBase):
return False return False
@classmethod @classmethod
def fetch_via_daemon(cls, url: str, timeout: int = 30) -> dict: def fetch_via_daemon(cls, url: str, timeout: int = 30, headers: dict = None) -> dict:
"""데몬을 통한 HTML 페칭 (빠름)""" """데몬을 통한 HTML 페칭 (빠름, 헤더 지원)"""
try: try:
import requests import requests
payload = {"url": url, "timeout": timeout}
if headers:
payload["headers"] = headers
resp = requests.post( resp = requests.post(
f"http://127.0.0.1:{cls.zendriver_daemon_port}/fetch", f"http://127.0.0.1:{cls.zendriver_daemon_port}/fetch",
json={"url": url, "timeout": timeout}, json=payload,
timeout=timeout + 5 timeout=timeout + 5
) )
if resp.status_code == 200: if resp.status_code == 200:

View File

@@ -1,61 +1,85 @@
import requests
import asyncio import json
import zendriver as zd import re
import sys import sys
import os
import subprocess
async def test(): def test_fetch():
print("=== Zendriver Google Chrome Debug (v0.5.14) ===") url = "https://playv2.sub3.top/r2/play.php?&id=n20&url=405686s1"
headers = {
"Referer": "https://linkkf.live/",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36"
}
# Check possible paths daemon_url = "http://127.0.0.1:19876/fetch"
bin_paths = ["/usr/bin/google-chrome", "/usr/bin/google-chrome-stable", "/usr/bin/chromium-browser"] payload = {
"url": url,
"headers": headers,
"timeout": 30
}
for browser_bin in bin_paths: print(f"Fetching {url} via daemon...")
if not os.path.exists(browser_bin): try:
continue resp = requests.post(daemon_url, json=payload, timeout=40)
if resp.status_code != 200:
print(f"\n>>> Testing binary: {browser_bin}") print(f"Error: HTTP {resp.status_code}")
print(resp.text)
return
# 1. Version Check data = resp.json()
try: if not data.get("success"):
out = subprocess.check_output([browser_bin, "--version"], stderr=subprocess.STDOUT).decode() print(f"Fetch failed: {data.get('error')}")
print(f"Version: {out.strip()}") return
except Exception as e:
print(f"Version check failed: {e}") html = data.get("html", "")
if hasattr(e, 'output'): print(f"Fetch success. Length: {len(html)}")
print(f"Output: {e.output.decode()}")
# Save for inspection
# 2. Minimum execution test (Headless + No Sandbox) with open("linkkf_player_test.html", "w", encoding="utf-8") as f:
print("--- Direct Execution Test ---") f.write(html)
try: print("Saved to linkkf_player_test.html")
cmd = [browser_bin, "--headless", "--no-sandbox", "--disable-gpu", "--user-data-dir=/tmp/test_chrome", "--about:blank"]
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Try regex patterns from mod_linkkf.py
await asyncio.sleep(3) patterns = [
if proc.poll() is None: r"url:\s*['\"]([^'\"]*\.m3u8[^'\"]*)['\"]",
print("SUCCESS: Browser process is alive!") r"<source[^>]+src=['\"]([^'\"]*\.m3u8[^'\"]*)['\"]",
proc.terminate() r"src\s*=\s*['\"]([^'\"]*\.m3u8[^'\"]*)['\"]",
r"url\s*:\s*['\"]([^'\"]+)['\"]"
]
found = False
for p in patterns:
match = re.search(p, html, re.IGNORECASE)
if match:
url_found = match.group(1)
if ".m3u8" in url_found or "m3u8" in p:
print(f"Pattern '{p}' found: {url_found}")
found = True
if not found:
print("No m3u8 found with existing patterns.")
# Search for any .m3u8
any_m3u8 = re.findall(r"['\"]([^'\"]*\.m3u8[^'\"]*)['\"]", html)
if any_m3u8:
print(f"Generic search found {len(any_m3u8)} m3u8 links:")
for m in any_m3u8[:5]:
print(f" - {m}")
else: else:
stdout, stderr = proc.communicate() print("No .m3u8 found in generic search either.")
print(f"FAIL: Browser process died (code {proc.returncode})") # Check for other video extensions or potential indicators
print(f"STDERR: {stderr.decode()}") if "Artplayer" in html:
except Exception as e: print("Artplayer detected.")
print(f"Execution test failed: {e}") if "video" in html:
print("Video tag found.")
# Check for 'cache/'
if "cache/" in html:
print("Found 'cache/' keyword.")
cache_links = re.findall(r"['\"]([^'\"]*cache/[^'\"]*)['\"]", html)
for c in cache_links:
print(f" - Possible cache link: {c}")
# 3. Zendriver Test except Exception as e:
print("--- Zendriver Integration Test ---") print(f"Exception: {e}")
try:
browser = await zd.start(
browser_executable_path=browser_bin,
headless=True,
sandbox=False
)
print("SUCCESS: Zendriver connected!")
await browser.stop()
# If we found one that works, we can stop
print("\n!!! This path works. Set this in the plugin settings or leave empty if it is the first found.")
except Exception as e:
print(f"Zendriver failed: {e}")
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(test()) test_fetch()