From 995cd0cd6642b50f78e46fa9fdd1522e93045534 Mon Sep 17 00:00:00 2001 From: Ryan Gonzalez Date: Fri, 26 Sep 2025 15:35:01 -0500 Subject: [PATCH 1/4] Don't preserve existing files when performing an upload Preserving `_link`, in particular, is problematic, as that preserves the link to the original package (akin to `keeplink`), which has caused numerous issues, such as: - Changes to the origin can result in build conflicts, which currently result in the runner waiting forever due to the bad state. - Broken links inhibit the ability to get the `xsrcmd5`, which is needed for monitoring the build logs. But keeping files around is of questionable utility regardless, and `osc-plugin-dput` doesn't even do that anymore. For simplicity, just remove this entire feature altogether. --- Cargo.lock | 4 +- src/handler.rs | 6 +-- src/monitor.rs | 4 +- src/prune.rs | 29 ++++++++++--- src/upload.rs | 116 ++++++++++++------------------------------------- 5 files changed, 57 insertions(+), 102 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8528cb5..6971eb0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1547,7 +1547,7 @@ checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" [[package]] name = "open-build-service-api" version = "0.1.0" -source = "git+https://github.com/collabora/open-build-service-rs#a069b8e215c062e8cd9ec33e10cdf1fdb398f7fa" +source = "git+https://github.com/collabora/open-build-service-rs#bd9f78ec0a26fcd9c4b23dfc8b8b1db299713fd1" dependencies = [ "base16ct", "bytes", @@ -1565,7 +1565,7 @@ dependencies = [ [[package]] name = "open-build-service-mock" version = "0.1.0" -source = "git+https://github.com/collabora/open-build-service-rs#a069b8e215c062e8cd9ec33e10cdf1fdb398f7fa" +source = "git+https://github.com/collabora/open-build-service-rs#bd9f78ec0a26fcd9c4b23dfc8b8b1db299713fd1" dependencies = [ "base16ct", "base64ct", diff --git a/src/handler.rs b/src/handler.rs index 91205cb..1e7b526 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -1343,11 +1343,7 @@ mod tests { .await ); - if build_info.is_branched { - dir.linkinfo.into_iter().next().unwrap().xsrcmd5 - } else { - dir.srcmd5 - } + dir.srcmd5 } ); diff --git a/src/monitor.rs b/src/monitor.rs index 0180c28..96dc9b0 100644 --- a/src/monitor.rs +++ b/src/monitor.rs @@ -345,7 +345,9 @@ mod tests { TEST_PACKAGE_2.to_owned(), MockBranchOptions { srcmd5: branch_srcmd5.clone(), - xsrcmd5: branch_xsrcmd5.clone(), + link_resolution: MockLinkResolution::Available { + xsrcmd5: branch_xsrcmd5.clone(), + }, ..Default::default() }, ); diff --git a/src/prune.rs b/src/prune.rs index 7d6ed3e..b1d1ca4 100644 --- a/src/prune.rs +++ b/src/prune.rs @@ -5,6 +5,25 @@ use tracing::info; use crate::retry_request; +async fn is_originally_branched( + client: &obs::Client, + project: &str, + package: &str, +) -> Result { + // Replacing the files in a package without setting keeplink will clear the + // linkinfo, so in order to determine if this was once branched, fetch the + // very first revision and check for linkinfo there. + retry_request!( + client + .project(project.to_owned()) + .package(package.to_owned()) + .list(Some("1")) + .await + .wrap_err("Failed to list package @ revision 1") + ) + .map(|first_rev| !first_rev.linkinfo.is_empty()) +} + #[tracing::instrument(skip(client))] pub async fn prune_branch( client: &obs::Client, @@ -14,6 +33,11 @@ pub async fn prune_branch( ) -> Result<()> { // Do a sanity check to make sure this project & package are actually // linked (i.e. we're not going to be nuking the main repository). + ensure!( + is_originally_branched(client, project, package).await?, + "Rejecting attempt to prune a non-branched package" + ); + let dir = retry_request!( client .project(project.to_owned()) @@ -23,11 +47,6 @@ pub async fn prune_branch( .wrap_err("Failed to list package") )?; - ensure!( - !dir.linkinfo.is_empty(), - "Rejecting attempt to prune a non-branched package" - ); - if let Some(expected_rev) = expected_rev { if dir.rev.as_deref() != Some(expected_rev) { info!( diff --git a/src/upload.rs b/src/upload.rs index 3551a58..03d5d44 100644 --- a/src/upload.rs +++ b/src/upload.rs @@ -1,10 +1,8 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; -use bytes::Bytes; use camino::{Utf8Path, Utf8PathBuf}; use color_eyre::eyre::{Context, Result, ensure, eyre}; use derivative::*; -use futures_util::{Stream, TryStreamExt}; use gitlab_runner::outputln; use md5::{Digest, Md5}; use open_build_service_api as obs; @@ -20,19 +18,6 @@ type Md5String = String; const META_NAME: &str = "_meta"; -async fn collect_byte_stream( - stream: impl Stream>, -) -> Result> { - let mut data = vec![]; - stream - .try_for_each(|chunk| { - data.extend_from_slice(&chunk); - futures_util::future::ready(Ok(())) - }) - .await?; - Ok(data) -} - pub fn compute_md5(data: &[u8]) -> String { base16ct::lower::encode_string(&Md5::digest(data)) } @@ -168,42 +153,6 @@ impl ObsDscUploader { } } - #[instrument(skip(self))] - async fn find_files_to_remove( - &self, - files: &HashMap, - ) -> Result> { - let mut to_remove = HashSet::new(); - - for file in files.keys() { - if file.ends_with(".dsc") { - to_remove.insert(file.clone()); - - let contents = retry_request!( - collect_byte_stream( - self.client - .project(self.project.clone()) - .package(self.package.clone()) - .source_file(file) - .await?, - ) - .instrument(info_span!("find_files_to_remove:download", %file)) - .await - )?; - - let _span = info_span!("find_files_to_remove:parse", %file); - let dsc: Dsc = - rfc822_like::from_str(discard_pgp(std::str::from_utf8(&contents[..])?))?; - - to_remove.extend(dsc.files.into_iter().map(|f| f.filename)); - } else if file.ends_with(".changes") { - to_remove.insert(file.clone()); - } - } - - Ok(to_remove) - } - #[instrument(skip(self))] async fn get_latest_meta_md5(&self) -> Result { let dir = retry_request!( @@ -321,11 +270,7 @@ impl ObsDscUploader { let present_files: HashMap<_, _> = dir.entries.into_iter().map(|e| (e.name, e.md5)).collect(); - let mut files_to_commit = present_files.clone(); - - for to_remove in self.find_files_to_remove(&files_to_commit).await? { - files_to_commit.remove(&to_remove); - } + let mut files_to_commit = HashMap::new(); for file in &self.dsc.files { files_to_commit.insert(file.filename.clone(), file.hash.clone()); @@ -366,15 +311,14 @@ impl ObsDscUploader { .await })?; - let build_srcmd5 = if let Some(link) = current_dir.linkinfo.into_iter().next() { - link.xsrcmd5 - } else { - current_dir.srcmd5 - }; + ensure!( + current_dir.linkinfo.is_empty(), + "linkinfo unexpectedly present after upload" + ); Ok(UploadResult { rev, - build_srcmd5, + build_srcmd5: current_dir.srcmd5, unchanged, }) } @@ -758,17 +702,15 @@ d29ybGQgaGVsbG8K let mut dir = assert_ok!(package_1.list(None).await); assert_eq!(dir.rev.as_deref(), Some("3")); - assert_eq!(dir.entries.len(), 5); + assert_eq!(dir.entries.len(), 4); dir.entries.sort_by(|a, b| a.name.cmp(&b.name)); assert_eq!(dir.entries[0].name, "_meta"); - assert_eq!(dir.entries[1].name, already_present_file); - assert_eq!(dir.entries[1].md5, already_present_md5); - assert_eq!(dir.entries[2].name, dsc2_file.file_name().unwrap()); - assert_eq!(dir.entries[2].md5, dsc2_md5); - assert_eq!(dir.entries[3].name, test1_file); - assert_eq!(dir.entries[3].md5, test1_md5_a); - assert_eq!(dir.entries[4].name, test2_file); - assert_eq!(dir.entries[4].md5, test2_md5); + assert_eq!(dir.entries[1].name, dsc2_file.file_name().unwrap()); + assert_eq!(dir.entries[1].md5, dsc2_md5); + assert_eq!(dir.entries[2].name, test1_file); + assert_eq!(dir.entries[2].md5, test1_md5_a); + assert_eq!(dir.entries[3].name, test2_file); + assert_eq!(dir.entries[3].md5, test2_md5); assert_eq!(dir.srcmd5, result.build_srcmd5); @@ -794,17 +736,15 @@ d29ybGQgaGVsbG8K let mut dir = assert_ok!(package_1.list(None).await); assert_eq!(dir.rev.as_deref(), Some("4")); - assert_eq!(dir.entries.len(), 5); + assert_eq!(dir.entries.len(), 4); dir.entries.sort_by(|a, b| a.name.cmp(&b.name)); assert_eq!(dir.entries[0].name, "_meta"); - assert_eq!(dir.entries[1].name, already_present_file); - assert_eq!(dir.entries[1].md5, already_present_md5); - assert_eq!(dir.entries[2].name, dsc3_file.file_name().unwrap()); - assert_eq!(dir.entries[2].md5, dsc3_md5); - assert_eq!(dir.entries[3].name, test1_file); - assert_eq!(dir.entries[3].md5, test1_md5_b); - assert_eq!(dir.entries[4].name, test2_file); - assert_eq!(dir.entries[4].md5, test2_md5); + assert_eq!(dir.entries[1].name, dsc3_file.file_name().unwrap()); + assert_eq!(dir.entries[1].md5, dsc3_md5); + assert_eq!(dir.entries[2].name, test1_file); + assert_eq!(dir.entries[2].md5, test1_md5_b); + assert_eq!(dir.entries[3].name, test2_file); + assert_eq!(dir.entries[3].md5, test2_md5); assert_eq!(dir.srcmd5, result.build_srcmd5); @@ -826,15 +766,13 @@ d29ybGQgaGVsbG8K let mut dir = assert_ok!(package_1.list(None).await); assert_eq!(dir.rev.as_deref(), Some("5")); - assert_eq!(dir.entries.len(), 4); + assert_eq!(dir.entries.len(), 3); dir.entries.sort_by(|a, b| a.name.cmp(&b.name)); assert_eq!(dir.entries[0].name, "_meta"); - assert_eq!(dir.entries[1].name, already_present_file); - assert_eq!(dir.entries[1].md5, already_present_md5); - assert_eq!(dir.entries[2].name, dsc4_file.file_name().unwrap()); - assert_eq!(dir.entries[2].md5, dsc4_md5); - assert_eq!(dir.entries[3].name, test1_file); - assert_eq!(dir.entries[3].md5, test1_md5_b); + assert_eq!(dir.entries[1].name, dsc4_file.file_name().unwrap()); + assert_eq!(dir.entries[1].md5, dsc4_md5); + assert_eq!(dir.entries[2].name, test1_file); + assert_eq!(dir.entries[2].md5, test1_md5_b); // Re-upload with no changes and ensure the old commit is returned. @@ -873,6 +811,6 @@ d29ybGQgaGVsbG8K // XXX: the mock apis don't set the correct rev values on branch yet assert_matches!(dir.rev, Some(_)); - assert_eq!(dir.linkinfo[0].xsrcmd5, result.build_srcmd5); + assert!(dir.linkinfo.is_empty()); } } From ba3a5b3f535cd504c7c54eeeb7506db7ede06b00 Mon Sep 17 00:00:00 2001 From: Ryan Gonzalez Date: Tue, 1 Jul 2025 10:38:27 -0500 Subject: [PATCH 2/4] Move Handler's ArtifactDirectory implementation to a separate type Passing around `self` trips up the borrow checker if we need to get a reference to a field at the same time, and it looks a bit confusing anyway. --- src/handler.rs | 137 +++++++++++++++++++++++++++++++------------------ 1 file changed, 87 insertions(+), 50 deletions(-) diff --git a/src/handler.rs b/src/handler.rs index 1e7b526..897c524 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -228,6 +228,51 @@ pub struct HandlerOptions { pub log_tail: u64, } +struct GitLabArtifacts<'a> { + job: &'a Job, + artifacts: &'a mut HashMap, +} + +#[async_trait] +impl ArtifactDirectory for GitLabArtifacts<'_> { + #[instrument(skip(self, path), path = path.as_ref())] + async fn open(&self, path: impl AsRef + Send) -> Result { + let path = path.as_ref(); + + if let Some(file) = self.artifacts.get(path) { + let file = file + .try_clone() + .await + .wrap_err("Failed to reopen artifact")?; + return Ok(file); + } + + for dep in self.job.dependencies() { + if let Some(file) = check_for_artifact(dep, path).await? { + return Ok(file); + } + } + + Err(MissingArtifact(path.to_owned()).into()) + } + + #[tracing::instrument(skip(self, path, func), path = path.as_ref())] + async fn save_with(&mut self, path: P, func: F) -> Result + where + Report: From, + Ret: Send, + Err: Send, + F: for<'a> SaveCallback<'a, Ret, Err> + Send, + P: AsRef + Send, + { + let mut writer = ArtifactWriter::new().await?; + let ret = func(&mut writer).await?; + self.artifacts + .insert(path.as_ref().to_owned(), writer.into_reader().await?); + Ok(ret) + } +} + pub struct ObsJobHandler { job: Job, client: obs::Client, @@ -289,6 +334,11 @@ impl ObsJobHandler { #[instrument(skip(self))] async fn run_dput(&mut self, args: DputAction) -> Result<()> { + let mut artifacts = GitLabArtifacts { + job: &self.job, + artifacts: &mut self.artifacts, + }; + let branch_to = if !args.branch_to.is_empty() { Some(args.branch_to) } else { @@ -305,7 +355,7 @@ impl ObsJobHandler { args.project.clone(), branch_to, args.dsc.as_str().into(), - self, + &artifacts, ) .await?; @@ -318,7 +368,10 @@ impl ObsJobHandler { enabled_repos: HashMap::new(), }; debug!("Saving initial build info: {:?}", build_info); - build_info.clone().save(self, &args.build_info_out).await?; + build_info + .clone() + .save(&mut artifacts, &args.build_info_out) + .await?; let initial_build_meta = BuildMeta::get_if_package_exists( self.client.clone(), @@ -334,7 +387,7 @@ impl ObsJobHandler { .await?; debug!(?initial_build_meta); - let result = uploader.upload_package(self).await?; + let result = uploader.upload_package(&artifacts).await?; // If we couldn't get the metadata before because the package didn't // exist yet, get it now but without history, so we leave the previous @@ -392,14 +445,21 @@ impl ObsJobHandler { ..build_info }; debug!("Saving complete build info: {:?}", build_info); - build_info.save(self, &args.build_info_out).await?; + build_info + .save(&mut artifacts, &args.build_info_out) + .await?; Ok(()) } #[instrument(skip(self))] async fn run_generate_monitor(&mut self, args: GenerateMonitorAction) -> Result<()> { - let build_info_data = self.read_string(&args.build_info).await?; + let mut artifacts = GitLabArtifacts { + job: &self.job, + artifacts: &mut self.artifacts, + }; + + let build_info_data = artifacts.read_string(&args.build_info).await?; let build_info: ObsBuildInfo = serde_yaml::from_str(&build_info_data) .wrap_err("Failed to parse provided build info file")?; @@ -432,7 +492,9 @@ impl ObsJobHandler { }, )?; - self.write(&args.pipeline_out, pipeline.as_bytes()).await?; + artifacts + .write(&args.pipeline_out, pipeline.as_bytes()) + .await?; outputln!("Wrote pipeline file '{}'.", args.pipeline_out); Ok(()) @@ -440,6 +502,11 @@ impl ObsJobHandler { #[instrument(skip(self))] async fn run_monitor(&mut self, args: MonitorAction) -> Result<()> { + let mut artifacts = GitLabArtifacts { + job: &self.job, + artifacts: &mut self.artifacts, + }; + let monitor = ObsMonitor::new( self.client.clone(), MonitoredPackage { @@ -459,7 +526,7 @@ impl ObsJobHandler { debug!("Completed with: {:?}", completion); let mut log_file = monitor - .download_build_log(&args.build_log_out, self) + .download_build_log(&args.build_log_out, &mut artifacts) .await?; match completion { @@ -506,13 +573,18 @@ impl ObsJobHandler { #[instrument(skip(self))] async fn run_download_binaries(&mut self, args: DownloadBinariesAction) -> Result<()> { + let mut artifacts = GitLabArtifacts { + job: &self.job, + artifacts: &mut self.artifacts, + }; + let binaries = download_binaries( self.client.clone(), &args.project, &args.package, &args.repository, &args.arch, - self, + &mut artifacts, &args.build_results_dir, ) .await?; @@ -528,8 +600,13 @@ impl ObsJobHandler { return Ok(()); } + let artifacts = GitLabArtifacts { + job: &self.job, + artifacts: &mut self.artifacts, + }; + let build_info_data = if args.ignore_missing_build_info { - if let Some(build_info_data) = self + if let Some(build_info_data) = artifacts .read_string(&args.build_info) .await .missing_artifact_to_none()? @@ -543,7 +620,7 @@ impl ObsJobHandler { return Ok(()); } } else { - self.read_string(&args.build_info).await? + artifacts.read_string(&args.build_info).await? }; let build_info: ObsBuildInfo = serde_yaml::from_str(&build_info_data) @@ -701,46 +778,6 @@ async fn check_for_artifact( Ok(None) } -#[async_trait] -impl ArtifactDirectory for ObsJobHandler { - #[instrument(skip(self, path), path = path.as_ref())] - async fn open(&self, path: impl AsRef + Send) -> Result { - let path = path.as_ref(); - - if let Some(file) = self.artifacts.get(path) { - let file = file - .try_clone() - .await - .wrap_err("Failed to reopen artifact")?; - return Ok(file); - } - - for dep in self.job.dependencies() { - if let Some(file) = check_for_artifact(dep, path).await? { - return Ok(file); - } - } - - Err(MissingArtifact(path.to_owned()).into()) - } - - #[tracing::instrument(skip(self, path, func), path = path.as_ref())] - async fn save_with(&mut self, path: P, func: F) -> Result - where - Report: From, - Ret: Send, - Err: Send, - F: for<'a> SaveCallback<'a, Ret, Err> + Send, - P: AsRef + Send, - { - let mut writer = ArtifactWriter::new().await?; - let ret = func(&mut writer).await?; - self.artifacts - .insert(path.as_ref().to_owned(), writer.into_reader().await?); - Ok(ret) - } -} - #[cfg(test)] mod tests { use std::{ From 8c79e8295179f95d34eee93b5fbf7485cfd934f8 Mon Sep 17 00:00:00 2001 From: Ryan Gonzalez Date: Tue, 1 Jul 2025 10:38:27 -0500 Subject: [PATCH 3/4] Introduce a custom logging system that forwards to GitLab Instead of hardcoding the dependence on gitlab-runner's outputln, a custom macro is used with a unique field name. This field is picked up by a rather cursed bridge layer rewrites the events to instead use the fields gitlab-runner wants. --- src/build_meta.rs | 3 +- src/handler.rs | 6 +- src/logging.rs | 166 ++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 4 +- src/monitor.rs | 3 +- src/prune.rs | 6 +- src/upload.rs | 3 +- 7 files changed, 177 insertions(+), 14 deletions(-) create mode 100644 src/logging.rs diff --git a/src/build_meta.rs b/src/build_meta.rs index 9b8ca0c..3588110 100644 --- a/src/build_meta.rs +++ b/src/build_meta.rs @@ -1,12 +1,11 @@ use std::{collections::HashMap, time::Duration}; use color_eyre::eyre::{Result, WrapErr}; -use gitlab_runner::outputln; use open_build_service_api as obs; use serde::{Deserialize, Serialize}; use tracing::{Instrument, debug, info_span, instrument}; -use crate::retry_request; +use crate::{outputln, retry_request}; #[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash)] pub struct RepoArch { diff --git a/src/handler.rs b/src/handler.rs index 897c524..eed3d82 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -13,7 +13,6 @@ use futures_util::StreamExt; use gitlab_runner::{ JobHandler, JobResult, Phase, UploadableFile, job::{Dependency, Job, Variable}, - outputln, }; use open_build_service_api as obs; use serde::{Deserialize, Serialize}; @@ -38,6 +37,7 @@ use crate::{ RepoArch, }, monitor::{MonitoredPackage, ObsMonitor, PackageCompletion, PackageMonitoringOptions}, + outputln, pipeline::{GeneratePipelineOptions, PipelineDownloadBinaries, generate_monitor_pipeline}, prune::prune_branch, retry_request, @@ -798,7 +798,7 @@ mod tests { use tracing_subscriber::{Layer, Registry, filter::Targets, prelude::*}; use zip::ZipArchive; - use crate::{test_support::*, upload::compute_md5}; + use crate::{logging::GitLabForwarder, test_support::*, upload::compute_md5}; use super::*; @@ -866,7 +866,7 @@ mod tests { ), ) .with(tracing_error::ErrorLayer::default()) - .with(layer), + .with(GitLabForwarder::new(layer)), ) .await } diff --git a/src/logging.rs b/src/logging.rs new file mode 100644 index 0000000..138c9ab --- /dev/null +++ b/src/logging.rs @@ -0,0 +1,166 @@ +use gitlab_runner::GitlabLayer; +use tracing::{ + Event, Level, Metadata, Subscriber, + field::{self, Field, FieldSet}, + span::{Attributes, Id}, + subscriber::Interest, +}; +use tracing_subscriber::{ + Layer, + filter::{Filtered, Targets}, + layer::{Context, Filter}, + registry::LookupSpan, +}; + +struct OutputTester(bool); + +impl field::Visit for OutputTester { + fn record_bool(&mut self, field: &field::Field, value: bool) { + if field.name() == "obs_gitlab_runner.output" { + self.0 = value + } + } + + fn record_debug(&mut self, _field: &field::Field, _value: &dyn std::fmt::Debug) {} +} + +struct MessageExtractor(Option); + +impl field::Visit for MessageExtractor { + fn record_str(&mut self, field: &Field, value: &str) { + if field.name() == "message" { + self.0 = Some(value.to_owned()); + } + } + + fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) { + if field.name() == "message" { + self.0 = Some(format!("{value:?}")); + } + } +} + +// This mostly wraps a standard GitlabLayer, but it bypasses the filter to pass +// through any events with `obs_gitlab_runner.output` set, rewriting them to +// instead use `gitlab.output`. +pub struct GitLabForwarder>(Filtered); + +impl LookupSpan<'span>, F: Filter + 'static> + GitLabForwarder +{ + pub fn new(inner: Filtered) -> Filtered { + GitLabForwarder(inner).with_filter(Targets::new().with_targets([ + ("obs_gitlab_runner", Level::TRACE), + // This target is used to inject the current job ID, which + // gitlab-runner needs to actually send the logs out. + ("gitlab_runner::gitlab::job", Level::ERROR), + ])) + } +} + +impl LookupSpan<'span>, F: Filter + 'static> + Layer for GitLabForwarder +{ + fn on_register_dispatch(&self, subscriber: &tracing::Dispatch) { + self.0.on_register_dispatch(subscriber); + } + + fn on_layer(&mut self, subscriber: &mut S) { + self.0.on_layer(subscriber); + } + + fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest { + self.0.register_callsite(metadata) + } + + fn enabled(&self, metadata: &Metadata<'_>, ctx: Context<'_, S>) -> bool { + // This controls *global* event filtering, not local, so the inner filter + // should always return `true`. But we need to call it anyway, because + // `Filter` will *save internal state* that's needed for other API + // calls, and thus otherwise the event will always be treated as + // disabled. (Of course, events in the span we want to forward will + // also be disabled by this, which is why bypassing the filter in + // `on_event` is important.) + let enabled = self.0.enabled(metadata, ctx.clone()); + assert!(enabled); + true + } + + fn event_enabled(&self, event: &Event<'_>, ctx: Context<'_, S>) -> bool { + self.0.event_enabled(event, ctx) + } + + fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) { + self.0.on_new_span(attrs, id, ctx); + } + + fn on_follows_from(&self, span: &Id, follows: &Id, ctx: Context<'_, S>) { + self.0.on_follows_from(span, follows, ctx); + } + + fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) { + let mut visitor = OutputTester(false); + event.record(&mut visitor); + if !visitor.0 { + // No special behavior needed, so just forward it as-is. + self.0.on_event(event, ctx); + return; + } + + let mut visitor = MessageExtractor(None); + event.record(&mut visitor); + let Some(message) = visitor.0 else { + return; + }; + + // Create a new event that contains the fields needed for gitlab-runner. + let fields = FieldSet::new(&["gitlab.output", "message"], event.metadata().callsite()); + let mut iter = fields.iter(); + let values = [ + // "gitlab.output = true" + (&iter.next().unwrap(), Some(&true as &dyn tracing::Value)), + // "message" + (&iter.next().unwrap(), Some(&message as &dyn tracing::Value)), + ]; + + let value_set = fields.value_set(&values); + + let event = if event.is_contextual() { + // This event's parent is None, but if that's given to new_child_of, + // then this will be treated as an event at the *root*, i.e. + // completely parentless. By using `Event::new`, another contextual + // event will be created, which can still be tied to the correct + // `event_span`. + Event::new(event.metadata(), &value_set) + } else { + Event::new_child_of(event.parent().cloned(), event.metadata(), &value_set) + }; + + // Bypass the filter completely, because the event was almost certainly + // filtered out in its `enabled` due to lacking `gitlab.output`. + self.0.inner().on_event(&event, ctx); + } + + fn on_enter(&self, id: &Id, ctx: Context<'_, S>) { + self.0.on_enter(id, ctx) + } + + fn on_exit(&self, id: &Id, ctx: Context<'_, S>) { + self.0.on_exit(id, ctx) + } + + fn on_close(&self, id: Id, ctx: Context<'_, S>) { + self.0.on_close(id, ctx) + } + + fn on_id_change(&self, old: &Id, new: &Id, ctx: Context<'_, S>) { + self.0.on_id_change(old, new, ctx); + } +} + +#[macro_export] +macro_rules! outputln { + ($($args:tt)*) => { + ::tracing::trace!(obs_gitlab_runner.output = true, $($args)*) + }; +} diff --git a/src/main.rs b/src/main.rs index 6d6f623..3dc5976 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,7 @@ use std::{fmt, str::FromStr, sync::Arc}; use clap::Parser; use color_eyre::eyre::Result; use gitlab_runner::{GitlabLayer, RunnerBuilder}; +use logging::GitLabForwarder; use strum::{Display, EnumString}; use tracing::{Subscriber, error, info}; use tracing_subscriber::{ @@ -22,6 +23,7 @@ mod binaries; mod build_meta; mod dsc; mod handler; +mod logging; mod monitor; mod pipeline; mod prune; @@ -121,7 +123,7 @@ async fn main() { let registry = tracing_subscriber::registry() .with(tracing_error::ErrorLayer::default()) - .with(layer); + .with(GitLabForwarder::new(layer)); match args.log_format { LogFormat::Compact => registry diff --git a/src/monitor.rs b/src/monitor.rs index 96dc9b0..95c6c7c 100644 --- a/src/monitor.rs +++ b/src/monitor.rs @@ -3,14 +3,13 @@ use std::time::Duration; use color_eyre::eyre::{Context, Report, Result, ensure, eyre}; use derivative::*; use futures_util::stream::StreamExt; -use gitlab_runner::outputln; use open_build_service_api as obs; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; use tracing::{debug, instrument}; use crate::{ artifacts::{ArtifactDirectory, ArtifactReader, ArtifactWriter}, - retry_request, + outputln, retry_request, }; #[derive(Debug)] diff --git a/src/prune.rs b/src/prune.rs index b1d1ca4..088f871 100644 --- a/src/prune.rs +++ b/src/prune.rs @@ -1,9 +1,7 @@ use color_eyre::eyre::{Context, Result, ensure}; -use gitlab_runner::outputln; use open_build_service_api as obs; -use tracing::info; -use crate::retry_request; +use crate::{outputln, retry_request}; async fn is_originally_branched( client: &obs::Client, @@ -49,7 +47,7 @@ pub async fn prune_branch( if let Some(expected_rev) = expected_rev { if dir.rev.as_deref() != Some(expected_rev) { - info!( + outputln!( "Latest revision is {}, skipping prune", dir.rev.as_deref().unwrap_or("[unknown]") ); diff --git a/src/upload.rs b/src/upload.rs index 03d5d44..844698b 100644 --- a/src/upload.rs +++ b/src/upload.rs @@ -3,7 +3,6 @@ use std::collections::HashMap; use camino::{Utf8Path, Utf8PathBuf}; use color_eyre::eyre::{Context, Result, ensure, eyre}; use derivative::*; -use gitlab_runner::outputln; use md5::{Digest, Md5}; use open_build_service_api as obs; use tracing::{Instrument, debug, info_span, instrument, trace}; @@ -11,7 +10,7 @@ use tracing::{Instrument, debug, info_span, instrument, trace}; use crate::{ artifacts::ArtifactDirectory, dsc::{Dsc, discard_pgp}, - retry_request, + outputln, retry_request, }; type Md5String = String; From 24438891f0f54b9f60b304bb2eb3d0f847e4e34a Mon Sep 17 00:00:00 2001 From: Ryan Gonzalez Date: Tue, 1 Jul 2025 10:38:27 -0500 Subject: [PATCH 4/4] Isolate GitLab-specific command handling All the generic command handling is now part of an `actions` module, leaving only GitLab-specific functionality in `handler`. --- src/actions.rs | 387 ++++++++++++++++++++++++++++++++++++++ src/handler.rs | 490 ++++++++----------------------------------------- src/main.rs | 1 + 3 files changed, 463 insertions(+), 415 deletions(-) create mode 100644 src/actions.rs diff --git a/src/actions.rs b/src/actions.rs new file mode 100644 index 0000000..140f8e0 --- /dev/null +++ b/src/actions.rs @@ -0,0 +1,387 @@ +use std::{collections::HashMap, io::SeekFrom}; + +use camino::{Utf8Path, Utf8PathBuf}; +use clap::{ArgAction, Parser}; +use color_eyre::eyre::{Context, Report, Result}; +use open_build_service_api as obs; +use serde::{Deserialize, Serialize}; +use tokio::io::{AsyncSeekExt, AsyncWriteExt}; +use tracing::{debug, instrument}; + +use crate::{ + artifacts::{ArtifactDirectory, ArtifactReader, ArtifactWriter, MissingArtifactToNone}, + binaries::download_binaries, + build_meta::{ + BuildHistoryRetrieval, BuildMeta, BuildMetaOptions, CommitBuildInfo, DisabledRepos, + RepoArch, + }, + monitor::{MonitoredPackage, ObsMonitor, PackageCompletion, PackageMonitoringOptions}, + outputln, + prune::prune_branch, + retry_request, + upload::ObsDscUploader, +}; + +pub const DEFAULT_BUILD_INFO: &str = "build-info.yml"; +pub const DEFAULT_BUILD_LOG: &str = "build.log"; + +// Our flags can all take explicit values, because it makes it easier to +// conditionally set things in the pipelines. +pub trait FlagSupportingExplicitValue { + fn flag_supporting_explicit_value(self) -> Self; +} + +impl FlagSupportingExplicitValue for clap::Arg { + fn flag_supporting_explicit_value(self) -> Self { + self.num_args(0..=1) + .require_equals(true) + .required(false) + .default_value("false") + .default_missing_value("true") + .action(ArgAction::Set) + } +} + +#[derive(Parser, Debug)] +pub struct DputAction { + pub project: String, + pub dsc: String, + #[clap(long, default_value = "")] + pub branch_to: String, + #[clap(long, default_value_t = DEFAULT_BUILD_INFO.to_owned().into())] + pub build_info_out: Utf8PathBuf, + #[clap(long, flag_supporting_explicit_value())] + pub rebuild_if_unchanged: bool, +} + +#[derive(Parser, Debug)] +pub struct MonitorAction { + #[clap(long)] + pub project: String, + #[clap(long)] + pub package: String, + #[clap(long)] + pub rev: String, + #[clap(long)] + pub srcmd5: String, + #[clap(long)] + pub repository: String, + #[clap(long)] + pub arch: String, + #[clap(long)] + pub prev_endtime_for_commit: Option, + #[clap(long)] + pub build_log_out: String, +} + +#[derive(Parser, Debug)] +pub struct DownloadBinariesAction { + #[clap(long)] + pub project: String, + #[clap(long)] + pub package: String, + #[clap(long)] + pub repository: String, + #[clap(long)] + pub arch: String, + #[clap(long)] + pub build_results_dir: Utf8PathBuf, +} + +#[derive(Parser, Debug)] +pub struct PruneAction { + #[clap(long, default_value_t = DEFAULT_BUILD_INFO.to_owned())] + pub build_info: String, + #[clap(long, flag_supporting_explicit_value())] + pub ignore_missing_build_info: bool, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ObsBuildInfo { + pub project: String, + pub package: String, + pub rev: Option, + pub srcmd5: Option, + pub is_branched: bool, + pub enabled_repos: HashMap, +} + +impl ObsBuildInfo { + #[instrument(skip(artifacts))] + async fn save(self, artifacts: &mut impl ArtifactDirectory, path: &Utf8Path) -> Result<()> { + artifacts + .save_with(path, async |file: &mut ArtifactWriter| { + let data = + serde_yaml::to_string(&self).wrap_err("Failed to serialize build info")?; + file.write_all(data.as_bytes()) + .await + .wrap_err("Failed to write build info file")?; + Ok::<_, Report>(()) + }) + .await + } +} + +#[derive(Debug, thiserror::Error)] +#[error("Failed build")] +pub struct FailedBuild; + +pub const LOG_TAIL_2MB: u64 = 2 * 1024 * 1024; + +pub struct Actions { + pub client: obs::Client, +} + +impl Actions { + #[instrument(skip_all, fields(args))] + pub async fn dput( + &mut self, + args: DputAction, + artifacts: &mut impl ArtifactDirectory, + ) -> Result<()> { + let branch_to = if !args.branch_to.is_empty() { + Some(args.branch_to) + } else { + None + }; + let is_branched = branch_to.is_some(); + + // The upload prep and actual upload are split in two so that we can + // already tell what the project & package name are, so build-info.yaml + // can be written and pruning can take place regardless of the actual + // *upload* success. + let uploader = ObsDscUploader::prepare( + self.client.clone(), + args.project.clone(), + branch_to, + args.dsc.as_str().into(), + artifacts, + ) + .await?; + + let build_info = ObsBuildInfo { + project: uploader.project().to_owned(), + package: uploader.package().to_owned(), + rev: None, + srcmd5: None, + is_branched, + enabled_repos: HashMap::new(), + }; + debug!("Saving initial build info: {:?}", build_info); + build_info + .clone() + .save(artifacts, &args.build_info_out) + .await?; + + let initial_build_meta = BuildMeta::get_if_package_exists( + self.client.clone(), + build_info.project.clone(), + build_info.package.clone(), + &BuildMetaOptions { + history_retrieval: BuildHistoryRetrieval::Full, + // Getting disabled repos has to happen *after* the upload, + // since the new version can change the supported architectures. + disabled_repos: DisabledRepos::Keep, + }, + ) + .await?; + debug!(?initial_build_meta); + + let result = uploader.upload_package(artifacts).await?; + + // If we couldn't get the metadata before because the package didn't + // exist yet, get it now but without history, so we leave the previous + // endtime empty (if there was no previous package, there were no + // previous builds). + let mut build_meta = if let Some(mut build_meta) = initial_build_meta { + build_meta + .remove_disabled_repos(&Default::default()) + .await?; + build_meta + } else { + BuildMeta::get( + self.client.clone(), + build_info.project.clone(), + build_info.package.clone(), + &BuildMetaOptions { + history_retrieval: BuildHistoryRetrieval::None, + disabled_repos: DisabledRepos::Skip { + wait_options: Default::default(), + }, + }, + ) + .await? + }; + + if result.unchanged { + outputln!("Package unchanged at revision {}.", result.rev); + + if args.rebuild_if_unchanged { + retry_request!( + self.client + .project(build_info.project.clone()) + .package(build_info.package.clone()) + .rebuild() + .await + .wrap_err("Failed to trigger rebuild") + )?; + } else { + // Clear out the history used to track endtime values. This is + // normally important to make sure the monitor doesn't + // accidentally pick up an old build result...but if we didn't + // rebuild anything, picking up the old result is *exactly* the + // behavior we want. + build_meta.clear_stored_history(); + } + } else { + outputln!("Package uploaded with revision {}.", result.rev); + } + + let enabled_repos = build_meta.get_commit_build_info(&result.build_srcmd5); + let build_info = ObsBuildInfo { + rev: Some(result.rev), + srcmd5: Some(result.build_srcmd5), + enabled_repos, + ..build_info + }; + debug!("Saving complete build info: {:?}", build_info); + build_info.save(artifacts, &args.build_info_out).await?; + + Ok(()) + } + + #[instrument(skip_all, fields(args))] + pub async fn monitor> + Send>( + &mut self, + args: MonitorAction, + monitoring_options: PackageMonitoringOptions, + log_tail_cb: impl FnOnce(ArtifactReader) -> F, + log_tail_bytes: u64, + artifacts: &mut impl ArtifactDirectory, + ) -> Result<()> { + let monitor = ObsMonitor::new( + self.client.clone(), + MonitoredPackage { + project: args.project.clone(), + package: args.package.clone(), + repository: args.repository.clone(), + arch: args.arch.clone(), + rev: args.rev.clone(), + srcmd5: args.srcmd5.clone(), + prev_endtime_for_commit: args.prev_endtime_for_commit, + }, + ); + + let completion = monitor.monitor_package(monitoring_options).await?; + debug!("Completed with: {:?}", completion); + + let mut log_file = monitor + .download_build_log(&args.build_log_out, artifacts) + .await?; + + match completion { + PackageCompletion::Succeeded => { + outputln!("Build succeeded!"); + } + PackageCompletion::Superceded => { + outputln!("Build was superceded by a newer revision."); + } + PackageCompletion::Disabled => { + outputln!("Package is disabled for this architecture."); + } + PackageCompletion::Failed(reason) => { + log_file + .file + .seek(SeekFrom::End( + -(std::cmp::min(log_tail_bytes, log_file.len) as i64), + )) + .await + .wrap_err("Failed to find length of log file")?; + + log_tail_cb(log_file.file).await?; + + outputln!("{}", "=".repeat(64)); + outputln!( + "Build failed with reason '{}'.", + reason.to_string().to_lowercase() + ); + outputln!("The last 2MB of the build log is printed above."); + outputln!( + "(Full logs are available in the build artifact '{}'.)", + args.build_log_out + ); + return Err(FailedBuild.into()); + } + } + + Ok(()) + } + + #[instrument(skip_all, fields(args))] + pub async fn download_binaries( + &mut self, + args: DownloadBinariesAction, + actions: &mut impl ArtifactDirectory, + ) -> Result<()> { + let binaries = download_binaries( + self.client.clone(), + &args.project, + &args.package, + &args.repository, + &args.arch, + actions, + &args.build_results_dir, + ) + .await?; + + outputln!("Downloaded {} artifact(s).", binaries.paths.len()); + Ok(()) + } + + #[instrument(skip_all, fields(args))] + pub async fn prune( + &mut self, + args: PruneAction, + artifacts: &impl ArtifactDirectory, + ) -> Result<()> { + let build_info_data = if args.ignore_missing_build_info { + if let Some(build_info_data) = artifacts + .read_string(&args.build_info) + .await + .missing_artifact_to_none()? + { + build_info_data + } else { + outputln!( + "Skipping prune: build info file '{}' not found.", + args.build_info + ); + return Ok(()); + } + } else { + artifacts.read_string(&args.build_info).await? + }; + + let build_info: ObsBuildInfo = serde_yaml::from_str(&build_info_data) + .wrap_err("Failed to parse provided build info file")?; + + if build_info.is_branched { + outputln!( + "Pruning branched package {}/{}...", + build_info.project, + build_info.package + ); + prune_branch( + &self.client, + &build_info.project, + &build_info.package, + build_info.rev.as_deref(), + ) + .await?; + } else { + outputln!("Skipping prune: package was not branched."); + } + + Ok(()) + } +} diff --git a/src/handler.rs b/src/handler.rs index eed3d82..6b761e6 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -1,12 +1,12 @@ use std::{ borrow::Cow, collections::{HashMap, HashSet}, - io::{Seek, SeekFrom}, + io::Seek, }; use async_trait::async_trait; use camino::{Utf8Path, Utf8PathBuf}; -use clap::{ArgAction, Parser, Subcommand}; +use clap::{Parser, Subcommand}; use color_eyre::eyre::{Context, Report, Result, eyre}; use derivative::*; use futures_util::StreamExt; @@ -15,69 +15,28 @@ use gitlab_runner::{ job::{Dependency, Job, Variable}, }; use open_build_service_api as obs; -use serde::{Deserialize, Serialize}; -use tokio::{ - fs::File as AsyncFile, - io::{AsyncSeekExt, AsyncWriteExt}, -}; +use tokio::fs::File as AsyncFile; use tokio_util::{ compat::{Compat, TokioAsyncReadCompatExt}, io::ReaderStream, }; -use tracing::{debug, error, instrument, warn}; +use tracing::{error, instrument, warn}; use crate::{ - artifacts::{ - ArtifactDirectory, ArtifactReader, ArtifactWriter, MissingArtifact, MissingArtifactToNone, - SaveCallback, - }, - binaries::download_binaries, - build_meta::{ - BuildHistoryRetrieval, BuildMeta, BuildMetaOptions, CommitBuildInfo, DisabledRepos, - RepoArch, + actions::{ + Actions, DEFAULT_BUILD_INFO, DEFAULT_BUILD_LOG, DownloadBinariesAction, DputAction, + FailedBuild, FlagSupportingExplicitValue, LOG_TAIL_2MB, MonitorAction, ObsBuildInfo, + PruneAction, }, - monitor::{MonitoredPackage, ObsMonitor, PackageCompletion, PackageMonitoringOptions}, + artifacts::{ArtifactDirectory, ArtifactReader, ArtifactWriter, MissingArtifact, SaveCallback}, + monitor::PackageMonitoringOptions, outputln, pipeline::{GeneratePipelineOptions, PipelineDownloadBinaries, generate_monitor_pipeline}, - prune::prune_branch, - retry_request, - upload::ObsDscUploader, }; -const DEFAULT_BUILD_INFO: &str = "build-info.yml"; const DEFAULT_MONITOR_PIPELINE: &str = "obs.yml"; const DEFAULT_PIPELINE_JOB_PREFIX: &str = "obs"; const DEFAULT_ARTIFACT_EXPIRATION: &str = "3 days"; -const DEFAULT_BUILD_LOG: &str = "build.log"; - -// Our flags can all take explicit values, because it makes it easier to -// conditionally set things in the pipelines. -trait FlagSupportingExplicitValue { - fn flag_supporting_explicit_value(self) -> Self; -} - -impl FlagSupportingExplicitValue for clap::Arg { - fn flag_supporting_explicit_value(self) -> Self { - self.num_args(0..=1) - .require_equals(true) - .required(false) - .default_value("false") - .default_missing_value("true") - .action(ArgAction::Set) - } -} - -#[derive(Parser, Debug)] -struct DputAction { - project: String, - dsc: String, - #[clap(long, default_value = "")] - branch_to: String, - #[clap(long, default_value_t = DEFAULT_BUILD_INFO.to_owned().into())] - build_info_out: Utf8PathBuf, - #[clap(long, flag_supporting_explicit_value())] - rebuild_if_unchanged: bool, -} #[derive(Parser, Debug)] struct GenerateMonitorAction { @@ -100,51 +59,6 @@ struct GenerateMonitorAction { build_log_out: String, } -#[derive(Parser, Debug)] -struct MonitorAction { - #[clap(long)] - project: String, - #[clap(long)] - package: String, - #[clap(long)] - rev: String, - #[clap(long)] - srcmd5: String, - #[clap(long)] - repository: String, - #[clap(long)] - arch: String, - #[clap(long)] - prev_endtime_for_commit: Option, - #[clap(long)] - build_log_out: String, -} - -#[derive(Parser, Debug)] -struct DownloadBinariesAction { - #[clap(long)] - project: String, - #[clap(long)] - package: String, - #[clap(long)] - repository: String, - #[clap(long)] - arch: String, - #[clap(long)] - build_results_dir: Utf8PathBuf, -} - -#[derive(Parser, Debug)] -struct PruneAction { - #[clap(long, default_value_t = DEFAULT_BUILD_INFO.to_owned())] - build_info: String, - #[clap(long, flag_supporting_explicit_value())] - ignore_missing_build_info: bool, - #[clap(long, flag_supporting_explicit_value())] - only_if_job_unsuccessful: bool, -} - -#[cfg(test)] #[derive(Parser, Debug)] struct EchoAction { args: Vec, @@ -157,12 +71,23 @@ struct EchoAction { } #[derive(Subcommand)] -enum Action { +enum CommonAction { Dput(DputAction), - GenerateMonitor(GenerateMonitorAction), Monitor(MonitorAction), DownloadBinaries(DownloadBinariesAction), - Prune(PruneAction), + Prune { + #[clap(long, flag_supporting_explicit_value())] + only_if_job_unsuccessful: bool, + #[clap(flatten)] + args: PruneAction, + }, +} + +#[derive(Subcommand)] +enum JobAction { + #[clap(flatten)] + Common(CommonAction), + GenerateMonitor(GenerateMonitorAction), #[cfg(test)] Echo(EchoAction), } @@ -172,48 +97,9 @@ enum Action { #[clap(no_binary_name = true)] struct Command { #[clap(subcommand)] - action: Action, + action: JobAction, } -#[derive(Clone, Debug, Serialize, Deserialize)] -struct ObsBuildInfo { - project: String, - package: String, - rev: Option, - srcmd5: Option, - is_branched: bool, - enabled_repos: HashMap, -} - -impl ObsBuildInfo { - #[instrument(skip(artifacts))] - async fn save(self, artifacts: &mut impl ArtifactDirectory, path: &Utf8Path) -> Result<()> { - artifacts - .save_with(path, async |file: &mut ArtifactWriter| { - let data = - serde_yaml::to_string(&self).wrap_err("Failed to serialize build info")?; - file.write_all(data.as_bytes()) - .await - .wrap_err("Failed to write build info file")?; - Ok::<_, Report>(()) - }) - .await - } -} - -#[derive(Debug)] -struct FailedBuild; - -impl std::fmt::Display for FailedBuild { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{self:?}") - } -} - -impl std::error::Error for FailedBuild {} - -const LOG_TAIL_2MB: u64 = 2 * 1024 * 1024; - fn get_job_variable<'job>(job: &'job Job, key: &str) -> Result> { job.variable(key) .ok_or_else(|| eyre!("Failed to get variable ${}", key)) @@ -275,9 +161,9 @@ impl ArtifactDirectory for GitLabArtifacts<'_> { pub struct ObsJobHandler { job: Job, - client: obs::Client, options: HandlerOptions, + actions: Actions, script_failed: bool, artifacts: HashMap, } @@ -286,8 +172,8 @@ impl ObsJobHandler { pub fn new(job: Job, client: obs::Client, options: HandlerOptions) -> Self { ObsJobHandler { job, - client, options, + actions: Actions { client }, script_failed: false, artifacts: HashMap::new(), } @@ -333,127 +219,7 @@ impl ObsJobHandler { } #[instrument(skip(self))] - async fn run_dput(&mut self, args: DputAction) -> Result<()> { - let mut artifacts = GitLabArtifacts { - job: &self.job, - artifacts: &mut self.artifacts, - }; - - let branch_to = if !args.branch_to.is_empty() { - Some(args.branch_to) - } else { - None - }; - let is_branched = branch_to.is_some(); - - // The upload prep and actual upload are split in two so that we can - // already tell what the project & package name are, so build-info.yaml - // can be written and pruning can take place regardless of the actual - // *upload* success. - let uploader = ObsDscUploader::prepare( - self.client.clone(), - args.project.clone(), - branch_to, - args.dsc.as_str().into(), - &artifacts, - ) - .await?; - - let build_info = ObsBuildInfo { - project: uploader.project().to_owned(), - package: uploader.package().to_owned(), - rev: None, - srcmd5: None, - is_branched, - enabled_repos: HashMap::new(), - }; - debug!("Saving initial build info: {:?}", build_info); - build_info - .clone() - .save(&mut artifacts, &args.build_info_out) - .await?; - - let initial_build_meta = BuildMeta::get_if_package_exists( - self.client.clone(), - build_info.project.clone(), - build_info.package.clone(), - &BuildMetaOptions { - history_retrieval: BuildHistoryRetrieval::Full, - // Getting disabled repos has to happen *after* the upload, - // since the new version can change the supported architectures. - disabled_repos: DisabledRepos::Keep, - }, - ) - .await?; - debug!(?initial_build_meta); - - let result = uploader.upload_package(&artifacts).await?; - - // If we couldn't get the metadata before because the package didn't - // exist yet, get it now but without history, so we leave the previous - // endtime empty (if there was no previous package, there were no - // previous builds). - let mut build_meta = if let Some(mut build_meta) = initial_build_meta { - build_meta - .remove_disabled_repos(&Default::default()) - .await?; - build_meta - } else { - BuildMeta::get( - self.client.clone(), - build_info.project.clone(), - build_info.package.clone(), - &BuildMetaOptions { - history_retrieval: BuildHistoryRetrieval::None, - disabled_repos: DisabledRepos::Skip { - wait_options: Default::default(), - }, - }, - ) - .await? - }; - - if result.unchanged { - outputln!("Package unchanged at revision {}.", result.rev); - - if args.rebuild_if_unchanged { - retry_request!( - self.client - .project(build_info.project.clone()) - .package(build_info.package.clone()) - .rebuild() - .await - .wrap_err("Failed to trigger rebuild") - )?; - } else { - // Clear out the history used to track endtime values. This is - // normally important to make sure the monitor doesn't - // accidentally pick up an old build result...but if we didn't - // rebuild anything, picking up the old result is *exactly* the - // behavior we want. - build_meta.clear_stored_history(); - } - } else { - outputln!("Package uploaded with revision {}.", result.rev); - } - - let enabled_repos = build_meta.get_commit_build_info(&result.build_srcmd5); - let build_info = ObsBuildInfo { - rev: Some(result.rev), - srcmd5: Some(result.build_srcmd5), - enabled_repos, - ..build_info - }; - debug!("Saving complete build info: {:?}", build_info); - build_info - .save(&mut artifacts, &args.build_info_out) - .await?; - - Ok(()) - } - - #[instrument(skip(self))] - async fn run_generate_monitor(&mut self, args: GenerateMonitorAction) -> Result<()> { + async fn generate_monitor(&mut self, args: GenerateMonitorAction) -> Result<()> { let mut artifacts = GitLabArtifacts { job: &self.job, artifacts: &mut self.artifacts, @@ -500,152 +266,6 @@ impl ObsJobHandler { Ok(()) } - #[instrument(skip(self))] - async fn run_monitor(&mut self, args: MonitorAction) -> Result<()> { - let mut artifacts = GitLabArtifacts { - job: &self.job, - artifacts: &mut self.artifacts, - }; - - let monitor = ObsMonitor::new( - self.client.clone(), - MonitoredPackage { - project: args.project.clone(), - package: args.package.clone(), - repository: args.repository.clone(), - arch: args.arch.clone(), - rev: args.rev.clone(), - srcmd5: args.srcmd5.clone(), - prev_endtime_for_commit: args.prev_endtime_for_commit, - }, - ); - - let completion = monitor - .monitor_package(self.options.monitor.clone()) - .await?; - debug!("Completed with: {:?}", completion); - - let mut log_file = monitor - .download_build_log(&args.build_log_out, &mut artifacts) - .await?; - - match completion { - PackageCompletion::Succeeded => { - outputln!("Build succeeded!"); - } - PackageCompletion::Superceded => { - outputln!("Build was superceded by a newer revision."); - } - PackageCompletion::Disabled => { - outputln!("Package is disabled for this architecture."); - } - PackageCompletion::Failed(reason) => { - log_file - .file - .seek(SeekFrom::End( - -(std::cmp::min(self.options.log_tail, log_file.len) as i64), - )) - .await - .wrap_err("Failed to find length of log file")?; - - let mut log_stream = ReaderStream::new(log_file.file); - while let Some(bytes) = log_stream.next().await { - let bytes = bytes.wrap_err("Failed to stream log bytes")?; - self.job.trace(String::from_utf8_lossy(&bytes).as_ref()); - } - - outputln!("{}", "=".repeat(64)); - outputln!( - "Build failed with reason '{}'.", - reason.to_string().to_lowercase() - ); - outputln!("The last 2MB of the build log is printed above."); - outputln!( - "(Full logs are available in the build artifact '{}'.)", - args.build_log_out - ); - return Err(FailedBuild.into()); - } - } - - Ok(()) - } - - #[instrument(skip(self))] - async fn run_download_binaries(&mut self, args: DownloadBinariesAction) -> Result<()> { - let mut artifacts = GitLabArtifacts { - job: &self.job, - artifacts: &mut self.artifacts, - }; - - let binaries = download_binaries( - self.client.clone(), - &args.project, - &args.package, - &args.repository, - &args.arch, - &mut artifacts, - &args.build_results_dir, - ) - .await?; - - outputln!("Downloaded {} artifact(s).", binaries.paths.len()); - Ok(()) - } - - #[instrument(skip(self))] - async fn run_prune(&mut self, args: PruneAction) -> Result<()> { - if args.only_if_job_unsuccessful && !self.script_failed { - outputln!("Skipping prune: main script was successful."); - return Ok(()); - } - - let artifacts = GitLabArtifacts { - job: &self.job, - artifacts: &mut self.artifacts, - }; - - let build_info_data = if args.ignore_missing_build_info { - if let Some(build_info_data) = artifacts - .read_string(&args.build_info) - .await - .missing_artifact_to_none()? - { - build_info_data - } else { - outputln!( - "Skipping prune: build info file '{}' not found.", - args.build_info - ); - return Ok(()); - } - } else { - artifacts.read_string(&args.build_info).await? - }; - - let build_info: ObsBuildInfo = serde_yaml::from_str(&build_info_data) - .wrap_err("Failed to parse provided build info file")?; - - if build_info.is_branched { - outputln!( - "Pruning branched package {}/{}...", - build_info.project, - build_info.package - ); - prune_branch( - &self.client, - &build_info.project, - &build_info.package, - build_info.rev.as_deref(), - ) - .await?; - } else { - outputln!("Skipping prune: package was not branched."); - } - - Ok(()) - } - #[instrument(skip(self))] async fn command(&mut self, cmdline: &str) -> Result<()> { // TODO: inject user? @@ -657,13 +277,49 @@ impl ObsJobHandler { let command = Command::try_parse_from(args)?; match command.action { - Action::Dput(args) => self.run_dput(args).await?, - Action::GenerateMonitor(args) => self.run_generate_monitor(args).await?, - Action::Monitor(args) => self.run_monitor(args).await?, - Action::DownloadBinaries(args) => self.run_download_binaries(args).await?, - Action::Prune(args) => self.run_prune(args).await?, + JobAction::Common(action) => { + let mut artifacts = GitLabArtifacts { + job: &self.job, + artifacts: &mut self.artifacts, + }; + + match action { + CommonAction::Dput(args) => self.actions.dput(args, &mut artifacts).await?, + CommonAction::Monitor(args) => { + self.actions + .monitor( + args, + self.options.monitor.clone(), + async |file| { + let mut log_stream = ReaderStream::new(file); + while let Some(bytes) = log_stream.next().await { + let bytes = bytes.wrap_err("Failed to stream log bytes")?; + self.job.trace(String::from_utf8_lossy(&bytes).as_ref()); + } + Ok(()) + }, + self.options.log_tail, + &mut artifacts, + ) + .await? + } + CommonAction::DownloadBinaries(args) => { + self.actions.download_binaries(args, &mut artifacts).await? + } + CommonAction::Prune { + only_if_job_unsuccessful: true, + .. + } if !self.script_failed => { + outputln!("Skipping prune: main script was successful.") + } + CommonAction::Prune { args, .. } => { + self.actions.prune(args, &artifacts).await? + } + } + } + JobAction::GenerateMonitor(args) => self.generate_monitor(args).await?, #[cfg(test)] - Action::Echo(args) => { + JobAction::Echo(args) => { use color_eyre::eyre::ensure; let mut output = args.args.join(&args.sep); @@ -798,7 +454,9 @@ mod tests { use tracing_subscriber::{Layer, Registry, filter::Targets, prelude::*}; use zip::ZipArchive; - use crate::{logging::GitLabForwarder, test_support::*, upload::compute_md5}; + use crate::{ + build_meta::RepoArch, logging::GitLabForwarder, test_support::*, upload::compute_md5, + }; use super::*; @@ -1909,6 +1567,8 @@ mod tests { )] test: Option, ) { + use crate::build_meta::CommitBuildInfo; + const TEST_MONITOR_TIMEOUT: &str = "10 minutes"; with_context(async |mut context| { diff --git a/src/main.rs b/src/main.rs index 3dc5976..5e03b55 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,6 +18,7 @@ use url::Url; use crate::handler::{HandlerOptions, ObsJobHandler}; +mod actions; mod artifacts; mod binaries; mod build_meta;