Skip to content

Commit f1ef77c

Browse files
committed
Restaurant materialized view added / query side / trigger
1 parent 4bd8825 commit f1ef77c

17 files changed

+321
-26
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ serde = { version = "1.0.203", features = ["derive"] }
2222
fmodel-rust = "0.7.0"
2323
serde_json = "1.0.117"
2424
uuid = { version = "1.8.0", features = ["serde", "v4"] }
25+
thiserror = "1.0.61"
2526

2627
[dev-dependencies]
2728
pgrx-tests = "=0.11.4"

src/application/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
pub mod order_restaurant_aggregate;
2+
pub mod restaurant_materialized_view;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
use crate::domain::api::RestaurantEvent;
2+
use crate::domain::restaurant_view::{RestaurantView, RestaurantViewState};
3+
use crate::framework::application::materialized_view::MaterializedView;
4+
use crate::infrastructure::restaurant_view_state_repository::RestaurantViewStateRepository;
5+
6+
/// A convenient type alias for the restaurant materialized view.
7+
pub type RestaurantMeterializedView<'a> = MaterializedView<
8+
Option<RestaurantViewState>,
9+
RestaurantEvent,
10+
RestaurantViewStateRepository,
11+
RestaurantView<'a>,
12+
>;

src/domain/api.rs

+27
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
use crate::framework::domain::api::Identifier;
12
use pgrx::{PostgresEnum, PostgresType};
23
use serde::{Deserialize, Serialize};
4+
use std::fmt;
35
use uuid::Uuid;
46

57
// ########################################################
@@ -10,6 +12,12 @@ use uuid::Uuid;
1012
// Similarly, in Rust, the 'newtype' idiom brings compile-time guarantees that the correct value type is supplied. The 'newtype' is a struct that wraps a single value and provides a new type for that value. A 'newtype' is the same as the underlying type at runtime, so it will not introduce any performance overhead.
1113
#[derive(PostgresType, Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
1214
pub struct RestaurantId(pub Uuid);
15+
impl fmt::Display for RestaurantId {
16+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
17+
// Delegate the formatting to the inner Uuid
18+
write!(f, "{}", self.0)
19+
}
20+
}
1321

1422
#[derive(PostgresType, Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
1523
pub struct RestaurantName(pub String);
@@ -170,6 +178,16 @@ pub enum RestaurantEvent {
170178
OrderPlaced(OrderPlaced),
171179
}
172180

181+
impl Identifier for RestaurantEvent {
182+
fn identifier(&self) -> Uuid {
183+
match self {
184+
RestaurantEvent::Created(e) => e.identifier.0,
185+
RestaurantEvent::MenuChanged(e) => e.identifier.0,
186+
RestaurantEvent::OrderPlaced(e) => e.identifier.0,
187+
}
188+
}
189+
}
190+
173191
/// Fact/Event that a restaurant was created
174192
#[derive(PostgresType, Serialize, Deserialize, Debug, PartialEq, Clone, Eq)]
175193
pub struct RestaurantCreated {
@@ -206,6 +224,15 @@ pub enum OrderEvent {
206224
Prepared(OrderPrepared),
207225
}
208226

227+
impl Identifier for OrderEvent {
228+
fn identifier(&self) -> Uuid {
229+
match self {
230+
OrderEvent::Created(e) => e.identifier.0,
231+
OrderEvent::Prepared(e) => e.identifier.0,
232+
}
233+
}
234+
}
235+
209236
/// Fact/Event that an order was created
210237
#[derive(PostgresType, Serialize, Deserialize, Debug, PartialEq, Clone, Eq)]
211238
pub struct OrderCreated {

src/domain/mod.rs

+10
Original file line numberDiff line numberDiff line change
@@ -200,3 +200,13 @@ pub fn sum_to_event(event: &Sum<RestaurantEvent, OrderEvent>) -> Event {
200200
},
201201
}
202202
}
203+
204+
pub fn event_to_restaurant_event(event: &Event) -> Option<RestaurantEvent> {
205+
match event {
206+
Event::RestaurantCreated(e) => Some(RestaurantEvent::Created(e.to_owned())),
207+
Event::RestaurantMenuChanged(e) => Some(RestaurantEvent::MenuChanged(e.to_owned())),
208+
Event::OrderPlaced(e) => Some(RestaurantEvent::OrderPlaced(e.to_owned())),
209+
Event::OrderCreated(_e) => None,
210+
Event::OrderPrepared(_e) => None,
211+
}
212+
}

src/domain/order_decider.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use pgrx::error;
33

44
use crate::domain::api::{
55
OrderCommand, OrderCreated, OrderEvent, OrderId, OrderLineItem, OrderPrepared, OrderStatus,
6-
Reason, RestaurantId,
6+
RestaurantId,
77
};
88

