完成API下载功能

This commit is contained in:
Krcia 2025-11-07 11:15:09 +08:00
parent ebb6384aa4
commit 00ff77f7d7
11 changed files with 1366 additions and 0 deletions

180
api.py Normal file
View File

@ -0,0 +1,180 @@
import asyncio
from flask import Flask, jsonify, request
import jwt
import datetime
import os
from functools import wraps
from download import M3U8Downloader
from function import crawl_missav
from urllib.parse import urlparse
app = Flask(__name__)
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'your-secret-key-here')
downloader = M3U8Downloader(max_workers=10, output_dir=r"download")
# 从环境变量获取用户名密码
USERNAME = os.getenv('USER')
PASSWORD = os.getenv('PASSWORD')
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get(f'Authorization')
if not token:
return jsonify({
'msg': '请登录',
'code': 403,
}), 403
# 检查token格式
if token.startswith('Bearer '):
token = token[7:]
try:
# 解码token
data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
current_user = data['user']
except jwt.ExpiredSignatureError:
return jsonify({
'msg': '登录已过期,请重新登录',
'code': 403,
}), 403
except jwt.InvalidTokenError:
return jsonify({
'msg': '无效的token',
'code': 403,
}), 403
return f(*args, **kwargs)
return decorated
@app.route('/api/login', methods=['POST'])
def login():
data = request.get_json()
if not data:
return jsonify({
'msg': '请提供用户名和密码',
'code': 400,
}), 400
username = data.get('username')
password = data.get('password')
# 验证用户名密码
if username == USERNAME and password == PASSWORD:
# 生成token1小时过期
token = jwt.encode({
'user': username,
'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=1)
}, app.config['SECRET_KEY'], algorithm='HS256')
return jsonify({
'msg': '登录成功',
'code': 200,
'data': {
'token': token,
'expires_in': 3600 # 1小时单位秒
}
}), 200
else:
return jsonify({
'msg': '用户名或密码错误',
'code': 401,
}), 401
@app.route('/api/check/<path:url>')
@token_required
def check_url(url):
status = is_from_missav(url)
if (status):
result = asyncio.run(crawl_missav(
url
))
return jsonify({
'msg': '成功',
'code': 200,
'dat': result
}), 200
else:
return jsonify({
'msg': '不是来自missav的链接',
'code': 500
}), 200
@app.route('/api/download', methods=['POST'])
# @token_required
def download():
data = request.get_json()
if not data:
return jsonify({'error': 'No JSON data provided'}), 400
name = data.get('name')
url = data.get('url')
if not name or not url:
return jsonify({'error': 'Missing name or url parameter'}), 400
task_id = downloader.download(
output_filename=f"{name}.mp4",
m3u8_url=url
)
return jsonify({
'msg': '成功',
'code': 200,
'dat': task_id
}), 200
@app.route('/api/all-task', methods=['GET'])
# @token_required
def all_task():
all_tasks = downloader.get_all_tasks()
return jsonify({
'msg': '成功',
'code': 200,
'data': all_tasks
}), 200
@app.route('/api/progress/<path:task_id>', methods=['GET'])
@token_required
def progress(task_id):
progress_info = downloader.get_progress(task_id)
filename = progress_info['filename']
progress = progress_info['progress'] # 0~1的浮点数如0.56表示56%
status = progress_info['status']
print(f"文件: {filename}, 进度: {progress:.2%}, 状态: {status}")
return jsonify({
'msg': '成功',
'code': 200,
'data': {'name': filename, 'progress': progress}
}), 200
def is_from_missav(url):
try:
parsed = urlparse(url)
hostname = parsed.netloc.lower()
return hostname == 'missav.ws' or hostname.endswith('.missav.ws')
except:
return False
if __name__ == '__main__':
# 检查环境变量是否设置
if not USERNAME or not PASSWORD:
print("警告: 请设置环境变量 USER 和 PASSWORD")
app.run(debug=True, host='0.0.0.0', port=5000)

61
docker/Dockerfile Normal file
View File

