Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 42 additions & 7 deletions java/pom.xml
Original file line number Diff line number Diff line change
@@ -1,42 +1,77 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.couchbase</groupId>
<groupId>com.couchbase.client.demo</groupId>
<artifactId>devguide</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<log4j.version>1.2.17</log4j.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>

<dependencies>
<dependency>
<groupId>com.couchbase.client</groupId>
<artifactId>java-client</artifactId>
<version>2.7.9</version>
<version>3.0.8</version>
</dependency>

<dependency>
<groupId>com.couchbase.client</groupId>
<artifactId>encryption</artifactId>
<version>2.0.1</version>
<artifactId>couchbase-encryption</artifactId>
<version>3.0.0-pre.1</version>
</dependency>

<!-- logging dependencies -->

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>

<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<version>${log4j.version}</version>
</dependency>

</dependencies>

<repositories>

<repository>
<id>couchbase</id>
<name>Couchbase Preview Repository</name>
<url>http://files.couchbase.com/maven2</url>
</repository>

<repository>
<id>sonatypeSnapshots</id>
<name>Sonatype Snapshots</name>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
</repository>

</repositories>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
Expand Down
73 changes: 29 additions & 44 deletions java/src/main/java/com/couchbase/devguide/BulkGet.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
/*
* Copyright (c) 2020 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.couchbase.devguide;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.JsonLongDocument;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.error.DocumentDoesNotExistException;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import com.couchbase.client.java.ReactiveCollection;
import com.couchbase.client.java.json.JsonObject;
import reactor.core.publisher.Flux;

/**
* Example of Bulk Get in Java for the Couchbase Developer Guide.
Expand All @@ -32,47 +43,21 @@ protected void doWork() {

// Insert 10 documents, the old way
for (String id : keys) {
JsonDocument doc = JsonDocument.create(id, content);
bucket.upsert(doc);
collection.upsert(id, content);
}

JsonObject jo = collection.get(key+"_1").contentAsObject();
System.out.println(jo);
// Describe what we want to do asynchronously using RxJava Observables:

Observable<JsonDocument> asyncBulkGet = Observable
// Use RxJava from to start from the keys we know in advance
.from(keys)
//now use flatMap to asynchronously retrieve (get) each corresponding document using the SDK
.flatMap(new Func1<String, Observable<JsonDocument>>() {
public Observable<JsonDocument> call(String key) {
if (key.endsWith("3"))
return bucket.async().get(key).delay(3, TimeUnit.SECONDS); //artificial delay for item 3
return bucket.async().get(key);
}
});

// So far we've described and not triggered the processing, let's subscribe
/*
* Note: since our app is not fully asynchronous, we want to revert back to blocking at the end,
* so we subscribe using toBlocking().
*
* toBlocking will throw any exception that was propagated through the Observer's onError method.
*
* The SDK is doing its own parallelisation so the blocking is just waiting for the last item,
* notice how our artificial delay doesn't impact printout of the other values, that come in the order
* in which the server answered...
*/
try {
asyncBulkGet.toBlocking()
// we'll still printout each inserted document (with CAS gotten from the server)
// toBlocking() also offers several ways of getting one of the emitted values (first(), single(), last())
.forEach(new Action1<JsonDocument>() {
public void call(JsonDocument jsonDocument) {
LOGGER.info("Found " + jsonDocument);
}
});
} catch (Exception e) {
LOGGER.error("Error during bulk get", e);
}
ReactiveCollection reactiveCollection = collection.reactive();
Flux<Object> resultFlux = Flux.range(0, 10)
.map(index -> {return key + "_" + index; } )
.flatMap( k -> reactiveCollection.get(k));

resultFlux.subscribe(System.out::println);

}

public static void main(String[] args) {
Expand Down
82 changes: 26 additions & 56 deletions java/src/main/java/com/couchbase/devguide/BulkInsert.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
package com.couchbase.devguide;

import java.util.concurrent.TimeUnit;
/*
* Copyright (c) 2020 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import javax.sound.midi.Soundbank;
package com.couchbase.devguide;

import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.JsonLongDocument;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.error.DocumentDoesNotExistException;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import com.couchbase.client.java.ReactiveCollection;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.client.java.kv.MutationResult;
import reactor.core.publisher.Flux;

/**
* Example of Bulk Insert in Java for the Couchbase Developer Guide.
Expand All @@ -26,52 +35,13 @@ protected void doWork() {

// Describe what we want to do asynchronously using RxJava Observables:

Observable<JsonDocument> asyncProcessing = Observable
// Use RxJava range + map to generate 10 keys. One could also use "from" with a pre-existing collection of keys.
.range(0, 10)
.map(new Func1<Integer, String>() {
public String call(Integer i) {
return key + "_" + i;
}
})
//then create a JsonDocument out each one of these keys
.map(new Func1<String, JsonDocument>() {
public JsonDocument call(String s) {
return JsonDocument.create(s, content);
}
})
//now use flatMap to asynchronously call the SDK upsert operation on each
.flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
public Observable<JsonDocument> call(JsonDocument doc) {
if (doc.id().endsWith("3"))
return bucket.async().upsert(doc).delay(3, TimeUnit.SECONDS); //artificial delay for item 3
return bucket.async().upsert(doc);
}
});
ReactiveCollection reactiveCollection = collection.reactive();
Flux<MutationResult> resultFlux = Flux.range(0, 10)
.map(index -> {return key + "_" + index; } )
.flatMap( k -> reactiveCollection.upsert(k, content));

resultFlux.subscribe(System.out::println);

// So far we've described and not triggered the processing, let's subscribe
/*
* Note: since our app is not fully asynchronous, we want to revert back to blocking at the end,
* so we subscribe using toBlocking().
*
* toBlocking will throw any exception that was propagated through the Observer's onError method.
*
* The SDK is doing its own parallelisation so the blocking is just waiting for the last item,
* notice how our artificial delay doesn't impact printout of the other values, that come in the order
* in which the server answered...
*/
try {
asyncProcessing.toBlocking()
// we'll still printout each inserted document (with CAS gotten from the server)
// toBlocking() also offers several ways of getting one of the emitted values (first(), single(), last())
.forEach(new Action1<JsonDocument>() {
public void call(JsonDocument jsonDocument) {
LOGGER.info("Inserted " + jsonDocument);
}
});
} catch (Exception e) {
LOGGER.error("Error during bulk insert", e);
}
}

