feat: add audio extraction, type-10 posts, and comprehensive unavailability detection

Made-with: Cursor
This commit is contained in:
tmwgsicp 2026-03-23 14:19:41 +08:00
parent f9ccb1b2ae
commit 94a0b78ca8
7 changed files with 330 additions and 41 deletions

1
.gitignore vendored
View File

@ -35,6 +35,7 @@ venv.bak/
.vscode/
.idea/
.cursor/
.claude/
*.swp
*.swo
*~

View File

@ -21,6 +21,8 @@ WEBHOOK_NOTIFICATION_INTERVAL=300
# RSS 订阅配置
# 轮询间隔(秒),默认 3600(1 小时)
RSS_POLL_INTERVAL=3600
# 每次轮询拉取的文章数,默认 10,高频更新的公众号可适当增大
ARTICLES_PER_POLL=10
# RSS 轮询时是否获取完整文章内容true/false默认 true
# ⚠️ 启用时强烈建议配置下方的 PROXY_URLS避免账号被微信风控
RSS_FETCH_FULL_CONTENT=true

View File

@ -44,42 +44,83 @@ async def _fetch_via_proxy(
article_url: str,
timeout: int,
wechat_cookie: Optional[str] = None,
wechat_token: Optional[str] = None
wechat_token: Optional[str] = None,
max_retries: int = 2
) -> Optional[str]:
"""通过 SOCKS5 代理或直连获取文章"""
"""
通过 SOCKS5 代理或直连获取文章
Args:
article_url: 文章 URL
timeout: 超时时间
wechat_cookie: 微信 Cookie
wechat_token: 微信 Token
max_retries: 内容验证失败时的最大重试次数(每次会尝试不同代理)
"""
try:
# 使用现有的 http_client支持代理池轮转 + 直连兜底)
from utils.http_client import fetch_page
logger.info("[Proxy] %s", article_url[:80])
logger.info("[Fetch] %s", article_url[:80])
# 构建完整 URL带 token
full_url = article_url
if wechat_token:
separator = '&' if '?' in article_url else '?'
full_url = f"{article_url}{separator}token={wechat_token}"
# 准备请求头
extra_headers = {"Referer": "https://mp.weixin.qq.com/"}
if wechat_cookie:
extra_headers["Cookie"] = wechat_cookie
html = await fetch_page(
full_url,
extra_headers=extra_headers,
timeout=timeout
)
for attempt in range(max_retries + 1):
try:
html = await fetch_page(
full_url,
extra_headers=extra_headers,
timeout=timeout
)
from utils.helpers import has_article_content, is_article_unavailable
if is_article_unavailable(html):
logger.warning("[Fetch] permanently unavailable (attempt %d/%d) %s",
attempt + 1, max_retries + 1, article_url[:60])
return html
if has_article_content(html):
logger.info("[Fetch] len=%d (attempt %d/%d)",
len(html), attempt + 1, max_retries + 1)
return html
else:
hint = "unknown"
if "验证" in html or "verify" in html.lower() or "环境异常" in html:
hint = "wechat_verification"
elif "请登录" in html or "login" in html.lower():
hint = "login_required"
elif "location.replace" in html or "location.href" in html:
hint = "redirect_page"
elif len(html) < 1000:
hint = "empty_or_blocked"
logger.warning(
"[Fetch] invalid (len=%d, hint=%s, attempt %d/%d) %s",
len(html), hint, attempt + 1, max_retries + 1,
article_url[:60]
)
if attempt < max_retries:
await asyncio.sleep(1)
continue
except Exception as e:
logger.warning("[Fetch] request error: %s (attempt %d/%d)",
str(e)[:100], attempt + 1, max_retries + 1)
if attempt < max_retries:
await asyncio.sleep(1)
continue
from utils.helpers import has_article_content
if has_article_content(html):
logger.info("[Proxy] len=%d", len(html))
return html
else:
logger.warning("[Proxy] invalid content (len=%d, no known content marker)", len(html))
return None
return None
except Exception as e:
logger.error("[Proxy] %s", str(e)[:100])
logger.error("[Fetch] fatal error: %s", str(e)[:100])
return None
@ -110,9 +151,7 @@ async def fetch_articles_batch(
async with semaphore:
html = await fetch_article_content(url, timeout, wechat_token, wechat_cookie)
results[url] = html
# 避免请求过快
await asyncio.sleep(0.5)
await asyncio.sleep(1)
logger.info("[Batch] 开始批量获取 %d 篇文章", len(article_urls))

View File

@ -100,14 +100,27 @@ def extract_content(html: str) -> str:
Extract article body, trying multiple container patterns.
Different WeChat account types (government, media, personal) use
different HTML structures. We try them in order of specificity.
For image-text messages (item_show_type=8), delegates to helpers.
For image-text messages (item_show_type=8) and short posts (item_show_type=10),
delegates to helpers.
"""
from utils.helpers import is_image_text_message, _extract_image_text_content
from utils.helpers import (
is_image_text_message, _extract_image_text_content,
is_short_content_message, _extract_short_content,
is_audio_message, _extract_audio_content,
)
if is_image_text_message(html):
result = _extract_image_text_content(html)
return result.get('content', '')
if is_short_content_message(html):
result = _extract_short_content(html)
return result.get('content', '')
if is_audio_message(html):
result = _extract_audio_content(html)
return result.get('content', '')
# Pattern 1: id="js_content" (most common)
content = _extract_div_inner(html, r'<div[^>]*\bid=["\']js_content["\'][^>]*>')
if content:

View File

@ -62,10 +62,31 @@ def parse_article_url(url: str) -> Optional[Dict[str, str]]:
except Exception:
return None
def get_item_show_type(html: str) -> Optional[str]:
    """Return the page's window.item_show_type value as a string, or None.

    WeChat article pages expose the post type via an inline
    ``window.item_show_type = '<digits>'`` assignment in a script tag.
    """
    match = re.search(r"window\.item_show_type\s*=\s*'(\d+)'", html)
    if match is None:
        return None
    return match.group(1)
def is_image_text_message(html: str) -> bool:
    """Detect image-text posts (item_show_type=8, multi-image + caption style).

    Fix: the block contained both the old inline-regex implementation and the
    new delegating implementation, leaving an unreachable duplicate return.
    Only the delegating version is kept so type detection stays centralized
    in get_item_show_type().
    """
    return get_item_show_type(html) == '8'
def is_short_content_message(html: str) -> bool:
    """Detect short posts / reposts (item_show_type=10): plain text pages
    that carry no js_content div."""
    show_type = get_item_show_type(html)
    return show_type == '10'
def is_audio_message(html: str) -> bool:
    """Detect audio articles: voice clips embedded via <mpvoice> tags or the
    mp-common-mpaudio player component.

    Matches on any of the known audio markers appearing anywhere in the page.
    """
    audio_markers = (
        'voice_encode_fileid',
        '<mpvoice',
        'mp-common-mpaudio',
        'js_editor_audio',
    )
    return any(marker in html for marker in audio_markers)
def _extract_image_text_content(html: str) -> Dict:
@ -198,6 +219,159 @@ def _extract_image_text_content(html: str) -> Dict:
}
def _jsdecode_unescape(s: str) -> str:
"""Unescape JsDecode \\xNN sequences and HTML entities."""
import html as html_module
s = re.sub(r'\\x([0-9a-fA-F]{2})', lambda m: chr(int(m.group(1), 16)), s)
s = html_module.unescape(s)
s = html_module.unescape(s)
return s
def _extract_short_content(html: str) -> Dict:
    """
    Extract content from item_show_type=10 pages (short posts / reposts).

    These pages have no js_content div; the text and metadata sit inside
    JsDecode() calls in <script> tags, so everything is pulled via regex.

    Returns a dict with 'content' (paragraph HTML), 'plain_content'
    (newline-separated text) and 'images' (cover/head image URLs).
    """
    import html as html_module

    # Prefer content_noencode (plainer text) over content.
    raw_text = ''
    for field in ('content_noencode', 'content'):
        found = re.search(rf"{field}:\s*JsDecode\('([^']*)'\)", html)
        if found and len(found.group(1)) > 10:
            raw_text = _jsdecode_unescape(found.group(1))
            break

    # Cover / head image: keep only URLs from known WeChat image hosts.
    images = []
    head_img = re.search(r"round_head_img:\s*JsDecode\('([^']+)'\)", html)
    if head_img:
        url = _jsdecode_unescape(head_img.group(1))
        if 'mmbiz.qpic.cn' in url or 'wx.qlogo.cn' in url:
            images.append(url)

    # Normalize any remaining literal "\x0a" / "\n" escapes to real newlines.
    normalized = raw_text.replace('\\x0a', '\n').replace('\\n', '\n') if raw_text else ''

    # Render each non-empty line as a standalone styled paragraph.
    paragraphs = []
    if raw_text:
        for raw_line in normalized.split('\n'):
            stripped = raw_line.strip()
            if not stripped:
                continue
            escaped = html_module.escape(stripped)
            paragraphs.append(
                f'<p style="margin:0 0 8px;line-height:1.8;font-size:15px;color:#333">{escaped}</p>'
            )

    return {
        'content': '\n'.join(paragraphs),
        'plain_content': normalized,
        'images': images,
    }
def _extract_audio_content(html: str) -> Dict:
    """
    Extract audio content from WeChat voice articles.

    Audio articles embed voice clips via <mpvoice> or mp-common-mpaudio
    tags; a playable URL is built from each tag's voice_encode_fileid.
    Also extracts any surrounding text content from js_content.

    Returns a dict with 'content' (HTML), 'plain_content' (text lines plus
    "[Audio] name - url" entries), 'images' (always empty here) and
    'audios' (list of {'name', 'url', 'duration'} dicts).
    """
    import html as html_module
    from bs4 import BeautifulSoup  # third-party dependency, imported lazily
    audio_items = []
    # Pattern 1: <mpvoice voice_encode_fileid="..." name="..." .../>
    for m in re.finditer(
        r'<mpvoice[^>]*voice_encode_fileid=["\']([^"\']+)["\'][^>]*/?>',
        html, re.IGNORECASE
    ):
        fileid = m.group(1)
        # name / play_length are optional attributes on the same tag.
        name_m = re.search(r'name=["\']([^"\']*)["\']', m.group(0))
        name = html_module.unescape(name_m.group(1)) if name_m else ''
        play_length_m = re.search(r'play_length=["\'](\d+)["\']', m.group(0))
        duration = int(play_length_m.group(1)) if play_length_m else 0
        audio_url = f"https://res.wx.qq.com/voice/getvoice?mediaid={fileid}"
        audio_items.append({'name': name, 'url': audio_url, 'duration': duration})
    # Pattern 2: mp-common-mpaudio with voice_encode_fileid in data or attributes
    # (fallback, only tried when Pattern 1 matched nothing).
    if not audio_items:
        for m in re.finditer(
            r'<mp-common-mpaudio[^>]*voice_encode_fileid=["\']([^"\']+)["\'][^>]*>',
            html, re.IGNORECASE
        ):
            fileid = m.group(1)
            name_m = re.search(r'name=["\']([^"\']*)["\']', m.group(0))
            name = html_module.unescape(name_m.group(1)) if name_m else ''
            play_length_m = re.search(r'play_length=["\'](\d+)["\']', m.group(0))
            duration = int(play_length_m.group(1)) if play_length_m else 0
            audio_url = f"https://res.wx.qq.com/voice/getvoice?mediaid={fileid}"
            audio_items.append({'name': name, 'url': audio_url, 'duration': duration})
    # Build HTML content
    html_parts = []
    # Extract surrounding text from js_content (some audio articles have text too).
    # The lookahead-ish tail anchors the div end at the following <script> or
    # rich_media_tool div so nested divs inside js_content don't cut it short.
    text_content = ''
    js_match = re.search(
        r'<div[^>]*id=["\']js_content["\'][^>]*>([\s\S]*?)</div>\s*(?:<script|<div[^>]*class=["\']rich_media_tool)',
        html, re.IGNORECASE
    )
    if js_match:
        try:
            soup = BeautifulSoup(js_match.group(1), 'html.parser')
            # Strip the audio player tags so they don't leak into the text.
            for tag in soup.find_all(['mpvoice', 'mp-common-mpaudio']):
                tag.decompose()
            text_content = soup.get_text(separator='\n', strip=True)
        except Exception:
            # Best-effort: fall back to audio-only output if parsing fails.
            pass
    if text_content:
        for line in text_content.split('\n'):
            line = line.strip()
            if line:
                html_parts.append(f'<p style="margin:0 0 8px;line-height:1.8">{html_module.escape(line)}</p>')
    # One styled card per audio item, with an mm:ss duration when known.
    for i, audio in enumerate(audio_items):
        dur_str = ''
        if audio['duration'] > 0:
            minutes = audio['duration'] // 60
            seconds = audio['duration'] % 60
            dur_str = f' ({minutes}:{seconds:02d})'
        display_name = audio['name'] or f'Audio {i + 1}'
        html_parts.append(
            f'<div style="margin:12px 0;padding:12px 16px;background:#f6f6f6;border-radius:8px">'
            f'<p style="margin:0 0 4px;font-size:15px;font-weight:500">'
            f'{html_module.escape(display_name)}{dur_str}</p>'
            f'<a href="{audio["url"]}" style="color:#1890ff;font-size:14px">'
            f'[Play Audio / Click to Listen]</a>'
            f'</div>'
        )
    content = '\n'.join(html_parts) if html_parts else ''
    # Plain-text mirror: body text first, then one line per audio item.
    plain_parts = []
    if text_content:
        plain_parts.append(text_content)
    for i, audio in enumerate(audio_items):
        display_name = audio['name'] or f'Audio {i + 1}'
        plain_parts.append(f"[Audio] {display_name} - {audio['url']}")
    return {
        'content': content,
        'plain_content': '\n'.join(plain_parts),
        'images': [],
        'audios': audio_items,
    }
def extract_article_info(html: str, params: Optional[Dict] = None) -> Dict:
"""
从HTML中提取文章信息
@ -217,11 +391,13 @@ def extract_article_info(html: str, params: Optional[Dict] = None) -> Dict:
re.search(r'<h2[^>]*class=[^>]*rich_media_title[^>]*>([\s\S]*?)</h2>', html, re.IGNORECASE) or
re.search(r"var\s+msg_title\s*=\s*'([^']+)'\.html\(false\)", html) or
re.search(r"window\.msg_title\s*=\s*window\.title\s*=\s*'([^']*)'", html) or
re.search(r'<meta\s+property="og:title"\s+content="([^"]+)"', html)
re.search(r'<meta\s+property="og:title"\s+content="([^"]+)"', html) or
re.search(r"msg_title:\s*JsDecode\('([^']+)'\)", html)
)
if title_match:
title = title_match.group(1)
title = _jsdecode_unescape(title)
title = re.sub(r'<[^>]+>', '', title)
title = title.replace('&quot;', '"').replace('&amp;', '&').strip()
@ -251,12 +427,22 @@ def extract_article_info(html: str, params: Optional[Dict] = None) -> Dict:
except (ValueError, TypeError):
pass
# 检测是否为图文消息item_show_type=8
# 检测特殊内容类型
if is_image_text_message(html):
img_text_data = _extract_image_text_content(html)
content = img_text_data['content']
images = img_text_data['images']
plain_content = img_text_data['plain_content']
elif is_short_content_message(html):
short_data = _extract_short_content(html)
content = short_data['content']
images = short_data['images']
plain_content = short_data['plain_content']
elif is_audio_message(html):
audio_data = _extract_audio_content(html)
content = audio_data['content']
images = audio_data['images']
plain_content = audio_data['plain_content']
else:
content = ''
images = []
@ -318,17 +504,23 @@ def has_article_content(html: str) -> bool:
"""
Check whether the fetched HTML likely contains article content.
Different WeChat account types use different content containers.
Must match actual HTML elements (id/class attributes), not random JS strings,
to avoid false positives on WeChat verification pages (~1.9MB) that contain
"js_content" references in their JavaScript code.
"""
content_markers = [
"js_content",
"rich_media_content",
"rich_media_area_primary",
"page-content",
"page_content",
element_markers = [
'id="js_content"',
'class="rich_media_content',
'class="rich_media_area_primary',
'id="page-content"',
'id="page_content"',
]
if any(marker in html for marker in content_markers):
if any(marker in html for marker in element_markers):
return True
return is_image_text_message(html)
if is_image_text_message(html) or is_short_content_message(html) or is_audio_message(html):
return True
return False
def get_client_ip(request) -> str:
@ -349,6 +541,37 @@ def is_article_deleted(html: str) -> bool:
"""检查文章是否被删除"""
return '已删除' in html or 'deleted' in html.lower()
def is_article_unavailable(html: str) -> bool:
    """Return True when the article is permanently unavailable
    (deleted / censored / restricted); thin wrapper over
    get_unavailable_reason()."""
    reason = get_unavailable_reason(html)
    return reason is not None
def get_unavailable_reason(html: str) -> Optional[str]:
    """Return a human-readable reason when the article is permanently
    unavailable (deleted / censored / restricted); otherwise None.

    Markers are tested in declaration order; the first keyword found in the
    page wins.
    """
    unavailable_markers = (
        ("该内容已被发布者删除", "已被发布者删除"),
        ("内容已删除", "已被发布者删除"),
        ("此内容因违规无法查看", "因违规无法查看"),
        ("涉嫌违反相关法律法规和政策", "涉嫌违规被限制"),
        ("此内容发送失败无法查看", "发送失败无法查看"),
        ("该内容暂时无法查看", "暂时无法查看"),
        ("根据作者隐私设置,无法查看该内容", "作者隐私设置不可见"),
        ("接相关投诉,此内容违反", "因投诉违规被限制"),
        ("该文章已被第三方辟谣", "已被第三方辟谣"),
    )
    return next(
        (reason for keyword, reason in unavailable_markers if keyword in html),
        None,
    )
def is_need_verification(html: str) -> bool:
"""检查是否需要验证"""
return ('verify' in html.lower() or

View File

@ -20,13 +20,13 @@ import httpx
from utils.auth_manager import auth_manager
from utils import rss_store
from utils.helpers import extract_article_info, parse_article_url, is_image_text_message, has_article_content
from utils.helpers import extract_article_info, parse_article_url, is_image_text_message, has_article_content, is_article_unavailable, get_unavailable_reason
from utils.http_client import fetch_page
logger = logging.getLogger(__name__)
POLL_INTERVAL = int(os.getenv("RSS_POLL_INTERVAL", "3600"))
ARTICLES_PER_POLL = 10
ARTICLES_PER_POLL = int(os.getenv("ARTICLES_PER_POLL", "10"))
FETCH_FULL_CONTENT = os.getenv("RSS_FETCH_FULL_CONTENT", "true").lower() == "true"
@ -207,10 +207,9 @@ class RSSPoller:
wechat_token = os.getenv("WECHAT_TOKEN", "")
wechat_cookie = os.getenv("WECHAT_COOKIE", "")
# 批量并发获取max_concurrency=5传递微信凭证
results = await fetch_articles_batch(
article_links,
max_concurrency=5,
max_concurrency=3,
timeout=60,
wechat_token=wechat_token,
wechat_cookie=wechat_cookie
@ -225,7 +224,18 @@ class RSSPoller:
continue
html = results.get(link)
if not html or not has_article_content(html):
if not html:
logger.warning("Empty HTML: %s", link[:80])
enriched.append(article)
continue
if is_article_unavailable(html):
reason = get_unavailable_reason(html) or "unknown"
logger.warning("Article permanently unavailable (%s): %s", reason, link[:80])
article["content"] = f"<p>[unavailable] {reason}</p>"
article["plain_content"] = f"[unavailable] {reason}"
enriched.append(article)
continue
if not has_article_content(html):
logger.warning("No content in HTML: %s", link[:80])
enriched.append(article)
continue

View File

@ -22,6 +22,7 @@ EVENT_LABELS = {
"login_success": "登录成功",
"login_expired": "登录过期",
"verification_required": "触发验证",
"content_fetch_failed": "文章内容获取失败",
}