diff --git a/.gitignore b/.gitignore index ee527b8..17235ad 100644 --- a/.gitignore +++ b/.gitignore @@ -60,3 +60,6 @@ fetch_progress.json # Logs logs/ + +# RSS database +data/ diff --git a/README.md b/README.md index 02b8d13..39f76a2 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ - **扫码登录** — 微信公众平台扫码登录,凭证自动保存 - **自动限频** — 内置三层限频机制(全局/IP/文章间隔),防止触发微信风控 - **Webhook 通知** — 登录过期、触发验证等事件自动推送 +- **RSS 订阅** — 订阅任意公众号,自动定时拉取新文章,生成标准 RSS 2.0 源 - **API 文档** — 自动生成 Swagger UI,在线调试所有接口
@@ -165,6 +166,46 @@ curl "http://localhost:5000/api/public/articles?fakeid=YOUR_FAKEID&begin=50&coun curl "http://localhost:5000/api/public/articles/search?fakeid=YOUR_FAKEID&query=关键词" ``` +### RSS 订阅 + +`GET /api/rss/{fakeid}` — 获取指定公众号的 RSS 2.0 订阅源 + +| 参数 | 类型 | 必填 | 说明 | +|------|------|------|------| +| `fakeid` | string(路径) | 是 | 公众号 FakeID | +| `limit` | int(查询) | 否 | 返回文章数量上限,默认 `20` | + +使用方式: + +```bash +# 1. 搜索公众号获取 fakeid +curl "http://localhost:5000/api/public/searchbiz?query=人民日报" +# 返回 fakeid: MzA1MjM1ODk2MA== + +# 2. 添加订阅 +curl -X POST http://localhost:5000/api/rss/subscribe \ + -H "Content-Type: application/json" \ + -d '{"fakeid": "MzA1MjM1ODk2MA==", "nickname": "人民日报"}' + +# 3. 手动触发一次轮询(立即拉取文章) +curl -X POST http://localhost:5000/api/rss/poll + +# 4. 获取 RSS 源(把这个地址添加到 RSS 阅读器) +curl "http://localhost:5000/api/rss/MzA1MjM1ODk2MA==" +``` + +也可以通过管理面板的 **RSS 订阅** 页面可视化管理,搜索公众号一键订阅并复制 RSS 地址。 + +#### RSS 订阅管理接口 + +| 方法 | 路径 | 说明 | +|------|------|------| +| `POST` | `/api/rss/subscribe` | 添加 RSS 订阅 | +| `DELETE` | `/api/rss/subscribe/{fakeid}` | 取消 RSS 订阅 | +| `GET` | `/api/rss/subscriptions` | 获取订阅列表 | +| `POST` | `/api/rss/poll` | 手动触发轮询 | +| `GET` | `/api/rss/status` | 轮询器状态 | + ### 其他接口 | 方法 | 路径 | 说明 | @@ -201,7 +242,93 @@ cp env.example .env | `RATE_LIMIT_GLOBAL` | 全局每分钟请求上限 | 10 | | `RATE_LIMIT_PER_IP` | 单 IP 每分钟请求上限 | 5 | | `RATE_LIMIT_ARTICLE_INTERVAL` | 文章请求最小间隔(秒) | 3 | +| `RSS_POLL_INTERVAL` | RSS 轮询间隔(秒) | 3600 | +| `PROXY_URLS` | 代理池地址(多个逗号分隔,留空直连) | 空 | | `PORT` | 服务端口 | 5000 | +| `HOST` | 监听地址 | 0.0.0.0 | +| `DEBUG` | 调试模式(开启热重载) | false | + +### 代理池配置(可选) + +文章内容获取接口(`POST /api/article`)会访问微信文章页面,频繁请求可能触发微信验证码保护。配置代理池可以将请求分散到不同 IP,降低风控风险。 + +> 本项目使用 `curl_cffi` 模拟 Chrome TLS 指纹,请求特征与真实浏览器一致,配合代理池效果更佳。 + +**方案:多台 VPS 自建 SOCKS5 代理** + +准备 2-3 台低价 VPS(各大云厂商轻量应用服务器即可,¥20-30/月/台),每台运行一个 SOCKS5 代理服务。推荐 [gost](https://github.com/go-gost/gost)(Go 语言实现,单二进制文件,无依赖)。 + +**第一步:在每台 VPS 上安装 gost** + +```bash +# 下载最新版(以 Linux amd64 为例,其他架构请去 
GitHub Releases 页面选择) +# 国外服务器直接下载 +wget https://github.com/go-gost/gost/releases/download/v3.2.6/gost_3.2.6_linux_amd64.tar.gz + +# 国内服务器使用加速镜像(任选一个可用的) +wget https://gh-proxy.com/https://github.com/go-gost/gost/releases/download/v3.2.6/gost_3.2.6_linux_amd64.tar.gz +# 或 +wget https://ghproxy.cc/https://github.com/go-gost/gost/releases/download/v3.2.6/gost_3.2.6_linux_amd64.tar.gz + +# 解压并移动到系统路径 +tar -xzf gost_3.2.6_linux_amd64.tar.gz +mv gost /usr/local/bin/ +chmod +x /usr/local/bin/gost + +# 验证安装 +gost -V +``` + +**第二步:启动 SOCKS5 代理服务** + +```bash +# 带用户名密码认证(推荐,替换 myuser / mypass 和端口) +gost -L socks5://myuser:mypass@:1080 + +# 不带认证(仅内网或已配置防火墙时使用) +gost -L socks5://:1080 +``` + +**第三步:配置为 systemd 服务(开机自启)** + +```bash +cat > /etc/systemd/system/gost.service << 'EOF' +[Unit] +Description=GOST Proxy +After=network.target + +[Service] +Type=simple +ExecStart=/usr/local/bin/gost -L socks5://myuser:mypass@:1080 +Restart=always +RestartSec=5 + +[Install] +WantedBy=multi-user.target +EOF + +systemctl daemon-reload +systemctl enable gost +systemctl start gost +``` + +**第四步:开放防火墙端口** + +```bash +# 仅允许你的主服务器 IP 连接(替换为实际 IP) +ufw allow from YOUR_MAIN_SERVER_IP to any port 1080 + +# 或者如果用的是云厂商安全组,在控制台添加入站规则: +# 端口 1080 / TCP / 来源 IP 限制为你的主服务器 +``` + +**第五步:在主服务器 `.env` 中配置代理池** + +```bash +PROXY_URLS=socks5://myuser:mypass@vps1-ip:1080,socks5://myuser:mypass@vps2-ip:1080,socks5://myuser:mypass@vps3-ip:1080 +``` + +配置后重启服务,每次文章请求会轮流使用不同的代理 IP。可以通过 `GET /api/health` 确认代理池状态。留空则直连(默认行为)。 --- @@ -211,9 +338,12 @@ cp env.example .env ├── app.py # FastAPI 主应用 ├── requirements.txt # Python 依赖 ├── env.example # 环境变量示例 +├── data/ # 数据目录(运行时自动创建) +│ └── rss.db # RSS 订阅 SQLite 数据库 ├── routes/ # API 路由 │ ├── article.py # 文章内容获取 │ ├── articles.py # 文章列表 +│ ├── rss.py # RSS 订阅管理与输出 │ ├── search.py # 公众号搜索 │ ├── login.py # 扫码登录 │ ├── admin.py # 管理接口 @@ -223,9 +353,13 @@ cp env.example .env ├── utils/ # 工具模块 │ ├── auth_manager.py # 认证管理 │ ├── helpers.py # HTML 解析 +│ ├── http_client.py # 
HTTP 客户端(curl_cffi + 代理池) +│ ├── proxy_pool.py # 代理池轮转 │ ├── rate_limiter.py # 限频器 +│ ├── rss_store.py # RSS 数据存储(SQLite) +│ ├── rss_poller.py # RSS 后台轮询器 │ └── webhook.py # Webhook 通知 -└── static/ # 前端页面 +└── static/ # 前端页面(含 RSS 管理) ``` --- diff --git a/app.py b/app.py index 7e706e8..383e14e 100644 --- a/app.py +++ b/app.py @@ -9,15 +9,18 @@ 主应用文件 """ +from contextlib import asynccontextmanager + from fastapi import FastAPI from fastapi.staticfiles import StaticFiles -from fastapi.responses import FileResponse, JSONResponse, HTMLResponse +from fastapi.responses import FileResponse, HTMLResponse from fastapi.middleware.cors import CORSMiddleware -import os from pathlib import Path # 导入路由 -from routes import article, articles, search, admin, login, image, health, stats +from routes import article, articles, search, admin, login, image, health, stats, rss +from utils.rss_store import init_db +from utils.rss_poller import rss_poller API_DESCRIPTION = """ 微信公众号文章下载 API,支持文章解析、公众号搜索、文章列表获取等功能。 @@ -34,6 +37,29 @@ API_DESCRIPTION = """ 所有核心接口都需要先登录。登录后凭证自动保存到 `.env` 文件,服务重启后无需重新登录(有效期约 4 天)。 """ + +@asynccontextmanager +async def lifespan(app: FastAPI): + """应用生命周期:启动和关闭""" + env_file = Path(__file__).parent / ".env" + if not env_file.exists(): + print("\n" + "=" * 60) + print("[WARNING] .env file not found") + print("=" * 60) + print("Please configure .env file or login via admin page") + print("Visit: http://localhost:5000/admin.html") + print("=" * 60 + "\n") + else: + print("\n" + "=" * 60) + print("[OK] .env file loaded") + print("=" * 60 + "\n") + + init_db() + await rss_poller.start() + yield + await rss_poller.stop() + + app = FastAPI( title="WeChat Download API", description=API_DESCRIPTION, @@ -45,6 +71,7 @@ app = FastAPI( "name": "AGPL-3.0", "url": "https://www.gnu.org/licenses/agpl-3.0.html", }, + lifespan=lifespan, ) # CORS配置 @@ -65,6 +92,7 @@ app.include_router(search.router, prefix="/api/public", tags=["公众号搜索"] app.include_router(admin.router, 
prefix="/api/admin", tags=["管理"]) app.include_router(login.router, prefix="/api/login", tags=["登录"]) app.include_router(image.router, prefix="/api", tags=["图片代理"]) +app.include_router(rss.router, prefix="/api", tags=["RSS 订阅"]) # 静态文件 static_dir = Path(__file__).parent / "static" @@ -107,39 +135,34 @@ async def verify_page(): """验证页面""" return FileResponse(static_dir / "verify.html") -# 启动事件 -@app.on_event("startup") -async def startup_event(): - """启动时检查配置""" - env_file = Path(__file__).parent / ".env" - if not env_file.exists(): - print("\n" + "=" * 60) - print("[WARNING] .env file not found") - print("=" * 60) - print("Please configure .env file or login via admin page") - print("Visit: http://localhost:5000/admin.html") - print("=" * 60 + "\n") - else: - print("\n" + "=" * 60) - print("[OK] .env file loaded") - print("=" * 60 + "\n") +@app.get("/rss.html", include_in_schema=False) +async def rss_page(): + """RSS 订阅管理页面""" + return FileResponse(static_dir / "rss.html") if __name__ == "__main__": + import os import uvicorn - + from dotenv import load_dotenv + + load_dotenv() + host = os.getenv("HOST", "0.0.0.0") + port = int(os.getenv("PORT", "5000")) + debug = os.getenv("DEBUG", "false").lower() in ("true", "1", "yes") + print("=" * 60) print("Wechat Article API Service - FastAPI Version") print("=" * 60) - print("Admin Page: http://localhost:5000/admin.html") - print("API Docs: http://localhost:5000/api/docs") - print("ReDoc Docs: http://localhost:5000/api/redoc") + print(f"Admin Page: http://localhost:{port}/admin.html") + print(f"API Docs: http://localhost:{port}/api/docs") + print(f"ReDoc Docs: http://localhost:{port}/api/redoc") print("First time? 
Please login via admin page") print("=" * 60) - + uvicorn.run( "app:app", - host="0.0.0.0", - port=5000, - reload=True, - log_level="info" + host=host, + port=port, + reload=debug, + log_level="debug" if debug else "info", ) diff --git a/env.example b/env.example index d2269ae..a08d0a2 100644 --- a/env.example +++ b/env.example @@ -18,6 +18,15 @@ WEBHOOK_URL= # 同一事件通知最小间隔(秒),防止重复轰炸 WEBHOOK_NOTIFICATION_INTERVAL=300 +# RSS 订阅配置 +# 轮询间隔(秒),默认 3600(1 小时) +RSS_POLL_INTERVAL=3600 + +# 代理池 (留空则直连,多个用逗号分隔) +# 支持 HTTP / SOCKS5 代理,用于分散请求 IP 降低风控风险 +# 示例: socks5://ip1:1080,http://ip2:8080,socks5://user:pass@ip3:1080 +PROXY_URLS= + # 服务配置 PORT=5000 HOST=0.0.0.0 diff --git a/requirements.txt b/requirements.txt index 7b408a7..3b1ce70 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ uvicorn[standard]==0.24.0 pydantic==2.5.0 httpx==0.25.2 python-dotenv==1.0.0 +curl_cffi>=0.7.0 diff --git a/routes/__init__.py b/routes/__init__.py index b8a92b8..392cdfb 100644 --- a/routes/__init__.py +++ b/routes/__init__.py @@ -9,6 +9,6 @@ """ # 导出所有路由模块 -from . import article, articles, search, admin, login, image, health, stats +from . 
import article, articles, search, admin, login, image, health, stats, rss -__all__ = ['article', 'articles', 'search', 'admin', 'login', 'image', 'health', 'stats'] +__all__ = ['article', 'articles', 'search', 'admin', 'login', 'image', 'health', 'stats', 'rss'] diff --git a/routes/article.py b/routes/article.py index e758fcc..20051c5 100644 --- a/routes/article.py +++ b/routes/article.py @@ -12,11 +12,11 @@ from fastapi import APIRouter, HTTPException, Request from pydantic import BaseModel, Field from typing import Optional, List import re -import httpx from utils.auth_manager import auth_manager from utils.helpers import extract_article_info, parse_article_url from utils.rate_limiter import rate_limiter from utils.webhook import webhook +from utils.http_client import fetch_page router = APIRouter() @@ -56,86 +56,63 @@ async def get_article(article_request: ArticleRequest, request: Request): - `publish_time`: 发布时间戳 - `images`: 文章内的图片列表 """ - # ⭐ 限频检查 client_ip = request.client.host if request.client else "unknown" allowed, error_msg = rate_limiter.check_rate_limit(client_ip, "/api/article") if not allowed: - return { - "success": False, - "error": f"⏱️ {error_msg}" - } - - # 检查认证 + return {"success": False, "error": f"⏱️ {error_msg}"} + credentials = auth_manager.get_credentials() if not credentials: - return { - "success": False, - "error": "服务器未登录,请先访问管理页面扫码登录" - } - - # 准备请求头 - headers = { - "Cookie": credentials["cookie"], - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF XWEB/8391", - "Referer": "https://mp.weixin.qq.com/" - } - + return {"success": False, "error": "服务器未登录,请先访问管理页面扫码登录"} + try: - # 发起HTTP请求 - async with httpx.AsyncClient(timeout=30.0) as client: - response = await client.get(article_request.url, headers=headers, follow_redirects=True) - response.raise_for_status() - html 
= response.text - - # 检查内容 + html = await fetch_page( + article_request.url, + extra_headers={"Referer": "https://mp.weixin.qq.com/"}, + ) + if "js_content" not in html: - # 检查各种错误情况 if "verify" in html or "验证" in html or "环境异常" in html: - # 🔔 Webhook通知 await webhook.notify('verification_required', { 'url': article_request.url, 'ip': client_ip }) return { "success": False, - "error": "触发微信安全验证。解决方法:1) 在浏览器中打开文章URL完成验证 2) 等待30分钟后重试 3) 降低请求频率" + "error": "触发微信安全验证。解决方法:1) 在浏览器中打开文章URL完成验证 " + "2) 等待30分钟后重试 3) 降低请求频率" } if "请登录" in html: - # 🔔 Webhook通知 await webhook.notify('login_expired', { - 'account': auth_manager.get_nickname(), + 'account': credentials.get('nickname', ''), 'url': article_request.url }) - return { - "success": False, - "error": "登录已失效,请重新扫码登录" - } + return {"success": False, "error": "登录已失效,请重新扫码登录"} return { "success": False, "error": "无法获取文章内容。可能原因:文章被删除、访问受限或需要验证。" } - - # 多种方式尝试提取 URL 参数(__biz, mid, idx, sn) + params = parse_article_url(article_request.url) - + if not params or not params.get('__biz'): location_match = re.search(r'var\s+msg_link\s*=\s*"([^"]+)"', html) if location_match: real_url = location_match.group(1).replace('&', '&') params = parse_article_url(real_url) - + if not params or not params.get('__biz'): href_match = re.search(r'window\.location\.href\s*=\s*"([^"]+)"', html) if href_match: real_url = href_match.group(1).replace('&', '&') params = parse_article_url(real_url) - + if not params or not params.get('__biz'): biz_match = re.search(r'var\s+__biz\s*=\s*"([^"]+)"', html) mid_match = re.search(r'var\s+mid\s*=\s*"([^"]+)"', html) idx_match = re.search(r'var\s+idx\s*=\s*"([^"]+)"', html) sn_match = re.search(r'var\s+sn\s*=\s*"([^"]+)"', html) - + if all([biz_match, mid_match, idx_match, sn_match]): params = { '__biz': biz_match.group(1), @@ -143,30 +120,16 @@ async def get_article(article_request: ArticleRequest, request: Request): 'idx': idx_match.group(1), 'sn': sn_match.group(1) } - + if not params or not 
params.get('__biz'): params = None - - # 提取文章信息(params可以是None) + article_data = extract_article_info(html, params) - - return { - "success": True, - "data": article_data - } - - except httpx.HTTPStatusError as e: - return { - "success": False, - "error": f"HTTP错误: {e.response.status_code}" - } - except httpx.TimeoutException: - return { - "success": False, - "error": "请求超时,请稍后重试" - } + + return {"success": True, "data": article_data} + except Exception as e: - return { - "success": False, - "error": f"处理请求时发生错误: {str(e)}" - } + error_str = str(e) + if "timeout" in error_str.lower(): + return {"success": False, "error": "请求超时,请稍后重试"} + return {"success": False, "error": f"处理请求时发生错误: {error_str}"} diff --git a/routes/health.py b/routes/health.py index 55efe1d..54b8448 100644 --- a/routes/health.py +++ b/routes/health.py @@ -9,27 +9,22 @@ """ from fastapi import APIRouter -from pydantic import BaseModel router = APIRouter() -class HealthResponse(BaseModel): - """健康检查响应""" - status: str - version: str - framework: str -@router.get("/health", response_model=HealthResponse, summary="健康检查") +@router.get("/health", summary="健康检查") async def health_check(): """ - 检查服务健康状态 - - Returns: - 服务状态信息 + 检查服务健康状态,包括 HTTP 引擎和代理池信息。 """ + from utils.http_client import ENGINE_NAME + from utils.proxy_pool import proxy_pool + return { "status": "healthy", "version": "1.0.0", - "framework": "FastAPI" + "framework": "FastAPI", + "http_engine": ENGINE_NAME, + "proxy_pool": proxy_pool.get_status(), } - diff --git a/routes/rss.py b/routes/rss.py new file mode 100644 index 0000000..916f02d --- /dev/null +++ b/routes/rss.py @@ -0,0 +1,298 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# Copyright (C) 2026 tmwgsicp +# Licensed under the GNU Affero General Public License v3.0 +# See LICENSE file in the project root for full license text. 
+# SPDX-License-Identifier: AGPL-3.0-only +""" +RSS 订阅路由 +订阅管理 + RSS XML 输出 +""" + +import time +import logging +from datetime import datetime, timezone +from html import escape as html_escape +from urllib.parse import quote +from xml.etree.ElementTree import Element, SubElement, tostring +from typing import Optional + +from fastapi import APIRouter, HTTPException, Query, Request +from fastapi.responses import Response +from pydantic import BaseModel, Field + +from utils import rss_store +from utils.rss_poller import rss_poller, POLL_INTERVAL + +logger = logging.getLogger(__name__) + +router = APIRouter() + + +# ── Pydantic models ────────────────────────────────────── + +class SubscribeRequest(BaseModel): + fakeid: str = Field(..., description="公众号 FakeID") + nickname: str = Field("", description="公众号名称") + alias: str = Field("", description="公众号微信号") + head_img: str = Field("", description="头像 URL") + + +class SubscribeResponse(BaseModel): + success: bool + message: str = "" + + +class SubscriptionItem(BaseModel): + fakeid: str + nickname: str + alias: str + head_img: str + created_at: int + last_poll: int + article_count: int = 0 + rss_url: str = "" + + +class SubscriptionListResponse(BaseModel): + success: bool + data: list = [] + + +class PollerStatusResponse(BaseModel): + success: bool + data: dict = {} + + +# ── 订阅管理 ───────────────────────────────────────────── + +@router.post("/rss/subscribe", response_model=SubscribeResponse, summary="添加 RSS 订阅") +async def subscribe(req: SubscribeRequest, request: Request): + """ + 添加一个公众号到 RSS 订阅列表。 + + 添加后,后台轮询器会定时拉取该公众号的最新文章。 + + **请求体参数:** + - **fakeid** (必填): 公众号 FakeID,通过搜索接口获取 + - **nickname** (可选): 公众号名称 + - **alias** (可选): 公众号微信号 + - **head_img** (可选): 公众号头像 URL + """ + added = rss_store.add_subscription( + fakeid=req.fakeid, + nickname=req.nickname, + alias=req.alias, + head_img=req.head_img, + ) + if added: + logger.info("RSS subscription added: %s (%s)", req.nickname, req.fakeid[:8]) + return 
SubscribeResponse(success=True, message="订阅成功") + return SubscribeResponse(success=True, message="已订阅,无需重复添加") + + +@router.delete("/rss/subscribe/{fakeid}", response_model=SubscribeResponse, + summary="取消 RSS 订阅") +async def unsubscribe(fakeid: str): + """ + 取消订阅一个公众号,同时删除该公众号的缓存文章。 + + **路径参数:** + - **fakeid**: 公众号 FakeID + """ + removed = rss_store.remove_subscription(fakeid) + if removed: + logger.info("RSS subscription removed: %s", fakeid[:8]) + return SubscribeResponse(success=True, message="已取消订阅") + return SubscribeResponse(success=False, message="未找到该订阅") + + +@router.get("/rss/subscriptions", response_model=SubscriptionListResponse, + summary="获取订阅列表") +async def get_subscriptions(request: Request): + """ + 获取当前所有 RSS 订阅的公众号列表。 + + 返回每个订阅的基本信息、缓存文章数和 RSS 地址。 + """ + subs = rss_store.list_subscriptions() + base_url = str(request.base_url).rstrip("/") + + items = [] + for s in subs: + items.append({ + **s, + "rss_url": f"{base_url}/api/rss/{s['fakeid']}", + }) + + return SubscriptionListResponse(success=True, data=items) + + +@router.post("/rss/poll", response_model=PollerStatusResponse, + summary="手动触发轮询") +async def trigger_poll(): + """ + 手动触发一次轮询,立即拉取所有订阅公众号的最新文章。 + + 通常用于首次订阅后立即获取文章,无需等待下一个轮询周期。 + """ + if not rss_poller.is_running: + return PollerStatusResponse( + success=False, + data={"message": "轮询器未启动"} + ) + try: + await rss_poller.poll_now() + return PollerStatusResponse( + success=True, + data={"message": "轮询完成"} + ) + except Exception as e: + return PollerStatusResponse( + success=False, + data={"message": f"轮询出错: {str(e)}"} + ) + + +@router.get("/rss/status", response_model=PollerStatusResponse, + summary="轮询器状态") +async def poller_status(): + """ + 获取 RSS 轮询器运行状态。 + """ + subs = rss_store.list_subscriptions() + return PollerStatusResponse( + success=True, + data={ + "running": rss_poller.is_running, + "poll_interval": POLL_INTERVAL, + "subscription_count": len(subs), + }, + ) + + +# ── RSS XML 输出 ────────────────────────────────────────── + 
+def _proxy_cover(url: str, base_url: str) -> str: + """将微信 CDN 封面图地址替换为本服务的图片代理地址""" + if url and "mmbiz.qpic.cn" in url: + return base_url + "/api/image?url=" + quote(url, safe="") + return url + + +def _rfc822(ts: int) -> str: + """Unix 时间戳 → RFC 822 日期字符串""" + if not ts: + return "" + dt = datetime.fromtimestamp(ts, tz=timezone.utc) + return dt.strftime("%a, %d %b %Y %H:%M:%S +0000") + + +def _build_rss_xml(fakeid: str, sub: dict, articles: list, + base_url: str) -> str: + rss = Element("rss", version="2.0") + rss.set("xmlns:atom", "http://www.w3.org/2005/Atom") + + channel = SubElement(rss, "channel") + SubElement(channel, "title").text = sub.get("nickname") or fakeid + SubElement(channel, "link").text = "https://mp.weixin.qq.com" + SubElement(channel, "description").text = ( + f'{sub.get("nickname", "")} 的微信公众号文章 RSS 订阅' + ) + SubElement(channel, "language").text = "zh-CN" + SubElement(channel, "lastBuildDate").text = _rfc822(int(time.time())) + SubElement(channel, "generator").text = "WeChat Download API" + + atom_link = SubElement(channel, "atom:link") + atom_link.set("href", f"{base_url}/api/rss/{fakeid}") + atom_link.set("rel", "self") + atom_link.set("type", "application/rss+xml") + + if sub.get("head_img"): + image = SubElement(channel, "image") + SubElement(image, "url").text = sub["head_img"] + SubElement(image, "title").text = sub.get("nickname", "") + SubElement(image, "link").text = "https://mp.weixin.qq.com" + + for a in articles: + item = SubElement(channel, "item") + SubElement(item, "title").text = a.get("title", "") + + link = a.get("link", "") + SubElement(item, "link").text = link + + guid = SubElement(item, "guid") + guid.text = link + guid.set("isPermaLink", "true") + + if a.get("publish_time"): + SubElement(item, "pubDate").text = _rfc822(a["publish_time"]) + + if a.get("author"): + SubElement(item, "author").text = a["author"] + + cover = _proxy_cover(a.get("cover", ""), base_url) + digest = html_escape(a.get("digest", "")) if 
a.get("digest") else "" + author = html_escape(a.get("author", "")) if a.get("author") else "" + title_escaped = html_escape(a.get("title", "")) + + html_parts = [] + if cover: + html_parts.append( + f'
' + f'' + f'{title_escaped}' + f'
' + ) + if digest: + html_parts.append( + f'

{digest}

' + ) + if author: + html_parts.append( + f'

' + f'作者: {author}

' + ) + html_parts.append( + f'

' + f'阅读原文 →

' + ) + + SubElement(item, "description").text = "\n".join(html_parts) + + xml_bytes = tostring(rss, encoding="unicode", xml_declaration=False) + return '\n' + xml_bytes + + +@router.get("/rss/{fakeid}", summary="获取 RSS 订阅源", + response_class=Response) +async def get_rss_feed(fakeid: str, request: Request, + limit: int = Query(20, ge=1, le=100, + description="文章数量上限")): + """ + 获取指定公众号的 RSS 2.0 订阅源(XML 格式)。 + + 将此地址添加到任何 RSS 阅读器即可订阅公众号文章。 + + **路径参数:** + - **fakeid**: 公众号 FakeID + + **查询参数:** + - **limit** (可选): 返回文章数量上限,默认 20 + """ + sub = rss_store.get_subscription(fakeid) + if not sub: + raise HTTPException(status_code=404, detail="未找到该订阅,请先添加订阅") + + articles = rss_store.get_articles(fakeid, limit=limit) + base_url = str(request.base_url).rstrip("/") + xml = _build_rss_xml(fakeid, sub, articles, base_url) + + return Response( + content=xml, + media_type="application/rss+xml; charset=utf-8", + headers={"Cache-Control": "public, max-age=600"}, + ) diff --git a/static/admin.html b/static/admin.html index 4033791..cd429a1 100644 --- a/static/admin.html +++ b/static/admin.html @@ -523,6 +523,7 @@
diff --git a/static/rss.html b/static/rss.html new file mode 100644 index 0000000..ae9087f --- /dev/null +++ b/static/rss.html @@ -0,0 +1,616 @@ + + + + + + RSS 订阅管理 - WeChat Download API + + + +
+
+
+

RSS 订阅管理

+

搜索公众号并添加 RSS 订阅,文章自动更新

+
+
+ 返回面板 + +
+
+ +
+
+ 轮询器 + + 检查中... +
+
+
+ +
+

添加订阅

+
+ + +
+
+
+ +
+

我的订阅

+
加载中...
+
+ + +
+ +
+ + + + diff --git a/utils/auth_manager.py b/utils/auth_manager.py index 235df2a..56021d5 100644 --- a/utils/auth_manager.py +++ b/utils/auth_manager.py @@ -13,7 +13,7 @@ import os import time from pathlib import Path from typing import Optional, Dict -from dotenv import load_dotenv, set_key, find_dotenv +from dotenv import load_dotenv, set_key class AuthManager: """认证管理单例类""" diff --git a/utils/http_client.py b/utils/http_client.py new file mode 100644 index 0000000..b0d843a --- /dev/null +++ b/utils/http_client.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# Copyright (C) 2026 tmwgsicp +# Licensed under the GNU Affero General Public License v3.0 +# See LICENSE file in the project root for full license text. +# SPDX-License-Identifier: AGPL-3.0-only +""" +HTTP 客户端封装 +优先使用 curl_cffi(模拟 Chrome TLS 指纹),不可用时自动降级到 httpx。 +支持代理池轮转:当前代理失败 → 尝试下一个 → 全部失败 → 直连兜底。 + +注意:curl_cffi 的 AsyncSession 在部分环境下 SOCKS5 代理不工作, +因此代理场景使用同步 Session + 线程池来规避此问题。 +""" + +import asyncio +import logging +from concurrent.futures import ThreadPoolExecutor +from typing import Optional, Dict + +logger = logging.getLogger(__name__) + +try: + from curl_cffi.requests import Session as CurlSession + HAS_CURL_CFFI = True +except ImportError: + HAS_CURL_CFFI = False + +ENGINE_NAME = "curl_cffi (Chrome TLS)" if HAS_CURL_CFFI else "httpx (fallback)" +logger.info("HTTP engine: %s", ENGINE_NAME) + +BROWSER_HEADERS = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/120.0.0.0 Safari/537.36", + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9," + "image/avif,image/webp,image/apng,*/*;q=0.8", + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", + "Accept-Encoding": "gzip, deflate, br", + "Sec-Ch-Ua": '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"', + "Sec-Ch-Ua-Mobile": "?0", + "Sec-Ch-Ua-Platform": '"Windows"', + "Sec-Fetch-Dest": "document", + "Sec-Fetch-Mode": "navigate", + 
"Sec-Fetch-Site": "none", + "Sec-Fetch-User": "?1", + "Upgrade-Insecure-Requests": "1", +} + +MAX_PROXY_RETRIES = 3 +_executor = ThreadPoolExecutor(max_workers=4) + + +async def fetch_page(url: str, extra_headers: Optional[Dict] = None, + timeout: int = 30) -> str: + """ + 获取网页 HTML 内容。 + 请求策略:代理1 → 代理2 → ... → 直连兜底。 + 成功的代理会被标记为健康,失败的会被临时冷却。 + """ + from utils.proxy_pool import proxy_pool + + headers = {**BROWSER_HEADERS} + if extra_headers: + headers.update(extra_headers) + + tried_proxies = [] + for _ in range(min(MAX_PROXY_RETRIES, proxy_pool.count)): + proxy = proxy_pool.next() + if proxy is None or proxy in tried_proxies: + break + tried_proxies.append(proxy) + + logger.info("fetch_page: url=%s proxy=%s", url[:80], proxy) + try: + result = await _do_fetch(url, headers, timeout, proxy) + proxy_pool.mark_ok(proxy) + return result + except Exception as e: + logger.warning("Proxy %s failed: %s", proxy, e) + proxy_pool.mark_failed(proxy) + + logger.info("fetch_page: url=%s proxy=direct (fallback)", url[:80]) + return await _do_fetch(url, headers, timeout, None) + + +async def _do_fetch(url: str, headers: Dict, timeout: int, + proxy: Optional[str]) -> str: + if HAS_CURL_CFFI: + return await _fetch_curl_cffi(url, headers, timeout, proxy) + return await _fetch_httpx(url, headers, timeout, proxy) + + +async def _fetch_curl_cffi(url: str, headers: Dict, timeout: int, + proxy: Optional[str]) -> str: + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + _executor, + _fetch_curl_cffi_sync, url, headers, timeout, proxy + ) + + +def _fetch_curl_cffi_sync(url: str, headers: Dict, timeout: int, + proxy: Optional[str]) -> str: + """同步请求,在线程池中执行。规避 AsyncSession + SOCKS5 代理的兼容性问题。""" + kwargs = {"timeout": timeout, "allow_redirects": True} + if proxy: + kwargs["proxy"] = proxy + with CurlSession(impersonate="chrome120") as session: + resp = session.get(url, headers=headers, **kwargs) + resp.raise_for_status() + return resp.text + + +async def _fetch_httpx(url: 
str, headers: Dict, timeout: int, + proxy: Optional[str]) -> str: + import httpx + transport_kwargs = {} + if proxy: + transport_kwargs["proxy"] = proxy + async with httpx.AsyncClient(timeout=float(timeout), + follow_redirects=True, + **transport_kwargs) as client: + resp = await client.get(url, headers=headers) + resp.raise_for_status() + return resp.text diff --git a/utils/proxy_pool.py b/utils/proxy_pool.py new file mode 100644 index 0000000..c72e4a7 --- /dev/null +++ b/utils/proxy_pool.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# Copyright (C) 2026 tmwgsicp +# Licensed under the GNU Affero General Public License v3.0 +# See LICENSE file in the project root for full license text. +# SPDX-License-Identifier: AGPL-3.0-only +""" +代理池管理 +支持多 VPS 自建代理(SOCKS5/HTTP)轮转,分散请求 IP。 +失败的代理会被临时标记为不可用,一段时间后自动恢复探测。 + +配置方式(.env): + PROXY_URLS=socks5://ip1:port,http://ip2:port,socks5://user:pass@ip3:port + +留空则不使用代理。 +""" + +import logging +import os +import time +import threading +from typing import Optional, List + +logger = logging.getLogger(__name__) + +FAIL_COOLDOWN = 120 + + +class ProxyPool: + """带健康检测的轮转代理池""" + + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if self._initialized: + return + self._proxies: List[str] = [] + self._index = 0 + self._fail_until: dict[str, float] = {} + self._lock = threading.Lock() + self._load_proxies() + self._initialized = True + + def _load_proxies(self): + raw = os.getenv("PROXY_URLS", "").strip() + if not raw: + logger.info("Proxy pool: no proxies configured (direct connection)") + return + + self._proxies = [p.strip() for p in raw.split(",") if p.strip()] + logger.info("Proxy pool: loaded %d proxies", len(self._proxies)) + + def reload(self): + """从环境变量重新加载代理列表""" + with self._lock: + self._proxies = [] + self._index = 0 + self._fail_until.clear() + 
self._load_proxies() + + @property + def enabled(self) -> bool: + return len(self._proxies) > 0 + + @property + def count(self) -> int: + return len(self._proxies) + + def next(self) -> Optional[str]: + """获取下一个可用代理(跳过冷却中的),全部不可用时返回 None""" + if not self._proxies: + return None + now = time.time() + with self._lock: + for _ in range(len(self._proxies)): + proxy = self._proxies[self._index % len(self._proxies)] + self._index += 1 + if self._fail_until.get(proxy, 0) <= now: + return proxy + return None + + def get_all(self) -> List[str]: + return list(self._proxies) + + def mark_failed(self, proxy: str): + """标记代理失败,冷却一段时间后自动恢复""" + with self._lock: + self._fail_until[proxy] = time.time() + FAIL_COOLDOWN + logger.warning("Proxy %s marked failed, cooldown %ds", proxy, FAIL_COOLDOWN) + + def mark_ok(self, proxy: str): + """标记代理恢复正常""" + with self._lock: + self._fail_until.pop(proxy, None) + + def get_status(self) -> dict: + """返回代理池状态""" + now = time.time() + healthy = [] + failed = [] + for p in self._proxies: + if self._fail_until.get(p, 0) > now: + failed.append(p) + else: + healthy.append(p) + return { + "enabled": self.enabled, + "total": self.count, + "healthy": len(healthy), + "failed": len(failed), + "failed_proxies": failed, + } + + +proxy_pool = ProxyPool() diff --git a/utils/rate_limiter.py b/utils/rate_limiter.py index 1f44bc1..b816295 100644 --- a/utils/rate_limiter.py +++ b/utils/rate_limiter.py @@ -9,6 +9,7 @@ API限频模块 防止触发微信风控 """ +import os import time from typing import Dict, Optional from collections import deque @@ -17,27 +18,26 @@ import threading class RateLimiter: """ 智能限频器 - + 策略: - 1. 全局限制: 每分钟最多10个请求 - 2. 单IP限制: 每分钟最多5个请求 - 3. 文章获取: 每个文章间隔至少3秒 + 1. 全局限制: 每分钟最多 N 个请求 + 2. 单IP限制: 每分钟最多 N 个请求 + 3. 
文章获取: 每个文章间隔至少 N 秒 """ - + def __init__(self): - self._global_requests = deque() # 全局请求记录 - self._ip_requests: Dict[str, deque] = {} # IP请求记录 - self._article_requests = deque() # 文章请求记录 + self._global_requests = deque() + self._ip_requests: Dict[str, deque] = {} + self._article_requests = deque() self._lock = threading.Lock() - - # 限制配置 - self.GLOBAL_WINDOW = 60 # 全局窗口60秒 - self.GLOBAL_LIMIT = 10 # 全局限制10个请求/分钟 - - self.IP_WINDOW = 60 # IP窗口60秒 - self.IP_LIMIT = 5 # 单IP限制5个请求/分钟 - - self.ARTICLE_INTERVAL = 3 # 文章获取间隔3秒 + + self.GLOBAL_WINDOW = 60 + self.GLOBAL_LIMIT = int(os.getenv("RATE_LIMIT_GLOBAL", "10")) + + self.IP_WINDOW = 60 + self.IP_LIMIT = int(os.getenv("RATE_LIMIT_PER_IP", "5")) + + self.ARTICLE_INTERVAL = int(os.getenv("RATE_LIMIT_ARTICLE_INTERVAL", "3")) def check_rate_limit(self, ip: str, endpoint: str) -> tuple[bool, Optional[str]]: """ diff --git a/utils/rss_poller.py b/utils/rss_poller.py new file mode 100644 index 0000000..794278a --- /dev/null +++ b/utils/rss_poller.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# Copyright (C) 2026 tmwgsicp +# Licensed under the GNU Affero General Public License v3.0 +# See LICENSE file in the project root for full license text. 
# SPDX-License-Identifier: AGPL-3.0-only
"""
RSS background poller.

Periodically pulls the newest article lists of subscribed accounts through the
official-account backend API and caches them in SQLite.  Only metadata
(title, digest, cover, ...) is requested — article pages themselves are never
touched, so there is no risk-control exposure.
"""

