Compare commits

...

5 Commits
v1.0.2 ... main

Author SHA1 Message Date
tmwgsicp 829ae4d0c0 feat: add proactive login expiry monitoring
Add login_reminder module to check credential expiration every 6 hours and send webhook alerts (24h/6h before expiry). Fix expire_time from 30 days to 4 days to align with WeChat actual validity.

Made-with: Cursor
2026-03-31 14:26:03 +08:00
tmwgsicp 8d90743584 fix(docker): resolve proxy pool configuration not loading in Docker deployment
Problem:
Docker uses 'uvicorn app:app' command which skips the if __name__ == '__main__'
block, causing load_dotenv() never executed and PROXY_URLS from .env not loaded.

Solution:
Move load_dotenv() to module level in app.py to ensure .env is loaded for all
startup methods (python app.py, uvicorn app:app, docker-compose).

Changes:
- Add module-level load_dotenv() in app.py
- Update Dockerfile version 1.0.4 -> 1.0.5
- Improve audio content display UI
- Add docs/ and scripts/ to .gitignore

Made-with: Cursor
2026-03-29 20:34:08 +08:00
tmwgsicp 9cfa0ac5b1 fix(docker): resolve credentials save permission issue on NAS platforms
Made-with: Cursor
2026-03-25 10:08:42 +08:00
tmwgsicp ad62e8b8bb fix: use SITE_URL and X-Forwarded headers for RSS URLs in reverse proxy
- Add get_base_url() helper function to detect HTTPS reverse proxy
- Prioritize SITE_URL env var over request.base_url
- Support X-Forwarded-Proto and X-Forwarded-Host headers
- Fixes RSS URL showing http:// instead of https:// behind reverse proxy

Fixes #6

Made-with: Cursor
2026-03-24 23:38:44 +08:00
tmwgsicp f9968a4e0d fix: run container as root to resolve volume permission issue
- Remove non-root user (appuser) to eliminate SQLite database creation failures
- Users no longer need to manually create data directory or fix permissions
- Version bump to 1.0.3

Fixes #5

Made-with: Cursor
2026-03-24 20:02:45 +08:00
13 changed files with 349 additions and 51 deletions

4
.gitignore vendored
View File

@ -67,3 +67,7 @@ data/
# SaaS 版本(独立仓库管理)
saas/
# 个人文档和脚本(不提交)
docs/
scripts/

View File

