前言
之前写了个工具[识别MD文档中的图片链接并下载]来下载处理文档里面的图片,但是还是有点麻烦,于是直接爬吧。
此工具仅个人使用,此处只作分享,不要乱搞。
GITHUB
项目地址:GitHub
注意事项
请遵守目标网站的robots.txt和使用条款
大量下载时请设置合理的超时时间和间隔
自定义前缀命名方式需要指定 --prefix 参数
暂停功能会暂停下载但不会中断当前正在下载的图片
打包时需要安装tkinterdnd2库
图片下载器
这是一个可以从网页批量下载图片的工具,提供GUI界面和命令行两种使用方式。
功能特点
支持通过ID或class选择器定位网页中的图片
提供多种图片命名方式:原始文件名、UUID、时间戳、自定义前缀
支持暂停/继续下载功能
支持断点续传(记录已下载文件)
可设置超时时间和重试次数
提供下载进度显示
安装要求
Python 3.6+
依赖库:
pip install requests beautifulsoup4 tkinterdnd2 pyinstaller
使用方法
GUI版本
运行GUI界面:
python gui.py
界面参数说明:
目标URL:要下载的网页地址
选择器类型:id或class
选择器值:元素的id或class值
保存目录:图片保存路径(默认为downloaded_images)
命名方式:图片文件名生成规则
超时时间:单张图片下载超时时间(秒)
最大重试次数:下载失败重试次数
命令行版本
基本用法:
python cli.py <url> <selector_value> [options]
选项参数:
--selector-type 选择器类型(id/class),默认为id
--save-dir 保存目录,默认为downloaded_images
--naming 命名方式(original/uuid/timestamp/custom),默认为original
--prefix 自定义前缀(当命名方式为custom时使用)
--timeout 超时时间(秒),默认为15
--retries 重试次数,默认为3
示例:
python cli.py "https://example.com/gallery" "gallery-container" --selector-type class --naming timestamp
打包说明
将GUI版本打包为可执行文件:
pyinstaller gui.spec
打包后程序位于dist/gui目录
代码
UTIL
import os
import requests
import uuid
import time
import random
import string
from bs4 import BeautifulSoup
from urllib.parse import urljoin
# Module-level state shared by the functions in this module.
# Cooperative control flags polled by download_images_from_gallery:
# setting stop_download aborts the run, setting is_paused suspends it.
stop_download = False
is_paused = False
# File names recorded as already downloaded; replaced with the contents of
# downloaded.txt each time download_images_from_gallery starts.
DOWNLOADED_FILES = set()
def load_downloaded_files(save_dir):
    """Return the set of file names recorded in ``save_dir/downloaded.txt``.

    Each line of the record file is stripped of surrounding whitespace and
    becomes one entry. An empty set is returned when the record file does
    not exist.
    """
    record_path = os.path.join(save_dir, "downloaded.txt")
    if not os.path.exists(record_path):
        return set()
    with open(record_path, "r") as record:
        return {entry.strip() for entry in record}
def save_downloaded_file(save_dir, filename):
    """Append ``filename`` as one line to ``save_dir/downloaded.txt``."""
    record_path = os.path.join(save_dir, "downloaded.txt")
    with open(record_path, "a") as record:
        # print() writes the name followed by a single newline.
        print(filename, file=record)
def generate_filename(img_url, naming_option="original", custom_prefix=""):
    """Build the local file name for an image URL.

    The name and extension are taken from the *path* component of the URL,
    so a query string or fragment (``photo.jpg?size=large#top``) never ends
    up in the file name.

    Args:
        img_url: Absolute URL of the image.
        naming_option: One of ``"original"`` (last path segment of the URL),
            ``"uuid"`` (random UUID4), ``"timestamp"`` (current Unix time in
            whole seconds) or ``"custom"`` (``custom_prefix`` plus six random
            alphanumeric characters).
        custom_prefix: Prefix used only when ``naming_option`` is ``"custom"``.

    Returns:
        The generated file name. Except for the ``"original"`` option the
        extension of the URL's last path segment is appended.

    Raises:
        ValueError: If ``naming_option`` is not one of the supported values.
    """
    # Function-scope imports keep this block independent of the file header.
    import hashlib
    from urllib.parse import urlparse

    original_name = os.path.basename(urlparse(img_url).path)
    # "." and ".." are directory references, not usable file names.
    if original_name in (".", ".."):
        original_name = ""
    _, ext = os.path.splitext(original_name)

    if naming_option == "original":
        if original_name:
            return original_name
        # The built-in hash() is randomized per process for strings, which
        # would give the same URL a different name on every run and defeat
        # the downloaded.txt bookkeeping; use a stable digest instead.
        digest = hashlib.sha256(img_url.encode("utf-8", errors="replace")).hexdigest()[:16]
        return f"image_{digest}"
    elif naming_option == "uuid":
        return f"{uuid.uuid4()}{ext}"
    elif naming_option == "timestamp":
        return f"{int(time.time())}{ext}"
    elif naming_option == "custom":
        random_str = ''.join(random.choices(string.ascii_letters + string.digits, k=6))
        return f"{custom_prefix}_{random_str}{ext}"
    else:
        raise ValueError("Invalid naming option.")
def _wait_while_paused(poll_interval=0.5):
    """Block while ``is_paused`` is set and report whether to abort.

    Returns:
        True when ``stop_download`` is set (the caller should return),
        False when the download may proceed.
    """
    while is_paused:
        if stop_download:
            return True
        time.sleep(poll_interval)
    return stop_download


def _unique_path(save_dir, img_name):
    """Return a path for ``img_name`` in ``save_dir`` that does not exist yet.

    When the plain name is taken, ``_1``, ``_2``, ... is inserted before the
    extension until a free path is found.
    """
    stem, ext = os.path.splitext(img_name)
    candidate = os.path.join(save_dir, img_name)
    counter = 1
    while os.path.exists(candidate):
        candidate = os.path.join(save_dir, f"{stem}_{counter}{ext}")
        counter += 1
    return candidate


def download_images_from_gallery(
    url,
    gallery_selector,
    selector_type="id",
    save_dir=None,
    naming_option="original",
    custom_prefix="",
    timeout=10,
    max_retries=3,
    progress_callback=None
):
    """Download every ``<img>`` inside one element of a web page.

    The page at ``url`` is fetched, the first element matching the selector
    is located, and the ``src`` of each ``<img>`` below it is downloaded into
    ``save_dir``. Names already listed in ``save_dir/downloaded.txt`` are
    skipped, and every successful download is appended to that file.

    The module-level flags ``is_paused`` and ``stop_download`` are polled
    between images and between attempts, so another thread can pause or
    cancel the run; an HTTP request already in flight is not interrupted.

    Args:
        url: Address of the page to scan.
        gallery_selector: Value of the element's ``id`` or ``class``.
        selector_type: ``"id"`` or ``"class"``.
        save_dir: Target directory; ``"downloaded_images"`` when ``None``.
            Created if missing.
        naming_option: Naming scheme passed to ``generate_filename``.
        custom_prefix: Prefix passed to ``generate_filename``.
        timeout: Timeout in seconds for each HTTP request.
        max_retries: Number of attempts per image; values below 1 are
            treated as 1 so that each image is tried at least once.
        progress_callback: Optional callable receiving one status string per
            event. Errors are reported through it, not raised.

    Returns:
        None.
    """
    global DOWNLOADED_FILES

    def notify(message):
        # Single place for the "callback is optional" check.
        if progress_callback:
            progress_callback(message)

    if save_dir is None:
        save_dir = "downloaded_images"
    os.makedirs(save_dir, exist_ok=True)
    DOWNLOADED_FILES = load_downloaded_files(save_dir)

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    }

    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        notify(f"无法访问网页: {e}")
        return

    soup = BeautifulSoup(response.text, 'html.parser')
    if selector_type == "id":
        gallery = soup.find(id=gallery_selector)
    elif selector_type == "class":
        gallery = soup.find(class_=gallery_selector)
    else:
        notify("无效的选择器类型,请使用 'id' 或 'class'")
        return

    if not gallery:
        notify(f"未找到 {selector_type} 为 '{gallery_selector}' 的元素")
        return

    img_tags = gallery.find_all('img')
    if not img_tags:
        notify("所选元素中没有找到图片")
        return

    attempts = max(1, max_retries)
    total_images = len(img_tags)

    for idx, img in enumerate(img_tags):
        if stop_download:
            notify("用户取消下载,程序退出。")
            return
        if _wait_while_paused():
            return

        img_url = img.get('src')
        if not img_url:
            continue
        # Resolve relative src values against the page address.
        img_url = urljoin(url, img_url)

        try:
            img_name = generate_filename(img_url, naming_option, custom_prefix)
        except ValueError as e:
            notify(f"文件名生成失败: {e}")
            continue

        if img_name in DOWNLOADED_FILES:
            notify(f"跳过已下载: {img_name}")
            continue

        for attempt in range(attempts):
            if _wait_while_paused():
                return
            try:
                img_response = requests.get(img_url, headers=headers, timeout=timeout)
                # Without this check a 404/500 error page would be written
                # to disk and recorded as a successfully downloaded image.
                img_response.raise_for_status()
                img_path = _unique_path(save_dir, img_name)
                with open(img_path, 'wb') as f:
                    f.write(img_response.content)
                save_downloaded_file(save_dir, img_name)
                notify(f"下载成功 ({idx + 1}/{total_images}): {img_path}")
                break
            except KeyboardInterrupt:
                notify("\n用户中断下载,程序退出。")
                return
            except (requests.exceptions.RequestException, OSError) as e:
                # OSError covers failures while writing the file, which would
                # otherwise terminate the whole run with a traceback.
                if attempt == attempts - 1:
                    notify(f"下载失败(重试 {attempts} 次): {img_url}, 错误: {e}")
                else:
                    notify(f"下载失败(第 {attempt + 1} 次重试): {img_url}, 错误: {e}")
                    # Short back-off before the next attempt.
                    time.sleep(1)