@ -0,0 +1,61 @@
FROM python:3.12-slim
# 安装系统依赖
RUN apt-get update && apt-get install -y \
nginx \
curl \
wget \
libglib2.0-0 \
libnss3 \
libnspr4 \
libatk1.0-0 \
libatk-bridge2.0-0 \
libcups2 \
libdrm2 \
libdbus-1-3 \
libxkbcommon0 \
libxcomposite1 \
libxdamage1 \
libxfixes3 \
libxrandr2 \
libgbm1 \
libasound2 \
libpango-1.0-0 \
libcairo2 \
libatspi2.0-0 \
fonts-liberation \
libnss3-tools \
xvfb \
&& rm -rf /var/lib/apt/lists/*
# 设置工作目录
WORKDIR /app
# 复制server文件
COPY ./server/* /app/server/
# 复制nginx配置文件
COPY ./nginx.conf /etc/nginx/nginx.conf
# 安装Python依赖
RUN pip install --no-cache-dir -r /app/server/requirements.txt
# 配置Playwright代理和安装Chromium
RUN playwright install chromium
# 创建下载目录和nginx运行所需目录
RUN mkdir -p /app/server/download /var/run/nginx
# 设置环境变量
ENV USER=admin
ENV PASSWORD=password
ENV SECRET_KEY='asd78yujncisa32r89'
# 设置卷
VOLUME ["/app/server/download"]
# 暴露端口
EXPOSE 80
# 启动命令
CMD service nginx start && cd /app/server && xvfb-run -a python3 api.py

10
docker/html/idnex.html Normal file
View File

@ -0,0 +1,10 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>下载器</title>
</head>
<body>
</body>
</html>

54
docker/nginx.conf Normal file
View File

@ -0,0 +1,54 @@
worker_processes 1;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
sendfile on;
keepalive_timeout 65;
map $proxy_protocol_addr $real_ip {
default $remote_addr;
}
server {
listen 4560;
server_name localhost;
client_header_buffer_size 64k;
large_client_header_buffers 8 128k;
client_max_body_size 50m;
location / {
root /app/html;
try_files $uri $uri/ /index.html;
}
location /api {
proxy_pass http://localhost:5000/api;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# 跨域配置
add_header Access-Control-Allow-Origin * always;
add_header Access-Control-Allow-Methods 'GET, POST, OPTIONS, PUT, DELETE' always;
add_header Access-Control-Allow-Headers 'Authorization, Content-Type, X-Requested-With, Accept, Origin' always;
add_header Access-Control-Allow-Credentials true always;
add_header Access-Control-Expose-Headers 'Content-Length, Content-Range' always;
# 处理OPTIONS预检请求
if ($request_method = 'OPTIONS') {
add_header Access-Control-Allow-Origin *;
add_header Access-Control-Allow-Methods 'GET, POST, OPTIONS, PUT, DELETE';
add_header Access-Control-Allow-Headers 'Authorization, Content-Type, X-Requested-With, Accept, Origin';
add_header Access-Control-Max-Age 86400;
add_header Content-Length 0;
add_header Content-Type text/plain;
return 200;
}
}
}
}

180
docker/server/api.py Normal file
View File

@ -0,0 +1,180 @@
import asyncio
from flask import Flask, jsonify, request
import jwt
import datetime
import os
from functools import wraps
from download import M3U8Downloader
from function import crawl_missav
from urllib.parse import urlparse
app = Flask(__name__)
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'your-secret-key-here')
downloader = M3U8Downloader(max_workers=10, output_dir=r"download")
# 从环境变量获取用户名密码
USERNAME = os.getenv('USER')
PASSWORD = os.getenv('PASSWORD')
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get('Authorization')
if not token:
return jsonify({
'msg': '请登录',
'code': 403,
}), 403
# 检查token格式
if token.startswith('Bearer '):
token = token[7:]
try:
# 解码token
data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
current_user = data['user']
except jwt.ExpiredSignatureError:
return jsonify({
'msg': '登录已过期,请重新登录',
'code': 403,
}), 403
except jwt.InvalidTokenError:
return jsonify({
'msg': '无效的token',
'code': 403,
}), 403
return f(*args, **kwargs)
return decorated
@app.route('/api/login', methods=['POST'])
def login():
data = request.get_json()
if not data:
return jsonify({
'msg': '请提供用户名和密码',
'code': 400,
}), 400
username = data.get('username')
password = data.get('password')
# 验证用户名密码
if username == USERNAME and password == PASSWORD:
# 生成token1小时过期
token = jwt.encode({
'user': username,
'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=1)
}, app.config['SECRET_KEY'], algorithm='HS256')
return jsonify({
'msg': '登录成功',
'code': 200,
'data': {
'token': token,
'expires_in': 3600 # 1小时单位秒
}
}), 200
else:
return jsonify({
'msg': '用户名或密码错误',
'code': 401,
}), 401
@app.route('/api/check/<path:url>')
@token_required
def check_url(url):
status = is_from_missav(url)
if (status):
result = asyncio.run(crawl_missav(
url
))
return jsonify({
'msg': '成功',
'code': 200,
'dat': result
}), 200
else:
return jsonify({
'msg': '不是来自missav的链接',
'code': 500
}), 200
@app.route('/api/download', methods=['POST'])
@token_required
def download():
data = request.get_json()
if not data:
return jsonify({'error': 'No JSON data provided'}), 400
name = data.get('name')
url = data.get('url')
if not name or not url:
return jsonify({'error': 'Missing name or url parameter'}), 400
task_id = downloader.download(
output_filename=f"{name}.mp4",
m3u8_url=url
)
return jsonify({
'msg': '成功',
'code': 200,
'dat': task_id
}), 200
@app.route('/api/all-task', methods=['GET'])
@token_required
def all_task():
all_tasks = downloader.get_all_tasks()
return jsonify({
'msg': '成功',
'code': 200,
'data': all_tasks
}), 200
@app.route('/api/progress/<path:task_id>', methods=['GET'])
@token_required
def progress(task_id):
progress_info = downloader.get_progress(task_id)
filename = progress_info['filename']
progress = progress_info['progress'] # 0~1的浮点数如0.56表示56%
status = progress_info['status']
print(f"文件: {filename}, 进度: {progress:.2%}, 状态: {status}")
return jsonify({
'msg': '成功',
'code': 200,
'data': {'name': filename, 'progress': progress}
}), 200
def is_from_missav(url):
try:
parsed = urlparse(url)
hostname = parsed.netloc.lower()
return hostname == 'missav.ws' or hostname.endswith('.missav.ws')
except:
return False
if __name__ == '__main__':
# 检查环境变量是否设置
if not USERNAME or not PASSWORD:
print("警告: 请设置环境变量 USER 和 PASSWORD")
app.run(debug=True, host='0.0.0.0', port=5000)

355
docker/server/download.py Normal file
View File

@ -0,0 +1,355 @@
import os
import time
import threading
import requests
from urllib.parse import urljoin
import m3u8
from Crypto.Cipher import AES
import concurrent.futures
from pathlib import Path
class M3U8Downloader:
def __init__(self, max_workers=5, output_dir="downloads"):
self.max_workers = max_workers
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
# 存储下载任务状态
self.tasks = {}
self.lock = threading.Lock()
self.task_counter = 0
def get_task_info(self, task_id):
"""获取任务信息"""
with self.lock:
return self.tasks.get(task_id, {"status": "not_found"})
def list_tasks(self):
"""列出所有任务"""
with self.lock:
return {task_id: info for task_id, info in self.tasks.items()}
def get_all_tasks(self):
"""
获取全部任务的信息包括文件名和任务ID
Returns:
list: 包含所有任务信息的列表每个元素为字典
[{'task_id': 'task_1', 'filename': 'video1.mp4', 'status': 'downloading', 'progress': 0.56}, ...]
"""
with self.lock:
all_tasks = []
for task_id, task_info in self.tasks.items():
# 计算进度
progress = 0.0
if task_info['status'] == 'preparing':
progress = 0.0
elif task_info['status'] == 'downloading':
if task_info['total_segments'] > 0:
progress = task_info['downloaded_segments'] / task_info['total_segments']
else:
progress = 0.0
elif task_info['status'] == 'merging':
progress = 1.0
elif task_info['status'] == 'completed':
progress = 1.0
elif task_info['status'] == 'failed':
progress = 0.0
all_tasks.append({
'task_id': task_id,
'filename': task_info['output_filename'],
'status': task_info['status'],
'progress': round(progress, 4),
'start_time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(task_info.get('start_time', time.time())))
})
# 按开始时间倒序排列,最新的任务在前面
all_tasks.sort(key=lambda x: x['start_time'], reverse=True)
return all_tasks
def get_tasks_summary(self):
"""
获取任务摘要信息
Returns:
dict: 包含任务统计信息的字典
"""
all_tasks = self.get_all_tasks()
summary = {
'total': len(all_tasks),
'preparing': 0,
'downloading': 0,
'merging': 0,
'completed': 0,
'failed': 0
}
for task in all_tasks:
status = task['status']
if status in summary:
summary[status] += 1
return summary
def download_ts_segment(self, task_info, ts_url, output_path, segment_index):
"""下载单个TS片段"""
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get(ts_url, headers=headers, stream=True, timeout=30)
response.raise_for_status()
ts_data = response.content
# 如果有加密,进行解密
if task_info['key'] and task_info['iv']:
cipher = AES.new(task_info['key'], AES.MODE_CBC, task_info['iv'])
ts_data = cipher.decrypt(ts_data)
with open(output_path, 'wb') as f:
f.write(ts_data)
# 更新进度
with self.lock:
if task_info['task_id'] in self.tasks:
self.tasks[task_info['task_id']]['downloaded_segments'] += 1
return True
except Exception as e:
print(f"下载片段 {segment_index} 失败: {e}")
return False
def get_decryption_key(self, key_uri, iv=None):
"""获取解密密钥"""
try:
response = requests.get(key_uri)
response.raise_for_status()
key = response.content
# 如果IV是十六进制字符串转换为bytes
if iv and isinstance(iv, str):
if iv.startswith('0x'):
iv = bytes.fromhex(iv[2:])
else:
iv = bytes.fromhex(iv)
elif not iv:
iv = b'\x00' * 16 # 默认IV
return key, iv
except Exception as e:
print(f"获取解密密钥失败: {e}")
return None, None
def _download_m3u8(self, m3u8_url, output_filename, task_id):
"""内部下载方法"""
# 初始化任务信息
task_info = {
'task_id': task_id,
'm3u8_url': m3u8_url,
'output_filename': output_filename,
'status': 'preparing',
'total_segments': 0,
'downloaded_segments': 0,
'progress': 0.0,
'output_file': '',
'start_time': time.time(),
'key': None,
'iv': None
}
with self.lock:
self.tasks[task_id] = task_info
try:
# 解析M3U8文件
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get(m3u8_url, headers=headers)
response.raise_for_status()
m3u8_content = response.text
m3u8_obj = m3u8.loads(m3u8_content)
# 处理密钥
key = None
iv = None
if m3u8_obj.keys and m3u8_obj.keys[0]:
key_uri = m3u8_obj.keys[0].uri
if not key_uri.startswith('http'):
key_uri = urljoin(m3u8_url, key_uri)
key, iv = self.get_decryption_key(key_uri, m3u8_obj.keys[0].iv)
task_info['key'] = key
task_info['iv'] = iv
# 获取所有TS片段URL
ts_segments = []
for segment in m3u8_obj.segments:
ts_url = segment.uri
if not ts_url.startswith('http'):
ts_url = urljoin(m3u8_url, ts_url)
ts_segments.append(ts_url)
task_info['total_segments'] = len(ts_segments)
task_info['status'] = 'downloading'
# 设置输出文件路径
output_path = self.output_dir / output_filename
task_info['output_file'] = str(output_path)
# 创建临时目录存储TS片段
temp_dir = self.output_dir / f"temp_{task_id}"
temp_dir.mkdir(exist_ok=True)
print(f"开始下载任务 {task_id}: {len(ts_segments)} 个片段")
# 使用线程池下载所有TS片段
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = []
for i, ts_url in enumerate(ts_segments):
ts_path = temp_dir / f"segment_{i:05d}.ts"
future = executor.submit(
self.download_ts_segment,
task_info,
ts_url,
ts_path,
i
)
futures.append(future)
# 等待所有下载完成
results = []
for future in concurrent.futures.as_completed(futures):
results.append(future.result())
# 检查下载结果
if not all(results):
task_info['status'] = 'failed'
task_info['error'] = '部分片段下载失败'
task_info['progress'] = 0.0
print(f"任务 {task_id} 下载失败,部分片段下载失败")
return
# 合并TS文件
print(f"开始合并TS文件...")
task_info['status'] = 'merging'
task_info['progress'] = 1.0
with open(output_path, 'wb') as outfile:
for i in range(len(ts_segments)):
ts_path = temp_dir / f"segment_{i:05d}.ts"
if ts_path.exists():
with open(ts_path, 'rb') as infile:
outfile.write(infile.read())
ts_path.unlink()
# 清理临时目录
temp_dir.rmdir()
task_info['status'] = 'completed'
task_info['progress'] = 1.0
task_info['end_time'] = time.time()
print(f"任务 {task_id} 完成: {output_path}")
except Exception as e:
task_info['status'] = 'failed'
task_info['error'] = str(e)
task_info['progress'] = 0.0
print(f"任务 {task_id} 失败: {e}")
def download(self, output_filename, m3u8_url):
"""
下载M3U8视频
Args:
output_filename: 输出文件名video.mp4
m3u8_url: M3U8文件URL
Returns:
str: 任务ID
"""
with self.lock:
self.task_counter += 1
task_id = f"task_{self.task_counter}"
thread = threading.Thread(
target=self._download_m3u8,
args=(m3u8_url, output_filename, task_id)
)
thread.daemon = True
thread.start()
return task_id
def get_progress(self, task_id):
"""
获取下载进度
Args:
task_id: 任务ID
Returns:
dict: 包含文件名和进度(0~1浮点数)的字典
"""
task_info = self.get_task_info(task_id)
if task_info['status'] == 'not_found':
return {'filename': '', 'progress': 0.0, 'status': 'not_found'}
progress = 0.0
if task_info['status'] == 'preparing':
progress = 0.0
elif task_info['status'] == 'downloading':
if task_info['total_segments'] > 0:
progress = task_info['downloaded_segments'] / task_info['total_segments']
else:
progress = 0.0
elif task_info['status'] == 'merging':
progress = 1.0
elif task_info['status'] == 'completed':
progress = 1.0
elif task_info['status'] == 'failed':
progress = 0.0
return {
'filename': task_info['output_filename'],
'progress': round(progress, 4),
'status': task_info['status'],
'task_id': task_id,
'output_file': task_info.get('output_file', ''),
'downloaded_segments': task_info.get('downloaded_segments', 0),
'total_segments': task_info.get('total_segments', 0)
}
def wait_for_completion(self, task_id, timeout=None):
"""
等待任务完成
Args:
task_id: 任务ID
timeout: 超时时间
Returns:
bool: 是否成功完成
"""
start_time = time.time()
while True:
task_info = self.get_task_info(task_id)
if task_info['status'] == 'completed':
return True
elif task_info['status'] == 'failed':
return False
elif timeout and (time.time() - start_time) > timeout:
return False
time.sleep(1)

57
docker/server/function.py Normal file
View File

@ -0,0 +1,57 @@
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
import os
async def crawl_missav(url):
result = {
'title': '',
'url': [],
'serial_number': '' # 新增字段存储番号
}
launch_args = {
"headless": False,
"args": ["--disable-blink-features=AutomationControlled"]
}
# 从环境变量获取代理
env_proxy = os.getenv('PROXY')
if env_proxy:
proxy = env_proxy
launch_args["proxy"] = {"server": proxy}
async with async_playwright() as p:
browser = await p.chromium.launch(**launch_args)
page = await browser.new_page()
page.set_default_timeout(60000)
try:
await page.goto(url, wait_until="domcontentloaded")
result['title'] = await page.title()
videos = await page.query_selector_all("video")
for i, video in enumerate(videos):
src = await video.get_attribute("src")
if (src != None):
result['url'].append(src)
# 新增:查找包含"番号:"的span标签并获取其同级下一个标签的文本
try:
# 查找所有包含"番号:"文本的span标签
spans_with_serial = await page.query_selector_all('span')
for span in spans_with_serial:
span_text = await span.text_content()
if span_text and '番号:' in span_text:
# 获取span的下一个同级元素
next_element = await span.evaluate_handle('element => element.nextElementSibling')
if next_element:
next_element_text = await next_element.text_content()
if next_element_text:
result['serial_number'] = next_element_text.strip()
break # 找到第一个就退出
except Exception as e:
print(f"[INFO] 查找番号时出错: {e}")
except PlaywrightTimeoutError:
print("[ERROR] 页面加载超时,可能被 Cloudflare 拦截")
finally:
await browser.close()
return result

View File

@ -0,0 +1,8 @@
playwright
flask
requests
m3u8
pycryptodome
tqdm
pathlib2
pyjwt

396
download.py Normal file
View File

@ -0,0 +1,396 @@
import os
import time
import threading
import requests
from urllib.parse import urljoin
import m3u8
from Crypto.Cipher import AES
import concurrent.futures
from pathlib import Path
import shutil
class M3U8Downloader:
def __init__(self, max_workers=5, output_dir="downloads", cache_dir="cache"):
self.max_workers = max_workers
self.output_dir = Path(output_dir)
self.cache_dir = Path(cache_dir)
# 创建目录
self.output_dir.mkdir(exist_ok=True)
self.cache_dir.mkdir(exist_ok=True)
# 清空缓存目录
self.clear_cache()
# 存储下载任务状态
self.tasks = {}
self.lock = threading.Lock()
self.task_counter = 0
def clear_cache(self):
"""清空缓存目录"""
try:
if self.cache_dir.exists():
# 删除缓存目录中的所有内容
for item in self.cache_dir.iterdir():
if item.is_file():
item.unlink()
elif item.is_dir():
shutil.rmtree(item)
print(f"缓存目录已清空: {self.cache_dir}")
except Exception as e:
print(f"清空缓存目录失败: {e}")
def get_task_info(self, task_id):
"""获取任务信息"""
with self.lock:
return self.tasks.get(task_id, {"status": "not_found"})
def list_tasks(self):
"""列出所有任务"""
with self.lock:
return {task_id: info for task_id, info in self.tasks.items()}
def get_all_tasks(self):
"""
获取全部任务的信息包括文件名和任务ID
Returns:
list: 包含所有任务信息的列表每个元素为字典
[{'task_id': 'task_1', 'filename': 'video1.mp4', 'status': 'downloading', 'progress': 0.56}, ...]
"""
with self.lock:
all_tasks = []
for task_id, task_info in self.tasks.items():
# 计算进度
progress = 0.0
if task_info['status'] == 'preparing':
progress = 0.0
elif task_info['status'] == 'downloading':
if task_info['total_segments'] > 0:
progress = task_info['downloaded_segments'] / task_info['total_segments']
else:
progress = 0.0
elif task_info['status'] == 'merging':
progress = 1.0
elif task_info['status'] == 'completed':
progress = 1.0
elif task_info['status'] == 'failed':
progress = 0.0
all_tasks.append({
'task_id': task_id,
'filename': task_info['output_filename'],
'status': task_info['status'],
'progress': round(progress, 4),
'start_time': time.strftime('%Y-%m-%d %H:%M:%S',
time.localtime(task_info.get('start_time', time.time())))
})
# 按开始时间倒序排列,最新的任务在前面
all_tasks.sort(key=lambda x: x['start_time'], reverse=True)
return all_tasks
def get_tasks_summary(self):
"""
获取任务摘要信息
Returns:
dict: 包含任务统计信息的字典
"""
all_tasks = self.get_all_tasks()
summary = {
'total': len(all_tasks),
'preparing': 0,
'downloading': 0,
'merging': 0,
'completed': 0,
'failed': 0
}
for task in all_tasks:
status = task['status']
if status in summary:
summary[status] += 1
return summary
def download_ts_segment(self, task_info, ts_url, output_path, segment_index):
"""下载单个TS片段"""
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get(ts_url, headers=headers, stream=True, timeout=30)
response.raise_for_status()
ts_data = response.content
# 如果有加密,进行解密
if task_info['key'] and task_info['iv']:
cipher = AES.new(task_info['key'], AES.MODE_CBC, task_info['iv'])
ts_data = cipher.decrypt(ts_data)
# 确保缓存目录存在
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, 'wb') as f:
f.write(ts_data)
# 更新进度
with self.lock:
if task_info['task_id'] in self.tasks:
self.tasks[task_info['task_id']]['downloaded_segments'] += 1
return True
except Exception as e:
print(f"下载片段 {segment_index} 失败: {e}")
return False
def get_decryption_key(self, key_uri, iv=None):
"""获取解密密钥"""
try:
response = requests.get(key_uri)
response.raise_for_status()
key = response.content
# 如果IV是十六进制字符串转换为bytes
if iv and isinstance(iv, str):
if iv.startswith('0x'):
iv = bytes.fromhex(iv[2:])
else:
iv = bytes.fromhex(iv)
elif not iv:
iv = b'\x00' * 16 # 默认IV
return key, iv
except Exception as e:
print(f"获取解密密钥失败: {e}")
return None, None
def _download_m3u8(self, m3u8_url, output_filename, task_id):
"""内部下载方法"""
# 初始化任务信息
task_info = {
'task_id': task_id,
'm3u8_url': m3u8_url,
'output_filename': output_filename,
'status': 'preparing',
'total_segments': 0,
'downloaded_segments': 0,
'progress': 0.0,
'output_file': '',
'start_time': time.time(),
'key': None,
'iv': None
}
with self.lock:
self.tasks[task_id] = task_info
try:
# 解析M3U8文件
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get(m3u8_url, headers=headers)
response.raise_for_status()
m3u8_content = response.text
m3u8_obj = m3u8.loads(m3u8_content)
# 处理密钥
key = None
iv = None
if m3u8_obj.keys and m3u8_obj.keys[0]:
key_uri = m3u8_obj.keys[0].uri
if not key_uri.startswith('http'):
key_uri = urljoin(m3u8_url, key_uri)
key, iv = self.get_decryption_key(key_uri, m3u8_obj.keys[0].iv)
task_info['key'] = key
task_info['iv'] = iv
# 获取所有TS片段URL
ts_segments = []
for segment in m3u8_obj.segments:
ts_url = segment.uri
if not ts_url.startswith('http'):
ts_url = urljoin(m3u8_url, ts_url)
ts_segments.append(ts_url)
task_info['total_segments'] = len(ts_segments)
task_info['status'] = 'downloading'
# 设置输出文件路径(在下载目录中)
output_path = self.output_dir / output_filename
task_info['output_file'] = str(output_path)
# 创建临时目录存储TS片段在缓存目录中
temp_dir = self.cache_dir / f"temp_{task_id}"
temp_dir.mkdir(exist_ok=True)
print(f"开始下载任务 {task_id}: {len(ts_segments)} 个片段")
print(f"缓存目录: {temp_dir}")
print(f"输出文件: {output_path}")
# 使用线程池下载所有TS片段
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = []
for i, ts_url in enumerate(ts_segments):
ts_path = temp_dir / f"segment_{i:05d}.ts"
future = executor.submit(
self.download_ts_segment,
task_info,
ts_url,
ts_path,
i
)
futures.append(future)
# 等待所有下载完成
results = []
for future in concurrent.futures.as_completed(futures):
results.append(future.result())
# 检查下载结果
if not all(results):
task_info['status'] = 'failed'
task_info['error'] = '部分片段下载失败'
task_info['progress'] = 0.0
print(f"任务 {task_id} 下载失败,部分片段下载失败")
# 清理缓存
if temp_dir.exists():
shutil.rmtree(temp_dir)
return
# 合并TS文件到下载目录
print(f"开始合并TS文件...")
task_info['status'] = 'merging'
task_info['progress'] = 1.0
# 确保输出目录存在
self.output_dir.mkdir(exist_ok=True)
with open(output_path, 'wb') as outfile:
for i in range(len(ts_segments)):
ts_path = temp_dir / f"segment_{i:05d}.ts"
if ts_path.exists():
with open(ts_path, 'rb') as infile:
outfile.write(infile.read())
# 清理缓存目录
if temp_dir.exists():
shutil.rmtree(temp_dir)
task_info['status'] = 'completed'
task_info['progress'] = 1.0
task_info['end_time'] = time.time()
print(f"任务 {task_id} 完成: {output_path}")
except Exception as e:
task_info['status'] = 'failed'
task_info['error'] = str(e)
task_info['progress'] = 0.0
# 清理缓存
temp_dir = self.cache_dir / f"temp_{task_id}"
if temp_dir.exists():
shutil.rmtree(temp_dir)
print(f"任务 {task_id} 失败: {e}")
def download(self, output_filename, m3u8_url):
"""
下载M3U8视频
Args:
output_filename: 输出文件名video.mp4
m3u8_url: M3U8文件URL
Returns:
str: 任务ID
"""
with self.lock:
self.task_counter += 1
task_id = f"task_{self.task_counter}"
thread = threading.Thread(
target=self._download_m3u8,
args=(m3u8_url, output_filename, task_id)
)
thread.daemon = True
thread.start()
return task_id
def get_progress(self, task_id):
"""
获取下载进度
Args:
task_id: 任务ID
Returns:
dict: 包含文件名和进度(0~1浮点数)的字典
"""
task_info = self.get_task_info(task_id)
if task_info['status'] == 'not_found':
return {'filename': '', 'progress': 0.0, 'status': 'not_found'}
progress = 0.0
if task_info['status'] == 'preparing':
progress = 0.0
elif task_info['status'] == 'downloading':
if task_info['total_segments'] > 0:
progress = task_info['downloaded_segments'] / task_info['total_segments']
else:
progress = 0.0
elif task_info['status'] == 'merging':
progress = 1.0
elif task_info['status'] == 'completed':
progress = 1.0
elif task_info['status'] == 'failed':
progress = 0.0
return {
'filename': task_info['output_filename'],
'progress': round(progress, 4),
'status': task_info['status'],
'task_id': task_id,
'output_file': task_info.get('output_file', ''),
'downloaded_segments': task_info.get('downloaded_segments', 0),
'total_segments': task_info.get('total_segments', 0)
}
def wait_for_completion(self, task_id, timeout=None):
"""
等待任务完成
Args:
task_id: 任务ID
timeout: 超时时间
Returns:
bool: 是否成功完成
"""
start_time = time.time()
while True:
task_info = self.get_task_info(task_id)
if task_info['status'] == 'completed':
return True
elif task_info['status'] == 'failed':
return False
elif timeout and (time.time() - start_time) > timeout:
return False
time.sleep(1)

57
function.py Normal file
View File

@ -0,0 +1,57 @@
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
import os
async def crawl_missav(url):
result = {
'title': '',
'url': [],
'serial_number': '' # 新增字段存储番号
}
launch_args = {
"headless": False,
"args": ["--disable-blink-features=AutomationControlled"]
}
# 从环境变量获取代理
env_proxy = os.getenv('PROXY')
if env_proxy:
proxy = env_proxy
launch_args["proxy"] = {"server": proxy}
async with async_playwright() as p:
browser = await p.chromium.launch(**launch_args)
page = await browser.new_page()
page.set_default_timeout(60000)
try:
await page.goto(url, wait_until="domcontentloaded")
result['title'] = await page.title()
videos = await page.query_selector_all("video")
for i, video in enumerate(videos):
src = await video.get_attribute("src")
if (src != None):
result['url'].append(src)
# 新增:查找包含"番号:"的span标签并获取其同级下一个标签的文本
try:
# 查找所有包含"番号:"文本的span标签
spans_with_serial = await page.query_selector_all('span')
for span in spans_with_serial:
span_text = await span.text_content()
if span_text and '番号:' in span_text:
# 获取span的下一个同级元素
next_element = await span.evaluate_handle('element => element.nextElementSibling')
if next_element:
next_element_text = await next_element.text_content()
if next_element_text:
result['serial_number'] = next_element_text.strip()
break # 找到第一个就退出
except Exception as e:
print(f"[INFO] 查找番号时出错: {e}")
except PlaywrightTimeoutError:
print("[ERROR] 页面加载超时,可能被 Cloudflare 拦截")
finally:
await browser.close()
return result

8
requirements.txt Normal file
View File

@ -0,0 +1,8 @@
playwright
flask
requests
m3u8
pycryptodome
tqdm
pathlib2
pyjwt