From 5908276708c83ab4b73d8c31ecec9edb2e5dfca6 Mon Sep 17 00:00:00 2001
From: "Y. SREENIVASULU REDDY"
Date: Mon, 16 Dec 2024 15:05:45 +0530
Subject: [PATCH] HBASE-25839 Bulk Import fails with java.io.IOException: Type mismatch in value from map

---
 .../apache/hadoop/hbase/mapreduce/Import.java |  9 ++--
 .../hbase/mapreduce/TestImportExport.java     | 41 +++++++++++++++++++
 2 files changed, 47 insertions(+), 3 deletions(-)

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
index 9f507dc3eaaf..4adcfbfcd3f6 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
@@ -202,8 +202,11 @@ public CellWritableComparable(Cell kv) {
 
     @Override
     public void write(DataOutput out) throws IOException {
-      out.writeInt(PrivateCellUtil.estimatedSerializedSizeOfKey(kv));
-      out.writeInt(0);
+      int keyLen = PrivateCellUtil.estimatedSerializedSizeOfKey(kv);
+      int valueLen = 0; // We avoid writing value here. So just serialize as if an empty value.
+      out.writeInt(keyLen + valueLen + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE);
+      out.writeInt(keyLen);
+      out.writeInt(valueLen);
       PrivateCellUtil.writeFlatKey(kv, out);
     }
 
@@ -413,7 +416,7 @@ public void map(ImmutableBytesWritable row, Result value, Context context) throw
           // skip if we filtered it out
           if (kv == null) continue;
           Cell ret = convertKv(kv, cfRenameMap);
-          context.write(new CellWritableComparable(ret), ret);
+          context.write(new CellWritableComparable(ret), new MapReduceExtendedCell(ret));
         }
       }
     } catch (InterruptedException e) {
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
index 437689844f8c..fb4a8a502442 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
@@ -508,6 +508,47 @@ public void testWithFilter() throws Throwable {
     importTable.close();
   }
 
+  /**
+   * Create a simple table, run an Export job on it, then Import it with bulk output and the large-result option enabled
+   */
+  @Test
+  public void testBulkImportAndLargeResult() throws Throwable {
+    // Create simple table to export
+    TableDescriptor desc = TableDescriptorBuilder
+      .newBuilder(TableName.valueOf(name.getMethodName()))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
+      .build();
+    UTIL.getAdmin().createTable(desc);
+    Table exportTable = UTIL.getConnection().getTable(desc.getTableName());
+
+    Put p1 = new Put(ROW1);
+    p1.addColumn(FAMILYA, QUAL, now, QUAL);
+
+    // Add a second row so the import has more than a single cell to bulk-load.
+    Put p2 = new Put(ROW2);
+    p2.addColumn(FAMILYA, QUAL, now, QUAL);
+
+    exportTable.put(Arrays.asList(p1, p2));
+
+    // Export the simple table
+    String[] args = new String[] { name.getMethodName(), FQ_OUTPUT_DIR, "1000" };
+    assertTrue(runExport(args));
+
+    // Import to a new table
+    final String IMPORT_TABLE = name.getMethodName() + "import";
+    desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(IMPORT_TABLE))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
+      .build();
+    UTIL.getAdmin().createTable(desc);
+
+    String O_OUTPUT_DIR =
+      new Path(OUTPUT_DIR + 1).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
+
+    args = new String[] { "-D" + Import.BULK_OUTPUT_CONF_KEY + "=" + O_OUTPUT_DIR,
+      "-D" + Import.HAS_LARGE_RESULT + "=" + true, IMPORT_TABLE, FQ_OUTPUT_DIR, "1000" };
+    assertTrue(runImport(args));
+  }
+
 /**
  * Count the number of keyvalues in the specified table with the given filter
 * @param table the table to scan