From d21beceff53e5a9f2bf933bcacd4f69fb7a9a1fe Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Fri, 23 Jun 2017 13:47:52 -0700 Subject: [PATCH 1/5] Add managed-ledger admin tool to access ml binary information --- .../resources/mltool/MLDataFormats_pb2.py | 342 ++++++++++++++++++ .../mltool/ManagedLedgerAdminTool.py | 261 +++++++++++++ .../src/main/resources/mltool/__init__.py | 0 3 files changed, 603 insertions(+) create mode 100644 pulsar-client-admin/src/main/resources/mltool/MLDataFormats_pb2.py create mode 100644 pulsar-client-admin/src/main/resources/mltool/ManagedLedgerAdminTool.py create mode 100644 pulsar-client-admin/src/main/resources/mltool/__init__.py diff --git a/pulsar-client-admin/src/main/resources/mltool/MLDataFormats_pb2.py b/pulsar-client-admin/src/main/resources/mltool/MLDataFormats_pb2.py new file mode 100644 index 0000000000000..debfb19af4d80 --- /dev/null +++ b/pulsar-client-admin/src/main/resources/mltool/MLDataFormats_pb2.py @@ -0,0 +1,342 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: MLDataFormats.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='MLDataFormats.proto', + package='', + serialized_pb=_b('\n\x13MLDataFormats.proto\"\xc9\x01\n\x11ManagedLedgerInfo\x12\x31\n\nledgerInfo\x18\x01 \x03(\x0b\x32\x1d.ManagedLedgerInfo.LedgerInfo\x12/\n\x12terminatedPosition\x18\x02 \x01(\x0b\x32\x13.NestedPositionInfo\x1aP\n\nLedgerInfo\x12\x10\n\x08ledgerId\x18\x01 \x02(\x03\x12\x0f\n\x07\x65ntries\x18\x02 \x01(\x03\x12\x0c\n\x04size\x18\x03 \x01(\x03\x12\x11\n\ttimestamp\x18\x04 \x01(\x03\"c\n\x0cPositionInfo\x12\x10\n\x08ledgerId\x18\x01 \x02(\x03\x12\x0f\n\x07\x65ntryId\x18\x02 \x02(\x03\x12\x30\n\x19individualDeletedMessages\x18\x03 \x03(\x0b\x32\r.MessageRange\"7\n\x12NestedPositionInfo\x12\x10\n\x08ledgerId\x18\x01 \x02(\x03\x12\x0f\n\x07\x65ntryId\x18\x02 \x02(\x03\"f\n\x0cMessageRange\x12*\n\rlowerEndpoint\x18\x01 \x02(\x0b\x32\x13.NestedPositionInfo\x12*\n\rupperEndpoint\x18\x02 \x02(\x0b\x32\x13.NestedPositionInfo\"\x95\x01\n\x11ManagedCursorInfo\x12\x17\n\x0f\x63ursorsLedgerId\x18\x01 \x02(\x03\x12\x1a\n\x12markDeleteLedgerId\x18\x02 \x01(\x03\x12\x19\n\x11markDeleteEntryId\x18\x03 \x01(\x03\x12\x30\n\x19individualDeletedMessages\x18\x04 \x03(\x0b\x32\r.MessageRangeB\'\n#org.apache.bookkeeper.mledger.protoH\x01') +) +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + + + + +_MANAGEDLEDGERINFO_LEDGERINFO = _descriptor.Descriptor( + name='LedgerInfo', + full_name='ManagedLedgerInfo.LedgerInfo', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='ledgerId', full_name='ManagedLedgerInfo.LedgerInfo.ledgerId', index=0, + number=1, type=3, cpp_type=2, label=2, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='entries', full_name='ManagedLedgerInfo.LedgerInfo.entries', index=1, + number=2, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='size', full_name='ManagedLedgerInfo.LedgerInfo.size', index=2, + number=3, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='timestamp', full_name='ManagedLedgerInfo.LedgerInfo.timestamp', index=3, + number=4, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=145, + serialized_end=225, +) + +_MANAGEDLEDGERINFO = _descriptor.Descriptor( + name='ManagedLedgerInfo', + full_name='ManagedLedgerInfo', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='ledgerInfo', full_name='ManagedLedgerInfo.ledgerInfo', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='terminatedPosition', full_name='ManagedLedgerInfo.terminatedPosition', index=1, + number=2, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[_MANAGEDLEDGERINFO_LEDGERINFO, ], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=24, + serialized_end=225, +) + + +_POSITIONINFO = _descriptor.Descriptor( + name='PositionInfo', + full_name='PositionInfo', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='ledgerId', full_name='PositionInfo.ledgerId', index=0, + number=1, type=3, cpp_type=2, label=2, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='entryId', full_name='PositionInfo.entryId', index=1, + number=2, type=3, cpp_type=2, label=2, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='individualDeletedMessages', full_name='PositionInfo.individualDeletedMessages', index=2, + number=3, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=227, + serialized_end=326, +) + + +_NESTEDPOSITIONINFO = _descriptor.Descriptor( + name='NestedPositionInfo', + full_name='NestedPositionInfo', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='ledgerId', full_name='NestedPositionInfo.ledgerId', index=0, + number=1, type=3, cpp_type=2, label=2, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='entryId', full_name='NestedPositionInfo.entryId', index=1, + number=2, type=3, cpp_type=2, label=2, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=328, + serialized_end=383, +) + + +_MESSAGERANGE = _descriptor.Descriptor( + name='MessageRange', + full_name='MessageRange', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='lowerEndpoint', full_name='MessageRange.lowerEndpoint', index=0, + number=1, type=11, cpp_type=10, label=2, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='upperEndpoint', full_name='MessageRange.upperEndpoint', index=1, + number=2, type=11, cpp_type=10, label=2, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=385, + serialized_end=487, +) + + +_MANAGEDCURSORINFO = _descriptor.Descriptor( + name='ManagedCursorInfo', + full_name='ManagedCursorInfo', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='cursorsLedgerId', full_name='ManagedCursorInfo.cursorsLedgerId', index=0, + number=1, type=3, cpp_type=2, label=2, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='markDeleteLedgerId', full_name='ManagedCursorInfo.markDeleteLedgerId', index=1, + number=2, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='markDeleteEntryId', full_name='ManagedCursorInfo.markDeleteEntryId', index=2, + number=3, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='individualDeletedMessages', full_name='ManagedCursorInfo.individualDeletedMessages', index=3, + number=4, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=490, + serialized_end=639, +) + +_MANAGEDLEDGERINFO_LEDGERINFO.containing_type = _MANAGEDLEDGERINFO +_MANAGEDLEDGERINFO.fields_by_name['ledgerInfo'].message_type = _MANAGEDLEDGERINFO_LEDGERINFO +_MANAGEDLEDGERINFO.fields_by_name['terminatedPosition'].message_type = _NESTEDPOSITIONINFO +_POSITIONINFO.fields_by_name['individualDeletedMessages'].message_type = _MESSAGERANGE +_MESSAGERANGE.fields_by_name['lowerEndpoint'].message_type = _NESTEDPOSITIONINFO +_MESSAGERANGE.fields_by_name['upperEndpoint'].message_type = _NESTEDPOSITIONINFO +_MANAGEDCURSORINFO.fields_by_name['individualDeletedMessages'].message_type = _MESSAGERANGE +DESCRIPTOR.message_types_by_name['ManagedLedgerInfo'] = _MANAGEDLEDGERINFO +DESCRIPTOR.message_types_by_name['PositionInfo'] = _POSITIONINFO +DESCRIPTOR.message_types_by_name['NestedPositionInfo'] = _NESTEDPOSITIONINFO +DESCRIPTOR.message_types_by_name['MessageRange'] = _MESSAGERANGE +DESCRIPTOR.message_types_by_name['ManagedCursorInfo'] = _MANAGEDCURSORINFO + +ManagedLedgerInfo = _reflection.GeneratedProtocolMessageType('ManagedLedgerInfo', (_message.Message,), dict( + + LedgerInfo = _reflection.GeneratedProtocolMessageType('LedgerInfo', (_message.Message,), dict( + DESCRIPTOR = _MANAGEDLEDGERINFO_LEDGERINFO, + __module__ = 'MLDataFormats_pb2' + # @@protoc_insertion_point(class_scope:ManagedLedgerInfo.LedgerInfo) + )) + , + DESCRIPTOR = _MANAGEDLEDGERINFO, + __module__ = 'MLDataFormats_pb2' + # @@protoc_insertion_point(class_scope:ManagedLedgerInfo) + )) +_sym_db.RegisterMessage(ManagedLedgerInfo) +_sym_db.RegisterMessage(ManagedLedgerInfo.LedgerInfo) + +PositionInfo = _reflection.GeneratedProtocolMessageType('PositionInfo', (_message.Message,), dict( + DESCRIPTOR = _POSITIONINFO, + __module__ = 'MLDataFormats_pb2' + # @@protoc_insertion_point(class_scope:PositionInfo) + )) +_sym_db.RegisterMessage(PositionInfo) + +NestedPositionInfo = _reflection.GeneratedProtocolMessageType('NestedPositionInfo', (_message.Message,), dict( + DESCRIPTOR = _NESTEDPOSITIONINFO, + __module__ = 'MLDataFormats_pb2' + # @@protoc_insertion_point(class_scope:NestedPositionInfo) + )) +_sym_db.RegisterMessage(NestedPositionInfo) + +MessageRange = _reflection.GeneratedProtocolMessageType('MessageRange', (_message.Message,), dict( + DESCRIPTOR = _MESSAGERANGE, + __module__ = 'MLDataFormats_pb2' + # @@protoc_insertion_point(class_scope:MessageRange) + )) +_sym_db.RegisterMessage(MessageRange) + +ManagedCursorInfo = _reflection.GeneratedProtocolMessageType('ManagedCursorInfo', (_message.Message,), dict( + DESCRIPTOR = _MANAGEDCURSORINFO, + __module__ = 'MLDataFormats_pb2' + # @@protoc_insertion_point(class_scope:ManagedCursorInfo) + )) +_sym_db.RegisterMessage(ManagedCursorInfo) + + +DESCRIPTOR.has_options = True +DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n#org.apache.bookkeeper.mledger.protoH\001')) +# @@protoc_insertion_point(module_scope) diff --git a/pulsar-client-admin/src/main/resources/mltool/ManagedLedgerAdminTool.py b/pulsar-client-admin/src/main/resources/mltool/ManagedLedgerAdminTool.py new file mode 100644 index 0000000000000..05b8e327483e5 --- /dev/null +++ b/pulsar-client-admin/src/main/resources/mltool/ManagedLedgerAdminTool.py @@ -0,0 +1,261 @@ +#!/usr/bin/env python + +import argparse +import traceback + +from kazoo.client import KazooClient + +from mltool import MLDataFormats_pb2 + +''' +This util provides api to access managed-ledger data if it's ONLY stored in binary format in to zookeeper. +It also provides command line tool-access to execute these commands. + +Run: run from parent directory +python -m mltool.ManagedLedgerAdminTool +''' +managedLedgerPath = "/managed-ledgers/" +printMlCommand = "print-ml" +deleteMlLedgerIds = "delete-ml-ledgers" +printCursorsCommands = "print-cursor" +updateMakDeleteCursor = "update-mark-delete-cursor" + +''' +Returns managed-ledger info for given managed-leger path +Parameters +---------- +zk : KazooClient + Zookeeper-client instance to query zk-client. +mlPath : str + managed-ledger path +''' +def getManagedLedgerInfo(zk, mlPath): + try: + # get managed-ledger info + mlData = zk.get(mlPath)[0] + mlInfo = MLDataFormats_pb2.ManagedLedgerInfo() + mlInfo.ParseFromString(mlData) + return mlInfo + except Exception as e: + traceback.print_exc() + print 'Failed to get data for {} due to {}'.format(mlPath, repr(e)) + + +''' +Delete specific ledgerIds from the managed-ledger info and updates into zk +Parameters +---------- +zk : KazooClient + Zookeeper-client instance to query zk-client. +mlPath : str + managed-ledger path +deleteLedgerIds : str + comma separated deleting ledger-ids (eg: 123,124) +''' +def deleteLedgerIdsFromManagedLedgerInfo(zk, mlPath, deletLedgerIds): + try: + # get managed-ledger info + mlData = zk.get(mlPath)[0] + mlInfo = MLDataFormats_pb2.ManagedLedgerInfo() + mlInfo.ParseFromString(mlData) + ledgerInfoList = mlInfo.ledgerInfo + + for ledgerInfo in ledgerInfoList: + print ledgerInfo.ledgerId + if ledgerInfo.ledgerId in deletLedgerIds: + ledgerInfoList.remove(ledgerInfo) + updatedMlInfo = mlInfo.SerializeToString(); + zk.set(mlPath, updatedMlInfo, -1) + print 'Updated {} with value\n{}'.format(mlPath, str(mlInfo)) + + except Exception as e: + traceback.print_exc() + print 'Failed to delete ledgerIds for {} due to {}'.format(mlPath, repr(e)) + +''' +Returns managed-ledger cursor info for given managed-cursor path +Parameters +---------- +zk : KazooClient + Zookeeper-client instance to query zk-client. +mlPath : str + managed-ledger path +cursorName : str + managed-cursor path +''' +def getManagedCursorInfo(zk, mlPath): + try: + cursors = zk.get_children(mlPath) + cursorList = {} + for cursor in cursors: + cursorData = zk.get(mlPath + "/" + cursor)[0] + cursorInfo = MLDataFormats_pb2.ManagedCursorInfo() + cursorInfo.ParseFromString(cursorData) + cursorList[cursor] = cursorInfo + return cursorList + except Exception as e: + traceback.print_exc() + print 'Failed to get ml-cursor {} due to {}'.format(mlPath, repr(e)) + + +''' +Update mark-delete position of the given managed-cursor into zk +Parameters +---------- +zk : KazooClient + Zookeeper-client instance to query zk-client. +mlPath : str + managed-ledger path +cursorName : str + managed-cursor path +markDeletePosition: str + markDeletePosition combination of : (eg. 123:1) +''' +def updateCursorMarkDelete(zk, cursorPath, markDeleteLedgerId, markDeleteEntryId): + try: + cursorData = zk.get(cursorPath)[0] + cursorInfo = MLDataFormats_pb2.ManagedCursorInfo() + cursorInfo.ParseFromString(cursorData) + cursorInfo.markDeleteLedgerId = markDeleteLedgerId + cursorInfo.markDeleteEntryId = markDeleteEntryId + sData = cursorInfo.SerializeToString() + zk.set(cursorPath, sData, -1) + print 'Updated {} with value \n{}'.format(cursorPath, cursorInfo) + except Exception as e: + traceback.print_exc() + print 'Failed to update ml-cursor {} due to {}'.format(cursorPath, repr(e)) + + + +''' +print managed-ledger info for given managed-leger path +Parameters +---------- +zk : KazooClient + Zookeeper-client instance to query zk-client. +mlPath : str + managed-ledger path + +eg: +--zkServer localhost:2181 --mlPath sample/standalone/ns1/persistent/test --command print-ml +''' +def printManagedLedgerCommand(zk, mlPath): + print getManagedLedgerInfo(zk, mlPath) + + +''' +print managed-ledger cursor info for given managed-cursor path +Parameters +---------- +zk : KazooClient + Zookeeper-client instance to query zk-client. +mlPath : str + managed-ledger path +cursorName : str + managed-cursor path + +eg: +--zkServer localhost:2181 --mlPath sample/standalone/ns1/persistent/test --cursorName s1 --command print-cursor +''' +def printManagedCursorCommand(zk, mlPath, cursorName): + try: + if cursorName: + print getManagedCursorInfo(zk, mlPath)[cursorName] + else: + print 'Usage: --command {} [--cursorName]'.format(printCursorsCommands) + except Exception as e: + traceback.print_exc() + print 'No cursor found for {}/{}'.format(mlPath, cursorName) + +''' +delete specific ledgerIds from the managed-ledger info +Parameters +---------- +zk : KazooClient + Zookeeper-client instance to query zk-client. +mlPath : str + managed-ledger path +deleteLedgerIds : str + comma separated deleting ledger-ids (eg: 123,124) +eg: +--zkServer localhost:2181 --mlPath sample/standalone/ns1/persistent/test --command delete-ml-ledgers --ledgerIds 3 +''' +def deleteMLLedgerIdsCommand(zk, mlPath, deleteLedgerIds): + try: + if deleteLedgerIds: + deletLedgerIds = set(deleteLedgerIds.split(",")) + deletLedgerIdSet = set() + for id in deletLedgerIds: + deletLedgerIdSet.add(long(id)) + print deletLedgerIdSet + deleteLedgerIdsFromManagedLedgerInfo(zk, mlPath, deletLedgerIdSet) + else: + print 'Usage: --command {} [--ledgerIds]'.format(deleteMlLedgerIds) + except Exception as e: + traceback.print_exc() + print 'Failed to delete ml-ledger_ids {} due to {}'.format(mlPath, repr(e)) + +''' +Update mark-delete position of the given managed-cursor +Parameters +---------- +zk : KazooClient + Zookeeper-client instance to query zk-client. +mlPath : str + managed-ledger path +cursorName : str + managed-cursor path +markDeletePosition: str + markDeletePosition combination of : (eg. 123:1) + +eg: +--zkServer localhost:2181 --mlPath sample/standalone/ns1/persistent/test --cursorName s1 --cursorMarkDelete 0:1 --command update-mark-delete-cursor +''' +def updateMarkDeleteOfCursorCommand(zk, mlPath, cursorName, markDeletePosition): + try: + if cursorName: + if markDeletePosition: + positionPair = markDeletePosition.split(":") + if len(positionPair) == 2: + updateCursorMarkDelete(zk, mlPath + "/" + cursorName, (long(positionPair[0])), long(positionPair[1])) + else: + print "markDeletePosition must be in format :" + else: + print 'Usage: --command {} [----cursorName] [--cursorMarkDelete]'.format(updateMakDeleteCursor) + else: + print 'Usage: --command {} [--cursorName] [--cursorMarkDelete]'.format(updateMakDeleteCursor) + except Exception as e: + traceback.print_exc() + print 'Failed to update ml-cursor {}/{} due to {}'.format(mlPath, cursorName, repr(e)) + +if __name__ in '__main__': + parser = argparse.ArgumentParser() + commandHelpText = 'Managed-ledger command: \n{}, {}, {}, {}'.format(printMlCommand, deleteMlLedgerIds, printCursorsCommands, updateMakDeleteCursor) + parser.add_argument("--zkServer", "-zk", required=True, help="ZooKeeperServer:port") + parser.add_argument("--command", "-cmd", required=True, help=commandHelpText) + parser.add_argument("--mlPath", "-mlp", required=True, help="Managed-ledger path") + parser.add_argument("--ledgerIds", "-lid", required=False, help="Delete ledger ids: comma separated") + parser.add_argument("--cursorName", "-cn", required=False, help="ML-cursor name") + parser.add_argument("--cursorMarkDelete", "-cm", required=False, help="Cursor mark delete position: :") + args = parser.parse_args() + + zkSrvr = args.zkServer + command = args.command + mlPath = managedLedgerPath + args.mlPath + deleteLedgerIds = args.ledgerIds + cursorName = args.cursorName + cursorMarkDelete = args.cursorMarkDelete + + zk = KazooClient(hosts=zkSrvr) + zk.start() + + if command == printMlCommand: + printManagedLedgerCommand(zk, mlPath) + elif command == deleteMlLedgerIds: + deleteMLLedgerIdsCommand(zk, mlPath, deleteLedgerIds) + elif command == printCursorsCommands: + printManagedCursorCommand(zk, mlPath, cursorName) + elif command == updateMakDeleteCursor: + updateMarkDeleteOfCursorCommand(zk, mlPath, cursorName, cursorMarkDelete) + else: + print '{} command not found. supported command : {}'.format(command, commandHelpText) diff --git a/pulsar-client-admin/src/main/resources/mltool/__init__.py b/pulsar-client-admin/src/main/resources/mltool/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d From b237ce6b4ef366c1857bb1c7d80da27b67490dd1 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Sat, 24 Jun 2017 10:56:41 -0700 Subject: [PATCH 2/5] address code comment: support text format, taking argumental command, handling kazoo package, update mlPath-argument --- .../mltool/ManagedLedgerAdminTool.py | 105 +++++++++++++----- 1 file changed, 76 insertions(+), 29 deletions(-) diff --git a/pulsar-client-admin/src/main/resources/mltool/ManagedLedgerAdminTool.py b/pulsar-client-admin/src/main/resources/mltool/ManagedLedgerAdminTool.py index 05b8e327483e5..8194cb93ffc6e 100644 --- a/pulsar-client-admin/src/main/resources/mltool/ManagedLedgerAdminTool.py +++ b/pulsar-client-admin/src/main/resources/mltool/ManagedLedgerAdminTool.py @@ -1,22 +1,43 @@ -#!/usr/bin/env python +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# import argparse import traceback - -from kazoo.client import KazooClient - +from google.protobuf.text_format import Merge +from google.protobuf.text_format import MessageToString +import sys + from mltool import MLDataFormats_pb2 +try: + from kazoo.client import KazooClient +except Exception as missingLib: + print "You need Kazoo ZK client library. Get it from: pip install kazoo" + ''' -This util provides api to access managed-ledger data if it's ONLY stored in binary format in to zookeeper. +This util provides API to access managed-ledger data if it's ONLY stored in binary format in to zookeeper. It also provides command line tool-access to execute these commands. - -Run: run from parent directory -python -m mltool.ManagedLedgerAdminTool ''' managedLedgerPath = "/managed-ledgers/" -printMlCommand = "print-ml" -deleteMlLedgerIds = "delete-ml-ledgers" +printMlCommand = "print-managed-ledger" +deleteMlLedgerIds = "delete-managed-ledger-ids" printCursorsCommands = "print-cursor" updateMakDeleteCursor = "update-mark-delete-cursor" @@ -34,7 +55,10 @@ def getManagedLedgerInfo(zk, mlPath): # get managed-ledger info mlData = zk.get(mlPath)[0] mlInfo = MLDataFormats_pb2.ManagedLedgerInfo() - mlInfo.ParseFromString(mlData) + try: + mlInfo.ParseFromString(mlData) + except Exception as formatException: + Merge(mlData, mlInfo) return mlInfo except Exception as e: traceback.print_exc() @@ -57,14 +81,23 @@ def deleteLedgerIdsFromManagedLedgerInfo(zk, mlPath, deletLedgerIds): # get managed-ledger info mlData = zk.get(mlPath)[0] mlInfo = MLDataFormats_pb2.ManagedLedgerInfo() - mlInfo.ParseFromString(mlData) + isTextFormat = False + try: + mlInfo.ParseFromString(mlData) + except Exception as formatException: + Merge(mlData, mlInfo) + isTextFormat = True + ledgerInfoList = mlInfo.ledgerInfo for ledgerInfo in ledgerInfoList: - print ledgerInfo.ledgerId if ledgerInfo.ledgerId in deletLedgerIds: ledgerInfoList.remove(ledgerInfo) - updatedMlInfo = mlInfo.SerializeToString(); + updatedMlInfo = None + if isTextFormat: + updatedMlInfo = MessageToString(mlInfo, True) + else: + updatedMlInfo = mlInfo.SerializeToString(); zk.set(mlPath, updatedMlInfo, -1) print 'Updated {} with value\n{}'.format(mlPath, str(mlInfo)) @@ -90,7 +123,10 @@ def getManagedCursorInfo(zk, mlPath): for cursor in cursors: cursorData = zk.get(mlPath + "/" + cursor)[0] cursorInfo = MLDataFormats_pb2.ManagedCursorInfo() - cursorInfo.ParseFromString(cursorData) + try: + cursorInfo.ParseFromString(cursorData) + except Exception as formatException: + Merge(cursorData, cursorInfo) cursorList[cursor] = cursorInfo return cursorList except Exception as e: @@ -115,10 +151,19 @@ def updateCursorMarkDelete(zk, cursorPath, markDeleteLedgerId, markDeleteEntryId try: cursorData = zk.get(cursorPath)[0] cursorInfo = MLDataFormats_pb2.ManagedCursorInfo() - cursorInfo.ParseFromString(cursorData) + isTextFormat = False + try: + cursorInfo.ParseFromString(cursorData) + except Exception as formatException: + Merge(cursorData, cursorInfo) + isTextFormat = True cursorInfo.markDeleteLedgerId = markDeleteLedgerId cursorInfo.markDeleteEntryId = markDeleteEntryId - sData = cursorInfo.SerializeToString() + sData = None + if isTextFormat: + sData = MessageToString(cursorInfo, True) + else: + sData = cursorInfo.SerializeToString() zk.set(cursorPath, sData, -1) print 'Updated {} with value \n{}'.format(cursorPath, cursorInfo) except Exception as e: @@ -137,7 +182,7 @@ def updateCursorMarkDelete(zk, cursorPath, markDeleteLedgerId, markDeleteEntryId managed-ledger path eg: ---zkServer localhost:2181 --mlPath sample/standalone/ns1/persistent/test --command print-ml +print-managed-ledger --zkServer localhost:2181 --managedLedgerPath sample/standalone/ns1/persistent/test ''' def printManagedLedgerCommand(zk, mlPath): print getManagedLedgerInfo(zk, mlPath) @@ -155,7 +200,7 @@ def printManagedLedgerCommand(zk, mlPath): managed-cursor path eg: ---zkServer localhost:2181 --mlPath sample/standalone/ns1/persistent/test --cursorName s1 --command print-cursor +print-cursor --zkServer localhost:2181 --managedLedgerPath sample/standalone/ns1/persistent/test --cursorName s1 ''' def printManagedCursorCommand(zk, mlPath, cursorName): try: @@ -178,7 +223,7 @@ def printManagedCursorCommand(zk, mlPath, cursorName): deleteLedgerIds : str comma separated deleting ledger-ids (eg: 123,124) eg: ---zkServer localhost:2181 --mlPath sample/standalone/ns1/persistent/test --command delete-ml-ledgers --ledgerIds 3 +delete-managed-ledger-ids --zkServer localhost:2181 --managedLedgerPath sample/standalone/ns1/persistent/test --ledgerIds 3 ''' def deleteMLLedgerIdsCommand(zk, mlPath, deleteLedgerIds): try: @@ -187,7 +232,6 @@ def deleteMLLedgerIdsCommand(zk, mlPath, deleteLedgerIds): deletLedgerIdSet = set() for id in deletLedgerIds: deletLedgerIdSet.add(long(id)) - print deletLedgerIdSet deleteLedgerIdsFromManagedLedgerInfo(zk, mlPath, deletLedgerIdSet) else: print 'Usage: --command {} [--ledgerIds]'.format(deleteMlLedgerIds) @@ -209,7 +253,7 @@ def deleteMLLedgerIdsCommand(zk, mlPath, deleteLedgerIds): markDeletePosition combination of : (eg. 123:1) eg: ---zkServer localhost:2181 --mlPath sample/standalone/ns1/persistent/test --cursorName s1 --cursorMarkDelete 0:1 --command update-mark-delete-cursor +update-mark-delete-cursor --zkServer localhost:2181 --managedLedgerPath sample/standalone/ns1/persistent/test --cursorName s1 --cursorMarkDelete 0:1 ''' def updateMarkDeleteOfCursorCommand(zk, mlPath, cursorName, markDeletePosition): try: @@ -224,24 +268,27 @@ def updateMarkDeleteOfCursorCommand(zk, mlPath, cursorName, markDeletePosition): print 'Usage: --command {} [----cursorName] [--cursorMarkDelete]'.format(updateMakDeleteCursor) else: print 'Usage: --command {} [--cursorName] [--cursorMarkDelete]'.format(updateMakDeleteCursor) + + except Exception as e: traceback.print_exc() print 'Failed to update ml-cursor {}/{} due to {}'.format(mlPath, cursorName, repr(e)) if __name__ in '__main__': + command = sys.argv[1] + arguments = sys.argv[2:] + parser = argparse.ArgumentParser() commandHelpText = 'Managed-ledger command: \n{}, {}, {}, {}'.format(printMlCommand, deleteMlLedgerIds, printCursorsCommands, updateMakDeleteCursor) parser.add_argument("--zkServer", "-zk", required=True, help="ZooKeeperServer:port") - parser.add_argument("--command", "-cmd", required=True, help=commandHelpText) - parser.add_argument("--mlPath", "-mlp", required=True, help="Managed-ledger path") + parser.add_argument("--managedLedgerPath", "-mlp", required=True, help="Managed-ledger path") parser.add_argument("--ledgerIds", "-lid", required=False, help="Delete ledger ids: comma separated") - parser.add_argument("--cursorName", "-cn", required=False, help="ML-cursor name") + parser.add_argument("--cursorName", "-cn", required=False, help="Managed-ledger cursor name") parser.add_argument("--cursorMarkDelete", "-cm", required=False, help="Cursor mark delete position: :") - args = parser.parse_args() + args = parser.parse_args(arguments) zkSrvr = args.zkServer - command = args.command - mlPath = managedLedgerPath + args.mlPath + mlPath = managedLedgerPath + args.managedLedgerPath deleteLedgerIds = args.ledgerIds cursorName = args.cursorName cursorMarkDelete = args.cursorMarkDelete @@ -258,4 +305,4 @@ def updateMarkDeleteOfCursorCommand(zk, mlPath, cursorName, markDeletePosition): elif command == updateMakDeleteCursor: updateMarkDeleteOfCursorCommand(zk, mlPath, cursorName, cursorMarkDelete) else: - print '{} command not found. supported command : {}'.format(command, commandHelpText) + print '{} command not found. supported {}, pass command as a first argument'.format(command, commandHelpText) From 6829b9ab15cfcc6f1eaf9a98952d92f04699e8cb Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Sat, 24 Jun 2017 11:39:38 -0700 Subject: [PATCH 3/5] move ml-admin tool to bin --- .../mltool => bin/proto}/MLDataFormats_pb2.py | 0 .../mltool => bin/proto}/__init__.py | 0 .../pulsar-managed-ledger-admin.py | 66 ++++++++++--------- 3 files changed, 35 insertions(+), 31 deletions(-) rename {pulsar-client-admin/src/main/resources/mltool => bin/proto}/MLDataFormats_pb2.py (100%) rename {pulsar-client-admin/src/main/resources/mltool => bin/proto}/__init__.py (100%) rename pulsar-client-admin/src/main/resources/mltool/ManagedLedgerAdminTool.py => bin/pulsar-managed-ledger-admin.py (82%) diff --git a/pulsar-client-admin/src/main/resources/mltool/MLDataFormats_pb2.py b/bin/proto/MLDataFormats_pb2.py similarity index 100% rename from pulsar-client-admin/src/main/resources/mltool/MLDataFormats_pb2.py rename to bin/proto/MLDataFormats_pb2.py diff --git a/pulsar-client-admin/src/main/resources/mltool/__init__.py b/bin/proto/__init__.py similarity index 100% rename from pulsar-client-admin/src/main/resources/mltool/__init__.py rename to bin/proto/__init__.py diff --git a/pulsar-client-admin/src/main/resources/mltool/ManagedLedgerAdminTool.py b/bin/pulsar-managed-ledger-admin.py similarity index 82% rename from pulsar-client-admin/src/main/resources/mltool/ManagedLedgerAdminTool.py rename to bin/pulsar-managed-ledger-admin.py index 8194cb93ffc6e..8d6270277ea6a 100644 --- a/pulsar-client-admin/src/main/resources/mltool/ManagedLedgerAdminTool.py +++ b/bin/pulsar-managed-ledger-admin.py @@ -24,7 +24,7 @@ from google.protobuf.text_format import MessageToString import sys -from mltool import MLDataFormats_pb2 +from proto import MLDataFormats_pb2 try: from kazoo.client import KazooClient @@ -32,8 +32,8 @@ print "You need Kazoo ZK client library. Get it from: pip install kazoo" ''' -This util provides API to access managed-ledger data if it's ONLY stored in binary format in to zookeeper. -It also provides command line tool-access to execute these commands. +This util provides API to access managed-ledger data and also +provides command line tool-access to execute these commands. ''' managedLedgerPath = "/managed-ledgers/" printMlCommand = "print-managed-ledger" @@ -275,34 +275,38 @@ def updateMarkDeleteOfCursorCommand(zk, mlPath, cursorName, markDeletePosition): print 'Failed to update ml-cursor {}/{} due to {}'.format(mlPath, cursorName, repr(e)) if __name__ in '__main__': + + commandHelpText = 'Managed-ledger command: \n{}, {}, {}, {}'.format(printMlCommand, deleteMlLedgerIds, printCursorsCommands, updateMakDeleteCursor) + + try: command = sys.argv[1] - arguments = sys.argv[2:] - - parser = argparse.ArgumentParser() - commandHelpText = 'Managed-ledger command: \n{}, {}, {}, {}'.format(printMlCommand, deleteMlLedgerIds, printCursorsCommands, updateMakDeleteCursor) - parser.add_argument("--zkServer", "-zk", required=True, help="ZooKeeperServer:port") - parser.add_argument("--managedLedgerPath", "-mlp", required=True, help="Managed-ledger path") - parser.add_argument("--ledgerIds", "-lid", required=False, help="Delete ledger ids: comma separated") - parser.add_argument("--cursorName", "-cn", required=False, help="Managed-ledger cursor name") - parser.add_argument("--cursorMarkDelete", "-cm", required=False, help="Cursor mark delete position: :") - args = parser.parse_args(arguments) + except Exception as indexError: + print 'ERROR: Pass command as a first argument, supported {}\n\n'.format(commandHelpText) + arguments = sys.argv[2:] + parser = argparse.ArgumentParser() + parser.add_argument("--zkServer", "-zk", required=True, help="ZooKeeperServer:port") + parser.add_argument("--managedLedgerPath", "-mlp", required=True, help="Managed-ledger path") + parser.add_argument("--ledgerIds", "-lid", required=False, help="Delete ledger ids: comma separated") + parser.add_argument("--cursorName", "-cn", required=False, help="Managed-ledger cursor name") + parser.add_argument("--cursorMarkDelete", "-cm", required=False, help="Cursor mark delete position: :") + args = parser.parse_args(arguments) - zkSrvr = args.zkServer - mlPath = managedLedgerPath + args.managedLedgerPath - deleteLedgerIds = args.ledgerIds - cursorName = args.cursorName - cursorMarkDelete = args.cursorMarkDelete + zkSrvr = args.zkServer + mlPath = managedLedgerPath + args.managedLedgerPath + deleteLedgerIds = args.ledgerIds + cursorName = args.cursorName + cursorMarkDelete = args.cursorMarkDelete - zk = KazooClient(hosts=zkSrvr) - zk.start() - - if command == printMlCommand: - printManagedLedgerCommand(zk, mlPath) - elif command == deleteMlLedgerIds: - deleteMLLedgerIdsCommand(zk, mlPath, deleteLedgerIds) - elif command == printCursorsCommands: - printManagedCursorCommand(zk, mlPath, cursorName) - elif command == updateMakDeleteCursor: - updateMarkDeleteOfCursorCommand(zk, mlPath, cursorName, cursorMarkDelete) - else: - print '{} command not found. supported {}, pass command as a first argument'.format(command, commandHelpText) + zk = KazooClient(hosts=zkSrvr) + zk.start() + + if command == printMlCommand: + printManagedLedgerCommand(zk, mlPath) + elif command == deleteMlLedgerIds: + deleteMLLedgerIdsCommand(zk, mlPath, deleteLedgerIds) + elif command == printCursorsCommands: + printManagedCursorCommand(zk, mlPath, cursorName) + elif command == updateMakDeleteCursor: + updateMarkDeleteOfCursorCommand(zk, mlPath, cursorName, cursorMarkDelete) + else: + print '{} command not found. supported {}, pass command as a first argument'.format(command, commandHelpText) From 49370bd08c7ce58f77b49c9bc4587155eb37ca7b Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Sat, 24 Jun 2017 11:44:44 -0700 Subject: [PATCH 4/5] exclude license-header for proto file --- bin/proto/__init__.py | 18 ++++++++++++++++++ pom.xml | 1 + 2 files changed, 19 insertions(+) diff --git a/bin/proto/__init__.py b/bin/proto/__init__.py index e69de29bb2d1d..5fb255256df6d 100644 --- a/bin/proto/__init__.py +++ b/bin/proto/__init__.py @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# \ No newline at end of file diff --git a/pom.xml b/pom.xml index 40d02e5276cb3..7b577b7a8278c 100644 --- a/pom.xml +++ b/pom.xml @@ -551,6 +551,7 @@ flexible messaging model and an intuitive client API. **/*.txt **/*.md **/proto/*.java + bin/proto/* **/*.patch src/test/resources/** src/main/resources/** From bedab30d781c532b860ed46305beb902422fc576 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Sat, 24 Jun 2017 18:12:09 -0700 Subject: [PATCH 5/5] rename file-name and exit on import-error --- ...ar-managed-ledger-admin.py => pulsar-managed-ledger-admin} | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) rename bin/{pulsar-managed-ledger-admin.py => pulsar-managed-ledger-admin} (99%) mode change 100644 => 100755 diff --git a/bin/pulsar-managed-ledger-admin.py b/bin/pulsar-managed-ledger-admin old mode 100644 new mode 100755 similarity index 99% rename from bin/pulsar-managed-ledger-admin.py rename to bin/pulsar-managed-ledger-admin index 8d6270277ea6a..d56620c3bcc31 --- a/bin/pulsar-managed-ledger-admin.py +++ b/bin/pulsar-managed-ledger-admin @@ -1,4 +1,4 @@ -#!/usr/bin/env bash +#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -29,7 +29,7 @@ try: from kazoo.client import KazooClient except Exception as missingLib: - print "You need Kazoo ZK client library. Get it from: pip install kazoo" + sys.exit("You need Kazoo ZK client library. Get it from: pip install kazoo") ''' This util provides API to access managed-ledger data and also