Compare commits

...

4 Commits
v1.0.3 ... main

Author SHA1 Message Date
tmwgsicp 829ae4d0c0 feat: add proactive login expiry monitoring
Add login_reminder module to check credential expiration every 6 hours and send webhook alerts (24h/6h before expiry). Fix expire_time from 30 days to 4 days to align with WeChat actual validity.

Made-with: Cursor
2026-03-31 14:26:03 +08:00
tmwgsicp 8d90743584 fix(docker): resolve proxy pool configuration not loading in Docker deployment
Problem:
Docker uses 'uvicorn app:app' command which skips the if __name__ == '__main__'
block, causing load_dotenv() never executed and PROXY_URLS from .env not loaded.

Solution:
Move load_dotenv() to module level in app.py to ensure .env is loaded for all
startup methods (python app.py, uvicorn app:app, docker-compose).

Changes:
- Add module-level load_dotenv() in app.py
- Update Dockerfile version 1.0.4 -> 1.0.5
- Improve audio content display UI
- Add docs/ and scripts/ to .gitignore

Made-with: Cursor
2026-03-29 20:34:08 +08:00
tmwgsicp 9cfa0ac5b1 fix(docker): resolve credentials save permission issue on NAS platforms
Made-with: Cursor
2026-03-25 10:08:42 +08:00
tmwgsicp ad62e8b8bb fix: use SITE_URL and X-Forwarded headers for RSS URLs in reverse proxy
- Add get_base_url() helper function to detect HTTPS reverse proxy
- Prioritize SITE_URL env var over request.base_url
- Support X-Forwarded-Proto and X-Forwarded-Host headers
- Fixes RSS URL showing http:// instead of https:// behind reverse proxy

Fixes #6

Made-with: Cursor
2026-03-24 23:38:44 +08:00
13 changed files with 346 additions and 44 deletions

4
.gitignore vendored
View File

@ -67,3 +67,7 @@ data/
# SaaS 版本(独立仓库管理) # SaaS 版本(独立仓库管理)
saas/ saas/
# 个人文档和脚本(不提交)
docs/
scripts/

View File

@ -21,7 +21,7 @@ FROM python:3.11-slim
LABEL maintainer="tmwgsicp" LABEL maintainer="tmwgsicp"
LABEL description="WeChat Official Account Article Download API with RSS Support" LABEL description="WeChat Official Account Article Download API with RSS Support"
LABEL version="1.0.3" LABEL version="1.0.5"
WORKDIR /app WORKDIR /app

View File