import asyncio
import json
import logging
import os
from typing import List, Dict, Optional

import httpx

from utils.auth_manager import auth_manager
from utils import rss_store

logger = logging.getLogger(__name__)

# Seconds between poll cycles, overridable via the RSS_POLL_INTERVAL env var.
POLL_INTERVAL = int(os.getenv("RSS_POLL_INTERVAL", "3600"))
# How many recent articles to request per account on each cycle.
ARTICLES_PER_POLL = 10


class RSSPoller:
    """Background polling singleton."""

    _instance = None
    _task: Optional[asyncio.Task] = None
    _running = False

    def __new__(cls):
        # Classic singleton: every instantiation returns the same object.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    async def start(self):
        """Launch the polling loop; calling again while running is a no-op."""
        if self._running:
            return
        self._running = True
        self._task = asyncio.create_task(self._loop())
        logger.info("RSS poller started (interval=%ds)", POLL_INTERVAL)

    async def stop(self):
        """Cancel the polling loop and wait for it to finish."""
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
        logger.info("RSS poller stopped")

    @property
    def is_running(self) -> bool:
        return self._running

    async def _loop(self):
        # Poll, sleep, repeat.  A failure in one cycle is logged and the loop
        # simply carries on with the next cycle.
        while self._running:
            try:
                await self._poll_all()
            except Exception as e:
                logger.error("RSS poll cycle error: %s", e, exc_info=True)
            await asyncio.sleep(POLL_INTERVAL)

    async def _poll_all(self):
        """Fetch and persist the latest articles for every subscription."""
        fakeids = rss_store.get_all_fakeids()
        if not fakeids:
            return

        creds = auth_manager.get_credentials()
        if not creds or not creds.get("token") or not creds.get("cookie"):
            logger.warning("RSS poll skipped: not logged in")
            return

        logger.info("RSS poll: checking %d subscriptions", len(fakeids))

        for fakeid in fakeids:
            try:
                fetched = await self._fetch_article_list(fakeid, creds)
                if fetched:
                    new_count = rss_store.save_articles(fakeid, fetched)
                    if new_count > 0:
                        logger.info("RSS: %d new articles for %s", new_count, fakeid[:8])
                # NOTE(review): last_poll is refreshed even when nothing new
                # arrived, so the timestamp reflects the last *attempt*.
                rss_store.update_last_poll(fakeid)
            except Exception as e:
                logger.error("RSS poll error for %s: %s", fakeid[:8], e)
            # Space out per-account requests to stay gentle with the backend.
            await asyncio.sleep(3)

    @staticmethod
    def _as_dict(value) -> Optional[Dict]:
        """Coerce a possibly JSON-encoded value into a dict; None if impossible."""
        if isinstance(value, str):
            try:
                value = json.loads(value)
            except (json.JSONDecodeError, ValueError):
                return None
        return value if isinstance(value, dict) else None

    async def _fetch_article_list(self, fakeid: str, creds: Dict) -> List[Dict]:
        """Query the publish list endpoint and flatten it into article dicts."""
        params = {
            "sub": "list",
            "search_field": "null",
            "begin": 0,
            "count": ARTICLES_PER_POLL,
            "query": "",
            "fakeid": fakeid,
            "type": "101_1",
            "free_publish_type": 1,
            "sub_action": "list_ex",
            "token": creds["token"],
            "lang": "zh_CN",
            "f": "json",
            "ajax": 1,
        }
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Referer": "https://mp.weixin.qq.com/",
            "Cookie": creds["cookie"],
        }

        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.get(
                "https://mp.weixin.qq.com/cgi-bin/appmsgpublish",
                params=params,
                headers=headers,
            )
            resp.raise_for_status()
            result = resp.json()

        base_resp = result.get("base_resp", {})
        if base_resp.get("ret") != 0:
            logger.warning("WeChat API error for %s: ret=%s",
                           fakeid[:8], base_resp.get("ret"))
            return []

        # publish_page may arrive either as a dict or as a JSON string.
        page = self._as_dict(result.get("publish_page", {}))
        if page is None:
            return []

        collected: List[Dict] = []
        for entry in page.get("publish_list", []):
            info = self._as_dict(entry.get("publish_info", {}))
            if info is None:
                continue
            for msg in info.get("appmsgex", []):
                collected.append({
                    "aid": msg.get("aid", ""),
                    "title": msg.get("title", ""),
                    "link": msg.get("link", ""),
                    "digest": msg.get("digest", ""),
                    "cover": msg.get("cover", ""),
                    "author": msg.get("author", ""),
                    "publish_time": msg.get("update_time", 0),
                })
        return collected

    async def poll_now(self):
        """Manually trigger one poll cycle."""
        await self._poll_all()