public static void main(String[] args) {
Expand Down
60 changes: 39 additions & 21 deletions java/src/main/java/com/couchbase/devguide/Cas.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,32 @@
/*
* Copyright (c) 2020 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.couchbase.devguide;

import com.couchbase.client.java.json.JsonArray;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.client.java.kv.GetResult;
import com.couchbase.client.java.kv.ReplaceOptions;

import java.util.concurrent.CountDownLatch;

import com.couchbase.client.java.document.JsonArrayDocument;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonArray;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.error.CASMismatchException;

/**
* Example of Cas (Check and Set) handling in Java for the Couchbase Developer Guide.
* TODO: not tested
*/
public class Cas extends ConnectionBase {

Expand All @@ -18,26 +35,26 @@ public class Cas extends ConnectionBase {

@Override
protected void doWork() {
JsonArrayDocument initialDoc = JsonArrayDocument.create(KEY, JsonArray.empty());
bucket.upsert(initialDoc);
JsonArray initialDoc = JsonArray.create().add("initial");
bucket.defaultCollection().upsert(KEY, initialDoc);

LOGGER.info("Will attempt concurrent document mutations without CAS");
parallel(false);

JsonArray currentList = bucket.get(KEY, JsonArrayDocument.class).content();
JsonArray currentList = bucket.defaultCollection().get(KEY).contentAsArray();
LOGGER.info("Current list has " + currentList.size() + " elements");
if (currentList.size() != PARALLEL) {
LOGGER.info("Concurrent modifications removed some of our items! " + currentList.toString());
}

// Reset the list again
bucket.upsert(initialDoc);
bucket.defaultCollection().upsert(KEY,initialDoc);

//The same as above, but using CAS
LOGGER.info("Will attempt concurrent modifications using CAS");
parallel(true);

currentList = bucket.get(KEY, JsonArrayDocument.class).content();
currentList = bucket.defaultCollection().get(KEY).contentAsArray();
LOGGER.info("Current list has " + currentList.size() + " elements: " + currentList.toString());
if (currentList.size() != PARALLEL) {
LOGGER.error("Expected the whole list of elements - " + currentList.toString());
Expand All @@ -46,30 +63,31 @@ protected void doWork() {

public void iterationWithoutCAS(int idx, CountDownLatch latch) {
//this code plainly ignores the CAS by creating a new document (CAS O)
JsonArray l = bucket.get(KEY, JsonArrayDocument.class).content();
l.add("item_" + idx);
JsonArrayDocument updatedDoc = JsonArrayDocument.create(KEY, l);
bucket.replace(updatedDoc);

JsonArray l = bucket.defaultCollection().get(KEY).contentAsArray();
l.add("value_"+idx);
bucket.defaultCollection().replace(KEY, l);
latch.countDown();
}

public void iterationWithCAS(int idx, CountDownLatch latch) {
String item = "item_" + idx;

while(true) {
JsonArrayDocument current = bucket.get(KEY, JsonArrayDocument.class);
JsonArray l = current.content();
l.add(item);
//GetResult current = bucket.defaultCollection().get(KEY);
//JsonArray l = bucket.defaultCollection().get(KEY).contentAsArray();
//l.add( "value_"+idx);

//we mutated the content of the document, and the SDK injected the CAS value in there as well
// so we can use it directly
try {
bucket.replace(current);
GetResult current = bucket.defaultCollection().get(KEY);
JsonArray l = current.contentAsArray();
l.add( "value_"+idx);
bucket.defaultCollection().replace(KEY,l, ReplaceOptions.replaceOptions().cas(current.cas()));
break; //success! stop the loop
} catch (CASMismatchException e) {
} catch (RuntimeException e) {
//in case a parallel execution already updated the document, continue trying
LOGGER.info("Cas mismatch for item " + item);
LOGGER.info(e+" Cas mismatch for item " + item);
}
}
latch.countDown();
Expand Down
Loading