From 6bda513bb7c270c252986dfd43ee80f5b59d8eea Mon Sep 17 00:00:00 2001 From: Byunghwa Yun Date: Fri, 20 May 2016 18:43:40 +0900 Subject: [PATCH] TAJO-2160: Implement string_agg function. --- .../engine/function/TestBuiltinFunctions.java | 29 ++++ .../engine/function/builtin/StringAgg.java | 132 ++++++++++++++++++ tajo-core/src/main/proto/InternalTypes.proto | 5 + 3 files changed, 166 insertions(+) create mode 100644 tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/StringAgg.java diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java index fa658ad369..8e03696f9b 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java @@ -748,4 +748,33 @@ public void testCorr() throws Exception { executeString("DROP TABLE testbuiltin11 PURGE"); } } + + @Test + public void testStringAgg() throws Exception { + Schema schema = SchemaBuilder.builder() + .add("id", TajoDataTypes.Type.INT4) + .add("value", TajoDataTypes.Type.TEXT) + .build(); + String[] data = new String[]{ + "1|\\N", + "1|a", + "1|b", + "1|c", + "2|d", + "2|f"}; + TajoTestingCluster.createTable(conf, "testbuiltin11", schema, data, 1); + + try { + ResultSet res = executeString("select string_agg(value, '|') as value from testbuiltin11 group by id order by id"); + String ascExpected = "value\n" + + "-------------------------------\n" + + "null|a|b|c\n" + + "d|f\n"; + + assertEquals(ascExpected, resultSetToString(res)); + res.close(); + } finally { + executeString("DROP TABLE testbuiltin11 PURGE"); + } + } } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/StringAgg.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/StringAgg.java new file mode 100644 index 0000000000..18f26ca3d7 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/StringAgg.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.function.builtin; + +import org.apache.commons.lang.StringEscapeUtils; +import org.apache.tajo.InternalTypes.StringAggProto; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.common.TajoDataTypes.DataType; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.datum.ProtobufDatum; +import org.apache.tajo.engine.function.annotation.Description; +import org.apache.tajo.engine.function.annotation.ParamTypes; +import org.apache.tajo.plan.function.AggFunction; +import org.apache.tajo.plan.function.FunctionContext; +import org.apache.tajo.storage.Tuple; + +/** + * Function definition + * + * TEXT string_agg(expr TEXT, delimiter TEXT) + */ +@Description( + functionName = "string_agg", + description = "input values concatenated into a string, separated by delimiter", + example = "> SELECT string_agg(expr, delimiter);", + returnType = Type.TEXT, + paramTypes = { @ParamTypes(paramTypes = { Type.TEXT, Type.TEXT }) } +) +public class StringAgg extends AggFunction { + + public StringAgg() { + super(new Column[] { + new Column("expr", Type.TEXT), + new Column("delimiter", Type.TEXT) + }); + } + + @Override + public FunctionContext newContext() { + return new ConcatContext(); + } + + @Override + public void eval(FunctionContext ctx, Tuple params) { + ConcatContext concatCtx = (ConcatContext) ctx; + + String concatData = null; + if (!params.isBlankOrNull(0)) { + concatData = params.getText(0); + } + + if (concatCtx.concatData.length() > 0) { + concatCtx.concatData.append(concatCtx.delimiter); + } else { + // When first time, set the delimiter. + concatCtx.delimiter = StringEscapeUtils.unescapeJava(params.getText(1)); + } + concatCtx.concatData.append(concatData); + } + + @Override + public void merge(FunctionContext context, Tuple part) { + ConcatContext concatCtx = (ConcatContext) context; + if (part.isBlankOrNull(0)) { + return; + } + + ProtobufDatum datum = (ProtobufDatum) part.getProtobufDatum(0); + StringAggProto proto = (StringAggProto) datum.get(); + + String delimiter = proto.getDelimiter(); + String concatData = proto.getValue(); + + if (concatCtx.concatData.length() > 0) { + concatCtx.concatData.append(delimiter); + } + concatCtx.concatData.append(concatData); + } + + @Override + public Datum getPartialResult(FunctionContext ctx) { + ConcatContext concatCtx = (ConcatContext) ctx; + if (concatCtx.concatData.length() == 0) { + return NullDatum.get(); + } + + StringAggProto.Builder builder = StringAggProto.newBuilder(); + builder.setDelimiter(concatCtx.delimiter); + builder.setValue(concatCtx.concatData.toString()); + return new ProtobufDatum(builder.build()); + } + + @Override + public DataType getPartialResultType() { + return CatalogUtil.newDataType(Type.PROTOBUF, StringAggProto.class.getName()); + } + + @Override + public Datum terminate(FunctionContext ctx) { + ConcatContext concatCtx = (ConcatContext) ctx; + if (concatCtx.concatData.length() == 0) { + return NullDatum.get(); + } else { + return DatumFactory.createText(concatCtx.concatData.toString()); + } + } + + protected static class ConcatContext implements FunctionContext { + String delimiter; + StringBuilder concatData = new StringBuilder(128); + } +} diff --git a/tajo-core/src/main/proto/InternalTypes.proto b/tajo-core/src/main/proto/InternalTypes.proto index fd362bac0f..7baac90254 100644 --- a/tajo-core/src/main/proto/InternalTypes.proto +++ b/tajo-core/src/main/proto/InternalTypes.proto @@ -45,3 +45,8 @@ message CorrProto { required double yvar = 5; required double covar = 6; } + +message StringAggProto { + required string value = 1; + required string delimiter = 2; +}