Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ public class TripleNode {

public TripleNode(TriplePattern aTriplePattern, Node aNode, int aNodeIdx) {
assert (0 <= aNodeIdx && aNodeIdx <= 2);
this.hashCodeValue = this.calcHashCode();
this.tp = aTriplePattern;
this.node = aNode;
this.nodeIdx = aNodeIdx;
this.hashCodeValue = this.calcHashCode();
}

public TripleNode(TriplePattern aTriplePattern, String aNode, int aNodeIdx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public Node getObject() {
* @return
*/
public Map<TripleNode, TripleNode> findMatches(TriplePattern other) {
Map<TripleNode, TripleNode> substitutionMap = new HashMap<>();
Map<TripleNode, TripleNode> substitutionMap = new HashMap<>(3);

if (this.getSubject() instanceof Var || other.getSubject() instanceof Var) {
substitutionMap.put(new TripleNode(this, this.getSubject(), 0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.jena.graph.Node;
import org.apache.jena.sparql.core.Var;
Expand All @@ -16,6 +17,8 @@

public class TripleVarBindingSet {

private static final int PROGRESS_MILESTONE_SIZE = 1000;
private static final int LARGE_BS_SIZE = 30000;
private Set<TriplePattern> graphPattern;
private Set<TripleVarBinding> bindings;
private Set<TripleNode> tripleVarsCache;
Expand Down Expand Up @@ -156,8 +159,18 @@ public String toString() {
* @return
*/
public TripleVarBindingSet merge(TripleVarBindingSet aGraphBindingSet) {

LOG.trace("Merging {} bindings with our {} bindings.", aGraphBindingSet.getBindings().size(),
this.getBindings().size());

TripleVarBindingSet gbs = new TripleVarBindingSet(this.graphPattern);

final int otherBindingSetSize = aGraphBindingSet.getBindings().size();
final long totalCount = (long) otherBindingSetSize * (long) this.getBindings().size();
if (totalCount > LARGE_BS_SIZE)
LOG.warn("Merging 2 large BindingSets ({} * {} = {}). This can take some time.",
aGraphBindingSet.getBindings().size(), this.getBindings().size(), totalCount);

if (this.bindings.isEmpty()) {
for (TripleVarBinding tvb2 : aGraphBindingSet.getBindings()) {
gbs.add(tvb2);
Expand All @@ -166,17 +179,30 @@ public TripleVarBindingSet merge(TripleVarBindingSet aGraphBindingSet) {
// Cartesian product is the base case
gbs.addAll(aGraphBindingSet.getBindings());
gbs.addAll(this.bindings);
this.bindings.stream().parallel().forEach(tvb1 -> {
AtomicLong progress = new AtomicLong(0);

final int milestoneSize = PROGRESS_MILESTONE_SIZE;
AtomicLong nextMilestone = new AtomicLong(milestoneSize);

this.bindings.stream().parallel().forEach(tvb1 -> {
for (TripleVarBinding otherB : aGraphBindingSet.getBindings()) {
// always add a merged version of the two bindings, except when they conflict.
if (!tvb1.isConflicting(otherB)) {
gbs.add(tvb1.merge(otherB));
}
}
final long current = progress.incrementAndGet();

if (totalCount > LARGE_BS_SIZE && current == nextMilestone.get()) {
LOG.trace("{}/{} BindingSet merge tasks done!", current * otherBindingSetSize, totalCount);
nextMilestone.set(current + milestoneSize);
}
});
}

if (totalCount > LARGE_BS_SIZE)
LOG.debug("Merging large BindingSets done!");

return gbs;
}

Expand Down Expand Up @@ -252,6 +278,8 @@ public TripleVarBindingSet translate(Set<TriplePattern> graphPattern, Set<Match>
}
}

LOG.trace("Translated binding set with '{}' bindings and '{}' matches in '{}ms'.", this.bindings.size(),
match.size(), System.currentTimeMillis() - start);
return newOne;

}
Expand Down