@ -27,7 +27,7 @@
- **公众号搜索** — 按名称搜索公众号,获取 FakeID - **公众号搜索** — 按名称搜索公众号,获取 FakeID
- **扫码登录** — 微信公众平台扫码登录凭证自动保存4 天有效期 - **扫码登录** — 微信公众平台扫码登录凭证自动保存4 天有效期
- **图片代理** — 代理微信 CDN 图片,解决防盗链问题 - **图片代理** — 代理微信 CDN 图片,解决防盗链问题
- **Webhook 通知** — 登录过期、触发验证等事件自动推送(支持企业微信机器人) - **Webhook 通知** — 登录过期提醒提前24h/6h预警+已过期通知)、触发验证等事件自动推送(支持企业微信机器人)
- **API 文档** — 自动生成 Swagger UI / ReDoc在线调试所有接口 - **API 文档** — 自动生成 Swagger UI / ReDoc在线调试所有接口
<div align="center"> <div align="center">
@ -361,7 +361,9 @@ cp env.example .env
| `WECHAT_TOKEN` | 微信 Token登录后自动填充 | - | | `WECHAT_TOKEN` | 微信 Token登录后自动填充 | - |
| `WECHAT_COOKIE` | 微信 Cookie登录后自动填充 | - | | `WECHAT_COOKIE` | 微信 Cookie登录后自动填充 | - |
| `WECHAT_FAKEID` | 公众号 FakeID登录后自动填充 | - | | `WECHAT_FAKEID` | 公众号 FakeID登录后自动填充 | - |
| `WEBHOOK_URL` | Webhook 通知地址(可选) | 空 | | `WECHAT_EXPIRE_TIME` | 凭证过期时间(登录后自动填充) | - |
| `WEBHOOK_URL` | Webhook 通知地址(支持企业微信机器人) | 空 |
| `WEBHOOK_NOTIFICATION_INTERVAL` | 同一事件通知最小间隔(秒) | 300 |
| `RATE_LIMIT_GLOBAL` | 全局每分钟请求上限 | 10 | | `RATE_LIMIT_GLOBAL` | 全局每分钟请求上限 | 10 |
| `RATE_LIMIT_PER_IP` | 单 IP 每分钟请求上限 | 5 | | `RATE_LIMIT_PER_IP` | 单 IP 每分钟请求上限 | 5 |
| `RATE_LIMIT_ARTICLE_INTERVAL` | 文章请求最小间隔(秒) | 3 | | `RATE_LIMIT_ARTICLE_INTERVAL` | 文章请求最小间隔(秒) | 3 |
@ -493,6 +495,7 @@ PROXY_URLS=socks5://myuser:mypass@vps1-ip:1080,socks5://myuser:mypass@vps2-ip:10
│ ├── rate_limiter.py # 限频器 │ ├── rate_limiter.py # 限频器
│ ├── rss_store.py # RSS 数据存储SQLite │ ├── rss_store.py # RSS 数据存储SQLite
│ ├── rss_poller.py # RSS 后台轮询器 │ ├── rss_poller.py # RSS 后台轮询器
│ ├── login_reminder.py # 登录过期提醒(主动检测)
│ ├── content_processor.py # 内容处理与图片代理 │ ├── content_processor.py # 内容处理与图片代理
│ ├── image_proxy.py # 图片URL代理工具 │ ├── image_proxy.py # 图片URL代理工具
│ ├── article_fetcher.py # 批量并发获取文章 │ ├── article_fetcher.py # 批量并发获取文章
@ -540,9 +543,14 @@ PROXY_URLS=socks5://myuser:mypass@vps1-ip:1080,socks5://myuser:mypass@vps2-ip:10
</details> </details>
<details> <details>
<summary><b>Token 多久过期</b></summary> <summary><b>Token 多久过期?如何提前知道?</b></summary>
Cookie 登录有效期约 4 天,过期后需重新扫码登录。配置 `WEBHOOK_URL` 可以在过期时收到通知。 Cookie 登录有效期约 4 天,系统会:
1. 前端显示到期时间(`/api/admin/status` 接口返回 `expireTime``isExpired` 字段)
2. **后台每 6 小时主动检测**,提前 24h / 6h 通过 Webhook 预警
3. 过期后立即通过 Webhook 通知
配置 `WEBHOOK_URL`(支持企业微信群机器人)可收到实时提醒,避免因凭证过期导致 RSS 轮询失败或搜索功能不可用。
</details> </details>
<details> <details>

10
app.py
View File

@ -10,6 +10,9 @@
""" """
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dotenv import load_dotenv
load_dotenv()
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
@ -56,7 +59,14 @@ async def lifespan(app: FastAPI):
init_db() init_db()
await rss_poller.start() await rss_poller.start()
# 启动登录过期提醒器(自动检测凭证有效期并 webhook 通知)
from utils.login_reminder import login_reminder
await login_reminder.start()
yield yield
await login_reminder.stop()
await rss_poller.stop() await rss_poller.stop()

View File

@ -6,6 +6,11 @@
# 2. Edit .env and set SITE_URL to your actual URL # 2. Edit .env and set SITE_URL to your actual URL
# 3. Run: docker-compose up -d # 3. Run: docker-compose up -d
# 4. Visit http://localhost:5000/login.html to scan QR code # 4. Visit http://localhost:5000/login.html to scan QR code
#
# Note for NAS users (Synology/QNAP):
# If you encounter permission issues, run on NAS:
# - chmod -R 777 ./data
# - Credentials are automatically saved to ./data directory
services: services:
wechat-api: wechat-api:
@ -17,10 +22,10 @@ services:
ports: ports:
- "5000:5000" - "5000:5000"
volumes: volumes:
# Persist SQLite database # Persist SQLite database and credentials
- ./data:/app/data - ./data:/app/data
# Config file (writable - login saves credentials here) # Config file (read-only - credentials saved to data/)
- ./.env:/app/.env - ./.env:/app/.env:ro
environment: environment:
- TZ=Asia/Shanghai - TZ=Asia/Shanghai
healthcheck: healthcheck:

