From f9ccb1b2aefc07c0ed50389dd744e7e75de2872a Mon Sep 17 00:00:00 2001 From: tmwgsicp <2589462900@qq.com> Date: Sat, 21 Mar 2026 06:23:24 +0800 Subject: [PATCH] fix: support image-text messages and multi-container article extraction --- routes/article.py | 21 ++- utils/article_fetcher.py | 6 +- utils/content_processor.py | 30 +++- utils/helpers.py | 276 ++++++++++++++++++++++++++++++------- utils/rss_poller.py | 8 +- utils/rss_store.py | 26 +++- 6 files changed, 295 insertions(+), 72 deletions(-) diff --git a/routes/article.py b/routes/article.py index d8291d4..18724c3 100644 --- a/routes/article.py +++ b/routes/article.py @@ -8,16 +8,21 @@ 文章路由 - FastAPI版本 """ +import logging +import re +from typing import Optional, List + from fastapi import APIRouter, HTTPException, Request from pydantic import BaseModel, Field -from typing import Optional, List -import re + from utils.auth_manager import auth_manager -from utils.helpers import extract_article_info, parse_article_url +from utils.helpers import extract_article_info, parse_article_url, is_image_text_message, has_article_content, get_client_ip from utils.rate_limiter import rate_limiter from utils.webhook import webhook from utils.http_client import fetch_page +logger = logging.getLogger(__name__) + router = APIRouter() class ArticleRequest(BaseModel): @@ -56,23 +61,25 @@ async def get_article(article_request: ArticleRequest, request: Request): - `publish_time`: 发布时间戳 - `images`: 文章内的图片列表 """ - client_ip = request.client.host if request.client else "unknown" + client_ip = get_client_ip(request) allowed, error_msg = rate_limiter.check_rate_limit(client_ip, "/api/article") if not allowed: - return {"success": False, "error": f"⏱️ {error_msg}"} + return {"success": False, "error": f"Rate limited: {error_msg}"} credentials = auth_manager.get_credentials() if not credentials: return {"success": False, "error": "服务器未登录,请先访问管理页面扫码登录"} try: + logger.info("[Article] request from %s: %s", client_ip, article_request.url[:80]) + html = await fetch_page( article_request.url, extra_headers={"Referer": "https://mp.weixin.qq.com/"}, - timeout=120 # WeChat 大文章可能超时,延长至 120 秒 + timeout=120 ) - if "js_content" not in html: + if not has_article_content(html): if "verify" in html or "验证" in html or "环境异常" in html: await webhook.notify('verification_required', { 'url': article_request.url, diff --git a/utils/article_fetcher.py b/utils/article_fetcher.py index 0c21f14..496c445 100644 --- a/utils/article_fetcher.py +++ b/utils/article_fetcher.py @@ -70,12 +70,12 @@ async def _fetch_via_proxy( timeout=timeout ) - # 验证内容有效性: 只检查 js_content 是否存在 - if "js_content" in html: + from utils.helpers import has_article_content + if has_article_content(html): logger.info("[Proxy] len=%d", len(html)) return html else: - logger.warning("[Proxy] invalid content (len=%d, has_js_content=False)", len(html)) + logger.warning("[Proxy] invalid content (len=%d, no known content marker)", len(html)) return None except Exception as e: diff --git a/utils/content_processor.py b/utils/content_processor.py index 4ca30e5..5fad64f 100644 --- a/utils/content_processor.py +++ b/utils/content_processor.py @@ -97,17 +97,43 @@ def _extract_div_inner(html: str, open_tag_pattern: str) -> str: def extract_content(html: str) -> str: """ - Extract article body from the js_content div, handling nested divs. + Extract article body, trying multiple container patterns. + Different WeChat account types (government, media, personal) use + different HTML structures. We try them in order of specificity. + For image-text messages (item_show_type=8), delegates to helpers. """ + from utils.helpers import is_image_text_message, _extract_image_text_content + + if is_image_text_message(html): + result = _extract_image_text_content(html) + return result.get('content', '') + + # Pattern 1: id="js_content" (most common) content = _extract_div_inner(html, r']*\bid=["\']js_content["\'][^>]*>') if content: return content + # Pattern 2: class contains rich_media_content content = _extract_div_inner(html, r']*\bclass=["\'][^"\']*rich_media_content[^"\']*["\'][^>]*>') if content: return content - logger.warning("Failed to extract article body") + # Pattern 3: id="page-content" (government/institutional accounts) + content = _extract_div_inner(html, r']*\bid=["\']page-content["\'][^>]*>') + if content: + return content + + # Pattern 4: class contains rich_media_area_primary_inner + content = _extract_div_inner(html, r']*\bclass=["\'][^"\']*rich_media_area_primary_inner[^"\']*["\'][^>]*>') + if content: + return content + + # Pattern 5: id="js_article" (alternative article container) + content = _extract_div_inner(html, r']*\bid=["\']js_article["\'][^>]*>') + if content: + return content + + logger.warning("Failed to extract article body from any known container") return "" diff --git a/utils/helpers.py b/utils/helpers.py index e17ae37..9a09533 100644 --- a/utils/helpers.py +++ b/utils/helpers.py @@ -62,31 +62,169 @@ def parse_article_url(url: str) -> Optional[Dict[str, str]]: except Exception: return None +def is_image_text_message(html: str) -> bool: + """检测是否为图文消息(item_show_type=8,类似小红书多图+文字)""" + m = re.search(r"window\.item_show_type\s*=\s*'(\d+)'", html) + return m is not None and m.group(1) == '8' + + +def _extract_image_text_content(html: str) -> Dict: + """ + 提取图文消息的内容(item_show_type=8) + + 图文消息的结构与普通文章完全不同: + - 图片在 picture_page_info_list 的 JsDecode() 中 + - 文字在 meta description 或 content_desc 中 + - 没有 #js_content div + """ + import html as html_module + + # 提取图片 URL(从 picture_page_info_list 中的 cdn_url) + # 页面中有两种格式: + # 1. picture_page_info_list: [ { cdn_url: JsDecode('...'), ... } ] (带JsDecode) + # 2. picture_page_info_list = [ { width:..., height:..., cdn_url: '...' } ] (简单格式) + # 每个 item 中第一个 cdn_url 是主图,watermark_info 内的是水印,需要跳过 + images = [] + + # 优先使用简单格式(第二种),更易解析且包含所有图片 + simple_list_pos = html.find('picture_page_info_list = [') + if simple_list_pos >= 0: + bracket_start = html.find('[', simple_list_pos) + depth = 0 + end = bracket_start + for end in range(bracket_start, min(bracket_start + 20000, len(html))): + if html[end] == '[': + depth += 1 + elif html[end] == ']': + depth -= 1 + if depth == 0: + break + block = html[bracket_start:end + 1] + # 按顶层 { 分割,每个 item 取第一个 cdn_url(主图) + items = re.split(r'\n\s{4,10}\{', block) + for item in items: + m = re.search(r"cdn_url:\s*'([^']+)'", item) + if m: + url = m.group(1) + if url not in images and ('mmbiz.qpic.cn' in url or 'mmbiz.qlogo.cn' in url): + images.append(url) + + # 降级: 使用 JsDecode 格式 + if not images: + jsdecode_list_match = re.search( + r'picture_page_info_list:\s*\[', html + ) + if jsdecode_list_match: + block_start = jsdecode_list_match.end() - 1 + depth = 0 + end = block_start + for end in range(block_start, min(block_start + 20000, len(html))): + if html[end] == '[': + depth += 1 + elif html[end] == ']': + depth -= 1 + if depth == 0: + break + block = html[block_start:end + 1] + # 按顶层 { 分割 + items = re.split(r'\n\s{10,30}\{(?=\s*\n\s*cdn_url)', block) + for item in items: + m = re.search(r"cdn_url:\s*JsDecode\('([^']+)'\)", item) + if m: + url = m.group(1).replace('\\x26amp;', '&').replace('\\x26', '&') + if url not in images and ('mmbiz.qpic.cn' in url or 'mmbiz.qlogo.cn' in url): + images.append(url) + + # 提取文字描述 + desc = '' + # 方法1: meta description + desc_match = re.search(r' < -> <) + desc = re.sub(r'\\x([0-9a-fA-F]{2})', lambda m: chr(int(m.group(1), 16)), desc) + desc = html_module.unescape(desc) + # 二次 unescape 处理双重编码 + desc = html_module.unescape(desc) + # 清理 HTML 标签残留 + desc = re.sub(r'<[^>]+>', '', desc) + desc = desc.replace('\\x0a', '\n').replace('\\n', '\n') + + # 方法2: content_desc + if not desc: + desc_match2 = re.search(r"content_desc:\s*JsDecode\('([^']*)'\)", html) + if desc_match2: + desc = desc_match2.group(1) + desc = html_module.unescape(desc) + + # 构建 HTML 内容:竖向画廊 + 文字(RSS 兼容) + html_parts = [] + + # 竖向画廊:每张图限宽,紧凑排列,兼容主流 RSS 阅读器 + if images: + gallery_imgs = [] + for i, img_url in enumerate(images): + gallery_imgs.append( + f'

' + f'' + f'

' + ) + gallery_imgs.append( + f'

' + f'{len(images)} images' + f'

' + ) + html_parts.append('\n'.join(gallery_imgs)) + + # 文字描述区域 + if desc: + text_lines = [] + for line in desc.split('\n'): + line = line.strip() + if line: + text_lines.append( + f'

{line}

' + ) + html_parts.append('\n'.join(text_lines)) + + content = '\n'.join(html_parts) + plain_content = desc if desc else '' + + return { + 'content': content, + 'plain_content': plain_content, + 'images': images, + } + + def extract_article_info(html: str, params: Optional[Dict] = None) -> Dict: """ 从HTML中提取文章信息 - + Args: html: 文章HTML内容 params: URL参数(可选,用于返回__biz等信息) - + Returns: 文章信息字典 """ - + title = '' + # 图文消息的标题通常在 window.msg_title 中 title_match = ( re.search(r']*class=[^>]*rich_media_title[^>]*>([\s\S]*?)', html, re.IGNORECASE) or re.search(r']*class=[^>]*rich_media_title[^>]*>([\s\S]*?)', html, re.IGNORECASE) or re.search(r"var\s+msg_title\s*=\s*'([^']+)'\.html\(false\)", html) or + re.search(r"window\.msg_title\s*=\s*window\.title\s*=\s*'([^']*)'", html) or re.search(r']+>', '', title) title = title.replace('"', '"').replace('&', '&').strip() - + author = '' author_match = ( re.search(r']*id="js_name"[^>]*>([\s\S]*?)', html, re.IGNORECASE) or @@ -94,72 +232,81 @@ def extract_article_info(html: str, params: Optional[Dict] = None) -> Dict: re.search(r']*class=[^>]*rich_media_meta_nickname[^>]*>([^<]+)', html, re.IGNORECASE) ) - + if author_match: author = author_match.group(1) author = re.sub(r'<[^>]+>', '', author).strip() - + publish_time = 0 time_match = ( re.search(r'var\s+publish_time\s*=\s*"(\d+)"', html) or re.search(r'var\s+ct\s*=\s*"(\d+)"', html) or + re.search(r"var\s+ct\s*=\s*'(\d+)'", html) or re.search(r']*id="publish_time"[^>]*>([^<]+)', html) ) - + if time_match: try: publish_time = int(time_match.group(1)) except (ValueError, TypeError): pass - - content = '' - images = [] - - # 方法1: 匹配 id="js_content" - content_match = re.search(r']*id="js_content"[^>]*>([\s\S]*?)]*>[\s\S]*?', html, re.IGNORECASE) - - if not content_match: - # 方法2: 匹配 class包含rich_media_content - content_match = re.search(r']*class="[^"]*rich_media_content[^"]*"[^>]*>([\s\S]*?)', html, re.IGNORECASE) - - if content_match and content_match.group(1): - content = content_match.group(1).strip() + + # 检测是否为图文消息(item_show_type=8) + if is_image_text_message(html): + img_text_data = _extract_image_text_content(html) + content = img_text_data['content'] + images = img_text_data['images'] + plain_content = img_text_data['plain_content'] else: - # 方法3: 手动截取 - js_content_pos = html.find('id="js_content"') - if js_content_pos > 0: - start = html.find('>', js_content_pos) + 1 - script_pos = html.find(' start: - content = html[start:script_pos].strip() - if content: - # 提取data-src属性 - img_regex = re.compile(r']+data-src="([^"]+)"') - for img_match in img_regex.finditer(content): - img_url = img_match.group(1) - if img_url not in images: - images.append(img_url) - - # 提取src属性 - img_regex2 = re.compile(r']+src="([^"]+)"') - for img_match in img_regex2.finditer(content): - img_url = img_match.group(1) - if not img_url.startswith('data:') and img_url not in images: - images.append(img_url) - - content = re.sub(r']*>[\s\S]*?', '', content, flags=re.IGNORECASE) - + content = '' + images = [] + + # 方法1: 匹配 id="js_content" + content_match = re.search(r']*id="js_content"[^>]*>([\s\S]*?)]*>[\s\S]*?', html, re.IGNORECASE) + + if not content_match: + # 方法2: 匹配 class包含rich_media_content + content_match = re.search(r']*class="[^"]*rich_media_content[^"]*"[^>]*>([\s\S]*?)', html, re.IGNORECASE) + + if content_match and content_match.group(1): + content = content_match.group(1).strip() + else: + # 方法3: 手动截取 + js_content_pos = html.find('id="js_content"') + if js_content_pos > 0: + start = html.find('>', js_content_pos) + 1 + script_pos = html.find(' start: + content = html[start:script_pos].strip() + if content: + # 提取data-src属性 + img_regex = re.compile(r']+data-src="([^"]+)"') + for img_match in img_regex.finditer(content): + img_url = img_match.group(1) + if img_url not in images: + images.append(img_url) + + # 提取src属性 + img_regex2 = re.compile(r']+src="([^"]+)"') + for img_match in img_regex2.finditer(content): + img_url = img_match.group(1) + if not img_url.startswith('data:') and img_url not in images: + images.append(img_url) + + content = re.sub(r']*>[\s\S]*?', '', content, flags=re.IGNORECASE) + plain_content = html_to_text(content) if content else '' + __biz = params.get('__biz', 'unknown') if params else 'unknown' publish_time_str = '' if publish_time > 0: from datetime import datetime dt = datetime.fromtimestamp(publish_time) publish_time_str = dt.strftime('%Y-%m-%d %H:%M:%S') - + return { 'title': title, 'content': content, - 'plain_content': html_to_text(content) if content else '', + 'plain_content': plain_content, 'images': images, 'author': author, 'publish_time': publish_time, @@ -167,14 +314,45 @@ def extract_article_info(html: str, params: Optional[Dict] = None) -> Dict: '__biz': __biz } +def has_article_content(html: str) -> bool: + """ + Check whether the fetched HTML likely contains article content. + Different WeChat account types use different content containers. + """ + content_markers = [ + "js_content", + "rich_media_content", + "rich_media_area_primary", + "page-content", + "page_content", + ] + if any(marker in html for marker in content_markers): + return True + return is_image_text_message(html) + + +def get_client_ip(request) -> str: + """ + Extract real client IP from request, respecting reverse proxy headers. + Priority: X-Forwarded-For > X-Real-IP > request.client.host + """ + forwarded_for = request.headers.get("x-forwarded-for", "") + if forwarded_for: + return forwarded_for.split(",")[0].strip() + real_ip = request.headers.get("x-real-ip", "") + if real_ip: + return real_ip.strip() + return request.client.host if request.client else "unknown" + + def is_article_deleted(html: str) -> bool: """检查文章是否被删除""" return '已删除' in html or 'deleted' in html.lower() def is_need_verification(html: str) -> bool: """检查是否需要验证""" - return ('verify' in html.lower() or - '验证' in html or + return ('verify' in html.lower() or + '验证' in html or '环境异常' in html) def is_login_required(html: str) -> bool: diff --git a/utils/rss_poller.py b/utils/rss_poller.py index 2cfdda6..829b81d 100644 --- a/utils/rss_poller.py +++ b/utils/rss_poller.py @@ -20,7 +20,7 @@ import httpx from utils.auth_manager import auth_manager from utils import rss_store -from utils.helpers import extract_article_info, parse_article_url +from utils.helpers import extract_article_info, parse_article_url, is_image_text_message, has_article_content from utils.http_client import fetch_page logger = logging.getLogger(__name__) @@ -225,8 +225,8 @@ class RSSPoller: continue html = results.get(link) - if not html or "js_content" not in html: - logger.warning("❌ No content in HTML: %s", link[:80]) + if not html or not has_article_content(html): + logger.warning("No content in HTML: %s", link[:80]) enriched.append(article) continue @@ -246,7 +246,7 @@ class RSSPoller: article_info = extract_article_info(html, parse_article_url(link)) article["author"] = article_info.get("author", "") - logger.info("✅ Content fetched: %s... (%d chars, %d images)", + logger.info("Content fetched: %s... (%d chars, %d images)", link[:50], len(article["content"]), len(result.get("images", []))) diff --git a/utils/rss_store.py b/utils/rss_store.py index 5ba85a3..e8f27c6 100644 --- a/utils/rss_store.py +++ b/utils/rss_store.py @@ -133,17 +133,29 @@ def update_last_poll(fakeid: str): # ── 文章缓存 ───────────────────────────────────────────── def save_articles(fakeid: str, articles: List[Dict]) -> int: - """批量保存文章,返回新增数量""" + """ + 批量保存文章,返回新增数量。 + If an article already exists but has empty content, update it with new content. + """ conn = _get_conn() inserted = 0 try: for a in articles: + content = a.get("content", "") + plain_content = a.get("plain_content", "") try: - conn.execute( - "INSERT OR IGNORE INTO articles " + cursor = conn.execute( + "INSERT INTO articles " "(fakeid, aid, title, link, digest, cover, author, " "content, plain_content, publish_time, fetched_at) " - "VALUES (?,?,?,?,?,?,?,?,?,?,?)", + "VALUES (?,?,?,?,?,?,?,?,?,?,?) " + "ON CONFLICT(fakeid, link) DO UPDATE SET " + "content = CASE WHEN excluded.content != '' AND articles.content = '' " + " THEN excluded.content ELSE articles.content END, " + "plain_content = CASE WHEN excluded.plain_content != '' AND articles.plain_content = '' " + " THEN excluded.plain_content ELSE articles.plain_content END, " + "author = CASE WHEN excluded.author != '' AND articles.author = '' " + " THEN excluded.author ELSE articles.author END", ( fakeid, a.get("aid", ""), @@ -152,13 +164,13 @@ def save_articles(fakeid: str, articles: List[Dict]) -> int: a.get("digest", ""), a.get("cover", ""), a.get("author", ""), - a.get("content", ""), - a.get("plain_content", ""), + content, + plain_content, a.get("publish_time", 0), int(time.time()), ), ) - if conn.total_changes: + if cursor.rowcount > 0: inserted += 1 except sqlite3.IntegrityError: pass