rss_poller = RSSPoller()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2026 tmwgsicp
# Licensed under the GNU Affero General Public License v3.0
# See LICENSE file in the project root for full license text.
# SPDX-License-Identifier: AGPL-3.0-only
"""
RSS data store backed by SQLite.

Manages the subscription list and the cached article metadata.  Every public
function opens a fresh short-lived connection, so the module is safe to call
from multiple threads.
"""

import sqlite3
import time
import logging
from pathlib import Path
from typing import List, Dict, Optional

logger = logging.getLogger(__name__)

# Database file lives under <project root>/data/rss.db
DB_PATH = Path(__file__).parent.parent / "data" / "rss.db"


def _get_conn() -> sqlite3.Connection:
    """Open a new connection with WAL mode and foreign-key cascades enabled."""
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(DB_PATH), check_same_thread=False)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA foreign_keys=ON")  # required for ON DELETE CASCADE
    return conn


def init_db():
    """Create the tables and index (idempotent)."""
    conn = _get_conn()
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS subscriptions (
            fakeid TEXT PRIMARY KEY,
            nickname TEXT NOT NULL DEFAULT '',
            alias TEXT NOT NULL DEFAULT '',
            head_img TEXT NOT NULL DEFAULT '',
            created_at INTEGER NOT NULL,
            last_poll INTEGER NOT NULL DEFAULT 0
        );

        CREATE TABLE IF NOT EXISTS articles (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            fakeid TEXT NOT NULL,
            aid TEXT NOT NULL DEFAULT '',
            title TEXT NOT NULL DEFAULT '',
            link TEXT NOT NULL DEFAULT '',
            digest TEXT NOT NULL DEFAULT '',
            cover TEXT NOT NULL DEFAULT '',
            author TEXT NOT NULL DEFAULT '',
            content TEXT NOT NULL DEFAULT '',
            plain_content TEXT NOT NULL DEFAULT '',
            publish_time INTEGER NOT NULL DEFAULT 0,
            fetched_at INTEGER NOT NULL,
            UNIQUE(fakeid, link),
            FOREIGN KEY (fakeid) REFERENCES subscriptions(fakeid) ON DELETE CASCADE
        );

        CREATE INDEX IF NOT EXISTS idx_articles_fakeid_time
            ON articles(fakeid, publish_time DESC);
    """)
    conn.commit()
    conn.close()
    logger.info("RSS database initialized: %s", DB_PATH)


# ── Subscription management ─────────────────────────────────────

def add_subscription(fakeid: str, nickname: str = "",
                     alias: str = "", head_img: str = "") -> bool:
    """Add a subscription; returns True if a new row was inserted."""
    conn = _get_conn()
    try:
        conn.execute(
            "INSERT OR IGNORE INTO subscriptions "
            "(fakeid, nickname, alias, head_img, created_at) VALUES (?,?,?,?,?)",
            (fakeid, nickname, alias, head_img, int(time.time())),
        )
        conn.commit()
        # Fresh connection, so total_changes reflects just this INSERT.
        return conn.total_changes > 0
    finally:
        conn.close()


def remove_subscription(fakeid: str) -> bool:
    """Delete a subscription (articles cascade); returns True if it existed."""
    conn = _get_conn()
    try:
        conn.execute("DELETE FROM subscriptions WHERE fakeid=?", (fakeid,))
        conn.commit()
        return conn.total_changes > 0
    finally:
        conn.close()


def list_subscriptions() -> List[Dict]:
    """Return all subscriptions (newest first) with their cached article counts."""
    conn = _get_conn()
    try:
        rows = conn.execute(
            "SELECT s.*, "
            "(SELECT COUNT(*) FROM articles a WHERE a.fakeid=s.fakeid) AS article_count "
            "FROM subscriptions s ORDER BY s.created_at DESC"
        ).fetchall()
        return [dict(r) for r in rows]
    finally:
        conn.close()


def get_subscription(fakeid: str) -> Optional[Dict]:
    """Return one subscription as a dict, or None if not subscribed."""
    conn = _get_conn()
    try:
        row = conn.execute(
            "SELECT * FROM subscriptions WHERE fakeid=?", (fakeid,)
        ).fetchone()
        return dict(row) if row else None
    finally:
        conn.close()


def update_last_poll(fakeid: str):
    """Stamp the subscription with the current time as its last poll."""
    conn = _get_conn()
    try:
        conn.execute(
            "UPDATE subscriptions SET last_poll=? WHERE fakeid=?",
            (int(time.time()), fakeid),
        )
        conn.commit()
    finally:
        conn.close()


# ── Article cache ───────────────────────────────────────────────

def save_articles(fakeid: str, articles: List[Dict]) -> int:
    """Bulk-save articles; returns the number of genuinely new rows.

    Duplicates (same fakeid + link) are skipped via INSERT OR IGNORE.
    """
    conn = _get_conn()
    inserted = 0
    try:
        for a in articles:
            try:
                cur = conn.execute(
                    "INSERT OR IGNORE INTO articles "
                    "(fakeid, aid, title, link, digest, cover, author, "
                    "content, plain_content, publish_time, fetched_at) "
                    "VALUES (?,?,?,?,?,?,?,?,?,?,?)",
                    (
                        fakeid,
                        a.get("aid", ""),
                        a.get("title", ""),
                        a.get("link", ""),
                        a.get("digest", ""),
                        a.get("cover", ""),
                        a.get("author", ""),
                        a.get("content", ""),
                        a.get("plain_content", ""),
                        a.get("publish_time", 0),
                        int(time.time()),
                    ),
                )
                # BUG FIX: the previous code checked `conn.total_changes`,
                # which is *cumulative* for the connection — once any row had
                # been inserted, every ignored duplicate was also counted.
                # Cursor.rowcount is 1 only when this INSERT actually landed.
                if cur.rowcount > 0:
                    inserted += 1
            except sqlite3.IntegrityError:
                # e.g. fakeid no longer in subscriptions (FK violation)
                pass
        conn.commit()
        return inserted
    finally:
        conn.close()


def get_articles(fakeid: str, limit: int = 20) -> List[Dict]:
    """Return up to `limit` cached articles for an account, newest first."""
    conn = _get_conn()
    try:
        rows = conn.execute(
            "SELECT * FROM articles WHERE fakeid=? "
            "ORDER BY publish_time DESC LIMIT ?",
            (fakeid, limit),
        ).fetchall()
        return [dict(r) for r in rows]
    finally:
        conn.close()


def get_all_fakeids() -> List[str]:
    """Return the fakeids of every subscription (poll order is unspecified)."""
    conn = _get_conn()
    try:
        rows = conn.execute("SELECT fakeid FROM subscriptions").fetchall()
        return [r["fakeid"] for r in rows]
    finally:
        conn.close()