Skip to content

Commit 62c74e2

Browse files
committed
Initial support for Outbox pattern
- Provide repostiroy that represents the database that stores the business entities and message outbox. - Provide event abstraction and event sender service. - Default implementation for the event sender service that converts the events into entities and persist them into the outbox db. Related to spring-projects#8698
1 parent ce4ce74 commit 62c74e2

File tree

6 files changed

+324
-1
lines changed

6 files changed

+324
-1
lines changed

Diff for: build.gradle

+3-1
Original file line numberDiff line numberDiff line change
@@ -788,9 +788,11 @@ project('spring-integration-jpa') {
788788
api 'org.springframework:spring-orm'
789789
optionalApi "jakarta.persistence:jakarta.persistence-api:$jpaApiVersion"
790790

791-
testImplementation('org.springframework.data:spring-data-jpa') {
791+
api('org.springframework.data:spring-data-jpa') {
792792
exclude group: 'org.springframework'
793793
}
794+
api 'com.fasterxml.jackson.core:jackson-databind'
795+
794796
testImplementation "com.h2database:h2:$h2Version"
795797
testImplementation "org.hibernate.orm:hibernate-core:$hibernateVersion"
796798
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.jpa.outbox;
18+
19+
import com.fasterxml.jackson.core.JsonProcessingException;
20+
import com.fasterxml.jackson.databind.ObjectMapper;
21+
22+
/**
23+
* {@link OutboxEventSender} interface implementation that converts the {@link ExportEvent} into {@link OutboxEvent}
24+
* entity and persists it in the outbox table via the {@link OutboxEventRepository} repository.
25+
*
26+
* @author Christian Tzolov
27+
*
28+
* @since 6.2
29+
*/
30+
public class DefaultOutboxEventSender implements OutboxEventSender {
31+
32+
private final OutboxEventRepository outboxEventRepository;
33+
34+
/**
35+
* Whether the outbox entry is removed after having been inserted. The removal of the entry does not impact the
36+
* Debezium connector from being able to emit CDC events. This is used as a way to keep the table’s underlying
37+
* storage from growing over time.
38+
*/
39+
private final boolean removeEventAfterInsert;
40+
41+
public DefaultOutboxEventSender(OutboxEventRepository outboxEventRepository, boolean removeEventAfterInsert) {
42+
this.outboxEventRepository = outboxEventRepository;
43+
this.removeEventAfterInsert = removeEventAfterInsert;
44+
}
45+
46+
@Override
47+
public void fire(ExportedEvent<?, ?> event) {
48+
49+
// Create an OutboxEvent object based on the ExportedEvent interface
50+
final OutboxEvent outboxEvent = new OutboxEvent(
51+
event.getAggregateType(),
52+
"" + event.getAggregateId(),
53+
event.getType(),
54+
payloadAsString(event.getPayload()),
55+
event.getTimestamp().toEpochMilli());
56+
57+
// We want the events table to remain empty; however this triggers both an INSERT and DELETE
58+
// in the database transaction log which is sufficient for Debezium to process the event.
59+
OutboxEvent savedOutboxEvent = this.outboxEventRepository.save(outboxEvent);
60+
if (this.removeEventAfterInsert) {
61+
this.outboxEventRepository.delete(savedOutboxEvent);
62+
}
63+
}
64+
65+
private static String payloadAsString(Object jsonNode) {
66+
try {
67+
return new ObjectMapper().writeValueAsString(jsonNode);
68+
}
69+
catch (JsonProcessingException e) {
70+
throw new IllegalArgumentException(e);
71+
}
72+
}
73+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.jpa.outbox;
18+
19+
import java.time.Instant;
20+
21+
/**
22+
* Describes an event that should be exported via the "outbox" table.
23+
*
24+
* The {@link ExportedEvent} interface is parameterized to allow the application to designate the Java types used by
25+
* several attributes emitted by the event.
26+
*
27+
* It’s important that for a given Spring application, all implementations of the {@link ExportedEvent} interface use
28+
* the same parameter types or a build failure will be thrown since all events will use the same underlying database
29+
* table.
30+
*
31+
* @param <I> Specifies the aggregateId Java type.
32+
* @param <P> Specifies the event's payload type.
33+
*
34+
* @author Christian Tzolov
35+
*
36+
* @since 6.2
37+
*/
38+
public interface ExportedEvent<I, P> {
39+
40+
/**
41+
* The id of the aggregate affected by a given event. For example, the order id in case of events relating to an
42+
* order, or order lines of that order. This is used to ensure ordering of events within an aggregate type.
43+
*/
44+
I getAggregateId();
45+
46+
/**
47+
* The type of the aggregate affected by the event. For example, "order" in case of events relating to an order, or
48+
* order lines of that order. This is used as the topic name.
49+
*/
50+
String getAggregateType();
51+
52+
/**
53+
* The type of an event. For example, "Order Created" or "Order Line Cancelled" for events that belong to an given
54+
* aggregate type such as "order".
55+
*/
56+
String getType();
57+
58+
/**
59+
* The timestamp at which the event occurred.
60+
*/
61+
Instant getTimestamp();
62+
63+
/**
64+
* The event payload.
65+
*/
66+
P getPayload();
67+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.jpa.outbox;
18+
19+
import java.util.UUID;
20+
21+
// import javax.validation.constraints.NotNull;
22+
23+
import jakarta.persistence.Entity;
24+
import jakarta.persistence.GeneratedValue;
25+
import jakarta.persistence.Id;
26+
27+
/**
28+
* The outbox event entity.
29+
*
30+
* The contents of any {@link ExportedEvent} is converted into a {@link OutboxEvent} entity definition and persisted to
31+
* the database in order for Debezium to capture the event.
32+
*
33+
* @author Christian Tzolov
34+
*
35+
* @since 6.2
36+
*/
37+
@Entity
38+
public class OutboxEvent {
39+
@Id
40+
@GeneratedValue
41+
private UUID id;
42+
43+
// @NotNull
44+
private String aggregateType;
45+
46+
// @NotNull
47+
private String aggregateId;
48+
49+
// @NotNull
50+
private String type;
51+
52+
// @NotNull
53+
private Long timestamp;
54+
55+
// @NotNull
56+
private String payload;
57+
58+
public OutboxEvent() {
59+
}
60+
61+
public OutboxEvent(String aggregateType, String aggregateId, String type, String payload, Long timestamp) {
62+
this.aggregateType = aggregateType;
63+
this.aggregateId = aggregateId;
64+
this.type = type;
65+
this.payload = payload;
66+
this.timestamp = timestamp;
67+
}
68+
69+
public UUID getId() {
70+
return this.id;
71+
}
72+
73+
public void setId(UUID id) {
74+
this.id = id;
75+
}
76+
77+
public String getAggregateType() {
78+
return this.aggregateType;
79+
}
80+
81+
public void setAggregateType(String aggregateType) {
82+
this.aggregateType = aggregateType;
83+
}
84+
85+
public String getAggregateId() {
86+
return this.aggregateId;
87+
}
88+
89+
public void setAggregateId(String aggregateId) {
90+
this.aggregateId = aggregateId;
91+
}
92+
93+
public String getType() {
94+
return this.type;
95+
}
96+
97+
public void setType(String type) {
98+
this.type = type;
99+
}
100+
101+
public Long getTimestamp() {
102+
return this.timestamp;
103+
}
104+
105+
public void setTimestamp(Long timestamp) {
106+
this.timestamp = timestamp;
107+
}
108+
109+
public String getPayload() {
110+
return this.payload;
111+
}
112+
113+
public void setPayload(String payload) {
114+
this.payload = payload;
115+
}
116+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.jpa.outbox;
18+
19+
import java.util.UUID;
20+
21+
import org.springframework.data.jpa.repository.JpaRepository;
22+
23+
/**
24+
* Defines the Outbox table for the {@link OutboxEvent}s.
25+
*
26+
* @author Christian Tzolov
27+
*
28+
* @since 6.2
29+
*/
30+
public interface OutboxEventRepository extends JpaRepository<OutboxEvent, UUID> {
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.jpa.outbox;
18+
19+
/**
20+
* Implementations of this interface are responsible to persist the {@link ExportEvent} in the Outbox table.
21+
*
22+
* @author Christian Tzolov
23+
*
24+
* @since 6.2
25+
*/
26+
public interface OutboxEventSender {
27+
28+
/**
29+
* Persist the event in the outbox table as {@link OutboxEvent} entity.
30+
*
31+
* @param event to be persisted in the outbox table.
32+
*/
33+
void fire(ExportedEvent<?, ?> event);
34+
}

0 commit comments

Comments
 (0)