diff --git a/tez-tools/counter-diff/counter-diff.py b/tez-tools/counter-diff/counter-diff.py
deleted file mode 100644
index a7ef9fbd76..0000000000
--- a/tez-tools/counter-diff/counter-diff.py
+++ /dev/null
@@ -1,238 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import json
-import os
-import shutil
-import sys
-import tempfile
-import zipfile
-
-try:
-    from texttable import Texttable
-except ImportError:
-    print(
-        "Could not import Texttable. Retry after 'pip install texttable'",
-        file=sys.stderr,
-    )
-    sys.exit(1)
-
-tmpdir = tempfile.mkdtemp()
-
-
-def extract_zip(filename):
-    file_dir = os.path.join(tmpdir, os.path.splitext(filename)[0])
-    if not os.path.exists(file_dir):
-        os.makedirs(file_dir)
-
-    zip_ref = zipfile.ZipFile(os.path.abspath(filename), "r")
-    zip_ref.extractall(os.path.abspath(file_dir))
-    zip_ref.close()
-    return file_dir
-
-
-def diff(file1, file2):
-    # extract ZIP files
-    file1_dir = extract_zip(file1)
-    file2_dir = extract_zip(file2)
-
-    # tez debugtool writes json data to TEZ_DAG file whereas tez UI writes to dag.json
-    # also in dag.json data is inside "dag" root node
-    file1_using_dag_json = True
-    dag_json_file1 = os.path.join(file1_dir, "dag.json")
-    if not os.path.isfile(dag_json_file1):
-        file1_using_dag_json = False
-        dag_json_file1 = os.path.join(file1_dir, "TEZ_DAG")
-        if not os.path.isfile(dag_json_file1):
-            print("Unable to find dag.json/TEZ_DAG file inside the archive " + file1)
-            sys.exit()
-
-    file2_using_dag_json = True
-    dag_json_file2 = os.path.join(file2_dir, "dag.json")
-    if not os.path.isfile(dag_json_file2):
-        file2_using_dag_json = False
-        dag_json_file2 = os.path.join(file2_dir, "TEZ_DAG")
-        if not os.path.isfile(dag_json_file2):
-            print("Unable to find dag.json/TEZ_DAG file inside the archive " + file1)
-            sys.exit()
-
-    # populate diff table
-    difftable = {}
-    with open(dag_json_file1) as data_file:
-        file1_dag_json = (
-            json.load(data_file)["dag"]
-            if file1_using_dag_json
-            else json.load(data_file)
-        )
-
-        # Safe access to otherinfo and counters
-        otherinfo = file1_dag_json.get("otherinfo", {})
-        counters = otherinfo.get("counters", {})
-
-        # Iterate only if counterGroups exists
-        for group in counters.get("counterGroups", []):
-            countertable = {}
-            for counter in group["counters"]:
-                counterName = counter["counterName"]
-                countertable[counterName] = []
-                countertable[counterName].append(counter["counterValue"])
-
-            groupName = group["counterGroupName"]
-            difftable[groupName] = countertable
-
-        # add other info safely
-        countertable = {}
-        countertable["TIME_TAKEN"] = [otherinfo.get("timeTaken", 0)]
-        countertable["COMPLETED_TASKS"] = [otherinfo.get("numCompletedTasks", 0)]
-        countertable["SUCCEEDED_TASKS"] = [otherinfo.get("numSucceededTasks", 0)]
-        countertable["FAILED_TASKS"] = [otherinfo.get("numFailedTasks", 0)]
[otherinfo.get("numFailedTasks", 0)] - countertable["KILLED_TASKS"] = [otherinfo.get("numKilledTasks", 0)] - countertable["FAILED_TASK_ATTEMPTS"] = [ - otherinfo.get("numFailedTaskAttempts", 0) - ] - countertable["KILLED_TASK_ATTEMPTS"] = [ - otherinfo.get("numKilledTaskAttempts", 0) - ] - difftable["otherinfo"] = countertable - - with open(dag_json_file2) as data_file: - file2_dag_json = ( - json.load(data_file)["dag"] - if file2_using_dag_json - else json.load(data_file) - ) - - otherinfo = file2_dag_json.get("otherinfo", {}) - counters = otherinfo.get("counters", {}) - - for group in counters.get("counterGroups", []): - groupName = group["counterGroupName"] - if groupName not in difftable: - difftable[groupName] = {} - countertable = difftable[groupName] - for counter in group["counters"]: - counterName = counter["counterName"] - # if counter does not exist in file1, add it with 0 value - if counterName not in countertable: - countertable[counterName] = [0] - countertable[counterName].append(counter["counterValue"]) - - # append other info safely - countertable = difftable["otherinfo"] - countertable["TIME_TAKEN"].append(otherinfo.get("timeTaken", 0)) - countertable["COMPLETED_TASKS"].append(otherinfo.get("numCompletedTasks", 0)) - countertable["SUCCEEDED_TASKS"].append(otherinfo.get("numSucceededTasks", 0)) - countertable["FAILED_TASKS"].append(otherinfo.get("numFailedTasks", 0)) - countertable["KILLED_TASKS"].append(otherinfo.get("numKilledTasks", 0)) - countertable["FAILED_TASK_ATTEMPTS"].append( - otherinfo.get("numFailedTaskAttempts", 0) - ) - countertable["KILLED_TASK_ATTEMPTS"].append( - otherinfo.get("numKilledTaskAttempts", 0) - ) - difftable["otherinfo"] = countertable - - # if some counters are missing, consider it as 0 and compute delta difference - for k, v in difftable.items(): - for key, value in v.items(): - # if counter value does not exisit in file2, add it with 0 value - if len(value) == 1: - value.append(0) - - # store delta difference - delta = value[1] - value[0] - value.append(("+" if delta > 0 else "") + str(delta)) - - return difftable - - -def print_table(difftable, name1, name2, detailed=False): - table = Texttable(max_width=0) - table.set_cols_align(["l", "l", "l", "l", "l"]) - table.set_cols_valign(["m", "m", "m", "m", "m"]) - table.add_row(["Counter Group", "Counter Name", name1, name2, "delta"]) - for k in sorted(difftable): - # ignore task specific counters in default output - if not detailed and ("_INPUT_" in k or "_OUTPUT_" in k): - continue - - v = difftable[k] - row = [] - # counter group. 
+"""
+
+import argparse
+import json
+import sys
+import zipfile
+from pathlib import Path
+from tempfile import TemporaryDirectory
+from typing import Any, Dict, List, Optional
+
+# Check for texttable dependency
+try:
+    from texttable import Texttable
+except ImportError:
+    print(
+        "Could not import Texttable. Retry after 'pip install texttable'",
+        file=sys.stderr,
+    )
+    sys.exit(1)
+
+
+def extract_zip(filename: Path, target_dir: Path) -> Path:
+    """Extracts a zip file to a specific directory."""
+    file_stem = filename.stem
+    extract_path = target_dir / file_stem
+
+    if not extract_path.exists():
+        extract_path.mkdir(parents=True, exist_ok=True)
+
+    with zipfile.ZipFile(filename, "r") as zip_ref:
+        zip_ref.extractall(extract_path)
+
+    return extract_path
+
+
+def load_dag_json(extracted_dir: Path) -> Dict[str, Any]:
+    """
+    Locates and loads the DAG JSON data from an extracted directory.
+    Handles differences between Tez UI (dag.json) and DebugTool (TEZ_DAG).
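+
+    The two layouts differ only in the wrapping root node (sketch, with
+    payloads elided):
+
+        dag.json: {"dag": {"otherinfo": {...}}}
+        TEZ_DAG:  {"otherinfo": {...}}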
+ """ + dag_json_path = extracted_dir / "dag.json" + tez_dag_path = extracted_dir / "TEZ_DAG" + + target_file: Optional[Path] = None + is_ui_format = True + + if dag_json_path.is_file(): + target_file = dag_json_path + elif tez_dag_path.is_file(): + target_file = tez_dag_path + is_ui_format = False + + if not target_file: + raise FileNotFoundError( + f"Unable to find dag.json or TEZ_DAG inside {extracted_dir}" + ) + + with target_file.open("r", encoding="utf-8") as f: + data = json.load(f) + # Tez UI wraps content in a "dag" root node; DebugTool might not + return data.get("dag", data) if is_ui_format else data + + +def _parse_metrics(data: Dict[str, Any]) -> Dict[str, Dict[str, int]]: + """ + Helper function to parse counters and 'otherinfo' from DAG data + into a structured dictionary for easy comparison. + """ + metrics: Dict[str, Dict[str, int]] = {} + + # 1. Process Standard Counters + counters = data.get("otherinfo", {}).get("counters", {}) + for group in counters.get("counterGroups", []): + group_name = group["counterGroupName"] + if group_name not in metrics: + metrics[group_name] = {} + for counter in group["counters"]: + metrics[group_name][counter["counterName"]] = counter["counterValue"] + + # 2. Process 'Other Info' Metrics + other_info = data.get("otherinfo", {}) + other_metrics_map = { + "TIME_TAKEN": "timeTaken", + "COMPLETED_TASKS": "numCompletedTasks", + "SUCCEEDED_TASKS": "numSucceededTasks", + "FAILED_TASKS": "numFailedTasks", + "KILLED_TASKS": "numKilledTasks", + "FAILED_TASK_ATTEMPTS": "numFailedTaskAttempts", + "KILLED_TASK_ATTEMPTS": "numKilledTaskAttempts", + } + + metrics["otherinfo"] = {} + for display_key, json_key in other_metrics_map.items(): + metrics["otherinfo"][display_key] = other_info.get(json_key, 0) + + return metrics + + +def _compare_group( + group_m1: Dict[str, int], group_m2: Dict[str, int] +) -> Dict[str, List[Any]]: + """ + Compares counters between two dictionaries for a single group. + Returns a dictionary mapping counter names to [val1, val2, delta_str]. 
+ """ + group_diff = {} + all_counters = set(group_m1.keys()) | set(group_m2.keys()) + + for counter in all_counters: + val1 = group_m1.get(counter, 0) + val2 = group_m2.get(counter, 0) + delta = val2 - val1 + delta_str = ("+" if delta > 0 else "") + str(delta) + + # Store in format expected by print_table: [val1, val2, delta] + group_diff[counter] = [val1, val2, delta_str] + + return group_diff + + +def calculate_diff(file1: Path, file2: Path, temp_dir: Path) -> Dict[str, Any]: + """Calculates the difference between counters in two ZIP files.""" + + # Extract + dir1 = extract_zip(file1, temp_dir) + dir2 = extract_zip(file2, temp_dir) + + # Load and Parse + # Combining load and parse to reduce local variables + metrics1 = _parse_metrics(load_dag_json(dir1)) + metrics2 = _parse_metrics(load_dag_json(dir2)) + + diff_table: Dict[str, Any] = {} + + # Identify all unique counter groups from both files + all_groups = set(metrics1.keys()) | set(metrics2.keys()) + + for group in all_groups: + diff_table[group] = _compare_group( + metrics1.get(group, {}), metrics2.get(group, {}) + ) + + return diff_table + + +def print_table( + diff_table: Dict[str, Any], name1: str, name2: str, detailed: bool = False +) -> None: + """Formats and prints the difference table.""" + table = Texttable(max_width=0) + table.set_cols_align(["l", "l", "l", "l", "l"]) + table.set_cols_valign(["m", "m", "m", "m", "m"]) + + # Header + table.add_row(["Counter Group", "Counter Name", name1, name2, "Delta"]) + + for group_name in sorted(diff_table.keys()): + # Filter internal task counters unless detailed view is requested + if not detailed and ("_INPUT_" in group_name or "_OUTPUT_" in group_name): + continue + + group_data = diff_table[group_name] + + # Prepare columns + c_names = [] + c_val1 = [] + c_val2 = [] + c_delta = [] + + for key, values in group_data.items(): + c_names.append(key) + c_val1.append(str(values[0])) + c_val2.append(str(values[1])) + c_delta.append(str(values[2])) + + # Format Group Name (Shorten unless detailed) + display_group_name = group_name if detailed else group_name.split(".")[-1] + + row = [ + display_group_name, + "\n".join(c_names), + "\n".join(c_val1), + "\n".join(c_val2), + "\n".join(c_delta), + ] + table.add_row(row) + + print(table.draw() + "\n") + + +def main() -> None: + """ + Main entry point for the CLI tool. + Parses arguments, executes comparison, and handles top-level errors. + """ + parser = argparse.ArgumentParser( + description="Compare TeZ counters between two DAG zip files." 
+
+
+def print_table(
+    diff_table: Dict[str, Any], name1: str, name2: str, detailed: bool = False
+) -> None:
+    """Formats and prints the difference table."""
+    table = Texttable(max_width=0)
+    table.set_cols_align(["l", "l", "l", "l", "l"])
+    table.set_cols_valign(["m", "m", "m", "m", "m"])
+
+    # Header
+    table.add_row(["Counter Group", "Counter Name", name1, name2, "Delta"])
+
+    for group_name in sorted(diff_table.keys()):
+        # Filter internal task counters unless detailed view is requested
+        if not detailed and ("_INPUT_" in group_name or "_OUTPUT_" in group_name):
+            continue
+
+        group_data = diff_table[group_name]
+
+        # Prepare columns
+        c_names = []
+        c_val1 = []
+        c_val2 = []
+        c_delta = []
+
+        for key, values in group_data.items():
+            c_names.append(key)
+            c_val1.append(str(values[0]))
+            c_val2.append(str(values[1]))
+            c_delta.append(str(values[2]))
+
+        # Format Group Name (Shorten unless detailed)
+        display_group_name = group_name if detailed else group_name.split(".")[-1]
+
+        row = [
+            display_group_name,
+            "\n".join(c_names),
+            "\n".join(c_val1),
+            "\n".join(c_val2),
+            "\n".join(c_delta),
+        ]
+        table.add_row(row)
+
+    print(table.draw() + "\n")
+
+
+def main() -> None:
+    """
+    Main entry point for the CLI tool.
+    Parses arguments, executes comparison, and handles top-level errors.
+    """
+    parser = argparse.ArgumentParser(
+        description="Compare Tez counters between two DAG zip files."
+    )
+    parser.add_argument("file1", type=Path, help="Path to the first DAG zip file")
+    parser.add_argument("file2", type=Path, help="Path to the second DAG zip file")
+    parser.add_argument(
+        "--detail", action="store_true", help="Show detailed task-specific counters"
+    )
+
+    args = parser.parse_args()
+
+    if not args.file1.exists():
+        print(f"Error: File '{args.file1}' does not exist.", file=sys.stderr)
+        sys.exit(1)
+    if not args.file2.exists():
+        print(f"Error: File '{args.file2}' does not exist.", file=sys.stderr)
+        sys.exit(1)
+
+    # Use a temporary directory context manager for automatic cleanup
+    with TemporaryDirectory() as temp_dir_str:
+        temp_dir = Path(temp_dir_str)
+        try:
+            diff_data = calculate_diff(args.file1, args.file2, temp_dir)
+            print_table(
+                diff_data, args.file1.stem, args.file2.stem, detailed=args.detail
+            )
+        except (OSError, json.JSONDecodeError, zipfile.BadZipFile, KeyError) as e:
+            print(f"Error processing files: {e}", file=sys.stderr)
+            sys.exit(1)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tez-tools/tez-log-split/logsplit.py b/tez-tools/tez-log-split/logsplit.py
index 063b7fd90e..f7afa244e1 100644
--- a/tez-tools/tez-log-split/logsplit.py
+++ b/tez-tools/tez-log-split/logsplit.py
@@ -16,111 +16,171 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+"""
+Splits aggregated YARN/Hadoop logs into individual files.
 
-import sys
-import os
+This module parses an aggregated log file (typically generated by
+`yarn logs -applicationId <application_id>`) and splits it into a directory structure
+organized by Hostname -> ContainerID -> LogFileName.
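+
+Output layout (sketch for a hypothetical input named app.log.gz; one file
+is written per LogType):
+
+    app.log_splitlogs/
+        host1.example.com_8041/
+            container_1234567890123_0001_01_000001/
+                syslog
+                stderr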
+"""
+
+import argparse
+import gzip
 import re
-from gzip import GzipFile as GZFile
-from getopt import getopt
+import sys
+from pathlib import Path
+from typing import IO, Optional, TextIO
 
 
-def usage():
-    sys.stderr.write("""
-usage: logsplit.py <logfile>
+def open_log_file(filename: Path) -> TextIO:
+    """Opens a log file (text or gzip) with error replacement."""
+    if filename.suffix == ".gz":
+        return gzip.open(filename, mode="rt", encoding="utf-8", errors="replace")
+    return open(filename, mode="rt", encoding="utf-8", errors="replace")
 
-Input files for this tool can be prepared by "yarn logs -applicationId <application_id>".
-""")
 
+class AggregatedLog:
+    """
+    Parses an aggregated log stream and writes split files to disk.
 
-def open_file(f):
-    if f.endswith(".gz"):
-        return GZFile(f)
-    return open(f)
+    Maintains state to track which container and log file is currently
+    being processed within the stream.
+    """
 
+    # Regex constants
+    HEADER_CONTAINER_RE = re.compile(r"Container: (container_[a-z0-9_]+) on (.*)")
+    HEADER_LAST_ROW_RE = re.compile(r"^LogContents:$")
+    HEADER_LOG_TYPE_RE = re.compile(r"^LogType:(.*)")
+    # Matches "End of LogType:..."
+    LAST_LOG_LINE_RE = re.compile(r"^End of LogType:.*")
 
-class AggregatedLog(object):
-    def __init__(self):
+    def __init__(self, output_base_dir: Path):
+        self.output_base_dir = output_base_dir
         self.in_container = False
         self.in_logfile = False
-        self.current_container_header = None
-        self.current_container_name = None
-        self.current_host_name = None  # as read from log line: "hello.my.host.com_8041"
-        self.current_file = None
-        self.HEADER_CONTAINER_RE = re.compile(
-            "Container: (container_[a-z0-9_]+) on (.*)"
-        )
-        self.HEADER_LAST_ROW_RE = re.compile("^LogContents:$")
-        self.HEADER_LOG_TYPE_RE = re.compile("^LogType:(.*)")
-        self.LAST_LOG_LINE_RE = re.compile("^End of LogType:.*")
 
-    def process(self, input_file):
-        self.output_folder = input_file.name + "_splitlogs"
-        os.mkdir(self.output_folder)
-
-        for line in input_file:
-            self.parse(line)
-
-    def parse(self, line):
+        # State variables
+        self.current_container_header: str = ""
+        self.current_container_name: Optional[str] = None
+        self.current_host_name: Optional[str] = None
+        self.current_file_handle: Optional[IO] = None
+
+        # Ensure output directory exists
+        if not self.output_base_dir.exists():
+            self.output_base_dir.mkdir(parents=True, exist_ok=True)
+
+    def close_current_file(self) -> None:
+        """Safely closes the currently open file handle."""
+        if self.current_file_handle:
+            self.current_file_handle.close()
+            self.current_file_handle = None
+
+    def start_container_folder(self) -> Path:
+        """Creates the directory structure for the current container."""
+        if not self.current_host_name or not self.current_container_name:
+            raise ValueError("Missing container or host name.")
+
+        # Structure: output/hostname/container_id
+        container_dir = (
+            self.output_base_dir / self.current_host_name / self.current_container_name
+        )
+        container_dir.mkdir(parents=True, exist_ok=True)
+        return container_dir
+
+    def create_file_in_current_container(self, file_name: str) -> None:
+        """Opens a new file within the current container directory."""
+        self.close_current_file()
+
+        container_dir = self.start_container_folder()
+        target_path = container_dir / file_name.strip()
+
+        # pylint: disable=consider-using-with
+        self.current_file_handle = open(target_path, "w", encoding="utf-8")
+
+    def write_to_current_file(self, line: str) -> None:
+        """Writes a line to the currently active log file if one is open."""
+        if self.current_file_handle:
+            self.current_file_handle.write(line)
+
+    def parse_line(self, line: str) -> None:
+        """
+        Parses a single line from the aggregated log.
+        Updates state machines (entering/exiting containers or logs)
+        and writes content to the appropriate split file.
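+
+        The state machine keys on markers like these (illustrative excerpt
+        of an aggregated log):
+
+            Container: container_1234567890123_0001_01_000001 on host1.example.com_8041
+            LogType:syslog
+            LogContents:
+            ...log lines...
+            End of LogType:syslog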
+ """ if self.in_container: if self.in_logfile: - m = self.LAST_LOG_LINE_RE.match(line) - if m: + # We are inside a specific log (e.g., syslog) + if self.LAST_LOG_LINE_RE.match(line): self.in_container = False self.in_logfile = False - self.current_file.close() + self.close_current_file() else: self.write_to_current_file(line) else: - m = self.HEADER_LOG_TYPE_RE.match(line) - if m: - file_name = m.group(1) + # We are inside a container block, looking for a LogType header + log_type_match = self.HEADER_LOG_TYPE_RE.match(line) + + if log_type_match: + file_name = log_type_match.group(1) self.create_file_in_current_container(file_name) + elif self.HEADER_LAST_ROW_RE.match(line): self.in_logfile = True - self.write_to_current_file( - self.current_container_header - ) # for host reference + # Write the container header into the log for reference + self.write_to_current_file(self.current_container_header) else: - m = self.HEADER_CONTAINER_RE.match(line) + # scan for new container block + container_match = self.HEADER_CONTAINER_RE.match(line) self.current_container_header = line - if m: + + if container_match: self.in_container = True - self.current_container_name = m.group(1) - self.current_host_name = m.group(2) + self.current_container_name = container_match.group(1) + self.current_host_name = container_match.group(2).strip() self.start_container_folder() - def start_container_folder(self): - container_dir = os.path.join( - self.output_folder, self.get_current_container_dir_name() - ) - if not os.path.exists(container_dir): - os.makedirs(container_dir) - - def create_file_in_current_container(self, file_name): - file_to_be_created = os.path.join( - self.output_folder, self.get_current_container_dir_name(), file_name - ) - file = open(file_to_be_created, "w+") - self.current_file = file + def process(self, input_file_path: Path) -> None: + """Main processing loop: reads the input file and feeds lines to the parser.""" + try: + with open_log_file(input_file_path) as f: + for line in f: + self.parse_line(line) + finally: + # Ensure cleanup if log ends abruptly + self.close_current_file() + + +def main() -> None: + """ + Main entry point for the CLI tool. + Sets up arguments, initializes the splitter, and handles execution errors. + """ + parser = argparse.ArgumentParser( + description="Split aggregated YARN logs into individual files per container." 
+
+
+def main() -> None:
+    """
+    Main entry point for the CLI tool.
+    Sets up arguments, initializes the splitter, and handles execution errors.
+    """
+    parser = argparse.ArgumentParser(
+        description="Split aggregated YARN logs into individual files per container."
+    )
+    parser.add_argument(
+        "logfile", type=Path, help="Path to the aggregated log file (text or .gz)"
+    )
+    args = parser.parse_args()
 
-    def write_to_current_file(self, line):
-        self.current_file.write(line)
+    if not args.logfile.exists():
+        print(f"Error: File '{args.logfile}' not found.", file=sys.stderr)
+        sys.exit(1)
 
-    def get_current_container_dir_name(self):
-        return os.path.join(self.current_host_name, self.current_container_name)
+    # Define output folder name based on input filename
+    output_folder = args.logfile.parent / (args.logfile.stem + "_splitlogs")
+    splitter = AggregatedLog(output_folder)
 
-def main(argv):
-    (opts, args) = getopt(argv, "")
-    input_file = args[0]
-    fp = open_file(input_file)
-    aggregated_log = AggregatedLog()
-    aggregated_log.process(fp)
-    print(
-        "Split application logs was written into folder " + aggregated_log.output_folder
-    )
-    fp.close()
+    try:
+        print(f"Processing {args.logfile}...")
+        splitter.process(args.logfile)
+        print(f"Split application logs written to: {output_folder}")
+    except (OSError, ValueError) as e:
+        print(f"An error occurred: {e}", file=sys.stderr)
+        sys.exit(1)
 
 
 if __name__ == "__main__":
-    sys.exit(main(sys.argv[1:]))
+    main()