diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStream.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStream.java index 03ce13c9780..f0fc2b9aa0e 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStream.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStream.java @@ -220,79 +220,79 @@ public void close() throws Exception { * @return Whether or not more data was found. */ public boolean next() { - try { - if (completed.isDone() && queue.isEmpty()) { - return false; - } - - pending--; - requestOutstanding(); + while (true) { // Add a loop to make the logic iterative + try { + if (completed.isDone() && queue.isEmpty()) { + return false; + } - Object data = queue.take(); - if (DONE == data) { - queue.put(DONE); - // Other code ignores the value of this CompletableFuture, only whether it's completed (or has an exception) - completed.complete(null); - return false; - } else if (DONE_EX == data) { - queue.put(DONE_EX); - if (ex instanceof Exception) { - throw (Exception) ex; + pending--; + requestOutstanding(); + + Object data = queue.take(); + if (DONE == data) { + queue.put(DONE); + completed.complete(null); + return false; + } else if (DONE_EX == data) { + queue.put(DONE_EX); + if (ex instanceof Exception) { + throw (Exception) ex; + } else { + throw new Exception(ex); + } } else { - throw new Exception(ex); - } - } else { - try (ArrowMessage msg = ((ArrowMessage) data)) { - if (msg.getMessageType() == HeaderType.NONE) { - updateMetadata(msg); - // We received a message without data, so erase any leftover data - if (fulfilledRoot != null) { - fulfilledRoot.clear(); - } - } else if (msg.getMessageType() == HeaderType.RECORD_BATCH) { - checkMetadataVersion(msg); - // Ensure we have the root - root.get().clear(); - try (ArrowRecordBatch arb = msg.asRecordBatch()) { - loader.load(arb); - } - updateMetadata(msg); - } else if (msg.getMessageType() == HeaderType.DICTIONARY_BATCH) { - checkMetadataVersion(msg); - // Ensure we have the root - root.get().clear(); - try (ArrowDictionaryBatch arb = msg.asDictionaryBatch()) { - final long id = arb.getDictionaryId(); - if (dictionaries == null) { - throw new IllegalStateException("Dictionary ownership was claimed by the application."); + try (ArrowMessage msg = ((ArrowMessage) data)) { + if (msg.getMessageType() == HeaderType.NONE) { + updateMetadata(msg); + if (fulfilledRoot != null) { + fulfilledRoot.clear(); } - final Dictionary dictionary = dictionaries.lookup(id); - if (dictionary == null) { - throw new IllegalArgumentException("Dictionary not defined in schema: ID " + id); + } else if (msg.getMessageType() == HeaderType.RECORD_BATCH) { + checkMetadataVersion(msg); + root.get().clear(); + try (ArrowRecordBatch arb = msg.asRecordBatch()) { + loader.load(arb); } - - final FieldVector vector = dictionary.getVector(); - final VectorSchemaRoot dictionaryRoot = new VectorSchemaRoot(Collections.singletonList(vector.getField()), - Collections.singletonList(vector), 0); - final VectorLoader dictionaryLoader = new VectorLoader(dictionaryRoot); - dictionaryLoader.load(arb.getDictionary()); + updateMetadata(msg); + } else if (msg.getMessageType() == HeaderType.DICTIONARY_BATCH) { + checkMetadataVersion(msg); + root.get().clear(); + try (ArrowDictionaryBatch arb = msg.asDictionaryBatch()) { + final long id = arb.getDictionaryId(); + if (dictionaries == null) { + throw new IllegalStateException("Dictionary ownership was claimed by the application."); + } + final Dictionary dictionary = dictionaries.lookup(id); + if (dictionary == null) { + throw new IllegalArgumentException("Dictionary not defined in schema: ID " + id); + } + + final FieldVector vector = dictionary.getVector(); + final VectorSchemaRoot dictionaryRoot = new VectorSchemaRoot( + Collections.singletonList(vector.getField()), + Collections.singletonList(vector), 0); + final VectorLoader dictionaryLoader = new VectorLoader(dictionaryRoot); + dictionaryLoader.load(arb.getDictionary()); + } + continue; // Skip the remaining code and iterate again + } else { + throw new UnsupportedOperationException("Message type is unsupported: " + msg.getMessageType()); } - return next(); - } else { - throw new UnsupportedOperationException("Message type is unsupported: " + msg.getMessageType()); + return true; } - return true; } + } catch (RuntimeException e) { + throw e; + } catch (ExecutionException e) { + throw StatusUtils.fromThrowable(e.getCause()); + } catch (Exception e) { + throw new RuntimeException(e); } - } catch (RuntimeException e) { - throw e; - } catch (ExecutionException e) { - throw StatusUtils.fromThrowable(e.getCause()); - } catch (Exception e) { - throw new RuntimeException(e); } } + /** Update our metadata reference with a new one from this message. */ private void updateMetadata(ArrowMessage msg) { if (this.applicationMetadata != null) {