Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,37 @@ public static OtpOutputStream encodeTsQueryRequest(String queryText, byte[] cove
}

public static OtpOutputStream encodeTsPutRequest(String tableName, Collection<Row> rows)
{
return encodeTsPutRequest(tableName, Collections.<String>emptyList(), rows);
}

public static OtpOutputStream encodeTsPutRequest(String tableName, Collection<String> columns, Collection<Row> rows)
{
final OtpOutputStream os = new OtpOutputStream();
os.write(OtpExternal.versionTag); // NB: this is the reqired 0x83 (131) value

// TsPutReq is a 4-tuple: {'tsputreq', tableName, [], [rows]}
// columns is empte
// TsPutReq is a 4-tuple: {'tsputreq', tableName, [columns], [rows]}
os.write_tuple_head(4);

// tsputreq Atom
os.write_atom(TS_PUT_REQ);

// Table Name Binary
os.write_binary(tableName.getBytes(StandardCharsets.UTF_8));
// columns is an empty list

// Columns List
if(columns != null && !columns.isEmpty())
{
os.write_list_head(columns.size());

for (String column : columns)
{
os.write_binary(column.getBytes(StandardCharsets.UTF_8));
}
}
os.write_nil();

// write a list of rows
// Rows List
// each row is a tuple of cells
os.write_list_head(rows.size());
for (Row row : rows)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package com.basho.riak.client.core.operations.ts;

import com.basho.riak.client.core.operations.TTBFutureOperation;
import com.basho.riak.client.core.query.timeseries.CollectionConverters;
import com.basho.riak.client.core.query.timeseries.ColumnDescription;
import com.basho.riak.client.core.query.timeseries.ConvertibleIterable;
import com.basho.riak.client.core.query.timeseries.Row;
import com.basho.riak.protobuf.RiakTsPB;
import com.google.protobuf.ByteString;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

Expand Down Expand Up @@ -53,6 +58,7 @@ public static class Builder
{
private final String tableName;
private Collection<Row> rows;
private Collection<String> columns;

public Builder(String tableName)
{
Expand All @@ -64,9 +70,38 @@ public Builder(String tableName)
this.tableName = tableName;
}

public Builder withColumns(Collection<ColumnDescription> columns)
/**
* Add the names & order of the columns to be inserted.
* Order is implied by the order of the names in the Collection.
* <b>NOTE:</b>: As of Riak TS 1.4, this functionality is not implemented server-side,
* and any stored data is expected to be in the order of the table.
* @param columnNames The names of the columns to insert, and an implicit order.
* @return a reference to this object
*/
public Builder withColumns(Collection<String> columnNames)
{
throw new UnsupportedOperationException();
columns = columnNames;
return this;
}

/**
* Add the names & order of the columns to be inserted.
* Order is implied by the order of the ColumnDescriptions in the Collection.
* <b>NOTE:</b>: As of Riak TS 1.4, this functionality is not implemented server-side,
* and any stored data is expected to be in the order of the table.
* @param columns The ColumnDescriptions that contain a column name and an implicit order.
* @return a reference to this object
*/
public Builder withColumnDescriptions(Collection<? extends ColumnDescription> columns)
{
columns = new ArrayList<>(columns.size());

for (ColumnDescription column : columns)
{
this.columns.add(column.getName());
}

return this;
}

public Builder withRows(Collection<Row> rows)
Expand All @@ -85,6 +120,11 @@ public Collection<Row> getRows()
return rows;
}

public Collection<String> getColumns()
{
return columns;
}

public StoreOperation build()
{
return new StoreOperation(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ static class StoreEncoder extends BuilderTTBEncoder<StoreOperation.Builder>
@Override
OtpOutputStream buildMessage()
{
return TermToBinaryCodec.encodeTsPutRequest(builder.getTableName(), builder.getRows());
return TermToBinaryCodec.encodeTsPutRequest(builder.getTableName(), builder.getColumns(), builder.getRows());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* @author Sergey Galkin <srggal at gmail dot com>
* @since 2.0.3
*/
final class CollectionConverters
public final class CollectionConverters
{
private CollectionConverters() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void encodesPutRequestCorrectly_2()
// NB: this is what Erlang generates, an old-style float
// 99,51,46,52,50,57,57,57,57,57,57,57,57,57,57,57,57,57,55,49,53,55,56,101,43,48,49,0,0,0,0,0, // float_ext len 31
// NB: this is what JInterface generates, a new-style float
70, 64, 65, 38, 102, 102, 102, 102, 102,
70,64,65,38,102,102,102,102,102,
106, // null cell empty list
106}; // list arity 1 end

Expand All @@ -112,6 +112,60 @@ public void encodesPutRequestCorrectly_2()
}
}

@Test
public void encodesPutWithColumnsCorrectly()
{
/*
{tsputreq,<<"test_table">>,
[<<"a">>,<<"b">>,<<"c">>,<<"d">>,<<"e">>,<<"f">>,<<"g">>],
[{<<"series">>,<<"family">>,12345678,1,true,34.3,[]}]}
*/
final byte[] exp = {
(byte)131,104,4,
100,0,8,116,115,112,117,116,114,101,113,
109,0,0,0,10,116,101,115,116,95,116,97,98,108,101,
108,0,0,0,7,
109,0,0,0,1,97,
109,0,0,0,1,98,
109,0,0,0,1,99,
109,0,0,0,1,100,
109,0,0,0,1,101,
109,0,0,0,1,102,
109,0,0,0,1,103,
106,
108,0,0,0,1,
104,7,
109,0,0,0,6,115,101,114,105,101,115,
109,0,0,0,6,102,97,109,105,108,121,
98,0,(byte)188,97,78,
97,1,
100,0,4,116,114,117,101,
// NB: this is what Erlang generates, an old-style float
// 99,51,46,52,50,57,57,57,57,57,57,57,57,57,57,57,57,57,55,49,53,55,56,101,43,48,49,0,0,0,0,0, // float_ext len 31
// NB: this is what JInterface generates, a new-style float
70,64,65,38,102,102,102,102,102,
106,
106};


final List<String> columns = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
final ArrayList<Row> rows = new ArrayList<>(1);
rows.add(new Row(new Cell("series"), new Cell("family"), Cell.newTimestamp(12345678),
new Cell(1L), new Cell(true), new Cell(34.3), null));

try
{
OtpOutputStream os = TermToBinaryCodec.encodeTsPutRequest(TABLE_NAME, columns, rows);
os.flush();
byte[] msg = os.toByteArray();
Assert.assertArrayEquals(exp, msg);
}
catch (IOException ex)
{
Assert.fail(ex.getMessage());
}
}

@Test
public void encodesGetRequestCorrectly()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import com.basho.riak.client.core.RiakFuture;
import com.basho.riak.client.core.operations.ts.StoreOperation;
import com.basho.riak.client.core.query.timeseries.ColumnDescription;
import org.junit.Test;

import java.util.Collection;
import java.util.concurrent.ExecutionException;

import static org.junit.Assert.assertTrue;
Expand All @@ -26,4 +28,17 @@ public void writesDataWithoutError() throws ExecutionException, InterruptedExcep
assertTrue(future.isSuccess());
}

@Test
public void writesDataWithColumnsOptionWithoutError() throws ExecutionException, InterruptedException
{
StoreOperation storeOp = new StoreOperation.Builder(tableName)
.withColumnDescriptions(GeoCheckinWideTableDefinition.getFullColumnDescriptions())
.withRows(rows).build();

RiakFuture<Void, String> future = cluster.execute(storeOp);

future.get();
assertTrue(future.isSuccess());
}

}