Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions be/src/exec/es_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ Status EsScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos)
// get batch
TExtGetNextResult result;
RETURN_IF_ERROR(get_next_from_es(result));
VLOG(1) << "es get next success: result=" << apache::thrift::ThriftDebugString(result);
_offsets[_scan_range_idx] += result.rows.num_rows;

// convert
Expand Down Expand Up @@ -748,6 +747,12 @@ Status EsScanNode::materialize_row(MemPool* tuple_pool, Tuple* tuple,
}
*reinterpret_cast<int64_t*>(slot) = col.long_vals[val_idx];
break;
case TYPE_LARGEINT:
if (val_idx >= col.long_vals.size()) {
return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "LARGEINT"));
}
*reinterpret_cast<int128_t*>(slot) = col.long_vals[val_idx];
break;
case TYPE_DOUBLE:
if (val_idx >= col.double_vals.size()) {
return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "DOUBLE"));
Expand All @@ -767,10 +772,16 @@ Status EsScanNode::materialize_row(MemPool* tuple_pool, Tuple* tuple,
*reinterpret_cast<int8_t*>(slot) = col.bool_vals[val_idx];
break;
case TYPE_DATE:
if (val_idx >= col.long_vals.size() ||
!reinterpret_cast<DateTimeValue*>(slot)->from_unixtime(col.long_vals[val_idx])) {
return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "TYPE_DATE"));
}
reinterpret_cast<DateTimeValue*>(slot)->cast_to_date();
break;
case TYPE_DATETIME: {
if (val_idx >= col.long_vals.size() ||
!reinterpret_cast<DateTimeValue*>(slot)->from_unixtime(col.long_vals[val_idx])) {
return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "TYPE_DATE|TYPE_DATETIME"));
return Status(strings::Substitute(ERROR_INVALID_COL_DATA, "TYPE_DATETIME"));
}
break;
}
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/es_scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class EsScanNode : public ScanNode {
bool get_disjuncts(ExprContext* context, Expr* conjunct, vector<TExtPredicate>& disjuncts);
bool to_ext_literal(ExprContext* context, Expr* expr, TExtLiteral* literal);


bool is_match_func(Expr* conjunct);

SlotDescriptor* get_slot_desc(SlotRef* slotRef);
Expand Down
12 changes: 6 additions & 6 deletions fe/src/main/java/org/apache/doris/planner/EsScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,15 @@ public void init(Analyzer analyzer) throws UserException {

assignBackends();
}

@Override
public int getNumInstances() {
return shardScanRanges.size();
}

@Override
public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
try {
return getShardLocations();
} catch (UserException e) {
LOG.error("errors while get es shard locations", e);
}
return null;
return shardScanRanges;
}

@Override
Expand Down