Compare commits
5 Commits
| Author | SHA1 | Date |
|---|---|---|
|
|
829ae4d0c0 | |
|
|
8d90743584 | |
|
|
9cfa0ac5b1 | |
|
|
ad62e8b8bb | |
|
|
f9968a4e0d |
|
|
@ -67,3 +67,7 @@ data/
|
|||
|
||||
# SaaS 版本(独立仓库管理)
|
||||
saas/
|
||||
|
||||
# 个人文档和脚本(不提交)
|
||||
docs/
|
||||
scripts/
|
||||
|
|
|
|||
12
Dockerfile
12
Dockerfile
|
|
@ -21,15 +21,14 @@ FROM python:3.11-slim
|
|||
|
||||
LABEL maintainer="tmwgsicp"
|
||||
LABEL description="WeChat Official Account Article Download API with RSS Support"
|
||||
LABEL version="1.0.0"
|
||||
LABEL version="1.0.5"
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Install runtime dependencies (curl for healthcheck)
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
curl \
|
||||
&& rm -rf /var/lib/apt/lists/* \
|
||||
&& useradd -m -u 1000 appuser
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Copy wheels from builder and install
|
||||
COPY --from=builder /app/wheels /wheels
|
||||
|
|
@ -38,11 +37,8 @@ RUN pip install --no-cache-dir /wheels/* && rm -rf /wheels
|
|||
# Copy application code
|
||||
COPY . .
|
||||
|
||||
# Create data directory for SQLite and set permissions
|
||||
RUN mkdir -p /app/data && chown -R appuser:appuser /app
|
||||
|
||||
# Switch to non-root user
|
||||
USER appuser
|
||||
# Create data directory
|
||||
RUN mkdir -p /app/data
|
||||
|
||||
# Environment variables with sensible defaults
|
||||
ENV PYTHONUNBUFFERED=1 \
|
||||
|
|
|
|||
16
README.md
16
README.md
|
|
@ -27,7 +27,7 @@
|
|||
- **公众号搜索** — 按名称搜索公众号,获取 FakeID
|
||||
- **扫码登录** — 微信公众平台扫码登录,凭证自动保存,4 天有效期
|
||||
- **图片代理** — 代理微信 CDN 图片,解决防盗链问题
|
||||
- **Webhook 通知** — 登录过期、触发验证等事件自动推送(支持企业微信机器人)
|
||||
- **Webhook 通知** — 登录过期提醒(提前24h/6h预警+已过期通知)、触发验证等事件自动推送(支持企业微信机器人)
|
||||
- **API 文档** — 自动生成 Swagger UI / ReDoc,在线调试所有接口
|
||||
|
||||
<div align="center">
|
||||
|
|
@ -361,7 +361,9 @@ cp env.example .env
|
|||
| `WECHAT_TOKEN` | 微信 Token(登录后自动填充) | - |
|
||||
| `WECHAT_COOKIE` | 微信 Cookie(登录后自动填充) | - |
|
||||
| `WECHAT_FAKEID` | 公众号 FakeID(登录后自动填充) | - |
|
||||
| `WEBHOOK_URL` | Webhook 通知地址(可选) | 空 |
|
||||
| `WECHAT_EXPIRE_TIME` | 凭证过期时间(登录后自动填充) | - |
|
||||
| `WEBHOOK_URL` | Webhook 通知地址(支持企业微信机器人) | 空 |
|
||||
| `WEBHOOK_NOTIFICATION_INTERVAL` | 同一事件通知最小间隔(秒) | 300 |
|
||||
| `RATE_LIMIT_GLOBAL` | 全局每分钟请求上限 | 10 |
|
||||
| `RATE_LIMIT_PER_IP` | 单 IP 每分钟请求上限 | 5 |
|
||||
| `RATE_LIMIT_ARTICLE_INTERVAL` | 文章请求最小间隔(秒) | 3 |
|
||||
|
|
@ -493,6 +495,7 @@ PROXY_URLS=socks5://myuser:mypass@vps1-ip:1080,socks5://myuser:mypass@vps2-ip:10
|
|||
│ ├── rate_limiter.py # 限频器
|
||||
│ ├── rss_store.py # RSS 数据存储(SQLite)
|
||||
│ ├── rss_poller.py # RSS 后台轮询器
|
||||
│ ├── login_reminder.py # 登录过期提醒(主动检测)
|
||||
│ ├── content_processor.py # 内容处理与图片代理
|
||||
│ ├── image_proxy.py # 图片URL代理工具
|
||||
│ ├── article_fetcher.py # 批量并发获取文章
|
||||
|
|
@ -540,9 +543,14 @@ PROXY_URLS=socks5://myuser:mypass@vps1-ip:1080,socks5://myuser:mypass@vps2-ip:10
|
|||
</details>
|
||||
|
||||
<details>
|
||||
<summary><b>Token 多久过期</b></summary>
|
||||
<summary><b>Token 多久过期?如何提前知道?</b></summary>
|
||||
|
||||
Cookie 登录有效期约 4 天,过期后需重新扫码登录。配置 `WEBHOOK_URL` 可以在过期时收到通知。
|
||||
Cookie 登录有效期约 4 天,系统会:
|
||||
1. 前端显示到期时间(`/api/admin/status` 接口返回 `expireTime` 和 `isExpired` 字段)
|
||||
2. **后台每 6 小时主动检测**,提前 24h / 6h 通过 Webhook 预警
|
||||
3. 过期后立即通过 Webhook 通知
|
||||
|
||||
配置 `WEBHOOK_URL`(支持企业微信群机器人)可收到实时提醒,避免因凭证过期导致 RSS 轮询失败或搜索功能不可用。
|
||||
</details>
|
||||
|
||||
<details>
|
||||
|
|
|
|||
10
app.py
10
app.py
|
|
@ -10,6 +10,9 @@
|
|||
"""
|
||||
|
||||
from contextlib import asynccontextmanager
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
from fastapi import FastAPI
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
|
|
@ -56,7 +59,14 @@ async def lifespan(app: FastAPI):
|
|||
|
||||
init_db()
|
||||
await rss_poller.start()
|
||||
|
||||
# 启动登录过期提醒器(自动检测凭证有效期并 webhook 通知)
|
||||
from utils.login_reminder import login_reminder
|
||||
await login_reminder.start()
|
||||
|
||||
yield
|
||||
|
||||
await login_reminder.stop()
|
||||
await rss_poller.stop()
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -6,6 +6,11 @@
|
|||
# 2. Edit .env and set SITE_URL to your actual URL
|
||||
# 3. Run: docker-compose up -d
|
||||
# 4. Visit http://localhost:5000/login.html to scan QR code
|
||||
#
|
||||
# Note for NAS users (Synology/QNAP):
|
||||
# If you encounter permission issues, run on NAS:
|
||||
# - chmod -R 777 ./data
|
||||
# - Credentials are automatically saved to ./data directory
|
||||
|
||||
services:
|
||||
wechat-api:
|
||||
|
|
@ -17,10 +22,10 @@ services:
|
|||
ports:
|
||||
- "5000:5000"
|
||||
volumes:
|
||||
# Persist SQLite database
|
||||
# Persist SQLite database and credentials
|
||||
- ./data:/app/data
|
||||
# Config file (writable - login saves credentials here)
|
||||
- ./.env:/app/.env
|
||||
# Config file (read-only - credentials saved to data/)
|
||||
- ./.env:/app/.env:ro
|
||||
environment:
|
||||
- TZ=Asia/Shanghai
|
||||
healthcheck:
|
||||
|
|
|
|||
|
|
@ -497,8 +497,8 @@ async def biz_login(request: Request):
|
|||
import traceback
|
||||
traceback.print_exc()
|
||||
|
||||
# 计算过期时间(30天后)
|
||||
expire_time = int((time.time() + 30 * 24 * 3600) * 1000)
|
||||
# 计算过期时间(4天后,与微信实际有效期一致)
|
||||
expire_time = int((time.time() + 4 * 24 * 3600) * 1000)
|
||||
|
||||
# 保存凭证
|
||||
auth_manager.save_credentials(
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ RSS 订阅路由
|
|||
|
||||
import csv
|
||||
import io
|
||||
import os
|
||||
import time
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
|
|
@ -28,6 +29,23 @@ from utils.image_proxy import proxy_image_url
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_base_url(request: Request) -> str:
|
||||
"""
|
||||
获取服务的基础 URL,优先使用环境变量 SITE_URL,
|
||||
支持反向代理(检测 X-Forwarded-Proto 和 X-Forwarded-Host)
|
||||
"""
|
||||
# 优先使用配置的 SITE_URL
|
||||
site_url = os.getenv("SITE_URL", "").strip()
|
||||
if site_url:
|
||||
return site_url.rstrip("/")
|
||||
|
||||
# 检测反向代理头部
|
||||
proto = request.headers.get("X-Forwarded-Proto", "http")
|
||||
host = request.headers.get("X-Forwarded-Host") or request.headers.get("Host", "localhost:5000")
|
||||
|
||||
return f"{proto}://{host}"
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
|
|
@ -118,7 +136,7 @@ async def get_subscriptions(request: Request):
|
|||
返回每个订阅的基本信息、缓存文章数和 RSS 地址。
|
||||
"""
|
||||
subs = rss_store.list_subscriptions()
|
||||
base_url = str(request.base_url).rstrip("/")
|
||||
base_url = get_base_url(request)
|
||||
|
||||
items = []
|
||||
for s in subs:
|
||||
|
|
@ -195,7 +213,7 @@ async def get_aggregated_rss_feed(
|
|||
|
||||
articles = rss_store.get_all_articles(limit=limit) if subs else []
|
||||
|
||||
base_url = str(request.base_url).rstrip("/")
|
||||
base_url = get_base_url(request)
|
||||
xml = _build_aggregated_rss_xml(articles, nickname_map, base_url)
|
||||
return Response(
|
||||
content=xml,
|
||||
|
|
@ -218,7 +236,7 @@ async def export_subscriptions(
|
|||
- **opml**: 标准 OPML 格式,可直接导入 RSS 阅读器
|
||||
"""
|
||||
subs = rss_store.list_subscriptions()
|
||||
base_url = str(request.base_url).rstrip("/")
|
||||
base_url = get_base_url(request)
|
||||
|
||||
if format == "opml":
|
||||
return _build_opml_response(subs, base_url)
|
||||
|
|
@ -448,7 +466,7 @@ async def get_rss_feed(fakeid: str, request: Request,
|
|||
raise HTTPException(status_code=404, detail="未找到该订阅,请先添加订阅")
|
||||
|
||||
articles = rss_store.get_articles(fakeid, limit=limit)
|
||||
base_url = str(request.base_url).rstrip("/")
|
||||
base_url = get_base_url(request)
|
||||
xml = _build_rss_xml(fakeid, sub, articles, base_url)
|
||||
|
||||
return Response(
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@
|
|||
搜索路由 - FastAPI版本
|
||||
"""
|
||||
|
||||
import os
|
||||
from fastapi import APIRouter, Query, Request
|
||||
from pydantic import BaseModel
|
||||
from typing import Optional, List
|
||||
|
|
@ -18,6 +19,21 @@ from utils.image_proxy import proxy_image_url
|
|||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
def get_base_url(request: Request) -> str:
|
||||
"""
|
||||
获取服务的基础 URL,优先使用环境变量 SITE_URL,
|
||||
支持反向代理(检测 X-Forwarded-Proto 和 X-Forwarded-Host)
|
||||
"""
|
||||
site_url = os.getenv("SITE_URL", "").strip()
|
||||
if site_url:
|
||||
return site_url.rstrip("/")
|
||||
|
||||
proto = request.headers.get("X-Forwarded-Proto", "http")
|
||||
host = request.headers.get("X-Forwarded-Host") or request.headers.get("Host", "localhost:5000")
|
||||
|
||||
return f"{proto}://{host}"
|
||||
|
||||
class Account(BaseModel):
|
||||
"""公众号模型"""
|
||||
id: str
|
||||
|
|
@ -80,7 +96,7 @@ async def search_accounts(query: str = Query(..., description="公众号名称
|
|||
accounts = result.get("list", [])
|
||||
|
||||
# 获取 base_url 用于图片代理
|
||||
base_url = str(request.base_url).rstrip("/") if request else ""
|
||||
base_url = get_base_url(request) if request else ""
|
||||
|
||||
# 格式化返回数据
|
||||
formatted_accounts = []
|
||||
|
|
|
|||
|
|
@ -34,12 +34,31 @@ class AuthManager:
|
|||
self.base_dir = Path(__file__).parent.parent
|
||||
self.env_path = self.base_dir / ".env"
|
||||
|
||||
# Docker环境下的凭证文件(存储在data目录,权限更可靠)
|
||||
self.credentials_file = self.base_dir / "data" / ".credentials.json"
|
||||
|
||||
# 加载环境变量
|
||||
self._load_credentials()
|
||||
self._initialized = True
|
||||
|
||||
def _load_credentials(self):
|
||||
"""从.env文件加载凭证"""
|
||||
"""
|
||||
从多个来源加载凭证,优先级:
|
||||
1. data/.credentials.json (Docker环境推荐)
|
||||
2. .env 文件 (本地部署)
|
||||
3. 环境变量
|
||||
"""
|
||||
# 先尝试从 JSON 凭证文件加载(Docker 环境)
|
||||
if self.credentials_file.exists():
|
||||
try:
|
||||
import json
|
||||
with open(self.credentials_file, 'r', encoding='utf-8') as f:
|
||||
self.credentials = json.load(f)
|
||||
return
|
||||
except Exception as e:
|
||||
print(f"Warning: Failed to load credentials from {self.credentials_file}: {e}")
|
||||
|
||||
# 回退到 .env 文件(本地部署)
|
||||
if self.env_path.exists():
|
||||
load_dotenv(self.env_path, override=True)
|
||||
|
||||
|
|
@ -54,7 +73,9 @@ class AuthManager:
|
|||
def save_credentials(self, token: str, cookie: str, fakeid: str,
|
||||
nickname: str, expire_time: int) -> bool:
|
||||
"""
|
||||
保存凭证到.env文件
|
||||
保存凭证,支持双存储策略:
|
||||
1. 优先保存到 data/.credentials.json (Docker环境推荐,权限可靠)
|
||||
2. 同时尝试保存到 .env (本地部署兼容)
|
||||
|
||||
Args:
|
||||
token: 微信Token
|
||||
|
|
@ -66,21 +87,33 @@ class AuthManager:
|
|||
Returns:
|
||||
保存是否成功
|
||||
"""
|
||||
# 更新内存中的凭证
|
||||
self.credentials.update({
|
||||
"token": token,
|
||||
"cookie": cookie,
|
||||
"fakeid": fakeid,
|
||||
"nickname": nickname,
|
||||
"expire_time": expire_time
|
||||
})
|
||||
|
||||
success = False
|
||||
|
||||
# 策略1: 保存到 data/.credentials.json (Docker 环境优先)
|
||||
try:
|
||||
import json
|
||||
self.credentials_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(self.credentials_file, 'w', encoding='utf-8') as f:
|
||||
json.dump(self.credentials, f, indent=2, ensure_ascii=False)
|
||||
print(f"[OK] 凭证已保存到: {self.credentials_file}")
|
||||
success = True
|
||||
except Exception as e:
|
||||
print(f"[WARN] 无法保存到凭证文件: {e}")
|
||||
|
||||
# 策略2: 同时尝试保存到 .env 文件(本地部署兼容)
|
||||
try:
|
||||
# 更新内存中的凭证
|
||||
self.credentials.update({
|
||||
"token": token,
|
||||
"cookie": cookie,
|
||||
"fakeid": fakeid,
|
||||
"nickname": nickname,
|
||||
"expire_time": expire_time
|
||||
})
|
||||
|
||||
# 确保.env文件存在
|
||||
if not self.env_path.exists():
|
||||
self.env_path.touch()
|
||||
|
||||
# 保存到.env文件
|
||||
env_file = str(self.env_path)
|
||||
set_key(env_file, "WECHAT_TOKEN", token)
|
||||
set_key(env_file, "WECHAT_COOKIE", cookie)
|
||||
|
|
@ -88,11 +121,17 @@ class AuthManager:
|
|||
set_key(env_file, "WECHAT_NICKNAME", nickname)
|
||||
set_key(env_file, "WECHAT_EXPIRE_TIME", str(expire_time))
|
||||
|
||||
print(f"✅ 凭证已保存到: {self.env_path}")
|
||||
return True
|
||||
print(f"[OK] 凭证已同步到: {self.env_path}")
|
||||
success = True
|
||||
except Exception as e:
|
||||
print(f"❌ 保存凭证失败: {e}")
|
||||
print(f"[WARN] 无法写入 .env 文件 (Docker环境正常): {e}")
|
||||
# Docker 环境下 .env 可能只读,不影响功能
|
||||
|
||||
if not success:
|
||||
print(f"[ERROR] 凭证保存完全失败")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def get_credentials(self) -> Optional[Dict[str, any]]:
|
||||
"""
|
||||
|
|
@ -155,7 +194,7 @@ class AuthManager:
|
|||
|
||||
def clear_credentials(self) -> bool:
|
||||
"""
|
||||
清除凭证
|
||||
清除凭证(双存储都清除)
|
||||
|
||||
Returns:
|
||||
清除是否成功
|
||||
|
|
@ -178,12 +217,20 @@ class AuthManager:
|
|||
for key in env_keys:
|
||||
os.environ.pop(key, None)
|
||||
|
||||
# 删除凭证文件
|
||||
if self.credentials_file.exists():
|
||||
self.credentials_file.unlink()
|
||||
print(f"[OK] 凭证文件已删除: {self.credentials_file}")
|
||||
|
||||
# 清空 .env 文件中的凭证字段(保留其他配置)
|
||||
if self.env_path.exists():
|
||||
env_file = str(self.env_path)
|
||||
for key in env_keys:
|
||||
set_key(env_file, key, "")
|
||||
print(f"✅ 凭证已清除: {self.env_path}")
|
||||
try:
|
||||
if self.env_path.exists():
|
||||
env_file = str(self.env_path)
|
||||
for key in env_keys:
|
||||
set_key(env_file, key, "")
|
||||
print(f"[OK] .env 凭证已清除: {self.env_path}")
|
||||
except Exception as e:
|
||||
print(f"[WARN] 无法清除 .env 文件 (Docker环境正常): {e}")
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
|
|
|
|||
|
|
@ -105,15 +105,22 @@ def extract_content(html: str) -> str:
|
|||
Extract article body, trying multiple container patterns.
|
||||
Different WeChat account types (government, media, personal) use
|
||||
different HTML structures. We try them in order of specificity.
|
||||
For image-text messages (item_show_type=8) and short posts (item_show_type=10),
|
||||
delegates to helpers.
|
||||
For image-text messages (item_show_type=8), short posts (item_show_type=10),
|
||||
and audio share pages (item_show_type=7), delegates to helpers.
|
||||
"""
|
||||
from utils.helpers import (
|
||||
is_image_text_message, _extract_image_text_content,
|
||||
is_short_content_message, _extract_short_content,
|
||||
is_audio_message, _extract_audio_content,
|
||||
get_item_show_type, _extract_audio_share_content,
|
||||
)
|
||||
|
||||
# Check for audio/video share pages (item_show_type=7) FIRST
|
||||
# These pages use Vue apps and have no js_content div
|
||||
if get_item_show_type(html) == '7':
|
||||
result = _extract_audio_share_content(html)
|
||||
return result.get('content', '')
|
||||
|
||||
if is_image_text_message(html):
|
||||
result = _extract_image_text_content(html)
|
||||
return result.get('content', '')
|
||||
|
|
|
|||
|
|
@ -361,12 +361,14 @@ def _extract_audio_content(html: str) -> Dict:
|
|||
dur_str = f' ({minutes}:{seconds:02d})'
|
||||
|
||||
display_name = audio['name'] or f'Audio {i + 1}'
|
||||
# 友好提示:音频需要微信鉴权,不提供无法播放的URL
|
||||
html_parts.append(
|
||||
f'<div style="margin:12px 0;padding:12px 16px;background:#f6f6f6;border-radius:8px">'
|
||||
f'<p style="margin:0 0 4px;font-size:15px;font-weight:500">'
|
||||
f'{html_module.escape(display_name)}{dur_str}</p>'
|
||||
f'<a href="{audio["url"]}" style="color:#1890ff;font-size:14px">'
|
||||
f'[Play Audio / Click to Listen]</a>'
|
||||
f'<div style="margin:12px 0;padding:12px 16px;background:#fff9e6;'
|
||||
f'border-left:4px solid #fa8c16;border-radius:4px">'
|
||||
f'<p style="margin:0 0 4px;font-size:14px;color:#595959;font-weight:500">'
|
||||
f'音频内容: {html_module.escape(display_name)}{dur_str}</p>'
|
||||
f'<p style="margin:0;font-size:13px;color:#8c8c8c">'
|
||||
f'此文章包含音频,需要在微信中查看完整内容</p>'
|
||||
f'</div>'
|
||||
)
|
||||
|
||||
|
|
@ -428,6 +430,22 @@ def _extract_audio_share_content(html: str) -> Dict:
|
|||
# 生成内容
|
||||
content_parts = []
|
||||
|
||||
# 标题(如果有)
|
||||
if title:
|
||||
content_parts.append(
|
||||
f'<div style="margin:20px 0;text-align:center">'
|
||||
f'<h2 style="margin:0;font-size:22px;font-weight:600;color:#262626">{title}</h2>'
|
||||
f'</div>'
|
||||
)
|
||||
|
||||
# 作者(如果有)
|
||||
if author:
|
||||
content_parts.append(
|
||||
f'<div style="margin:12px 0;text-align:center">'
|
||||
f'<p style="margin:0;font-size:14px;color:#8c8c8c">作者: {author}</p>'
|
||||
f'</div>'
|
||||
)
|
||||
|
||||
# 封面图
|
||||
if images:
|
||||
for img_url in images:
|
||||
|
|
@ -682,6 +700,8 @@ def get_unavailable_reason(html: str) -> Optional[str]:
|
|||
return None
|
||||
|
||||
# 真正的不可用标记(静态HTML中的明确文字)
|
||||
# 注意:微信的正常文章HTML中可能在JS代码里包含"已删除"/"违规"等字符串
|
||||
# 需要确保这些关键字是在实际内容中,而不是在JS字符串字面量中
|
||||
markers = [
|
||||
("该内容已被发布者删除", "已被发布者删除"),
|
||||
("内容已删除", "已被发布者删除"),
|
||||
|
|
@ -694,6 +714,21 @@ def get_unavailable_reason(html: str) -> Optional[str]:
|
|||
]
|
||||
for keyword, reason in markers:
|
||||
if keyword in html:
|
||||
# 额外验证:如果HTML很大(>1MB) 且有真实的内容容器,
|
||||
# 说明是正常文章,"已删除"/"违规"可能只是JS代码中的字符串
|
||||
if len(html) > 1000000:
|
||||
has_real_content = (
|
||||
'id="js_content"' in html or
|
||||
'class="rich_media_content' in html
|
||||
)
|
||||
if has_real_content:
|
||||
# 进一步确认:检查关键字是否在 <body> 的前10KB可见区域
|
||||
# 如果只在后面的 <script> 中出现,跳过
|
||||
import re
|
||||
body_match = re.search(r'<body[^>]*>(.*?)(?:<script|$)', html[:50000], re.DOTALL | re.IGNORECASE)
|
||||
if body_match and keyword not in body_match.group(1):
|
||||
# 关键字不在body前部,可能是JS代码,跳过此marker
|
||||
continue
|
||||
return reason
|
||||
|
||||
# 特殊处理:"该内容暂时无法查看"独立页面
|
||||
|
|
|
|||
|
|
@ -0,0 +1,150 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright (C) 2026 tmwgsicp
|
||||
# Licensed under the GNU Affero General Public License v3.0
|
||||
# See LICENSE file in the project root for full license text.
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
"""
|
||||
登录过期提醒(开源版)
|
||||
定期检查本地微信登录凭证过期状态,提前 webhook 通知。
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from typing import Optional
|
||||
from utils.webhook import webhook
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LoginReminder:
|
||||
"""登录过期提醒管理器(开源版单账号架构)"""
|
||||
|
||||
def __init__(self):
|
||||
self.check_interval = 6 * 3600 # 每 6 小时检查一次
|
||||
self.warning_threshold = 24 * 3600 # 提前 24 小时预警
|
||||
self.critical_threshold = 6 * 3600 # 提前 6 小时严重警告
|
||||
self._running = False
|
||||
self._task: Optional[asyncio.Task] = None
|
||||
self._last_warning_level = None # 记录最后一次警告级别,避免重复
|
||||
|
||||
async def start(self):
|
||||
"""启动提醒服务"""
|
||||
if self._running:
|
||||
logger.warning("登录提醒服务已在运行")
|
||||
return
|
||||
|
||||
self._running = True
|
||||
self._task = asyncio.create_task(self._run())
|
||||
logger.info("登录提醒服务已启动,检查间隔: %d 秒", self.check_interval)
|
||||
|
||||
async def stop(self):
|
||||
"""停止提醒服务"""
|
||||
self._running = False
|
||||
if self._task:
|
||||
self._task.cancel()
|
||||
try:
|
||||
await self._task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
logger.info("登录提醒服务已停止")
|
||||
|
||||
async def _run(self):
|
||||
"""后台任务循环"""
|
||||
while self._running:
|
||||
try:
|
||||
await self._check_login_status()
|
||||
except Exception as e:
|
||||
logger.error("检查登录状态失败: %s", e, exc_info=True)
|
||||
|
||||
await asyncio.sleep(self.check_interval)
|
||||
|
||||
async def _check_login_status(self):
|
||||
"""检查本地登录凭证的过期状态"""
|
||||
from utils.auth_manager import auth_manager
|
||||
|
||||
# 获取凭证信息
|
||||
creds = auth_manager.get_credentials()
|
||||
if not creds or not creds.get("token"):
|
||||
logger.debug("无登录凭证,跳过检查")
|
||||
return
|
||||
|
||||
expire_time = creds.get("expire_time", 0)
|
||||
if expire_time <= 0:
|
||||
logger.debug("凭证无过期时间,跳过检查")
|
||||
return
|
||||
|
||||
nickname = creds.get("nickname", "未知账号")
|
||||
now = int(time.time() * 1000) # 毫秒时间戳
|
||||
time_left_ms = expire_time - now
|
||||
time_left_sec = time_left_ms / 1000
|
||||
|
||||
# 已过期
|
||||
if time_left_sec <= 0:
|
||||
if self._last_warning_level != 'expired':
|
||||
await self._notify_expired(nickname)
|
||||
self._last_warning_level = 'expired'
|
||||
return
|
||||
|
||||
# 严重警告(6 小时内过期)
|
||||
if time_left_sec <= self.critical_threshold:
|
||||
if self._last_warning_level not in ['critical', 'expired']:
|
||||
await self._notify_critical(nickname, time_left_sec)
|
||||
self._last_warning_level = 'critical'
|
||||
return
|
||||
|
||||
# 一般警告(24 小时内过期)
|
||||
if time_left_sec <= self.warning_threshold:
|
||||
if self._last_warning_level not in ['warning', 'critical', 'expired']:
|
||||
await self._notify_warning(nickname, time_left_sec)
|
||||
self._last_warning_level = 'warning'
|
||||
return
|
||||
|
||||
# 状态正常,重置警告级别
|
||||
if self._last_warning_level is not None:
|
||||
self._last_warning_level = None
|
||||
logger.info("登录状态已恢复正常: %s", nickname)
|
||||
|
||||
async def _notify_warning(self, nickname: str, time_left: float):
|
||||
"""发送一般警告通知"""
|
||||
hours = time_left / 3600
|
||||
logger.warning(
|
||||
"登录凭证即将过期 [%s] - 剩余 %.1f 小时",
|
||||
nickname, hours
|
||||
)
|
||||
|
||||
await webhook.notify('login_expiring_soon', {
|
||||
'nickname': nickname,
|
||||
'hours_left': round(hours, 1),
|
||||
'level': 'warning',
|
||||
'message': f'登录凭证将在 {round(hours, 1)} 小时后过期,请及时重新登录',
|
||||
})
|
||||
|
||||
async def _notify_critical(self, nickname: str, time_left: float):
|
||||
"""发送严重警告通知"""
|
||||
hours = time_left / 3600
|
||||
logger.error(
|
||||
"登录凭证即将过期(紧急)[%s] - 剩余 %.1f 小时",
|
||||
nickname, hours
|
||||
)
|
||||
|
||||
await webhook.notify('login_expiring_critical', {
|
||||
'nickname': nickname,
|
||||
'hours_left': round(hours, 1),
|
||||
'level': 'critical',
|
||||
'message': f'登录凭证将在 {round(hours, 1)} 小时后过期(紧急),请立即重新登录',
|
||||
})
|
||||
|
||||
async def _notify_expired(self, nickname: str):
|
||||
"""发送已过期通知"""
|
||||
logger.error("登录凭证已过期 [%s]", nickname)
|
||||
|
||||
await webhook.notify('login_expired', {
|
||||
'nickname': nickname,
|
||||
'message': '登录凭证已过期,API 功能将受限,请重新登录',
|
||||
})
|
||||
|
||||
|
||||
# 全局单例
|
||||
login_reminder = LoginReminder()
|
||||
|
|
@ -21,6 +21,8 @@ logger = logging.getLogger("webhook")
|
|||
EVENT_LABELS = {
|
||||
"login_success": "登录成功",
|
||||
"login_expired": "登录过期",
|
||||
"login_expiring_soon": "登录即将过期",
|
||||
"login_expiring_critical": "登录即将过期(紧急)",
|
||||
"verification_required": "触发验证",
|
||||
"content_fetch_failed": "文章内容获取失败",
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue