前言

必应通过 https://www.halo.run/store/apps/app-KdbUz 这个插件可以正常推送索引(百度就不管了),但谷歌不行,想批量索引还有点难,于是便自己写了个工具。

功能

  • 自动解析XML格式的站点地图

  • 批量提交URL到Google索引API

  • 内置速率控制和指数退避重试机制

  • 自动记录已提交成功的URL,避免重复提交

  • 详细的日志记录(控制台输出+日志文件)

KEYFILE获取

  1. 开飞机

  2. 进入网站

  3. 搜索index

  4. 选择Web Search Indexing API

  5. 点击"启用"(如果之前已经启用过,这里会显示"管理",点它即可)

  6. 点击凭据找到管理服务账号

  7. 创建服务账号

  8. 然后完成创建,权限看着给吧,不懂就全给(如我一般)

  9. 返回凭据,点击创建的账号

  10. 点击密钥,然后点击添加密钥,创建新密钥,选择json

  11. 进入网站

  12. 点击设置,点击用户和权限,点击添加用户

  13. 用户是前面创建的账号,复制粘贴,点击添加

  14. 恭喜你,完成了!!!!

代码

import logging
import time
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any
from xml.etree import ElementTree

import requests
from google.auth.transport.requests import AuthorizedSession
from google.oauth2 import service_account


# All tunables grouped on one class for namespacing.
class Config:
    # URL of your site's sitemap (placeholder — fill in).
    SITEMAP_URL = "你网站的sitemap URl"
    # Service-account JSON keyfile name, including the .json suffix (placeholder — fill in).
    GOOGLE_JSON_KEYFILE = "步骤2获取的json文件名,记得带后缀3"
    LOG_DIR = Path("logs")  # directory for log files
    # Derive from LOG_DIR instead of repeating the "logs" literal,
    # so changing LOG_DIR moves this file along with it.
    SUBMITTED_URLS_FILE = LOG_DIR / "submitted_urls.txt"  # record of successfully submitted URLs
    API_ENDPOINT = "https://indexing.googleapis.com/v3/urlNotifications:publish"  # do not change
    SCOPES = ['https://www.googleapis.com/auth/indexing']  # do not change


