refactor: save packages info to file, change packages format in merge-apt-repo

This commit is contained in:
wcbing 2025-09-25 10:04:18 +08:00
parent fa1a30604d
commit 36493c5a2f
3 changed files with 108 additions and 60 deletions

View File

@ -4,11 +4,13 @@ import os
import sqlite3
import sys
import logging
import re
from threading import Lock
BASE_DIR = "deb"
DEB_BASE_DIR = "deb"
PACKAGES_DIR = "packages"
DB_DIR = "data"
USER_AGENT = "Debian APT-HTTP/1.3 (2.6.1)" # from Debian 12
USER_AGENT = "Debian APT-HTTP/1.3 (3.0.3)" # from Debian 13
version_lock = Lock()
@ -19,28 +21,65 @@ logging.basicConfig(
)
def download(url: str) -> None:
def download(url: str, file_path: str) -> bool:
"""Download file using curl with APT User-Agent."""
file_path = os.path.join(BASE_DIR, url.split("?")[0])
os.makedirs(os.path.dirname(file_path), exist_ok=True)
subprocess.run(["curl", "-H", f"User-Agent: {USER_AGENT}", "-fsLo", file_path, url])
curl_process = subprocess.run(
["curl", "-H", f"User-Agent: {USER_AGENT}", "-fsLo", file_path, url]
)
if curl_process.returncode or not os.path.exists(file_path):
logging.error(f"Failed to download {url}")
return False
return True
def scan(name, arch, url, file_path) -> bool:
scan_process = subprocess.run(
["apt-ftparchive", "packages", file_path], capture_output=True
)
package = scan_process.stdout.decode()
package = re.sub(
r"^(Filename: ).*", f"\\1{url}", package, flags=re.MULTILINE
) # 替换 Filename 开头的行
package_file_path = os.path.join(PACKAGES_DIR, arch, f"{name}.package")
try:
with open(package_file_path, "w") as f:
f.write(package)
return True
except IOError as e:
logging.error(f"Failed to write package file for {name}: {e}")
return False
def check_download(name: str, version: str, url: str, arch: str) -> None:
"""Check and handle package download/update."""
logging.info("%s:%s = %s", name, arch, version)
db_path = os.path.join("data", f"{BASE_DIR}.db")
file_path = os.path.join(DEB_BASE_DIR, arch, f"{name}_{version}_{arch}.deb")
local_version = None
db_path = os.path.join(DB_DIR, f"{DEB_BASE_DIR}.db")
# get local version
with version_lock, sqlite3.connect(db_path) as conn:
res = conn.execute(
f"SELECT version, url FROM '{arch}' WHERE name = ?", (name,)
f"SELECT version FROM '{arch}' WHERE name = ?", (name,)
).fetchone()
if res:
local_version, local_url = res
if local_version != version:
local_version = res[0]
if local_version == version:
return
# download and scan
logging.info(f"Downloading {name}:{arch} ({version})")
os.makedirs(os.path.join(DEB_BASE_DIR, arch), exist_ok=True)
if not download(url, file_path):
return
os.makedirs(os.path.join(PACKAGES_DIR, arch), exist_ok=True)
if not scan(name, arch, url, file_path):
return
if res:
print(f"Update: {name}:{arch} ({local_version} -> {version})")
download(url)
# update database
with version_lock, sqlite3.connect(db_path) as conn:
conn.execute(
@ -49,13 +88,11 @@ def check_download(name: str, version: str, url: str, arch: str) -> None:
)
conn.commit()
# remove old version
if local_url != url: # 防止固定下载链接
old_file_path = os.path.join(BASE_DIR, local_url.split("?")[0])
old_file_path = os.path.join(DEB_BASE_DIR, arch, f"{name}_{local_version}_{arch}.deb")
if os.path.exists(old_file_path):
os.remove(old_file_path)
else:
print(f"AddNew: {name}:{arch} ({version})")
download(url)
# update database
with version_lock, sqlite3.connect(db_path) as conn:
conn.execute(

View File

@ -13,27 +13,36 @@ import sys
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
import apt_pkg
from apt_pkg import version_compare
apt_pkg.init() # 初始化 apt_pkg
package_version = {arch: {} for arch in ["all", "amd64", "i386", "arm64"]}
package_info = {arch: {} for arch in ["all", "amd64", "i386", "arm64"]}
lock = {arch: Lock() for arch in ["all", "amd64", "i386", "arm64"]}
arch_List = ["amd64", "arm64", "all", "i386"]
packages = {arch: {} for arch in arch_List} # 存放用于生成 Packages 的内容
""" packages format:
{
"arch": {
"package1": {
"version": "1.0.0",
"package": ""
}
}
}
"""
lock = {arch: Lock() for arch in arch_List}
USER_AGENT = "Debian APT-HTTP/1.3 (2.6.1)" # from Debian 12
USER_AGENT = "Debian APT-HTTP/1.3 (3.0.3)" # from Debian 13
def read_repo_list(repo_list_file: str) -> dict:
"""
repo info json format:
"repo_name": {
"repo": repo url, end with "/"
"xxx_path": {
"arch": repo Packages file path of "arch", start with no "/"
"repo": repo url, end with "/" is better
"path": {
"arch": repo Packages file path of "arch", don't start with "/"
}
}
"""
def read_repo_list(repo_list_file: str) -> dict:
try:
with open(repo_list_file, "r") as f:
return json.load(f)
@ -46,7 +55,7 @@ def get_remote_packages(repo_url: str, file_path: str) -> bytes:
"""
get the packages file content from remote repo
"""
file_url = repo_url + file_path
file_url = os.path.join(repo_url, file_path)
try:
response = requests.get(
file_url, timeout=10, headers={"User-Agent": USER_AGENT}
@ -77,28 +86,33 @@ def get_remote_packages(repo_url: str, file_path: str) -> bytes:
return b""
def get_latest(deb_packages: bytes):
def split_latest(packages_file_content: bytes):
"""
split the information of each packet, deduplication and store the latest in infoList
将每个包的信息分割开去重并将最新的存放到 infoList
"""
deb_packages = re.sub(rb"^Package: ", b"{{start}}Package: ", deb_packages, flags=re.MULTILINE)
info_list = deb_packages.split(b"{{start}}")[1:]
packages_file_content = re.sub(
rb"^Package: ", b"{{start}}Package: ", packages_file_content, flags=re.MULTILINE
)
package_list = packages_file_content.split(b"{{start}}")[1:]
find_name = re.compile(rb"Package: (.+)")
find_arch = re.compile(rb"Architecture: (.+)")
find_version = re.compile(rb"Version: (.+)")
for v in info_list:
for package in package_list:
name = "unknown"
try:
name = find_name.search(v).group(1).decode()
arch = find_arch.search(v).group(1).decode()
tmp_version = find_version.search(v).group(1).decode()
name = find_name.search(package).group(1).decode()
arch = find_arch.search(package).group(1).decode()
tmp_version = find_version.search(package).group(1).decode()
with lock[arch]:
# 使用 apt_pkg 进行版本比较
if name not in package_version[arch] or apt_pkg.version_compare(tmp_version, package_version[arch][name]) > 0:
package_version[arch][name] = tmp_version
package_info[arch][name] = v
if (
name not in packages[arch]
or version_compare(tmp_version, packages[arch][name]["version"]) > 0
):
packages[arch][name] = {"package": package, "version": tmp_version}
except Exception as e:
logging.error(f"Error processing package {name}: {e}")
return
@ -110,7 +124,7 @@ def process_repo(r: dict):
"""
try:
for path in r["path"].values():
get_latest(get_remote_packages(r["repo"], path))
split_latest(get_remote_packages(r["repo"], path))
except Exception as e:
logging.error(f"Error processing repo {r.get('name', 'unknown')}: {e}")
@ -136,7 +150,7 @@ if __name__ == "__main__":
# 处理本地 repo
if args.local:
with open(args.local) as f:
get_latest(f.read().encode())
split_latest(f.read().encode())
# 读取 repo_list 配置
repo_list = read_repo_list(args.repo)
@ -151,7 +165,7 @@ if __name__ == "__main__":
for arch in ["amd64", "arm64"]:
os.makedirs(f"deb/dists/wcbing/main/binary-{arch}/", exist_ok=True)
with open(f"deb/dists/wcbing/main/binary-{arch}/Packages", "+wb") as f:
for i in package_info[arch].values():
f.write(i)
for i in package_info["all"].values():
f.write(i)
for i in packages[arch].values():
f.write(i["package"])
for i in packages["all"].values():
f.write(i["package"])

9
run.sh
View File

@ -5,13 +5,10 @@
find get -type f -name "*.sh" -exec sh {} \;
# generate the Packages file
## generate the local Packages file
cd deb
apt-ftparchive packages . > tmpPackages
sed -i "s|\./\(https\?\):/|\1://|g" tmpPackages
## merge the Packages file from local package
cat $(find packages -name "*.package") > deb/tmpPackages
cd ..
# merge the Packages files from third-party repositories
## merge the Packages files from third-party repositories
./merge-apt-repo.py --local deb/tmpPackages
# generate the Release file