style: update merge-apt-repo.py

This commit is contained in:
wcbing 2025-10-09 17:35:51 +08:00
parent a0b6e71f9a
commit dcf20c7d73

View File

@ -13,27 +13,36 @@ import sys
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
import apt_pkg
from apt_pkg import version_compare
apt_pkg.init() # 初始化 apt_pkg
package_version = {arch: {} for arch in ["all", "amd64", "i386", "arm64"]}
package_info = {arch: {} for arch in ["all", "amd64", "i386", "arm64"]}
lock = {arch: Lock() for arch in ["all", "amd64", "i386", "arm64"]}
USER_AGENT = "Debian APT-HTTP/1.3 (2.6.1)" # from Debian 12
"""
repo info json format:
"repo_name": {
"repo": repo url, end with "/"
"xxx_path": {
"arch": repo Packages file path of "arch", start with no "/"
USER_AGENT = "Debian APT-HTTP/1.3 (3.0.3)" # from Debian 13
arch_List = ["all", "amd64", "arm64", "i386"]
lock = {arch: Lock() for arch in arch_List}
packages = {arch: {} for arch in arch_List} # 存放用于生成 Packages 的内容
""" packages format:
{
"arch": {
"package1": {
"version": "1.0.0",
"package": ""
}
}
}
"""
def read_repo_list(repo_list_file: str) -> dict:
"""
repo info json format:
"repo_name": {
"repo": repo url, end with "/" is better
"path": {
"arch": repo Packages file path of "arch", don't start with "/"
}
}
"""
try:
with open(repo_list_file, "r") as f:
return json.load(f)
@ -46,7 +55,7 @@ def get_remote_packages(repo_url: str, file_path: str) -> bytes:
"""
get the packages file content from remote repo
"""
file_url = repo_url + file_path
file_url = os.path.join(repo_url, file_path)
try:
response = requests.get(
file_url, timeout=10, headers={"User-Agent": USER_AGENT}
@ -77,28 +86,33 @@ def get_remote_packages(repo_url: str, file_path: str) -> bytes:
return b""
def get_latest(deb_packages: bytes):
def split_latest(packages_file_content: bytes):
"""
split the information of each packet, deduplication and store the latest in infoList
将每个包的信息分割开去重并将最新的存放到 infoList
"""
deb_packages = re.sub(rb"^Package: ", b"{{start}}Package: ", deb_packages, flags=re.MULTILINE)
info_list = deb_packages.split(b"{{start}}")[1:]
packages_file_content = re.sub(
rb"^Package: ", b"{{start}}Package: ", packages_file_content, flags=re.MULTILINE
)
package_list = packages_file_content.split(b"{{start}}")[1:]
find_name = re.compile(rb"Package: (.+)")
find_arch = re.compile(rb"Architecture: (.+)")
find_version = re.compile(rb"Version: (.+)")
for v in info_list:
for package in package_list:
name = "unknown"
try:
name = find_name.search(v).group(1).decode()
arch = find_arch.search(v).group(1).decode()
tmp_version = find_version.search(v).group(1).decode()
name = find_name.search(package).group(1).decode()
arch = find_arch.search(package).group(1).decode()
tmp_version = find_version.search(package).group(1).decode()
with lock[arch]:
# 使用 apt_pkg 进行版本比较
if name not in package_version[arch] or apt_pkg.version_compare(tmp_version, package_version[arch][name]) > 0:
package_version[arch][name] = tmp_version
package_info[arch][name] = v
if (
name not in packages[arch]
or version_compare(tmp_version, packages[arch][name]["version"]) > 0
):
packages[arch][name] = {"package": package, "version": tmp_version}
except Exception as e:
logging.error(f"Error processing package {name}: {e}")
return
@ -110,7 +124,7 @@ def process_repo(r: dict):
"""
try:
for path in r["path"].values():
get_latest(get_remote_packages(r["repo"], path))
split_latest(get_remote_packages(r["repo"], path))
except Exception as e:
logging.error(f"Error processing repo {r.get('name', 'unknown')}: {e}")
@ -136,7 +150,7 @@ if __name__ == "__main__":
# 处理本地 repo
if args.local:
with open(args.local) as f:
get_latest(f.read().encode())
split_latest(f.read().encode())
# 读取 repo_list 配置
repo_list = read_repo_list(args.repo)
@ -151,7 +165,7 @@ if __name__ == "__main__":
for arch in ["amd64", "arm64"]:
os.makedirs(f"deb/dists/wcbing/main/binary-{arch}/", exist_ok=True)
with open(f"deb/dists/wcbing/main/binary-{arch}/Packages", "+wb") as f:
for i in package_info[arch].values():
f.write(i)
for i in package_info["all"].values():
f.write(i)
for i in packages[arch].values():
f.write(i["package"])
for i in packages["all"].values():
f.write(i["package"])