View File

@ -497,8 +497,8 @@ async def biz_login(request: Request):
import traceback import traceback
traceback.print_exc() traceback.print_exc()
# 计算过期时间(30天后 # 计算过期时间(4天后与微信实际有效期一致
expire_time = int((time.time() + 30 * 24 * 3600) * 1000) expire_time = int((time.time() + 4 * 24 * 3600) * 1000)
# 保存凭证 # 保存凭证
auth_manager.save_credentials( auth_manager.save_credentials(

View File

@ -11,6 +11,7 @@ RSS 订阅路由
import csv import csv
import io import io
import os
import time import time
import logging import logging
from datetime import datetime, timezone from datetime import datetime, timezone
@ -28,6 +29,23 @@ from utils.image_proxy import proxy_image_url
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def get_base_url(request: Request) -> str:
"""
获取服务的基础 URL优先使用环境变量 SITE_URL
支持反向代理检测 X-Forwarded-Proto X-Forwarded-Host
"""
# 优先使用配置的 SITE_URL
site_url = os.getenv("SITE_URL", "").strip()
if site_url:
return site_url.rstrip("/")
# 检测反向代理头部
proto = request.headers.get("X-Forwarded-Proto", "http")
host = request.headers.get("X-Forwarded-Host") or request.headers.get("Host", "localhost:5000")
return f"{proto}://{host}"
router = APIRouter() router = APIRouter()
@ -118,7 +136,7 @@ async def get_subscriptions(request: Request):
返回每个订阅的基本信息缓存文章数和 RSS 地址 返回每个订阅的基本信息缓存文章数和 RSS 地址
""" """
subs = rss_store.list_subscriptions() subs = rss_store.list_subscriptions()
base_url = str(request.base_url).rstrip("/") base_url = get_base_url(request)
items = [] items = []
for s in subs: for s in subs:
@ -195,7 +213,7 @@ async def get_aggregated_rss_feed(
articles = rss_store.get_all_articles(limit=limit) if subs else [] articles = rss_store.get_all_articles(limit=limit) if subs else []
base_url = str(request.base_url).rstrip("/") base_url = get_base_url(request)
xml = _build_aggregated_rss_xml(articles, nickname_map, base_url) xml = _build_aggregated_rss_xml(articles, nickname_map, base_url)
return Response( return Response(
content=xml, content=xml,
@ -218,7 +236,7 @@ async def export_subscriptions(
- **opml**: 标准 OPML 格式可直接导入 RSS 阅读器 - **opml**: 标准 OPML 格式可直接导入 RSS 阅读器
""" """
subs = rss_store.list_subscriptions() subs = rss_store.list_subscriptions()
base_url = str(request.base_url).rstrip("/") base_url = get_base_url(request)
if format == "opml": if format == "opml":
return _build_opml_response(subs, base_url) return _build_opml_response(subs, base_url)
@ -448,7 +466,7 @@ async def get_rss_feed(fakeid: str, request: Request,
raise HTTPException(status_code=404, detail="未找到该订阅,请先添加订阅") raise HTTPException(status_code=404, detail="未找到该订阅,请先添加订阅")
articles = rss_store.get_articles(fakeid, limit=limit) articles = rss_store.get_articles(fakeid, limit=limit)
base_url = str(request.base_url).rstrip("/") base_url = get_base_url(request)
xml = _build_rss_xml(fakeid, sub, articles, base_url) xml = _build_rss_xml(fakeid, sub, articles, base_url)
return Response( return Response(

View File

@ -8,6 +8,7 @@
搜索路由 - FastAPI版本 搜索路由 - FastAPI版本
""" """
import os
from fastapi import APIRouter, Query, Request from fastapi import APIRouter, Query, Request
from pydantic import BaseModel from pydantic import BaseModel
from typing import Optional, List from typing import Optional, List
@ -18,6 +19,21 @@ from utils.image_proxy import proxy_image_url
router = APIRouter() router = APIRouter()
def get_base_url(request: Request) -> str:
"""
获取服务的基础 URL优先使用环境变量 SITE_URL
支持反向代理检测 X-Forwarded-Proto X-Forwarded-Host
"""
site_url = os.getenv("SITE_URL", "").strip()
if site_url:
return site_url.rstrip("/")
proto = request.headers.get("X-Forwarded-Proto", "http")
host = request.headers.get("X-Forwarded-Host") or request.headers.get("Host", "localhost:5000")
return f"{proto}://{host}"
class Account(BaseModel): class Account(BaseModel):
"""公众号模型""" """公众号模型"""
id: str id: str
@ -80,7 +96,7 @@ async def search_accounts(query: str = Query(..., description="公众号名称
accounts = result.get("list", []) accounts = result.get("list", [])
# 获取 base_url 用于图片代理 # 获取 base_url 用于图片代理
base_url = str(request.base_url).rstrip("/") if request else "" base_url = get_base_url(request) if request else ""
# 格式化返回数据 # 格式化返回数据
formatted_accounts = [] formatted_accounts = []

View File

@ -34,12 +34,31 @@ class AuthManager:
self.base_dir = Path(__file__).parent.parent self.base_dir = Path(__file__).parent.parent
self.env_path = self.base_dir / ".env" self.env_path = self.base_dir / ".env"
# Docker环境下的凭证文件存储在data目录权限更可靠
self.credentials_file = self.base_dir / "data" / ".credentials.json"
# 加载环境变量 # 加载环境变量
self._load_credentials() self._load_credentials()
self._initialized = True self._initialized = True
def _load_credentials(self): def _load_credentials(self):
"""从.env文件加载凭证""" """
从多个来源加载凭证优先级
1. data/.credentials.json (Docker环境推荐)
2. .env 文件 (本地部署)
3. 环境变量
"""
# 先尝试从 JSON 凭证文件加载Docker 环境)
if self.credentials_file.exists():
try:
import json
with open(self.credentials_file, 'r', encoding='utf-8') as f:
self.credentials = json.load(f)
return
except Exception as e:
print(f"Warning: Failed to load credentials from {self.credentials_file}: {e}")
# 回退到 .env 文件(本地部署)
if self.env_path.exists(): if self.env_path.exists():
load_dotenv(self.env_path, override=True) load_dotenv(self.env_path, override=True)
@ -54,7 +73,9 @@ class AuthManager:
def save_credentials(self, token: str, cookie: str, fakeid: str, def save_credentials(self, token: str, cookie: str, fakeid: str,
nickname: str, expire_time: int) -> bool: nickname: str, expire_time: int) -> bool:
""" """
保存凭证到.env文件 保存凭证支持双存储策略
1. 优先保存到 data/.credentials.json (Docker环境推荐权限可靠)
2. 同时尝试保存到 .env (本地部署兼容)
Args: Args:
token: 微信Token token: 微信Token
@ -66,21 +87,33 @@ class AuthManager:
Returns: Returns:
保存是否成功 保存是否成功
""" """
# 更新内存中的凭证
self.credentials.update({
"token": token,
"cookie": cookie,
"fakeid": fakeid,
"nickname": nickname,
"expire_time": expire_time
})
success = False
# 策略1: 保存到 data/.credentials.json (Docker 环境优先)
try:
import json
self.credentials_file.parent.mkdir(parents=True, exist_ok=True)
with open(self.credentials_file, 'w', encoding='utf-8') as f:
json.dump(self.credentials, f, indent=2, ensure_ascii=False)
print(f"[OK] 凭证已保存到: {self.credentials_file}")
success = True
except Exception as e:
print(f"[WARN] 无法保存到凭证文件: {e}")
# 策略2: 同时尝试保存到 .env 文件(本地部署兼容)
try: try:
# 更新内存中的凭证
self.credentials.update({
"token": token,
"cookie": cookie,
"fakeid": fakeid,
"nickname": nickname,
"expire_time": expire_time
})
# 确保.env文件存在
if not self.env_path.exists(): if not self.env_path.exists():
self.env_path.touch() self.env_path.touch()
# 保存到.env文件
env_file = str(self.env_path) env_file = str(self.env_path)
set_key(env_file, "WECHAT_TOKEN", token) set_key(env_file, "WECHAT_TOKEN", token)
set_key(env_file, "WECHAT_COOKIE", cookie) set_key(env_file, "WECHAT_COOKIE", cookie)
@ -88,11 +121,17 @@ class AuthManager:
set_key(env_file, "WECHAT_NICKNAME", nickname) set_key(env_file, "WECHAT_NICKNAME", nickname)
set_key(env_file, "WECHAT_EXPIRE_TIME", str(expire_time)) set_key(env_file, "WECHAT_EXPIRE_TIME", str(expire_time))
print(f"✅ 凭证已保存到: {self.env_path}") print(f"[OK] 凭证已同步到: {self.env_path}")
return True success = True
except Exception as e: except Exception as e:
print(f"❌ 保存凭证失败: {e}") print(f"[WARN] 无法写入 .env 文件 (Docker环境正常): {e}")
# Docker 环境下 .env 可能只读,不影响功能
if not success:
print(f"[ERROR] 凭证保存完全失败")
return False return False
return True
def get_credentials(self) -> Optional[Dict[str, any]]: def get_credentials(self) -> Optional[Dict[str, any]]:
""" """
@ -155,7 +194,7 @@ class AuthManager:
def clear_credentials(self) -> bool: def clear_credentials(self) -> bool:
""" """
清除凭证 清除凭证双存储都清除
Returns: Returns:
清除是否成功 清除是否成功
@ -178,12 +217,20 @@ class AuthManager:
for key in env_keys: for key in env_keys:
os.environ.pop(key, None) os.environ.pop(key, None)
# 删除凭证文件
if self.credentials_file.exists():
self.credentials_file.unlink()
print(f"[OK] 凭证文件已删除: {self.credentials_file}")
# 清空 .env 文件中的凭证字段(保留其他配置) # 清空 .env 文件中的凭证字段(保留其他配置)
if self.env_path.exists(): try:
env_file = str(self.env_path) if self.env_path.exists():
for key in env_keys: env_file = str(self.env_path)
set_key(env_file, key, "") for key in env_keys:
print(f"✅ 凭证已清除: {self.env_path}") set_key(env_file, key, "")
print(f"[OK] .env 凭证已清除: {self.env_path}")
except Exception as e:
print(f"[WARN] 无法清除 .env 文件 (Docker环境正常): {e}")
return True return True
except Exception as e: except Exception as e:

View File

@ -105,15 +105,22 @@ def extract_content(html: str) -> str:
Extract article body, trying multiple container patterns. Extract article body, trying multiple container patterns.
Different WeChat account types (government, media, personal) use Different WeChat account types (government, media, personal) use
different HTML structures. We try them in order of specificity. different HTML structures. We try them in order of specificity.
For image-text messages (item_show_type=8) and short posts (item_show_type=10), For image-text messages (item_show_type=8), short posts (item_show_type=10),
delegates to helpers. and audio share pages (item_show_type=7), delegates to helpers.
""" """
from utils.helpers import ( from utils.helpers import (
is_image_text_message, _extract_image_text_content, is_image_text_message, _extract_image_text_content,
is_short_content_message, _extract_short_content, is_short_content_message, _extract_short_content,
is_audio_message, _extract_audio_content, is_audio_message, _extract_audio_content,
get_item_show_type, _extract_audio_share_content,
) )
# Check for audio/video share pages (item_show_type=7) FIRST
# These pages use Vue apps and have no js_content div
if get_item_show_type(html) == '7':
result = _extract_audio_share_content(html)
return result.get('content', '')
if is_image_text_message(html): if is_image_text_message(html):
result = _extract_image_text_content(html) result = _extract_image_text_content(html)
return result.get('content', '') return result.get('content', '')

View File

@ -361,12 +361,14 @@ def _extract_audio_content(html: str) -> Dict:
dur_str = f' ({minutes}:{seconds:02d})' dur_str = f' ({minutes}:{seconds:02d})'
display_name = audio['name'] or f'Audio {i + 1}' display_name = audio['name'] or f'Audio {i + 1}'
# 友好提示音频需要微信鉴权不提供无法播放的URL
html_parts.append( html_parts.append(
f'<div style="margin:12px 0;padding:12px 16px;background:#f6f6f6;border-radius:8px">' f'<div style="margin:12px 0;padding:12px 16px;background:#fff9e6;'
f'<p style="margin:0 0 4px;font-size:15px;font-weight:500">' f'border-left:4px solid #fa8c16;border-radius:4px">'
f'{html_module.escape(display_name)}{dur_str}</p>' f'<p style="margin:0 0 4px;font-size:14px;color:#595959;font-weight:500">'
f'<a href="{audio["url"]}" style="color:#1890ff;font-size:14px">' f'音频内容: {html_module.escape(display_name)}{dur_str}</p>'
f'[Play Audio / Click to Listen]</a>' f'<p style="margin:0;font-size:13px;color:#8c8c8c">'
f'此文章包含音频,需要在微信中查看完整内容</p>'
f'</div>' f'</div>'
) )
@ -428,6 +430,22 @@ def _extract_audio_share_content(html: str) -> Dict:
# 生成内容 # 生成内容
content_parts = [] content_parts = []
# 标题(如果有)
if title:
content_parts.append(
f'<div style="margin:20px 0;text-align:center">'
f'<h2 style="margin:0;font-size:22px;font-weight:600;color:#262626">{title}</h2>'
f'</div>'
)
# 作者(如果有)
if author:
content_parts.append(
f'<div style="margin:12px 0;text-align:center">'
f'<p style="margin:0;font-size:14px;color:#8c8c8c">作者: {author}</p>'
f'</div>'
)
# 封面图 # 封面图
if images: if images:
for img_url in images: for img_url in images:
@ -682,6 +700,8 @@ def get_unavailable_reason(html: str) -> Optional[str]:
return None return None
# 真正的不可用标记静态HTML中的明确文字 # 真正的不可用标记静态HTML中的明确文字
# 注意微信的正常文章HTML中可能在JS代码里包含"已删除"/"违规"等字符串
# 需要确保这些关键字是在实际内容中而不是在JS字符串字面量中
markers = [ markers = [
("该内容已被发布者删除", "已被发布者删除"), ("该内容已被发布者删除", "已被发布者删除"),
("内容已删除", "已被发布者删除"), ("内容已删除", "已被发布者删除"),
@ -694,6 +714,21 @@ def get_unavailable_reason(html: str) -> Optional[str]:
] ]
for keyword, reason in markers: for keyword, reason in markers:
if keyword in html: if keyword in html:
# 额外验证如果HTML很大(>1MB) 且有真实的内容容器,
# 说明是正常文章,"已删除"/"违规"可能只是JS代码中的字符串
if len(html) > 1000000:
has_real_content = (
'id="js_content"' in html or
'class="rich_media_content' in html
)
if has_real_content:
# 进一步确认:检查关键字是否在 <body> 的前10KB可见区域
# 如果只在后面的 <script> 中出现,跳过
import re
body_match = re.search(r'<body[^>]*>(.*?)(?:<script|$)', html[:50000], re.DOTALL | re.IGNORECASE)
if body_match and keyword not in body_match.group(1):
# 关键字不在body前部可能是JS代码跳过此marker
continue
return reason return reason
# 特殊处理:"该内容暂时无法查看"独立页面 # 特殊处理:"该内容暂时无法查看"独立页面

150
utils/login_reminder.py Normal file
View File

@ -0,0 +1,150 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2026 tmwgsicp
# Licensed under the GNU Affero General Public License v3.0
# See LICENSE file in the project root for full license text.
# SPDX-License-Identifier: AGPL-3.0-only
"""
登录过期提醒开源版
定期检查本地微信登录凭证过期状态提前 webhook 通知
"""
import asyncio
import logging
import time
from typing import Optional
from utils.webhook import webhook
logger = logging.getLogger(__name__)
class LoginReminder:
"""登录过期提醒管理器(开源版单账号架构)"""
def __init__(self):
self.check_interval = 6 * 3600 # 每 6 小时检查一次
self.warning_threshold = 24 * 3600 # 提前 24 小时预警
self.critical_threshold = 6 * 3600 # 提前 6 小时严重警告
self._running = False
self._task: Optional[asyncio.Task] = None
self._last_warning_level = None # 记录最后一次警告级别,避免重复
async def start(self):
"""启动提醒服务"""
if self._running:
logger.warning("登录提醒服务已在运行")
return
self._running = True
self._task = asyncio.create_task(self._run())
logger.info("登录提醒服务已启动,检查间隔: %d", self.check_interval)
async def stop(self):
"""停止提醒服务"""
self._running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
logger.info("登录提醒服务已停止")
async def _run(self):
"""后台任务循环"""
while self._running:
try:
await self._check_login_status()
except Exception as e:
logger.error("检查登录状态失败: %s", e, exc_info=True)
await asyncio.sleep(self.check_interval)
async def _check_login_status(self):
"""检查本地登录凭证的过期状态"""
from utils.auth_manager import auth_manager
# 获取凭证信息
creds = auth_manager.get_credentials()
if not creds or not creds.get("token"):
logger.debug("无登录凭证,跳过检查")
return
expire_time = creds.get("expire_time", 0)
if expire_time <= 0:
logger.debug("凭证无过期时间,跳过检查")
return
nickname = creds.get("nickname", "未知账号")
now = int(time.time() * 1000) # 毫秒时间戳
time_left_ms = expire_time - now
time_left_sec = time_left_ms / 1000
# 已过期
if time_left_sec <= 0:
if self._last_warning_level != 'expired':
await self._notify_expired(nickname)
self._last_warning_level = 'expired'
return
# 严重警告6 小时内过期)
if time_left_sec <= self.critical_threshold:
if self._last_warning_level not in ['critical', 'expired']:
await self._notify_critical(nickname, time_left_sec)
self._last_warning_level = 'critical'
return
# 一般警告24 小时内过期)
if time_left_sec <= self.warning_threshold:
if self._last_warning_level not in ['warning', 'critical', 'expired']:
await self._notify_warning(nickname, time_left_sec)
self._last_warning_level = 'warning'
return
# 状态正常,重置警告级别
if self._last_warning_level is not None:
self._last_warning_level = None
logger.info("登录状态已恢复正常: %s", nickname)
async def _notify_warning(self, nickname: str, time_left: float):
"""发送一般警告通知"""
hours = time_left / 3600
logger.warning(
"登录凭证即将过期 [%s] - 剩余 %.1f 小时",
nickname, hours
)
await webhook.notify('login_expiring_soon', {
'nickname': nickname,
'hours_left': round(hours, 1),
'level': 'warning',
'message': f'登录凭证将在 {round(hours, 1)} 小时后过期,请及时重新登录',
})
async def _notify_critical(self, nickname: str, time_left: float):
"""发送严重警告通知"""
hours = time_left / 3600
logger.error(
"登录凭证即将过期(紧急)[%s] - 剩余 %.1f 小时",
nickname, hours
)
await webhook.notify('login_expiring_critical', {
'nickname': nickname,
'hours_left': round(hours, 1),
'level': 'critical',
'message': f'登录凭证将在 {round(hours, 1)} 小时后过期(紧急),请立即重新登录',
})
async def _notify_expired(self, nickname: str):
"""发送已过期通知"""
logger.error("登录凭证已过期 [%s]", nickname)
await webhook.notify('login_expired', {
'nickname': nickname,
'message': '登录凭证已过期API 功能将受限,请重新登录',
})
# 全局单例
login_reminder = LoginReminder()

View File

@ -21,6 +21,8 @@ logger = logging.getLogger("webhook")
EVENT_LABELS = { EVENT_LABELS = {
"login_success": "登录成功", "login_success": "登录成功",
"login_expired": "登录过期", "login_expired": "登录过期",
"login_expiring_soon": "登录即将过期",
"login_expiring_critical": "登录即将过期(紧急)",
"verification_required": "触发验证", "verification_required": "触发验证",
"content_fetch_failed": "文章内容获取失败", "content_fetch_failed": "文章内容获取失败",
} }