More error reporting and stats for ingestion tasks#5418
More error reporting and stats for ingestion tasks#5418jon-wei merged 12 commits intoapache:masterfrom
Conversation
bbb8d03 to
ad91de1
Compare
09ce529 to
5f70e1a
Compare
5f70e1a to
278144e
Compare
clintropolis
left a comment
There was a problem hiding this comment.
reviewed through KafkaSupervisorTest.java
| return false; | ||
| } | ||
|
|
||
| if (!Objects.equals(location, that.location)) { |
|
|
||
| public CircularBuffer(int capacity) | ||
| { | ||
| buffer = (E[]) new Object[capacity]; |
There was a problem hiding this comment.
Maybe explode with a precondition check that capacity is larger than 0 here instead of exploding out of bounds here
There was a problem hiding this comment.
Added a preconditions check
|
|
||
| import com.google.common.base.Preconditions; | ||
|
|
||
| public class CircularBuffer<E> |
There was a problem hiding this comment.
Any reason not to use https://google.github.io/guava/releases/23.0/api/docs/com/google/common/collect/EvictingQueue.html? However, it would require a different strategy for getMessagesFromSavedParseExceptions where getLatest is used.
There was a problem hiding this comment.
Decided to keep CircularBuffer for now, since it was already in the codebase and I do want getLatest
There was a problem hiding this comment.
CircularBuffer is used in this as well as ChangeRequestHistory and ChangeRequestHistory requires a randomly-accessible circular array. I think it's fine to keep this.
However, would you add some javadocs to this class? I also think we need some unit tests for this class, but it's not mandatory for this PR.
| private final Map<String, Object> metrics; | ||
|
|
||
| @Nullable | ||
| private final String errorMsg; |
|
|
||
| package io.druid.indexer; | ||
|
|
||
| public enum IngestionState |
There was a problem hiding this comment.
Is this same for all types of tasks? If so, I think it's better to expand TaskState to include these new states because every task is the ingestion task and we don't have to keep two states for them.
There was a problem hiding this comment.
I decided to keep them separate, since I mean for IngestionState to be an additional qualifier on the existing states (RUNNING,FAILED,SUCCESS). For example, a task could be RUNNING and in DETERMINE_PARTITIONS, or RUNNING and in BUILD_SEGMENTS, or similarly with FAILED.
| } | ||
| return location.equals(that.location); | ||
|
|
||
| if (!Objects.equals(location, that.location)) { |
| return TaskStatus.failure( | ||
| getId(), | ||
| getTaskCompletionMetrics(), | ||
| e.getMessage(), |
There was a problem hiding this comment.
How about passing the full stack trace rather than a simple message? It would be much helpful to understand what's going on. If some applications need only error messages, they can parse the stack trace on their own.
There was a problem hiding this comment.
I changed this to the following:
- The stack trace is passed in now, but TaskStatus will truncate the error message to 100 characters. This is to avoid large ZK/metadata objects.
- The full error + stack trace is provided in the TaskReport that's uploaded when the task completes
| } else { | ||
| fireDepartmentMetrics.incrementThrownAway(); | ||
| } | ||
| catch (ParseException e) { |
There was a problem hiding this comment.
Looks like this catch clause isn't necessary and handleParseException() can be called directly like
if (addResult.getParseException() != null) {
handleParseException(e, record);
}
There was a problem hiding this comment.
Changed this and similar places to use handleParseException
| Objects.equals(intermediateHandoffPeriod, that.intermediateHandoffPeriod) && | ||
| Objects.equals(logParseExceptions, that.logParseExceptions) && | ||
| Objects.equals(maxParseExceptions, that.maxParseExceptions) && | ||
| Objects.equals(maxSavedParseExceptions, that.maxSavedParseExceptions); |
There was a problem hiding this comment.
New variables can't be null.
There was a problem hiding this comment.
Changed these, I don't really think this matters much though since these comparisons aren't in any hot path
| private IndexTask indexTaskSpec; | ||
|
|
||
| @JsonIgnore | ||
| private final AuthorizerMapper authorizerMapper; |
| } | ||
|
|
||
| @Nullable | ||
| public static List<String> getMessagesFromSavedParseExceptions(CircularBuffer<Throwable> savedParseExceptions) |
There was a problem hiding this comment.
IndexTask class is already very long enough. Suggest to extract as a method of a separate util class.
There was a problem hiding this comment.
moved this to IndexTaskUtils
|
|
||
| // Note: This method needs to be thread safe. | ||
| protected abstract Integer addToFacts( | ||
| protected abstract Pair<Integer, List<String>> addToFacts( |
There was a problem hiding this comment.
I think Pair makes code difficult to understand because I have to imagine what makes the pair. It would be much great to add a new class.
There was a problem hiding this comment.
Changed this to use a new class instead of Pair
|
|
||
| @VisibleForTesting | ||
| TimeAndDims toTimeAndDims(InputRow row) | ||
| Pair<TimeAndDims, List<String>> toTimeAndDims(InputRow row) |
There was a problem hiding this comment.
Changed this to use a new class instead of Pair
b76d8df to
c6ff6cc
Compare
c6ff6cc to
bbda06b
Compare
6fa18f0 to
d5f1e28
Compare
| } | ||
| catch (Exception e) { | ||
| throw new ParseException(e, "Unparseable timestamp found!"); | ||
| throw new ParseException(e, "Unparseable timestamp found! Event: " + theMap); |
There was a problem hiding this comment.
nit: ParseException supports formatted string.
There was a problem hiding this comment.
Used formatted string
|
|
||
| package io.druid.indexer; | ||
|
|
||
| public enum IngestionState |
|
|
||
| import com.google.common.base.Preconditions; | ||
|
|
||
| public class CircularBuffer<E> |
There was a problem hiding this comment.
CircularBuffer is used in this as well as ChangeRequestHistory and ChangeRequestHistory requires a randomly-accessible circular array. I think it's fine to keep this.
However, would you add some javadocs to this class? I also think we need some unit tests for this class, but it's not mandatory for this PR.
| @Nullable | ||
| default Map<String, Object> getStats() | ||
| { | ||
| throw new UnsupportedOperationException("This Jobby does not implement getJobStats()."); |
There was a problem hiding this comment.
Please add the class name to the exception message.
| @Nullable | ||
| default String getErrorMessage() | ||
| { | ||
| throw new UnsupportedOperationException("This Jobby does not implement getErrorMessage()."); |
| return jsonMapper.writeValueAsString(taskDiagsMap); | ||
| } | ||
| catch (IOException | InterruptedException ie) { | ||
| log.error("couldn't get failure cause for job."); |
There was a problem hiding this comment.
Please add the exception and job name to the log message.
There was a problem hiding this comment.
Added exception and job name
| { | ||
| boolean run(); | ||
|
|
||
| @Nullable |
There was a problem hiding this comment.
Would you please add a javadoc describing when the return value can be null?
| throw new UnsupportedOperationException("This Jobby does not implement getJobStats()."); | ||
| } | ||
|
|
||
| @Nullable |
| } | ||
| writeString(dim, out); | ||
| typeHelper.serialize(out, row.getRaw(dim), reportParseExceptions); | ||
| String parseExceptionMessage = typeHelper.serialize(out, row.getRaw(dim), true); |
There was a problem hiding this comment.
Probably reportParseExceptions should be passed instead of true?
There was a problem hiding this comment.
Removed reportParseExceptions here, it should always be true (always thrown, handling depends on config)
|
|
||
| void serialize(ByteArrayDataOutput out, Object value, boolean reportParseExceptions); | ||
| @Nullable | ||
| String serialize(ByteArrayDataOutput out, Object value, boolean reportParseExceptions); |
There was a problem hiding this comment.
I'm not sure about returning an exception message. Probably it's possible that this method always throws a ParseException on parsing errors and reportParseExceptions can be handled by the caller.
There was a problem hiding this comment.
Changed this throw parse exception instead of returning the message
* Add more indexing task status and error reporting * PR comments, add support in AppenderatorDriverRealtimeIndexTask * Use TaskReport instead of metrics/context * Fix tests * Use TaskReport uploads * Refactor fire department metrics retrieval * Refactor input row serde in hadoop task * Refactor hadoop task loader names * Truncate error message in TaskStatus, add errorMsg to task report * PR comments
* This commit introduces a new tuning config called 'maxBytesInMemory' for ingestion tasks Currently a config called 'maxRowsInMemory' is present which affects how much memory gets used for indexing.If this value is not optimal for your JVM heap size, it could lead to OutOfMemoryError sometimes. A lower value will lead to frequent persists which might be bad for query performance and a higher value will limit number of persists but require more jvm heap space and could lead to OOM. 'maxBytesInMemory' is an attempt to solve this problem. It limits the total number of bytes kept in memory before persisting. * The default value is 1/3(Runtime.maxMemory()) * To maintain the current behaviour set 'maxBytesInMemory' to -1 * If both 'maxRowsInMemory' and 'maxBytesInMemory' are present, both of them will be respected i.e. the first one to go above threshold will trigger persist * Fix check style and remove a comment * Add overlord unsecured paths to coordinator when using combined service (#5579) * Add overlord unsecured paths to coordinator when using combined service * PR comment * More error reporting and stats for ingestion tasks (#5418) * Add more indexing task status and error reporting * PR comments, add support in AppenderatorDriverRealtimeIndexTask * Use TaskReport instead of metrics/context * Fix tests * Use TaskReport uploads * Refactor fire department metrics retrieval * Refactor input row serde in hadoop task * Refactor hadoop task loader names * Truncate error message in TaskStatus, add errorMsg to task report * PR comments * Allow getDomain to return disjointed intervals (#5570) * Allow getDomain to return disjointed intervals * Indentation issues * Adding feature thetaSketchConstant to do some set operation in PostAgg (#5551) * Adding feature thetaSketchConstant to do some set operation in PostAggregator * Updated review comments for PR #5551 - Adding thetaSketchConstant * Fixed CI build issue * Updated review comments 2 for PR #5551 - Adding thetaSketchConstant * Fix taskDuration docs for KafkaIndexingService (#5572) * With incremental handoff the changed line is no longer true. * Add doc for automatic pendingSegments (#5565) * Add missing doc for automatic pendingSegments * address comments * Fix indexTask to respect forceExtendableShardSpecs (#5509) * Fix indexTask to respect forceExtendableShardSpecs * add comments * Deprecate spark2 profile in pom.xml (#5581) Deprecated due to #5382 * CompressionUtils: Add support for decompressing xz, bz2, zip. (#5586) Also switch various firehoses to the new method. Fixes #5585. * This commit introduces a new tuning config called 'maxBytesInMemory' for ingestion tasks Currently a config called 'maxRowsInMemory' is present which affects how much memory gets used for indexing.If this value is not optimal for your JVM heap size, it could lead to OutOfMemoryError sometimes. A lower value will lead to frequent persists which might be bad for query performance and a higher value will limit number of persists but require more jvm heap space and could lead to OOM. 'maxBytesInMemory' is an attempt to solve this problem. It limits the total number of bytes kept in memory before persisting. * The default value is 1/3(Runtime.maxMemory()) * To maintain the current behaviour set 'maxBytesInMemory' to -1 * If both 'maxRowsInMemory' and 'maxBytesInMemory' are present, both of them will be respected i.e. the first one to go above threshold will trigger persist * Address code review comments * Fix the coding style according to druid conventions * Add more javadocs * Rename some variables/methods * Other minor issues * Address more code review comments * Some refactoring to put defaults in IndexTaskUtils * Added check for maxBytesInMemory in AppenderatorImpl * Decrement bytes in abandonSegment * Test unit test for multiple sinks in single appenderator * Fix some merge conflicts after rebase * Fix some style checks * Merge conflicts * Fix failing tests Add back check for 0 maxBytesInMemory in OnHeapIncrementalIndex * Address PR comments * Put defaults for maxRows and maxBytes in TuningConfig * Change/add javadocs * Refactoring and renaming some variables/methods * Fix TeamCity inspection warnings * Added maxBytesInMemory config to HadoopTuningConfig * Updated the docs and examples * Added maxBytesInMemory config in docs * Removed references to maxRowsInMemory under tuningConfig in examples * Set maxBytesInMemory to 0 until used Set the maxBytesInMemory to 0 if user does not set it as part of tuningConfing and set to part of max jvm memory when ingestion task starts * Update toString in KafkaSupervisorTuningConfig * Use correct maxBytesInMemory value in AppenderatorImpl * Update DEFAULT_MAX_BYTES_IN_MEMORY to 1/6 max jvm memory Experimenting with various defaults, 1/3 jvm memory causes OOM * Update docs to correct maxBytesInMemory default value * Minor to rename and add comment * Add more details in docs * Address new PR comments * Address PR comments * Fix spelling typo
…e#5583) * This commit introduces a new tuning config called 'maxBytesInMemory' for ingestion tasks Currently a config called 'maxRowsInMemory' is present which affects how much memory gets used for indexing.If this value is not optimal for your JVM heap size, it could lead to OutOfMemoryError sometimes. A lower value will lead to frequent persists which might be bad for query performance and a higher value will limit number of persists but require more jvm heap space and could lead to OOM. 'maxBytesInMemory' is an attempt to solve this problem. It limits the total number of bytes kept in memory before persisting. * The default value is 1/3(Runtime.maxMemory()) * To maintain the current behaviour set 'maxBytesInMemory' to -1 * If both 'maxRowsInMemory' and 'maxBytesInMemory' are present, both of them will be respected i.e. the first one to go above threshold will trigger persist * Fix check style and remove a comment * Add overlord unsecured paths to coordinator when using combined service (apache#5579) * Add overlord unsecured paths to coordinator when using combined service * PR comment * More error reporting and stats for ingestion tasks (apache#5418) * Add more indexing task status and error reporting * PR comments, add support in AppenderatorDriverRealtimeIndexTask * Use TaskReport instead of metrics/context * Fix tests * Use TaskReport uploads * Refactor fire department metrics retrieval * Refactor input row serde in hadoop task * Refactor hadoop task loader names * Truncate error message in TaskStatus, add errorMsg to task report * PR comments * Allow getDomain to return disjointed intervals (apache#5570) * Allow getDomain to return disjointed intervals * Indentation issues * Adding feature thetaSketchConstant to do some set operation in PostAgg (apache#5551) * Adding feature thetaSketchConstant to do some set operation in PostAggregator * Updated review comments for PR apache#5551 - Adding thetaSketchConstant * Fixed CI build issue * Updated review comments 2 for PR apache#5551 - Adding thetaSketchConstant * Fix taskDuration docs for KafkaIndexingService (apache#5572) * With incremental handoff the changed line is no longer true. * Add doc for automatic pendingSegments (apache#5565) * Add missing doc for automatic pendingSegments * address comments * Fix indexTask to respect forceExtendableShardSpecs (apache#5509) * Fix indexTask to respect forceExtendableShardSpecs * add comments * Deprecate spark2 profile in pom.xml (apache#5581) Deprecated due to apache#5382 * CompressionUtils: Add support for decompressing xz, bz2, zip. (apache#5586) Also switch various firehoses to the new method. Fixes apache#5585. * This commit introduces a new tuning config called 'maxBytesInMemory' for ingestion tasks Currently a config called 'maxRowsInMemory' is present which affects how much memory gets used for indexing.If this value is not optimal for your JVM heap size, it could lead to OutOfMemoryError sometimes. A lower value will lead to frequent persists which might be bad for query performance and a higher value will limit number of persists but require more jvm heap space and could lead to OOM. 'maxBytesInMemory' is an attempt to solve this problem. It limits the total number of bytes kept in memory before persisting. * The default value is 1/3(Runtime.maxMemory()) * To maintain the current behaviour set 'maxBytesInMemory' to -1 * If both 'maxRowsInMemory' and 'maxBytesInMemory' are present, both of them will be respected i.e. the first one to go above threshold will trigger persist * Address code review comments * Fix the coding style according to druid conventions * Add more javadocs * Rename some variables/methods * Other minor issues * Address more code review comments * Some refactoring to put defaults in IndexTaskUtils * Added check for maxBytesInMemory in AppenderatorImpl * Decrement bytes in abandonSegment * Test unit test for multiple sinks in single appenderator * Fix some merge conflicts after rebase * Fix some style checks * Merge conflicts * Fix failing tests Add back check for 0 maxBytesInMemory in OnHeapIncrementalIndex * Address PR comments * Put defaults for maxRows and maxBytes in TuningConfig * Change/add javadocs * Refactoring and renaming some variables/methods * Fix TeamCity inspection warnings * Added maxBytesInMemory config to HadoopTuningConfig * Updated the docs and examples * Added maxBytesInMemory config in docs * Removed references to maxRowsInMemory under tuningConfig in examples * Set maxBytesInMemory to 0 until used Set the maxBytesInMemory to 0 if user does not set it as part of tuningConfing and set to part of max jvm memory when ingestion task starts * Update toString in KafkaSupervisorTuningConfig * Use correct maxBytesInMemory value in AppenderatorImpl * Update DEFAULT_MAX_BYTES_IN_MEMORY to 1/6 max jvm memory Experimenting with various defaults, 1/3 jvm memory causes OOM * Update docs to correct maxBytesInMemory default value * Minor to rename and add comment * Add more details in docs * Address new PR comments * Address PR comments * Fix spelling typo
More error reporting and stats for ingestion tasks (apache#5418) These changes are required for the tasks api improvement commit
* Add more indexing task status and error reporting * PR comments, add support in AppenderatorDriverRealtimeIndexTask * Use TaskReport instead of metrics/context * Fix tests * Use TaskReport uploads * Refactor fire department metrics retrieval * Refactor input row serde in hadoop task * Refactor hadoop task loader names * Truncate error message in TaskStatus, add errorMsg to task report * PR comments
| if (ingestionSchema.getTuningConfig().isReportParseExceptions()) { | ||
| throw e; | ||
| } else { | ||
| unparseable++; |
There was a problem hiding this comment.
This change makes the if (parseable > 0) below dead. See #7737.
This PR adds more ingestion status and error reporting for IndexTask, KafkaIndexTask, and HadoopIndexTask.
Things changed:
reportParseExceptionsandignoreInvalidRowsare now deprecatedlogParseExceptions- log.error() when a parse exception occursmaxParseExceptions- The maximum number of parse exceptions allowed before a task failsmaxSavedParseExceptions- Save the last N parse exceptions to a circular buffer, retrievable via a newunparseableEventsendpoint on the tasks, and also exposed in the completed task statusreportParseExceptionsis true, this is treated asmaxParseExceptionsset to 0.processedWithErrors-> This refers to rows where the row was parseable but a column value could not be parsed into the correct type (e.g., a row that has "helloworld" for a long column). These rows are counted against themaxParseExceptionslimit.errorMsg-> Message showing cause for task failure, truncated to 100 characters. The full message is available in the TaskReport described below.rowStatspeon endpoint (http://localhost:<task port>/druid/worker/v1/chat/<task id>/rowStats)?fullwill return stats for all phases.unparseableEventspeon endpoint (http://localhost:<task port>/druid/worker/v1/chat/<task id>/unparseableEvents)http://<overlord>/druid/indexer/v1/task/<task_id>/reports) after a task completes. Contains the following:ingestionState-> shows what ingestion phase the ingestion task is currently in, possible values are NOT_STARTED, DETERMINE_PARTITIONS, BUILD_SEGMENTS, and COMPLETEDrowStatsunparseableEventserrorMsg-> shows the failure cause of a taskExample task report:
Example
rowStatsoutput:Example
unparseableEventsoutput: