expand: mirror, auto-pagination beyond 50, retry-on-5xx, branch protection rules
* `fj repo mirror <url>` does a full migrate (pull-mirror with `--mirror`, one-shot import otherwise). `fj repo mirror-sync` triggers a refresh on an existing pull-mirror. * List endpoints transparently follow `Link: rel=next` when the caller asks for more than the 50-per-page API cap. So `fj pr list -L 200` or `fj issue list -L 100` now Just Works without manual paging. * HTTP client retries idempotent requests (GET/HEAD/OPTIONS/PUT/DELETE) up to 3 times with exponential backoff (200/400/800 ms) on transport errors and 5xx responses. POST and PATCH are never retried. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
35d88bb370
commit
21311b6340
|
|
@ -88,8 +88,6 @@ pub async fn list(
|
||||||
let path = format!("/api/v1/repos/{owner}/{name}/issues");
|
let path = format!("/api/v1/repos/{owner}/{name}/issues");
|
||||||
let mut query: Vec<(String, String)> = vec![
|
let mut query: Vec<(String, String)> = vec![
|
||||||
("state".into(), opts.state.as_str().into()),
|
("state".into(), opts.state.as_str().into()),
|
||||||
("limit".into(), opts.limit.clamp(1, 50).to_string()),
|
|
||||||
("page".into(), opts.page.max(1).to_string()),
|
|
||||||
("type".into(), "issues".into()),
|
("type".into(), "issues".into()),
|
||||||
];
|
];
|
||||||
if let Some(l) = opts.labels {
|
if let Some(l) = opts.labels {
|
||||||
|
|
@ -101,6 +99,14 @@ pub async fn list(
|
||||||
if let Some(q) = opts.query {
|
if let Some(q) = opts.query {
|
||||||
query.push(("q".into(), q.into()));
|
query.push(("q".into(), q.into()));
|
||||||
}
|
}
|
||||||
|
if opts.limit > 50 {
|
||||||
|
let items = client
|
||||||
|
.get_all::<Issue>(&path, &query, opts.limit as usize)
|
||||||
|
.await?;
|
||||||
|
return Ok(Page::single(items));
|
||||||
|
}
|
||||||
|
query.push(("limit".into(), opts.limit.clamp(1, 50).to_string()));
|
||||||
|
query.push(("page".into(), opts.page.max(1).to_string()));
|
||||||
client.get_page::<Issue>(&path, &query).await
|
client.get_page::<Issue>(&path, &query).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -119,6 +119,15 @@ pub async fn list(
|
||||||
opts: ListOptions,
|
opts: ListOptions,
|
||||||
) -> Result<Page<Pull>> {
|
) -> Result<Page<Pull>> {
|
||||||
let path = format!("/api/v1/repos/{owner}/{name}/pulls");
|
let path = format!("/api/v1/repos/{owner}/{name}/pulls");
|
||||||
|
// When the caller wants more than the API's per-page cap, transparently
|
||||||
|
// follow Link rel=next.
|
||||||
|
if opts.limit > 50 {
|
||||||
|
let query: Vec<(String, String)> = vec![("state".into(), opts.state.as_str().into())];
|
||||||
|
let items = client
|
||||||
|
.get_all::<Pull>(&path, &query, opts.limit as usize)
|
||||||
|
.await?;
|
||||||
|
return Ok(Page::single(items));
|
||||||
|
}
|
||||||
let query: Vec<(String, String)> = vec![
|
let query: Vec<(String, String)> = vec![
|
||||||
("state".into(), opts.state.as_str().into()),
|
("state".into(), opts.state.as_str().into()),
|
||||||
("limit".into(), opts.limit.clamp(1, 50).to_string()),
|
("limit".into(), opts.limit.clamp(1, 50).to_string()),
|
||||||
|
|
|
||||||
|
|
@ -45,6 +45,12 @@ pub struct ListOptions<'a> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn list_for_user(client: &Client, opts: ListOptions<'_>) -> Result<Page<Repo>> {
|
pub async fn list_for_user(client: &Client, opts: ListOptions<'_>) -> Result<Page<Repo>> {
|
||||||
|
if opts.limit > 50 {
|
||||||
|
let items = client
|
||||||
|
.get_all::<Repo>("/api/v1/user/repos", &[], opts.limit as usize)
|
||||||
|
.await?;
|
||||||
|
return Ok(Page::single(items));
|
||||||
|
}
|
||||||
let limit = opts.limit.clamp(1, 50);
|
let limit = opts.limit.clamp(1, 50);
|
||||||
let page = opts.page.max(1);
|
let page = opts.page.max(1);
|
||||||
let query: Vec<(String, String)> = vec![
|
let query: Vec<(String, String)> = vec![
|
||||||
|
|
@ -254,3 +260,41 @@ pub async fn set_topics(client: &Client, owner: &str, name: &str, topics: &[Stri
|
||||||
res.error_for_status()?;
|
res.error_for_status()?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
pub struct Migrate<'a> {
|
||||||
|
pub clone_addr: &'a str,
|
||||||
|
pub repo_name: &'a str,
|
||||||
|
/// User or org login that will own the migrated repo.
|
||||||
|
pub repo_owner: &'a str,
|
||||||
|
#[serde(default)]
|
||||||
|
pub mirror: bool,
|
||||||
|
#[serde(default)]
|
||||||
|
pub private: bool,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub description: Option<&'a str>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub auth_username: Option<&'a str>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub auth_password: Option<&'a str>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub auth_token: Option<&'a str>,
|
||||||
|
/// Service: "git", "github", "gitea", "gitlab", "gogs", "onedev", ...
|
||||||
|
pub service: &'a str,
|
||||||
|
/// For pull-mirror: how often to refresh from the source.
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub mirror_interval: Option<&'a str>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn migrate(client: &Client, body: &Migrate<'_>) -> Result<Repo> {
|
||||||
|
client
|
||||||
|
.json(Method::POST, "/api/v1/repos/migrate", &[], Some(body))
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn mirror_sync(client: &Client, owner: &str, name: &str) -> Result<()> {
|
||||||
|
let path = format!("/api/v1/repos/{owner}/{name}/mirror-sync");
|
||||||
|
let res = client.request(Method::POST, &path, &[], None).await?;
|
||||||
|
res.error_for_status()?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -45,6 +45,10 @@ pub enum RepoSub {
|
||||||
Branches(BranchesArgs),
|
Branches(BranchesArgs),
|
||||||
/// Manage repo topics (tags).
|
/// Manage repo topics (tags).
|
||||||
Topics(TopicsArgs),
|
Topics(TopicsArgs),
|
||||||
|
/// Migrate or mirror a repo from another git host.
|
||||||
|
Mirror(MirrorArgs),
|
||||||
|
/// Manually trigger a sync on a pull-mirror.
|
||||||
|
MirrorSync(MirrorSyncArgs),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Args)]
|
#[derive(Debug, Args)]
|
||||||
|
|
@ -172,6 +176,42 @@ pub struct TopicsArgs {
|
||||||
pub set: Option<String>,
|
pub set: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Args)]
|
||||||
|
pub struct MirrorArgs {
|
||||||
|
/// Source URL (e.g. `https://github.com/foo/bar.git`).
|
||||||
|
pub source_url: String,
|
||||||
|
/// Destination as `[owner/]name`. Defaults to the source's basename
|
||||||
|
/// under your user account.
|
||||||
|
#[arg(long)]
|
||||||
|
pub dest: Option<String>,
|
||||||
|
/// Migrate as a pull-mirror instead of a one-shot import.
|
||||||
|
#[arg(long)]
|
||||||
|
pub mirror: bool,
|
||||||
|
/// Make the destination repo private.
|
||||||
|
#[arg(long)]
|
||||||
|
pub private: bool,
|
||||||
|
/// Description for the new repo.
|
||||||
|
#[arg(long)]
|
||||||
|
pub description: Option<String>,
|
||||||
|
/// Username for HTTP source-auth.
|
||||||
|
#[arg(long)]
|
||||||
|
pub auth_user: Option<String>,
|
||||||
|
/// Password / token for HTTP source-auth.
|
||||||
|
#[arg(long)]
|
||||||
|
pub auth_pass: Option<String>,
|
||||||
|
/// Service hint: git, github, gitea, gitlab, gogs.
|
||||||
|
#[arg(long, default_value = "git")]
|
||||||
|
pub service: String,
|
||||||
|
/// Refresh interval for pull-mirrors (e.g. `8h`, `1d`).
|
||||||
|
#[arg(long)]
|
||||||
|
pub interval: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Args)]
|
||||||
|
pub struct MirrorSyncArgs {
|
||||||
|
pub repo: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn run(cmd: RepoCmd, host: Option<&str>) -> Result<()> {
|
pub async fn run(cmd: RepoCmd, host: Option<&str>) -> Result<()> {
|
||||||
match cmd.command {
|
match cmd.command {
|
||||||
RepoSub::List(args) => list(args, host).await,
|
RepoSub::List(args) => list(args, host).await,
|
||||||
|
|
@ -187,9 +227,57 @@ pub async fn run(cmd: RepoCmd, host: Option<&str>) -> Result<()> {
|
||||||
RepoSub::Delete(args) => delete(args, host).await,
|
RepoSub::Delete(args) => delete(args, host).await,
|
||||||
RepoSub::Branches(args) => branches(args, host).await,
|
RepoSub::Branches(args) => branches(args, host).await,
|
||||||
RepoSub::Topics(args) => topics(args, host).await,
|
RepoSub::Topics(args) => topics(args, host).await,
|
||||||
|
RepoSub::Mirror(args) => mirror(args, host).await,
|
||||||
|
RepoSub::MirrorSync(args) => mirror_sync(args, host).await,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn mirror(args: MirrorArgs, host: Option<&str>) -> Result<()> {
|
||||||
|
let client = Client::connect(host)?;
|
||||||
|
let me = api::user::current(&client).await?;
|
||||||
|
let (dest_owner, dest_name) = match args.dest.as_deref() {
|
||||||
|
Some(slug) => match slug.split_once('/') {
|
||||||
|
Some((o, n)) => (o.to_string(), n.to_string()),
|
||||||
|
None => (me.login.clone(), slug.to_string()),
|
||||||
|
},
|
||||||
|
None => {
|
||||||
|
let basename = std::path::Path::new(&args.source_url)
|
||||||
|
.file_stem()
|
||||||
|
.and_then(|s| s.to_str())
|
||||||
|
.ok_or_else(|| anyhow!("can't infer destination name from {}", args.source_url))?;
|
||||||
|
(me.login.clone(), basename.to_string())
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let body = api::repo::Migrate {
|
||||||
|
clone_addr: &args.source_url,
|
||||||
|
repo_name: &dest_name,
|
||||||
|
repo_owner: &dest_owner,
|
||||||
|
mirror: args.mirror,
|
||||||
|
private: args.private,
|
||||||
|
description: args.description.as_deref(),
|
||||||
|
auth_username: args.auth_user.as_deref(),
|
||||||
|
auth_password: args.auth_pass.as_deref(),
|
||||||
|
auth_token: None,
|
||||||
|
service: &args.service,
|
||||||
|
mirror_interval: args.interval.as_deref(),
|
||||||
|
};
|
||||||
|
let r = api::repo::migrate(&client, &body).await?;
|
||||||
|
println!(
|
||||||
|
"✓ {} {}",
|
||||||
|
if args.mirror { "Mirrored" } else { "Imported" },
|
||||||
|
r.full_name
|
||||||
|
);
|
||||||
|
println!("{}", r.html_url);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn mirror_sync(args: MirrorSyncArgs, host: Option<&str>) -> Result<()> {
|
||||||
|
let ctx = resolve_repo(args.repo.as_deref(), host)?;
|
||||||
|
api::repo::mirror_sync(&ctx.client, &ctx.owner, &ctx.name).await?;
|
||||||
|
println!("✓ Triggered mirror sync for {}/{}", ctx.owner, ctx.name);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn list(args: ListArgs, host: Option<&str>) -> Result<()> {
|
async fn list(args: ListArgs, host: Option<&str>) -> Result<()> {
|
||||||
let client = Client::connect(host)?;
|
let client = Client::connect(host)?;
|
||||||
let opts = api::repo::ListOptions {
|
let opts = api::repo::ListOptions {
|
||||||
|
|
|
||||||
|
|
@ -162,14 +162,6 @@ impl Client {
|
||||||
for (k, v) in extra.iter() {
|
for (k, v) in extra.iter() {
|
||||||
headers.insert(k.clone(), v.clone());
|
headers.insert(k.clone(), v.clone());
|
||||||
}
|
}
|
||||||
let mut req = self
|
|
||||||
.http
|
|
||||||
.request(method.clone(), url.clone())
|
|
||||||
.headers(headers)
|
|
||||||
.query(query);
|
|
||||||
if let Some(body) = body {
|
|
||||||
req = req.json(body);
|
|
||||||
}
|
|
||||||
if debug_enabled() {
|
if debug_enabled() {
|
||||||
let q = if query.is_empty() {
|
let q = if query.is_empty() {
|
||||||
String::new()
|
String::new()
|
||||||
|
|
@ -189,11 +181,56 @@ impl Client {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let res = req.send().await.context("sending HTTP request")?;
|
|
||||||
if debug_enabled() {
|
let retries = if is_idempotent(&method) { 3 } else { 1 };
|
||||||
eprintln!("← {} {}", res.status(), res.url());
|
let mut last_err: Option<anyhow::Error> = None;
|
||||||
|
for attempt in 0..retries {
|
||||||
|
let mut req = self
|
||||||
|
.http
|
||||||
|
.request(method.clone(), url.clone())
|
||||||
|
.headers(headers.clone())
|
||||||
|
.query(query);
|
||||||
|
if let Some(body) = body {
|
||||||
|
req = req.json(body);
|
||||||
}
|
}
|
||||||
Ok(res)
|
match req.send().await {
|
||||||
|
Ok(res) => {
|
||||||
|
let status = res.status();
|
||||||
|
if status.is_server_error() && attempt + 1 < retries {
|
||||||
|
if debug_enabled() {
|
||||||
|
eprintln!(
|
||||||
|
"← {status} {} (retrying after backoff, attempt {}/{})",
|
||||||
|
res.url(),
|
||||||
|
attempt + 1,
|
||||||
|
retries
|
||||||
|
);
|
||||||
|
}
|
||||||
|
backoff(attempt).await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if debug_enabled() {
|
||||||
|
eprintln!("← {status} {}", res.url());
|
||||||
|
}
|
||||||
|
return Ok(res);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
if attempt + 1 < retries {
|
||||||
|
if debug_enabled() {
|
||||||
|
eprintln!(
|
||||||
|
"transport error: {e}; retrying ({}/{})",
|
||||||
|
attempt + 1,
|
||||||
|
retries
|
||||||
|
);
|
||||||
|
}
|
||||||
|
backoff(attempt).await;
|
||||||
|
last_err = Some(anyhow::Error::new(e));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
return Err(anyhow::Error::new(e).context("sending HTTP request"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(last_err.unwrap_or_else(|| anyhow!("retries exhausted")))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Issue a request and decode a JSON body, mapping non-2xx to a typed
|
/// Issue a request and decode a JSON body, mapping non-2xx to a typed
|
||||||
|
|
@ -235,6 +272,81 @@ impl Client {
|
||||||
let items: Vec<T> = res.json().await.context("decoding JSON list response")?;
|
let items: Vec<T> = res.json().await.context("decoding JSON list response")?;
|
||||||
Ok(Page::from_headers(items, &headers))
|
Ok(Page::from_headers(items, &headers))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// GET that follows `Link: rel=next` until either `total_limit` items have
|
||||||
|
/// been collected or there are no more pages. Use this when the caller's
|
||||||
|
/// requested limit exceeds Forgejo's per-page cap (50).
|
||||||
|
pub async fn get_all<T: DeserializeOwned>(
|
||||||
|
&self,
|
||||||
|
path: &str,
|
||||||
|
base_query: &[(String, String)],
|
||||||
|
total_limit: usize,
|
||||||
|
) -> Result<Vec<T>> {
|
||||||
|
let mut out: Vec<T> = Vec::new();
|
||||||
|
let mut current_path: String = path.to_string();
|
||||||
|
let mut current_query: Vec<(String, String)> = base_query.to_vec();
|
||||||
|
// Make sure per-page is set to the API's max so we don't make extra
|
||||||
|
// round-trips.
|
||||||
|
if !current_query.iter().any(|(k, _)| k == "limit") {
|
||||||
|
current_query.push(("limit".into(), "50".into()));
|
||||||
|
} else {
|
||||||
|
for (k, v) in current_query.iter_mut() {
|
||||||
|
if k == "limit" {
|
||||||
|
*v = "50".to_string();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let res = self
|
||||||
|
.request(Method::GET, ¤t_path, ¤t_query, None)
|
||||||
|
.await?;
|
||||||
|
let res = ensure_success(res).await?;
|
||||||
|
let headers = res.headers().clone();
|
||||||
|
let mut items: Vec<T> = res.json().await.context("decoding JSON list response")?;
|
||||||
|
if items.is_empty() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let need = total_limit.saturating_sub(out.len());
|
||||||
|
if items.len() > need {
|
||||||
|
items.truncate(need);
|
||||||
|
}
|
||||||
|
out.append(&mut items);
|
||||||
|
if out.len() >= total_limit {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// Find Link: rel=next.
|
||||||
|
let Some(next) = headers
|
||||||
|
.get(reqwest::header::LINK)
|
||||||
|
.and_then(|l| l.to_str().ok())
|
||||||
|
.map(pagination::parse_link_header)
|
||||||
|
.and_then(|links| {
|
||||||
|
links
|
||||||
|
.into_iter()
|
||||||
|
.find_map(|(url, rel)| (rel == "next").then_some(url))
|
||||||
|
})
|
||||||
|
else {
|
||||||
|
break;
|
||||||
|
};
|
||||||
|
let parsed = Url::parse(&next)?;
|
||||||
|
current_path = parsed.path().to_string();
|
||||||
|
current_query = parsed
|
||||||
|
.query_pairs()
|
||||||
|
.map(|(k, v)| (k.into_owned(), v.into_owned()))
|
||||||
|
.collect();
|
||||||
|
}
|
||||||
|
Ok(out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_idempotent(m: &Method) -> bool {
|
||||||
|
matches!(m.as_str(), "GET" | "HEAD" | "OPTIONS" | "PUT" | "DELETE")
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn backoff(attempt: u32) {
|
||||||
|
let base_ms: u64 = 200;
|
||||||
|
let delay = base_ms * (1u64 << attempt);
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn build_base_url(hostname: &str, host: &Host) -> Result<Url> {
|
fn build_base_url(hostname: &str, host: &Host) -> Result<Url> {
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,19 @@ pub struct Page<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Page<T> {
|
impl<T> Page<T> {
|
||||||
|
/// Wrap a fully-collected list (from `get_all`) in a Page with no
|
||||||
|
/// pagination metadata.
|
||||||
|
pub fn single(items: Vec<T>) -> Self {
|
||||||
|
Self {
|
||||||
|
items,
|
||||||
|
next: None,
|
||||||
|
prev: None,
|
||||||
|
last: None,
|
||||||
|
first: None,
|
||||||
|
total: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn from_headers(items: Vec<T>, headers: &HeaderMap) -> Self {
|
pub fn from_headers(items: Vec<T>, headers: &HeaderMap) -> Self {
|
||||||
let mut next = None;
|
let mut next = None;
|
||||||
let mut prev = None;
|
let mut prev = None;
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue