Skip to content

Added mango selector to changes feed #63

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 66 additions & 41 deletions src/main/java/org/lightcouch/Changes.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.lightcouch;

import java.io.BufferedReader;
Expand All @@ -26,15 +25,18 @@
import org.lightcouch.ChangesResult.Row;

import com.google.gson.Gson;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;

/**
* <p>Contains the Change Notifications API, supports <i>normal</i> and <i>continuous</i> feed Changes.
* <p>
* Contains the Change Notifications API, supports <i>normal</i> and <i>continuous</i> feed Changes.
* <h3>Usage Example:</h3>
* <pre>
* // feed type normal
* // feed type normal
* String since = dbClient.context().info().getUpdateSeq(); // latest update seq
* ChangesResult changeResult = dbClient.changes()
* .since(since)
* .since(since)
* .limit(10)
* .filter("example/filter")
* .getChanges();
Expand All @@ -46,32 +48,35 @@
*
* // feed type continuous
* Changes changes = dbClient.changes()
* .includeDocs(true)
* .includeDocs(true)
* .heartBeat(30000)
* .continuousChanges();
*
* while (changes.hasNext()) {
* .continuousChanges();
*
* while (changes.hasNext()) {
* ChangesResult.Row feed = changes.next();
* String docId = feed.getId();
* JsonObject doc = feed.getDoc();
* // changes.stop(); // stop continuous feed
* }
* </pre>
*
* @see ChangesResult
* @since 0.0.2
* @author Ahmed Yehia
*/
public class Changes {

private BufferedReader reader;
private HttpGet httpGet;
private HttpRequestBase httpRequest;
private Row nextRow;
private boolean stop;

private CouchDbClientBase dbc;
private Gson gson;
private URIBuilder uriBuilder;


private String selector;

Changes(CouchDbClientBase dbc) {
this.dbc = dbc;
this.gson = dbc.getGson();
Expand All @@ -80,22 +85,39 @@ public class Changes {

/**
* Requests Change notifications of feed type continuous.
* <p>Feed notifications are accessed in an <i>iterator</i> style.
* <p>
* Feed notifications are accessed in an <i>iterator</i> style.
*/
public Changes continuousChanges() {
final URI uri = uriBuilder.query("feed", "continuous").build();
httpGet = new HttpGet(uri);
final InputStream in = dbc.get(httpGet);
final InputStreamReader is = new InputStreamReader(in, Charsets.UTF_8);
setReader(new BufferedReader(is));
if (selector == null) {
final HttpGet get = new HttpGet(uri);
httpRequest = get;
final InputStream in = dbc.get(get);
final InputStreamReader is = new InputStreamReader(in, Charsets.UTF_8);
setReader(new BufferedReader(is));
} else {
final HttpPost post = new HttpPost(uri);
httpRequest = post;
final InputStream in = dbc.post(post, selector);
final InputStreamReader is = new InputStreamReader(in, Charsets.UTF_8);
setReader(new BufferedReader(is));
}

return this;
}

// Query Params
public Changes selector(String json) {
uriBuilder.query("filter", "_selector");
this.selector = json;
return this;
}

/**
* Checks whether a feed is available in the continuous stream, blocking
* until a feed is received.
* Checks whether a feed is available in the continuous stream, blocking until a feed is received.
*/
public boolean hasNext() {
public boolean hasNext() {
return readNextRow();
}

Expand All @@ -118,26 +140,29 @@ public void stop() {
*/
public ChangesResult getChanges() {
final URI uri = uriBuilder.query("feed", "normal").build();
return dbc.get(uri, ChangesResult.class);
if (selector == null) {
return dbc.get(uri, ChangesResult.class);
} else {
return dbc.post(uri, selector, ChangesResult.class);
}
}

// Query Params

public Changes since(String since) {
uriBuilder.query("since", since);
return this;
}

public Changes limit(int limit) {
uriBuilder.query("limit", limit);
return this;
}

public Changes heartBeat(long heartBeat) {
uriBuilder.query("heartbeat", heartBeat);
return this;
}

public Changes timeout(long timeout) {
uriBuilder.query("timeout", timeout);
return this;
Expand All @@ -147,42 +172,42 @@ public Changes filter(String filter) {
uriBuilder.query("filter", filter);
return this;
}

public Changes includeDocs(boolean includeDocs) {
uriBuilder.query("include_docs", includeDocs);
return this;
}

public Changes style(String style) {
uriBuilder.query("style", style);
return this;
}

// Helper

// Helper
/**
* Reads and sets the next feed in the stream.
*/
private boolean readNextRow() {
boolean hasNext = false;
try {
if(!stop) {
String row = "";
if (!stop) {
String row = "";
do {
row = getReader().readLine();
} while(row.length() == 0);
if(!row.startsWith("{\"last_seq\":")) {
row = getReader().readLine();
} while (row.length() == 0);

if (!row.startsWith("{\"last_seq\":")) {
setNextRow(gson.fromJson(row, Row.class));
hasNext = true;
}
}
}
}
} catch (Exception e) {
terminate();
throw new CouchDbException("Error reading continuous stream.", e);
}
if(!hasNext)
}
if (!hasNext) {
terminate();
}
return hasNext;
}

Expand All @@ -201,9 +226,9 @@ private Row getNextRow() {
private void setNextRow(Row nextRow) {
this.nextRow = nextRow;
}

private void terminate() {
httpGet.abort();
httpRequest.abort();
CouchDbUtil.close(getReader());
}
}
Loading