# Logger initialization (idempotent: safe to call more than once).
def setup_logger() -> logging.Logger:
    """Create and configure the tool's logger, or return it if already set up.

    Attaches one file handler (the file *name* carries today's date — note
    this is not true time-based rotation, a new file only appears on a new
    day's first run) and one console handler, sharing a common format.
    """
    log = logging.getLogger("GoogleIndexingTool")
    if log.handlers:
        # Already configured by an earlier call — don't stack duplicate handlers.
        return log

    log.setLevel(logging.INFO)
    Config.LOG_DIR.mkdir(parents=True, exist_ok=True)

    fmt = logging.Formatter(
        "%(asctime)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    today = datetime.now().strftime('%Y%m%d')
    handlers = (
        logging.FileHandler(
            filename=Config.LOG_DIR / f"indexing_{today}.log",
            encoding="utf-8",
        ),
        logging.StreamHandler(),
    )
    for handler in handlers:
        handler.setFormatter(fmt)
        log.addHandler(handler)

    return log


logger = setup_logger()


def load_submitted_urls() -> set[str]:
    """Return the set of URLs already submitted successfully.

    Reads one URL per line from Config.SUBMITTED_URLS_FILE. A missing
    file or any read error is treated as "nothing submitted yet" —
    this cache is best-effort, never fatal.
    """
    try:
        if not Config.SUBMITTED_URLS_FILE.exists():
            return set()
        with open(Config.SUBMITTED_URLS_FILE, 'r', encoding='utf-8') as fh:
            stripped = (raw.strip() for raw in fh)
            return {entry for entry in stripped if entry}
    except Exception as e:
        logger.error(f"加载已提交URL文件失败: {str(e)}")
        return set()


def save_submitted_urls(urls: set[str]) -> None:
    """Persist the set of successfully submitted URLs, one per line.

    URLs are written sorted so the file content is deterministic across
    runs (iterating a raw set would serialize in arbitrary order and
    make the file churn in diffs/backups). Errors are logged, never
    raised: losing this cache only causes harmless re-submission.
    """
    try:
        Config.SUBMITTED_URLS_FILE.parent.mkdir(parents=True, exist_ok=True)
        with open(Config.SUBMITTED_URLS_FILE, 'w', encoding='utf-8') as f:
            f.write('\n'.join(sorted(urls)))
    except Exception as e:
        logger.error(f"保存已提交URL文件失败: {str(e)}")


def validate_config() -> None:
    """Ensure required config files exist; raise FileNotFoundError otherwise."""
    keyfile = Path(Config.GOOGLE_JSON_KEYFILE)
    if keyfile.exists():
        return
    logger.error("Google服务账号密钥文件不存在")
    raise FileNotFoundError(f"Keyfile {Config.GOOGLE_JSON_KEYFILE} not found")


def parse_sitemap(sitemap_url: str) -> List[str]:
    """Fetch an XML sitemap and extract every page URL.

    Args:
        sitemap_url: address of the sitemap to download.

    Returns:
        All non-empty <loc> values found; an empty list on any failure
        (network error, invalid XML, or unexpected exception).
    """
    try:
        logger.info(f"开始解析站点地图: {sitemap_url}")
        response = requests.get(sitemap_url, timeout=10)
        response.raise_for_status()

        root = ElementTree.fromstring(response.content)
        ns = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'}
        # Select <loc> elements directly and skip ones without text.
        # The previous `url.find(...).text` raised AttributeError on a
        # <url> entry missing <loc>, which the broad except below turned
        # into an empty result for the entire sitemap.
        urls = [
            loc.text.strip()
            for loc in root.findall('ns:url/ns:loc', ns)
            if loc.text and loc.text.strip()
        ]

        logger.info(f"成功解析到 {len(urls)} 个URL")
        return urls
    except requests.RequestException as e:
        logger.error(f"请求站点地图失败: {str(e)}")
    except ElementTree.ParseError as e:
        logger.error(f"解析XML内容失败: {str(e)}")
    except Exception as e:
        logger.error(f"解析站点地图异常: {str(e)}", exc_info=True)
    return []


def create_authenticated_session() -> AuthorizedSession:
    """Build an HTTP session authenticated with the service-account keyfile."""
    return AuthorizedSession(
        service_account.Credentials.from_service_account_file(
            Config.GOOGLE_JSON_KEYFILE,
            scopes=Config.SCOPES,
        )
    )


def submit_single_url(session: AuthorizedSession, url: str) -> Dict[str, Any]:
    """Submit one URL to the Google Indexing API with exponential backoff.

    Retries up to 3 times on HTTP 429 or a transport exception, doubling
    the wait (1s, 2s, 4s) between attempts.

    Returns a dict with the url plus either status/response (HTTP reply
    received) or an error message (retries exhausted).
    """
    max_retries = 3
    delay = 1  # seconds before the next retry; doubled after each failure
    attempt = 0
    while attempt < max_retries:
        try:
            response = session.post(
                Config.API_ENDPOINT,
                json={"url": url, "type": "URL_UPDATED"},
            )

            if response.status_code != 429:
                # Any non-rate-limit outcome is final: log server-side
                # errors, then report whatever the API said.
                if response.status_code >= 400:
                    logger.error(f"API请求异常: {url} - 状态码 {response.status_code}")
                return {
                    "url": url,
                    "status": response.status_code,
                    "response": response.text,
                }

            # HTTP 429: back off and retry, unless attempts are exhausted.
            if attempt >= max_retries - 1:
                logger.error(f"URL超过最大重试次数: {url}")
                return {"url": url, "status": 429, "error": "Rate limit exceeded"}
            logger.warning(
                f"[重试{attempt + 1}/{max_retries}] URL触发速率限制: {url} - "
                f"等待{delay}秒后重试"
            )
            time.sleep(delay)
            delay *= 2

        except Exception as e:
            logger.error(f"请求异常: {url} (尝试 {attempt + 1}/{max_retries})", exc_info=True)
            if attempt >= max_retries - 1:
                return {"url": url, "error": str(e)}
            time.sleep(delay)
            delay *= 2

        attempt += 1

    return {"url": url, "error": "Max retries exceeded"}


def google_indexing_api(urls: List[str]) -> List[Dict[str, Any]]:
    """Submit URLs in bulk to the Indexing API, with rate limiting.

    URLs recorded as previously successful are skipped. (Bug fix: the
    earlier version computed the filtered list `new_urls` but then
    iterated the full `urls` list, so every URL was re-submitted and
    the dedup cache was dead code.)

    Returns one result dict per URL actually submitted; an empty list
    if authentication fails.
    """
    try:
        logger.info("初始化Google API认证会话")
        session = create_authenticated_session()
        results = []

        # Load the success cache and drop URLs already submitted.
        submitted_urls = load_submitted_urls()
        new_urls = [url for url in urls if url not in submitted_urls]
        skipped_count = len(urls) - len(new_urls)

        if skipped_count > 0:
            logger.info(f"跳过 {skipped_count} 个已成功提交的URL")

        # Fixed pause between requests. Google's publish quota is about
        # 200 requests/min, so 5 s per request stays comfortably under it.
        REQUEST_INTERVAL = 5  # seconds

        logger.info(f"开始批量提交URL(总数:{len(new_urls)},间隔:{REQUEST_INTERVAL}秒)")
        for index, url in enumerate(new_urls, 1):
            start_time = time.time()

            result = submit_single_url(session, url)
            status = result.get('status', 0)

            if 200 <= status < 300:
                logger.info(f"[{index}/{len(new_urls)}] 提交成功: {url}")
                # Remember the success so future runs skip this URL.
                submitted_urls.add(url)
            else:
                logger.error(f"[{index}/{len(new_urls)}] 提交失败: {url} - 状态码: {status}")

            results.append(result)

            # Pace requests precisely: subtract the time the call itself took.
            elapsed = time.time() - start_time
            if elapsed < REQUEST_INTERVAL:
                time.sleep(REQUEST_INTERVAL - elapsed)

        # Persist the (possibly grown) success cache.
        if submitted_urls:
            save_submitted_urls(submitted_urls)

        return results
    except Exception as e:
        logger.error("API认证失败", exc_info=True)
        return []


def generate_report(results: List[Dict[str, Any]]) -> Dict[str, int]:
    """Summarize submission results into total / success / failure counts.

    A result counts as a success when its 'status' is a 2xx code;
    entries without a 'status' key (transport errors) count as failures.
    """
    total = len(results)
    ok = 0
    for entry in results:
        code = entry.get('status', 0)
        if 200 <= code < 300:
            ok += 1
    return {"total": total, "success": ok, "failures": total - ok}


def main() -> None:
    """Entry point: parse the sitemap, submit its URLs, and log a summary."""
    try:
        logger.info("=" * 50)
        logger.info("Google索引提交工具启动")

        validate_config()

        urls = parse_sitemap(Config.SITEMAP_URL)
        if not urls:
            logger.error("未找到有效URL,程序终止")
            return

        report = generate_report(google_indexing_api(urls))

        logger.info("\n提交结果统计:")
        logger.info(f"总提交数 : {report['total']}")
        logger.info(f"成功数量 : {report['success']}")
        logger.info(f"失败数量 : {report['failures']}")

    except KeyboardInterrupt:
        logger.error("用户中断操作")
    except Exception:
        logger.error("程序运行异常", exc_info=True)
    finally:
        # Runs on success, failure, and early return alike.
        logger.info("工具运行结束\n")


if __name__ == "__main__":
    main()

后记

时间紧迫,配图后续更新。