GUI
import threading
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
from util import (
download_images_from_gallery
)
# GUI-specific module-level state.
# NOTE(review): this event is not referenced by any other code visible in
# this file — confirm whether it is still needed.
pause_event = threading.Event()
class ImageDownloaderApp:
    """Tkinter window that collects download options and runs the download.

    The download itself is ``download_images_from_gallery`` running in a
    daemon worker thread. Pause and cancel are requested by setting
    ``util.is_paused`` / ``util.stop_download``, the module-level flags that
    function polls. Progress messages from the worker are put on a queue and
    written to the log widget from the Tk thread, so the worker never
    touches a widget.
    """

    def __init__(self, root):
        """Build all widgets inside ``root`` and initialise the download state."""
        # Function-scope import keeps this class independent of the file header.
        import queue

        self.root = root
        self.root.title("图片下载器")
        self.root.geometry("600x500")

        # Mirror of the state requested through the buttons.
        self.is_paused = False
        self.stop_download = False
        self.download_thread = None
        # Messages produced by the worker thread, drained by check_thread().
        self._log_queue = queue.Queue()

        # Input fields and their labels.
        self._add_label("目标 URL:", 0)
        self.url_entry = tk.Entry(root, width=50)
        self.url_entry.grid(row=0, column=1, padx=5, pady=5)

        self._add_label("选择器类型:", 1)
        self.selector_type = ttk.Combobox(root, values=["id", "class"])
        self.selector_type.grid(row=1, column=1, padx=5, pady=5)
        self.selector_type.set("id")

        self._add_label("选择器值:", 2)
        self.selector_entry = tk.Entry(root, width=50)
        self.selector_entry.grid(row=2, column=1, padx=5, pady=5)

        self._add_label("保存目录:", 3)
        self.save_dir_entry = tk.Entry(root, width=50)
        self.save_dir_entry.grid(row=3, column=1, padx=5, pady=5)
        self.browse_button = tk.Button(root, text="浏览", command=self.browse_directory)
        self.browse_button.grid(row=3, column=2, padx=5, pady=5)

        self._add_label("命名方式:", 4)
        self.naming_option = ttk.Combobox(root, values=["original", "uuid", "timestamp", "custom"])
        self.naming_option.grid(row=4, column=1, padx=5, pady=5)
        self.naming_option.set("original")

        self._add_label("自定义前缀:", 5)
        self.custom_prefix_entry = tk.Entry(root, width=50)
        self.custom_prefix_entry.grid(row=5, column=1, padx=5, pady=5)

        self._add_label("超时时间(秒):", 6)
        self.timeout_entry = tk.Entry(root, width=10)
        self.timeout_entry.grid(row=6, column=1, padx=5, pady=5, sticky="w")
        self.timeout_entry.insert(0, "15")

        self._add_label("最大重试次数:", 7)
        self.retries_entry = tk.Entry(root, width=10)
        self.retries_entry.grid(row=7, column=1, padx=5, pady=5, sticky="w")
        self.retries_entry.insert(0, "3")

        # Control buttons; pause and cancel are only enabled while a download runs.
        self.download_button = tk.Button(root, text="开始下载", command=self.start_download)
        self.download_button.grid(row=8, column=0, pady=10)
        self.pause_button = tk.Button(root, text="暂停", command=self.pause_download, state=tk.DISABLED)
        self.pause_button.grid(row=8, column=1, pady=10)
        self.cancel_button = tk.Button(root, text="取消", command=self.cancel_download, state=tk.DISABLED)
        self.cancel_button.grid(row=8, column=2, pady=10)

        # Read-only log output.
        self.log_text = tk.Text(root, height=10, width=70)
        self.log_text.grid(row=9, column=0, columnspan=3, padx=5, pady=5)
        self.log_text.config(state=tk.DISABLED)

    def _add_label(self, text, row):
        """Place a right-aligned label in column 0 of the given grid row."""
        tk.Label(self.root, text=text).grid(row=row, column=0, padx=5, pady=5, sticky="e")

    def _drain_log_queue(self):
        """Write every message queued by the worker thread to the log widget."""
        # This method is the only consumer, so empty() followed by
        # get_nowait() cannot race with another reader.
        while not self._log_queue.empty():
            self.log_message(self._log_queue.get_nowait())

    def browse_directory(self):
        """Let the user pick the save directory and put it in the entry field."""
        directory = filedialog.askdirectory()
        if directory:
            self.save_dir_entry.delete(0, tk.END)
            self.save_dir_entry.insert(0, directory)

    def log_message(self, message):
        """Append ``message`` to the log widget and scroll to the end."""
        self.log_text.config(state=tk.NORMAL)
        self.log_text.insert(tk.END, message + "\n")
        self.log_text.see(tk.END)
        self.log_text.config(state=tk.DISABLED)

    def start_download(self):
        """Validate the form and start the download in a worker thread."""
        # The flags polled by the worker live in the util module, so they
        # must be set on that module rather than on globals of this one.
        import util

        # Never run two workers at once: they would share the same flags.
        if self.download_thread is not None and self.download_thread.is_alive():
            return

        url = self.url_entry.get().strip()
        selector_type = self.selector_type.get()
        selector_value = self.selector_entry.get().strip()
        save_dir = self.save_dir_entry.get() or None
        naming_option = self.naming_option.get()
        custom_prefix = self.custom_prefix_entry.get()

        try:
            timeout = int(self.timeout_entry.get())
            max_retries = int(self.retries_entry.get())
        except ValueError:
            messagebox.showerror("错误", "超时时间和重试次数必须为整数!")
            return
        if timeout <= 0 or max_retries < 1:
            messagebox.showerror("错误", "超时时间和重试次数必须为正整数!")
            return

        if not url or not selector_value:
            messagebox.showerror("错误", "请输入目标 URL 和选择器值!")
            return

        # Clear any pause/cancel request left over from a previous run.
        self.is_paused = False
        self.stop_download = False
        util.is_paused = False
        util.stop_download = False

        self.log_message("开始下载...")
        self.download_button.config(state=tk.DISABLED)
        self.pause_button.config(state=tk.NORMAL, text="暂停")
        self.cancel_button.config(state=tk.NORMAL)

        # The worker reports progress through the queue, not the widget.
        self.download_thread = threading.Thread(
            target=download_images_from_gallery,
            args=(url, selector_value, selector_type, save_dir, naming_option,
                  custom_prefix, timeout, max_retries, self._log_queue.put),
            daemon=True
        )
        self.download_thread.start()

        # Poll for progress messages and for the end of the worker.
        self.root.after(100, self.check_thread)

    def pause_download(self):
        """Toggle between paused and running."""
        import util

        self.is_paused = not self.is_paused
        util.is_paused = self.is_paused

        if self.is_paused:
            self.pause_button.config(text="继续")
            self.log_message("下载已暂停")
        else:
            self.pause_button.config(text="暂停")
            self.log_message("下载已继续")

    def cancel_download(self):
        """Ask the worker to stop.

        The start button stays disabled until check_thread() sees that the
        worker has exited, so a new run cannot clear the stop flag while the
        old worker is still alive.
        """
        import util

        self.stop_download = True
        util.stop_download = True
        self.is_paused = False

        self.pause_button.config(state=tk.DISABLED, text="暂停")
        self.cancel_button.config(state=tk.DISABLED)
        self.log_message("下载已取消")

    def check_thread(self):
        """Flush queued log messages and reset the buttons once the worker ends."""
        self._drain_log_queue()
        if self.download_thread.is_alive():
            self.root.after(100, self.check_thread)
            return

        # The worker may have queued messages between the drain above and
        # its exit.
        self._drain_log_queue()
        self.download_button.config(state=tk.NORMAL)
        self.pause_button.config(state=tk.DISABLED, text="暂停")
        self.cancel_button.config(state=tk.DISABLED)
        # A cancelled run has already logged its own message.
        if not self.stop_download:
            self.log_message("下载完成!")
