From 94a0b78ca8a3e706d038b4b17719ae017d015421 Mon Sep 17 00:00:00 2001 From: tmwgsicp <2589462900@qq.com> Date: Mon, 23 Mar 2026 14:19:41 +0800 Subject: [PATCH] feat: add audio extraction, type-10 posts, and comprehensive unavailability detection Made-with: Cursor --- .gitignore | 1 + env.example | 2 + utils/article_fetcher.py | 83 +++++++++---- utils/content_processor.py | 17 ++- utils/helpers.py | 247 +++++++++++++++++++++++++++++++++++-- utils/rss_poller.py | 20 ++- utils/webhook.py | 1 + 7 files changed, 330 insertions(+), 41 deletions(-) diff --git a/.gitignore b/.gitignore index f9be8a4..11f4934 100644 --- a/.gitignore +++ b/.gitignore @@ -35,6 +35,7 @@ venv.bak/ .vscode/ .idea/ .cursor/ +.claude/ *.swp *.swo *~ diff --git a/env.example b/env.example index ec317c1..fe1c1ef 100644 --- a/env.example +++ b/env.example @@ -21,6 +21,8 @@ WEBHOOK_NOTIFICATION_INTERVAL=300 # RSS 订阅配置 # 轮询间隔(秒),默认 3600(1 小时) RSS_POLL_INTERVAL=3600 +# 每次轮询拉取的文章批次数,默认 10(高频更新的公众号可适当增大) +ARTICLES_PER_POLL=10 # RSS 轮询时是否获取完整文章内容(true/false),默认 true # ⚠️ 启用时强烈建议配置下方的 PROXY_URLS,避免账号被微信风控 RSS_FETCH_FULL_CONTENT=true diff --git a/utils/article_fetcher.py b/utils/article_fetcher.py index 496c445..dff62bb 100644 --- a/utils/article_fetcher.py +++ b/utils/article_fetcher.py @@ -44,42 +44,83 @@ async def _fetch_via_proxy( article_url: str, timeout: int, wechat_cookie: Optional[str] = None, - wechat_token: Optional[str] = None + wechat_token: Optional[str] = None, + max_retries: int = 2 ) -> Optional[str]: - """通过 SOCKS5 代理或直连获取文章""" + """ + 通过 SOCKS5 代理或直连获取文章 + + Args: + article_url: 文章 URL + timeout: 超时时间 + wechat_cookie: 微信 Cookie + wechat_token: 微信 Token + max_retries: 内容验证失败时的最大重试次数(每次会尝试不同代理) + """ try: - # 使用现有的 http_client(支持代理池轮转 + 直连兜底) from utils.http_client import fetch_page - logger.info("[Proxy] %s", article_url[:80]) + logger.info("[Fetch] %s", article_url[:80]) - # 构建完整 URL(带 token) full_url = article_url if wechat_token: separator = '&' if '?' in article_url else '?' full_url = f"{article_url}{separator}token={wechat_token}" - # 准备请求头 extra_headers = {"Referer": "https://mp.weixin.qq.com/"} if wechat_cookie: extra_headers["Cookie"] = wechat_cookie - html = await fetch_page( - full_url, - extra_headers=extra_headers, - timeout=timeout - ) + for attempt in range(max_retries + 1): + try: + html = await fetch_page( + full_url, + extra_headers=extra_headers, + timeout=timeout + ) + + from utils.helpers import has_article_content, is_article_unavailable + + if is_article_unavailable(html): + logger.warning("[Fetch] permanently unavailable (attempt %d/%d) %s", + attempt + 1, max_retries + 1, article_url[:60]) + return html + + if has_article_content(html): + logger.info("[Fetch] len=%d (attempt %d/%d)", + len(html), attempt + 1, max_retries + 1) + return html + else: + hint = "unknown" + if "验证" in html or "verify" in html.lower() or "环境异常" in html: + hint = "wechat_verification" + elif "请登录" in html or "login" in html.lower(): + hint = "login_required" + elif "location.replace" in html or "location.href" in html: + hint = "redirect_page" + elif len(html) < 1000: + hint = "empty_or_blocked" + + logger.warning( + "[Fetch] invalid (len=%d, hint=%s, attempt %d/%d) %s", + len(html), hint, attempt + 1, max_retries + 1, + article_url[:60] + ) + if attempt < max_retries: + await asyncio.sleep(1) + continue + + except Exception as e: + logger.warning("[Fetch] request error: %s (attempt %d/%d)", + str(e)[:100], attempt + 1, max_retries + 1) + if attempt < max_retries: + await asyncio.sleep(1) + continue - from utils.helpers import has_article_content - if has_article_content(html): - logger.info("[Proxy] len=%d", len(html)) - return html - else: - logger.warning("[Proxy] invalid content (len=%d, no known content marker)", len(html)) - return None + return None except Exception as e: - logger.error("[Proxy] %s", str(e)[:100]) + logger.error("[Fetch] fatal error: %s", str(e)[:100]) return None @@ -110,9 +151,7 @@ async def fetch_articles_batch( async with semaphore: html = await fetch_article_content(url, timeout, wechat_token, wechat_cookie) results[url] = html - - # 避免请求过快 - await asyncio.sleep(0.5) + await asyncio.sleep(1) logger.info("[Batch] 开始批量获取 %d 篇文章", len(article_urls)) diff --git a/utils/content_processor.py b/utils/content_processor.py index 5fad64f..526b939 100644 --- a/utils/content_processor.py +++ b/utils/content_processor.py @@ -100,14 +100,27 @@ def extract_content(html: str) -> str: Extract article body, trying multiple container patterns. Different WeChat account types (government, media, personal) use different HTML structures. We try them in order of specificity. - For image-text messages (item_show_type=8), delegates to helpers. + For image-text messages (item_show_type=8) and short posts (item_show_type=10), + delegates to helpers. """ - from utils.helpers import is_image_text_message, _extract_image_text_content + from utils.helpers import ( + is_image_text_message, _extract_image_text_content, + is_short_content_message, _extract_short_content, + is_audio_message, _extract_audio_content, + ) if is_image_text_message(html): result = _extract_image_text_content(html) return result.get('content', '') + if is_short_content_message(html): + result = _extract_short_content(html) + return result.get('content', '') + + if is_audio_message(html): + result = _extract_audio_content(html) + return result.get('content', '') + # Pattern 1: id="js_content" (most common) content = _extract_div_inner(html, r']*\bid=["\']js_content["\'][^>]*>') if content: diff --git a/utils/helpers.py b/utils/helpers.py index 9a09533..6279f66 100644 --- a/utils/helpers.py +++ b/utils/helpers.py @@ -62,10 +62,31 @@ def parse_article_url(url: str) -> Optional[Dict[str, str]]: except Exception: return None +def get_item_show_type(html: str) -> Optional[str]: + """提取 item_show_type 值""" + m = re.search(r"window\.item_show_type\s*=\s*'(\d+)'", html) + return m.group(1) if m else None + + def is_image_text_message(html: str) -> bool: """检测是否为图文消息(item_show_type=8,类似小红书多图+文字)""" - m = re.search(r"window\.item_show_type\s*=\s*'(\d+)'", html) - return m is not None and m.group(1) == '8' + return get_item_show_type(html) == '8' + + +def is_short_content_message(html: str) -> bool: + """检测是否为短内容/转发消息(item_show_type=10,纯文字无 js_content div)""" + return get_item_show_type(html) == '10' + + +def is_audio_message(html: str) -> bool: + """ + Detect audio articles (voice messages embedded via mpvoice / mp-common-mpaudio). + 检测是否为音频文章(包含 mpvoice 标签或音频播放器组件)。 + """ + return ('voice_encode_fileid' in html or + ' Dict: @@ -198,6 +219,159 @@ def _extract_image_text_content(html: str) -> Dict: } +def _jsdecode_unescape(s: str) -> str: + """Unescape JsDecode \\xNN sequences and HTML entities.""" + import html as html_module + s = re.sub(r'\\x([0-9a-fA-F]{2})', lambda m: chr(int(m.group(1), 16)), s) + s = html_module.unescape(s) + s = html_module.unescape(s) + return s + + +def _extract_short_content(html: str) -> Dict: + """ + Extract content from item_show_type=10 (short posts / reposts). + + Type-10 articles have no js_content div; text and metadata are inside + JsDecode() calls in