Skip to content

Commit c767552

Browse files
RSocket: add unreliable channels support (#17)
1 parent b339fd4 commit c767552

File tree

6 files changed

+153
-1
lines changed

6 files changed

+153
-1
lines changed

gradle/publishing.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ subprojects {
2222
afterEvaluate {
2323
description = project.description
2424
}
25-
url = "https://jauntsdn.com"
25+
url = "https://jauntsdn.github.io"
2626
licenses {
2727
license {
2828
name = "The Apache Software License, Version 2.0"

rsocket-futures/src/main/java/com/jauntsdn/rsocket/RSocket.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616

1717
package com.jauntsdn.rsocket;
1818

19+
import io.netty.buffer.ByteBuf;
1920
import java.util.Optional;
2021
import java.util.concurrent.CompletionStage;
2122
import java.util.concurrent.ScheduledExecutorService;
23+
import java.util.function.Consumer;
2224
import java.util.function.Function;
2325

2426
/**
@@ -34,6 +36,14 @@ public interface RSocket extends MessageStreams, Availability {
3436
*/
3537
CompletionStage<Void> metadataPush(Message message);
3638

39+
/**
40+
* @return unreliable unidirectional or bidirectional communication channel if underlying
41+
* transport supports It (e.g. QUIC-DATAGRAM extension, WebTransport-QUIC)
42+
*/
43+
default Optional<UnreliableChannel> unreliableChannel() {
44+
return Optional.empty();
45+
}
46+
3747
/**
3848
* @return lightweight {@link ScheduledExecutorService} intended for non-fine-grained tasks
3949
* scheduling (e.g. timeouts).
@@ -52,6 +62,26 @@ default double availability() {
5262
return availability(0);
5363
}
5464

65+
interface UnreliableChannel {
66+
67+
Consumer<ByteBuf> unreliableChannel(Consumer<ByteBuf> messages);
68+
69+
Attributes attributes();
70+
71+
interface Attributes {
72+
73+
int messageSizeLimit();
74+
75+
Direction direction();
76+
}
77+
78+
enum Direction {
79+
INBOUND,
80+
OUTBOUND,
81+
BIDIRECTIONAL
82+
}
83+
}
84+
5585
@FunctionalInterface
5686
interface Interceptor extends Function<RSocket, RSocket> {}
5787

rsocket-grpc/src/main/java/com/jauntsdn/rsocket/RSocket.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.jauntsdn.rsocket;
1818

1919
import io.grpc.stub.StreamObserver;
20+
import io.netty.buffer.ByteBuf;
2021
import java.util.Optional;
2122
import java.util.concurrent.ScheduledExecutorService;
2223
import java.util.function.Function;
@@ -34,6 +35,14 @@ public interface RSocket extends MessageStreams, Availability {
3435
*/
3536
void metadataPush(Message message, StreamObserver<Message> responseObserver);
3637

38+
/**
39+
* @return unreliable unidirectional or bidirectional communication channel if underlying
40+
* transport supports It (e.g. QUIC-DATAGRAM extension, WebTransport-QUIC)
41+
*/
42+
default Optional<UnreliableChannel> unreliableChannel() {
43+
return Optional.empty();
44+
}
45+
3746
/**
3847
* @return lightweight {@link ScheduledExecutorService} intended for non-fine-grained tasks
3948
* scheduling (e.g. timeouts).
@@ -52,6 +61,26 @@ default double availability() {
5261
return availability(0);
5362
}
5463

64+
interface UnreliableChannel {
65+
66+
StreamObserver<ByteBuf> unreliableChannel(StreamObserver<ByteBuf> messages);
67+
68+
Attributes attributes();
69+
70+
interface Attributes {
71+
72+
int messageSizeLimit();
73+
74+
Direction direction();
75+
}
76+
77+
enum Direction {
78+
INBOUND,
79+
OUTBOUND,
80+
BIDIRECTIONAL
81+
}
82+
}
83+
5584
@FunctionalInterface
5685
interface Interceptor extends Function<RSocket, RSocket> {}
5786

rsocket-mutiny/src/main/java/com/jauntsdn/rsocket/RSocket.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@
1616

1717
package com.jauntsdn.rsocket;
1818

19+
import io.netty.buffer.ByteBuf;
20+
import io.smallrye.mutiny.Multi;
1921
import io.smallrye.mutiny.Uni;
2022
import java.util.Optional;
23+
import java.util.concurrent.Flow;
2124
import java.util.concurrent.ScheduledExecutorService;
2225
import java.util.function.Function;
2326

@@ -34,6 +37,14 @@ public interface RSocket extends MessageStreams, Availability {
3437
*/
3538
Uni<Void> metadataPush(Message message);
3639

40+
/**
41+
* @return unreliable unidirectional or bidirectional communication channel if underlying
42+
* transport supports It (e.g. QUIC-DATAGRAM extension, WebTransport-QUIC)
43+
*/
44+
default Optional<UnreliableChannel> unreliableChannel() {
45+
return Optional.empty();
46+
}
47+
3748
/**
3849
* @return lightweight {@link ScheduledExecutorService} intended for non-fine-grained tasks
3950
* scheduling (e.g. timeouts).
@@ -52,6 +63,26 @@ default double availability() {
5263
return availability(0);
5364
}
5465

66+
interface UnreliableChannel {
67+
68+
Multi<ByteBuf> unreliableChannel(Flow.Publisher<ByteBuf> messages);
69+
70+
Attributes attributes();
71+
72+
interface Attributes {
73+
74+
int messageSizeLimit();
75+
76+
Direction direction();
77+
}
78+
79+
enum Direction {
80+
INBOUND,
81+
OUTBOUND,
82+
BIDIRECTIONAL
83+
}
84+
}
85+
5586
@FunctionalInterface
5687
interface Interceptor extends Function<RSocket, RSocket> {}
5788

rsocket-reactor/src/main/java/com/jauntsdn/rsocket/RSocket.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@
1616

1717
package com.jauntsdn.rsocket;
1818

19+
import io.netty.buffer.ByteBuf;
1920
import java.util.Optional;
2021
import java.util.concurrent.ScheduledExecutorService;
2122
import java.util.function.Function;
23+
import org.reactivestreams.Publisher;
24+
import reactor.core.publisher.Flux;
2225
import reactor.core.publisher.Mono;
2326
import reactor.core.scheduler.Scheduler;
2427

@@ -35,6 +38,14 @@ public interface RSocket extends MessageStreams, Availability {
3538
*/
3639
Mono<Void> metadataPush(Message message);
3740

41+
/**
42+
* @return unreliable unidirectional or bidirectional communication channel if underlying
43+
* transport supports It (e.g. QUIC-DATAGRAM extension, WebTransport-QUIC)
44+
*/
45+
default Optional<UnreliableChannel> unreliableChannel() {
46+
return Optional.empty();
47+
}
48+
3849
/**
3950
* @return lightweight {@link ScheduledExecutorService} intended for non-fine-grained tasks
4051
* scheduling (e.g. timeouts).
@@ -53,6 +64,26 @@ default double availability() {
5364
return availability(0);
5465
}
5566

67+
interface UnreliableChannel {
68+
69+
Flux<ByteBuf> unreliableChannel(Publisher<ByteBuf> messages);
70+
71+
Attributes attributes();
72+
73+
interface Attributes {
74+
75+
int messageSizeLimit();
76+
77+
Direction direction();
78+
}
79+
80+
enum Direction {
81+
INBOUND,
82+
OUTBOUND,
83+
BIDIRECTIONAL
84+
}
85+
}
86+
5687
@FunctionalInterface
5788
interface Interceptor extends Function<RSocket, RSocket> {}
5889

rsocket-rxjava/src/main/java/com/jauntsdn/rsocket/RSocket.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@
1616

1717
package com.jauntsdn.rsocket;
1818

19+
import io.netty.buffer.ByteBuf;
1920
import io.reactivex.rxjava3.core.Completable;
21+
import io.reactivex.rxjava3.core.Flowable;
2022
import io.reactivex.rxjava3.core.Scheduler;
2123
import java.util.Optional;
2224
import java.util.concurrent.ScheduledExecutorService;
2325
import java.util.function.Function;
26+
import org.reactivestreams.Publisher;
2427

2528
/**
2629
* Models RSocket interactions as described in
@@ -35,6 +38,14 @@ public interface RSocket extends MessageStreams, Availability {
3538
*/
3639
Completable metadataPush(Message message);
3740

41+
/**
42+
* @return unreliable unidirectional or bidirectional communication channel if underlying
43+
* transport supports It (e.g. QUIC-DATAGRAM extension, WebTransport-QUIC)
44+
*/
45+
default Optional<UnreliableChannel> unreliableChannel() {
46+
return Optional.empty();
47+
}
48+
3849
/**
3950
* @return lightweight {@link ScheduledExecutorService} intended for non-fine-grained tasks
4051
* scheduling (e.g. timeouts).
@@ -53,6 +64,26 @@ default double availability() {
5364
return availability(0);
5465
}
5566

67+
interface UnreliableChannel {
68+
69+
Flowable<ByteBuf> unreliableChannel(Publisher<ByteBuf> messages);
70+
71+
Attributes attributes();
72+
73+
interface Attributes {
74+
75+
int messageSizeLimit();
76+
77+
Direction direction();
78+
}
79+
80+
enum Direction {
81+
INBOUND,
82+
OUTBOUND,
83+
BIDIRECTIONAL
84+
}
85+
}
86+
5687
@FunctionalInterface
5788
interface Interceptor extends Function<RSocket, RSocket> {}
5889

0 commit comments

Comments
 (0)