diff --git a/CHANGELOG.md b/CHANGELOG.md index d4d9b965d5..eeaf15c300 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,8 +12,17 @@ Recommendation: for ease of reading, use the following order: --> ## [Unreleased] +### Added +- GQL: `BigInt` scalar ### Changed +- GQL: `molecule` area: use `BigInt` for `ipnft_token_id` +- Allow `molecule` and `molecule.dev` accounts separation +- Collection datasets will ignore add and move operations that don't change the entry path, ref, or extra attributes and return `CollectionUpdateUpToDate` +- GQL: `Search::query()`: case insensitive search. +- GQL: `MoleculeMut::create_project()`: generate lowercase project account name. - Discontinued binary releases for MacOS Intel architecture (see #1323) and Windows (as we only ever supported WSL2) +### Fixed +- Investigation: potential unstable ordering of dataset entry listings ## [0.249.1] - 2025-09-25 ### Fixed diff --git a/Cargo.lock b/Cargo.lock index c6beb6e239..a95f577773 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7033,6 +7033,7 @@ dependencies = [ "kamu-webhooks-inmem", "kamu-webhooks-services", "messaging-outbox", + "num-bigint", "nutype", "observability", "oop", diff --git a/resources/schema.gql b/resources/schema.gql index 4a46f3a77b..e36eba1008 100644 --- a/resources/schema.gql +++ b/resources/schema.gql @@ -474,6 +474,11 @@ Base64-encoded binary data (url-safe, no padding) """ scalar Base64Usnp +""" +A big integer scalar type. +""" +scalar BigInt + type BlockRef { name: String! blockHash: Multihash! @@ -795,6 +800,25 @@ type CreateAccountSuccess implements CreateAccountResult { message: String! } +type CreateAnnouncementErrorInvalidAttachment implements CreateAnnouncementResult { + message: String! + isSuccess: Boolean! +} + +interface CreateAnnouncementResult { + isSuccess: Boolean! + message: String! +} + +type CreateAnnouncementSuccess implements CreateAnnouncementResult { + """ + ID of the newly-created announcement + """ + announcementId: String! + isSuccess: Boolean! + message: String! +} + interface CreateDatasetFromSnapshotResult { isSuccess: Boolean! message: String! @@ -829,6 +853,23 @@ type CreateDatasetResultSuccess implements CreateDatasetResult & CreateDatasetFr message: String! } +type CreateProjectErrorConflict implements CreateProjectResult { + project: MoleculeProject! + isSuccess: Boolean! + message: String! +} + +interface CreateProjectResult { + isSuccess: Boolean! + message: String! +} + +type CreateProjectSuccess implements CreateProjectResult { + project: MoleculeProject! + isSuccess: Boolean! + message: String! +} + interface CreateTokenResult { message: String! } @@ -2570,6 +2611,11 @@ input InitiatorFilterInput @oneOf { accounts: [AccountID!] } +""" +A scalar that can represent any JSON value. +""" +scalar JSON + type JdbcDesc { url: String! } @@ -2820,6 +2866,190 @@ type ModifyPasswordWrongOldPassword implements ModifyPasswordResult { message: String! } +type Molecule { + """ + Looks up the project + """ + project(ipnftUid: String!): MoleculeProject + """ + List the registered projects + """ + projects(page: Int, perPage: Int): MoleculeProjectConnection! + """ + Latest activity events across all projects in reverse chronological + order + """ + activity(page: Int, perPage: Int): MoleculeProjectEventConnection! +} + +type MoleculeMut { + createProject(ipnftSymbol: String!, ipnftUid: String!, ipnftAddress: String!, ipnftTokenId: U256!): CreateProjectResult! 
+ """ + Looks up the project + """ + project(ipnftUid: String!): MoleculeProjectMut +} + +type MoleculeProject { + """ + System time when this version was created/updated + """ + systemTime: DateTime! + """ + Event time when this version was created/updated + """ + eventTime: DateTime! + """ + Symbolic name of the project + """ + ipnftSymbol: String! + """ + Unique ID of the IPNFT as `{ipnftAddress}_{ipnftTokenId}` + """ + ipnftUid: String! + """ + Address of the IPNFT contract + """ + ipnftAddress: String! + """ + Token ID withing the IPNFT contract + """ + ipnftTokenId: BigInt! + """ + Project's organizational account + """ + account: Account! + """ + Project's data room dataset + """ + dataRoom: Dataset! + """ + Project's announcements dataset + """ + announcements: Dataset! + """ + Project's activity events in reverse chronological order + """ + activity(page: Int, perPage: Int): MoleculeProjectEventConnection! +} + +type MoleculeProjectConnection { + """ + A shorthand for `edges { node { ... } }` + """ + nodes: [MoleculeProject!]! + """ + Approximate number of total nodes + """ + totalCount: Int! + """ + Page information + """ + pageInfo: PageBasedInfo! + edges: [MoleculeProjectEdge!]! +} + +type MoleculeProjectEdge { + node: MoleculeProject! +} + +interface MoleculeProjectEvent { + project: MoleculeProject! + systemTime: DateTime! +} + +type MoleculeProjectEventAnnouncement implements MoleculeProjectEvent { + """ + Associated project + """ + project: MoleculeProject! + """ + Announcement record + """ + announcement: JSON! + systemTime: DateTime! +} + +type MoleculeProjectEventConnection { + """ + A shorthand for `edges { node { ... } }` + """ + nodes: [MoleculeProjectEvent!]! + """ + Page information + """ + pageInfo: PageBasedInfo! + edges: [MoleculeProjectEventEdge!]! +} + +type MoleculeProjectEventDataRoomEntryAdded implements MoleculeProjectEvent { + """ + Associated project + """ + project: MoleculeProject! + """ + Collection entry + """ + entry: CollectionEntry! + systemTime: DateTime! +} + +type MoleculeProjectEventDataRoomEntryRemoved implements MoleculeProjectEvent { + """ + Associated project + """ + project: MoleculeProject! + """ + Collection entry + """ + entry: CollectionEntry! + systemTime: DateTime! +} + +type MoleculeProjectEventDataRoomEntryUpdated implements MoleculeProjectEvent { + """ + Associated project + """ + project: MoleculeProject! + """ + Collection entry + """ + newEntry: CollectionEntry! + systemTime: DateTime! +} + +type MoleculeProjectEventEdge { + node: MoleculeProjectEvent! +} + +type MoleculeProjectEventFileUpdated implements MoleculeProjectEvent { + """ + Associated project + """ + project: MoleculeProject! + """ + Versioned file dataset + """ + dataset: Dataset! + """ + New file version entry + """ + newEntry: VersionedFileEntry! + systemTime: DateTime! +} + +type MoleculeProjectMut { + """ + Creates an announcement record for the project + """ + createAnnouncement( headline: String!, body: String!, + """ + List of dataset DIDs to link + """ + attachments: [String!], moleculeAccessLevel: String!, moleculeChangeBy: String! + ): CreateAnnouncementResult! +} + """ MQTT quality of service class. @@ -2871,6 +3101,10 @@ type Mutation { """ accounts: AccountsMut! """ + Temporary: Molecule-specific functionality group + """ + molecule: MoleculeMut! + """ Collaboration-related functionality group Allows setting permissions for multiple datasets in batch mode @@ -3062,6 +3296,10 @@ type Query { Admin-related functionality group """ admin: Admin! 
+ """ + Temporary: Molecule-specific functionality group + """ + molecule: Molecule! } enum QueryDialect { @@ -3929,6 +4167,11 @@ type TriggerFlowSuccess implements TriggerFlowResult { message: String! } +""" +256-bit unsigned number. Can be constructed from string. +""" +scalar U256 + scalar URL interface UnsetRoleResult { diff --git a/src/adapter/graphql/Cargo.toml b/src/adapter/graphql/Cargo.toml index 976e2243c7..0d9fc354c5 100644 --- a/src/adapter/graphql/Cargo.toml +++ b/src/adapter/graphql/Cargo.toml @@ -65,6 +65,7 @@ async-graphql = { version = "7", default-features = false, features = [ async-trait = { version = "0.1", default-features = false } base64 = { version = "0.22", default-features = false } bytes = { version = "1", default-features = false } +num-bigint = { version = "0.4", default-features = false } chrono = "0.4" # TODO: Currently needed for type conversions but ideally should be encapsulated by kamu-core datafusion = { version = "50", default-features = false, features = ["serde"] } diff --git a/src/adapter/graphql/src/lib.rs b/src/adapter/graphql/src/lib.rs index fa7c54c146..1a2d6ed4c4 100644 --- a/src/adapter/graphql/src/lib.rs +++ b/src/adapter/graphql/src/lib.rs @@ -28,3 +28,7 @@ pub(crate) mod utils; pub use config::*; pub use guards::*; pub use root::*; + +pub mod molecule { + pub use crate::queries::molecule::{Molecule, molecule_subject}; +} diff --git a/src/adapter/graphql/src/mutations/mod.rs b/src/adapter/graphql/src/mutations/mod.rs index efb4130314..dccedddbe2 100644 --- a/src/adapter/graphql/src/mutations/mod.rs +++ b/src/adapter/graphql/src/mutations/mod.rs @@ -19,6 +19,7 @@ mod dataset_mut; mod datasets_mut; mod flows_mut; mod metadata_chain_mut; +mod molecule_mut; mod webhooks_mut; pub(crate) use account_access_token_mut::*; @@ -32,4 +33,5 @@ pub(crate) use dataset_mut::*; pub(crate) use datasets_mut::*; pub(crate) use flows_mut::*; pub(crate) use metadata_chain_mut::*; +pub(crate) use molecule_mut::*; pub(crate) use webhooks_mut::*; diff --git a/src/adapter/graphql/src/mutations/molecule_mut.rs b/src/adapter/graphql/src/mutations/molecule_mut.rs new file mode 100644 index 0000000000..7d8bb03b8e --- /dev/null +++ b/src/adapter/graphql/src/mutations/molecule_mut.rs @@ -0,0 +1,452 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use kamu::domain; +use kamu_accounts::{AccountServiceExt as _, CreateAccountUseCaseOptions, CurrentAccountSubject}; +use kamu_core::DatasetRegistryExt; +use kamu_core::auth::DatasetAction; + +use crate::prelude::*; +use crate::queries::{Molecule, MoleculeProject, molecule_subject}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub(crate) struct MoleculeMut; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[common_macros::method_names_consts(const_value_prefix = "GQL: ")] +#[Object] +impl MoleculeMut { + #[graphql(guard = "LoggedInGuard")] + #[tracing::instrument(level = "info", name = MoleculeMut_create_project, skip_all, fields(?ipnft_symbol, ?ipnft_uid))] + async fn create_project( + &self, + ctx: &Context<'_>, + mut ipnft_symbol: String, + ipnft_uid: String, + ipnft_address: String, + ipnft_token_id: U256, + ) -> Result { + ipnft_symbol.make_ascii_lowercase(); + let lowercase_ipnft_symbol = ipnft_symbol; + + use datafusion::prelude::*; + + let ( + subject, + account_svc, + create_account_use_case, + create_dataset_use_case, + rebac_svc, + push_ingest_use_case, + ) = from_catalog_n!( + ctx, + CurrentAccountSubject, + dyn kamu_accounts::AccountService, + dyn kamu_accounts::CreateAccountUseCase, + dyn kamu_datasets::CreateDatasetFromSnapshotUseCase, + dyn kamu_auth_rebac::RebacService, + dyn domain::PushIngestDataUseCase + ); + + // Check auth + let subject_molecule = molecule_subject(ctx)?; + + if ipnft_uid != format!("{ipnft_address}_{}", ipnft_token_id.as_ref()) { + return Err(Error::new("Inconsistent ipnft info").into()); + } + + // Resolve projects dataset + let (projects_dataset, df) = + Molecule::get_projects_snapshot(ctx, DatasetAction::Write, true).await?; + + // Check for conflicts + if let Some(df) = df { + let df = df + .filter( + col("ipnft_uid") + .eq(lit(&ipnft_uid)) + .or(lower(col("ipnft_symbol")).eq(lit(&lowercase_ipnft_symbol))), + ) + .int_err()?; + + let records = df.collect_json_aos().await.int_err()?; + if let Some(record) = records.into_iter().next() { + let project = MoleculeProject::from_json(record); + return Ok(CreateProjectResult::Conflict(CreateProjectErrorConflict { + project, + })); + } + } + + // Create a project account + let molecule_account = account_svc + .try_get_account_by_id(subject.account_id()) + .await? + .unwrap(); + + let project_account_name: odf::AccountName = + format!("{}.{lowercase_ipnft_symbol}", molecule_account.account_name) + .parse() + .int_err()?; + + let project_email = format!("support+{project_account_name}@kamu.dev") + .parse() + .unwrap(); + + // TODO: Remove tolerance to accounts that already exist after we have account + // deletion api? Reusing existing accounts may be a security threat via + // name squatting. + let project_account = if let Some(acc) = account_svc + .account_by_name(&project_account_name) + .await + .int_err()? + { + acc + } else { + // TODO: Set avatar and display name? + // https://avatars.githubusercontent.com/u/37688345?s=200&v=4 + create_account_use_case + .execute_derived( + &molecule_account, + &project_account_name, + CreateAccountUseCaseOptions::builder() + .maybe_email(Some(project_email)) + .build(), + ) + .await + .int_err()? 
+ }; + + // Create `data-room` dataset + let snapshot = Molecule::dataset_snapshot_data_room(odf::DatasetAlias::new( + Some(project_account_name.clone()), + odf::DatasetName::new_unchecked("data-room"), + )); + let data_room_create_res = create_dataset_use_case + .execute( + snapshot, + kamu_datasets::CreateDatasetUseCaseOptions { + dataset_visibility: odf::DatasetVisibility::Private, + }, + ) + .await + .int_err()?; + + // Create `announcements` dataset + let snapshot = Molecule::dataset_snapshot_announcements(odf::DatasetAlias::new( + Some(project_account_name.clone()), + odf::DatasetName::new_unchecked("announcements"), + )); + let announcements_create_res = create_dataset_use_case + .execute( + snapshot, + kamu_datasets::CreateDatasetUseCaseOptions { + dataset_visibility: odf::DatasetVisibility::Private, + }, + ) + .await + .int_err()?; + + // Give maintainer permissions to molecule + rebac_svc + .set_account_dataset_relation( + &subject_molecule.account_id, + kamu_auth_rebac::AccountToDatasetRelation::Maintainer, + &data_room_create_res.dataset_handle.id, + ) + .await + .int_err()?; + + rebac_svc + .set_account_dataset_relation( + &subject_molecule.account_id, + kamu_auth_rebac::AccountToDatasetRelation::Maintainer, + &announcements_create_res.dataset_handle.id, + ) + .await + .int_err()?; + + // Add project entry + let now = chrono::Utc::now(); + let project = MoleculeProject { + account_id: project_account.id, + system_time: now, + event_time: now, + ipnft_symbol: lowercase_ipnft_symbol, + ipnft_address, + ipnft_token_id: ipnft_token_id.into(), + ipnft_uid, + data_room_dataset_id: data_room_create_res.dataset_handle.id, + announcements_dataset_id: announcements_create_res.dataset_handle.id, + }; + + push_ingest_use_case + .execute( + &projects_dataset, + kamu_core::DataSource::Buffer( + project.to_bytes(odf::metadata::OperationType::Append), + ), + kamu_core::PushIngestDataUseCaseOptions { + source_name: None, + source_event_time: None, + is_ingest_from_upload: false, + media_type: Some(file_utils::MediaType::NDJSON.to_owned()), + expected_head: None, + }, + None, + ) + .await + .int_err()?; + + Ok(CreateProjectResult::Success(CreateProjectSuccess { + project, + })) + } + + /// Looks up the project + #[tracing::instrument(level = "info", name = MoleculeMut_project, skip_all, fields(?ipnft_uid))] + async fn project( + &self, + ctx: &Context<'_>, + ipnft_uid: String, + ) -> Result> { + use datafusion::logical_expr::{col, lit}; + + let Some(df) = Molecule::get_projects_snapshot(ctx, DatasetAction::Read, false) + .await? 
+ .1 + else { + return Ok(None); + }; + + let df = df.filter(col("ipnft_uid").eq(lit(ipnft_uid))).int_err()?; + + let records = df.collect_json_aos().await.int_err()?; + if records.is_empty() { + return Ok(None); + } + + assert_eq!(records.len(), 1); + let entry = MoleculeProjectMut::from_json(records.into_iter().next().unwrap()); + + Ok(Some(entry)) + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[expect(dead_code)] +pub struct MoleculeProjectMut { + pub account_id: odf::AccountID, + pub data_room_dataset_id: odf::DatasetID, + pub announcements_dataset_id: odf::DatasetID, +} + +impl MoleculeProjectMut { + pub fn from_json(record: serde_json::Value) -> Self { + let serde_json::Value::Object(record) = record else { + unreachable!() + }; + + let account_id = + odf::AccountID::from_did_str(record["account_id"].as_str().unwrap()).unwrap(); + + let data_room_dataset_id = + odf::DatasetID::from_did_str(record["data_room_dataset_id"].as_str().unwrap()).unwrap(); + + let announcements_dataset_id = + odf::DatasetID::from_did_str(record["announcements_dataset_id"].as_str().unwrap()) + .unwrap(); + + Self { + account_id, + data_room_dataset_id, + announcements_dataset_id, + } + } +} + +#[common_macros::method_names_consts(const_value_prefix = "GQL: ")] +#[Object] +impl MoleculeProjectMut { + /// Creates an announcement record for the project + #[tracing::instrument(level = "info", name = MoleculeProjectMut_create_announcement, skip_all)] + async fn create_announcement( + &self, + ctx: &Context<'_>, + headline: String, + body: String, + #[graphql(desc = "List of dataset DIDs to link")] attachments: Option>, + molecule_access_level: String, + molecule_change_by: String, + ) -> Result { + let (dataset_reg, push_ingest_use_case) = from_catalog_n!( + ctx, + dyn domain::DatasetRegistry, + dyn domain::PushIngestDataUseCase + ); + + // Validate attachment links + let attachments = attachments.unwrap_or_default(); + for att in &attachments { + let did = match odf::DatasetID::from_did_str(att) { + Ok(did) => did, + Err(err) => { + return Ok(CreateAnnouncementResult::InvalidAttachment( + CreateAnnouncementErrorInvalidAttachment { + message: err.to_string(), + }, + )); + } + }; + + if dataset_reg + .try_resolve_dataset_handle_by_ref(&did.as_local_ref()) + .await? 
+ .is_none() + { + return Ok(CreateAnnouncementResult::InvalidAttachment( + CreateAnnouncementErrorInvalidAttachment { + message: format!("Dataset {did} not found"), + }, + )); + } + } + + let dataset = dataset_reg + .get_dataset_by_id(&self.announcements_dataset_id) + .await + .int_err()?; + + let announcement_id = uuid::Uuid::new_v4(); + + let record = serde_json::json!({ + "op": u8::from(odf::metadata::OperationType::Append), + "announcement_id": announcement_id.to_string(), + "headline": headline, + "body": body, + "attachments": attachments, + "molecule_access_level": molecule_access_level, + "molecule_change_by": molecule_change_by, + }); + + push_ingest_use_case + .execute( + &dataset, + kamu_core::DataSource::Buffer(bytes::Bytes::from_owner( + record.to_string().into_bytes(), + )), + kamu_core::PushIngestDataUseCaseOptions { + source_name: None, + source_event_time: None, + is_ingest_from_upload: false, + media_type: Some(file_utils::MediaType::NDJSON.to_owned()), + expected_head: None, + }, + None, + ) + .await + .int_err()?; + + Ok(CreateAnnouncementResult::Success( + CreateAnnouncementSuccess { + announcement_id: announcement_id.to_string(), + }, + )) + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[derive(Interface)] +#[graphql( + field(name = "is_success", ty = "bool"), + field(name = "message", ty = "String") +)] +pub enum CreateProjectResult { + Success(CreateProjectSuccess), + Conflict(CreateProjectErrorConflict), +} + +#[derive(SimpleObject)] +#[graphql(complex)] +pub struct CreateProjectSuccess { + pub project: MoleculeProject, +} +#[ComplexObject] +impl CreateProjectSuccess { + async fn is_success(&self) -> bool { + true + } + async fn message(&self) -> String { + String::new() + } +} + +#[derive(SimpleObject)] +#[graphql(complex)] +pub struct CreateProjectErrorConflict { + project: MoleculeProject, +} +#[ComplexObject] +impl CreateProjectErrorConflict { + async fn is_success(&self) -> bool { + false + } + async fn message(&self) -> String { + format!( + "Conflict with existing project {} ({})", + self.project.ipnft_symbol, self.project.ipnft_uid, + ) + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[derive(Interface)] +#[graphql( + field(name = "is_success", ty = "bool"), + field(name = "message", ty = "String") +)] +pub enum CreateAnnouncementResult { + Success(CreateAnnouncementSuccess), + InvalidAttachment(CreateAnnouncementErrorInvalidAttachment), +} + +#[derive(SimpleObject)] +#[graphql(complex)] +pub struct CreateAnnouncementSuccess { + /// ID of the newly-created announcement + pub announcement_id: String, +} +#[ComplexObject] +impl CreateAnnouncementSuccess { + async fn is_success(&self) -> bool { + true + } + async fn message(&self) -> String { + String::new() + } +} + +#[derive(SimpleObject)] +#[graphql(complex)] +pub struct CreateAnnouncementErrorInvalidAttachment { + message: String, +} +#[ComplexObject] +impl CreateAnnouncementErrorInvalidAttachment { + async fn is_success(&self) -> bool { + false + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/adapter/graphql/src/queries/mod.rs b/src/adapter/graphql/src/queries/mod.rs index 330bc0e068..bfc24f293b 100644 --- a/src/adapter/graphql/src/queries/mod.rs +++ b/src/adapter/graphql/src/queries/mod.rs @@ -15,6 +15,7 @@ mod build_info; mod data; mod 
datasets; mod flows; +pub(crate) mod molecule; mod search; mod tasks; mod webhooks; @@ -27,6 +28,7 @@ pub(crate) use build_info::*; pub(crate) use data::*; pub(crate) use datasets::*; pub(crate) use flows::*; +pub(crate) use molecule::*; pub(crate) use search::*; pub(crate) use tasks::*; pub(crate) use webhooks::*; diff --git a/src/adapter/graphql/src/queries/molecule.rs b/src/adapter/graphql/src/queries/molecule.rs new file mode 100644 index 0000000000..d229135a1a --- /dev/null +++ b/src/adapter/graphql/src/queries/molecule.rs @@ -0,0 +1,1052 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::sync::Arc; + +use chrono::{DateTime, Utc}; +use kamu::domain; +use kamu_accounts::{CurrentAccountSubject, LoggedAccount}; +use kamu_auth_rebac::{RebacDatasetRefUnresolvedError, RebacDatasetRegistryFacade}; +use kamu_core::auth::DatasetAction; +use kamu_core::{DatasetRegistryExt, ResolvedDataset}; +use odf::utils::data::DataFrameExt; + +use super::{CollectionEntry, VersionedFileEntry}; +use crate::prelude::*; +use crate::queries::{Account, Dataset}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub struct Molecule; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +impl Molecule { + pub fn dataset_snapshot_projects(alias: odf::DatasetAlias) -> odf::metadata::DatasetSnapshot { + odf::DatasetSnapshot { + name: alias, + kind: odf::DatasetKind::Root, + metadata: vec![ + odf::metadata::AddPushSource { + source_name: "default".to_string(), + read: odf::metadata::ReadStepNdJson { + schema: Some( + [ + "op INT", + "account_id STRING", + "ipnft_symbol STRING", + "ipnft_uid STRING", + "ipnft_address STRING", + "ipnft_token_id STRING", + "data_room_dataset_id STRING", + "announcements_dataset_id STRING", + ] + .into_iter() + .map(str::to_string) + .collect(), + ), + ..Default::default() + } + .into(), + preprocess: None, + merge: odf::metadata::MergeStrategyChangelogStream { + primary_key: vec!["account_id".to_string()], + } + .into(), + } + .into(), + odf::metadata::SetInfo { + description: Some("List of projects tracked by Molecule.xyz".into()), + keywords: Some(vec![ + "DeSci".to_string(), + "BioTech".to_string(), + "Funding".to_string(), + "Crypto".to_string(), + ]), + } + .into(), + odf::metadata::SetAttachments { + attachments: odf::metadata::AttachmentsEmbedded { + items: vec![odf::metadata::AttachmentEmbedded { + path: "README.md".into(), + content: indoc::indoc!( + r#" + # Projects tracked by Molecule.xyz + + Molecule is a decentralized biotech protocol, + building a web3 marketplace for research-related IP. + Our platform and scalable framework for biotech DAOs + connects academics and biotech companies with quick and + easy funding, while enabling patient, researcher, and funder + communities to directly govern and own research-related IP. 
+ + Find out more at https://molecule.xyz/ + "# + ) + .into(), + }], + } + .into(), + } + .into(), + ], + } + } + + pub fn dataset_snapshot_data_room(alias: odf::DatasetAlias) -> odf::DatasetSnapshot { + super::Collection::dataset_snapshot( + alias, + vec![ColumnInput { + name: "molecule_change_by".into(), + data_type: DataTypeInput { + ddl: "STRING".into(), + }, + }], + vec![], + ) + .expect("Schema is always valid as there are no user inputs") + } + + pub fn dataset_snapshot_announcements(alias: odf::DatasetAlias) -> odf::DatasetSnapshot { + odf::DatasetSnapshot { + name: alias, + kind: odf::DatasetKind::Root, + metadata: vec![ + odf::metadata::AddPushSource { + source_name: "default".to_string(), + read: odf::metadata::ReadStepNdJson { + schema: Some( + [ + "op INT", + "announcement_id STRING", + "headline STRING", + "body STRING", + "attachments Array", + "molecule_access_level STRING", + "molecule_change_by STRING", + ] + .into_iter() + .map(str::to_string) + .collect(), + ), + ..Default::default() + } + .into(), + preprocess: None, + merge: odf::metadata::MergeStrategyChangelogStream { + primary_key: vec!["announcement_id".to_string()], + } + .into(), + } + .into(), + odf::metadata::SetInfo { + description: Some("Project announcements".into()), + keywords: Some(vec![ + "DeSci".to_string(), + "BioTech".to_string(), + "Funding".to_string(), + "Crypto".to_string(), + ]), + } + .into(), + odf::metadata::SetAttachments { + attachments: odf::metadata::AttachmentsEmbedded { + items: vec![odf::metadata::AttachmentEmbedded { + path: "README.md".into(), + content: indoc::indoc!( + r#" + # Project announcements + + TODO + "# + ) + .into(), + }], + } + .into(), + } + .into(), + ], + } + } + + pub async fn get_projects_dataset( + ctx: &Context<'_>, + molecule_account_name: &odf::AccountName, + action: DatasetAction, + create_if_not_exist: bool, + ) -> Result { + let dataset_reg = from_catalog_n!(ctx, dyn RebacDatasetRegistryFacade); + + const PROJECTS_DATASET_NAME: &str = "projects"; + + let projects_dataset_alias = odf::DatasetAlias::new( + Some(molecule_account_name.clone()), + odf::DatasetName::new_unchecked(PROJECTS_DATASET_NAME), + ); + + match dataset_reg + .resolve_dataset_by_ref(&projects_dataset_alias.as_local_ref(), action) + .await + { + Ok(ds) => Ok(ds), + Err(RebacDatasetRefUnresolvedError::NotFound(_)) if create_if_not_exist => { + let create_dataset_use_case = + from_catalog_n!(ctx, dyn kamu_datasets::CreateDatasetFromSnapshotUseCase); + + let snapshot = Self::dataset_snapshot_projects(odf::DatasetAlias::new( + None, + odf::DatasetName::new_unchecked(PROJECTS_DATASET_NAME), + )); + + let create_res = create_dataset_use_case + .execute( + snapshot, + kamu_datasets::CreateDatasetUseCaseOptions { + dataset_visibility: odf::DatasetVisibility::Private, + }, + ) + .await + .int_err()?; + + // TODO: Use case should return ResolvedDataset directly + Ok(ResolvedDataset::new( + create_res.dataset, + create_res.dataset_handle, + )) + } + Err(RebacDatasetRefUnresolvedError::NotFound(err)) => Err(GqlError::Gql(err.into())), + Err(RebacDatasetRefUnresolvedError::Access(err)) => Err(GqlError::Access(err)), + Err(err) => Err(err.int_err().into()), + } + } + + pub async fn get_projects_snapshot( + ctx: &Context<'_>, + action: DatasetAction, + create_if_not_exist: bool, + ) -> Result<(ResolvedDataset, Option)> { + let query_svc = from_catalog_n!(ctx, dyn domain::QueryService); + + let subject_molecule = molecule_subject(ctx)?; + + // Resolve projects dataset + let projects_dataset = 
Self::get_projects_dataset( + ctx, + &subject_molecule.account_name, + action, + create_if_not_exist, + ) + .await?; + + // Query full data + let df = match query_svc + .get_data( + &projects_dataset.get_handle().as_local_ref(), + domain::GetDataOptions::default(), + ) + .await + { + Ok(res) => Ok(res.df), + Err(domain::QueryError::Access(err)) => Err(GqlError::Access(err)), + Err(err) => Err(err.int_err().into()), + }?; + + // Project into snapshot + let df = if let Some(df) = df { + Some( + odf::utils::data::changelog::project( + df, + &["account_id".to_string()], + &odf::metadata::DatasetVocabulary::default(), + ) + .int_err()?, + ) + } else { + None + }; + + Ok((projects_dataset, df)) + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[common_macros::method_names_consts(const_value_prefix = "GQL: ")] +#[Object] +impl Molecule { + const DEFAULT_PROJECTS_PER_PAGE: usize = 15; + const DEFAULT_ACTIVITY_EVENTS_PER_PAGE: usize = 15; + + /// Looks up the project + #[tracing::instrument(level = "info", name = Molecule_project, skip_all, fields(?ipnft_uid))] + async fn project( + &self, + ctx: &Context<'_>, + ipnft_uid: String, + ) -> Result> { + use datafusion::logical_expr::{col, lit}; + + let Some(df) = Self::get_projects_snapshot(ctx, DatasetAction::Read, false) + .await? + .1 + else { + return Ok(None); + }; + + let df = df.filter(col("ipnft_uid").eq(lit(ipnft_uid))).int_err()?; + + let records = df.collect_json_aos().await.int_err()?; + if records.is_empty() { + return Ok(None); + } + + assert_eq!(records.len(), 1); + let entry = MoleculeProject::from_json(records.into_iter().next().unwrap()); + + Ok(Some(entry)) + } + + /// List the registered projects + #[tracing::instrument(level = "info", name = Molecule_projects, skip_all)] + async fn projects( + &self, + ctx: &Context<'_>, + page: Option, + per_page: Option, + ) -> Result { + use datafusion::logical_expr::col; + + let page = page.unwrap_or(0); + let per_page = per_page.unwrap_or(Self::DEFAULT_PROJECTS_PER_PAGE); + + let Some(df) = Self::get_projects_snapshot(ctx, DatasetAction::Read, false) + .await? + .1 + else { + return Ok(MoleculeProjectConnection::new(Vec::new(), 0, per_page, 0)); + }; + + let total_count = df.clone().count().await.int_err()?; + let df = df + .sort(vec![col("ipnft_symbol").sort(true, false)]) + .int_err()? + .limit(page * per_page, Some(per_page)) + .int_err()?; + + let records = df.collect_json_aos().await.int_err()?; + + let nodes = records + .into_iter() + .map(MoleculeProject::from_json) + .collect(); + + Ok(MoleculeProjectConnection::new( + nodes, + page, + per_page, + total_count, + )) + } + + /// Latest activity events across all projects in reverse chronological + /// order + #[tracing::instrument(level = "info", name = Molecule_activity, skip_all)] + async fn activity( + &self, + ctx: &Context<'_>, + page: Option, + per_page: Option, + ) -> Result { + use datafusion::logical_expr::{col, lit}; + + let page = page.unwrap_or(0); + let per_page = per_page.unwrap_or(Self::DEFAULT_ACTIVITY_EVENTS_PER_PAGE); + + let query_svc = from_catalog_n!(ctx, dyn domain::QueryService); + + // TODO: PERF: This "brute force" approach will not scale with growth of + // projects and has to be revisited + let Some(df) = Self::get_projects_snapshot(ctx, DatasetAction::Read, false) + .await? 
+ .1 + else { + return Ok(MoleculeProjectEventConnection::new(Vec::new(), 0, per_page)); + }; + + let projects_by_announcement_dataset: std::collections::BTreeMap< + odf::DatasetID, + Arc, + > = df + .collect_json_aos() + .await + .int_err()? + .into_iter() + .map(MoleculeProject::from_json) + .map(|p| (p.announcements_dataset_id.clone(), Arc::new(p))) + .collect(); + + let announcement_dataset_refs: Vec = projects_by_announcement_dataset + .keys() + .map(odf::DatasetID::as_local_ref) + .collect(); + + let mut announcement_dataframes = Vec::new(); + const DATASET_ID_COL: &str = "__dataset_id__"; + + for resp in query_svc + .get_data_multi(&announcement_dataset_refs, true) + .await + .int_err()? + { + let Some(df) = resp.df else { + // Skip empty datasets + continue; + }; + + // Attach dataset ID as a column to records to associate them later + let df = df + .with_column(DATASET_ID_COL, lit(resp.dataset_handle.id.to_string())) + .int_err()?; + + announcement_dataframes.push(df); + } + + let Some(df_global_announcements) = + DataFrameExt::union_all(announcement_dataframes).int_err()? + else { + return Ok(MoleculeProjectEventConnection::new(Vec::new(), 0, per_page)); + }; + + let vocab = odf::metadata::DatasetVocabulary::default(); + + let df_global_announcements = df_global_announcements + .sort(vec![col(&vocab.event_time_column).sort(false, false)]) + .int_err()? + .limit(per_page * page, Some(per_page)) + .int_err()?; + + let nodes = df_global_announcements + .collect_json_aos() + .await + .int_err()? + .into_iter() + .map(|mut record| { + let obj = record.as_object_mut().unwrap(); + obj.remove(&vocab.offset_column); + obj.remove(&vocab.operation_type_column); + + let did = odf::DatasetID::from_did_str( + obj.remove(DATASET_ID_COL).unwrap().as_str().unwrap(), + ) + .unwrap(); + + MoleculeProjectEvent::Announcement(MoleculeProjectEventAnnouncement { + project: Arc::clone(&projects_by_announcement_dataset[&did]), + announcement: record, + }) + }) + .collect(); + + Ok(MoleculeProjectEventConnection::new(nodes, page, per_page)) + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[derive(SimpleObject, Clone)] +#[graphql(complex)] +pub struct MoleculeProject { + #[graphql(skip)] + pub account_id: odf::AccountID, + + /// System time when this version was created/updated + pub system_time: DateTime, + + /// Event time when this version was created/updated + pub event_time: DateTime, + + /// Symbolic name of the project + pub ipnft_symbol: String, + + /// Unique ID of the IPNFT as `{ipnftAddress}_{ipnftTokenId}` + pub ipnft_uid: String, + + /// Address of the IPNFT contract + pub ipnft_address: String, + + // NOTE: For backward compatibility (and existing projects), + // we continue using BigInt type, which is wider than needed U256. 
+    /// Token ID within the IPNFT contract
+    pub ipnft_token_id: BigInt,
+
+    #[graphql(skip)]
+    pub data_room_dataset_id: odf::DatasetID,
+
+    #[graphql(skip)]
+    pub announcements_dataset_id: odf::DatasetID,
+}
+
+impl MoleculeProject {
+    pub fn from_json(record: serde_json::Value) -> Self {
+        let serde_json::Value::Object(mut record) = record else {
+            unreachable!()
+        };
+
+        // Parse system columns
+        let vocab = odf::metadata::DatasetVocabulary::default();
+
+        record.remove(&vocab.offset_column);
+
+        record.remove(&vocab.operation_type_column);
+
+        let system_time = DateTime::parse_from_rfc3339(
+            record
+                .remove(&vocab.system_time_column)
+                .unwrap()
+                .as_str()
+                .unwrap(),
+        )
+        .unwrap()
+        .into();
+
+        let event_time = DateTime::parse_from_rfc3339(
+            record
+                .remove(&vocab.event_time_column)
+                .unwrap()
+                .as_str()
+                .unwrap(),
+        )
+        .unwrap()
+        .into();
+
+        // Parse core columns
+        let account_id =
+            odf::AccountID::from_did_str(record.remove("account_id").unwrap().as_str().unwrap())
+                .unwrap();
+
+        let ipnft_symbol = record
+            .remove("ipnft_symbol")
+            .unwrap()
+            .as_str()
+            .unwrap()
+            .to_string();
+
+        let ipnft_uid = record
+            .remove("ipnft_uid")
+            .unwrap()
+            .as_str()
+            .unwrap()
+            .to_string();
+
+        let ipnft_address = record
+            .remove("ipnft_address")
+            .unwrap()
+            .as_str()
+            .unwrap()
+            .to_string();
+
+        let ipnft_token_id = {
+            let value = record.remove("ipnft_token_id").unwrap();
+
+            if let Some(s) = value.as_str() {
+                BigInt::new(s.parse().unwrap())
+            } else if let Some(n) = value.as_number() {
+                BigInt::new(n.to_string().parse().unwrap())
+            } else {
+                panic!("Unexpected value for ipnft_token_id: {value:?}");
+            }
+        };
+
+        let data_room_dataset_id = odf::DatasetID::from_did_str(
+            record
+                .remove("data_room_dataset_id")
+                .unwrap()
+                .as_str()
+                .unwrap(),
+        )
+        .unwrap();
+
+        let announcements_dataset_id = odf::DatasetID::from_did_str(
+            record
+                .remove("announcements_dataset_id")
+                .unwrap()
+                .as_str()
+                .unwrap(),
+        )
+        .unwrap();
+
+        Self {
+            account_id,
+            system_time,
+            event_time,
+            ipnft_symbol,
+            ipnft_uid,
+            ipnft_address,
+            ipnft_token_id,
+            data_room_dataset_id,
+            announcements_dataset_id,
+        }
+    }
+
+    pub fn to_record_data(&self) -> serde_json::Value {
+        let mut r = serde_json::Value::Object(Default::default());
+        r["account_id"] = self.account_id.to_string().into();
+        r["ipnft_symbol"] = self.ipnft_symbol.clone().into();
+        r["ipnft_uid"] = self.ipnft_uid.clone().into();
+        r["ipnft_address"] = self.ipnft_address.clone().into();
+        r["ipnft_token_id"] = self.ipnft_token_id.clone().into_inner().to_string().into();
+        r["data_room_dataset_id"] = self.data_room_dataset_id.to_string().into();
+        r["announcements_dataset_id"] = self.announcements_dataset_id.to_string().into();
+        r
+    }
+
+    pub fn to_bytes(&self, op: odf::metadata::OperationType) -> bytes::Bytes {
+        let mut record = self.to_record_data();
+        record["op"] = u8::from(op).into();
+
+        let buf = record.to_string().into_bytes();
+        bytes::Bytes::from_owner(buf)
+    }
+
+    async fn get_activity_announcements(
+        self: &Arc<Self>,
+        ctx: &Context<'_>,
+        limit: usize,
+    ) -> Result<Vec<MoleculeProjectEvent>> {
+        let query_svc = from_catalog_n!(ctx, dyn domain::QueryService);
+
+        let df = match query_svc
+            .tail(
+                &self.announcements_dataset_id.as_local_ref(),
+                0,
+                limit as u64,
+                domain::GetDataOptions::default(),
+            )
+            .await
+        {
+            Ok(res) => match res.df {
+                Some(df) => df,
+                None => return Ok(Vec::new()),
+            },
+            Err(domain::QueryError::Access(err)) => return Err(GqlError::Access(err)),
+            Err(err) => return Err(err.int_err().into()),
+        };
+
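+        // Convert the tail records to JSON, strip the offset and operation-type
+        // system columns, and wrap each one as an announcement event pointing back
+        // at this project.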
let records = df.collect_json_aos().await.int_err()?; + + let vocab = odf::metadata::DatasetVocabulary::default(); + + let records = records + .into_iter() + .map(|mut record| { + let obj = record.as_object_mut().unwrap(); + obj.remove(&vocab.offset_column); + obj.remove(&vocab.operation_type_column); + + MoleculeProjectEvent::Announcement(MoleculeProjectEventAnnouncement { + project: Arc::clone(self), + announcement: record, + }) + }) + .collect(); + + Ok(records) + } + + async fn get_activity_data_room( + self: &Arc, + ctx: &Context<'_>, + limit: usize, + ) -> Result> { + let query_svc = from_catalog_n!(ctx, dyn domain::QueryService); + + let df = match query_svc + .tail( + &self.data_room_dataset_id.as_local_ref(), + 0, + limit as u64, + domain::GetDataOptions::default(), + ) + .await + { + Ok(res) => match res.df { + Some(df) => df, + None => return Ok(Vec::new()), + }, + Err(domain::QueryError::Access(err)) => return Err(GqlError::Access(err)), + Err(err) => return Err(err.int_err().into()), + }; + + let records = df.collect_json_aos().await.int_err()?; + + let vocab = odf::metadata::DatasetVocabulary::default(); + + let records = records + .into_iter() + .map(|record| { + let op = record[&vocab.operation_type_column].as_i64().unwrap(); + let op = odf::metadata::OperationType::try_from(u8::try_from(op).unwrap()).unwrap(); + (op, record) + }) + .filter(|(op, _)| *op != odf::metadata::OperationType::CorrectFrom) + .map(|(op, record)| { + let entry = CollectionEntry::from_json(record)?; + + let event = match op { + odf::metadata::OperationType::Append => { + MoleculeProjectEvent::DataRoomEntryAdded( + MoleculeProjectEventDataRoomEntryAdded { + project: Arc::clone(self), + entry, + }, + ) + } + odf::metadata::OperationType::Retract => { + MoleculeProjectEvent::DataRoomEntryRemoved( + MoleculeProjectEventDataRoomEntryRemoved { + project: Arc::clone(self), + entry, + }, + ) + } + odf::metadata::OperationType::CorrectFrom => unreachable!(), + odf::metadata::OperationType::CorrectTo => { + MoleculeProjectEvent::DataRoomEntryUpdated( + MoleculeProjectEventDataRoomEntryUpdated { + project: Arc::clone(self), + new_entry: entry, + }, + ) + } + }; + + Ok(event) + }) + .collect::>()?; + + Ok(records) + } + + // TODO: PERF: This will get very expensive on large number of files and + // versions - we likely need to create a derivative dataset to track activity, + // so we could query it in a single go. + async fn get_activity_files( + self: &Arc, + ctx: &Context<'_>, + limit: usize, + ) -> Result> { + let (query_svc, dataset_reg) = + from_catalog_n!(ctx, dyn domain::QueryService, dyn domain::DatasetRegistry); + + let df = match query_svc + .get_data( + &self.data_room_dataset_id.as_local_ref(), + domain::GetDataOptions::default(), + ) + .await + { + Ok(res) => match res.df { + Some(df) => df, + None => return Ok(Vec::new()), + }, + Err(domain::QueryError::Access(err)) => return Err(GqlError::Access(err)), + Err(err) => return Err(err.int_err().into()), + }; + + let records = df + .select(vec![datafusion::prelude::col("ref")]) + .int_err()? + .distinct() + .int_err()? 
+ .collect_json_aos() + .await + .int_err()?; + + let dataset_ids: Vec = records + .into_iter() + .map(|r| odf::DatasetID::from_did_str(r["ref"].as_str().unwrap()).unwrap()) + .collect(); + + let mut events = Vec::new(); + + for did in dataset_ids { + let resolved_dataset = dataset_reg + .get_dataset_by_ref(&did.as_local_ref()) + .await + .int_err()?; + + let df = match query_svc + .tail( + &resolved_dataset.get_handle().as_local_ref(), + 0, + limit as u64, + domain::GetDataOptions::default(), + ) + .await + { + Ok(res) => match res.df { + Some(df) => df, + None => continue, + }, + // Skip datasets we don't have access to + Err(domain::QueryError::Access(_)) => continue, + Err(err) => return Err(err.int_err().into()), + }; + + let records = df.collect_json_aos().await.int_err()?; + + let project_account = Account::from_account_id(ctx, self.account_id.clone()).await?; + + // TODO: Assuming every collection entry is a versioned file + for record in records { + let dataset = Dataset::new_access_checked( + project_account.clone(), + resolved_dataset.get_handle().clone(), + ); + + let entry = VersionedFileEntry::from_json(resolved_dataset.clone(), record)?; + + events.push(MoleculeProjectEvent::FileUpdated( + MoleculeProjectEventFileUpdated { + project: Arc::clone(self), + dataset, + new_entry: entry, + }, + )); + } + } + + Ok(events) + } +} + +#[common_macros::method_names_consts(const_value_prefix = "GQL: ")] +#[ComplexObject] +impl MoleculeProject { + const DEFAULT_ACTIVITY_EVENTS_PER_PAGE: usize = 15; + + /// Project's organizational account + #[tracing::instrument(level = "info", name = MoleculeProject_account, skip_all)] + async fn account(&self, ctx: &Context<'_>) -> Result { + let account = Account::from_account_id(ctx, self.account_id.clone()).await?; + Ok(account) + } + + /// Project's data room dataset + #[tracing::instrument(level = "info", name = MoleculeProject_data_room, skip_all)] + async fn data_room(&self, ctx: &Context<'_>) -> Result { + match Dataset::try_from_ref(ctx, &self.data_room_dataset_id.as_local_ref()).await? { + TransformInputDataset::Accessible(ds) => Ok(ds.dataset), + TransformInputDataset::NotAccessible(_) => Err(GqlError::Access( + odf::AccessError::Unauthorized("Dataset inaccessible".into()), + )), + } + } + + /// Project's announcements dataset + #[tracing::instrument(level = "info", name = MoleculeProject_announcements, skip_all)] + async fn announcements(&self, ctx: &Context<'_>) -> Result { + match Dataset::try_from_ref(ctx, &self.announcements_dataset_id.as_local_ref()).await? 
{ + TransformInputDataset::Accessible(ds) => Ok(ds.dataset), + TransformInputDataset::NotAccessible(_) => Err(GqlError::Access( + odf::AccessError::Unauthorized("Dataset inaccessible".into()), + )), + } + } + + /// Project's activity events in reverse chronological order + #[tracing::instrument(level = "info", name = MoleculeProject_activity, skip_all)] + async fn activity( + &self, + ctx: &Context<'_>, + page: Option, + per_page: Option, + ) -> Result { + let page = page.unwrap_or(0); + let per_page = per_page.unwrap_or(Self::DEFAULT_ACTIVITY_EVENTS_PER_PAGE); + + // We fetch up to limit from all sources, then combine them and sort in reverse + // chronological order, and finally truncate to the requested page size + let limit = per_page * (page + 1); + + let project = Arc::new(self.clone()); + + let mut events = Vec::new(); + events.append(&mut project.get_activity_announcements(ctx, limit).await?); + events.append(&mut project.get_activity_data_room(ctx, limit).await?); + events.append(&mut project.get_activity_files(ctx, limit).await?); + + let mut events_timed = Vec::with_capacity(events.len()); + for e in events { + let system_time = e.system_time(ctx).await?; + events_timed.push((system_time, e)); + } + + events_timed.sort_by(|a, b| a.0.cmp(&b.0).reverse()); + + let nodes = events_timed + .into_iter() + .skip(page * per_page) + .take(per_page) + .map(|(_t, e)| e) + .collect(); + + Ok(MoleculeProjectEventConnection::new(nodes, page, per_page)) + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +page_based_connection!( + MoleculeProject, + MoleculeProjectConnection, + MoleculeProjectEdge +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[derive(Interface)] +#[graphql(field(name = "project", ty = "&Arc"))] +#[graphql(field(name = "system_time", ty = "DateTime"))] +pub enum MoleculeProjectEvent { + DataRoomEntryAdded(MoleculeProjectEventDataRoomEntryAdded), + DataRoomEntryRemoved(MoleculeProjectEventDataRoomEntryRemoved), + DataRoomEntryUpdated(MoleculeProjectEventDataRoomEntryUpdated), + Announcement(MoleculeProjectEventAnnouncement), + FileUpdated(MoleculeProjectEventFileUpdated), +} + +#[derive(SimpleObject)] +#[graphql(complex)] +pub struct MoleculeProjectEventDataRoomEntryAdded { + /// Associated project + pub project: Arc, + /// Collection entry + pub entry: CollectionEntry, +} +#[ComplexObject] +impl MoleculeProjectEventDataRoomEntryAdded { + async fn system_time(&self) -> DateTime { + self.entry.system_time + } +} + +#[derive(SimpleObject)] +#[graphql(complex)] +pub struct MoleculeProjectEventDataRoomEntryRemoved { + /// Associated project + pub project: Arc, + /// Collection entry + pub entry: CollectionEntry, +} +#[ComplexObject] +impl MoleculeProjectEventDataRoomEntryRemoved { + async fn system_time(&self) -> DateTime { + self.entry.system_time + } +} + +#[derive(SimpleObject)] +#[graphql(complex)] +pub struct MoleculeProjectEventDataRoomEntryUpdated { + /// Associated project + pub project: Arc, + /// Collection entry + pub new_entry: CollectionEntry, +} +#[ComplexObject] +impl MoleculeProjectEventDataRoomEntryUpdated { + async fn system_time(&self) -> DateTime { + self.new_entry.system_time + } +} + +#[derive(SimpleObject)] +#[graphql(complex)] +pub struct MoleculeProjectEventAnnouncement { + /// Associated project + pub project: Arc, + /// Announcement record + pub announcement: serde_json::Value, +} 
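+// NOTE (illustrative): `announcement` holds the raw JSON record read back from the
+// project's `announcements` dataset, with the offset and operation-type system
+// columns already stripped by the producers above. Assuming the default dataset
+// vocabulary and the record shape written by `create_announcement`, it looks
+// roughly like:
+//
+// {
+//   "system_time": "2025-01-01T00:00:00Z",
+//   "event_time": "2025-01-01T00:00:00Z",
+//   "announcement_id": "<uuid>",
+//   "headline": "...",
+//   "body": "...",
+//   "attachments": ["did:odf:..."],
+//   "molecule_access_level": "...",
+//   "molecule_change_by": "..."
+// }
+//
+// (Field values above are illustrative only.) The resolver below reports the
+// event-time column of this record as the event's `systemTime`.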
+#[ComplexObject] +impl MoleculeProjectEventAnnouncement { + async fn system_time(&self) -> DateTime { + let vocab = odf::metadata::DatasetVocabulary::default(); + + DateTime::parse_from_rfc3339( + self.announcement[&vocab.event_time_column] + .as_str() + .unwrap(), + ) + .unwrap() + .into() + } +} + +#[derive(SimpleObject)] +#[graphql(complex)] +pub struct MoleculeProjectEventFileUpdated { + /// Associated project + pub project: Arc, + + /// Versioned file dataset + pub dataset: Dataset, + + /// New file version entry + pub new_entry: VersionedFileEntry, +} +#[ComplexObject] +impl MoleculeProjectEventFileUpdated { + async fn system_time(&self) -> DateTime { + self.new_entry.system_time + } +} + +page_based_stream_connection!( + MoleculeProjectEvent, + MoleculeProjectEventConnection, + MoleculeProjectEventEdge +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +const MOLECULE_ORG_ACCOUNTS: [&str; 2] = ["molecule", "molecule.dev"]; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub fn molecule_subject(ctx: &Context<'_>) -> Result { + // Check auth + let subject = from_catalog_n!(ctx, CurrentAccountSubject); + let subject_molecule = match subject.as_ref() { + CurrentAccountSubject::Logged(subj) + if MOLECULE_ORG_ACCOUNTS.contains(&subj.account_name.as_str()) => + { + subj + } + _ => { + return Err(GqlError::Access(odf::AccessError::Unauthorized( + format!( + "Only accounts {} can provision projects", + MOLECULE_ORG_ACCOUNTS + .iter() + .map(|account_name| format!("'{account_name}'")) + .collect::>() + .join(", ") + ) + .as_str() + .into(), + ))); + } + }; + Ok(subject_molecule.clone()) +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/adapter/graphql/src/root.rs b/src/adapter/graphql/src/root.rs index 1db5a35d57..f6cc933ea3 100644 --- a/src/adapter/graphql/src/root.rs +++ b/src/adapter/graphql/src/root.rs @@ -73,6 +73,11 @@ impl Query { async fn admin(&self) -> Admin { Admin } + + /// Temporary: Molecule-specific functionality group + async fn molecule(&self) -> Molecule { + Molecule + } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -104,6 +109,11 @@ impl Mutation { AccountsMut } + /// Temporary: Molecule-specific functionality group + async fn molecule(&self) -> MoleculeMut { + MoleculeMut + } + /// Collaboration-related functionality group /// /// Allows setting permissions for multiple datasets in batch mode diff --git a/src/adapter/graphql/src/scalars/big_int.rs b/src/adapter/graphql/src/scalars/big_int.rs new file mode 100644 index 0000000000..9e7d2e57f7 --- /dev/null +++ b/src/adapter/graphql/src/scalars/big_int.rs @@ -0,0 +1,50 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
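+
+// `BigInt` wraps `num_bigint::BigInt` and is transported as a GraphQL string, since
+// JSON numbers cannot safely carry arbitrary-precision values. A minimal, purely
+// illustrative sketch of the parse behaviour implemented below:
+//
+//   use async_graphql::{ScalarType, Value};
+//
+//   let ok = BigInt::parse(Value::String("9".into()));  // strings are accepted
+//   let err = BigInt::parse(Value::Number(9.into()));   // numbers are rejected
+//   assert!(ok.is_ok() && err.is_err());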
+ +use async_graphql::{InputValueError, InputValueResult, Scalar, ScalarType, Value}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[nutype::nutype(derive(AsRef, Clone, Debug, Into))] +pub struct BigInt(num_bigint::BigInt); + +#[Scalar] +/// A big integer scalar type. +impl ScalarType for BigInt { + fn parse(value: Value) -> InputValueResult { + match value { + Value::String(s) => { + let big_int = s + .parse() + .map_err(|e| InputValueError::custom(format!("Invalid BigInt: {e}")))?; + Ok(BigInt::new(big_int)) + } + Value::Number(n) => { + let n = n.to_string(); + + Err(InputValueError::custom(format!( + "Invalid BigInt: the value is expected to be a string (\"{n}\") instead of a \ + number ({n})" + ))) + } + v @ (Value::Null + | Value::Boolean(_) + | Value::Binary(_) + | Value::Enum(_) + | Value::List(_) + | Value::Object(_)) => Err(InputValueError::expected_type(v)), + } + } + + fn to_value(&self) -> Value { + Value::String(self.as_ref().to_string()) + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/adapter/graphql/src/scalars/mod.rs b/src/adapter/graphql/src/scalars/mod.rs index 38dc185ccf..88faeb51d1 100644 --- a/src/adapter/graphql/src/scalars/mod.rs +++ b/src/adapter/graphql/src/scalars/mod.rs @@ -11,6 +11,7 @@ mod access_token; mod account; mod auth; mod base64; +mod big_int; mod collection_path; mod data_batch; mod data_query; @@ -39,12 +40,14 @@ mod os_path; mod pagination; mod task_id; mod task_status; +mod u256; mod webhook_scalars; pub use access_token::*; pub use account::*; pub use auth::*; pub use base64::*; +pub use big_int::*; pub use collection_path::*; pub use data_batch::*; pub use data_query::*; @@ -73,6 +76,7 @@ pub use os_path::*; pub use pagination::*; pub use task_id::*; pub use task_status::*; +pub use u256::*; pub use webhook_scalars::*; macro_rules! simple_scalar { diff --git a/src/adapter/graphql/src/scalars/u256.rs b/src/adapter/graphql/src/scalars/u256.rs new file mode 100644 index 0000000000..ea35b59167 --- /dev/null +++ b/src/adapter/graphql/src/scalars/u256.rs @@ -0,0 +1,85 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::sync::LazyLock; + +use async_graphql::{InputValueError, InputValueResult, Scalar, ScalarType, Value}; + +use crate::scalars::BigInt; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +static MAX_U256: LazyLock = LazyLock::new(|| { + const ONES_256_BYTES: [u8; 256] = [b'1'; 256]; + const BINARY_RADIX: u32 = 2; + + let max_u256_as_str = std::str::from_utf8(&ONES_256_BYTES).unwrap(); + + // 115792089237316195423570985008687907853269984665640564039457584007913129639935 + num_bigint::BigInt::parse_bytes(max_u256_as_str.as_bytes(), BINARY_RADIX).unwrap() +}); + +#[nutype::nutype(derive(AsRef, Clone, Debug, Into))] +pub struct U256(num_bigint::BigInt); + +#[Scalar] +/// 256-bit unsigned number. Can be constructed from string. 
+impl ScalarType for U256 { + fn parse(value: Value) -> InputValueResult { + match value { + Value::String(s) => { + let big_int = s + .parse::() + .map_err(|e| InputValueError::custom(format!("Invalid U256: {e}")))?; + + if matches!(big_int.sign(), num_bigint::Sign::Minus) { + return Err(InputValueError::custom(format!( + "Invalid U256: negative values are not allowed for U256: {big_int}" + ))); + } + + if big_int > *MAX_U256 { + return Err(InputValueError::custom(format!( + "Invalid U256: value exceeds maximum for U256: {big_int}" + ))); + } + + Ok(U256::new(big_int)) + } + Value::Number(n) => { + let n = n.to_string(); + + Err(InputValueError::custom(format!( + "Invalid U256: the value is expected to be a string (\"{n}\") instead of a \ + number ({n})" + ))) + } + v @ (Value::Null + | Value::Boolean(_) + | Value::Binary(_) + | Value::Enum(_) + | Value::List(_) + | Value::Object(_)) => Err(InputValueError::expected_type(v)), + } + } + + fn to_value(&self) -> Value { + Value::String(self.as_ref().to_string()) + } +} + +impl From for BigInt { + fn from(value: U256) -> Self { + // U256 type is narrower than BigInt, so we can perform conversion without + // additional manipulation. + BigInt::new(value.into_inner()) + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/adapter/graphql/tests/tests/mod.rs b/src/adapter/graphql/tests/tests/mod.rs index 4319aa19d4..a00bb9506d 100644 --- a/src/adapter/graphql/tests/tests/mod.rs +++ b/src/adapter/graphql/tests/tests/mod.rs @@ -11,6 +11,7 @@ mod test_accounts; mod test_auth; mod test_error_handling; mod test_gql_account_flow_triggers; +mod test_gql_custom_molecule; mod test_gql_data; mod test_gql_dataset_adapter_collection; mod test_gql_dataset_adapter_versioned_file; diff --git a/src/adapter/graphql/tests/tests/test_gql_custom_molecule.rs b/src/adapter/graphql/tests/tests/test_gql_custom_molecule.rs new file mode 100644 index 0000000000..26c021b9e5 --- /dev/null +++ b/src/adapter/graphql/tests/tests/test_gql_custom_molecule.rs @@ -0,0 +1,1499 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use base64::Engine as _; +use bon::bon; +use indoc::indoc; +use kamu::testing::MockDatasetActionAuthorizer; +use kamu_accounts::{CurrentAccountSubject, LoggedAccount}; +use kamu_core::*; +use kamu_datasets::{CreateDatasetFromSnapshotUseCase, CreateDatasetResult}; +use serde_json::json; + +use crate::utils::{BaseGQLDatasetHarness, PredefinedAccountOpts, authentication_catalogs_ext}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +const CREATE_PROJECT: &str = indoc!( + r#" + mutation ( + $ipnftSymbol: String!, + $ipnftUid: String!, + $ipnftAddress: String!, + $ipnftTokenId: Int!, + ) { + molecule { + createProject( + ipnftSymbol: $ipnftSymbol, + ipnftUid: $ipnftUid, + ipnftAddress: $ipnftAddress, + ipnftTokenId: $ipnftTokenId, + ) { + isSuccess + message + __typename + ... 
on CreateProjectSuccess { + project { + account { id accountName } + ipnftUid + dataRoom { id alias } + announcements { id alias } + } + } + } + } + } + "# +); + +const CREATE_VERSIONED_FILE: &str = indoc!( + r#" + mutation ($datasetAlias: DatasetAlias!) { + datasets { + createVersionedFile( + datasetAlias: $datasetAlias, + datasetVisibility: PRIVATE, + ) { + ... on CreateDatasetResultSuccess { + dataset { + id + } + } + } + } + } + "# +); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[test_log::test(tokio::test)] +async fn test_molecule_provision_project() { + let harness = GraphQLDatasetsHarness::builder() + .tenancy_config(TenancyConfig::MultiTenant) + .build() + .await; + + // Setup `projects` dataset + harness.create_projects_dataset().await; + + // List of projects is empty + const LIST_PROJECTS: &str = indoc!( + r#" + query { + molecule { + projects(page: 0, perPage: 100) { + nodes { + ipnftSymbol + ipnftUid + } + } + } + } + "# + ); + + let res = harness + .execute_authorized_query(async_graphql::Request::new(LIST_PROJECTS)) + .await; + + assert!(res.is_ok(), "{res:#?}"); + pretty_assertions::assert_eq!( + res.data.into_json().unwrap()["molecule"]["projects"]["nodes"], + json!([]), + ); + + // Create first project + let res = harness + .execute_authorized_query(async_graphql::Request::new(CREATE_PROJECT).variables( + async_graphql::Variables::from_json(json!({ + "ipnftSymbol": "VITAFAST", + "ipnftUid": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1_9", + "ipnftAddress": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1", + "ipnftTokenId": "9", + })), + )) + .await; + + assert!(res.is_ok(), "{res:#?}"); + let res = &res.data.into_json().unwrap()["molecule"]["createProject"]; + let project_account_id = res["project"]["account"]["id"].as_str().unwrap(); + let project_account_name = res["project"]["account"]["accountName"].as_str().unwrap(); + let data_room_did = res["project"]["dataRoom"]["id"].as_str().unwrap(); + let announcements_did = res["project"]["announcements"]["id"].as_str().unwrap(); + assert_ne!(project_account_id, harness.molecule_account_id.to_string()); + assert_eq!(project_account_name, "molecule.vitafast"); + pretty_assertions::assert_eq!( + *res, + json!({ + "isSuccess": true, + "message": "", + "__typename": "CreateProjectSuccess", + "project": { + "account": { + "id": project_account_id, + "accountName": project_account_name, + }, + "ipnftUid": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1_9", + "dataRoom": { + "id": data_room_did, + "alias": format!("{project_account_name}/data-room"), + }, + "announcements": { + "id": announcements_did, + "alias": format!("{project_account_name}/announcements"), + }, + }, + }), + ); + + // Read back the project entry by `ipnftUid`` + let res = harness + .execute_authorized_query( + async_graphql::Request::new(indoc!( + r#" + query ($ipnftUid: String!) 
{ + molecule { + project(ipnftUid: $ipnftUid) { + account { id accountName } + ipnftSymbol + ipnftUid + ipnftAddress + ipnftTokenId + } + } + } + "# + )) + .variables(async_graphql::Variables::from_json(json!({ + "ipnftUid": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1_9", + }))), + ) + .await; + + assert!(res.is_ok(), "{res:#?}"); + pretty_assertions::assert_eq!( + res.data.into_json().unwrap()["molecule"]["project"], + json!({ + "account": { + "id": project_account_id, + "accountName": project_account_name, + }, + "ipnftSymbol": "vitafast", + "ipnftUid": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1_9", + "ipnftAddress": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1", + "ipnftTokenId": "9", + }), + ); + + // Project appears in the list + let res = harness + .execute_authorized_query(async_graphql::Request::new(LIST_PROJECTS)) + .await; + + assert!(res.is_ok(), "{res:#?}"); + pretty_assertions::assert_eq!( + res.data.into_json().unwrap()["molecule"]["projects"]["nodes"], + json!([{ + "ipnftSymbol": "vitafast", + "ipnftUid": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1_9", + }]), + ); + + // Ensure errors on ipnftUid collision + let res = harness + .execute_authorized_query(async_graphql::Request::new(CREATE_PROJECT).variables( + async_graphql::Variables::from_json(json!({ + "ipnftSymbol": "vitaslow", + "ipnftUid": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1_9", + "ipnftAddress": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1", + "ipnftTokenId": "9", + })), + )) + .await; + + assert!(res.is_ok(), "{res:#?}"); + pretty_assertions::assert_eq!( + res.data.into_json().unwrap()["molecule"]["createProject"], + json!({ + "isSuccess": false, + "message": "Conflict with existing project vitafast (0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1_9)", + "__typename": "CreateProjectErrorConflict", + }), + ); + + // Ensure errors on ipnftSymbol collision + let res = harness + .execute_authorized_query(async_graphql::Request::new(CREATE_PROJECT).variables( + async_graphql::Variables::from_json(json!({ + "ipnftSymbol": "vitafast", + "ipnftUid": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1_1", + "ipnftAddress": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1", + "ipnftTokenId": "1", + })), + )) + .await; + + assert!(res.is_ok(), "{res:#?}"); + pretty_assertions::assert_eq!( + res.data.into_json().unwrap()["molecule"]["createProject"], + json!({ + "isSuccess": false, + "message": "Conflict with existing project vitafast (0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1_9)", + "__typename": "CreateProjectErrorConflict", + }), + ); + + // Create another project + // Ensure errors on ipnftUid collision + let res = harness + .execute_authorized_query(async_graphql::Request::new(CREATE_PROJECT).variables( + async_graphql::Variables::from_json(json!({ + "ipnftSymbol": "vitaslow", + "ipnftUid": r#"0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1_108494037067113761580099112583860151730516105403483528465874625006707409835912"#, + "ipnftAddress": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1", + "ipnftTokenId": "108494037067113761580099112583860151730516105403483528465874625006707409835912", + })), + )) + .await; + + assert!(res.is_ok(), "{res:#?}"); + pretty_assertions::assert_eq!( + res.data.into_json().unwrap()["molecule"]["createProject"]["isSuccess"], + json!(true), + ); + + // Both projects appear in the list + let res = harness + .execute_authorized_query(async_graphql::Request::new(LIST_PROJECTS)) + .await; + + assert!(res.is_ok(), "{res:#?}"); + pretty_assertions::assert_eq!( + res.data.into_json().unwrap()["molecule"]["projects"]["nodes"], 
+ json!([ + { + "ipnftSymbol": "vitafast", + "ipnftUid": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1_9", + }, + { + "ipnftSymbol": "vitaslow", + "ipnftUid": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1_108494037067113761580099112583860151730516105403483528465874625006707409835912", + } + ]), + ); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[test_log::test(tokio::test)] +async fn test_molecule_data_room_operations() { + let harness = GraphQLDatasetsHarness::builder() + .tenancy_config(TenancyConfig::MultiTenant) + .build() + .await; + + // Create project (projects dataset is auto-created) + let res = harness + .execute_authorized_query(async_graphql::Request::new(CREATE_PROJECT).variables( + async_graphql::Variables::from_json(json!({ + "ipnftSymbol": "vitafast", + "ipnftUid": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1_9", + "ipnftAddress": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1", + "ipnftTokenId": "9", + })), + )) + .await; + + assert!(res.is_ok(), "{res:#?}"); + let res = &res.data.into_json().unwrap()["molecule"]["createProject"]; + let project_account_name = res["project"]["account"]["accountName"].as_str().unwrap(); + let data_room_did = res["project"]["dataRoom"]["id"].as_str().unwrap(); + + // Create versioned file + // Molecule account can create new dataset in project org + let res = harness + .execute_authorized_query( + async_graphql::Request::new(CREATE_VERSIONED_FILE).variables( + async_graphql::Variables::from_json(json!({ + // TODO: Need ability to create datasets with target AccountID + "datasetAlias": format!("{project_account_name}/test-file"), + })), + ), + ) + .await; + + assert!(res.is_ok(), "{res:#?}"); + let test_file_did = res.data.into_json().unwrap()["datasets"]["createVersionedFile"]["dataset"] + ["id"] + .as_str() + .unwrap() + .to_string(); + + // Upload new file version + // Molecule should have write access to datasets it creates in the project + let res = harness + .execute_authorized_query( + async_graphql::Request::new(indoc!( + r#" + mutation ($datasetId: DatasetID!, $content: Base64Usnp!) { + datasets { + byId(datasetId: $datasetId) { + asVersionedFile { + uploadNewVersion(content: $content) { + isSuccess + message + } + } + } + } + } + "# + )) + .variables(async_graphql::Variables::from_json(json!({ + "datasetId": &test_file_did, + "content": base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(b"hello"), + }))), + ) + .await; + + assert!(res.is_ok(), "{res:#?}"); + pretty_assertions::assert_eq!( + res.data.into_json().unwrap()["datasets"]["byId"]["asVersionedFile"]["uploadNewVersion"], + json!({ + "isSuccess": true, + "message": "", + }) + ); + + // Link new file into the project data room + // Molecule should have write access to core datasets + let res = harness + .execute_authorized_query( + async_graphql::Request::new(indoc!( + r#" + mutation ($datasetId: DatasetID!, $entry: CollectionEntryInput!) 
{ + datasets { + byId(datasetId: $datasetId) { + asCollection { + addEntry(entry: $entry) { + isSuccess + message + } + } + } + } + } + "# + )) + .variables(async_graphql::Variables::from_json(json!({ + "datasetId": data_room_did, + "entry": { + "path": "/foo", + "ref": test_file_did, + "extraData": { + "molecule_change_by": "did:ethr:0x43f3F090af7fF638ad0EfD64c5354B6945fE75BC" + }, + }, + }))), + ) + .await; + + assert!(res.is_ok(), "{res:#?}"); + pretty_assertions::assert_eq!( + res.data.into_json().unwrap()["datasets"]["byId"]["asCollection"]["addEntry"], + json!({ + "isSuccess": true, + "message": "", + }) + ); + + // Go from project to a file in one query + let res = harness + .execute_authorized_query( + async_graphql::Request::new(indoc!( + r#" + query ($ipnftUid: String!) { + molecule { + project(ipnftUid: $ipnftUid) { + dataRoom { + asCollection { + latest { + entry(path: "/foo") { + extraData + asDataset { + asVersionedFile { + latest { + content + extraData + } + } + } + } + } + } + } + } + } + } + "# + )) + .variables(async_graphql::Variables::from_json(json!({ + "ipnftUid": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1_9", + }))), + ) + .await; + + assert!(res.is_ok(), "{res:#?}"); + pretty_assertions::assert_eq!( + res.data.into_json().unwrap()["molecule"]["project"]["dataRoom"]["asCollection"]["latest"] + ["entry"], + json!({ + "asDataset": { + "asVersionedFile": { + "latest": { + "content": base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(b"hello"), + "extraData": {}, + } + } + }, + "extraData": { + "molecule_change_by": "did:ethr:0x43f3F090af7fF638ad0EfD64c5354B6945fE75BC", + }, + }), + ); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[test_log::test(tokio::test)] +async fn test_molecule_announcements_operations() { + let harness = GraphQLDatasetsHarness::builder() + .tenancy_config(TenancyConfig::MultiTenant) + .build() + .await; + + // Create project (projects dataset is auto-created) + let res = harness + .execute_authorized_query(async_graphql::Request::new(CREATE_PROJECT).variables( + async_graphql::Variables::from_json(json!({ + "ipnftSymbol": "vitafast", + "ipnftUid": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1_9", + "ipnftAddress": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1", + "ipnftTokenId": "9", + })), + )) + .await; + + assert!(res.is_ok(), "{res:#?}"); + let res = &res.data.into_json().unwrap()["molecule"]["createProject"]; + let project_account_name = res["project"]["account"]["accountName"].as_str().unwrap(); + let announcements_did = res["project"]["announcements"]["id"].as_str().unwrap(); + + // Announcements are empty + const LIST_ANNOUNCEMENTS: &str = indoc!( + r#" + query ($datasetId: DatasetID!) { + datasets { + byId(datasetId: $datasetId) { + data { + tail(dataFormat: JSON_AOS) { + ... 
on DataQueryResultSuccess { + data { content } + } + } + } + } + } + } + "# + ); + + let res = harness + .execute_authorized_query(async_graphql::Request::new(LIST_ANNOUNCEMENTS).variables( + async_graphql::Variables::from_json(json!({ + "datasetId": announcements_did, + })), + )) + .await; + + assert!(res.is_ok(), "{res:#?}"); + let content = res.data.into_json().unwrap()["datasets"]["byId"]["data"]["tail"]["data"] + ["content"] + .as_str() + .unwrap() + .to_string(); + let content: serde_json::Value = serde_json::from_str(&content).unwrap(); + pretty_assertions::assert_eq!(content, json!([])); + + // Create a few versioned files to use as attachments + let res = harness + .execute_authorized_query( + async_graphql::Request::new(CREATE_VERSIONED_FILE).variables( + async_graphql::Variables::from_json(json!({ + // TODO: Need ability to create datasets with target AccountID + "datasetAlias": format!("{project_account_name}/test-file-1"), + })), + ), + ) + .await; + + assert!(res.is_ok(), "{res:#?}"); + let test_file_1 = res.data.into_json().unwrap()["datasets"]["createVersionedFile"]["dataset"] + ["id"] + .as_str() + .unwrap() + .to_string(); + + let res = harness + .execute_authorized_query( + async_graphql::Request::new(CREATE_VERSIONED_FILE).variables( + async_graphql::Variables::from_json(json!({ + // TODO: Need ability to create datasets with target AccountID + "datasetAlias": format!("{project_account_name}/test-file-2"), + })), + ), + ) + .await; + + assert!(res.is_ok(), "{res:#?}"); + let test_file_2 = res.data.into_json().unwrap()["datasets"]["createVersionedFile"]["dataset"] + ["id"] + .as_str() + .unwrap() + .to_string(); + + // Create an announcement without attachments + const CREATE_ANNOUNCEMENT: &str = indoc!( + r#" + mutation ( + $ipnftUid: String!, + $headline: String!, + $body: String!, + $attachments: [String!], + $moleculeAccessLevel: String!, + $moleculeChangeBy: String!, + ) { + molecule { + project(ipnftUid: $ipnftUid) { + createAnnouncement( + headline: $headline, + body: $body, + attachments: $attachments, + moleculeAccessLevel: $moleculeAccessLevel, + moleculeChangeBy: $moleculeChangeBy, + ) { + isSuccess + message + } + } + } + } + "# + ); + + let res = harness + .execute_authorized_query(async_graphql::Request::new(CREATE_ANNOUNCEMENT).variables( + async_graphql::Variables::from_json(json!({ + "ipnftUid": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1_9", + "headline": "Test announcement 1", + "body": "Blah blah", + "moleculeAccessLevel": "holders", + "moleculeChangeBy": "did:ethr:0x43f3F090af7fF638ad0EfD64c5354B6945fE75BC", + })), + )) + .await; + + assert!(res.is_ok(), "{res:#?}"); + pretty_assertions::assert_eq!( + res.data.into_json().unwrap()["molecule"]["project"]["createAnnouncement"], + json!({ + "isSuccess": true, + "message": "", + }) + ); + + // Create an announcement with one attachment + let res = harness + .execute_authorized_query(async_graphql::Request::new(CREATE_ANNOUNCEMENT).variables( + async_graphql::Variables::from_json(json!({ + "ipnftUid": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1_9", + "headline": "Test announcement 2", + "body": "Blah blah", + "attachments": [test_file_1], + "moleculeAccessLevel": "holders", + "moleculeChangeBy": "did:ethr:0x43f3F090af7fF638ad0EfD64c5354B6945fE75BC", + })), + )) + .await; + + assert!(res.is_ok(), "{res:#?}"); + pretty_assertions::assert_eq!( + res.data.into_json().unwrap()["molecule"]["project"]["createAnnouncement"], + json!({ + "isSuccess": true, + "message": "", + }) + ); + + // Create an announcement 
with two attachments + let res = harness + .execute_authorized_query(async_graphql::Request::new(CREATE_ANNOUNCEMENT).variables( + async_graphql::Variables::from_json(json!({ + "ipnftUid": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1_9", + "headline": "Test announcement 3", + "body": "Blah blah", + "attachments": [test_file_1, test_file_2], + "moleculeAccessLevel": "holders", + "moleculeChangeBy": "did:ethr:0x43f3F090af7fF638ad0EfD64c5354B6945fE75BC", + })), + )) + .await; + + assert!(res.is_ok(), "{res:#?}"); + pretty_assertions::assert_eq!( + res.data.into_json().unwrap()["molecule"]["project"]["createAnnouncement"], + json!({ + "isSuccess": true, + "message": "", + }) + ); + + // Create an announcement with invalid attachment DID + let res = harness + .execute_authorized_query(async_graphql::Request::new(CREATE_ANNOUNCEMENT).variables( + async_graphql::Variables::from_json(json!({ + "ipnftUid": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1_9", + "headline": "Test announcement 3", + "body": "Blah blah", + "attachments": ["x"], + "moleculeAccessLevel": "holders", + "moleculeChangeBy": "did:ethr:0x43f3F090af7fF638ad0EfD64c5354B6945fE75BC", + })), + )) + .await; + + assert!(res.is_ok(), "{res:#?}"); + pretty_assertions::assert_eq!( + res.data.into_json().unwrap()["molecule"]["project"]["createAnnouncement"], + json!({ + "isSuccess": false, + "message": "Value 'x' is not a valid did:odf", + }) + ); + + // Create an announcement with attachment DID that does not exist + let res = harness + .execute_authorized_query(async_graphql::Request::new(CREATE_ANNOUNCEMENT).variables( + async_graphql::Variables::from_json(json!({ + "ipnftUid": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1_9", + "headline": "Test announcement 3", + "body": "Blah blah", + "attachments": [odf::DatasetID::new_seeded_ed25519(b"does-not-exist").to_string()], + "moleculeAccessLevel": "holders", + "moleculeChangeBy": "did:ethr:0x43f3F090af7fF638ad0EfD64c5354B6945fE75BC", + })), + )) + .await; + + assert!(res.is_ok(), "{res:#?}"); + pretty_assertions::assert_eq!( + res.data.into_json().unwrap()["molecule"]["project"]["createAnnouncement"], + json!({ + "isSuccess": false, + "message": "Dataset did:odf:fed011ba79f25e520298ba6945dd6197083a366364bef178d5899b100c434748d88e5 not found", + }) + ); + + // Announcements are listed as expected + let res = harness + .execute_authorized_query(async_graphql::Request::new(LIST_ANNOUNCEMENTS).variables( + async_graphql::Variables::from_json(json!({ + "datasetId": announcements_did, + })), + )) + .await; + + assert!(res.is_ok(), "{res:#?}"); + let content = res.data.into_json().unwrap()["datasets"]["byId"]["data"]["tail"]["data"] + ["content"] + .as_str() + .unwrap() + .to_string(); + let mut content: serde_json::Value = serde_json::from_str(&content).unwrap(); + let any = ""; + content.as_array_mut().unwrap().iter_mut().for_each(|r| { + let obj = r.as_object_mut().unwrap(); + obj["system_time"] = any.to_string().into(); + obj["event_time"] = any.to_string().into(); + obj["announcement_id"] = any.to_string().into(); + }); + pretty_assertions::assert_eq!( + content, + json!( + [ + { + "molecule_access_level": "holders", + "announcement_id": any, + "attachments": [], + "body": "Blah blah", + "molecule_change_by": "did:ethr:0x43f3F090af7fF638ad0EfD64c5354B6945fE75BC", + "event_time": any, + "headline": "Test announcement 1", + "offset": 0, + "op": 0, + "system_time": any, + }, + { + "molecule_access_level": "holders", + "announcement_id": any, + "attachments": [test_file_1], + "body": "Blah blah", + 
"molecule_change_by": "did:ethr:0x43f3F090af7fF638ad0EfD64c5354B6945fE75BC", + "event_time": any, + "headline": "Test announcement 2", + "offset": 1, + "op": 0, + "system_time": any, + }, + { + "molecule_access_level": "holders", + "announcement_id": any, + "attachments": [test_file_1, test_file_2], + "body": "Blah blah", + "molecule_change_by": "did:ethr:0x43f3F090af7fF638ad0EfD64c5354B6945fE75BC", + "event_time": any, + "headline": "Test announcement 3", + "offset": 2, + "op": 0, + "system_time": any, + }, + ] + ) + ); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[test_log::test(tokio::test)] +async fn test_molecule_activity() { + let harness = GraphQLDatasetsHarness::builder() + .tenancy_config(TenancyConfig::MultiTenant) + .build() + .await; + + // Create project (projects dataset is auto-created) + let res = harness + .execute_authorized_query(async_graphql::Request::new(CREATE_PROJECT).variables( + async_graphql::Variables::from_json(json!({ + "ipnftSymbol": "vitafast", + "ipnftUid": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1_9", + "ipnftAddress": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1", + "ipnftTokenId": "9", + })), + )) + .await; + + assert!(res.is_ok(), "{res:#?}"); + let res = &res.data.into_json().unwrap()["molecule"]["createProject"]; + let project_account_name = res["project"]["account"]["accountName"].as_str().unwrap(); + let data_room_did = res["project"]["dataRoom"]["id"].as_str().unwrap(); + + // Create a few versioned files + let res = harness + .execute_authorized_query( + async_graphql::Request::new(CREATE_VERSIONED_FILE).variables( + async_graphql::Variables::from_json(json!({ + // TODO: Need ability to create datasets with target AccountID + "datasetAlias": format!("{project_account_name}/test-file-1"), + })), + ), + ) + .await; + + assert!(res.is_ok(), "{res:#?}"); + let test_file_1 = res.data.into_json().unwrap()["datasets"]["createVersionedFile"]["dataset"] + ["id"] + .as_str() + .unwrap() + .to_string(); + + let res = harness + .execute_authorized_query( + async_graphql::Request::new(CREATE_VERSIONED_FILE).variables( + async_graphql::Variables::from_json(json!({ + // TODO: Need ability to create datasets with target AccountID + "datasetAlias": format!("{project_account_name}/test-file-2"), + })), + ), + ) + .await; + + assert!(res.is_ok(), "{res:#?}"); + let test_file_2 = res.data.into_json().unwrap()["datasets"]["createVersionedFile"]["dataset"] + ["id"] + .as_str() + .unwrap() + .to_string(); + + // Upload new file versions + const UPLOAD_NEW_VERSION: &str = indoc!( + r#" + mutation ($datasetId: DatasetID!, $content: Base64Usnp!) 
{ + datasets { + byId(datasetId: $datasetId) { + asVersionedFile { + uploadNewVersion(content: $content) { + isSuccess + message + } + } + } + } + } + "# + ); + let res = harness + .execute_authorized_query(async_graphql::Request::new(UPLOAD_NEW_VERSION).variables( + async_graphql::Variables::from_json(json!({ + "datasetId": &test_file_1, + "content": base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(b"file 1"), + })), + )) + .await; + + assert!(res.is_ok(), "{res:#?}"); + pretty_assertions::assert_eq!( + res.data.into_json().unwrap()["datasets"]["byId"]["asVersionedFile"]["uploadNewVersion"], + json!({ + "isSuccess": true, + "message": "", + }) + ); + + let res = harness + .execute_authorized_query(async_graphql::Request::new(UPLOAD_NEW_VERSION).variables( + async_graphql::Variables::from_json(json!({ + "datasetId": &test_file_2, + "content": base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(b"file 2"), + })), + )) + .await; + + assert!(res.is_ok(), "{res:#?}"); + pretty_assertions::assert_eq!( + res.data.into_json().unwrap()["datasets"]["byId"]["asVersionedFile"]["uploadNewVersion"], + json!({ + "isSuccess": true, + "message": "", + }) + ); + + // Link new file into the project data room + const COLLECTION_ADD_ENTRY: &str = indoc!( + r#" + mutation ($datasetId: DatasetID!, $entry: CollectionEntryInput!) { + datasets { + byId(datasetId: $datasetId) { + asCollection { + addEntry(entry: $entry) { + isSuccess + message + } + } + } + } + } + "# + ); + + let res = harness + .execute_authorized_query(async_graphql::Request::new(COLLECTION_ADD_ENTRY).variables( + async_graphql::Variables::from_json(json!({ + "datasetId": data_room_did, + "entry": { + "path": "/foo", + "ref": test_file_1, + "extraData": { + "molecule_change_by": "did:ethr:0x43f3F090af7fF638ad0EfD64c5354B6945fE75BC" + }, + }, + })), + )) + .await; + + assert!(res.is_ok(), "{res:#?}"); + pretty_assertions::assert_eq!( + res.data.into_json().unwrap()["datasets"]["byId"]["asCollection"]["addEntry"], + json!({ + "isSuccess": true, + "message": "", + }) + ); + + let res = harness + .execute_authorized_query(async_graphql::Request::new(COLLECTION_ADD_ENTRY).variables( + async_graphql::Variables::from_json(json!({ + "datasetId": data_room_did, + "entry": { + "path": "/bar", + "ref": test_file_2, + "extraData": { + "molecule_change_by": "did:ethr:0x43f3F090af7fF638ad0EfD64c5354B6945fE75BC" + }, + }, + })), + )) + .await; + + assert!(res.is_ok(), "{res:#?}"); + pretty_assertions::assert_eq!( + res.data.into_json().unwrap()["datasets"]["byId"]["asCollection"]["addEntry"], + json!({ + "isSuccess": true, + "message": "", + }) + ); + + // Move a file (retract + append) + let res = harness + .execute_authorized_query( + async_graphql::Request::new(indoc!( + r#" + mutation ($datasetId: DatasetID!, $pathFrom: CollectionPath!, $pathTo: CollectionPath!) 
{ + datasets { + byId(datasetId: $datasetId) { + asCollection { + moveEntry(pathFrom: $pathFrom, pathTo: $pathTo) { + isSuccess + message + } + } + } + } + } + "# + )) + .variables(async_graphql::Variables::from_json(json!({ + "datasetId": &data_room_did, + "pathFrom": "/bar", + "pathTo": "/baz" + }))), + ) + .await; + + assert!(res.is_ok(), "{res:#?}"); + pretty_assertions::assert_eq!( + res.data.into_json().unwrap()["datasets"]["byId"]["asCollection"]["moveEntry"], + json!({ + "isSuccess": true, + "message": "", + }) + ); + + // Update a file (correction from-to) + let res = harness + .execute_authorized_query( + async_graphql::Request::new(indoc!( + r#" + mutation ($datasetId: DatasetID!, $pathFrom: CollectionPath!, $pathTo: CollectionPath!, $extraData: JSON) { + datasets { + byId(datasetId: $datasetId) { + asCollection { + moveEntry(pathFrom: $pathFrom, pathTo: $pathTo, extraData: $extraData) { + isSuccess + message + } + } + } + } + } + "# + )) + .variables(async_graphql::Variables::from_json(json!({ + "datasetId": &data_room_did, + "pathFrom": "/foo", + "pathTo": "/foo", + "extraData": { + "molecule_change_by": "did:ethr:0x43f3F090af7fF638ad0EfD64c5354B6945fE75BD" + }, + }))), + ) + .await; + + assert!(res.is_ok(), "{res:#?}"); + pretty_assertions::assert_eq!( + res.data.into_json().unwrap()["datasets"]["byId"]["asCollection"]["moveEntry"], + json!({ + "isSuccess": true, + "message": "", + }) + ); + + // Create an announcement + const CREATE_ANNOUNCEMENT: &str = indoc!( + r#" + mutation ( + $ipnftUid: String!, + $headline: String!, + $body: String!, + $attachments: [String!], + $moleculeAccessLevel: String!, + $moleculeChangeBy: String!, + ) { + molecule { + project(ipnftUid: $ipnftUid) { + createAnnouncement( + headline: $headline, + body: $body, + attachments: $attachments, + moleculeAccessLevel: $moleculeAccessLevel, + moleculeChangeBy: $moleculeChangeBy, + ) { + isSuccess + message + } + } + } + } + "# + ); + + let res = harness + .execute_authorized_query(async_graphql::Request::new(CREATE_ANNOUNCEMENT).variables( + async_graphql::Variables::from_json(json!({ + "ipnftUid": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1_9", + "headline": "Test announcement 1", + "body": "Blah blah", + "attachments": [test_file_1, test_file_2], + "moleculeAccessLevel": "holders", + "moleculeChangeBy": "did:ethr:0x43f3F090af7fF638ad0EfD64c5354B6945fE75BC", + })), + )) + .await; + + assert!(res.is_ok(), "{res:#?}"); + pretty_assertions::assert_eq!( + res.data.into_json().unwrap()["molecule"]["project"]["createAnnouncement"], + json!({ + "isSuccess": true, + "message": "", + }) + ); + + // Upload new file version + let res = harness + .execute_authorized_query(async_graphql::Request::new(UPLOAD_NEW_VERSION).variables( + async_graphql::Variables::from_json(json!({ + "datasetId": &test_file_1, + "content": base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(b"file 1 - updated"), + })), + )) + .await; + + assert!(res.is_ok(), "{res:#?}"); + pretty_assertions::assert_eq!( + res.data.into_json().unwrap()["datasets"]["byId"]["asVersionedFile"]["uploadNewVersion"], + json!({ + "isSuccess": true, + "message": "", + }) + ); + + // Remove a file + let res = harness + .execute_authorized_query( + async_graphql::Request::new(indoc!( + r#" + mutation ($datasetId: DatasetID!, $path: CollectionPath!) 
{ + datasets { + byId(datasetId: $datasetId) { + asCollection { + removeEntry(path: $path) { + isSuccess + message + } + } + } + } + } + "# + )) + .variables(async_graphql::Variables::from_json(json!({ + "datasetId": &data_room_did, + "path": "/bar", + }))), + ) + .await; + + assert!(res.is_ok(), "{res:#?}"); + pretty_assertions::assert_eq!( + res.data.into_json().unwrap()["datasets"]["byId"]["asCollection"]["removeEntry"], + json!({ + "isSuccess": true, + "message": "", + }) + ); + + // Check project activity events + const LIST_EVENTS: &str = indoc!( + r#" + query ($ipnftUid: String!) { + molecule { + project(ipnftUid: $ipnftUid) { + activity { + nodes { + __typename + ... on MoleculeProjectEventDataRoomEntryAdded { + entry { + path + } + } + ... on MoleculeProjectEventDataRoomEntryRemoved { + entry { + path + } + } + ... on MoleculeProjectEventDataRoomEntryUpdated { + newEntry { + path + } + } + ... on MoleculeProjectEventAnnouncement { + announcement + } + ... on MoleculeProjectEventFileUpdated { + dataset { alias } + newEntry { + version + } + } + } + } + } + } + } + "# + ); + let res = harness + .execute_authorized_query(async_graphql::Request::new(LIST_EVENTS).variables( + async_graphql::Variables::from_json(json!({ + "ipnftUid": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc1_9", + })), + )) + .await; + + assert!(res.is_ok(), "{res:#?}"); + let mut json = res.data.into_json().unwrap(); + let nodes = &mut json["molecule"]["project"]["activity"]["nodes"]; + + let any = serde_json::Value::Null; + nodes[1]["announcement"]["announcement_id"] = any.clone(); + nodes[1]["announcement"]["system_time"] = any.clone(); + nodes[1]["announcement"]["event_time"] = any.clone(); + + pretty_assertions::assert_eq!( + *nodes, + json!([ + { + "__typename": "MoleculeProjectEventFileUpdated", + "dataset": { + "alias": "molecule.vitafast/test-file-1", + }, + "newEntry": { + "version": 2, + }, + }, + { + "__typename": "MoleculeProjectEventAnnouncement", + "announcement": { + "announcement_id": &any, + "attachments": [&test_file_1, &test_file_2], + "body": "Blah blah", + "event_time": &any, + "headline": "Test announcement 1", + "molecule_access_level": "holders", + "molecule_change_by": "did:ethr:0x43f3F090af7fF638ad0EfD64c5354B6945fE75BC", + "system_time": &any, + }, + }, + { + "__typename": "MoleculeProjectEventDataRoomEntryUpdated", + "newEntry": { + "path": "/foo", + }, + }, + { + "__typename": "MoleculeProjectEventDataRoomEntryAdded", + "entry": { + "path": "/baz", + }, + }, + { + "__typename": "MoleculeProjectEventDataRoomEntryRemoved", + "entry": { + "path": "/bar", + }, + }, + { + "__typename": "MoleculeProjectEventDataRoomEntryAdded", + "entry": { + "path": "/bar", + }, + }, + { + "__typename": "MoleculeProjectEventDataRoomEntryAdded", + "entry": { + "path": "/foo", + }, + }, + { + "__typename": "MoleculeProjectEventFileUpdated", + "dataset": { + "alias": "molecule.vitafast/test-file-2", + }, + "newEntry": { + "version": 1, + }, + }, + { + "__typename": "MoleculeProjectEventFileUpdated", + "dataset": { + "alias": "molecule.vitafast/test-file-1", + }, + "newEntry": { + "version": 1, + }, + }, + ]) + ); + + /////////////////////////////////////////////////////////////////////////////// + + // Create another project + let res = harness + .execute_authorized_query(async_graphql::Request::new(CREATE_PROJECT).variables( + async_graphql::Variables::from_json(json!({ + "ipnftSymbol": "vitaslow", + "ipnftUid": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc2_10", + "ipnftAddress": 
"0xcaD88677CA87a7815728C72D74B4ff4982d54Fc2", + "ipnftTokenId": "10", + })), + )) + .await; + + assert!(res.is_ok(), "{res:#?}"); + + // Create an announcement + let res = harness + .execute_authorized_query(async_graphql::Request::new(CREATE_ANNOUNCEMENT).variables( + async_graphql::Variables::from_json(json!({ + "ipnftUid": "0xcaD88677CA87a7815728C72D74B4ff4982d54Fc2_10", + "headline": "Test announcement 2", + "body": "Blah blah bleh", + "attachments": [], + "moleculeAccessLevel": "holders", + "moleculeChangeBy": "did:ethr:0x43f3F090af7fF638ad0EfD64c5354B6945fE75BC", + })), + )) + .await; + + assert!(res.is_ok(), "{res:#?}"); + pretty_assertions::assert_eq!( + res.data.into_json().unwrap()["molecule"]["project"]["createAnnouncement"], + json!({ + "isSuccess": true, + "message": "", + }) + ); + + // Check global activity events + const LIST_ACTIVITY: &str = indoc!( + r#" + query { + molecule { + activity { + nodes { + __typename + project { + ipnftSymbol + } + ... on MoleculeProjectEventDataRoomEntryAdded { + entry { + path + } + } + ... on MoleculeProjectEventDataRoomEntryRemoved { + entry { + path + } + } + ... on MoleculeProjectEventDataRoomEntryUpdated { + newEntry { + path + } + } + ... on MoleculeProjectEventAnnouncement { + announcement + } + ... on MoleculeProjectEventFileUpdated { + dataset { alias } + newEntry { + version + } + } + } + } + } + } + "# + ); + let res = harness + .execute_authorized_query(async_graphql::Request::new(LIST_ACTIVITY)) + .await; + + assert!(res.is_ok(), "{res:#?}"); + let mut json = res.data.into_json().unwrap(); + let nodes = &mut json["molecule"]["activity"]["nodes"]; + nodes[0]["announcement"]["announcement_id"] = any.clone(); + nodes[0]["announcement"]["system_time"] = any.clone(); + nodes[0]["announcement"]["event_time"] = any.clone(); + nodes[1]["announcement"]["announcement_id"] = any.clone(); + nodes[1]["announcement"]["system_time"] = any.clone(); + nodes[1]["announcement"]["event_time"] = any.clone(); + + // NOTE: Only announcements are currently supported + pretty_assertions::assert_eq!( + *nodes, + json!([ + { + "__typename": "MoleculeProjectEventAnnouncement", + "project": { + "ipnftSymbol": "vitaslow", + }, + "announcement": { + "announcement_id": &any, + "attachments": [], + "body": "Blah blah bleh", + "event_time": &any, + "headline": "Test announcement 2", + "molecule_access_level": "holders", + "molecule_change_by": "did:ethr:0x43f3F090af7fF638ad0EfD64c5354B6945fE75BC", + "system_time": &any, + }, + }, + { + "__typename": "MoleculeProjectEventAnnouncement", + "project": { + "ipnftSymbol": "vitafast", + }, + "announcement": { + "announcement_id": &any, + "attachments": [&test_file_1, &test_file_2], + "body": "Blah blah", + "event_time": &any, + "headline": "Test announcement 1", + "molecule_access_level": "holders", + "molecule_change_by": "did:ethr:0x43f3F090af7fF638ad0EfD64c5354B6945fE75BC", + "system_time": &any, + }, + }, + ]) + ); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Harness +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[oop::extend(BaseGQLDatasetHarness, base_gql_harness)] +struct GraphQLDatasetsHarness { + base_gql_harness: BaseGQLDatasetHarness, + catalog_authorized: dill::Catalog, + molecule_account_id: odf::AccountID, +} + +#[bon] +impl GraphQLDatasetsHarness { + #[builder] + pub async fn new( + tenancy_config: TenancyConfig, + 
mock_dataset_action_authorizer: Option, + ) -> Self { + let base_gql_harness = BaseGQLDatasetHarness::builder() + .tenancy_config(tenancy_config) + .maybe_mock_dataset_action_authorizer(mock_dataset_action_authorizer) + .build(); + + let cache_dir = base_gql_harness.temp_dir().join("cache"); + std::fs::create_dir(&cache_dir).unwrap(); + + let base_catalog = dill::CatalogBuilder::new_chained(base_gql_harness.catalog()) + .add::() + .add_value(kamu::EngineConfigDatafusionEmbeddedBatchQuery::default()) + .add::() + .add::() + .add::() + .add_value(kamu::EngineConfigDatafusionEmbeddedIngest::default()) + .add::() + .add::() + .add::() + .add::() + .add::() + .build(); + + let molecule_account_id = odf::AccountID::new_generated_ed25519().1; + + let (_catalog_anonymous, catalog_authorized) = authentication_catalogs_ext( + &base_catalog, + Some(CurrentAccountSubject::Logged(LoggedAccount { + account_id: molecule_account_id.clone(), + account_name: "molecule".parse().unwrap(), + })), + PredefinedAccountOpts { + is_admin: false, + can_provision_accounts: true, + }, + ) + .await; + + Self { + base_gql_harness, + catalog_authorized, + molecule_account_id, + } + } + + pub async fn create_projects_dataset(&self) -> CreateDatasetResult { + let snapshot = kamu_adapter_graphql::molecule::Molecule::dataset_snapshot_projects( + "molecule/projects".parse().unwrap(), + ); + + let create_dataset = self + .catalog_authorized + .get_one::() + .unwrap(); + + create_dataset + .execute(snapshot, Default::default()) + .await + .unwrap() + } + + pub async fn execute_authorized_query( + &self, + query: impl Into, + ) -> async_graphql::Response { + kamu_adapter_graphql::schema_quiet() + .execute(query.into().data(self.catalog_authorized.clone())) + .await + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/adapter/graphql/tests/tests/test_scalars.rs b/src/adapter/graphql/tests/tests/test_scalars.rs index 6f31542440..cb858b505f 100644 --- a/src/adapter/graphql/tests/tests/test_scalars.rs +++ b/src/adapter/graphql/tests/tests/test_scalars.rs @@ -8,13 +8,14 @@ // by the Apache License, Version 2.0. use async_graphql::*; -use kamu_adapter_graphql::scalars::ExtraData; +use kamu_adapter_graphql::scalars::{BigInt, ExtraData, U256}; use kamu_adapter_graphql::traits::ResponseExt; +use pretty_assertions::assert_eq; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #[tokio::test] -async fn extra_data() { +async fn test_extra_data() { fn request(variables_as_value: Value) -> Request { Request::new(indoc::indoc!( r#" @@ -30,17 +31,17 @@ async fn extra_data() { { let res = schema.execute(request(value!({ "extraData": {} }))).await; - pretty_assertions::assert_eq!(value!({ "extraData": {} }), res.data, "{res:?}"); + assert_eq!(value!({ "extraData": {} }), res.data, "{res:?}"); } { let res = schema .execute(request(value!({ "extraData": {"foo": "bar"} }))) .await; - pretty_assertions::assert_eq!(value!({ "extraData": {"foo": "bar"} }), res.data, "{res:?}"); + assert_eq!(value!({ "extraData": {"foo": "bar"} }), res.data, "{res:?}"); } { let res = schema.execute(request(value!({ "extraData": 1 }))).await; - pretty_assertions::assert_eq!( + assert_eq!( [ "Failed to parse \"ExtraData\": Invalid input value: '1'. 
A flat object is \ expected, such as: '{}', '{\"foo\": \"bar\"}'" @@ -53,7 +54,7 @@ async fn extra_data() { let res = schema .execute(request(value!({ "extraData": "foo" }))) .await; - pretty_assertions::assert_eq!( + assert_eq!( [ "Failed to parse \"ExtraData\": Invalid input value: '\"foo\"'. A flat object is \ expected, such as: '{}', '{\"foo\": \"bar\"}'" @@ -64,7 +65,7 @@ async fn extra_data() { } { let res = schema.execute(request(value!({ "extraData": [] }))).await; - pretty_assertions::assert_eq!( + assert_eq!( [ "Failed to parse \"ExtraData\": Invalid input value: '[]'. A flat object is \ expected, such as: '{}', '{\"foo\": \"bar\"}'" @@ -77,7 +78,7 @@ async fn extra_data() { let res = schema .execute(request(value!({ "extraData": {"foo": {"bar": "baz"}} }))) .await; - pretty_assertions::assert_eq!( + assert_eq!( [ "Failed to parse \"ExtraData\": Invalid input value: '{foo: {bar: \"baz\"}}'. A \ flat object is expected, such as: '{}', '{\"foo\": \"bar\"}'" @@ -90,6 +91,251 @@ async fn extra_data() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +#[tokio::test] +async fn test_big_int() { + fn request(variables_as_value: Value) -> Request { + Request::new(indoc::indoc!( + r#" + query ($bigInt: BigInt!) { + bigInt(value: $bigInt) + } + "# + )) + .variables(Variables::from_value(variables_as_value)) + } + + let schema = schema(); + + { + let res = schema + .execute(request( + value!({ + "bigInt": "108494037067113761580099112583860151730516105403483528465874625006707409835912" + }), + )) + .await; + assert_eq!( + value!({ + "bigInt": "108494037067113761580099112583860151730516105403483528465874625006707409835912" + }), + res.data, + "{res:?}" + ); + } + { + let res = schema + .execute(request(value!({ + "bigInt": "" + }))) + .await; + assert_eq!( + [ + "Failed to parse \"BigInt\": Invalid BigInt: cannot parse integer from empty \ + string" + ], + *res.error_messages(), + "{res:?}" + ); + } + { + let res = schema + .execute(request(value!({ + "bigInt": 9 + }))) + .await; + assert_eq!( + [ + "Failed to parse \"BigInt\": Invalid BigInt: the value is expected to be a string \ + (\"9\") instead of a number (9)" + ], + *res.error_messages(), + "{res:?}" + ); + } + { + let res = schema + .execute(request(value!({ + "bigInt": "" + }))) + .await; + assert_eq!( + [ + "Failed to parse \"BigInt\": Invalid BigInt: cannot parse integer from empty \ + string" + ], + *res.error_messages(), + "{res:?}" + ); + } + { + let res = schema + .execute(request(value!({ + "bigInt": "0xFFFF" + }))) + .await; + assert_eq!( + ["Failed to parse \"BigInt\": Invalid BigInt: invalid digit found in string"], + *res.error_messages(), + "{res:?}" + ); + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[tokio::test] +async fn test_u256() { + fn request(variables_as_value: Value) -> Request { + Request::new(indoc::indoc!( + r#" + query ($u256: U256!) 
{ + u256(value: $u256) + } + "# + )) + .variables(Variables::from_value(variables_as_value)) + } + + let schema = schema(); + + { + let res = schema + .execute(request( + value!({ + "u256": "108494037067113761580099112583860151730516105403483528465874625006707409835912" + }), + )) + .await; + assert_eq!( + value!({ + "u256": "108494037067113761580099112583860151730516105403483528465874625006707409835912" + }), + res.data, + "{res:?}" + ); + } + { + let res = schema + .execute(request(value!({ + "u256": "0" + }))) + .await; + assert_eq!( + value!({ + "u256": "0" + }), + res.data, + "{res:?}" + ); + } + { + let res = schema + .execute(request( + value!({ + // MAX + "u256": "115792089237316195423570985008687907853269984665640564039457584007913129639935" + }), + )) + .await; + assert_eq!( + value!({ + "u256": "115792089237316195423570985008687907853269984665640564039457584007913129639935" + }), + res.data, + "{res:?}" + ); + } + { + let res = schema + .execute(request( + value!({ + // MAX + 1 + "u256": "115792089237316195423570985008687907853269984665640564039457584007913129639936" + }), + )) + .await; + assert_eq!( + [ + "Failed to parse \"U256\": Invalid U256: value exceeds maximum for U256: \ + 115792089237316195423570985008687907853269984665640564039457584007913129639936" + ], + *res.error_messages(), + "{res:?}" + ); + } + { + let res = schema + .execute(request(value!({ + "u256": "-1" + }))) + .await; + assert_eq!( + [ + "Failed to parse \"U256\": Invalid U256: negative values are not allowed for \ + U256: -1" + ], + *res.error_messages(), + "{res:?}" + ); + } + { + let res = schema + .execute(request(value!({ + "u256": -1 + }))) + .await; + assert_eq!( + [ + "Failed to parse \"U256\": Invalid U256: the value is expected to be a string \ + (\"-1\") instead of a number (-1)" + ], + *res.error_messages(), + "{res:?}" + ); + } + { + let res = schema + .execute(request(value!({ + "u256": 42 + }))) + .await; + assert_eq!( + [ + "Failed to parse \"U256\": Invalid U256: the value is expected to be a string \ + (\"42\") instead of a number (42)" + ], + *res.error_messages(), + "{res:?}" + ); + } + { + let res = schema + .execute(request(value!({ + "u256": "" + }))) + .await; + assert_eq!( + ["Failed to parse \"U256\": Invalid U256: cannot parse integer from empty string"], + *res.error_messages(), + "{res:?}" + ); + } + { + let res = schema + .execute(request(value!({ + "u256": "0xFFFF" + }))) + .await; + assert_eq!( + ["Failed to parse \"U256\": Invalid U256: invalid digit found in string"], + *res.error_messages(), + "{res:?}" + ); + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + struct TestScalarQuery; #[Object] @@ -97,6 +343,14 @@ impl TestScalarQuery { async fn extra_data(&self, value: ExtraData) -> ExtraData { value } + + async fn big_int(&self, value: BigInt) -> BigInt { + value + } + + async fn u256(&self, value: U256) -> U256 { + value + } } type TestScalarSchema = Schema; diff --git a/src/domain/core/src/services/dataset_registry.rs b/src/domain/core/src/services/dataset_registry.rs index a0c18f83d2..5603997f8f 100644 --- a/src/domain/core/src/services/dataset_registry.rs +++ b/src/domain/core/src/services/dataset_registry.rs @@ -35,6 +35,8 @@ pub trait DatasetRegistry: odf::dataset::DatasetHandleResolver { dataset_ids: &[Cow], ) -> Result; + // TODO: This likely should be a sync function to signify that handles are + // transformed to resolved datasets with no overhead async fn 
get_dataset_by_handle(&self, dataset_handle: &odf::DatasetHandle) -> ResolvedDataset; } diff --git a/src/domain/core/src/services/query_service.rs b/src/domain/core/src/services/query_service.rs index 9af12a2cb1..07e7515b68 100644 --- a/src/domain/core/src/services/query_service.rs +++ b/src/domain/core/src/services/query_service.rs @@ -49,7 +49,7 @@ pub trait QueryService: Send + Sync { ) -> Result; /// Prepares an execution plan for the SQL statement and returns a - /// [DataFrame] that can be used to get schema and data, and the state + /// [`DataFrameExt`] that can be used to get schema and data, and the state /// information that can be used for reproducibility. async fn sql_statement( &self, @@ -83,13 +83,24 @@ pub trait QueryService: Send + Sync { // TODO: Introduce additional options that could be used to narrow down the // number of files we collect to construct the dataframe. // - /// Returns a [DataFrame] representing the contents of an entire dataset + /// Returns a [`DataFrameExt`] representing the contents of an entire + /// dataset async fn get_data( &self, dataset_ref: &odf::DatasetRef, options: GetDataOptions, ) -> Result; + // TODO: Consider replacing this function with a more sophisticated session + // context builder that can be reused for multiple queries + /// Returns [`DataFrameExt`]s representing the contents of multiple datasets + /// in a batch + async fn get_data_multi( + &self, + dataset_refs: &[odf::DatasetRef], + skip_if_missing_or_inaccessible: bool, + ) -> Result, QueryError>; + /// Lists engines known to the system and recommended for use async fn get_known_engines(&self) -> Result, InternalError>; } diff --git a/src/domain/datasets/services/src/use_cases/create_dataset_from_snapshot_use_case_impl.rs b/src/domain/datasets/services/src/use_cases/create_dataset_from_snapshot_use_case_impl.rs index 1e566b34aa..e987afbe04 100644 --- a/src/domain/datasets/services/src/use_cases/create_dataset_from_snapshot_use_case_impl.rs +++ b/src/domain/datasets/services/src/use_cases/create_dataset_from_snapshot_use_case_impl.rs @@ -40,6 +40,9 @@ pub struct CreateDatasetFromSnapshotUseCaseImpl { create_helper: Arc, did_secret_encryption_key: Option, did_secret_key_repo: Arc, + + // TODO: Rebac is here temporarily - using Lazy to avoid modifying all tests + rebac_svc: dill::Lazy>, } #[component(pub)] @@ -54,6 +57,7 @@ impl CreateDatasetFromSnapshotUseCaseImpl { create_helper: Arc, did_secret_encryption_config: Arc, did_secret_key_repo: Arc, + rebac_svc: dill::Lazy>, ) -> Self { Self { current_account_subject, @@ -66,6 +70,7 @@ impl CreateDatasetFromSnapshotUseCaseImpl { .as_ref() .map(|encryption_key| SecretString::from(encryption_key.clone())), did_secret_key_repo, + rebac_svc, } } } @@ -103,7 +108,8 @@ impl CreateDatasetFromSnapshotUseCase for CreateDatasetFromSnapshotUseCaseImpl { // Resolve a target account and full alias of the dataset let (canonical_alias, target_account_id) = self .create_helper - .resolve_alias_target(&snapshot.name, subject)?; + .resolve_alias_target(&snapshot.name, subject) + .await?; let dataset_did = self.did_generator.generate_dataset_id(); // Make a seed block @@ -152,14 +158,23 @@ impl CreateDatasetFromSnapshotUseCase for CreateDatasetFromSnapshotUseCaseImpl { ) .await?; - // TODO: Creating dataset under another account is not supported yet. - // In future we should check organization-level permissions here. + // TODO: HACK: SEC: When creating a dataaset under another account we currently + // give subject a "maintainer" role on it. 
In future this should be refactored + // into organization-level permissions. // // See: https://github.com/kamu-data/kamu-node/issues/233 - assert_eq!( - target_account_id, subject.account_id, - "Creating dataset under another account is not supported yet" - ); + if target_account_id != subject.account_id { + self.rebac_svc + .get() + .int_err()? + .set_account_dataset_relation( + &subject.account_id, + kamu_auth_rebac::AccountToDatasetRelation::Maintainer, + &store_result.dataset_id, + ) + .await + .int_err()?; + } // Notify interested parties the dataset was created self.create_helper diff --git a/src/domain/datasets/services/src/use_cases/create_dataset_use_case_impl.rs b/src/domain/datasets/services/src/use_cases/create_dataset_use_case_impl.rs index 87a92a6e54..42703ab5b1 100644 --- a/src/domain/datasets/services/src/use_cases/create_dataset_use_case_impl.rs +++ b/src/domain/datasets/services/src/use_cases/create_dataset_use_case_impl.rs @@ -10,6 +10,7 @@ use std::sync::Arc; use dill::{component, interface}; +use internal_error::*; use kamu_accounts::CurrentAccountSubject; use kamu_core::ResolvedDataset; use kamu_datasets::{ @@ -28,6 +29,9 @@ use crate::utils::CreateDatasetUseCaseHelper; pub struct CreateDatasetUseCaseImpl { current_account_subject: Arc, create_helper: Arc, + + // TODO: Rebac is here temporarily - using Lazy to avoid modifying all tests + rebac_svc: dill::Lazy>, } #[common_macros::method_names_consts] @@ -57,7 +61,8 @@ impl CreateDatasetUseCase for CreateDatasetUseCaseImpl { // Resolve target account and full alias of the dataset let (canonical_alias, target_account_id) = self .create_helper - .resolve_alias_target(dataset_alias, subject)?; + .resolve_alias_target(dataset_alias, subject) + .await?; // Dataset entry goes first, this guarantees name collision check self.create_helper @@ -83,14 +88,23 @@ impl CreateDatasetUseCase for CreateDatasetUseCaseImpl { ) .await?; - // TODO: Creating dataset under another account is not supported yet. - // In future we should check organization-level permissions here. + // TODO: HACK: SEC: When creating a dataaset under another account we currently + // give subject a "maintainer" role on it. In future this should be refactored + // into organization-level permissions. // // See: https://github.com/kamu-data/kamu-node/issues/233 - assert_eq!( - target_account_id, subject.account_id, - "Creating dataset under another account is not supported yet" - ); + if target_account_id != subject.account_id { + self.rebac_svc + .get() + .int_err()? 
+ .set_account_dataset_relation( + &subject.account_id, + kamu_auth_rebac::AccountToDatasetRelation::Maintainer, + &store_result.dataset_id, + ) + .await + .int_err()?; + } // Notify interested parties the dataset was created self.create_helper diff --git a/src/domain/datasets/services/src/utils/create_dataset_use_case_helper.rs b/src/domain/datasets/services/src/utils/create_dataset_use_case_helper.rs index 5a0eb20685..2c851ca260 100644 --- a/src/domain/datasets/services/src/utils/create_dataset_use_case_helper.rs +++ b/src/domain/datasets/services/src/utils/create_dataset_use_case_helper.rs @@ -35,6 +35,9 @@ pub struct CreateDatasetUseCaseHelper { dataset_entry_writer: Arc, dataset_storage_unit_writer: Arc, outbox: Arc, + + // TODO: This is here temporarily - using Lazy to avoid modifying all tests + account_svc: dill::Lazy>, } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -43,7 +46,7 @@ impl CreateDatasetUseCaseHelper { /// Given a raw alias under which user wants to create a dataset and user's /// credentials determine the target account under which dataset will be /// created and whether user has permissions to do so - pub fn resolve_alias_target( + pub async fn resolve_alias_target( &self, raw_alias: &odf::DatasetAlias, subject: &LoggedAccount, @@ -64,13 +67,33 @@ impl CreateDatasetUseCaseHelper { && *account_name != subject.account_name { // Creating dataset for account different that the subject - // TODO: check organization permissions + // + // TODO: HACK: SEC: Validate subject account has permissions to create datasets + // in target account e.g. by checking `member-of` relationship for + // organizations. Currently only allowing cross-account creation for `kamu` and + // `molecule` / `molecule.dev`. // // See: https://github.com/kamu-data/kamu-node/issues/233 - Err(odf::AccessError::Unauthorized( - format!("Cannot create a dataset in account {account_name}").into(), - ) - .into()) + if let Some(account) = self + .account_svc + .get() + .int_err()? + .account_by_name(account_name) + .await? 
+ && (subject.account_name == "kamu" + || subject.account_name == "molecule" + || subject.account_name == "molecule.dev") + && account + .account_name + .starts_with(subject.account_name.as_str()) + { + Ok((CanonicalDatasetAlias::new(raw_alias.clone()), account.id)) + } else { + Err(odf::AccessError::Unauthorized( + format!("Cannot create a dataset in account {account_name}").into(), + ) + .into()) + } } else { Ok(( CanonicalDatasetAlias::new(odf::DatasetAlias::new( diff --git a/src/infra/core/src/services/query_service_impl.rs b/src/infra/core/src/services/query_service_impl.rs index 235696eca6..9bbed489d9 100644 --- a/src/infra/core/src/services/query_service_impl.rs +++ b/src/infra/core/src/services/query_service_impl.rs @@ -676,6 +676,92 @@ impl QueryService for QueryServiceImpl { }) } + #[tracing::instrument(level = "info", name = QueryServiceImpl_get_data_multi, skip_all)] + async fn get_data_multi( + &self, + dataset_refs: &[odf::DatasetRef], + skip_if_missing_or_inaccessible: bool, + ) -> Result, QueryError> { + let refs: Vec<&odf::DatasetRef> = dataset_refs.iter().collect(); + let classified = self + .rebac_dataset_registry_facade + .classify_dataset_refs_by_allowance(&refs, DatasetAction::Read) + .await?; + + if !skip_if_missing_or_inaccessible + && let Some((_, err)) = classified.inaccessible_refs.into_iter().next() + { + use kamu_auth_rebac::RebacDatasetRefUnresolvedError as E; + let err = match err { + E::NotFound(e) => QueryError::DatasetNotFound(e), + E::Access(e) => QueryError::Access(e), + e @ E::Internal(_) => QueryError::Internal(e.int_err()), + }; + return Err(err); + } + + let mut resolved_datasets = Vec::new(); + for (_, hdl) in classified.accessible_resolved_refs { + let resolved = self.dataset_registry.get_dataset_by_handle(&hdl).await; + let head = resolved + .as_metadata_chain() + .resolve_ref(&odf::BlockRef::Head) + .await + .int_err()?; + resolved_datasets.push((resolved, head)); + } + + let input_datasets = resolved_datasets + .iter() + .map(|(ds, head)| { + ( + ds.get_id().clone(), + QueryOptionsDataset { + alias: ds.get_alias().to_string(), + block_hash: Some(head.clone()), + hints: DatasetQueryHints { + handle: Some(ds.get_handle().clone()), + last_records_to_consider: None, + does_not_need_schema: false, + }, + }, + ) + }) + .collect(); + + let ctx = self + .session_context(QueryOptions { + input_datasets: Some(input_datasets), + }) + .await?; + + let mut results = Vec::new(); + + for (ds, head) in resolved_datasets { + let df = ctx + .table(TableReference::bare(ds.get_alias().to_string())) + .await?; + + let res = if df.schema().fields().is_empty() { + GetDataResponse { + df: None, + dataset_handle: ds.take_handle(), + block_hash: head, + } + } else { + GetDataResponse { + df: Some(df.into()), + dataset_handle: ds.take_handle(), + block_hash: head, + } + }; + + results.push(res); + } + + Ok(results) + } + async fn get_known_engines(&self) -> Result, InternalError> { Ok(vec![ EngineDesc { diff --git a/src/infra/datasets/postgres/.sqlx/query-8804a2c11cde8436d4230ec2142454fa46b4ef971dd48f523605b41d9354e0e8.json b/src/infra/datasets/postgres/.sqlx/query-4106a49bfb09634154ebde02f2815f3e8b4720e559b1e71091a4d39d391f8416.json similarity index 92% rename from src/infra/datasets/postgres/.sqlx/query-8804a2c11cde8436d4230ec2142454fa46b4ef971dd48f523605b41d9354e0e8.json rename to src/infra/datasets/postgres/.sqlx/query-4106a49bfb09634154ebde02f2815f3e8b4720e559b1e71091a4d39d391f8416.json index 83780c9f1c..1a0feeacc6 100644 --- 
diff --git a/src/infra/datasets/postgres/.sqlx/query-8804a2c11cde8436d4230ec2142454fa46b4ef971dd48f523605b41d9354e0e8.json b/src/infra/datasets/postgres/.sqlx/query-4106a49bfb09634154ebde02f2815f3e8b4720e559b1e71091a4d39d391f8416.json
similarity index 92%
rename from src/infra/datasets/postgres/.sqlx/query-8804a2c11cde8436d4230ec2142454fa46b4ef971dd48f523605b41d9354e0e8.json
rename to src/infra/datasets/postgres/.sqlx/query-4106a49bfb09634154ebde02f2815f3e8b4720e559b1e71091a4d39d391f8416.json
index 83780c9f1c..1a0feeacc6 100644
--- a/src/infra/datasets/postgres/.sqlx/query-8804a2c11cde8436d4230ec2142454fa46b4ef971dd48f523605b41d9354e0e8.json
+++ b/src/infra/datasets/postgres/.sqlx/query-4106a49bfb09634154ebde02f2815f3e8b4720e559b1e71091a4d39d391f8416.json
@@ -1,6 +1,6 @@
 {
   "db_name": "PostgreSQL",
-  "query": "\n SELECT dataset_id as \"id: _\",\n owner_id as \"owner_id: _\",\n owner_name,\n dataset_name as name,\n created_at as \"created_at: _\",\n kind as \"kind: _\"\n FROM dataset_entries\n WHERE dataset_id = ANY($1)\n ORDER BY created_at\n ",
+  "query": "\n SELECT dataset_id as \"id: _\",\n owner_id as \"owner_id: _\",\n owner_name,\n dataset_name as name,\n created_at as \"created_at: _\",\n kind as \"kind: _\"\n FROM dataset_entries\n WHERE dataset_id = ANY($1)\n ORDER BY owner_name, dataset_name\n ",
   "describe": {
     "columns": [
       {
@@ -58,5 +58,5 @@
       false
     ]
   },
-  "hash": "8804a2c11cde8436d4230ec2142454fa46b4ef971dd48f523605b41d9354e0e8"
+  "hash": "4106a49bfb09634154ebde02f2815f3e8b4720e559b1e71091a4d39d391f8416"
 }
diff --git a/src/infra/datasets/postgres/.sqlx/query-07bc67ff4b2e223da7236c8cc0b2b1b38f45b6d1569d63b4dbd16317e48d1bfb.json b/src/infra/datasets/postgres/.sqlx/query-88c65d5e30d6a07aca8d7f00b505f49ba599d2240e45d3d4279b28477b927d74.json
similarity index 89%
rename from src/infra/datasets/postgres/.sqlx/query-07bc67ff4b2e223da7236c8cc0b2b1b38f45b6d1569d63b4dbd16317e48d1bfb.json
rename to src/infra/datasets/postgres/.sqlx/query-88c65d5e30d6a07aca8d7f00b505f49ba599d2240e45d3d4279b28477b927d74.json
index 58f245cf05..166c28b7e4 100644
--- a/src/infra/datasets/postgres/.sqlx/query-07bc67ff4b2e223da7236c8cc0b2b1b38f45b6d1569d63b4dbd16317e48d1bfb.json
+++ b/src/infra/datasets/postgres/.sqlx/query-88c65d5e30d6a07aca8d7f00b505f49ba599d2240e45d3d4279b28477b927d74.json
@@ -1,6 +1,6 @@
 {
   "db_name": "PostgreSQL",
-  "query": "\n SELECT dataset_id as \"id: _\",\n owner_id as \"owner_id: _\",\n owner_name,\n dataset_name as name,\n created_at as \"created_at: _\",\n kind as \"kind: _\"\n FROM dataset_entries\n WHERE owner_id = $1\n LIMIT $2 OFFSET $3\n ",
+  "query": "\n SELECT dataset_id as \"id: _\",\n owner_id as \"owner_id: _\",\n owner_name,\n dataset_name as name,\n created_at as \"created_at: _\",\n kind as \"kind: _\"\n FROM dataset_entries\n WHERE owner_id = $1\n ORDER BY dataset_name\n LIMIT $2 OFFSET $3\n ",
   "describe": {
     "columns": [
       {
@@ -60,5 +60,5 @@
       false
     ]
   },
-  "hash": "07bc67ff4b2e223da7236c8cc0b2b1b38f45b6d1569d63b4dbd16317e48d1bfb"
+  "hash": "88c65d5e30d6a07aca8d7f00b505f49ba599d2240e45d3d4279b28477b927d74"
 }
diff --git a/src/infra/datasets/postgres/.sqlx/query-3d11399dd129f45de4c9e1e74fad20445276ada75b4fc000d8d5c86b34dc4950.json b/src/infra/datasets/postgres/.sqlx/query-c47a86c5b15f12834a6ba56734d36ca20d4fc5e0949ecbf500da10332d02fd7c.json
similarity index 89%
rename from src/infra/datasets/postgres/.sqlx/query-3d11399dd129f45de4c9e1e74fad20445276ada75b4fc000d8d5c86b34dc4950.json
rename to src/infra/datasets/postgres/.sqlx/query-c47a86c5b15f12834a6ba56734d36ca20d4fc5e0949ecbf500da10332d02fd7c.json
index 5734ee8fb1..b6320e1544 100644
--- a/src/infra/datasets/postgres/.sqlx/query-3d11399dd129f45de4c9e1e74fad20445276ada75b4fc000d8d5c86b34dc4950.json
+++ b/src/infra/datasets/postgres/.sqlx/query-c47a86c5b15f12834a6ba56734d36ca20d4fc5e0949ecbf500da10332d02fd7c.json
@@ -1,6 +1,6 @@
 {
   "db_name": "PostgreSQL",
-  "query": "\n SELECT\n dataset_id as \"id: _\",\n owner_id as \"owner_id: _\",\n owner_name as owner_name,\n dataset_name as name,\n created_at as \"created_at: _\",\n kind as \"kind: _\"\n FROM dataset_entries\n ORDER BY dataset_name ASC\n LIMIT $1 OFFSET $2\n ",
+  "query": "\n SELECT\n dataset_id as \"id: _\",\n owner_id as \"owner_id: _\",\n owner_name as owner_name,\n dataset_name as name,\n created_at as \"created_at: _\",\n kind as \"kind: _\"\n FROM dataset_entries\n ORDER BY owner_name, dataset_name\n LIMIT $1 OFFSET $2\n ",
   "describe": {
     "columns": [
       {
@@ -59,5 +59,5 @@
       false
     ]
   },
-  "hash": "3d11399dd129f45de4c9e1e74fad20445276ada75b4fc000d8d5c86b34dc4950"
+  "hash": "c47a86c5b15f12834a6ba56734d36ca20d4fc5e0949ecbf500da10332d02fd7c"
 }
diff --git a/src/infra/datasets/postgres/src/repos/postgres_dataset_entry_repository.rs b/src/infra/datasets/postgres/src/repos/postgres_dataset_entry_repository.rs
index e532c6b294..4a69374f6d 100644
--- a/src/infra/datasets/postgres/src/repos/postgres_dataset_entry_repository.rs
+++ b/src/infra/datasets/postgres/src/repos/postgres_dataset_entry_repository.rs
@@ -97,7 +97,7 @@ impl DatasetEntryRepository for PostgresDatasetEntryRepository {
                 created_at as "created_at: _",
                 kind as "kind: _"
             FROM dataset_entries
-            ORDER BY dataset_name ASC
+            ORDER BY owner_name, dataset_name
             LIMIT $1 OFFSET $2
             "#,
             limit,
@@ -172,7 +172,7 @@ impl DatasetEntryRepository for PostgresDatasetEntryRepository {
                 kind as "kind: _"
             FROM dataset_entries
             WHERE dataset_id = ANY($1)
-            ORDER BY created_at
+            ORDER BY owner_name, dataset_name
             "#,
             &dataset_ids_search,
         )
@@ -311,6 +311,7 @@ impl DatasetEntryRepository for PostgresDatasetEntryRepository {
                 kind as "kind: _"
             FROM dataset_entries
             WHERE owner_id = $1
+            ORDER BY dataset_name
             LIMIT $2 OFFSET $3
             "#,
             stack_owner_id.as_str(),
diff --git a/src/infra/datasets/sqlite/.sqlx/query-d706b5a5a1327be2a631a49699255521826d7fed791c71843650c579254f4990.json b/src/infra/datasets/sqlite/.sqlx/query-1682d31abf224e141e4e3630668b3508807161bca691e1be2b7b2697ef7794f1.json
similarity index 87%
rename from src/infra/datasets/sqlite/.sqlx/query-d706b5a5a1327be2a631a49699255521826d7fed791c71843650c579254f4990.json
rename to src/infra/datasets/sqlite/.sqlx/query-1682d31abf224e141e4e3630668b3508807161bca691e1be2b7b2697ef7794f1.json
index 0475dec715..eafd5cf5de 100644
--- a/src/infra/datasets/sqlite/.sqlx/query-d706b5a5a1327be2a631a49699255521826d7fed791c71843650c579254f4990.json
+++ b/src/infra/datasets/sqlite/.sqlx/query-1682d31abf224e141e4e3630668b3508807161bca691e1be2b7b2697ef7794f1.json
@@ -1,6 +1,6 @@
 {
   "db_name": "SQLite",
-  "query": "\n SELECT\n dataset_id as \"id: _\",\n owner_id as \"owner_id: _\",\n owner_name,\n dataset_name as name,\n created_at as \"created_at: _\",\n kind as \"kind: _\"\n FROM dataset_entries\n ORDER BY dataset_name ASC\n LIMIT $1 OFFSET $2\n ",
+  "query": "\n SELECT\n dataset_id as \"id: _\",\n owner_id as \"owner_id: _\",\n owner_name,\n dataset_name as name,\n created_at as \"created_at: _\",\n kind as \"kind: _\"\n FROM dataset_entries\n ORDER BY owner_name, dataset_name ASC\n LIMIT $1 OFFSET $2\n ",
   "describe": {
     "columns": [
       {
@@ -46,5 +46,5 @@
       false
     ]
   },
-  "hash": "d706b5a5a1327be2a631a49699255521826d7fed791c71843650c579254f4990"
+  "hash": "1682d31abf224e141e4e3630668b3508807161bca691e1be2b7b2697ef7794f1"
 }
diff --git a/src/infra/datasets/sqlite/.sqlx/query-a37a2fa04b4ce435edd324cbbad6b86f980762cc3aed9951b2e1232e3738f717.json b/src/infra/datasets/sqlite/.sqlx/query-5418b1d579b46ddead1e8a407bfa76471b3f4b37e131e914fc57de86b8aeaed2.json
similarity index 86%
rename from src/infra/datasets/sqlite/.sqlx/query-a37a2fa04b4ce435edd324cbbad6b86f980762cc3aed9951b2e1232e3738f717.json
rename to src/infra/datasets/sqlite/.sqlx/query-5418b1d579b46ddead1e8a407bfa76471b3f4b37e131e914fc57de86b8aeaed2.json
index 227eca994d..69cd4d23dc 100644
--- a/src/infra/datasets/sqlite/.sqlx/query-a37a2fa04b4ce435edd324cbbad6b86f980762cc3aed9951b2e1232e3738f717.json
+++ b/src/infra/datasets/sqlite/.sqlx/query-5418b1d579b46ddead1e8a407bfa76471b3f4b37e131e914fc57de86b8aeaed2.json
@@ -1,6 +1,6 @@
 {
   "db_name": "SQLite",
-  "query": "\n SELECT\n dataset_id as \"id: _\",\n owner_id as \"owner_id: _\",\n owner_name,\n dataset_name as name,\n created_at as \"created_at: _\",\n kind as \"kind: _\"\n FROM dataset_entries\n WHERE owner_id = $1\n LIMIT $2 OFFSET $3\n ",
+  "query": "\n SELECT\n dataset_id as \"id: _\",\n owner_id as \"owner_id: _\",\n owner_name,\n dataset_name as name,\n created_at as \"created_at: _\",\n kind as \"kind: _\"\n FROM dataset_entries\n WHERE owner_id = $1\n ORDER BY dataset_name ASC\n LIMIT $2 OFFSET $3\n ",
   "describe": {
     "columns": [
       {
@@ -46,5 +46,5 @@
       false
     ]
   },
-  "hash": "a37a2fa04b4ce435edd324cbbad6b86f980762cc3aed9951b2e1232e3738f717"
+  "hash": "5418b1d579b46ddead1e8a407bfa76471b3f4b37e131e914fc57de86b8aeaed2"
 }
diff --git a/src/infra/datasets/sqlite/src/repos/sqlite_dateset_entry_repository.rs b/src/infra/datasets/sqlite/src/repos/sqlite_dateset_entry_repository.rs
index cc6e9017f9..ee05770c9b 100644
--- a/src/infra/datasets/sqlite/src/repos/sqlite_dateset_entry_repository.rs
+++ b/src/infra/datasets/sqlite/src/repos/sqlite_dateset_entry_repository.rs
@@ -99,7 +99,7 @@ impl DatasetEntryRepository for SqliteDatasetEntryRepository {
                 created_at as "created_at: _",
                 kind as "kind: _"
             FROM dataset_entries
-            ORDER BY dataset_name ASC
+            ORDER BY owner_name, dataset_name ASC
             LIMIT $1 OFFSET $2
             "#,
             limit,
@@ -169,7 +169,7 @@ impl DatasetEntryRepository for SqliteDatasetEntryRepository {
                 kind
             FROM dataset_entries
             WHERE dataset_id IN ({})
-            ORDER BY created_at
+            ORDER BY owner_name, dataset_name
             "#,
             sqlite_generate_placeholders_list(dataset_ids.len(), NonZeroUsize::new(1).unwrap())
         );
@@ -333,6 +333,7 @@ impl DatasetEntryRepository for SqliteDatasetEntryRepository {
                 kind as "kind: _"
             FROM dataset_entries
             WHERE owner_id = $1
+            ORDER BY dataset_name ASC
             LIMIT $2 OFFSET $3
             "#,
             owner_id_as_str,
diff --git a/src/odf/data-utils/src/data/dataframe_ext.rs b/src/odf/data-utils/src/data/dataframe_ext.rs
index b64a8e0d26..4d6db4dd7b 100644
--- a/src/odf/data-utils/src/data/dataframe_ext.rs
+++ b/src/odf/data-utils/src/data/dataframe_ext.rs
@@ -255,6 +255,25 @@ impl DataFrameExt {
             Ok(Some(column.value(0)))
         }
     }
+
+    pub fn union_all(dfs: Vec<Self>) -> Result<Option<Self>> {
+        let mut iter = dfs.into_iter();
+        let Some(df) = iter.next() else {
+            return Ok(None);
+        };
+
+        let (state, plan) = df.into_parts();
+        let mut builder = datafusion::logical_expr::LogicalPlanBuilder::new(plan);
+
+        for df in iter {
+            let (_, plan) = df.into_parts();
+            builder = builder.union(plan)?;
+        }
+
+        let plan = builder.build()?;
+
+        Ok(Some(Self(DataFrame::new(state, plan))))
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
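For orientation, a minimal sketch (not part of the patch) of the intended call pattern for the new `union_all` helper, assuming the `DataFrameExt` wrapper above and DataFusion's `Result` alias; the function and variable names are illustrative.

// Hypothetical: fold per-dataset frames into a single UNION ALL logical plan.
fn combine_frames(
    frames: Vec<DataFrameExt>,
) -> datafusion::error::Result<Option<DataFrameExt>> {
    // Returns Ok(None) when `frames` is empty, otherwise one frame whose logical
    // plan is the union of all input plans (schemas are assumed to be compatible)
    DataFrameExt::union_all(frames)
}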