@ -21,15 +21,14 @@ FROM python:3.11-slim
LABEL maintainer="tmwgsicp"
LABEL description="WeChat Official Account Article Download API with RSS Support"
LABEL version="1.0.0"
LABEL version="1.0.5"
WORKDIR /app
# Install runtime dependencies (curl for healthcheck)
RUN apt-get update && apt-get install -y --no-install-recommends \
curl \
&& rm -rf /var/lib/apt/lists/* \
&& useradd -m -u 1000 appuser
&& rm -rf /var/lib/apt/lists/*
# Copy wheels from builder and install
COPY --from=builder /app/wheels /wheels
@ -38,11 +37,8 @@ RUN pip install --no-cache-dir /wheels/* && rm -rf /wheels
# Copy application code
COPY . .
# Create data directory for SQLite and set permissions
RUN mkdir -p /app/data && chown -R appuser:appuser /app
# Switch to non-root user
USER appuser
# Create data directory
RUN mkdir -p /app/data
# Environment variables with sensible defaults
ENV PYTHONUNBUFFERED=1 \

View File

@ -27,7 +27,7 @@
- **公众号搜索** — 按名称搜索公众号,获取 FakeID
- **扫码登录** — 微信公众平台扫码登录凭证自动保存4 天有效期
- **图片代理** — 代理微信 CDN 图片,解决防盗链问题
- **Webhook 通知** — 登录过期、触发验证等事件自动推送(支持企业微信机器人)
- **Webhook 通知** — 登录过期提醒提前24h/6h预警+已过期通知)、触发验证等事件自动推送(支持企业微信机器人)
- **API 文档** — 自动生成 Swagger UI / ReDoc在线调试所有接口
<div align="center">
@ -361,7 +361,9 @@ cp env.example .env
| `WECHAT_TOKEN` | 微信 Token登录后自动填充 | - |
| `WECHAT_COOKIE` | 微信 Cookie登录后自动填充 | - |
| `WECHAT_FAKEID` | 公众号 FakeID登录后自动填充 | - |
| `WEBHOOK_URL` | Webhook 通知地址(可选) | 空 |
| `WECHAT_EXPIRE_TIME` | 凭证过期时间(登录后自动填充) | - |
| `WEBHOOK_URL` | Webhook 通知地址(支持企业微信机器人) | 空 |
| `WEBHOOK_NOTIFICATION_INTERVAL` | 同一事件通知最小间隔(秒) | 300 |
| `RATE_LIMIT_GLOBAL` | 全局每分钟请求上限 | 10 |
| `RATE_LIMIT_PER_IP` | 单 IP 每分钟请求上限 | 5 |
| `RATE_LIMIT_ARTICLE_INTERVAL` | 文章请求最小间隔(秒) | 3 |
@ -493,6 +495,7 @@ PROXY_URLS=socks5://myuser:mypass@vps1-ip:1080,socks5://myuser:mypass@vps2-ip:10
│ ├── rate_limiter.py # 限频器
│ ├── rss_store.py # RSS 数据存储SQLite
│ ├── rss_poller.py # RSS 后台轮询器
│ ├── login_reminder.py # 登录过期提醒(主动检测)
│ ├── content_processor.py # 内容处理与图片代理
│ ├── image_proxy.py # 图片URL代理工具
│ ├── article_fetcher.py # 批量并发获取文章
@ -540,9 +543,14 @@ PROXY_URLS=socks5://myuser:mypass@vps1-ip:1080,socks5://myuser:mypass@vps2-ip:10
</details>
<details>
<summary><b>Token 多久过期</b></summary>
<summary><b>Token 多久过期?如何提前知道?</b></summary>
Cookie 登录有效期约 4 天,过期后需重新扫码登录。配置 `WEBHOOK_URL` 可以在过期时收到通知。
Cookie 登录有效期约 4 天,系统会:
1. 前端显示到期时间(`/api/admin/status` 接口返回 `expireTime``isExpired` 字段)
2. **后台每 6 小时主动检测**,提前 24h / 6h 通过 Webhook 预警
3. 过期后立即通过 Webhook 通知
配置 `WEBHOOK_URL`(支持企业微信群机器人)可收到实时提醒,避免因凭证过期导致 RSS 轮询失败或搜索功能不可用。
</details>
<details>

10
app.py
View File

@ -10,6 +10,9 @@
"""
from contextlib import asynccontextmanager
from dotenv import load_dotenv
load_dotenv()
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
@ -56,7 +59,14 @@ async def lifespan(app: FastAPI):
init_db()
await rss_poller.start()
# 启动登录过期提醒器(自动检测凭证有效期并 webhook 通知)
from utils.login_reminder import login_reminder
await login_reminder.start()
yield
await login_reminder.stop()
await rss_poller.stop()

View File

@ -6,6 +6,11 @@
# 2. Edit .env and set SITE_URL to your actual URL
# 3. Run: docker-compose up -d
# 4. Visit http://localhost:5000/login.html to scan QR code
#
# Note for NAS users (Synology/QNAP):
# If you encounter permission issues, run on NAS:
# - chmod -R 777 ./data
# - Credentials are automatically saved to ./data directory
services:
wechat-api:
@ -17,10 +22,10 @@ services:
ports:
- "5000:5000"
volumes:
# Persist SQLite database
# Persist SQLite database and credentials
- ./data:/app/data
# Config file (writable - login saves credentials here)
- ./.env:/app/.env
# Config file (read-only - credentials saved to data/)
- ./.env:/app/.env:ro
environment:
- TZ=Asia/Shanghai
healthcheck:

View File

@ -497,8 +497,8 @@ async def biz_login(request: Request):
import traceback
traceback.print_exc()
# 计算过期时间(30天后
expire_time = int((time.time() + 30 * 24 * 3600) * 1000)
# 计算过期时间(4天后与微信实际有效期一致
expire_time = int((time.time() + 4 * 24 * 3600) * 1000)
# 保存凭证
auth_manager.save_credentials(

View File

@ -11,6 +11,7 @@ RSS 订阅路由
import csv
import io
import os
import time
import logging
from datetime import datetime, timezone
@ -28,6 +29,23 @@ from utils.image_proxy import proxy_image_url
logger = logging.getLogger(__name__)
def get_base_url(request: Request) -> str:
"""
获取服务的基础 URL优先使用环境变量 SITE_URL
支持反向代理检测 X-Forwarded-Proto X-Forwarded-Host
"""
# 优先使用配置的 SITE_URL
site_url = os.getenv("SITE_URL", "").strip()
if site_url:
return site_url.rstrip("/")
# 检测反向代理头部
proto = request.headers.get("X-Forwarded-Proto", "http")
host = request.headers.get("X-Forwarded-Host") or request.headers.get("Host", "localhost:5000")
return f"{proto}://{host}"
router = APIRouter()
@ -118,7 +136,7 @@ async def get_subscriptions(request: Request):
返回每个订阅的基本信息缓存文章数和 RSS 地址
"""
subs = rss_store.list_subscriptions()
base_url = str(request.base_url).rstrip("/")
base_url = get_base_url(request)
items = []
for s in subs:
@ -195,7 +213,7 @@ async def get_aggregated_rss_feed(
articles = rss_store.get_all_articles(limit=limit) if subs else []
base_url = str(request.base_url).rstrip("/")
base_url = get_base_url(request)
xml = _build_aggregated_rss_xml(articles, nickname_map, base_url)
return Response(
content=xml,
@ -218,7 +236,7 @@ async def export_subscriptions(
- **opml**: 标准 OPML 格式可直接导入 RSS 阅读器
"""
subs = rss_store.list_subscriptions()
base_url = str(request.base_url).rstrip("/")
base_url = get_base_url(request)
if format == "opml":
return _build_opml_response(subs, base_url)
@ -448,7 +466,7 @@ async def get_rss_feed(fakeid: str, request: Request,
raise HTTPException(status_code=404, detail="未找到该订阅,请先添加订阅")
articles = rss_store.get_articles(fakeid, limit=limit)
base_url = str(request.base_url).rstrip("/")
base_url = get_base_url(request)
xml = _build_rss_xml(fakeid, sub, articles, base_url)
return Response(

View File

@ -8,6 +8,7 @@
搜索路由 - FastAPI版本
"""
import os
from fastapi import APIRouter, Query, Request
from pydantic import BaseModel
from typing import Optional, List
@ -18,6 +19,21 @@ from utils.image_proxy import proxy_image_url
router = APIRouter()
def get_base_url(request: Request) -> str:
"""
获取服务的基础 URL优先使用环境变量 SITE_URL
支持反向代理检测 X-Forwarded-Proto X-Forwarded-Host
"""
site_url = os.getenv("SITE_URL", "").strip()
if site_url:
return site_url.rstrip("/")
proto = request.headers.get("X-Forwarded-Proto", "http")
host = request.headers.get("X-Forwarded-Host") or request.headers.get("Host", "localhost:5000")
return f"{proto}://{host}"
class Account(BaseModel):
"""公众号模型"""
id: str
@ -80,7 +96,7 @@ async def search_accounts(query: str = Query(..., description="公众号名称
accounts = result.get("list", [])
# 获取 base_url 用于图片代理
base_url = str(request.base_url).rstrip("/") if request else ""
base_url = get_base_url(request) if request else ""
# 格式化返回数据
formatted_accounts = []

View File

@ -34,12 +34,31 @@ class AuthManager:
self.base_dir = Path(__file__).parent.parent
self.env_path = self.base_dir / ".env"
# Docker环境下的凭证文件存储在data目录权限更可靠
self.credentials_file = self.base_dir / "data" / ".credentials.json"
# 加载环境变量
self._load_credentials()
self._initialized = True
def _load_credentials(self):
"""从.env文件加载凭证"""
"""
从多个来源加载凭证优先级
1. data/.credentials.json (Docker环境推荐)
2. .env 文件 (本地部署)
3. 环境变量
"""
# 先尝试从 JSON 凭证文件加载Docker 环境)
if self.credentials_file.exists():
try:
import json
with open(self.credentials_file, 'r', encoding='utf-8') as f:
self.credentials = json.load(f)
return
except Exception as e:
print(f"Warning: Failed to load credentials from {self.credentials_file}: {e}")
# 回退到 .env 文件(本地部署)
if self.env_path.exists():
load_dotenv(self.env_path, override=True)
@ -54,7 +73,9 @@ class AuthManager:
def save_credentials(self, token: str, cookie: str, fakeid: str,
nickname: str, expire_time: int) -> bool:
"""
保存凭证到.env文件
保存凭证支持双存储策略
1. 优先保存到 data/.credentials.json (Docker环境推荐权限可靠)
2. 同时尝试保存到 .env (本地部署兼容)
Args:
token: 微信Token
@ -66,21 +87,33 @@ class AuthManager:
Returns:
保存是否成功
"""
# 更新内存中的凭证
self.credentials.update({
"token": token,
"cookie": cookie,
"fakeid": fakeid,
"nickname": nickname,
"expire_time": expire_time
})
success = False
# 策略1: 保存到 data/.credentials.json (Docker 环境优先)
try:
import json
self.credentials_file.parent.mkdir(parents=True, exist_ok=True)
with open(self.credentials_file, 'w', encoding='utf-8') as f:
json.dump(self.credentials, f, indent=2, ensure_ascii=False)
print(f"[OK] 凭证已保存到: {self.credentials_file}")
success = True
except Exception as e:
print(f"[WARN] 无法保存到凭证文件: {e}")
# 策略2: 同时尝试保存到 .env 文件(本地部署兼容)
try:
# 更新内存中的凭证
self.credentials.update({
"token": token,
"cookie": cookie,
"fakeid": fakeid,
"nickname": nickname,
"expire_time": expire_time
})
# 确保.env文件存在
if not self.env_path.exists():
self.env_path.touch()
# 保存到.env文件
env_file = str(self.env_path)
set_key(env_file, "WECHAT_TOKEN", token)
set_key(env_file, "WECHAT_COOKIE", cookie)
@ -88,11 +121,17 @@ class AuthManager:
set_key(env_file, "WECHAT_NICKNAME", nickname)
set_key(env_file, "WECHAT_EXPIRE_TIME", str(expire_time))
print(f"✅ 凭证已保存到: {self.env_path}")
return True
print(f"[OK] 凭证已同步到: {self.env_path}")
success = True
except Exception as e:
print(f"❌ 保存凭证失败: {e}")
print(f"[WARN] 无法写入 .env 文件 (Docker环境正常): {e}")
# Docker 环境下 .env 可能只读,不影响功能
if not success:
print(f"[ERROR] 凭证保存完全失败")
return False
return True
def get_credentials(self) -> Optional[Dict[str, any]]:
"""
@ -155,7 +194,7 @@ class AuthManager:
def clear_credentials(self) -> bool:
"""
清除凭证
清除凭证双存储都清除
Returns:
清除是否成功
@ -178,12 +217,20 @@ class AuthManager:
for key in env_keys:
os.environ.pop(key, None)
# 删除凭证文件
if self.credentials_file.exists():
self.credentials_file.unlink()
print(f"[OK] 凭证文件已删除: {self.credentials_file}")
# 清空 .env 文件中的凭证字段(保留其他配置)
if self.env_path.exists():
env_file = str(self.env_path)
for key in env_keys:
set_key(env_file, key, "")
print(f"✅ 凭证已清除: {self.env_path}")
try:
if self.env_path.exists():
env_file = str(self.env_path)
for key in env_keys:
set_key(env_file, key, "")
print(f"[OK] .env 凭证已清除: {self.env_path}")
except Exception as e:
print(f"[WARN] 无法清除 .env 文件 (Docker环境正常): {e}")
return True
except Exception as e:

View File

@ -105,15 +105,22 @@ def extract_content(html: str) -> str:
Extract article body, trying multiple container patterns.
Different WeChat account types (government, media, personal) use
different HTML structures. We try them in order of specificity.
For image-text messages (item_show_type=8) and short posts (item_show_type=10),
delegates to helpers.
For image-text messages (item_show_type=8), short posts (item_show_type=10),
and audio share pages (item_show_type=7), delegates to helpers.
"""
from utils.helpers import (
is_image_text_message, _extract_image_text_content,
is_short_content_message, _extract_short_content,
is_audio_message, _extract_audio_content,
get_item_show_type, _extract_audio_share_content,
)
# Check for audio/video share pages (item_show_type=7) FIRST
# These pages use Vue apps and have no js_content div
if get_item_show_type(html) == '7':
result = _extract_audio_share_content(html)
return result.get('content', '')
if is_image_text_message(html):
result = _extract_image_text_content(html)
return result.get('content', '')

View File

@ -361,12 +361,14 @@ def _extract_audio_content(html: str) -> Dict:
dur_str = f' ({minutes}:{seconds:02d})'
display_name = audio['name'] or f'Audio {i + 1}'
# 友好提示音频需要微信鉴权不提供无法播放的URL
html_parts.append(
f'<div style="margin:12px 0;padding:12px 16px;background:#f6f6f6;border-radius:8px">'
f'<p style="margin:0 0 4px;font-size:15px;font-weight:500">'
f'{html_module.escape(display_name)}{dur_str}</p>'
f'<a href="{audio["url"]}" style="color:#1890ff;font-size:14px">'
f'[Play Audio / Click to Listen]</a>'
f'<div style="margin:12px 0;padding:12px 16px;background:#fff9e6;'
f'border-left:4px solid #fa8c16;border-radius:4px">'
f'<p style="margin:0 0 4px;font-size:14px;color:#595959;font-weight:500">'
f'音频内容: {html_module.escape(display_name)}{dur_str}</p>'
f'<p style="margin:0;font-size:13px;color:#8c8c8c">'
f'此文章包含音频,需要在微信中查看完整内容</p>'
f'</div>'
)
@ -428,6 +430,22 @@ def _extract_audio_share_content(html: str) -> Dict:
# 生成内容
content_parts = []
# 标题(如果有)
if title:
content_parts.append(
f'<div style="margin:20px 0;text-align:center">'
f'<h2 style="margin:0;font-size:22px;font-weight:600;color:#262626">{title}</h2>'
f'</div>'
)
# 作者(如果有)
if author:
content_parts.append(
f'<div style="margin:12px 0;text-align:center">'
f'<p style="margin:0;font-size:14px;color:#8c8c8c">作者: {author}</p>'
f'</div>'
)
# 封面图
if images:
for img_url in images:
@ -682,6 +700,8 @@ def get_unavailable_reason(html: str) -> Optional[str]:
return None
# 真正的不可用标记静态HTML中的明确文字
# 注意微信的正常文章HTML中可能在JS代码里包含"已删除"/"违规"等字符串
# 需要确保这些关键字是在实际内容中而不是在JS字符串字面量中
markers = [
("该内容已被发布者删除", "已被发布者删除"),
("内容已删除", "已被发布者删除"),
@ -694,6 +714,21 @@ def get_unavailable_reason(html: str) -> Optional[str]:
]
for keyword, reason in markers:
if keyword in html:
# 额外验证如果HTML很大(>1MB) 且有真实的内容容器,
# 说明是正常文章,"已删除"/"违规"可能只是JS代码中的字符串
if len(html) > 1000000:
has_real_content = (
'id="js_content"' in html or
'class="rich_media_content' in html
)
if has_real_content:
# 进一步确认:检查关键字是否在 <body> 的前10KB可见区域
# 如果只在后面的 <script> 中出现,跳过
import re
body_match = re.search(r'<body[^>]*>(.*?)(?:<script|$)', html[:50000], re.DOTALL | re.IGNORECASE)
if body_match and keyword not in body_match.group(1):
# 关键字不在body前部可能是JS代码跳过此marker
continue
return reason
# 特殊处理:"该内容暂时无法查看"独立页面

150
utils/login_reminder.py Normal file
View File

@ -0,0 +1,150 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2026 tmwgsicp
# Licensed under the GNU Affero General Public License v3.0
# See LICENSE file in the project root for full license text.
# SPDX-License-Identifier: AGPL-3.0-only
"""
登录过期提醒开源版
定期检查本地微信登录凭证过期状态提前 webhook 通知
"""
import asyncio
import logging
import time
from typing import Optional
from utils.webhook import webhook
logger = logging.getLogger(__name__)
class LoginReminder:
"""登录过期提醒管理器(开源版单账号架构)"""
def __init__(self):
self.check_interval = 6 * 3600 # 每 6 小时检查一次
self.warning_threshold = 24 * 3600 # 提前 24 小时预警
self.critical_threshold = 6 * 3600 # 提前 6 小时严重警告
self._running = False
self._task: Optional[asyncio.Task] = None
self._last_warning_level = None # 记录最后一次警告级别,避免重复
async def start(self):
"""启动提醒服务"""
if self._running:
logger.warning("登录提醒服务已在运行")
return
self._running = True
self._task = asyncio.create_task(self._run())
logger.info("登录提醒服务已启动,检查间隔: %d", self.check_interval)
async def stop(self):
"""停止提醒服务"""
self._running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
logger.info("登录提醒服务已停止")
async def _run(self):
"""后台任务循环"""
while self._running:
try:
await self._check_login_status()
except Exception as e:
logger.error("检查登录状态失败: %s", e, exc_info=True)
await asyncio.sleep(self.check_interval)
async def _check_login_status(self):
"""检查本地登录凭证的过期状态"""
from utils.auth_manager import auth_manager
# 获取凭证信息
creds = auth_manager.get_credentials()
if not creds or not creds.get("token"):
logger.debug("无登录凭证,跳过检查")
return
expire_time = creds.get("expire_time", 0)
if expire_time <= 0:
logger.debug("凭证无过期时间,跳过检查")
return
nickname = creds.get("nickname", "未知账号")
now = int(time.time() * 1000) # 毫秒时间戳
time_left_ms = expire_time - now
time_left_sec = time_left_ms / 1000
# 已过期
if time_left_sec <= 0:
if self._last_warning_level != 'expired':
await self._notify_expired(nickname)
self._last_warning_level = 'expired'
return
# 严重警告6 小时内过期)
if time_left_sec <= self.critical_threshold:
if self._last_warning_level not in ['critical', 'expired']:
await self._notify_critical(nickname, time_left_sec)
self._last_warning_level = 'critical'
return
# 一般警告24 小时内过期)
if time_left_sec <= self.warning_threshold:
if self._last_warning_level not in ['warning', 'critical', 'expired']:
await self._notify_warning(nickname, time_left_sec)
self._last_warning_level = 'warning'
return
# 状态正常,重置警告级别
if self._last_warning_level is not None:
self._last_warning_level = None
logger.info("登录状态已恢复正常: %s", nickname)
async def _notify_warning(self, nickname: str, time_left: float):
"""发送一般警告通知"""
hours = time_left / 3600
logger.warning(
"登录凭证即将过期 [%s] - 剩余 %.1f 小时",
nickname, hours
)
await webhook.notify('login_expiring_soon', {
'nickname': nickname,
'hours_left': round(hours, 1),
'level': 'warning',
'message': f'登录凭证将在 {round(hours, 1)} 小时后过期,请及时重新登录',
})
async def _notify_critical(self, nickname: str, time_left: float):
"""发送严重警告通知"""
hours = time_left / 3600
logger.error(
"登录凭证即将过期(紧急)[%s] - 剩余 %.1f 小时",
nickname, hours
)
await webhook.notify('login_expiring_critical', {
'nickname': nickname,
'hours_left': round(hours, 1),
'level': 'critical',
'message': f'登录凭证将在 {round(hours, 1)} 小时后过期(紧急),请立即重新登录',
})
async def _notify_expired(self, nickname: str):
"""发送已过期通知"""
logger.error("登录凭证已过期 [%s]", nickname)
await webhook.notify('login_expired', {
'nickname': nickname,
'message': '登录凭证已过期API 功能将受限,请重新登录',
})
# 全局单例
login_reminder = LoginReminder()

View File

@ -21,6 +21,8 @@ logger = logging.getLogger("webhook")
EVENT_LABELS = {
"login_success": "登录成功",
"login_expired": "登录过期",
"login_expiring_soon": "登录即将过期",
"login_expiring_critical": "登录即将过期(紧急)",
"verification_required": "触发验证",
"content_fetch_failed": "文章内容获取失败",
}