From ee8ae7de38c92c03a114fe002ee3a1204f3a6c88 Mon Sep 17 00:00:00 2001 From: leventov Date: Fri, 2 Sep 2016 19:42:13 +0300 Subject: [PATCH 1/4] Eager file unmapping in IndexIO, IndexMerger and IndexMergerV9. The exact purpose for this change is to allow running IndexMergeBenchmark in Windows, however should also be universally 'better' than non-deterministic unmapping, done when MappedByteBuffers are garbage-collected (BACKEND-312) --- .../main/java/io/druid/segment/IndexIO.java | 603 +++++++++--------- .../java/io/druid/segment/IndexMerger.java | 9 + .../java/io/druid/segment/IndexMergerV9.java | 120 ++-- 3 files changed, 377 insertions(+), 355 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/IndexIO.java b/processing/src/main/java/io/druid/segment/IndexIO.java index 73f1ac8aa999..08ac8dbb0adf 100644 --- a/processing/src/main/java/io/druid/segment/IndexIO.java +++ b/processing/src/main/java/io/druid/segment/IndexIO.java @@ -32,6 +32,7 @@ import com.google.common.collect.Sets; import com.google.common.io.ByteStreams; import com.google.common.io.Closeables; +import com.google.common.io.Closer; import com.google.common.io.Files; import com.google.common.primitives.Ints; import com.google.inject.Inject; @@ -269,13 +270,14 @@ public boolean convertSegment( case 6: case 7: log.info("Old version, re-persisting."); - QueryableIndex segmentToConvert = loadIndex(toConvert); - new IndexMerger(mapper, this).append( - Arrays.asList(new QueryableIndexIndexableAdapter(segmentToConvert)), - null, - converted, - indexSpec - ); + try (QueryableIndex segmentToConvert = loadIndex(toConvert)) { + new IndexMerger(mapper, this).append( + Arrays.asList(new QueryableIndexIndexableAdapter(segmentToConvert)), + null, + converted, + indexSpec + ); + } return true; case 8: defaultIndexIOHandler.convertV8toV9(toConvert, converted, indexSpec); @@ -545,347 +547,348 @@ public void convertV8toV9(File v8Dir, File v9Dir, IndexSpec indexSpec) Closeables.close(indexIn, false); } - SmooshedFileMapper v8SmooshedFiles = Smoosh.map(v8Dir); - - v9Dir.mkdirs(); - final FileSmoosher v9Smoosher = new FileSmoosher(v9Dir); + Closer closer = Closer.create(); + try { + SmooshedFileMapper v8SmooshedFiles = Smoosh.map(v8Dir); + closer.register(v8SmooshedFiles); - ByteStreams.write(Ints.toByteArray(9), Files.newOutputStreamSupplier(new File(v9Dir, "version.bin"))); + v9Dir.mkdirs(); + final FileSmoosher v9Smoosher = new FileSmoosher(v9Dir); + closer.register(v9Smoosher); - Map> bitmapIndexes = Maps.newHashMap(); - final ByteBuffer invertedBuffer = v8SmooshedFiles.mapFile("inverted.drd"); - BitmapSerdeFactory bitmapSerdeFactory = indexSpec.getBitmapSerdeFactory(); + ByteStreams.write(Ints.toByteArray(9), Files.newOutputStreamSupplier(new File(v9Dir, "version.bin"))); - while (invertedBuffer.hasRemaining()) { - final String dimName = serializerUtils.readString(invertedBuffer); - bitmapIndexes.put( - dimName, - GenericIndexed.read(invertedBuffer, bitmapSerdeFactory.getObjectStrategy()) - ); - } + Map> bitmapIndexes = Maps.newHashMap(); + final ByteBuffer invertedBuffer = v8SmooshedFiles.mapFile("inverted.drd"); + BitmapSerdeFactory bitmapSerdeFactory = indexSpec.getBitmapSerdeFactory(); - Map spatialIndexes = Maps.newHashMap(); - final ByteBuffer spatialBuffer = v8SmooshedFiles.mapFile("spatial.drd"); - while (spatialBuffer != null && spatialBuffer.hasRemaining()) { - spatialIndexes.put( - serializerUtils.readString(spatialBuffer), - ByteBufferSerializer.read( - spatialBuffer, new IndexedRTree.ImmutableRTreeObjectStrategy( - bitmapSerdeFactory.getBitmapFactory() - ) - ) - ); - } + while (invertedBuffer.hasRemaining()) { + final String dimName = serializerUtils.readString(invertedBuffer); + bitmapIndexes.put( + dimName, + GenericIndexed.read(invertedBuffer, bitmapSerdeFactory.getObjectStrategy()) + ); + } - final LinkedHashSet skippedFiles = Sets.newLinkedHashSet(); - final Set skippedDimensions = Sets.newLinkedHashSet(); - for (String filename : v8SmooshedFiles.getInternalFilenames()) { - log.info("Processing file[%s]", filename); - if (filename.startsWith("dim_")) { - final ColumnDescriptor.Builder builder = ColumnDescriptor.builder(); - builder.setValueType(ValueType.STRING); + Map spatialIndexes = Maps.newHashMap(); + final ByteBuffer spatialBuffer = v8SmooshedFiles.mapFile("spatial.drd"); + while (spatialBuffer != null && spatialBuffer.hasRemaining()) { + spatialIndexes.put( + serializerUtils.readString(spatialBuffer), + ByteBufferSerializer.read( + spatialBuffer, new IndexedRTree.ImmutableRTreeObjectStrategy( + bitmapSerdeFactory.getBitmapFactory() + ) + ) + ); + } - final List outParts = Lists.newArrayList(); + final LinkedHashSet skippedFiles = Sets.newLinkedHashSet(); + final Set skippedDimensions = Sets.newLinkedHashSet(); + for (String filename : v8SmooshedFiles.getInternalFilenames()) { + log.info("Processing file[%s]", filename); + if (filename.startsWith("dim_")) { + final ColumnDescriptor.Builder builder = ColumnDescriptor.builder(); + builder.setValueType(ValueType.STRING); - ByteBuffer dimBuffer = v8SmooshedFiles.mapFile(filename); - String dimension = serializerUtils.readString(dimBuffer); - if (!filename.equals(String.format("dim_%s.drd", dimension))) { - throw new ISE("loaded dimension[%s] from file[%s]", dimension, filename); - } + final List outParts = Lists.newArrayList(); - ByteArrayOutputStream nameBAOS = new ByteArrayOutputStream(); - serializerUtils.writeString(nameBAOS, dimension); - outParts.add(ByteBuffer.wrap(nameBAOS.toByteArray())); + ByteBuffer dimBuffer = v8SmooshedFiles.mapFile(filename); + String dimension = serializerUtils.readString(dimBuffer); + if (!filename.equals(String.format("dim_%s.drd", dimension))) { + throw new ISE("loaded dimension[%s] from file[%s]", dimension, filename); + } - GenericIndexed dictionary = GenericIndexed.read( - dimBuffer, GenericIndexed.STRING_STRATEGY - ); + ByteArrayOutputStream nameBAOS = new ByteArrayOutputStream(); + serializerUtils.writeString(nameBAOS, dimension); + outParts.add(ByteBuffer.wrap(nameBAOS.toByteArray())); - if (dictionary.size() == 0) { - log.info("Dimension[%s] had cardinality 0, equivalent to no column, so skipping.", dimension); - skippedDimensions.add(dimension); - continue; - } + GenericIndexed dictionary = GenericIndexed.read( + dimBuffer, GenericIndexed.STRING_STRATEGY + ); - int emptyStrIdx = dictionary.indexOf(""); - List singleValCol = null; - VSizeIndexed multiValCol = VSizeIndexed.readFromByteBuffer(dimBuffer.asReadOnlyBuffer()); - GenericIndexed bitmaps = bitmapIndexes.get(dimension); - ImmutableRTree spatialIndex = spatialIndexes.get(dimension); - - final BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory(); - boolean onlyOneValue = true; - MutableBitmap nullsSet = null; - for (int i = 0; i < multiValCol.size(); ++i) { - VSizeIndexedInts rowValue = multiValCol.get(i); - if (!onlyOneValue) { - break; + if (dictionary.size() == 0) { + log.info("Dimension[%s] had cardinality 0, equivalent to no column, so skipping.", dimension); + skippedDimensions.add(dimension); + continue; } - if (rowValue.size() > 1) { - onlyOneValue = false; - } - if (rowValue.size() == 0 || rowValue.get(0) == emptyStrIdx) { - if (nullsSet == null) { - nullsSet = bitmapFactory.makeEmptyMutableBitmap(); + + int emptyStrIdx = dictionary.indexOf(""); + List singleValCol = null; + VSizeIndexed multiValCol = VSizeIndexed.readFromByteBuffer(dimBuffer.asReadOnlyBuffer()); + GenericIndexed bitmaps = bitmapIndexes.get(dimension); + ImmutableRTree spatialIndex = spatialIndexes.get(dimension); + + final BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory(); + boolean onlyOneValue = true; + MutableBitmap nullsSet = null; + for (int i = 0; i < multiValCol.size(); ++i) { + VSizeIndexedInts rowValue = multiValCol.get(i); + if (!onlyOneValue) { + break; + } + if (rowValue.size() > 1) { + onlyOneValue = false; + } + if (rowValue.size() == 0 || rowValue.get(0) == emptyStrIdx) { + if (nullsSet == null) { + nullsSet = bitmapFactory.makeEmptyMutableBitmap(); + } + nullsSet.add(i); } - nullsSet.add(i); } - } - if (onlyOneValue) { - log.info("Dimension[%s] is single value, converting...", dimension); - final boolean bumpedDictionary; - if (nullsSet != null) { - log.info("Dimension[%s] has null rows.", dimension); - final ImmutableBitmap theNullSet = bitmapFactory.makeImmutableBitmap(nullsSet); - - if (dictionary.get(0) != null) { - log.info("Dimension[%s] has no null value in the dictionary, expanding...", dimension); - bumpedDictionary = true; - final List nullList = Lists.newArrayList(); - nullList.add(null); - - dictionary = GenericIndexed.fromIterable( - Iterables.concat(nullList, dictionary), - GenericIndexed.STRING_STRATEGY - ); - - bitmaps = GenericIndexed.fromIterable( - Iterables.concat(Arrays.asList(theNullSet), bitmaps), - bitmapSerdeFactory.getObjectStrategy() - ); + if (onlyOneValue) { + log.info("Dimension[%s] is single value, converting...", dimension); + final boolean bumpedDictionary; + if (nullsSet != null) { + log.info("Dimension[%s] has null rows.", dimension); + final ImmutableBitmap theNullSet = bitmapFactory.makeImmutableBitmap(nullsSet); + + if (dictionary.get(0) != null) { + log.info("Dimension[%s] has no null value in the dictionary, expanding...", dimension); + bumpedDictionary = true; + final List nullList = Lists.newArrayList(); + nullList.add(null); + + dictionary = GenericIndexed.fromIterable( + Iterables.concat(nullList, dictionary), + GenericIndexed.STRING_STRATEGY + ); + + bitmaps = GenericIndexed.fromIterable( + Iterables.concat(Arrays.asList(theNullSet), bitmaps), + bitmapSerdeFactory.getObjectStrategy() + ); + } else { + bumpedDictionary = false; + bitmaps = GenericIndexed.fromIterable( + Iterables.concat( + Arrays.asList( + bitmapFactory + .union(Arrays.asList(theNullSet, bitmaps.get(0))) + ), + Iterables.skip(bitmaps, 1) + ), + bitmapSerdeFactory.getObjectStrategy() + ); + } } else { bumpedDictionary = false; - bitmaps = GenericIndexed.fromIterable( - Iterables.concat( - Arrays.asList( - bitmapFactory - .union(Arrays.asList(theNullSet, bitmaps.get(0))) - ), - Iterables.skip(bitmaps, 1) - ), - bitmapSerdeFactory.getObjectStrategy() - ); } - } else { - bumpedDictionary = false; - } - final VSizeIndexed finalMultiValCol = multiValCol; - singleValCol = new AbstractList() - { - @Override - public Integer get(int index) - { - final VSizeIndexedInts ints = finalMultiValCol.get(index); - return ints.size() == 0 ? 0 : ints.get(0) + (bumpedDictionary ? 1 : 0); - } + final VSizeIndexed finalMultiValCol = multiValCol; + singleValCol = new AbstractList() { + @Override + public Integer get(int index) { + final VSizeIndexedInts ints = finalMultiValCol.get(index); + return ints.size() == 0 ? 0 : ints.get(0) + (bumpedDictionary ? 1 : 0); + } - @Override - public int size() - { - return finalMultiValCol.size(); - } - }; + @Override + public int size() { + return finalMultiValCol.size(); + } + }; - multiValCol = null; - } else { - builder.setHasMultipleValues(true); - } + multiValCol = null; + } else { + builder.setHasMultipleValues(true); + } - final CompressedObjectStrategy.CompressionStrategy compressionStrategy = indexSpec.getDimensionCompression(); - - final DictionaryEncodedColumnPartSerde.LegacySerializerBuilder columnPartBuilder = DictionaryEncodedColumnPartSerde - .legacySerializerBuilder() - .withDictionary(dictionary) - .withBitmapSerdeFactory(bitmapSerdeFactory) - .withBitmaps(bitmaps) - .withSpatialIndex(spatialIndex) - .withByteOrder(BYTE_ORDER); - - if (singleValCol != null) { - if (compressionStrategy != CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED) { - columnPartBuilder.withSingleValuedColumn( - CompressedVSizeIntsIndexedSupplier.fromList( - singleValCol, + final CompressedObjectStrategy.CompressionStrategy compressionStrategy = indexSpec.getDimensionCompression(); + + final DictionaryEncodedColumnPartSerde.LegacySerializerBuilder columnPartBuilder = DictionaryEncodedColumnPartSerde + .legacySerializerBuilder() + .withDictionary(dictionary) + .withBitmapSerdeFactory(bitmapSerdeFactory) + .withBitmaps(bitmaps) + .withSpatialIndex(spatialIndex) + .withByteOrder(BYTE_ORDER); + + if (singleValCol != null) { + if (compressionStrategy != CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED) { + columnPartBuilder.withSingleValuedColumn( + CompressedVSizeIntsIndexedSupplier.fromList( + singleValCol, + dictionary.size(), + CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(dictionary.size()), + BYTE_ORDER, + compressionStrategy + ) + ); + } else { + columnPartBuilder.withSingleValuedColumn(VSizeIndexedInts.fromList(singleValCol, dictionary.size())); + } + } else if (compressionStrategy != CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED) { + columnPartBuilder.withMultiValuedColumn( + CompressedVSizeIndexedSupplier.fromIterable( + multiValCol, dictionary.size(), - CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(dictionary.size()), BYTE_ORDER, compressionStrategy ) ); } else { - columnPartBuilder.withSingleValuedColumn(VSizeIndexedInts.fromList(singleValCol, dictionary.size())); + columnPartBuilder.withMultiValuedColumn(multiValCol); } - } else if (compressionStrategy != CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED) { - columnPartBuilder.withMultiValuedColumn( - CompressedVSizeIndexedSupplier.fromIterable( - multiValCol, - dictionary.size(), - BYTE_ORDER, - compressionStrategy - ) - ); - } else { - columnPartBuilder.withMultiValuedColumn(multiValCol); - } - final ColumnDescriptor serdeficator = builder - .addSerde(columnPartBuilder.build()) - .build(); + final ColumnDescriptor serdeficator = builder + .addSerde(columnPartBuilder.build()) + .build(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator)); - byte[] specBytes = baos.toByteArray(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator)); + byte[] specBytes = baos.toByteArray(); - final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter( - dimension, serdeficator.numBytes() + specBytes.length - ); - channel.write(ByteBuffer.wrap(specBytes)); - serdeficator.write(channel); - channel.close(); - } else if (filename.startsWith("met_")) { - if (!filename.endsWith(String.format("%s.drd", BYTE_ORDER))) { - skippedFiles.add(filename); - continue; - } + final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter( + dimension, serdeficator.numBytes() + specBytes.length + ); + channel.write(ByteBuffer.wrap(specBytes)); + serdeficator.write(channel); + channel.close(); + } else if (filename.startsWith("met_")) { + if (!filename.endsWith(String.format("%s.drd", BYTE_ORDER))) { + skippedFiles.add(filename); + continue; + } - MetricHolder holder = MetricHolder.fromByteBuffer(v8SmooshedFiles.mapFile(filename)); - final String metric = holder.getName(); + MetricHolder holder = MetricHolder.fromByteBuffer(v8SmooshedFiles.mapFile(filename)); + final String metric = holder.getName(); - final ColumnDescriptor.Builder builder = ColumnDescriptor.builder(); + final ColumnDescriptor.Builder builder = ColumnDescriptor.builder(); - switch (holder.getType()) { - case LONG: - builder.setValueType(ValueType.LONG); - builder.addSerde( - LongGenericColumnPartSerde.legacySerializerBuilder() - .withByteOrder(BYTE_ORDER) - .withDelegate(holder.longType) - .build() - ); - break; - case FLOAT: - builder.setValueType(ValueType.FLOAT); - builder.addSerde( - FloatGenericColumnPartSerde.legacySerializerBuilder() - .withByteOrder(BYTE_ORDER) - .withDelegate(holder.floatType) - .build() - ); - break; - case COMPLEX: - if (!(holder.complexType instanceof GenericIndexed)) { - throw new ISE("Serialized complex types must be GenericIndexed objects."); - } - final GenericIndexed column = (GenericIndexed) holder.complexType; - final String complexType = holder.getTypeName(); - builder.setValueType(ValueType.COMPLEX); - builder.addSerde( - ComplexColumnPartSerde.legacySerializerBuilder() - .withTypeName(complexType) - .withDelegate(column).build() - ); - break; - default: - throw new ISE("Unknown type[%s]", holder.getType()); - } + switch (holder.getType()) { + case LONG: + builder.setValueType(ValueType.LONG); + builder.addSerde( + LongGenericColumnPartSerde.legacySerializerBuilder() + .withByteOrder(BYTE_ORDER) + .withDelegate(holder.longType) + .build() + ); + break; + case FLOAT: + builder.setValueType(ValueType.FLOAT); + builder.addSerde( + FloatGenericColumnPartSerde.legacySerializerBuilder() + .withByteOrder(BYTE_ORDER) + .withDelegate(holder.floatType) + .build() + ); + break; + case COMPLEX: + if (!(holder.complexType instanceof GenericIndexed)) { + throw new ISE("Serialized complex types must be GenericIndexed objects."); + } + final GenericIndexed column = (GenericIndexed) holder.complexType; + final String complexType = holder.getTypeName(); + builder.setValueType(ValueType.COMPLEX); + builder.addSerde( + ComplexColumnPartSerde.legacySerializerBuilder() + .withTypeName(complexType) + .withDelegate(column).build() + ); + break; + default: + throw new ISE("Unknown type[%s]", holder.getType()); + } - final ColumnDescriptor serdeficator = builder.build(); + final ColumnDescriptor serdeficator = builder.build(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator)); - byte[] specBytes = baos.toByteArray(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator)); + byte[] specBytes = baos.toByteArray(); - final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter( - metric, serdeficator.numBytes() + specBytes.length - ); - channel.write(ByteBuffer.wrap(specBytes)); - serdeficator.write(channel); - channel.close(); - } else if (String.format("time_%s.drd", BYTE_ORDER).equals(filename)) { - CompressedLongsIndexedSupplier timestamps = CompressedLongsIndexedSupplier.fromByteBuffer( - v8SmooshedFiles.mapFile(filename), BYTE_ORDER - ); + final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter( + metric, serdeficator.numBytes() + specBytes.length + ); + channel.write(ByteBuffer.wrap(specBytes)); + serdeficator.write(channel); + channel.close(); + } else if (String.format("time_%s.drd", BYTE_ORDER).equals(filename)) { + CompressedLongsIndexedSupplier timestamps = CompressedLongsIndexedSupplier.fromByteBuffer( + v8SmooshedFiles.mapFile(filename), BYTE_ORDER + ); - final ColumnDescriptor.Builder builder = ColumnDescriptor.builder(); - builder.setValueType(ValueType.LONG); - builder.addSerde( - LongGenericColumnPartSerde.legacySerializerBuilder() - .withByteOrder(BYTE_ORDER) - .withDelegate(timestamps) - .build() - ); - final ColumnDescriptor serdeficator = builder.build(); + final ColumnDescriptor.Builder builder = ColumnDescriptor.builder(); + builder.setValueType(ValueType.LONG); + builder.addSerde( + LongGenericColumnPartSerde.legacySerializerBuilder() + .withByteOrder(BYTE_ORDER) + .withDelegate(timestamps) + .build() + ); + final ColumnDescriptor serdeficator = builder.build(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator)); - byte[] specBytes = baos.toByteArray(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator)); + byte[] specBytes = baos.toByteArray(); - final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter( - "__time", serdeficator.numBytes() + specBytes.length - ); - channel.write(ByteBuffer.wrap(specBytes)); - serdeficator.write(channel); - channel.close(); - } else { - skippedFiles.add(filename); + final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter( + "__time", serdeficator.numBytes() + specBytes.length + ); + channel.write(ByteBuffer.wrap(specBytes)); + serdeficator.write(channel); + channel.close(); + } else { + skippedFiles.add(filename); + } } - } - final ByteBuffer indexBuffer = v8SmooshedFiles.mapFile("index.drd"); + final ByteBuffer indexBuffer = v8SmooshedFiles.mapFile("index.drd"); - indexBuffer.get(); // Skip the version byte - final GenericIndexed dims8 = GenericIndexed.read( - indexBuffer, GenericIndexed.STRING_STRATEGY - ); - final GenericIndexed dims9 = GenericIndexed.fromIterable( - Iterables.filter( - dims8, new Predicate() - { - @Override - public boolean apply(String s) - { - return !skippedDimensions.contains(s); + indexBuffer.get(); // Skip the version byte + final GenericIndexed dims8 = GenericIndexed.read( + indexBuffer, GenericIndexed.STRING_STRATEGY + ); + final GenericIndexed dims9 = GenericIndexed.fromIterable( + Iterables.filter( + dims8, new Predicate() { + @Override + public boolean apply(String s) { + return !skippedDimensions.contains(s); + } } - } - ), - GenericIndexed.STRING_STRATEGY - ); - final GenericIndexed availableMetrics = GenericIndexed.read( - indexBuffer, GenericIndexed.STRING_STRATEGY - ); - final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer)); - final BitmapSerdeFactory segmentBitmapSerdeFactory = mapper.readValue( - serializerUtils.readString(indexBuffer), - BitmapSerdeFactory.class - ); + ), + GenericIndexed.STRING_STRATEGY + ); + final GenericIndexed availableMetrics = GenericIndexed.read( + indexBuffer, GenericIndexed.STRING_STRATEGY + ); + final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer)); + final BitmapSerdeFactory segmentBitmapSerdeFactory = mapper.readValue( + serializerUtils.readString(indexBuffer), + BitmapSerdeFactory.class + ); - Set columns = Sets.newTreeSet(); - columns.addAll(Lists.newArrayList(dims9)); - columns.addAll(Lists.newArrayList(availableMetrics)); - GenericIndexed cols = GenericIndexed.fromIterable(columns, GenericIndexed.STRING_STRATEGY); - - final String segmentBitmapSerdeFactoryString = mapper.writeValueAsString(segmentBitmapSerdeFactory); - - final long numBytes = cols.getSerializedSize() + dims9.getSerializedSize() + 16 - + serializerUtils.getSerializedStringByteSize(segmentBitmapSerdeFactoryString); - final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes); - cols.writeToChannel(writer); - dims9.writeToChannel(writer); - serializerUtils.writeLong(writer, dataInterval.getStartMillis()); - serializerUtils.writeLong(writer, dataInterval.getEndMillis()); - serializerUtils.writeString(writer, segmentBitmapSerdeFactoryString); - writer.close(); - - final ByteBuffer metadataBuffer = v8SmooshedFiles.mapFile("metadata.drd"); - if (metadataBuffer != null) { - v9Smoosher.add("metadata.drd", metadataBuffer); - } + Set columns = Sets.newTreeSet(); + columns.addAll(Lists.newArrayList(dims9)); + columns.addAll(Lists.newArrayList(availableMetrics)); + GenericIndexed cols = GenericIndexed.fromIterable(columns, GenericIndexed.STRING_STRATEGY); + + final String segmentBitmapSerdeFactoryString = mapper.writeValueAsString(segmentBitmapSerdeFactory); + + final long numBytes = cols.getSerializedSize() + dims9.getSerializedSize() + 16 + + serializerUtils.getSerializedStringByteSize(segmentBitmapSerdeFactoryString); + final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes); + cols.writeToChannel(writer); + dims9.writeToChannel(writer); + serializerUtils.writeLong(writer, dataInterval.getStartMillis()); + serializerUtils.writeLong(writer, dataInterval.getEndMillis()); + serializerUtils.writeString(writer, segmentBitmapSerdeFactoryString); + writer.close(); + + final ByteBuffer metadataBuffer = v8SmooshedFiles.mapFile("metadata.drd"); + if (metadataBuffer != null) { + v9Smoosher.add("metadata.drd", metadataBuffer); + } - log.info("Skipped files[%s]", skippedFiles); + log.info("Skipped files[%s]", skippedFiles); - v9Smoosher.close(); + } finally { + closer.close(); + } } } diff --git a/processing/src/main/java/io/druid/segment/IndexMerger.java b/processing/src/main/java/io/druid/segment/IndexMerger.java index 10622015749f..1201bd652709 100644 --- a/processing/src/main/java/io/druid/segment/IndexMerger.java +++ b/processing/src/main/java/io/druid/segment/IndexMerger.java @@ -48,6 +48,7 @@ import com.metamx.collections.spatial.ImmutableRTree; import com.metamx.collections.spatial.RTree; import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy; +import com.metamx.common.ByteBufferUtils; import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.Pair; @@ -933,6 +934,14 @@ public void close() throws IOException File dimOutFile = dimOuts.get(i).getFile(); final MappedByteBuffer dimValsMapped = Files.map(dimOutFile); + closer.register(new Closeable() + { + @Override + public void close() throws IOException + { + ByteBufferUtils.unmap(dimValsMapped); + } + }); if (!dimension.equals(serializerUtils.readString(dimValsMapped))) { throw new ISE("dimensions[%s] didn't equate!? This is a major WTF moment.", dimension); diff --git a/processing/src/main/java/io/druid/segment/IndexMergerV9.java b/processing/src/main/java/io/druid/segment/IndexMergerV9.java index 6936f4864066..299047666301 100644 --- a/processing/src/main/java/io/druid/segment/IndexMergerV9.java +++ b/processing/src/main/java/io/druid/segment/IndexMergerV9.java @@ -37,6 +37,7 @@ import com.metamx.collections.spatial.ImmutableRTree; import com.metamx.collections.spatial.RTree; import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy; +import com.metamx.common.ByteBufferUtils; import com.metamx.common.ISE; import com.metamx.common.io.smoosh.FileSmoosher; import com.metamx.common.io.smoosh.SmooshedWriter; @@ -382,7 +383,11 @@ private void makeDimensionColumns( final DictionaryEncodedColumnPartSerde.SerializerBuilder partBuilder = DictionaryEncodedColumnPartSerde .serializerBuilder() .withDictionary(dimValueWriters.get(i)) - .withValue(dimWriters.get(i), hasMultiValue, compressionStrategy != CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED) + .withValue( + dimWriters.get(i), + hasMultiValue, + compressionStrategy != CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED + ) .withBitmapSerdeFactory(bitmapSerdeFactory) .withBitmapIndex(bitmapIndexWriters.get(i)) .withSpatialIndex(spatialIndexWriters.get(i)) @@ -536,73 +541,78 @@ private void makeInvertedIndexes( fos.close(); final MappedByteBuffer dimValsMapped = Files.map(dimValueFile); - Indexed dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.STRING_STRATEGY); + try { + Indexed dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.STRING_STRATEGY); - ByteBufferWriter spatialIndexWriter = spatialIndexWriters.get(dimIndex); - RTree tree = null; - if (spatialIndexWriter != null) { - BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory(); - tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), bitmapFactory); - } + ByteBufferWriter spatialIndexWriter = spatialIndexWriters.get(dimIndex); + RTree tree = null; + if (spatialIndexWriter != null) { + BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory(); + tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), bitmapFactory); + } - IndexSeeker[] dictIdSeeker = toIndexSeekers(adapters, dimConversions, dimension); + IndexSeeker[] dictIdSeeker = toIndexSeekers(adapters, dimConversions, dimension); - ImmutableBitmap nullRowBitmap = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap( - nullRowsList.get(dimIndex) - ); + ImmutableBitmap nullRowBitmap = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap( + nullRowsList.get(dimIndex) + ); - //Iterate all dim values's dictionary id in ascending order which in line with dim values's compare result. - for (int dictId = 0; dictId < dimVals.size(); dictId++) { - progress.progress(); - List> convertedInverteds = Lists.newArrayListWithCapacity(adapters.size()); - for (int j = 0; j < adapters.size(); ++j) { - int seekedDictId = dictIdSeeker[j].seek(dictId); - if (seekedDictId != IndexSeeker.NOT_EXIST) { - convertedInverteds.add( - new ConvertingIndexedInts( - adapters.get(j).getBitmapIndex(dimension, seekedDictId), rowNumConversions.get(j) - ) - ); + //Iterate all dim values's dictionary id in ascending order which in line with dim values's compare result. + for (int dictId = 0; dictId < dimVals.size(); dictId++) { + progress.progress(); + List> convertedInverteds = Lists.newArrayListWithCapacity(adapters.size()); + for (int j = 0; j < adapters.size(); ++j) { + int seekedDictId = dictIdSeeker[j].seek(dictId); + if (seekedDictId != IndexSeeker.NOT_EXIST) { + convertedInverteds.add( + new ConvertingIndexedInts( + adapters.get(j).getBitmapIndex(dimension, seekedDictId), rowNumConversions.get(j) + ) + ); + } } - } - MutableBitmap bitset = bitmapSerdeFactory.getBitmapFactory().makeEmptyMutableBitmap(); - for (Integer row : CombiningIterable.createSplatted( - convertedInverteds, - Ordering.natural().nullsFirst() - )) { - if (row != INVALID_ROW) { - bitset.add(row); + MutableBitmap bitset = bitmapSerdeFactory.getBitmapFactory().makeEmptyMutableBitmap(); + for (Integer row : CombiningIterable.createSplatted( + convertedInverteds, + Ordering.natural().nullsFirst() + )) { + if (row != INVALID_ROW) { + bitset.add(row); + } } - } - ImmutableBitmap bitmapToWrite = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(bitset); - if ((dictId == 0) && (Iterables.getFirst(dimVals, "") == null)) { - bitmapToWrite = nullRowBitmap.union(bitmapToWrite); - } - bitmapIndexWriters.get(dimIndex).write(bitmapToWrite); - - if (spatialIndexWriter != null) { - String dimVal = dimVals.get(dictId); - if (dimVal != null) { - List stringCoords = Lists.newArrayList(SPLITTER.split(dimVal)); - float[] coords = new float[stringCoords.size()]; - for (int j = 0; j < coords.length; j++) { - coords[j] = Float.valueOf(stringCoords.get(j)); + ImmutableBitmap bitmapToWrite = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(bitset); + if ((dictId == 0) && (Iterables.getFirst(dimVals, "") == null)) { + bitmapToWrite = nullRowBitmap.union(bitmapToWrite); + } + bitmapIndexWriters.get(dimIndex).write(bitmapToWrite); + + if (spatialIndexWriter != null) { + String dimVal = dimVals.get(dictId); + if (dimVal != null) { + List stringCoords = Lists.newArrayList(SPLITTER.split(dimVal)); + float[] coords = new float[stringCoords.size()]; + for (int j = 0; j < coords.length; j++) { + coords[j] = Float.valueOf(stringCoords.get(j)); + } + tree.insert(coords, bitset); } - tree.insert(coords, bitset); } } + if (spatialIndexWriter != null) { + spatialIndexWriter.write(ImmutableRTree.newImmutableFromMutable(tree)); + } + log.info( + "Completed dim[%s] inverted with cardinality[%,d] in %,d millis.", + dimension, + dimVals.size(), + System.currentTimeMillis() - dimStartTime + ); } - if (spatialIndexWriter != null) { - spatialIndexWriter.write(ImmutableRTree.newImmutableFromMutable(tree)); + finally { + ByteBufferUtils.unmap(dimValsMapped); } - log.info( - "Completed dim[%s] inverted with cardinality[%,d] in %,d millis.", - dimension, - dimVals.size(), - System.currentTimeMillis() - dimStartTime - ); } log.info("Completed inverted index in %,d millis.", System.currentTimeMillis() - startTime); progress.stopSection(section); From c65c651a73af982107c1f0ecbcd9ff260ca8656a Mon Sep 17 00:00:00 2001 From: leventov Date: Mon, 5 Sep 2016 13:20:23 +0300 Subject: [PATCH 2/4] Use Closer with a proper pattern in IndexIO, IndexMerger and IndexMergerV9 --- .../src/main/java/io/druid/segment/IndexIO.java | 12 +++++++----- .../src/main/java/io/druid/segment/IndexMerger.java | 3 +++ .../main/java/io/druid/segment/IndexMergerV9.java | 3 +++ 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/IndexIO.java b/processing/src/main/java/io/druid/segment/IndexIO.java index 08ac8dbb0adf..1e9c00bc5c16 100644 --- a/processing/src/main/java/io/druid/segment/IndexIO.java +++ b/processing/src/main/java/io/druid/segment/IndexIO.java @@ -549,12 +549,10 @@ public void convertV8toV9(File v8Dir, File v9Dir, IndexSpec indexSpec) Closer closer = Closer.create(); try { - SmooshedFileMapper v8SmooshedFiles = Smoosh.map(v8Dir); - closer.register(v8SmooshedFiles); + SmooshedFileMapper v8SmooshedFiles = closer.register(Smoosh.map(v8Dir)); v9Dir.mkdirs(); - final FileSmoosher v9Smoosher = new FileSmoosher(v9Dir); - closer.register(v9Smoosher); + final FileSmoosher v9Smoosher = closer.register(new FileSmoosher(v9Dir)); ByteStreams.write(Ints.toByteArray(9), Files.newOutputStreamSupplier(new File(v9Dir, "version.bin"))); @@ -886,7 +884,11 @@ public boolean apply(String s) { log.info("Skipped files[%s]", skippedFiles); - } finally { + } + catch (Throwable t) { + throw closer.rethrow(t); + } + finally { closer.close(); } } diff --git a/processing/src/main/java/io/druid/segment/IndexMerger.java b/processing/src/main/java/io/druid/segment/IndexMerger.java index 1201bd652709..cc73cd6078c2 100644 --- a/processing/src/main/java/io/druid/segment/IndexMerger.java +++ b/processing/src/main/java/io/druid/segment/IndexMerger.java @@ -1087,6 +1087,9 @@ public void close() throws IOException indexIO.getDefaultIndexIOHandler().convertV8toV9(v8OutDir, outDir, indexSpec); return outDir; } + catch (Throwable t) { + throw closer.rethrow(t); + } finally { closer.close(); } diff --git a/processing/src/main/java/io/druid/segment/IndexMergerV9.java b/processing/src/main/java/io/druid/segment/IndexMergerV9.java index 299047666301..048fdd7bee22 100644 --- a/processing/src/main/java/io/druid/segment/IndexMergerV9.java +++ b/processing/src/main/java/io/druid/segment/IndexMergerV9.java @@ -260,6 +260,9 @@ public void close() throws IOException return outDir; } + catch (Throwable t) { + throw closer.rethrow(t); + } finally { closer.close(); } From 1cb4b115c69bf0f41796067b3f77850635603f59 Mon Sep 17 00:00:00 2001 From: leventov Date: Mon, 5 Sep 2016 13:35:28 +0300 Subject: [PATCH 3/4] Unmap file in IndexMergerV9.makeInvertedIndexes() using try-with-resources --- .../main/java/io/druid/segment/IndexMergerV9.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/IndexMergerV9.java b/processing/src/main/java/io/druid/segment/IndexMergerV9.java index 048fdd7bee22..a255bb559bfb 100644 --- a/processing/src/main/java/io/druid/segment/IndexMergerV9.java +++ b/processing/src/main/java/io/druid/segment/IndexMergerV9.java @@ -544,7 +544,14 @@ private void makeInvertedIndexes( fos.close(); final MappedByteBuffer dimValsMapped = Files.map(dimValueFile); - try { + try (Closeable dimValsMappedUnmapper = new Closeable() + { + @Override + public void close() + { + ByteBufferUtils.unmap(dimValsMapped); + } + }) { Indexed dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.STRING_STRATEGY); ByteBufferWriter spatialIndexWriter = spatialIndexWriters.get(dimIndex); @@ -613,9 +620,6 @@ private void makeInvertedIndexes( System.currentTimeMillis() - dimStartTime ); } - finally { - ByteBufferUtils.unmap(dimValsMapped); - } } log.info("Completed inverted index in %,d millis.", System.currentTimeMillis() - startTime); progress.stopSection(section); From 91d9a847ec033f63af5d4b3e712a47230abb5a75 Mon Sep 17 00:00:00 2001 From: leventov Date: Wed, 7 Sep 2016 20:00:41 +0300 Subject: [PATCH 4/4] Reformat IndexIO --- .../main/java/io/druid/segment/IndexIO.java | 61 ++++++++++--------- 1 file changed, 33 insertions(+), 28 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/IndexIO.java b/processing/src/main/java/io/druid/segment/IndexIO.java index 1e9c00bc5c16..0c6ad9f10585 100644 --- a/processing/src/main/java/io/druid/segment/IndexIO.java +++ b/processing/src/main/java/io/druid/segment/IndexIO.java @@ -121,17 +121,17 @@ public IndexIO(ObjectMapper mapper, ColumnConfig columnConfig) this.columnConfig = Preconditions.checkNotNull(columnConfig, "null ColumnConfig"); defaultIndexIOHandler = new DefaultIndexIOHandler(mapper); indexLoaders = ImmutableMap.builder() - .put(0, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) - .put(1, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) - .put(2, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) - .put(3, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) - .put(4, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) - .put(5, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) - .put(6, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) - .put(7, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) - .put(8, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) - .put(9, new V9IndexLoader(columnConfig)) - .build(); + .put(0, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) + .put(1, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) + .put(2, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) + .put(3, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) + .put(4, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) + .put(5, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) + .put(6, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) + .put(7, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) + .put(8, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) + .put(9, new V9IndexLoader(columnConfig)) + .build(); } @@ -676,15 +676,18 @@ public void convertV8toV9(File v8Dir, File v9Dir, IndexSpec indexSpec) } final VSizeIndexed finalMultiValCol = multiValCol; - singleValCol = new AbstractList() { + singleValCol = new AbstractList() + { @Override - public Integer get(int index) { + public Integer get(int index) + { final VSizeIndexedInts ints = finalMultiValCol.get(index); return ints.size() == 0 ? 0 : ints.get(0) + (bumpedDictionary ? 1 : 0); } @Override - public int size() { + public int size() + { return finalMultiValCol.size(); } }; @@ -761,18 +764,18 @@ public int size() { builder.setValueType(ValueType.LONG); builder.addSerde( LongGenericColumnPartSerde.legacySerializerBuilder() - .withByteOrder(BYTE_ORDER) - .withDelegate(holder.longType) - .build() + .withByteOrder(BYTE_ORDER) + .withDelegate(holder.longType) + .build() ); break; case FLOAT: builder.setValueType(ValueType.FLOAT); builder.addSerde( FloatGenericColumnPartSerde.legacySerializerBuilder() - .withByteOrder(BYTE_ORDER) - .withDelegate(holder.floatType) - .build() + .withByteOrder(BYTE_ORDER) + .withDelegate(holder.floatType) + .build() ); break; case COMPLEX: @@ -784,8 +787,8 @@ public int size() { builder.setValueType(ValueType.COMPLEX); builder.addSerde( ComplexColumnPartSerde.legacySerializerBuilder() - .withTypeName(complexType) - .withDelegate(column).build() + .withTypeName(complexType) + .withDelegate(column).build() ); break; default: @@ -813,9 +816,9 @@ public int size() { builder.setValueType(ValueType.LONG); builder.addSerde( LongGenericColumnPartSerde.legacySerializerBuilder() - .withByteOrder(BYTE_ORDER) - .withDelegate(timestamps) - .build() + .withByteOrder(BYTE_ORDER) + .withDelegate(timestamps) + .build() ); final ColumnDescriptor serdeficator = builder.build(); @@ -842,9 +845,11 @@ public int size() { ); final GenericIndexed dims9 = GenericIndexed.fromIterable( Iterables.filter( - dims8, new Predicate() { + dims8, new Predicate() + { @Override - public boolean apply(String s) { + public boolean apply(String s) + { return !skippedDimensions.contains(s); } } @@ -868,7 +873,7 @@ public boolean apply(String s) { final String segmentBitmapSerdeFactoryString = mapper.writeValueAsString(segmentBitmapSerdeFactory); final long numBytes = cols.getSerializedSize() + dims9.getSerializedSize() + 16 - + serializerUtils.getSerializedStringByteSize(segmentBitmapSerdeFactoryString); + + serializerUtils.getSerializedStringByteSize(segmentBitmapSerdeFactoryString); final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes); cols.writeToChannel(writer); dims9.writeToChannel(writer);