Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 137 additions & 33 deletions src/operation_management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,29 @@ pub enum FileMode {
Write,
}

// NOTE: The ordering of these entries is important, because it is the order in which tables are
// processed in get_changeset_dependencies. If model Foo depends on model Bar, then Foo must come
// first in this list. For instance, an edge depends on a source node and a target node, so the
// edges table must come before the nodes table here. This is because in
// get_changeset_dependencies, if we process edges first, we correctly find any nodes that were
// created that the edge must be associated with. If we instead processed nodes first, some nodes
// associated with edges would get counted as new nodes, and if we recreate the edge it might not
// be associated with the correct recreated nodes.
//
// attach_session also iterates this list to attach each table to a session.
const CHANGE_TABLES: &[&str; 12] = &[
"accessions",
"path_edges",
"paths",
"block_group_edges",
"edges",
"accession_paths",
"accession_edges",
"nodes",
"sequences",
"block_groups",
"collections",
"samples",
];

#[derive(Deserialize, Serialize, Debug)]
pub struct DependencyModels {
pub sequences: Vec<Sequence>,
Expand Down Expand Up @@ -422,7 +445,13 @@ pub fn apply_changeset(

let mut dep_bg_map = HashMap::new();
for bg in dependencies.block_group.iter() {
if !Collection::exists(conn, &bg.collection_name) {
Collection::create(conn, &bg.collection_name);
}
let sample_name = bg.sample_name.as_ref().map(|v| v as &str);
if let Some(sample_name) = sample_name {
Sample::get_or_create(conn, sample_name);
}
let new_bg = BlockGroup::create(conn, &bg.collection_name, sample_name, &bg.name);
dep_bg_map.insert(&bg.id, new_bg.id);
}
Expand All @@ -434,10 +463,23 @@ pub fn apply_changeset(
}

let mut dep_edge_map = HashMap::new();
let new_edges = Edge::bulk_create(
conn,
&dependencies.edges.iter().map(EdgeData::from).collect(),
);
let updated_edges = dependencies
.edges
.iter()
.map(|edge| EdgeData {
source_node_id: *dep_node_map
.get(&edge.source_node_id)
.unwrap_or(&edge.source_node_id),
source_coordinate: edge.source_coordinate,
source_strand: edge.source_strand,
target_node_id: *dep_node_map
.get(&edge.target_node_id)
.unwrap_or(&edge.target_node_id),
target_coordinate: edge.target_coordinate,
target_strand: edge.target_strand,
})
.collect::<Vec<EdgeData>>();
let new_edges = Edge::bulk_create(conn, &updated_edges);
for (index, edge_id) in new_edges.iter().enumerate() {
dep_edge_map.insert(&dependencies.edges[index].id, *edge_id);
}
Expand All @@ -456,14 +498,24 @@ pub fn apply_changeset(
}

let mut dep_accession_edge_map = HashMap::new();
let new_accession_edges = AccessionEdge::bulk_create(
conn,
&dependencies
.accession_edges
.iter()
.map(AccessionEdgeData::from)
.collect(),
);
let updated_accession_edges = dependencies
.accession_edges
.iter()
.map(|edge| AccessionEdgeData {
source_node_id: *dep_node_map
.get(&edge.source_node_id)
.unwrap_or(&edge.source_node_id),
source_coordinate: edge.source_coordinate,
source_strand: edge.source_strand,
target_node_id: *dep_node_map
.get(&edge.target_node_id)
.unwrap_or(&edge.target_node_id),
target_coordinate: edge.target_coordinate,
target_strand: edge.target_strand,
chromosome_index: edge.chromosome_index,
})
.collect::<Vec<AccessionEdgeData>>();
let new_accession_edges = AccessionEdge::bulk_create(conn, &updated_accession_edges);
for (index, edge_id) in new_accession_edges.iter().enumerate() {
dep_accession_edge_map.insert(&dependencies.accession_edges[index].id, *edge_id);
}
Expand Down Expand Up @@ -1059,20 +1111,7 @@ pub fn end_operation<'a>(
}

/// Attaches each listed table to `session`.
///
/// Calls `session.attach(Some(table))` once per table name and unwraps the
/// result.
///
/// # Panics
///
/// Panics if any `attach` call returns an error.
pub fn attach_session(session: &mut session::Session) {
for table in [
"collections",
"samples",
"sequences",
"block_groups",
"paths",
"nodes",
"edges",
"path_edges",
"block_group_edges",
"accessions",
"accession_edges",
"accession_paths",
] {
for table in CHANGE_TABLES {
session.attach(Some(table)).unwrap();
}
}
Expand Down Expand Up @@ -1203,7 +1242,14 @@ mod tests {
use crate::imports::fasta::import_fasta;
use crate::models::file_types::FileTypes;
use crate::models::operations::{setup_db, Branch, Operation, OperationState};
use crate::models::{edge::Edge, metadata, node::Node, sample::Sample};
use crate::models::{
edge::Edge,
metadata,
node::{Node, PATH_END_NODE_ID, PATH_START_NODE_ID},
path::Path as PathModel,
path_edge::PathEdge,
sample::Sample,
};
use crate::test_helpers::{
create_operation, get_connection, get_operation_connection, setup_block_group,
setup_gen_dir,
Expand Down Expand Up @@ -1520,22 +1566,66 @@ mod tests {
.save(conn);
let random_node_id = Node::create(conn, random_seq.hash.as_str(), None);

let new_edge = Edge::create(
let new_edge1 = Edge::create(
conn,
random_node_id,
PATH_START_NODE_ID,
0,
Strand::Forward,
existing_node_id,
0,
Strand::Forward,
);
let block_group_edge = BlockGroupEdgeData {
let block_group_edge1 = BlockGroupEdgeData {
block_group_id: bg_id,
edge_id: new_edge.id,
edge_id: new_edge1.id,
chromosome_index: 0,
phased: 0,
};
BlockGroupEdge::bulk_create(conn, &[block_group_edge]);

let new_edge2 = Edge::create(
conn,
existing_node_id,
1,
Strand::Forward,
random_node_id,
0,
Strand::Forward,
);
let block_group_edge2 = BlockGroupEdgeData {
block_group_id: bg_id,
edge_id: new_edge2.id,
chromosome_index: 0,
phased: 0,
};

let new_edge3 = Edge::create(
conn,
random_node_id,
1,
Strand::Forward,
PATH_END_NODE_ID,
0,
Strand::Forward,
);
let block_group_edge3 = BlockGroupEdgeData {
block_group_id: bg_id,
edge_id: new_edge3.id,
chromosome_index: 0,
phased: 0,
};

BlockGroupEdge::bulk_create(
conn,
&[block_group_edge1, block_group_edge2, block_group_edge3],
);

let _path = PathModel::create(
conn,
"test_path",
bg_id,
&[new_edge1.id, new_edge2.id, new_edge3.id],
);

let operation = end_operation(
conn,
op_conn,
Expand All @@ -1556,13 +1646,27 @@ mod tests {
get_changeset_path(&operation).join(format!("{op_id}.dep", op_id = operation.hash));
let dependencies: DependencyModels =
serde_json::from_reader(fs::File::open(dependency_path).unwrap()).unwrap();
assert_eq!(dependencies.sequences.len(), 1);
assert_eq!(dependencies.sequences.len(), 2);
assert_eq!(
dependencies.block_group[0].collection_name,
dep_bg.collection_name
);
assert_eq!(dependencies.block_group[0].name, dep_bg.name);
assert_eq!(dependencies.block_group[0].sample_name, dep_bg.sample_name);

let result_path = dependencies.paths.first().unwrap();
assert_eq!(result_path.block_group_id, bg_id);
let result_edges = PathEdge::edges_for_path(conn, result_path.id);
assert_eq!(result_edges.len(), 3);
assert_eq!(result_edges[0].id, new_edge1.id);
assert_eq!(result_edges[0].source_node_id, PATH_START_NODE_ID);
assert_eq!(result_edges[0].target_node_id, existing_node_id);
assert_eq!(result_edges[1].id, new_edge2.id);
assert_eq!(result_edges[1].source_node_id, existing_node_id);
assert_eq!(result_edges[1].target_node_id, random_node_id);
assert_eq!(result_edges[2].id, new_edge3.id);
assert_eq!(result_edges[2].source_node_id, random_node_id);
assert_eq!(result_edges[2].target_node_id, PATH_END_NODE_ID);
}

#[test]
Expand Down