Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.hyracks.algebricks.rewriter.rules.FactorRedundantGroupAndDecorVarsRule;
import org.apache.hyracks.algebricks.rewriter.rules.InferTypesRule;
import org.apache.hyracks.algebricks.rewriter.rules.InlineAssignIntoAggregateRule;
import org.apache.hyracks.algebricks.rewriter.rules.InlineVariablesRule;
import org.apache.hyracks.algebricks.rewriter.rules.IntroJoinInsideSubplanRule;
import org.apache.hyracks.algebricks.rewriter.rules.IntroduceAggregateCombinerRule;
import org.apache.hyracks.algebricks.rewriter.rules.IntroduceGroupByCombinerRule;
Expand Down Expand Up @@ -92,6 +91,7 @@
import org.apache.vxquery.compiler.rewriter.rules.algebricksalternatives.ExtractFunctionsFromJoinConditionRule;
import org.apache.vxquery.compiler.rewriter.rules.algebricksalternatives.InlineNestedVariablesRule;
import org.apache.vxquery.compiler.rewriter.rules.algebricksalternatives.MoveFreeVariableOperatorOutOfSubplanRule;
import org.apache.vxquery.compiler.rewriter.rules.algebricksalternatives.VXQueryInlineVariablesRule;

public class RewriteRuleset {
RewriteRuleset() {
Expand Down Expand Up @@ -272,7 +272,7 @@ public static final List<IAlgebraicRewriteRule> buildNormalizationRuleCollection
public static final List<IAlgebraicRewriteRule> buildCondPushDownRuleCollection() {
List<IAlgebraicRewriteRule> condPushDown = new LinkedList<>();
condPushDown.add(new PushSelectDownRule());
condPushDown.add(new InlineVariablesRule());
condPushDown.add(new VXQueryInlineVariablesRule());
condPushDown.add(new SubplanOutOfGroupRule());
condPushDown.add(new RemoveRedundantVariablesRule());
condPushDown.add(new RemoveUnusedAssignAndAggregateRule());
Expand All @@ -289,7 +289,7 @@ public static final List<IAlgebraicRewriteRule> buildCondPushDownRuleCollection(

public static final List<IAlgebraicRewriteRule> buildJoinInferenceRuleCollection() {
List<IAlgebraicRewriteRule> joinInference = new LinkedList<>();
joinInference.add(new InlineVariablesRule());
joinInference.add(new VXQueryInlineVariablesRule());
joinInference.add(new ComplexJoinInferenceRule());
return joinInference;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,18 @@
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.rewriter.rules.InlineVariablesRule;
import org.apache.vxquery.functions.BuiltinOperators;

/**
* Modifies the InlineVariablesRule to also process nested plans.
*/
public class InlineNestedVariablesRule extends InlineVariablesRule {

public class InlineNestedVariablesRule extends VXQueryInlineVariablesRule {
protected boolean inlineVariables(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.vxquery.compiler.rewriter.rules.algebricksalternatives;

import org.apache.hyracks.algebricks.rewriter.rules.InlineVariablesRule;
import org.apache.vxquery.functions.BuiltinOperators;

// VXQuery implementation of InlineVariablesRule to specify functions we should not inline
public class VXQueryInlineVariablesRule extends InlineVariablesRule {

public VXQueryInlineVariablesRule() {
// Ignore element constructor because we need to assign each instance a unique ID
doNotInlineFuncs.add(BuiltinOperators.ELEMENT_CONSTRUCTOR.getFunctionIdentifier());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@
<param name="parameter1" type="node()*"/>
<param name="parameter2" type="node()*"/>
<return type="node()*"/>
<runtime type="scalar" class="org.apache.vxquery.runtime.functions.sequence.OpIntersectScalarEvaluatorFactory"/>
<!-- implementation assumes input in document order -->
<property type="DocumentOrder" class="org.apache.vxquery.compiler.rewriter.rules.propagationpolicies.InputPropertyPropagationPolicy">
<argument value="0"/>
Expand Down Expand Up @@ -493,6 +494,7 @@
<param name="parameter1" type="node()*"/>
<param name="parameter2" type="node()*"/>
<return type="node()*"/>
<runtime type="scalar" class="org.apache.vxquery.runtime.functions.sequence.OpUnionScalarEvaluatorFactory"/>
<!-- as we do the doc-order-sort and the duplicate elimination -->
<!-- after the concatenation, we can reuse the concat iterator -->
<property type="DocumentOrder" class="org.apache.vxquery.compiler.rewriter.rules.propagationpolicies.InputPropertyPropagationPolicy">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
// file currently reading and
// send it to parser
InputStream in = fs.open(xmlDocument).getWrappedStream();

parser.parseHDFSElements(in, writer, fta, tupleIndex);
in.close();
}
Expand Down Expand Up @@ -260,7 +261,10 @@ public void xmlAndJsonCollection(File directory) throws HyracksDataException {
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Starting to read XML document: " + file.getAbsolutePath());
}
parser.parseElements(file, writer, tupleIndex);
ITreeNodeIdProvider myNodeIdProvider = new TreeNodeIdProvider(partitionId, dataSourceId, totalDataSources, file.getAbsolutePath());
XMLParser myparser = new XMLParser(false, myNodeIdProvider, nodeId, appender, childSeq,
dCtx.getStaticContext());
myparser.parseElements(file, writer, tupleIndex);
} else if (fileName.endsWith(".json")) {
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Starting to read JSON document: " + file.getAbsolutePath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,17 @@
import org.apache.vxquery.exceptions.ErrorCode;
import org.apache.vxquery.exceptions.SystemException;
import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentScalarEvaluator;

import org.apache.vxquery.xmlparser.ITreeNodeIdProvider;
import org.apache.vxquery.xmlparser.TreeNodeIdProvider;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.data.std.api.IMutableValueStorage;
import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;

public abstract class AbstractNodeConstructorScalarEvaluator extends AbstractTaggedValueArgumentScalarEvaluator {
protected final static ITreeNodeIdProvider NodeConstructorIdProvider = new TreeNodeIdProvider((short) 0);

protected final IHyracksTaskContext ctx;

private final ArrayBackedValueStorage abvs;
Expand All @@ -42,12 +45,19 @@ public abstract class AbstractNodeConstructorScalarEvaluator extends AbstractTag

private final ArrayBackedValueStorage contentAbvs;

protected final ITreeNodeIdProvider nodeIdProvider;

public AbstractNodeConstructorScalarEvaluator(IHyracksTaskContext ctx, IScalarEvaluator[] args) {
super(args);
this.ctx = ctx;
abvs = new ArrayBackedValueStorage();
db = createsDictionary() ? new DictionaryBuilder() : null;
contentAbvs = createsDictionary() ? new ArrayBackedValueStorage() : abvs;
if (createsNodeId()) {
nodeIdProvider = NodeConstructorIdProvider;
} else {
nodeIdProvider = null;
}
}

@Override
Expand All @@ -58,7 +68,13 @@ protected final void evaluate(TaggedValuePointable[] args, IPointable result) th
DataOutput mainOut = abvs.getDataOutput();
mainOut.write(ValueTag.NODE_TREE_TAG);
byte header = (byte) (createsDictionary() ? NodeTreePointable.HEADER_DICTIONARY_EXISTS_MASK : 0);

header |= (byte) (nodeIdProvider != null ? NodeTreePointable.HEADER_NODEID_EXISTS_MASK : 0);
mainOut.write(header);

if (nodeIdProvider != null) {
mainOut.writeInt(nodeIdProvider.getId());
}
constructNode(db, args, contentAbvs);
if (createsDictionary()) {
db.write(abvs);
Expand All @@ -74,4 +90,6 @@ protected abstract void constructNode(DictionaryBuilder db, TaggedValuePointable
throws IOException, SystemException;

protected abstract boolean createsDictionary();

protected abstract boolean createsNodeId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,9 @@ protected void constructNode(DictionaryBuilder db, TaggedValuePointable[] args,
protected boolean createsDictionary() {
return true;
}

@Override
protected boolean createsNodeId() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,9 @@ protected void constructNode(DictionaryBuilder db, TaggedValuePointable[] args,
protected boolean createsDictionary() {
return false;
}

@Override
protected boolean createsNodeId() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
import org.apache.vxquery.datamodel.values.ValueTag;
import org.apache.vxquery.exceptions.ErrorCode;
import org.apache.vxquery.exceptions.SystemException;

import org.apache.vxquery.xmlparser.ITreeNodeIdProvider;
import org.apache.vxquery.xmlparser.TreeNodeIdProvider;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.data.std.api.IMutableValueStorage;
Expand Down Expand Up @@ -71,6 +72,10 @@ public class ElementNodeConstructorScalarEvaluator extends AbstractNodeConstruct

private final IMutableValueStorage abvs;

private final boolean createNodeIds;

private static int nodeIdCounter = 0;

public ElementNodeConstructorScalarEvaluator(IHyracksTaskContext ctx, IScalarEvaluator[] args) {
super(ctx, args);
anb = new AttributeNodeBuilder();
Expand All @@ -84,6 +89,8 @@ public ElementNodeConstructorScalarEvaluator(IHyracksTaskContext ctx, IScalarEva
cqp = (CodedQNamePointable) CodedQNamePointable.FACTORY.createPointable();
strp = (UTF8StringPointable) UTF8StringPointable.FACTORY.createPointable();
seqp = (SequencePointable) SequencePointable.FACTORY.createPointable();

createNodeIds = nodeIdProvider != null;
}

@Override
Expand All @@ -102,6 +109,10 @@ protected void constructNode(DictionaryBuilder db, TaggedValuePointable[] args,
namep.getLocalName(strp);
int localCode = db.lookup(strp);
enb.setName(uriCode, localCode, prefixCode);

if (createNodeIds)
enb.setLocalNodeId(nodeIdCounter++);

TaggedValuePointable valueArg = args[1];
enb.startAttributeChunk();
int index = processAttributes(valueArg, db);
Expand Down Expand Up @@ -173,6 +184,11 @@ private void copyAttribute(ElementNodeBuilder enb, DictionaryBuilder db, NodeTre
int newPrefixCode = recode(cqp.getPrefixCode(), ntp, db, strp);
int newLocalCode = recode(cqp.getLocalCode(), ntp, db, strp);
anb.setName(newURICode, newLocalCode, newPrefixCode);

if (createNodeIds) {
anb.setLocalNodeId(nodeIdCounter++);
}

anp.getValue(ntp, vp);
anb.setValue(vp);
enb.endAttribute(anb);
Expand All @@ -198,6 +214,11 @@ private void copyElement(ElementNodeBuilder enb, DictionaryBuilder db, NodeTreeP
int newPrefixCode = recode(cqp.getPrefixCode(), ntp, db, strp);
int newLocalCode = recode(cqp.getLocalCode(), ntp, db, strp);
tempEnb.setName(newURICode, newLocalCode, newPrefixCode);

if (createNodeIds) {
tempEnb.setLocalNodeId(nodeIdCounter++);
}

tempEnb.startAttributeChunk();
if (enp.attributesChunkExists()) {
enp.getAttributeSequence(ntp, seqp);
Expand Down Expand Up @@ -286,8 +307,8 @@ private int recode(int oldCode, NodeTreePointable ntp, DictionaryBuilder db, UTF
return db.lookup(tempStrp);
}

private void processChildren(TaggedValuePointable tvp, int start, DictionaryBuilder db) throws IOException,
SystemException {
private void processChildren(TaggedValuePointable tvp, int start, DictionaryBuilder db)
throws IOException, SystemException {
if (tvp.getTag() == ValueTag.SEQUENCE_TAG) {
tvp.getValue(seqp);
TaggedValuePointable tempTvp = ppool.takeOne(TaggedValuePointable.class);
Expand Down Expand Up @@ -379,6 +400,11 @@ protected boolean createsDictionary() {
return true;
}

@Override
protected boolean createsNodeId() {
return true;
}

private void copyComment(TaggedValuePointable tvp, NodeTreePointable ntp, IMutableValueStorage mvs)
throws IOException {
VoidPointable vp = ppool.takeOne(VoidPointable.class);
Expand All @@ -388,6 +414,11 @@ private void copyComment(TaggedValuePointable tvp, NodeTreePointable ntp, IMutab
tcnp.getValue(ntp, vp);

cnb.reset(mvs);

if (createNodeIds) {
cnb.setLocalNodeId(nodeIdCounter++);
}

cnb.setValue(vp);

ppool.giveBack(vp);
Expand All @@ -404,6 +435,11 @@ private void copyPI(TaggedValuePointable tvp, NodeTreePointable ntp, IMutableVal
pnp.getTarget(ntp, vp2);

pnb.reset(mvs);

if (createNodeIds) {
pnb.setLocalNodeId(nodeIdCounter++);
}

pnb.setContent(vp2);
pnb.setTarget(vp1);

Expand All @@ -412,13 +448,19 @@ private void copyPI(TaggedValuePointable tvp, NodeTreePointable ntp, IMutableVal
ppool.giveBack(vp2);
}

private void copyText(TaggedValuePointable tvp, NodeTreePointable ntp, IMutableValueStorage mvs) throws IOException {
private void copyText(TaggedValuePointable tvp, NodeTreePointable ntp, IMutableValueStorage mvs)
throws IOException {
VoidPointable vp = ppool.takeOne(VoidPointable.class);
TextOrCommentNodePointable tcnp = ppool.takeOne(TextOrCommentNodePointable.class);
tvp.getValue(tcnp);
tcnp.getValue(ntp, vp);

tnb.reset(mvs);

if (createNodeIds) {
tnb.setLocalNodeId(nodeIdCounter++);
}

tnb.setValue(vp);

ppool.giveBack(vp);
Expand All @@ -431,6 +473,11 @@ private void convertToText(TaggedValuePointable tvp, IMutableValueStorage mvs) t
TextNodeBuilder tnb = new TextNodeBuilder();
tvp.getValue(vp);
tnb.reset(mvs);

if (createNodeIds) {
tnb.setLocalNodeId(nodeIdCounter++);
}

tnb.setValue(vp);

ppool.giveBack(vp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.apache.vxquery.runtime.functions.node;

import java.io.DataInputStream;
import java.util.HashMap;
import java.util.Map;

import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
Expand Down Expand Up @@ -56,8 +58,7 @@ protected IScalarEvaluator createEvaluator(IHyracksTaskContext ctx, IScalarEvalu
final ByteBufferInputStream bbis = new ByteBufferInputStream();
final DataInputStream di = new DataInputStream(bbis);
final int partition = ctx.getTaskAttemptId().getTaskId().getPartition();
final ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider((short) partition);
final String nodeId = ctx.getJobletContext().getServiceContext().getNodeId();
final String nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();

return new AbstractTaggedValueArgumentScalarEvaluator(args) {
@Override
Expand All @@ -78,6 +79,7 @@ protected void evaluate(TaggedValuePointable[] args, IPointable result) throws S
tvp.getValue(stringp);
try {
// Only one document should be parsed so its ok to have a unique parser.
ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider((short) partition, stringp.toString());
IParser parser = new XMLParser(false, nodeIdProvider, nodeId);
FunctionHelper.readInDocFromPointable(stringp, abvs, parser);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,9 @@ protected void constructNode(DictionaryBuilder db, TaggedValuePointable[] args,
protected boolean createsDictionary() {
return false;
}

@Override
protected boolean createsNodeId() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,9 @@ protected void constructNode(DictionaryBuilder db, TaggedValuePointable[] args,
protected boolean createsDictionary() {
return false;
}

@Override
protected boolean createsNodeId() {
return false;
}
}
Loading