99
/// The state of the Order is represented by this struct. It belongs to the Domain layer.

src/domain/restaurant_view.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
use fmodel_rust::view::View;
2+
use pgrx::PostgresType;
23
use serde::{Deserialize, Serialize};
34

45
use crate::domain::api::{RestaurantEvent, RestaurantId, RestaurantMenu, RestaurantName};
56

67
/// The state of the Restaurant View is represented by this struct. It belongs to the Domain layer.
7-
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
8+
#[derive(PostgresType, Clone, PartialEq, Debug, Serialize, Deserialize)]
89
pub struct RestaurantViewState {
910
pub identifier: RestaurantId,
1011
pub name: RestaurantName,
1112
pub menu: RestaurantMenu,
1213
}
1314

1415
/// A convenient type alias for the Restaurant view
15-
type RestaurantView<'a> = View<'a, Option<RestaurantViewState>, RestaurantEvent>;
16+
pub type RestaurantView<'a> = View<'a, Option<RestaurantViewState>, RestaurantEvent>;
1617

1718
/// View represents the event handling algorithm. It belongs to the Domain layer.
1819
pub fn restaurant_view<'a>() -> RestaurantView<'a> {

src/framework/application/event_sourced_aggregate.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ where
7171
version = Some(ver);
7272
current_events.push(event);
7373
}
74-
let new_events = self.decider.compute_new_events(&current_events, command);
74+
let new_events = self.compute_new_events(&current_events, command);
7575
self.repository.save(&new_events, &version)
7676
}
7777
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
use crate::framework::infrastructure::errors::ErrorMessage;
2+
use crate::framework::infrastructure::view_state_repository::ViewStateRepository;
3+
use fmodel_rust::view::ViewStateComputation;
4+
use std::marker::PhantomData;
5+
6+
/// Materialized View.
7+
///
8+
/// It is using a `View` / [ViewStateComputation] to compute new state based on the current state and the event.
9+
/// It is using a [ViewStateRepository] to fetch the current state and to save the new state.
10+
///
11+
/// Generic parameters:
12+
///
13+
/// - `S` - State
14+
/// - `E` - Event
15+
/// - `Repository` - View State repository
16+
/// - `View` - View
17+
pub struct MaterializedView<S, E, Repository, View>
18+
where
19+
Repository: ViewStateRepository<E, S>,
20+
View: ViewStateComputation<E, S>,
21+
{
22+
repository: Repository,
23+
view: View,
24+
_marker: PhantomData<(S, E)>,
25+
}
26+
27+
/// Implementation of the view state computation for the materialized view.
28+
impl<S, E, Repository, View> ViewStateComputation<E, S> for MaterializedView<S, E, Repository, View>
29+
where
30+
Repository: ViewStateRepository<E, S>,
31+
View: ViewStateComputation<E, S>,
32+
{
33+
/// Computes new state based on the current state and the events.
34+
fn compute_new_state(&self, current_state: Option<S>, events: &[&E]) -> S {
35+
self.view.compute_new_state(current_state, events)
36+
}
37+
}
38+
39+
/// Implementation of the `handle` method for the materialized view.
40+
impl<S, E, Repository, View> MaterializedView<S, E, Repository, View>
41+
where
42+
Repository: ViewStateRepository<E, S>,
43+
View: ViewStateComputation<E, S>,
44+
{
45+
/// Creates a new instance of [MaterializedView].
46+
pub fn new(repository: Repository, view: View) -> Self {
47+
MaterializedView {
48+
repository,
49+
view,
50+
_marker: PhantomData,
51+
}
52+
}
53+
/// Handles the event by fetching the state from the repository, computing new state based on the current state and the event, and saving the new state to the repository.
54+
pub fn handle(&self, event: &E) -> Result<S, ErrorMessage> {
55+
let state = self.repository.fetch_state(event)?;
56+
let new_state = self.compute_new_state(state, &[event]);
57+
self.repository.save(&new_state)
58+
}
59+
}

src/framework/application/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
pub mod event_sourced_aggregate;
2+
pub mod materialized_view;

src/framework/infrastructure/errors.rs

+17
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
use pgrx::prelude::*;
2+
use pgrx::TryFromDatumError;
13
use serde::{Deserialize, Serialize};
24
use std::error::Error;
35
use std::fmt;
6+
use std::num::TryFromIntError;
47

