mirror of
https://github.com/wcbing/wcbing-apt-repo.git
synced 2025-12-28 18:18:31 +08:00
refactor: save packages info to file, change packages format in merge-apt-repo
This commit is contained in:
parent
a575074c65
commit
029697da14
@ -4,11 +4,13 @@ import os
|
||||
import sqlite3
|
||||
import sys
|
||||
import logging
|
||||
import re
|
||||
from threading import Lock
|
||||
|
||||
BASE_DIR = "deb"
|
||||
DEB_BASE_DIR = "deb"
|
||||
PACKAGES_DIR = "packages"
|
||||
DB_DIR = "data"
|
||||
USER_AGENT = "Debian APT-HTTP/1.3 (2.6.1)" # from Debian 12
|
||||
USER_AGENT = "Debian APT-HTTP/1.3 (3.0.3)" # from Debian 13
|
||||
|
||||
version_lock = Lock()
|
||||
|
||||
@ -19,43 +21,78 @@ logging.basicConfig(
|
||||
)
|
||||
|
||||
|
||||
def download(url: str) -> None:
|
||||
def download(url: str, file_path: str) -> bool:
|
||||
"""Download file using curl with APT User-Agent."""
|
||||
file_path = os.path.join(BASE_DIR, url.split("?")[0])
|
||||
os.makedirs(os.path.dirname(file_path), exist_ok=True)
|
||||
subprocess.run(["curl", "-H", f"User-Agent: {USER_AGENT}", "-fsLo", file_path, url])
|
||||
curl_process = subprocess.run(
|
||||
["curl", "-H", f"User-Agent: {USER_AGENT}", "-fsLo", file_path, url]
|
||||
)
|
||||
if curl_process.returncode or not os.path.exists(file_path):
|
||||
logging.error(f"Failed to download {url}")
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def scan(name, arch, url, file_path) -> bool:
|
||||
scan_process = subprocess.run(
|
||||
["apt-ftparchive", "packages", file_path], capture_output=True
|
||||
)
|
||||
package = scan_process.stdout.decode()
|
||||
package = re.sub(
|
||||
r"^(Filename: ).*", f"\\1{url}", package, flags=re.MULTILINE
|
||||
) # 替换 Filename 开头的行
|
||||
|
||||
package_file_path = os.path.join(PACKAGES_DIR, arch, f"{name}.package")
|
||||
|
||||
try:
|
||||
with open(package_file_path, "w") as f:
|
||||
f.write(package)
|
||||
return True
|
||||
except IOError as e:
|
||||
logging.error(f"Failed to write package file for {name}: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def check_download(name: str, version: str, url: str, arch: str) -> None:
|
||||
"""Check and handle package download/update."""
|
||||
logging.info("%s:%s = %s", name, arch, version)
|
||||
|
||||
db_path = os.path.join("data", f"{BASE_DIR}.db")
|
||||
file_path = os.path.join(DEB_BASE_DIR, arch, f"{name}_{version}_{arch}.deb")
|
||||
local_version = None
|
||||
db_path = os.path.join(DB_DIR, f"{DEB_BASE_DIR}.db")
|
||||
# get local version
|
||||
with version_lock, sqlite3.connect(db_path) as conn:
|
||||
res = conn.execute(
|
||||
f"SELECT version, url FROM '{arch}' WHERE name = ?", (name,)
|
||||
f"SELECT version FROM '{arch}' WHERE name = ?", (name,)
|
||||
).fetchone()
|
||||
if res:
|
||||
local_version, local_url = res
|
||||
if local_version != version:
|
||||
print(f"Update: {name}:{arch} ({local_version} -> {version})")
|
||||
download(url)
|
||||
# update database
|
||||
with version_lock, sqlite3.connect(db_path) as conn:
|
||||
conn.execute(
|
||||
f"UPDATE '{arch}' SET version = ?, url = ? WHERE name = ?",
|
||||
(version, url, name),
|
||||
)
|
||||
conn.commit()
|
||||
# remove old version
|
||||
if local_url != url: # 防止固定下载链接
|
||||
old_file_path = os.path.join(BASE_DIR, local_url.split("?")[0])
|
||||
if os.path.exists(old_file_path):
|
||||
os.remove(old_file_path)
|
||||
local_version = res[0]
|
||||
if local_version == version:
|
||||
return
|
||||
|
||||
# download and scan
|
||||
logging.info(f"Downloading {name}:{arch} ({version})")
|
||||
os.makedirs(os.path.join(DEB_BASE_DIR, arch), exist_ok=True)
|
||||
if not download(url, file_path):
|
||||
return
|
||||
os.makedirs(os.path.join(PACKAGES_DIR, arch), exist_ok=True)
|
||||
if not scan(name, arch, url, file_path):
|
||||
return
|
||||
|
||||
if res:
|
||||
print(f"Update: {name}:{arch} ({local_version} -> {version})")
|
||||
# update database
|
||||
with version_lock, sqlite3.connect(db_path) as conn:
|
||||
conn.execute(
|
||||
f"UPDATE '{arch}' SET version = ?, url = ? WHERE name = ?",
|
||||
(version, url, name),
|
||||
)
|
||||
conn.commit()
|
||||
# remove old version
|
||||
old_file_path = os.path.join(DEB_BASE_DIR, arch, f"{name}_{local_version}_{arch}.deb")
|
||||
if os.path.exists(old_file_path):
|
||||
os.remove(old_file_path)
|
||||
else:
|
||||
print(f"AddNew: {name}:{arch} ({version})")
|
||||
download(url)
|
||||
# update database
|
||||
with version_lock, sqlite3.connect(db_path) as conn:
|
||||
conn.execute(
|
||||
|
||||
@ -13,27 +13,36 @@ import sys
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from threading import Lock
|
||||
import apt_pkg
|
||||
from apt_pkg import version_compare
|
||||
|
||||
apt_pkg.init() # 初始化 apt_pkg
|
||||
|
||||
package_version = {arch: {} for arch in ["all", "amd64", "i386", "arm64"]}
|
||||
package_info = {arch: {} for arch in ["all", "amd64", "i386", "arm64"]}
|
||||
lock = {arch: Lock() for arch in ["all", "amd64", "i386", "arm64"]}
|
||||
|
||||
USER_AGENT = "Debian APT-HTTP/1.3 (2.6.1)" # from Debian 12
|
||||
|
||||
"""
|
||||
repo info json format:
|
||||
"repo_name": {
|
||||
"repo": repo url, end with "/"
|
||||
"xxx_path": {
|
||||
"arch": repo Packages file path of "arch", start with no "/"
|
||||
arch_List = ["amd64", "arm64", "all", "i386"]
|
||||
packages = {arch: {} for arch in arch_List} # 存放用于生成 Packages 的内容
|
||||
""" packages format:
|
||||
{
|
||||
"arch": {
|
||||
"package1": {
|
||||
"version": "1.0.0",
|
||||
"package": ""
|
||||
}
|
||||
}
|
||||
}
|
||||
"""
|
||||
lock = {arch: Lock() for arch in arch_List}
|
||||
|
||||
USER_AGENT = "Debian APT-HTTP/1.3 (3.0.3)" # from Debian 13
|
||||
|
||||
def read_repo_list(repo_list_file: str) -> dict:
|
||||
"""
|
||||
repo info json format:
|
||||
"repo_name": {
|
||||
"repo": repo url, end with "/" is better
|
||||
"path": {
|
||||
"arch": repo Packages file path of "arch", don't start with "/"
|
||||
}
|
||||
}
|
||||
"""
|
||||
try:
|
||||
with open(repo_list_file, "r") as f:
|
||||
return json.load(f)
|
||||
@ -46,7 +55,7 @@ def get_remote_packages(repo_url: str, file_path: str) -> bytes:
|
||||
"""
|
||||
get the packages file content from remote repo
|
||||
"""
|
||||
file_url = repo_url + file_path
|
||||
file_url = os.path.join(repo_url, file_path)
|
||||
try:
|
||||
response = requests.get(
|
||||
file_url, timeout=10, headers={"User-Agent": USER_AGENT}
|
||||
@ -77,28 +86,33 @@ def get_remote_packages(repo_url: str, file_path: str) -> bytes:
|
||||
return b""
|
||||
|
||||
|
||||
def get_latest(deb_packages: bytes):
|
||||
def split_latest(packages_file_content: bytes):
|
||||
"""
|
||||
split the information of each packet, deduplication and store the latest in infoList
|
||||
将每个包的信息分割开,去重并将最新的存放到 infoList 中
|
||||
"""
|
||||
deb_packages = re.sub(rb"^Package: ", b"{{start}}Package: ", deb_packages, flags=re.MULTILINE)
|
||||
info_list = deb_packages.split(b"{{start}}")[1:]
|
||||
packages_file_content = re.sub(
|
||||
rb"^Package: ", b"{{start}}Package: ", packages_file_content, flags=re.MULTILINE
|
||||
)
|
||||
package_list = packages_file_content.split(b"{{start}}")[1:]
|
||||
|
||||
find_name = re.compile(rb"Package: (.+)")
|
||||
find_arch = re.compile(rb"Architecture: (.+)")
|
||||
find_version = re.compile(rb"Version: (.+)")
|
||||
|
||||
for v in info_list:
|
||||
for package in package_list:
|
||||
name = "unknown"
|
||||
try:
|
||||
name = find_name.search(v).group(1).decode()
|
||||
arch = find_arch.search(v).group(1).decode()
|
||||
tmp_version = find_version.search(v).group(1).decode()
|
||||
name = find_name.search(package).group(1).decode()
|
||||
arch = find_arch.search(package).group(1).decode()
|
||||
tmp_version = find_version.search(package).group(1).decode()
|
||||
with lock[arch]:
|
||||
# 使用 apt_pkg 进行版本比较
|
||||
if name not in package_version[arch] or apt_pkg.version_compare(tmp_version, package_version[arch][name]) > 0:
|
||||
package_version[arch][name] = tmp_version
|
||||
package_info[arch][name] = v
|
||||
if (
|
||||
name not in packages[arch]
|
||||
or version_compare(tmp_version, packages[arch][name]["version"]) > 0
|
||||
):
|
||||
packages[arch][name] = {"package": package, "version": tmp_version}
|
||||
except Exception as e:
|
||||
logging.error(f"Error processing package {name}: {e}")
|
||||
return
|
||||
@ -110,7 +124,7 @@ def process_repo(r: dict):
|
||||
"""
|
||||
try:
|
||||
for path in r["path"].values():
|
||||
get_latest(get_remote_packages(r["repo"], path))
|
||||
split_latest(get_remote_packages(r["repo"], path))
|
||||
except Exception as e:
|
||||
logging.error(f"Error processing repo {r.get('name', 'unknown')}: {e}")
|
||||
|
||||
@ -136,7 +150,7 @@ if __name__ == "__main__":
|
||||
# 处理本地 repo
|
||||
if args.local:
|
||||
with open(args.local) as f:
|
||||
get_latest(f.read().encode())
|
||||
split_latest(f.read().encode())
|
||||
|
||||
# 读取 repo_list 配置
|
||||
repo_list = read_repo_list(args.repo)
|
||||
@ -151,7 +165,7 @@ if __name__ == "__main__":
|
||||
for arch in ["amd64", "arm64"]:
|
||||
os.makedirs(f"deb/dists/wcbing/main/binary-{arch}/", exist_ok=True)
|
||||
with open(f"deb/dists/wcbing/main/binary-{arch}/Packages", "+wb") as f:
|
||||
for i in package_info[arch].values():
|
||||
f.write(i)
|
||||
for i in package_info["all"].values():
|
||||
f.write(i)
|
||||
for i in packages[arch].values():
|
||||
f.write(i["package"])
|
||||
for i in packages["all"].values():
|
||||
f.write(i["package"])
|
||||
|
||||
9
run.sh
9
run.sh
@ -5,17 +5,10 @@
|
||||
find get -type f -name "*.sh" -exec sh {} \;
|
||||
|
||||
# generate the Packages file
|
||||
## generate the local Packages file
|
||||
cd deb
|
||||
apt-ftparchive packages . > tmpPackages
|
||||
sed -i "s|\./\(https\?\):/|\1://|g" tmpPackages
|
||||
|
||||
cd ..
|
||||
|
||||
## merge the Packages file from local package
|
||||
cat $(find packages -name "*.package") >> deb/tmpPackages
|
||||
|
||||
# merge the Packages files from third-party repositories
|
||||
## merge the Packages files from third-party repositories
|
||||
./merge-apt-repo.py --local deb/tmpPackages
|
||||
|
||||
# generate the Release file
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user