mirror of
https://github.com/wcbing/wcbing-apt-repo.git
synced 2025-12-28 18:18:31 +08:00
158 lines
5.0 KiB
Python
Executable File
158 lines
5.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import gzip
|
|
import io
|
|
import json
|
|
import logging
|
|
import lzma
|
|
import os
|
|
import re
|
|
import requests
|
|
import sys
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from threading import Lock
|
|
import apt_pkg
|
|
|
|
apt_pkg.init() # 初始化 apt_pkg
|
|
|
|
package_version = {arch: {} for arch in ["all", "amd64", "i386", "arm64"]}
|
|
package_info = {arch: {} for arch in ["all", "amd64", "i386", "arm64"]}
|
|
lock = {arch: Lock() for arch in ["all", "amd64", "i386", "arm64"]}
|
|
|
|
USER_AGENT = "Debian APT-HTTP/1.3 (2.6.1)" # from Debian 12
|
|
|
|
"""
|
|
repo info json format:
|
|
"repo_name": {
|
|
"repo": repo url, end with "/"
|
|
"xxx_path": {
|
|
"arch": repo Packages file path of "arch", start with no "/"
|
|
}
|
|
}
|
|
"""
|
|
|
|
|
|
def read_repo_list(repo_list_file: str) -> dict:
|
|
try:
|
|
with open(repo_list_file, "r") as f:
|
|
return json.load(f)
|
|
except Exception as e:
|
|
logging.error(f"Error reading repo list: {e}")
|
|
return {}
|
|
|
|
|
|
def get_remote_packages(repo_url: str, file_path: str) -> bytes:
|
|
"""
|
|
get the packages file content from remote repo
|
|
"""
|
|
file_url = repo_url + file_path
|
|
try:
|
|
response = requests.get(
|
|
file_url, timeout=10, headers={"User-Agent": USER_AGENT}
|
|
)
|
|
if response.status_code != 200:
|
|
logging.error(
|
|
f"GetError: {file_url} returned status {response.status_code}"
|
|
)
|
|
return b""
|
|
|
|
content = b""
|
|
if file_url.endswith(".gz"): # Packages.gz
|
|
with gzip.GzipFile(fileobj=io.BytesIO(response.content)) as f:
|
|
content = f.read()
|
|
elif file_url.endswith(".xz"): # Packages.xz
|
|
with lzma.LZMAFile(io.BytesIO(response.content)) as f:
|
|
content = f.read()
|
|
else: # Packages
|
|
content = response.content
|
|
|
|
# complete the two newlines if the ending is less than two newlines
|
|
# 结尾不足两个换行符的话,补全两个换行符
|
|
if not content.endswith(b"\n\n"):
|
|
content += b"\n"
|
|
return content.replace(b"Filename: ", f"Filename: {repo_url}".encode())
|
|
except Exception as e:
|
|
logging.error(f"Error fetching packages: {e}")
|
|
return b""
|
|
|
|
|
|
def get_latest(deb_packages: bytes):
|
|
"""
|
|
split the information of each packet, deduplication and store the latest in infoList
|
|
将每个包的信息分割开,去重并将最新的存放到 infoList 中
|
|
"""
|
|
deb_packages = re.sub(rb"^Package: ", b"{{start}}Package: ", deb_packages, flags=re.MULTILINE)
|
|
info_list = deb_packages.split(b"{{start}}")[1:]
|
|
|
|
find_name = re.compile(rb"Package: (.+)")
|
|
find_arch = re.compile(rb"Architecture: (.+)")
|
|
find_version = re.compile(rb"Version: (.+)")
|
|
|
|
for v in info_list:
|
|
try:
|
|
name = find_name.search(v).group(1).decode()
|
|
arch = find_arch.search(v).group(1).decode()
|
|
tmp_version = find_version.search(v).group(1).decode()
|
|
with lock[arch]:
|
|
# 使用 apt_pkg 进行版本比较
|
|
if name not in package_version[arch] or apt_pkg.version_compare(tmp_version, package_version[arch][name]) > 0:
|
|
package_version[arch][name] = tmp_version
|
|
package_info[arch][name] = v
|
|
except Exception as e:
|
|
logging.error(f"Error processing package {name}: {e}")
|
|
return
|
|
|
|
|
|
def process_repo(r: dict):
|
|
"""
|
|
获取仓库中不同架构子仓库的内容,最后调用 get_latest 去重并保存。
|
|
"""
|
|
try:
|
|
for path in r["path"].values():
|
|
get_latest(get_remote_packages(r["repo"], path))
|
|
except Exception as e:
|
|
logging.error(f"Error processing repo {r.get('name', 'unknown')}: {e}")
|
|
|
|
|
|
def parse_arguments():
|
|
parser = argparse.ArgumentParser(
|
|
description="A script to merge the latest versions Packages files"
|
|
)
|
|
parser.add_argument(
|
|
"-r",
|
|
"--repo",
|
|
type=str,
|
|
default="data/repo_list.json",
|
|
help="Path to the repository list file. Default is 'data/repo_list.json'.",
|
|
)
|
|
parser.add_argument("--local", type=str, help="Process Packages in local repo")
|
|
return parser.parse_args()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
args = parse_arguments()
|
|
|
|
# 处理本地 repo
|
|
if args.local:
|
|
with open(args.local) as f:
|
|
get_latest(f.read().encode())
|
|
|
|
# 读取 repo_list 配置
|
|
repo_list = read_repo_list(args.repo)
|
|
if not repo_list:
|
|
sys.exit()
|
|
|
|
# 多线程,同时限制最大线程数
|
|
with ThreadPoolExecutor(max_workers=10) as executor:
|
|
executor.map(process_repo, repo_list.values())
|
|
|
|
# 分别输出到不同文件
|
|
for arch in ["amd64", "arm64"]:
|
|
os.makedirs(f"deb/dists/wcbing/main/binary-{arch}/", exist_ok=True)
|
|
with open(f"deb/dists/wcbing/main/binary-{arch}/Packages", "+wb") as f:
|
|
for i in package_info[arch].values():
|
|
f.write(i)
|
|
for i in package_info["all"].values():
|
|
f.write(i)
|