58
/// Error message to be returned to the client
69
#[derive(Serialize, Deserialize)]
@@ -24,3 +27,17 @@ impl fmt::Debug for ErrorMessage {
2427

2528
/// Implement Error for ErrorMessage
2629
impl Error for ErrorMessage {}
30+
31+
#[derive(thiserror::Error, Debug)]
32+
pub enum TriggerError {
33+
#[error("Null Trigger Tuple found")]
34+
NullTriggerTuple,
35+
#[error("PgHeapTuple error: {0}")]
36+
PgHeapTuple(#[from] PgHeapTupleError),
37+
#[error("TryFromDatumError error: {0}")]
38+
TryFromDatum(#[from] TryFromDatumError),
39+
#[error("TryFromInt error: {0}")]
40+
TryFromInt(#[from] TryFromIntError),
41+
#[error("Event Handling Error: {0}")]
42+
EventHandlingError(String),
43+
}

src/framework/infrastructure/event_repository.rs

+8-12
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,14 @@
11
use crate::framework::domain::api::{DeciderType, EventType, Identifier, IsFinal};
22
use crate::framework::infrastructure::errors::ErrorMessage;
3+
use crate::framework::infrastructure::to_payload;
34
use pgrx::{IntoDatum, JsonB, PgBuiltInOids, Spi, Uuid};
45
use serde::de::DeserializeOwned;
56
use serde::Serialize;
67
use std::fmt::Debug;
78
use uuid::Uuid as UUID;
89

9-
/// Converts a `JsonB` to an event payload type.
10-
fn to_event<E: DeserializeOwned>(jsonb: JsonB) -> Result<E, ErrorMessage> {
11-
let value = jsonb.0.clone();
12-
serde_json::from_value(value).map_err(|err| ErrorMessage {
13-
message: "Failed to deserialize event: ".to_string() + &err.to_string(),
14-
})
15-
}
16-
/// A trait for event repositories.
10+
/// A trait for event repositories / the command side of the CQRS pattern.
11+
/// Default implementation includes fetching and saving events.
1712
pub trait EventRepository<C, E>
1813
where
1914
C: Identifier,
@@ -55,7 +50,7 @@ where
5550
.to_string(),
5651
})?;
5752

58-
results.push((to_event(data)?, UUID::from_bytes(*event_id.as_bytes())));
53+
results.push((to_payload(data)?, UUID::from_bytes(*event_id.as_bytes())));
5954
}
6055
Ok(results)
6156
})
@@ -139,7 +134,7 @@ where
139134
.to_string(),
140135
})?;
141136

142-
results.push((to_event(data)?, UUID::from_bytes(*event_id.as_bytes())));
137+
results.push((to_payload(data)?, UUID::from_bytes(*event_id.as_bytes())));
143138
}
144139
version = Some(event_id);
145140
}
@@ -149,6 +144,7 @@ where
149144
}
150145

151146
/// A trait for event orchestrating repositories.
147+
/// Default implementation includes fetching events, fetching latest version and saving events.
152148
pub trait EventOrchestratingRepository<C, E>
153149
where
154150
C: Identifier,
@@ -196,7 +192,7 @@ where
196192
"Failed to fetch event id (map `data` to `JsonB`): No event id found"
197193
.to_string(),
198194
})?;
199-
results.push((to_event(data)?, UUID::from_bytes(*event_id.as_bytes())));
195+
results.push((to_payload(data)?, UUID::from_bytes(*event_id.as_bytes())));
200196
}
201197
Ok(results)
202198
})
@@ -313,7 +309,7 @@ where
313309
"Failed to save event id (map `data` to `JsonB`): No event id found"
314310
.to_string(),
315311
})?;
316-
results.push((to_event(data)?, UUID::from_bytes(*event_id.as_bytes())));
312+
results.push((to_payload(data)?, UUID::from_bytes(*event_id.as_bytes())));
317313
}
318314
}
319315
Ok(results)

src/framework/infrastructure/mod.rs

+13
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,15 @@
1+
use crate::framework::infrastructure::errors::ErrorMessage;
2+
use pgrx::JsonB;
3+
use serde::de::DeserializeOwned;
4+
15
pub mod errors;
26
pub mod event_repository;
7+
pub mod view_state_repository;
8+
9+
/// Converts a `JsonB` to the payload type.
10+
pub fn to_payload<E: DeserializeOwned>(jsonb: JsonB) -> Result<E, ErrorMessage> {
11+
let value = jsonb.0.clone();
12+
serde_json::from_value(value).map_err(|err| ErrorMessage {
13+
message: "Failed to deserialize payload: ".to_string() + &err.to_string(),
14+
})
15+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
use crate::framework::infrastructure::errors::ErrorMessage;
2+
3+
/// A trait for a view state repository / the query side of the CQRS pattern.
4+
pub trait ViewStateRepository<E, S> {
5+
/// Fetches current state, based on the event.
6+
fn fetch_state(&self, event: &E) -> Result<Option<S>, ErrorMessage>;
7+
/// Saves the new state.
8+
fn save(&self, state: &S) -> Result<S, ErrorMessage>;
9+
}

src/infrastructure/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
pub mod order_restaurant_event_repository;
2+
pub mod restaurant_view_state_repository;

0 commit comments

Comments
 (0)