-
Notifications
You must be signed in to change notification settings - Fork 3.7k
PIP 13: Subscribe to topics represented by regular expressions
The consumer needs to handle subscription to topics represented by regular expressions. The scope is namespace in first stage, all topics/patten should be targeted in same namespace, This will make easy authentication and authorization control.
At last, we should add and implementation a serials of new methods in PulsarClient.java
Consumer subscribe(Collection<String> topics, String subscription);
Consumer subscribe(Pattern topicsPattern, String subscription);The goals the should be achieved are these below, we could achieve it one by one:
- support subscription to multiple topics in the same namespace (no guarantee on ordering between topics)
- support regex based subscription
- auto-discover topic addition/deletion
This will need a new implementation of ConsumerBase which wrapper over multiple single-topic-consumers, let’s name it as TopicsConsumerImpl.
When user call new method
Consumer subscribe(Collection<String> topics, String subscription);
It will iteratively new a ConsumerImpl for each topic, and return a TopicsConsumerImpl. The main work is:
- This
TopicsConsumerImplclass should provide implementation of abstract methods inConsumerBase, Should also provide some specific methods such as:
// maintain a map for all the <Topic, Consumer>, after we subscribe all the topics.
private final ConcurrentMap<String, ConsumerImpl> consumers = new ConcurrentHashMap<>();
// get topics
Set<String> getTopics();
// get consumers
List<ConsumerImpl> getConsumers();
// subscribe a topic
void subscribeTopic(String topic);
// unSubscribe a topic
void unSubscribeTopic(String topic);- While Message receive/ack, the message identify is needed. In the implementation, we need handle Message identify(MessageId) differently for some of the abstract methods in
ConsumerBase, because we have to addMessageIdwith additionalString topicorconsumer id, Or we may need to changeMessageIdDatainPulsarApi.proto.
As mentioned before, the scope is namespace. The main work is:
- In above
TopicsConsumerImplclass, need to keep thePattern, which was passed in from api for subscription. - leverage currently pulsar admin API of
getListto get a list of Topics. Ininterface PersistentTopics:
List<String> getList(String namespace) throws PulsarAdminException;
List<String> getPartitionedTopicList(String namespace) throws PulsarAdminException;- The process of new method
Consumer subscribe(String namespace, Pattern topicsPattern, String subscription)should be like this:
- call method
List<String> getList(String namespace)to get all the topics; - Use
topicsPatternto filter out the matched sub-topics-list. - construct the
TopicsConsumerImplwith the the sub-topics-list.
The main work is:
- provide a listener, which based on topics changes, to do subscribe and unsubscribe on individual topic when target topic been changed(remove/add).
Interface TopicsChangeListener {
// unsubscribe and delete ConsumerImpl in the `consumers` map in `TopicsConsumerImpl` based on added topics.
void onTopicsRemoved(Collection<String> topics);
// subscribe and create a list of new ConsumerImpl, added them to the `consumers` map in `TopicsConsumerImpl`.
void onTopicsAdded(Collection<String> topics);
}Add a method void registerListener(TopicsChangeListener listener) to TopicsConsumerImpl
- Based on above work, using a timer, periodically call
List<String> getList(String namespace). And comparing the filtered fresh sub-topics-list with current topics holden inTopicsConsumerImpl, try to get 2 lists:newAddedTopicsListandremovedTopicsList. - If the 2 lists not empty, call
TopicsChangeListener.onTopicsAdded(newAddedTopicsList), andTopicsChangeListener.onTopicsRemoved(removedTopicsList)to do subscribe and unsubscribe, and updateconsumersmap inTopicsConsumerImpl.
The changes will be mostly on the surface and on client side:
- add and implementation a serials of new methods in
org.apache.pulsar.client.api.PulsarClient.java
Consumer subscribe(Collection<String> topics, String subscription);
Consumer subscribe(Pattern topicsPattern, String subscription);- add and implenentation of new
Consumer, which isTopicsConsumerImpl, returned by abovesubscribemethod