Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
e2bdf33
Add argument validation to HadoopTables#create (#298)
chenjunjiedada Jul 25, 2019
1596d61
Install source JAR when running install target (#310)
electrum Jul 25, 2019
daf0620
Add projectStrict for Dates and Timestamps (#283)
moulimukherjee Jul 26, 2019
77a456a
Correctly publish artifacts on JitPack (#321)
electrum Jul 26, 2019
5366efe
Add build info to README.md (#304)
timmylicheng Jul 29, 2019
c36740b
Convert Iceberg time type to Hive string type (#325)
electrum Jul 29, 2019
f4fc8ff
Add overwrite option to write builders (#318)
arina-ielchiieva Jul 30, 2019
08e0873
Fix out of order Pig partition fields (#326)
danielcweeks Jul 30, 2019
1dda1ad
Add mapping to Iceberg for external name-based schemas (#338)
rdblue Jul 31, 2019
c56c0b8
Site: Fix broken link to Iceberg API (#333)
edgarRd Jul 31, 2019
b179990
Add forTable method for Avro WriteBuilder (#322)
arina-ielchiieva Jul 31, 2019
c7bf199
Remove multiple literal strings check rule for scala (#335)
chenjunjiedada Jul 31, 2019
a7c2d01
Fix invalid javadoc url in README.md (#336)
lys0716 Jul 31, 2019
fde0dcc
Use UnicodeUtil.truncateString for Truncate transform. (#340)
rdblue Jul 31, 2019
26765c8
Refactor metrics tests for reuse (#331)
edgarRd Aug 1, 2019
a7bed2b
Spark: Add support for write-audit-publish workflows (#342)
rdblue Aug 1, 2019
62d09d7
Avoid write failures if metrics mode is invalid (#301)
rdblue Aug 1, 2019
30d45f8
Fix truncateStringMax in UnicodeUtil (#334)
Jul 31, 2019
61d1e79
Merge branch 'master' into vectorized-batch-sizing
prodeezy Aug 1, 2019
67f4371
[Vectorization] Added batch sizing, switched to BufferAllocator, othe…
prodeezy Aug 1, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ The core Java library that tracks table snapshots and metadata is complete, but

The [Iceberg format specification][iceberg-spec] is being actively updated and is open for comment. Until the specification is complete and released, it carries no compatibility guarantees. The spec is currently evolving as the Java reference implementation changes.

[Java API javadocs][iceberg-javadocs] are available for the 0.6.0 tag.
[Java API javadocs][iceberg-javadocs] are available for the master branch.

[iceberg-javadocs]: https://iceberg.apache.org/javadoc/0.6.0/index.html?com/netflix/iceberg/package-summary.html
[iceberg-javadocs]: https://iceberg.apache.org/javadoc/master
[iceberg-spec]: https://iceberg.apache.org/spec


Expand All @@ -54,6 +54,9 @@ Community discussions happen primarily on the [dev mailing list][dev-list] or on

Iceberg is built using Gradle 5.2.1.

* To invoke a build and run tests: `./gradlew build`
* To skip tests: `./gradlew build -x test`

Iceberg table support is organized in library modules:

* `iceberg-common` contains utility classes used in other modules
Expand Down
7 changes: 7 additions & 0 deletions api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,11 @@ public interface SnapshotUpdate<ThisT> extends PendingUpdate<Snapshot> {
*/
ThisT deleteWith(Consumer<String> deleteFunc);

/**
 * Called to stage a snapshot in table metadata, but not update the current snapshot id.
 * <p>
 * NOTE(review): how a staged snapshot is subsequently made current is not defined by this
 * interface as shown here -- confirm against the implementations.
 *
 * @return this for method chaining
 */
ThisT stageOnly();

}
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,10 @@ public <T> Expression predicate(BoundPredicate<T> pred) {
// similarly, if partitioning by day(ts) and hour(ts), the more restrictive
// projection should be used. ts = 2019-01-01T01:00:00 produces day=2019-01-01 and
// hour=2019-01-01-01. the value will be in 2019-01-01-01 and not in 2019-01-01-02.
result = Expressions.and(
result,
((Transform<T, ?>) part.transform()).project(part.name(), pred));
UnboundPredicate<?> inclusiveProjection = ((Transform<T, ?>) part.transform()).project(part.name(), pred);
if (inclusiveProjection != null) {
result = Expressions.and(result, inclusiveProjection);
}
}

return result;
Expand Down Expand Up @@ -251,9 +252,10 @@ public <T> Expression predicate(BoundPredicate<T> pred) {
// any timestamp where either projection predicate is true must match the original
// predicate. For example, ts = 2019-01-01T03:00:00 matches the hour projection but not
// the day, but does match the original predicate.
result = Expressions.or(
result,
((Transform<T, ?>) part.transform()).projectStrict(part.name(), pred));
UnboundPredicate<?> strictProjection = ((Transform<T, ?>) part.transform()).projectStrict(part.name(), pred);
if (strictProjection != null) {
result = Expressions.or(result, strictProjection);
}
}

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@

package org.apache.iceberg.expressions;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
Expand Down Expand Up @@ -200,42 +197,66 @@ public <T> Expression notEq(BoundReference<T> ref, Literal<T> lit) {
@Override
@SuppressWarnings("unchecked")
public <T> Expression predicate(BoundPredicate<T> pred) {
// Get the strict projection of this predicate in partition data, then use it to determine
// whether to return the original predicate. The strict projection returns true iff the
// original predicate would have returned true, so the predicate can be eliminated if the
// strict projection evaluates to true.
/**
* Get the strict projection and inclusive projection of this predicate in partition data,
* then use them to determine whether to return the original predicate. The strict projection
* returns true only if the original predicate would have returned true, so the predicate can be
* eliminated if the strict projection evaluates to true. Similarly the inclusive projection
* returns false only if the original predicate would have returned false, so the predicate can
* also be eliminated if the inclusive projection evaluates to false.
*/

//
// If there is no strict projection or if it evaluates to false, then return the predicate.
List<PartitionField> parts = spec.getFieldsBySourceId(pred.ref().fieldId());
if (parts == null) {
return pred; // not associated with a partition field, can't be evaluated
}

List<UnboundPredicate<?>> strictProjections = Lists.transform(parts,
part -> ((Transform<T, ?>) part.transform()).projectStrict(part.name(), pred));
for (PartitionField part : parts) {

if (Iterables.all(strictProjections, Objects::isNull)) {
// if there are no strict projections, the predicate must be in the residual
return pred;
}
// checking the strict projection
UnboundPredicate<?> strictProjection = ((Transform<T, ?>) part.transform()).projectStrict(part.name(), pred);
Expression strictResult = null;

if (strictProjection != null) {
Expression bound = strictProjection.bind(spec.partitionType(), caseSensitive);
if (bound instanceof BoundPredicate) {
strictResult = super.predicate((BoundPredicate<?>) bound);
} else {
// if the result is not a predicate, then it must be a constant like alwaysTrue or alwaysFalse
strictResult = bound;
}
}

Expression result = Expressions.alwaysFalse();
for (UnboundPredicate<?> strictProjection : strictProjections) {
if (strictProjection == null) {
continue;
if (strictResult != null && strictResult.op() == Expression.Operation.TRUE) {
// If strict is true, returning true
return Expressions.alwaysTrue();
}

Expression bound = strictProjection.bind(spec.partitionType(), caseSensitive);
if (bound instanceof BoundPredicate) {
// evaluate the bound predicate, which will return alwaysTrue or alwaysFalse
result = Expressions.or(result, super.predicate((BoundPredicate<?>) bound));
} else {
// update the result expression with the non-predicate residual (e.g. alwaysTrue)
result = Expressions.or(result, bound);
// checking the inclusive projection
UnboundPredicate<?> inclusiveProjection = ((Transform<T, ?>) part.transform()).project(part.name(), pred);
Expression inclusiveResult = null;
if (inclusiveProjection != null) {
Expression boundInclusive = inclusiveProjection.bind(spec.partitionType(), caseSensitive);
if (boundInclusive instanceof BoundPredicate) {
// using predicate method specific to inclusive
inclusiveResult = super.predicate((BoundPredicate<?>) boundInclusive);
} else {
// if the result is not a predicate, then it must be a constant like alwaysTrue or alwaysFalse
inclusiveResult = boundInclusive;
}
}

if (inclusiveResult != null && inclusiveResult.op() == Expression.Operation.FALSE) {
// If inclusive is false, returning false
return Expressions.alwaysFalse();
}

}

return result;
// neither strict nor inclusive predicate was conclusive, returning the original pred
return pred;
}

@Override
Expand Down
7 changes: 5 additions & 2 deletions api/src/main/java/org/apache/iceberg/transforms/Dates.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,11 @@ public UnboundPredicate<Integer> project(String fieldName, BoundPredicate<Intege
}

@Override
public UnboundPredicate<Integer> projectStrict(String fieldName, BoundPredicate<Integer> predicate) {
return null;
public UnboundPredicate<Integer> projectStrict(String fieldName, BoundPredicate<Integer> pred) {
if (pred.op() == NOT_NULL || pred.op() == IS_NULL) {
return Expressions.predicate(pred.op(), fieldName);
}
return ProjectionUtil.truncateIntegerStrict(fieldName, pred, this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,90 @@ static <T> UnboundPredicate<T> truncateInteger(
}
}

/**
 * Builds a strict projection of an integer comparison onto the output of {@code transform}.
 * <p>
 * The returned predicate is expressed against the transformed value named {@code name}. Range
 * operations are narrowed by one transformed unit whenever the literal's own transformed value
 * can also be produced by source values that fail the original predicate.
 * <p>
 * NOTE(review): the +1 / -1 adjustments assume the transform is order-preserving and produces
 * consecutive integers -- confirm for every transform passed in.
 *
 * @param name name used for the reference of the projected predicate
 * @param pred bound predicate whose operation and literal are projected
 * @param transform transform applied to the literal and, for boundary checks, to its neighbours
 * @return the projected predicate, or null for {@code EQ} and any operation not handled here
 */
static UnboundPredicate<Integer> truncateIntegerStrict(
    String name, BoundPredicate<Integer> pred, Transform<Integer, Integer> transform) {
  int literal = pred.literal().value();
  switch (pred.op()) {
    case LT:
      // values >= literal may share the literal's transformed value, so only the
      // transformed values before it are guaranteed to match
      return predicate(Expression.Operation.LT_EQ, name, transform.apply(literal) - 1);

    case LT_EQ:
      if (!transform.apply(literal + 1).equals(transform.apply(literal))) {
        // the next value transforms differently: the literal is the last value of its
        // transformed unit, so that whole unit satisfies the original predicate
        return predicate(Expression.Operation.LT_EQ, name, transform.apply(literal));
      }
      // the literal is not the last value of its unit; only earlier units are guaranteed
      return predicate(Expression.Operation.LT_EQ, name, transform.apply(literal) - 1);

    case GT:
      // values <= literal may share the literal's transformed value, so only the
      // transformed values after it are guaranteed to match
      return predicate(Expression.Operation.GT_EQ, name, transform.apply(literal) + 1);

    case GT_EQ:
      if (!transform.apply(literal - 1).equals(transform.apply(literal))) {
        // the previous value transforms differently: the literal is the first value of its
        // transformed unit, so that whole unit satisfies the original predicate
        return predicate(Expression.Operation.GT_EQ, name, transform.apply(literal));
      }
      // the literal is not the first value of its unit; only later units are guaranteed
      return predicate(Expression.Operation.GT_EQ, name, transform.apply(literal) + 1);

    case NOT_EQ:
      // a different transformed value can only come from a different source value
      return predicate(Expression.Operation.NOT_EQ, name, transform.apply(literal));

    case EQ:
      // no predicate guarantees equality because adjacent ints transform to the same value
    default:
      return null;
  }
}

/**
 * Builds a strict projection of a long comparison onto the integer output of {@code transform}.
 * <p>
 * The returned predicate is expressed against the transformed value named {@code name}. Range
 * operations are narrowed by one transformed unit whenever the literal's own transformed value
 * can also be produced by source values that fail the original predicate.
 * <p>
 * NOTE(review): the +1 / -1 adjustments assume the transform is order-preserving and produces
 * consecutive integers -- confirm for every transform passed in.
 *
 * @param name name used for the reference of the projected predicate
 * @param pred bound predicate whose operation and literal are projected
 * @param transform transform applied to the literal and, for boundary checks, to its neighbours
 * @return the projected predicate, or null for {@code EQ} and any operation not handled here
 */
static UnboundPredicate<Integer> truncateLongStrict(
    String name, BoundPredicate<Long> pred, Transform<Long, Integer> transform) {
  long literal = pred.literal().value();
  switch (pred.op()) {
    case LT:
      // values >= literal may share the literal's transformed value, so only the
      // transformed values before it are guaranteed to match
      return predicate(Expression.Operation.LT_EQ, name, transform.apply(literal) - 1);

    case LT_EQ:
      if (!transform.apply(literal + 1L).equals(transform.apply(literal))) {
        // the next value transforms differently: the literal is the last value of its
        // transformed unit, so that whole unit satisfies the original predicate
        return predicate(Expression.Operation.LT_EQ, name, transform.apply(literal));
      }
      // the literal is not the last value of its unit; only earlier units are guaranteed
      return predicate(Expression.Operation.LT_EQ, name, transform.apply(literal) - 1);

    case GT:
      // values <= literal may share the literal's transformed value, so only the
      // transformed values after it are guaranteed to match
      return predicate(Expression.Operation.GT_EQ, name, transform.apply(literal) + 1);

    case GT_EQ:
      if (!transform.apply(literal - 1L).equals(transform.apply(literal))) {
        // the previous value transforms differently: the literal is the first value of its
        // transformed unit, so that whole unit satisfies the original predicate
        return predicate(Expression.Operation.GT_EQ, name, transform.apply(literal));
      }
      // the literal is not the first value of its unit; only later units are guaranteed
      return predicate(Expression.Operation.GT_EQ, name, transform.apply(literal) + 1);

    case NOT_EQ:
      // a different transformed value can only come from a different source value
      return predicate(Expression.Operation.NOT_EQ, name, transform.apply(literal));

    case EQ:
      // no predicate guarantees equality because adjacent longs transform to the same value
    default:
      return null;
  }
}

static <T> UnboundPredicate<T> truncateLong(
String name, BoundPredicate<Long> pred, Transform<Long, T> transform) {
long boundary = pred.literal().value();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,11 @@ public UnboundPredicate<Integer> project(String fieldName, BoundPredicate<Long>
}

@Override
public UnboundPredicate<Integer> projectStrict(String fieldName, BoundPredicate<Long> predicate) {
return null;
public UnboundPredicate<Integer> projectStrict(String fieldName, BoundPredicate<Long> pred) {
if (pred.op() == NOT_NULL || pred.op() == IS_NULL) {
return Expressions.predicate(pred.op(), fieldName);
}
return ProjectionUtil.truncateLongStrict(fieldName, pred, this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.UnboundPredicate;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.util.UnicodeUtil;

import static org.apache.iceberg.expressions.Expression.Operation.IS_NULL;
import static org.apache.iceberg.expressions.Expression.Operation.LT;
Expand Down Expand Up @@ -233,7 +234,7 @@ public Integer width() {

@Override
public CharSequence apply(CharSequence value) {
return value.subSequence(0, Math.min(value.length(), length));
return UnicodeUtil.truncateString(value, length);
}

@Override
Expand Down
6 changes: 3 additions & 3 deletions api/src/main/java/org/apache/iceberg/util/UnicodeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ public static Literal<CharSequence> truncateStringMax(Literal<CharSequence> inpu

// Try incrementing the code points from the end
for (int i = length - 1; i >= 0; i--) {
int nextCodePoint = truncatedStringBuffer.codePointAt(i) + 1;
// Get the offset in the truncated string buffer where the number of unicode characters = i
int offsetByCodePoint = truncatedStringBuffer.offsetByCodePoints(0, i);
int nextCodePoint = truncatedStringBuffer.codePointAt(offsetByCodePoint) + 1;
// No overflow
if (nextCodePoint != 0 && Character.isValidCodePoint(nextCodePoint)) {
// Get the offset in the truncated string buffer where the number of unicode characters = i
int offsetByCodePoint = truncatedStringBuffer.offsetByCodePoints(0, i);
truncatedStringBuffer.setLength(offsetByCodePoint);
// Append next code point to the truncated substring
truncatedStringBuffer.appendCodePoint(nextCodePoint);
Expand Down
Loading