# Script entry point: create the Tk root window, build the application in it
# and run the Tk event loop until the window is closed.
if __name__ == "__main__":
    root = tk.Tk()
    app = ImageDownloaderApp(root)
    root.mainloop()
CLI
import argparse
import signal
import sys
# 导入下载图片的函数
from util import (
download_images_from_gallery
)
def cli_progress_callback(message):
    """Progress callback for the command line: print ``message`` on its own line."""
    print(message, file=sys.stdout)
def signal_handler(sig, frame):
    """Handle SIGINT (Ctrl+C): request cancellation and exit with status 0.

    Args:
        sig: Signal number passed by the ``signal`` module (unused).
        frame: Interrupted stack frame passed by the ``signal`` module (unused).

    Raises:
        SystemExit: Always, via ``sys.exit(0)``.
    """
    # The stop flag polled by the download loop lives in the util module;
    # assigning a global of this module would not reach it.
    import util

    util.stop_download = True
    print("\n下载已取消")
    sys.exit(0)
def main():
    """Parse the command line and run one download.

    Installs ``signal_handler`` for SIGINT, validates the arguments, prints a
    summary of the settings and calls ``download_images_from_gallery`` with
    ``cli_progress_callback`` as the progress reporter.

    Exits with status 1 when ``--naming custom`` is used without
    ``--prefix``, and with argparse's usage error (status 2) when
    ``--timeout`` or ``--retries`` is not a positive integer.
    """
    # Install the handler first so Ctrl+C is handled from the start.
    signal.signal(signal.SIGINT, signal_handler)

    parser = argparse.ArgumentParser(description='图片下载器 CLI 版本')
    parser.add_argument('url', help='目标网页URL')
    parser.add_argument('selector_value', help='画廊选择器值')
    parser.add_argument('--selector-type', choices=['id', 'class'], default='id',
                        help='选择器类型 (id 或 class), 默认为 id')
    parser.add_argument('--save-dir', help='图片保存目录, 默认为 downloaded_images')
    parser.add_argument('--naming', choices=['original', 'uuid', 'timestamp', 'custom'],
                        default='original', help='文件名命名方式, 默认为 original')
    parser.add_argument('--prefix', help='自定义文件名前缀 (当命名方式为 custom 时使用)')
    parser.add_argument('--timeout', type=int, default=15,
                        help='下载超时时间(秒), 默认为 15')
    parser.add_argument('--retries', type=int, default=3,
                        help='下载失败重试次数, 默认为 3')

    args = parser.parse_args()

    # The custom naming scheme is meaningless without a prefix.
    if args.naming == 'custom' and not args.prefix:
        print("错误: 当使用 custom 命名方式时必须提供 --prefix 参数")
        sys.exit(1)
    # A non-positive timeout is rejected by the HTTP library with an
    # exception the download loop does not handle, and fewer than one
    # attempt would download nothing; reject both up front.
    if args.timeout <= 0:
        parser.error("--timeout 必须为正整数")
    if args.retries < 1:
        parser.error("--retries 必须为不小于 1 的整数")

    # Summary of the effective settings.
    prefix_note = f" (前缀: {args.prefix})" if args.naming == 'custom' else ''
    print(f"开始下载: {args.url}")
    print(f"选择器: {args.selector_type}={args.selector_value}")
    print(f"保存目录: {args.save_dir or 'downloaded_images'}")
    print(f"命名方式: {args.naming}{prefix_note}")
    print(f"超时: {args.timeout}秒, 重试: {args.retries}次")
    print("按 Ctrl+C 取消下载\n")

    download_images_from_gallery(
        url=args.url,
        gallery_selector=args.selector_value,
        selector_type=args.selector_type,
        save_dir=args.save_dir,
        naming_option=args.naming,
        # --prefix is None when omitted; pass a string as the callee's default does.
        custom_prefix=args.prefix or "",
        timeout=args.timeout,
        max_retries=args.retries,
        progress_callback=cli_progress_callback
    )

    print("\n下载完成!")
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()