🎨 Format Python code with Black
This commit is contained in:
committed by
github-actions[bot]
parent
318a1f6a93
commit
1e969d51a9
@ -21,10 +21,20 @@ import github
|
||||
# add apps/tools to sys.path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from rest_api import GithubAPI, GitlabAPI, GiteaForgejoAPI, RefType # noqa: E402,E501 pylint: disable=import-error,wrong-import-position
|
||||
from rest_api import (
|
||||
GithubAPI,
|
||||
GitlabAPI,
|
||||
GiteaForgejoAPI,
|
||||
RefType,
|
||||
) # noqa: E402,E501 pylint: disable=import-error,wrong-import-position
|
||||
import appslib.logging_sender # noqa: E402 pylint: disable=import-error,wrong-import-position
|
||||
from appslib.utils import REPO_APPS_ROOT, get_catalog # noqa: E402 pylint: disable=import-error,wrong-import-position
|
||||
from app_caches import app_cache_folder # noqa: E402 pylint: disable=import-error,wrong-import-position
|
||||
from appslib.utils import (
|
||||
REPO_APPS_ROOT,
|
||||
get_catalog,
|
||||
) # noqa: E402 pylint: disable=import-error,wrong-import-position
|
||||
from app_caches import (
|
||||
app_cache_folder,
|
||||
) # noqa: E402 pylint: disable=import-error,wrong-import-position
|
||||
|
||||
|
||||
STRATEGIES = [
|
||||
@ -44,11 +54,30 @@ STRATEGIES = [
|
||||
|
||||
|
||||
@cache
|
||||
def get_github() -> tuple[Optional[tuple[str, str]], Optional[github.Github], Optional[github.InputGitAuthor]]:
|
||||
def get_github() -> tuple[
|
||||
Optional[tuple[str, str]],
|
||||
Optional[github.Github],
|
||||
Optional[github.InputGitAuthor],
|
||||
]:
|
||||
try:
|
||||
github_login = (REPO_APPS_ROOT / ".github_login").open("r", encoding="utf-8").read().strip()
|
||||
github_token = (REPO_APPS_ROOT / ".github_token").open("r", encoding="utf-8").read().strip()
|
||||
github_email = (REPO_APPS_ROOT / ".github_email").open("r", encoding="utf-8").read().strip()
|
||||
github_login = (
|
||||
(REPO_APPS_ROOT / ".github_login")
|
||||
.open("r", encoding="utf-8")
|
||||
.read()
|
||||
.strip()
|
||||
)
|
||||
github_token = (
|
||||
(REPO_APPS_ROOT / ".github_token")
|
||||
.open("r", encoding="utf-8")
|
||||
.read()
|
||||
.strip()
|
||||
)
|
||||
github_email = (
|
||||
(REPO_APPS_ROOT / ".github_email")
|
||||
.open("r", encoding="utf-8")
|
||||
.read()
|
||||
.strip()
|
||||
)
|
||||
|
||||
auth = (github_login, github_token)
|
||||
github_api = github.Github(github_token)
|
||||
@ -96,7 +125,9 @@ class LocalOrRemoteRepo:
|
||||
if not self.manifest_path.exists():
|
||||
raise RuntimeError(f"{app.name}: manifest.toml doesnt exists?")
|
||||
# app is in fact a path
|
||||
self.manifest_raw = (app / "manifest.toml").open("r", encoding="utf-8").read()
|
||||
self.manifest_raw = (
|
||||
(app / "manifest.toml").open("r", encoding="utf-8").read()
|
||||
)
|
||||
|
||||
elif isinstance(app, str):
|
||||
# It's remote
|
||||
@ -187,7 +218,9 @@ class AppAutoUpdater:
|
||||
|
||||
self.main_upstream = self.manifest.get("upstream", {}).get("code")
|
||||
|
||||
def run(self, edit: bool = False, commit: bool = False, pr: bool = False) -> tuple[State, str, str, str]:
|
||||
def run(
|
||||
self, edit: bool = False, commit: bool = False, pr: bool = False
|
||||
) -> tuple[State, str, str, str]:
|
||||
state = State.up_to_date
|
||||
main_version = ""
|
||||
pr_url = ""
|
||||
@ -212,7 +245,11 @@ class AppAutoUpdater:
|
||||
commit_msg += f"\n{msg}"
|
||||
|
||||
self.repo.manifest_raw = self.replace_version_and_asset_in_manifest(
|
||||
self.repo.manifest_raw, version, assets, infos, is_main=source == "main",
|
||||
self.repo.manifest_raw,
|
||||
version,
|
||||
assets,
|
||||
infos,
|
||||
is_main=source == "main",
|
||||
)
|
||||
|
||||
if state == State.up_to_date:
|
||||
@ -246,7 +283,9 @@ class AppAutoUpdater:
|
||||
return (state, self.current_version, main_version, pr_url)
|
||||
|
||||
@staticmethod
|
||||
def relevant_versions(tags: list[str], app_id: str, version_regex: Optional[str]) -> tuple[str, str]:
|
||||
def relevant_versions(
|
||||
tags: list[str], app_id: str, version_regex: Optional[str]
|
||||
) -> tuple[str, str]:
|
||||
|
||||
def apply_version_regex(tag: str) -> Optional[str]:
|
||||
# First preprocessing according to the manifest version_regex…
|
||||
@ -255,7 +294,9 @@ class AppAutoUpdater:
|
||||
if match is None:
|
||||
return None
|
||||
# Basically: either groupdict if named capture gorups, sorted by names, or groups()
|
||||
tag = ".".join(dict(sorted(match.groupdict().items())).values() or match.groups())
|
||||
tag = ".".join(
|
||||
dict(sorted(match.groupdict().items())).values() or match.groups()
|
||||
)
|
||||
|
||||
# Then remove leading v
|
||||
tag = tag.lstrip("v")
|
||||
@ -264,7 +305,9 @@ class AppAutoUpdater:
|
||||
def version_numbers(tag: str) -> Optional[tuple[int, ...]]:
|
||||
filter_keywords = ["start", "rc", "beta", "alpha"]
|
||||
if any(keyword in tag for keyword in filter_keywords):
|
||||
logging.debug(f"Tag {tag} contains filtered keyword from {filter_keywords}.")
|
||||
logging.debug(
|
||||
f"Tag {tag} contains filtered keyword from {filter_keywords}."
|
||||
)
|
||||
return None
|
||||
|
||||
t_to_check = tag
|
||||
@ -302,7 +345,9 @@ class AppAutoUpdater:
|
||||
def tag_to_int_tuple(tag: str) -> tuple[int, ...]:
|
||||
tag = tag.lstrip("v").replace("-", ".").rstrip(".")
|
||||
int_tuple = tag.split(".")
|
||||
assert all(i.isdigit() for i in int_tuple), f"Cant convert {tag} to int tuple :/"
|
||||
assert all(
|
||||
i.isdigit() for i in int_tuple
|
||||
), f"Cant convert {tag} to int tuple :/"
|
||||
return tuple(int(i) for i in int_tuple)
|
||||
|
||||
@staticmethod
|
||||
@ -317,8 +362,9 @@ class AppAutoUpdater:
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to compute sha256 for {url} : {e}") from e
|
||||
|
||||
def get_source_update(self, name: str, infos: dict[str, Any]
|
||||
) -> Optional[tuple[str, Union[str, dict[str, str]], str]]:
|
||||
def get_source_update(
|
||||
self, name: str, infos: dict[str, Any]
|
||||
) -> Optional[tuple[str, Union[str, dict[str, str]], str]]:
|
||||
autoupdate = infos.get("autoupdate")
|
||||
if autoupdate is None:
|
||||
return None
|
||||
@ -327,7 +373,9 @@ class AppAutoUpdater:
|
||||
asset = autoupdate.get("asset", "tarball")
|
||||
strategy = autoupdate.get("strategy")
|
||||
if strategy not in STRATEGIES:
|
||||
raise ValueError(f"Unknown update strategy '{strategy}' for '{name}', expected one of {STRATEGIES}")
|
||||
raise ValueError(
|
||||
f"Unknown update strategy '{strategy}' for '{name}', expected one of {STRATEGIES}"
|
||||
)
|
||||
|
||||
result = self.get_latest_version_and_asset(strategy, asset, autoupdate)
|
||||
if result is None:
|
||||
@ -347,14 +395,22 @@ class AppAutoUpdater:
|
||||
print("Up to date")
|
||||
return None
|
||||
try:
|
||||
if self.tag_to_int_tuple(self.current_version) > self.tag_to_int_tuple(new_version):
|
||||
print("Up to date (current version appears more recent than newest version found)")
|
||||
if self.tag_to_int_tuple(self.current_version) > self.tag_to_int_tuple(
|
||||
new_version
|
||||
):
|
||||
print(
|
||||
"Up to date (current version appears more recent than newest version found)"
|
||||
)
|
||||
return None
|
||||
except (AssertionError, ValueError):
|
||||
pass
|
||||
|
||||
if isinstance(assets, dict) and isinstance(infos.get("url"), str) or \
|
||||
isinstance(assets, str) and not isinstance(infos.get("url"), str):
|
||||
if (
|
||||
isinstance(assets, dict)
|
||||
and isinstance(infos.get("url"), str)
|
||||
or isinstance(assets, str)
|
||||
and not isinstance(infos.get("url"), str)
|
||||
):
|
||||
raise RuntimeError(
|
||||
"It looks like there's an inconsistency between the old asset list and the new ones... "
|
||||
"One is arch-specific, the other is not... Did you forget to define arch-specific regexes? "
|
||||
@ -364,7 +420,9 @@ class AppAutoUpdater:
|
||||
if isinstance(assets, str) and infos["url"] == assets:
|
||||
print(f"URL for asset {name} is up to date")
|
||||
return None
|
||||
if isinstance(assets, dict) and assets == {k: infos[k]["url"] for k in assets.keys()}:
|
||||
if isinstance(assets, dict) and assets == {
|
||||
k: infos[k]["url"] for k in assets.keys()
|
||||
}:
|
||||
print(f"URLs for asset {name} are up to date")
|
||||
return None
|
||||
print(f"Update needed for {name}")
|
||||
@ -376,21 +434,26 @@ class AppAutoUpdater:
|
||||
name: url for name, url in assets.items() if re.match(regex, name)
|
||||
}
|
||||
if not matching_assets:
|
||||
raise RuntimeError(f"No assets matching regex '{regex}' in {list(assets.keys())}")
|
||||
raise RuntimeError(
|
||||
f"No assets matching regex '{regex}' in {list(assets.keys())}"
|
||||
)
|
||||
if len(matching_assets) > 1:
|
||||
raise RuntimeError(f"Too many assets matching regex '{regex}': {matching_assets}")
|
||||
raise RuntimeError(
|
||||
f"Too many assets matching regex '{regex}': {matching_assets}"
|
||||
)
|
||||
return next(iter(matching_assets.items()))
|
||||
|
||||
def get_latest_version_and_asset(self, strategy: str, asset: Union[str, dict], autoupdate
|
||||
) -> Optional[tuple[str, Union[str, dict[str, str]], str]]:
|
||||
def get_latest_version_and_asset(
|
||||
self, strategy: str, asset: Union[str, dict], autoupdate
|
||||
) -> Optional[tuple[str, Union[str, dict[str, str]], str]]:
|
||||
upstream = autoupdate.get("upstream", self.main_upstream).strip("/")
|
||||
version_re = autoupdate.get("version_regex", None)
|
||||
_, remote_type, revision_type = strategy.split("_")
|
||||
|
||||
api: Union[GithubAPI, GitlabAPI, GiteaForgejoAPI]
|
||||
if remote_type == "github":
|
||||
assert (
|
||||
upstream and upstream.startswith("https://github.com/")
|
||||
assert upstream and upstream.startswith(
|
||||
"https://github.com/"
|
||||
), f"When using strategy {strategy}, having a defined upstream code repo on github.com is required"
|
||||
api = GithubAPI(upstream, auth=get_github()[0])
|
||||
if remote_type == "gitlab":
|
||||
@ -404,7 +467,9 @@ class AppAutoUpdater:
|
||||
for release in api.releases()
|
||||
if not release["draft"] and not release["prerelease"]
|
||||
}
|
||||
latest_version_orig, latest_version = self.relevant_versions(list(releases.keys()), self.app_id, version_re)
|
||||
latest_version_orig, latest_version = self.relevant_versions(
|
||||
list(releases.keys()), self.app_id, version_re
|
||||
)
|
||||
latest_release = releases[latest_version_orig]
|
||||
latest_assets = {
|
||||
a["name"]: a["browser_download_url"]
|
||||
@ -425,7 +490,9 @@ class AppAutoUpdater:
|
||||
_, url = self.find_matching_asset(latest_assets, asset)
|
||||
return latest_version, url, latest_release_html_url
|
||||
except RuntimeError as e:
|
||||
raise RuntimeError(f"{e}.\nFull release details on {latest_release_html_url}.") from e
|
||||
raise RuntimeError(
|
||||
f"{e}.\nFull release details on {latest_release_html_url}."
|
||||
) from e
|
||||
|
||||
if isinstance(asset, dict):
|
||||
new_assets = {}
|
||||
@ -434,34 +501,50 @@ class AppAutoUpdater:
|
||||
_, url = self.find_matching_asset(latest_assets, asset_regex)
|
||||
new_assets[asset_name] = url
|
||||
except RuntimeError as e:
|
||||
raise RuntimeError(f"{e}.\nFull release details on {latest_release_html_url}.") from e
|
||||
raise RuntimeError(
|
||||
f"{e}.\nFull release details on {latest_release_html_url}."
|
||||
) from e
|
||||
return latest_version, new_assets, latest_release_html_url
|
||||
|
||||
return None
|
||||
|
||||
if revision_type == "tag":
|
||||
if asset != "tarball":
|
||||
raise ValueError("For the latest tag strategies, only asset = 'tarball' is supported")
|
||||
raise ValueError(
|
||||
"For the latest tag strategies, only asset = 'tarball' is supported"
|
||||
)
|
||||
tags = [t["name"] for t in api.tags()]
|
||||
latest_version_orig, latest_version = self.relevant_versions(tags, self.app_id, version_re)
|
||||
latest_version_orig, latest_version = self.relevant_versions(
|
||||
tags, self.app_id, version_re
|
||||
)
|
||||
latest_tarball = api.url_for_ref(latest_version_orig, RefType.tags)
|
||||
return latest_version, latest_tarball, ""
|
||||
|
||||
if revision_type == "commit":
|
||||
if asset != "tarball":
|
||||
raise ValueError("For the latest commit strategies, only asset = 'tarball' is supported")
|
||||
raise ValueError(
|
||||
"For the latest commit strategies, only asset = 'tarball' is supported"
|
||||
)
|
||||
commits = api.commits()
|
||||
latest_commit = commits[0]
|
||||
latest_tarball = api.url_for_ref(latest_commit["sha"], RefType.commits)
|
||||
# Let's have the version as something like "2023.01.23"
|
||||
latest_commit_date = datetime.strptime(latest_commit["commit"]["author"]["date"][:10], "%Y-%m-%d")
|
||||
latest_commit_date = datetime.strptime(
|
||||
latest_commit["commit"]["author"]["date"][:10], "%Y-%m-%d"
|
||||
)
|
||||
version_format = autoupdate.get("force_version", "%Y.%m.%d")
|
||||
latest_version = latest_commit_date.strftime(version_format)
|
||||
return latest_version, latest_tarball, ""
|
||||
return None
|
||||
|
||||
def replace_version_and_asset_in_manifest(self, content: str, new_version: str, new_assets_urls: Union[str, dict],
|
||||
current_assets: dict, is_main: bool):
|
||||
def replace_version_and_asset_in_manifest(
|
||||
self,
|
||||
content: str,
|
||||
new_version: str,
|
||||
new_assets_urls: Union[str, dict],
|
||||
current_assets: dict,
|
||||
is_main: bool,
|
||||
):
|
||||
replacements = []
|
||||
if isinstance(new_assets_urls, str):
|
||||
replacements = [
|
||||
@ -471,16 +554,21 @@ class AppAutoUpdater:
|
||||
if isinstance(new_assets_urls, dict):
|
||||
replacements = [
|
||||
repl
|
||||
for key, url in new_assets_urls.items() for repl in (
|
||||
for key, url in new_assets_urls.items()
|
||||
for repl in (
|
||||
(current_assets[key]["url"], url),
|
||||
(current_assets[key]["sha256"], self.sha256_of_remote_file(url))
|
||||
(current_assets[key]["sha256"], self.sha256_of_remote_file(url)),
|
||||
)
|
||||
]
|
||||
|
||||
if is_main:
|
||||
|
||||
def repl(m: re.Match) -> str:
|
||||
return m.group(1) + new_version + '~ynh1"'
|
||||
content = re.sub(r"(\s*version\s*=\s*[\"\'])([^~\"\']+)(\~ynh\d+[\"\'])", repl, content)
|
||||
|
||||
content = re.sub(
|
||||
r"(\s*version\s*=\s*[\"\'])([^~\"\']+)(\~ynh\d+[\"\'])", repl, content
|
||||
)
|
||||
|
||||
for old, new in replacements:
|
||||
content = content.replace(old, new)
|
||||
@ -538,22 +626,41 @@ def run_autoupdate_for_multiprocessing(data) -> tuple[str, tuple[State, str, str
|
||||
except Exception:
|
||||
log_str = stdoutswitch.reset()
|
||||
import traceback
|
||||
|
||||
t = traceback.format_exc()
|
||||
return (app, (State.failure, log_str, str(t), ""))
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("apps", nargs="*", type=Path,
|
||||
help="If not passed, the script will run on the catalog. Github keys required.")
|
||||
parser.add_argument("--edit", action=argparse.BooleanOptionalAction, default=True,
|
||||
help="Edit the local files")
|
||||
parser.add_argument("--commit", action=argparse.BooleanOptionalAction, default=False,
|
||||
help="Create a commit with the changes")
|
||||
parser.add_argument("--pr", action=argparse.BooleanOptionalAction, default=False,
|
||||
help="Create a pull request with the changes")
|
||||
parser.add_argument(
|
||||
"apps",
|
||||
nargs="*",
|
||||
type=Path,
|
||||
help="If not passed, the script will run on the catalog. Github keys required.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--edit",
|
||||
action=argparse.BooleanOptionalAction,
|
||||
default=True,
|
||||
help="Edit the local files",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--commit",
|
||||
action=argparse.BooleanOptionalAction,
|
||||
default=False,
|
||||
help="Create a commit with the changes",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--pr",
|
||||
action=argparse.BooleanOptionalAction,
|
||||
default=False,
|
||||
help="Create a pull request with the changes",
|
||||
)
|
||||
parser.add_argument("--paste", action="store_true")
|
||||
parser.add_argument("-j", "--processes", type=int, default=multiprocessing.cpu_count())
|
||||
parser.add_argument(
|
||||
"-j", "--processes", type=int, default=multiprocessing.cpu_count()
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
appslib.logging_sender.enable()
|
||||
@ -572,8 +679,10 @@ def main() -> None:
|
||||
apps_failed = {}
|
||||
|
||||
with multiprocessing.Pool(processes=args.processes) as pool:
|
||||
tasks = pool.imap(run_autoupdate_for_multiprocessing,
|
||||
((app, args.edit, args.commit, args.pr) for app in apps))
|
||||
tasks = pool.imap(
|
||||
run_autoupdate_for_multiprocessing,
|
||||
((app, args.edit, args.commit, args.pr) for app in apps),
|
||||
)
|
||||
for app, result in tqdm.tqdm(tasks, total=len(apps), ascii=" ·#"):
|
||||
state, current_version, main_version, pr_url = result
|
||||
if state == State.up_to_date:
|
||||
@ -592,7 +701,9 @@ def main() -> None:
|
||||
matrix_message += f"\n- {len(apps_already)} pending update PRs"
|
||||
for app, info in apps_already.items():
|
||||
paste_message += f"\n- {app}"
|
||||
paste_message += f" ({info[0]} -> {info[1]})" if info[1] else " (app version did not change)"
|
||||
paste_message += (
|
||||
f" ({info[0]} -> {info[1]})" if info[1] else " (app version did not change)"
|
||||
)
|
||||
if info[2]:
|
||||
paste_message += f" see {info[2]}"
|
||||
|
||||
@ -601,7 +712,9 @@ def main() -> None:
|
||||
matrix_message += f"\n- {len(apps_updated)} new apps PRs"
|
||||
for app, info in apps_updated.items():
|
||||
paste_message += f"\n- {app}"
|
||||
paste_message += f" ({info[0]} -> {info[1]})" if info[1] else " (app version did not change)"
|
||||
paste_message += (
|
||||
f" ({info[0]} -> {info[1]})" if info[1] else " (app version did not change)"
|
||||
)
|
||||
if info[2]:
|
||||
paste_message += f" see {info[2]}"
|
||||
|
||||
|
@ -15,11 +15,10 @@ class RefType(Enum):
|
||||
class GithubAPI:
|
||||
def __init__(self, upstream: str, auth: Optional[tuple[str, str]] = None):
|
||||
self.upstream = upstream
|
||||
self.upstream_repo = upstream.replace("https://github.com/", "")\
|
||||
.strip("/")
|
||||
self.upstream_repo = upstream.replace("https://github.com/", "").strip("/")
|
||||
assert (
|
||||
len(self.upstream_repo.split("/")) == 2
|
||||
), f"'{upstream}' doesn't seem to be a github repository ?"
|
||||
len(self.upstream_repo.split("/")) == 2
|
||||
), f"'{upstream}' doesn't seem to be a github repository ?"
|
||||
self.auth = auth
|
||||
|
||||
def internal_api(self, uri: str) -> Any:
|
||||
@ -74,7 +73,12 @@ class GitlabAPI:
|
||||
# Second chance for some buggy gitlab instances...
|
||||
name = self.project_path.split("/")[-1]
|
||||
projects = self.internal_api(f"projects?search={name}")
|
||||
project = next(filter(lambda x: x.get("path_with_namespace") == self.project_path, projects))
|
||||
project = next(
|
||||
filter(
|
||||
lambda x: x.get("path_with_namespace") == self.project_path,
|
||||
projects,
|
||||
)
|
||||
)
|
||||
|
||||
assert isinstance(project, dict)
|
||||
project_id = project.get("id", None)
|
||||
@ -95,13 +99,11 @@ class GitlabAPI:
|
||||
return [
|
||||
{
|
||||
"sha": commit["id"],
|
||||
"commit": {
|
||||
"author": {
|
||||
"date": commit["committed_date"]
|
||||
}
|
||||
}
|
||||
"commit": {"author": {"date": commit["committed_date"]}},
|
||||
}
|
||||
for commit in self.internal_api(f"projects/{self.project_id}/repository/commits")
|
||||
for commit in self.internal_api(
|
||||
f"projects/{self.project_id}/repository/commits"
|
||||
)
|
||||
]
|
||||
|
||||
def releases(self) -> list[dict[str, Any]]:
|
||||
@ -114,16 +116,21 @@ class GitlabAPI:
|
||||
"prerelease": False,
|
||||
"draft": False,
|
||||
"html_url": release["_links"]["self"],
|
||||
"assets": [{
|
||||
"name": asset["name"],
|
||||
"browser_download_url": asset["direct_asset_url"]
|
||||
} for asset in release["assets"]["links"]],
|
||||
}
|
||||
"assets": [
|
||||
{
|
||||
"name": asset["name"],
|
||||
"browser_download_url": asset["direct_asset_url"],
|
||||
}
|
||||
for asset in release["assets"]["links"]
|
||||
],
|
||||
}
|
||||
for source in release["assets"]["sources"]:
|
||||
r["assets"].append({
|
||||
"name": f"source.{source['format']}",
|
||||
"browser_download_url": source['url']
|
||||
})
|
||||
r["assets"].append(
|
||||
{
|
||||
"name": f"source.{source['format']}",
|
||||
"browser_download_url": source["url"],
|
||||
}
|
||||
)
|
||||
retval.append(r)
|
||||
|
||||
return retval
|
||||
|
Reference in New Issue
Block a user