feat(rss): RSS支持完整文章内容+图片代理修复

核心功能:
1. RSS包含完整文章内容(图文混排),可在阅读器中直接阅读全文
2. 修复RSS图片显示问题,所有图片通过代理正常显示

技术实现:
- 新增content_processor处理文章内容,保持图文顺序
- 新增image_proxy统一处理图片URL代理
- 新增article_fetcher支持批量并发获取文章
- 使用SITE_URL环境变量(必需配置)
- 使用minidom+CDATA防止HTML被XML转义

配置要求:
- 必须在.env中配置SITE_URL(如http://IP:5000)
- 强烈建议配置PROXY_URLS避免账号风控

Made-with: Cursor
This commit is contained in:
tmwgsicp 2026-02-26 10:50:21 +08:00
parent fb903d8038
commit 55bb0a6134
12 changed files with 896 additions and 90 deletions

3
.gitignore vendored
View File

@ -63,3 +63,6 @@ logs/
# RSS database
data/
# SaaS 版本(独立仓库管理)
saas/

131
README.md
View File

@ -19,9 +19,9 @@
## 功能特性
- **RSS 订阅** — 订阅任意公众号,自动定时拉取新文章,生成标准 RSS 2.0 源,接入 FreshRSS / Feedly 等阅读器即可使用
- **RSS 订阅** — 订阅任意公众号,自动定时拉取新文章**包含完整文章内容和图片**,生成标准 RSS 2.0 源,接入 FreshRSS / Feedly 等阅读器即可使用
- **文章内容获取** — 通过 URL 获取文章完整内容(标题、作者、正文 HTML / 纯文本、图片列表)
- **反风控体系** — Chrome TLS 指纹模拟 + IP 代理池轮转 + 三层自动限频,有效对抗微信封控
- **反风控体系** — Chrome TLS 指纹模拟 + SOCKS5 代理池轮转 + 三层自动限频,有效对抗微信封控
- **文章列表 & 搜索** — 获取任意公众号历史文章列表,支持分页和关键词搜索
- **公众号搜索** — 按名称搜索公众号,获取 FakeID
- **扫码登录** — 微信公众平台扫码登录凭证自动保存4 天有效期
@ -39,11 +39,19 @@
---
## SaaS 托管版(即将推出)
## SaaS 托管版 — 已上线 🚀
不想自己部署?我们正在筹备 **RSS 订阅托管服务**——无需服务器、无需配置,输入公众号名称即可获得 RSS 订阅地址,直接接入你喜欢的 RSS 阅读器。同时也在评估开放文章内容获取 API 的托管方案。
**不想折腾部署30 秒注册即可使用** 👉 **[wechatrss.waytomaster.com](https://wechatrss.waytomaster.com)**
感兴趣的话欢迎扫码添加微信,提前锁定体验名额 👇 [联系方式](#联系方式)
搜索公众号名称,拿到 RSS 链接丢进你的阅读器——Feedly、Inoreader、NetNewsWire 全部兼容。
| 套餐 | 公众号数量 | 价格 |
|------|-----------|------|
| 免费版 | 2 个 | ¥0 |
| 基础版 | 20 个 | ¥9.9/月 |
| 专业版 | 50 个 | ¥19.9/月 |
> 免费版够用就一直免费,不够了再升级,没有套路。
---
@ -57,11 +65,33 @@
登录后即可通过 API 获取**任意公众号**的公开文章(不限于自己的公众号)。
> **本地电脑可以直接使用!** 不需要公网服务器——在本地启动服务后通过 `localhost` 访问即可完成扫码登录和全部功能。只有当你需要从其他设备(如手机 RSS 阅读器)远程访问时,才需要公网服务器或内网穿透。
---
## 快速开始
### 方式一:一键启动(推荐)
### 方式一Docker 部署(推荐,适合 NAS
**最简单的部署方式,适用于群晖 NAS、威联通 NAS、服务器等环境。**
```bash
# 克隆项目
git clone https://github.com/tmwgsicp/wechat-download-api.git
cd wechat-download-api
# 配置环境变量(可选)
cp env.example .env
# 启动服务
docker-compose up -d
```
服务启动后访问 http://your-ip:5000 即可使用。
> 详细的 Docker 部署指南(包括群晖 NAS 图形界面操作)请查看 **[DOCKER.md](DOCKER.md)**
### 方式二:一键启动脚本
**Windows**
```bash
@ -78,7 +108,7 @@ chmod +x start.sh
> Linux 生产环境可使用 `sudo bash start.sh` 自动配置 systemd 服务和开机自启。
### 方式:手动安装
### 方式:手动安装
```bash
# 创建虚拟环境
@ -104,6 +134,64 @@ python app.py
---
## 服务器部署
### Docker 部署(推荐)
适用于各类服务器、NAS 等环境,零依赖、易维护。详见 **[DOCKER.md](DOCKER.md)**
### Linux 生产环境systemd
`start.sh` 脚本在 Linux 上以 `sudo` 运行时,会自动注册 systemd 服务并启用开机自启:
```bash
sudo bash start.sh
```
之后可通过以下命令管理服务:
```bash
# 查看运行状态
bash status.sh
# 停止服务
bash stop.sh
# 手动操作
sudo systemctl restart wechat-download-api
sudo systemctl status wechat-download-api
```
### 配置反向代理(可选)
如需通过域名或 HTTPS 访问,配置 Nginx 反向代理到 `localhost:5000`
```nginx
server {
listen 80;
server_name your-domain.com;
location / {
proxy_pass http://127.0.0.1:5000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
}
```
### 环境变量
复制 `env.example``.env` 并按需修改:
```bash
cp env.example .env
```
主要配置项参见 `env.example` 中的注释说明。
---
## API 接口
### 获取文章内容
@ -208,6 +296,12 @@ curl "http://localhost:5000/api/rss/MzA1MjM1ODk2MA=="
也可以通过管理面板的 **RSS 订阅** 页面可视化管理,搜索公众号一键订阅并复制 RSS 地址。
> **关于 RSS 内容**: RSS 源包含**完整文章内容**(图文混排),您可以直接在 RSS 阅读器中阅读全文。
>
> 系统使用 **SOCKS5 代理池 + Chrome TLS 指纹模拟**技术获取文章内容,有效规避微信风控。
>
> 扫码登录后,系统会**自动**将微信凭证用于内容获取,无需手动配置。如需禁用完整内容获取(仅保留标题和摘要),可在 `.env` 中设置 `RSS_FETCH_FULL_CONTENT=false`
#### RSS 订阅管理接口
| 方法 | 路径 | 说明 |
@ -255,14 +349,26 @@ cp env.example .env
| `RATE_LIMIT_PER_IP` | 单 IP 每分钟请求上限 | 5 |
| `RATE_LIMIT_ARTICLE_INTERVAL` | 文章请求最小间隔(秒) | 3 |
| `RSS_POLL_INTERVAL` | RSS 轮询间隔(秒) | 3600 |
| `PROXY_URLS` | 代理池地址(多个逗号分隔,留空直连) | 空 |
| `RSS_FETCH_FULL_CONTENT` | RSS 是否获取完整内容true/false | true |
| `PROXY_URLS` | **SOCKS5 代理池地址(强烈建议配置,避免账号风控)** | 空 |
| `SITE_URL` | **网站访问地址用于RSS图片代理必须配置** | http://localhost:5000 |
| `PORT` | 服务端口 | 5000 |
| `HOST` | 监听地址 | 0.0.0.0 |
| `DEBUG` | 调试模式(开启热重载) | false |
### 代理池配置(可选)
> **⚠️ 重要**: `SITE_URL` 必须配置为实际访问地址IP或域名否则RSS图片无法正常显示。例如
> - 本地开发: `http://localhost:5000`
> - 局域网部署: `http://192.168.1.100:5000`
> - 公网域名: `https://你的域名.com`
文章内容获取接口(`POST /api/article`)会访问微信文章页面,频繁请求可能触发微信验证码保护。配置代理池可以将请求分散到不同 IP降低风控风险。
### SOCKS5 代理池配置(⚠️ 强烈建议)
**重要提示**:
- ⚠️ **启用完整内容获取时,强烈建议配置代理池,避免账号被微信风控**
- ⚠️ **不配置代理直连微信可能导致频繁验证、账号限制、IP封禁**
- ✅ **配置2-3个代理IP可有效分散请求降低风控风险**
**用途**:获取文章完整内容时分散请求 IP配合 Chrome TLS 指纹模拟,有效规避微信风控。
> 本项目使用 `curl_cffi` 模拟 Chrome TLS 指纹,请求特征与真实浏览器一致,配合代理池效果更佳。
@ -370,6 +476,9 @@ PROXY_URLS=socks5://myuser:mypass@vps1-ip:1080,socks5://myuser:mypass@vps2-ip:10
│ ├── rate_limiter.py # 限频器
│ ├── rss_store.py # RSS 数据存储SQLite
│ ├── rss_poller.py # RSS 后台轮询器
│ ├── content_processor.py # 内容处理与图片代理
│ ├── image_proxy.py # 图片URL代理工具
│ ├── article_fetcher.py # 批量并发获取文章
│ └── webhook.py # Webhook 通知
└── static/ # 前端页面(含 RSS 管理)
```
@ -476,6 +585,8 @@ Cookie 登录有效期约 4 天,过期后需重新扫码登录。配置 `WEBHO
</table>
- **GitHub Issues**: [提交问题](https://github.com/tmwgsicp/wechat-download-api/issues)
- **邮箱**: creator@waytomaster.com
- **SaaS 托管版**: [wechatrss.waytomaster.com](https://wechatrss.waytomaster.com)
---

BIN
assets/qrcode/group.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 282 KiB

View File

@ -21,13 +21,22 @@ WEBHOOK_NOTIFICATION_INTERVAL=300
# RSS 订阅配置
# 轮询间隔(秒),默认 36001 小时)
RSS_POLL_INTERVAL=3600
# RSS 轮询时是否获取完整文章内容true/false默认 true
# ⚠️ 启用时强烈建议配置下方的 PROXY_URLS避免账号被微信风控
RSS_FETCH_FULL_CONTENT=true
# 代理池 (留空则直连,多个用逗号分隔)
# 支持 HTTP / SOCKS5 代理,用于分散请求 IP 降低风控风险
# 示例: socks5://ip1:1080,http://ip2:8080,socks5://user:pass@ip3:1080
# SOCKS5 代理池(⚠️ 启用RSS完整内容时强烈建议配置避免账号风控
# 用途:分散请求 IP配合 Chrome TLS 指纹模拟,有效规避微信封控
# 不配置代理直连微信可能导致频繁验证、账号限制、IP 封禁
# 支持 SOCKS5 代理,多个用逗号分隔,建议 2-3 个即可
# 示例: socks5://ip1:1080,socks5://ip2:1080,socks5://user:pass@ip3:1080
# 留空则直连(仅适用于少量订阅或禁用 RSS_FETCH_FULL_CONTENT 的情况)
PROXY_URLS=
# 服务配置
# 网站URL(用于RSS图片代理,必须配置为实际访问地址)
# 例如: http://你的IP:5000 或 https://你的域名.com
SITE_URL=http://localhost:5000
PORT=5000
HOST=0.0.0.0
DEBUG=false

View File

@ -69,6 +69,7 @@ async def get_article(article_request: ArticleRequest, request: Request):
html = await fetch_page(
article_request.url,
extra_headers={"Referer": "https://mp.weixin.qq.com/"},
timeout=120 # WeChat 大文章可能超时,延长至 120 秒
)
if "js_content" not in html:

View File

@ -13,8 +13,6 @@ import time
import logging
from datetime import datetime, timezone
from html import escape as html_escape
from urllib.parse import quote
from xml.etree.ElementTree import Element, SubElement, tostring
from typing import Optional
from fastapi import APIRouter, HTTPException, Query, Request
@ -23,6 +21,7 @@ from pydantic import BaseModel, Field
from utils import rss_store
from utils.rss_poller import rss_poller, POLL_INTERVAL
from utils.image_proxy import proxy_image_url
logger = logging.getLogger(__name__)
@ -120,8 +119,11 @@ async def get_subscriptions(request: Request):
items = []
for s in subs:
# 将头像 URL 转换为代理链接
head_img = proxy_image_url(s.get("head_img", ""), base_url)
items.append({
**s,
"head_img": head_img,
"rss_url": f"{base_url}/api/rss/{s['fakeid']}",
})
@ -173,13 +175,6 @@ async def poller_status():
# ── RSS XML 输出 ──────────────────────────────────────────
def _proxy_cover(url: str, base_url: str) -> str:
"""将微信 CDN 封面图地址替换为本服务的图片代理地址"""
if url and "mmbiz.qpic.cn" in url:
return base_url + "/api/image?url=" + quote(url, safe="")
return url
def _rfc822(ts: int) -> str:
"""Unix 时间戳 → RFC 822 日期字符串"""
if not ts:
@ -190,81 +185,137 @@ def _rfc822(ts: int) -> str:
def _build_rss_xml(fakeid: str, sub: dict, articles: list,
base_url: str) -> str:
rss = Element("rss", version="2.0")
rss.set("xmlns:atom", "http://www.w3.org/2005/Atom")
channel = SubElement(rss, "channel")
SubElement(channel, "title").text = sub.get("nickname") or fakeid
SubElement(channel, "link").text = "https://mp.weixin.qq.com"
SubElement(channel, "description").text = (
f'{sub.get("nickname", "")} 的微信公众号文章 RSS 订阅'
)
SubElement(channel, "language").text = "zh-CN"
SubElement(channel, "lastBuildDate").text = _rfc822(int(time.time()))
SubElement(channel, "generator").text = "WeChat Download API"
atom_link = SubElement(channel, "atom:link")
atom_link.set("href", f"{base_url}/api/rss/{fakeid}")
atom_link.set("rel", "self")
atom_link.set("type", "application/rss+xml")
"""
构建 RSS XML使用 CDATA 包裹 HTML 内容
"""
from xml.dom import minidom
# 创建 XML 文档
doc = minidom.Document()
# 创建根元素
rss = doc.createElement("rss")
rss.setAttribute("version", "2.0")
rss.setAttribute("xmlns:atom", "http://www.w3.org/2005/Atom")
doc.appendChild(rss)
# 创建 channel
channel = doc.createElement("channel")
rss.appendChild(channel)
# Channel 基本信息
def add_text_element(parent, tag, text):
elem = doc.createElement(tag)
elem.appendChild(doc.createTextNode(str(text)))
parent.appendChild(elem)
return elem
add_text_element(channel, "title", sub.get("nickname") or fakeid)
add_text_element(channel, "link", "https://mp.weixin.qq.com")
add_text_element(channel, "description",
f'{sub.get("nickname", "")} 的微信公众号文章 RSS 订阅')
add_text_element(channel, "language", "zh-CN")
add_text_element(channel, "lastBuildDate", _rfc822(int(time.time())))
add_text_element(channel, "generator", "WeChat Download API")
# atom:link
atom_link = doc.createElement("atom:link")
atom_link.setAttribute("href", f"{base_url}/api/rss/{fakeid}")
atom_link.setAttribute("rel", "self")
atom_link.setAttribute("type", "application/rss+xml")
channel.appendChild(atom_link)
# Channel 图片
if sub.get("head_img"):
image = SubElement(channel, "image")
SubElement(image, "url").text = sub["head_img"]
SubElement(image, "title").text = sub.get("nickname", "")
SubElement(image, "link").text = "https://mp.weixin.qq.com"
image = doc.createElement("image")
head_img_proxied = proxy_image_url(sub["head_img"], base_url)
add_text_element(image, "url", head_img_proxied)
add_text_element(image, "title", sub.get("nickname", ""))
add_text_element(image, "link", "https://mp.weixin.qq.com")
channel.appendChild(image)
# 文章列表
for a in articles:
item = SubElement(channel, "item")
SubElement(item, "title").text = a.get("title", "")
item = doc.createElement("item")
add_text_element(item, "title", a.get("title", ""))
link = a.get("link", "")
SubElement(item, "link").text = link
guid = SubElement(item, "guid")
guid.text = link
guid.set("isPermaLink", "true")
add_text_element(item, "link", link)
guid = doc.createElement("guid")
guid.setAttribute("isPermaLink", "true")
guid.appendChild(doc.createTextNode(link))
item.appendChild(guid)
if a.get("publish_time"):
SubElement(item, "pubDate").text = _rfc822(a["publish_time"])
add_text_element(item, "pubDate", _rfc822(a["publish_time"]))
if a.get("author"):
SubElement(item, "author").text = a["author"]
cover = _proxy_cover(a.get("cover", ""), base_url)
add_text_element(item, "author", a["author"])
# 构建 description HTML
cover = proxy_image_url(a.get("cover", ""), base_url)
digest = html_escape(a.get("digest", "")) if a.get("digest") else ""
author = html_escape(a.get("author", "")) if a.get("author") else ""
title_escaped = html_escape(a.get("title", ""))
content_html = a.get("content", "")
html_parts = []
if cover:
if content_html:
# 统一策略:入库时已代理(见utils/rss_poller.py:236),RSS输出时直接使用
html_parts.append(
f'<div style="margin-bottom:12px">'
f'<a href="{html_escape(link)}">'
f'<img src="{html_escape(cover)}" alt="{title_escaped}" '
f'style="max-width:100%;height:auto;border-radius:8px" />'
f'</a></div>'
f'<div style="font-size:16px;line-height:1.8;color:#333">'
f'{content_html}'
f'</div>'
)
if digest:
if author:
html_parts.append(
f'<hr style="margin:24px 0;border:none;border-top:1px solid #eee" />'
f'<p style="color:#888;font-size:13px;margin:0">作者: {author}</p>'
)
else:
if cover:
html_parts.append(
f'<div style="margin-bottom:12px">'
f'<a href="{html_escape(link)}">'
f'<img src="{html_escape(cover)}" alt="{title_escaped}" '
f'style="max-width:100%;height:auto;border-radius:8px" />'
f'</a></div>'
)
if digest:
html_parts.append(
f'<p style="color:#333;font-size:15px;line-height:1.8;'
f'margin:0 0 16px">{digest}</p>'
)
if author:
html_parts.append(
f'<p style="color:#888;font-size:13px;margin:0 0 12px">'
f'作者: {author}</p>'
)
html_parts.append(
f'<p style="color:#333;font-size:15px;line-height:1.8;'
f'margin:0 0 16px">{digest}</p>'
f'<p style="margin:0"><a href="{html_escape(link)}" '
f'style="color:#1890ff;text-decoration:none;font-size:14px">'
f'阅读原文 &rarr;</a></p>'
)
if author:
html_parts.append(
f'<p style="color:#888;font-size:13px;margin:0 0 12px">'
f'作者: {author}</p>'
)
html_parts.append(
f'<p style="margin:0"><a href="{html_escape(link)}" '
f'style="color:#1890ff;text-decoration:none;font-size:14px">'
f'阅读原文 &rarr;</a></p>'
)
SubElement(item, "description").text = "\n".join(html_parts)
xml_bytes = tostring(rss, encoding="unicode", xml_declaration=False)
return '<?xml version="1.0" encoding="UTF-8"?>\n' + xml_bytes
# 使用 CDATA 包裹 HTML 内容
description = doc.createElement("description")
cdata = doc.createCDATASection("\n".join(html_parts))
description.appendChild(cdata)
item.appendChild(description)
channel.appendChild(item)
# 生成 XML 字符串
xml_str = doc.toprettyxml(indent=" ", encoding=None)
# 移除多余的空行和 XML 声明(我们自己添加)
lines = [line for line in xml_str.split('\n') if line.strip()]
xml_str = '\n'.join(lines[1:]) # 跳过默认的 XML 声明
return '<?xml version="1.0" encoding="UTF-8"?>\n' + xml_str
@router.get("/rss/{fakeid}", summary="获取 RSS 订阅源",

View File

@ -8,12 +8,13 @@
搜索路由 - FastAPI版本
"""
from fastapi import APIRouter, Query
from fastapi import APIRouter, Query, Request
from pydantic import BaseModel
from typing import Optional, List
import time
import httpx
from utils.auth_manager import auth_manager
from utils.image_proxy import proxy_image_url
router = APIRouter()
@ -30,7 +31,7 @@ class SearchResponse(BaseModel):
error: Optional[str] = None
@router.get("/searchbiz", response_model=SearchResponse, summary="搜索公众号")
async def search_accounts(query: str = Query(..., description="公众号名称或关键词", alias="query")):
async def search_accounts(query: str = Query(..., description="公众号名称或关键词", alias="query"), request: Request = None):
"""
按关键词搜索微信公众号获取 FakeID
@ -78,14 +79,19 @@ async def search_accounts(query: str = Query(..., description="公众号名称
if result.get("base_resp", {}).get("ret") == 0:
accounts = result.get("list", [])
# 获取 base_url 用于图片代理
base_url = str(request.base_url).rstrip("/") if request else ""
# 格式化返回数据
formatted_accounts = []
for acc in accounts:
# 将头像 URL 转换为代理链接
round_head_img = proxy_image_url(acc.get("round_head_img", ""), base_url)
formatted_accounts.append({
"fakeid": acc.get("fakeid", ""),
"nickname": acc.get("nickname", ""),
"alias": acc.get("alias", ""),
"round_head_img": acc.get("round_head_img", ""),
"round_head_img": round_head_img,
"service_type": acc.get("service_type", 0)
})

130
utils/article_fetcher.py Normal file
View File

@ -0,0 +1,130 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
文章内容获取器 - SOCKS5 代理方案
使用 curl_cffi 模拟真实浏览器 TLS 指纹支持代理池轮转
"""
import asyncio
import logging
import os
from typing import Optional
logger = logging.getLogger(__name__)
async def fetch_article_content(
article_url: str,
timeout: int = 60,
wechat_token: Optional[str] = None,
wechat_cookie: Optional[str] = None
) -> Optional[str]:
"""
获取文章内容
请求策略
1. SOCKS5 代理池轮转
2. 直连兜底
Args:
article_url: 文章 URL
timeout: 超时时间
wechat_token: 微信 token用于鉴权
wechat_cookie: 微信 Cookie用于鉴权
Returns:
文章 HTML 内容失败返回 None
"""
# 使用代理池获取文章
html = await _fetch_via_proxy(article_url, timeout, wechat_cookie, wechat_token)
return html
async def _fetch_via_proxy(
article_url: str,
timeout: int,
wechat_cookie: Optional[str] = None,
wechat_token: Optional[str] = None
) -> Optional[str]:
"""通过 SOCKS5 代理或直连获取文章"""
try:
# 使用现有的 http_client支持代理池轮转 + 直连兜底)
from utils.http_client import fetch_page
logger.info("[Proxy] %s", article_url[:80])
# 构建完整 URL带 token
full_url = article_url
if wechat_token:
separator = '&' if '?' in article_url else '?'
full_url = f"{article_url}{separator}token={wechat_token}"
# 准备请求头
extra_headers = {"Referer": "https://mp.weixin.qq.com/"}
if wechat_cookie:
extra_headers["Cookie"] = wechat_cookie
html = await fetch_page(
full_url,
extra_headers=extra_headers,
timeout=timeout
)
# 验证内容有效性
if "js_content" in html and len(html) > 500000:
logger.info("[Proxy] ✅ len=%d", len(html))
return html
else:
logger.warning("[Proxy] ❌ 内容无效 (len=%d, has_js_content=%s)",
len(html), "js_content" in html)
return None
except Exception as e:
logger.error("[Proxy] ❌ %s", str(e)[:100])
return None
async def fetch_articles_batch(
article_urls: list,
max_concurrency: int = 5,
timeout: int = 60,
wechat_token: Optional[str] = None,
wechat_cookie: Optional[str] = None
) -> dict:
"""
批量获取文章内容并发版
Args:
article_urls: 文章 URL 列表
max_concurrency: 最大并发数
timeout: 单个请求超时时间
wechat_token: 微信 token用于鉴权
wechat_cookie: 微信 Cookie用于鉴权
Returns:
{url: html} 字典失败的 URL 对应 None
"""
semaphore = asyncio.Semaphore(max_concurrency)
results = {}
async def fetch_one(url):
async with semaphore:
html = await fetch_article_content(url, timeout, wechat_token, wechat_cookie)
results[url] = html
# 避免请求过快
await asyncio.sleep(0.5)
logger.info("[Batch] 开始批量获取 %d 篇文章", len(article_urls))
await asyncio.gather(
*[fetch_one(url) for url in article_urls],
return_exceptions=True
)
success_count = sum(1 for html in results.values() if html)
fail_count = len(results) - success_count
logger.info("[Batch] 完成: 成功=%d, 失败=%d", success_count, fail_count)
return results

308
utils/content_processor.py Normal file
View File

@ -0,0 +1,308 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
图文内容处理器 - 完美还原微信文章的图文混合内容
"""
import re
import logging
from typing import Dict, List
from urllib.parse import quote
logger = logging.getLogger(__name__)
def process_article_content(html: str, proxy_base_url: str = None) -> Dict:
"""
处理文章内容保持图文顺序并代理图片
Args:
html: 原始 HTML
proxy_base_url: 图片代理基础 URL例如https://你的域名.com
Returns:
{
'content': '处理后的 HTML图片已代理',
'plain_content': '纯文本',
'images': ['图片URL列表'],
'has_images': True/False
}
"""
# 1. 提取正文内容(保持原始 HTML 结构)
content = extract_content(html)
if not content:
return {
'content': '',
'plain_content': '',
'images': [],
'has_images': False
}
# 2. 提取所有图片 URL按顺序
images = extract_images_in_order(content)
# 3. 代理图片 URL保持 HTML 中的图片顺序)
if proxy_base_url:
content = proxy_all_images(content, proxy_base_url)
# 4. 清理和优化 HTML
content = clean_html(content)
# 5. 生成纯文本
plain_content = html_to_text(content)
return {
'content': content,
'plain_content': plain_content,
'images': images,
'has_images': len(images) > 0
}
def extract_content(html: str) -> str:
"""
提取文章正文保持原始 HTML 结构
微信文章的正文在 id="js_content" div
这个 div 内的 HTML 已经按正确顺序排列了文本和图片
"""
# 方法 1: 匹配 id="js_content" (改进版,更灵活)
match = re.search(
r'<div[^>]*\bid=["\']js_content["\'][^>]*>(.*?)</div>',
html,
re.DOTALL | re.IGNORECASE
)
if match:
return match.group(1).strip()
# 方法 2: 匹配 class="rich_media_content"
match = re.search(
r'<div[^>]*\bclass=["\'][^"\']*rich_media_content[^"\']*["\'][^>]*>(.*?)</div>',
html,
re.DOTALL | re.IGNORECASE
)
if match:
return match.group(1).strip()
logger.warning("未能提取文章正文")
return ""
def extract_images_in_order(content: str) -> List[str]:
"""
按顺序提取所有图片 URL
微信文章的图片有两种属性
1. data-src主要- 懒加载图片
2. src备用- 直接加载图片
"""
images = []
# 提取所有 <img> 标签(按 HTML 中的顺序)
img_pattern = re.compile(r'<img[^>]*>', re.IGNORECASE)
for img_tag in img_pattern.finditer(content):
img_html = img_tag.group(0)
# 优先提取 data-src
data_src_match = re.search(r'data-src="([^"]+)"', img_html)
if data_src_match:
img_url = data_src_match.group(1)
if is_valid_image_url(img_url) and img_url not in images:
images.append(img_url)
continue
# 备用:提取 src
src_match = re.search(r'src="([^"]+)"', img_html)
if src_match:
img_url = src_match.group(1)
if is_valid_image_url(img_url) and img_url not in images:
images.append(img_url)
logger.info(f"提取到 {len(images)} 张图片(按顺序)")
return images
def proxy_all_images(content: str, proxy_base_url: str) -> str:
"""
代理所有图片 URL保持 HTML 中的图片顺序
替换策略
1. 提取图片URLdata-src src
2. 替换为代理URL
3. 确保同时有 data-src src 属性RSS阅读器需要src
重要: RSS 阅读器需要 src 属性才能显示图片!
"""
def replace_img_tag(match):
"""替换单个 <img> 标签"""
img_html = match.group(0)
# 提取原始图片 URL优先data-src其次src
data_src_match = re.search(r'data-src="([^"]+)"', img_html, re.IGNORECASE)
src_match = re.search(r'\ssrc="([^"]+)"', img_html, re.IGNORECASE)
original_url = None
if data_src_match:
original_url = data_src_match.group(1)
elif src_match:
original_url = src_match.group(1)
if not original_url or not is_valid_image_url(original_url):
return img_html
# 生成代理 URL
proxy_url = f"{proxy_base_url}/api/image?url={quote(original_url, safe='')}"
new_html = img_html
# 第一步:替换 data-src如果有
if data_src_match:
new_html = re.sub(
r'data-src="[^"]+"',
f'data-src="{proxy_url}"',
new_html,
count=1,
flags=re.IGNORECASE
)
# 第二步:处理 src 属性
if src_match:
# 已有 src直接替换
new_html = re.sub(
r'\ssrc="[^"]+"',
f' src="{proxy_url}"',
new_html,
count=1,
flags=re.IGNORECASE
)
else:
# 没有 src必须添加使用最简单可靠的方法
new_html = new_html.replace('<img', f'<img src="{proxy_url}"', 1)
# 处理大写
if 'src=' not in new_html:
new_html = new_html.replace('<IMG', f'<IMG src="{proxy_url}"', 1)
return new_html
# 替换所有 <img> 标签
content = re.sub(
r'<img[^>]*>',
replace_img_tag,
content,
flags=re.IGNORECASE
)
logger.info("图片 URL 已代理")
return content
def is_valid_image_url(url: str) -> bool:
"""判断是否为有效的图片 URL"""
if not url:
return False
# 排除 base64 和无效 URL
if url.startswith('data:'):
return False
# 只保留微信 CDN 图片
wechat_cdn_domains = [
'mmbiz.qpic.cn',
'mmbiz.qlogo.cn',
'wx.qlogo.cn'
]
return any(domain in url for domain in wechat_cdn_domains)
def clean_html(content: str) -> str:
"""
清理和优化 HTML
1. 移除 script 标签
2. 移除 style 标签可选
3. 移除空白标签
"""
# 移除 <script> 标签
content = re.sub(r'<script[^>]*>.*?</script>', '', content, flags=re.DOTALL | re.IGNORECASE)
# 移除 <style> 标签(可选,保留可以保持样式)
# content = re.sub(r'<style[^>]*>.*?</style>', '', content, flags=re.DOTALL | re.IGNORECASE)
# 移除空段落
content = re.sub(r'<p[^>]*>\s*</p>', '', content, flags=re.IGNORECASE)
# 移除多余空白
content = re.sub(r'\n\s*\n', '\n', content)
return content.strip()
def html_to_text(html: str) -> str:
"""将 HTML 转为纯文本(移除图片,只保留文字)"""
import html as html_module
# 移除图片标签
text = re.sub(r'<img[^>]*>', '', html, flags=re.IGNORECASE)
# 移除其他标签
text = re.sub(r'<br\s*/?>', '\n', text, flags=re.IGNORECASE)
text = re.sub(r'</(?:p|div|section|h[1-6])>', '\n', text, flags=re.IGNORECASE)
text = re.sub(r'<[^>]+>', '', text)
# HTML 实体解码
text = html_module.unescape(text)
# 清理空白
text = re.sub(r'[ \t]+', ' ', text)
text = re.sub(r'\n{3,}', '\n\n', text)
return text.strip()
# ==================== 使用示例 ====================
def example_usage():
"""使用示例"""
# 假设这是从微信获取的原始 HTML
original_html = """
<html>
<body>
<div id="js_content">
<p>这是第一段文字</p>
<p><img data-src="https://mmbiz.qpic.cn/image1.jpg" /></p>
<p>这是第二段文字</p>
<p><img data-src="https://mmbiz.qpic.cn/image2.jpg" /></p>
<p>这是第三段文字</p>
</div>
</body>
</html>
"""
# 处理内容
result = process_article_content(
html=original_html,
proxy_base_url="https://wechatrss.waytomaster.com"
)
print("处理后的 HTML:")
print(result['content'])
print("\n图片列表(按顺序):")
for i, img in enumerate(result['images'], 1):
print(f" {i}. {img}")
print("\n纯文本:")
print(result['plain_content'])
if __name__ == "__main__":
example_usage()

View File

@ -9,8 +9,10 @@ HTTP 客户端封装
优先使用 curl_cffi模拟 Chrome TLS 指纹不可用时自动降级到 httpx
支持代理池轮转当前代理失败 尝试下一个 全部失败 直连兜底
注意curl_cffi AsyncSession 在部分环境下 SOCKS5 代理不工作
因此代理场景使用同步 Session + 线程池来规避此问题
注意
1. curl_cffi AsyncSession 在部分环境下 SOCKS5 代理不工作
因此代理场景使用同步 Session + 线程池来规避此问题
2. 优先使用 SOCKS5 代理避免被封禁
"""
import asyncio
@ -86,6 +88,8 @@ async def fetch_page(url: str, extra_headers: Optional[Dict] = None,
async def _do_fetch(url: str, headers: Dict, timeout: int,
proxy: Optional[str]) -> str:
"""执行实际的HTTP请求"""
# SOCKS5 代理或无代理:正常请求
if HAS_CURL_CFFI:
return await _fetch_curl_cffi(url, headers, timeout, proxy)
return await _fetch_httpx(url, headers, timeout, proxy)
@ -103,7 +107,7 @@ async def _fetch_curl_cffi(url: str, headers: Dict, timeout: int,
def _fetch_curl_cffi_sync(url: str, headers: Dict, timeout: int,
proxy: Optional[str]) -> str:
"""同步请求,在线程池中执行。规避 AsyncSession + SOCKS5 代理的兼容性问题。"""
kwargs = {"timeout": timeout, "allow_redirects": True}
kwargs = {"timeout": timeout, "allow_redirects": True, "verify": False} # 跳过 SSL 验证
if proxy:
kwargs["proxy"] = proxy
with CurlSession(impersonate="chrome120") as session:

91
utils/image_proxy.py Normal file
View File

@ -0,0 +1,91 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2026 tmwgsicp
# Licensed under the GNU Affero General Public License v3.0
# See LICENSE file in the project root for full license text.
# SPDX-License-Identifier: AGPL-3.0-only
"""
图片 URL 处理工具
统一处理微信 CDN HTTP 图片转 HTTPS 代理
"""
from urllib.parse import quote
def proxy_image_url(url: str, base_url: str) -> str:
"""
将微信 CDN HTTP 图片 URL 转换为 HTTPS 代理 URL
Args:
url: 原始图片 URL
base_url: 服务基础 URL ( http://localhost:8000 https://your-domain.com)
Returns:
代理后的 HTTPS URL 或原始 URL
Examples:
>>> proxy_image_url("http://mmbiz.qpic.cn/xxx.jpg", "https://example.com")
'https://example.com/api/image?url=http%3A//mmbiz.qpic.cn/xxx.jpg'
>>> proxy_image_url("https://example.com/image.jpg", "https://example.com")
'https://example.com/image.jpg'
"""
if not url:
return ""
# 防止重复代理:如果 URL 已经是代理 URL直接返回
if "/api/image?url=" in url:
return url
# 只代理微信 CDN 的图片
if "mmbiz.qpic.cn" in url or "mmbiz.qlogo.cn" in url or "wx.qlogo.cn" in url:
return f"{base_url.rstrip('/')}/api/image?url={quote(url, safe='')}"
return url
def proxy_content_images(html_content: str, base_url: str) -> str:
"""
代理 HTML 内容中的所有微信图片 URL
Args:
html_content: 文章 HTML 内容
base_url: 服务基础 URL
Returns:
代理后的 HTML 内容
"""
import re
if not html_content:
return ""
# 替换 data-src 属性
def replace_data_src(match):
url = match.group(1)
proxied_url = proxy_image_url(url, base_url)
return f'data-src="{proxied_url}" src="{proxied_url}"'
html_content = re.sub(
r'data-src="([^"]+)"',
replace_data_src,
html_content
)
# 替换 src 属性(避免重复替换已经有 data-src 的)
def replace_src(match):
full_tag = match.group(0)
# 如果已经有 data-src跳过
if 'data-src=' in full_tag:
return full_tag
url = match.group(1)
proxied_url = proxy_image_url(url, base_url)
return f'src="{proxied_url}"'
html_content = re.sub(
r'src="([^"]+)"',
replace_src,
html_content
)
return html_content

View File

@ -20,11 +20,14 @@ import httpx
from utils.auth_manager import auth_manager
from utils import rss_store
from utils.helpers import extract_article_info, parse_article_url
from utils.http_client import fetch_page
logger = logging.getLogger(__name__)
POLL_INTERVAL = int(os.getenv("RSS_POLL_INTERVAL", "3600"))
ARTICLES_PER_POLL = 10
FETCH_FULL_CONTENT = os.getenv("RSS_FETCH_FULL_CONTENT", "true").lower() == "true"
class RSSPoller:
@ -83,6 +86,10 @@ class RSSPoller:
for fakeid in fakeids:
try:
articles = await self._fetch_article_list(fakeid, creds)
if articles and FETCH_FULL_CONTENT:
# 获取完整文章内容
articles = await self._enrich_articles_content(articles)
if articles:
new_count = rss_store.save_articles(fakeid, articles)
if new_count > 0:
@ -164,6 +171,91 @@ class RSSPoller:
async def poll_now(self):
"""手动触发一次轮询"""
await self._poll_all()
async def _enrich_articles_content(self, articles: List[Dict]) -> List[Dict]:
"""
批量获取文章完整内容并发版
限制最多获取 20 篇文章的完整内容避免大量文章导致轮询过久
Args:
articles: 文章列表包含基本信息
Returns:
enriched_articles: 包含完整内容的文章列表
"""
from utils.article_fetcher import fetch_articles_batch
from utils.content_processor import process_article_content
# 提取所有文章链接
article_links = [a.get("link", "") for a in articles if a.get("link")]
if not article_links:
return articles
# 限制最多获取 20 篇5个批次可能返回100+篇)
max_fetch = 20
if len(article_links) > max_fetch:
logger.info("文章数 %d 篇超过限制,仅获取最近 %d 篇的完整内容",
len(article_links), max_fetch)
article_links = article_links[:max_fetch]
articles = articles[:max_fetch]
logger.info("开始批量获取 %d 篇文章的完整内容", len(article_links))
# 获取微信凭证(从环境变量读取)
wechat_token = os.getenv("WECHAT_TOKEN", "")
wechat_cookie = os.getenv("WECHAT_COOKIE", "")
# 批量并发获取max_concurrency=5传递微信凭证
results = await fetch_articles_batch(
article_links,
max_concurrency=5,
timeout=60,
wechat_token=wechat_token,
wechat_cookie=wechat_cookie
)
# 处理结果并合并到原文章数据
enriched = []
for article in articles:
link = article.get("link", "")
if not link:
enriched.append(article)
continue
html = results.get(link)
if not html or "js_content" not in html:
logger.warning("❌ No content in HTML: %s", link[:80])
enriched.append(article)
continue
try:
# 使用 content_processor 处理文章内容(完美保持图文顺序)
# 从环境变量读取网站URL,入库时代理图片(与SaaS版策略一致)
site_url = os.getenv("SITE_URL", "http://localhost:5000").rstrip("/")
result = process_article_content(html, proxy_base_url=site_url)
# 合并到原文章数据
article["content"] = result.get("content", "")
article["plain_content"] = result.get("plain_content", "")
# 如果原始数据没有作者,从 HTML 中提取
if not article.get("author"):
from utils.helpers import extract_article_info, parse_article_url
article_info = extract_article_info(html, parse_article_url(link))
article["author"] = article_info.get("author", "")
logger.info("✅ Content fetched: %s... (%d chars, %d images)",
link[:50],
len(article["content"]),
len(result.get("images", [])))
except Exception as e:
logger.error("Failed to process content for %s: %s", link[:80], str(e))
enriched.append(article)
return enriched
rss_poller = RSSPoller()