From b40f2ae2b9ee3a29b0df76de6d547cfb0b2f5f12 Mon Sep 17 00:00:00 2001 From: Frederic Morin Date: Tue, 5 Nov 2024 17:05:55 -0500 Subject: [PATCH 1/4] protobuf: remove protobuf support and restrictive dependency --- examples/addons/contentview-custom-grpc.py | 111 -- mitmproxy/contentviews/__init__.py | 4 - mitmproxy/contentviews/grpc.py | 1123 ----------------- mitmproxy/contentviews/protobuf.py | 102 -- .../contrib/kaitaistruct/google_protobuf.py | 126 -- pyproject.toml | 1 - setup.cfg | 1 - test/examples/test_examples.py | 60 - test/mitmproxy/contentviews/test_grpc.py | 800 ------------ .../contentviews/test_grpc_data/msg1.bin | 2 - .../contentviews/test_grpc_data/msg2.bin | Bin 207 -> 0 bytes .../contentviews/test_grpc_data/msg3.bin | 13 - test/mitmproxy/contentviews/test_protobuf.py | 37 - .../test_protobuf_data/protobuf01.bin | 2 - .../test_protobuf_data/protobuf02-decoded.bin | 65 - .../test_protobuf_data/protobuf02.bin | Bin 213 -> 0 bytes .../test_protobuf_data/protobuf03-decoded.bin | 4 - .../test_protobuf_data/protobuf03.bin | 1 - 18 files changed, 2452 deletions(-) delete mode 100644 examples/addons/contentview-custom-grpc.py delete mode 100644 mitmproxy/contentviews/grpc.py delete mode 100644 mitmproxy/contentviews/protobuf.py delete mode 100644 mitmproxy/contrib/kaitaistruct/google_protobuf.py delete mode 100644 test/mitmproxy/contentviews/test_grpc.py delete mode 100644 test/mitmproxy/contentviews/test_grpc_data/msg1.bin delete mode 100644 test/mitmproxy/contentviews/test_grpc_data/msg2.bin delete mode 100644 test/mitmproxy/contentviews/test_grpc_data/msg3.bin delete mode 100644 test/mitmproxy/contentviews/test_protobuf.py delete mode 100644 test/mitmproxy/contentviews/test_protobuf_data/protobuf01.bin delete mode 100644 test/mitmproxy/contentviews/test_protobuf_data/protobuf02-decoded.bin delete mode 100644 test/mitmproxy/contentviews/test_protobuf_data/protobuf02.bin delete mode 100644 test/mitmproxy/contentviews/test_protobuf_data/protobuf03-decoded.bin delete mode 100644 test/mitmproxy/contentviews/test_protobuf_data/protobuf03.bin diff --git a/examples/addons/contentview-custom-grpc.py b/examples/addons/contentview-custom-grpc.py deleted file mode 100644 index 74ffd14b46..0000000000 --- a/examples/addons/contentview-custom-grpc.py +++ /dev/null @@ -1,111 +0,0 @@ -""" -Add a custom version of the gRPC/protobuf content view, which parses -protobuf messages based on a user defined rule set. - -""" -from mitmproxy import contentviews -from mitmproxy.addonmanager import Loader -from mitmproxy.contentviews.grpc import ProtoParser -from mitmproxy.contentviews.grpc import ViewConfig -from mitmproxy.contentviews.grpc import ViewGrpcProtobuf - -config: ViewConfig = ViewConfig() -config.parser_rules = [ - # Note: - # - # The first two ParserRules use the same flow filter, although one should reply to request messages and the other to responses. - # Even with '~s' and '~q' filter expressions, the whole flow would be matched (for '~s') or not matched at all (for '~q'), if - # the contentview displays a http.Message belonging to a flow with existing request and response. - # The rules would have to be applied on per-message-basis, instead of per-flow-basis to distinguish request and response (the - # contentview deals with a single message, either request or response, the flow filter with a flow contiaing both). - # - # Thus different ParserRule classes are used to restrict rules to requests or responses were needed: - # - # - ParserRule: applied to requests and responses - # - ParserRuleRequest: applies to requests only - # - ParserRuleResponse: applies to responses only - # - # The actual 'filter' definition in the rule, would still match the whole flow. This means '~u' expressions could - # be used, to match the URL from the request of a flow, while the ParserRuleResponse is only applied to the response. - ProtoParser.ParserRuleRequest( - name="Geo coordinate lookup request", - # note on flowfilter: for tflow the port gets appended to the URL's host part - filter="example\\.com.*/ReverseGeocode", - field_definitions=[ - ProtoParser.ParserFieldDefinition(tag="1", name="position"), - ProtoParser.ParserFieldDefinition( - tag="1.1", - name="latitude", - intended_decoding=ProtoParser.DecodedTypes.double, - ), - ProtoParser.ParserFieldDefinition( - tag="1.2", - name="longitude", - intended_decoding=ProtoParser.DecodedTypes.double, - ), - ProtoParser.ParserFieldDefinition(tag="3", name="country"), - ProtoParser.ParserFieldDefinition(tag="7", name="app"), - ], - ), - ProtoParser.ParserRuleResponse( - name="Geo coordinate lookup response", - # note on flowfilter: for tflow the port gets appended to the URL's host part - filter="example\\.com.*/ReverseGeocode", - field_definitions=[ - ProtoParser.ParserFieldDefinition(tag="1.2", name="address"), - ProtoParser.ParserFieldDefinition(tag="1.3", name="address array element"), - ProtoParser.ParserFieldDefinition( - tag="1.3.1", - name="unknown bytes", - intended_decoding=ProtoParser.DecodedTypes.bytes, - ), - ProtoParser.ParserFieldDefinition(tag="1.3.2", name="element value long"), - ProtoParser.ParserFieldDefinition(tag="1.3.3", name="element value short"), - ProtoParser.ParserFieldDefinition( - tag="", - tag_prefixes=["1.5.1", "1.5.3", "1.5.4", "1.5.5", "1.5.6"], - name="position", - ), - ProtoParser.ParserFieldDefinition( - tag=".1", - tag_prefixes=["1.5.1", "1.5.3", "1.5.4", "1.5.5", "1.5.6"], - name="latitude", - intended_decoding=ProtoParser.DecodedTypes.double, - ), - ProtoParser.ParserFieldDefinition( - tag=".2", - tag_prefixes=["1.5.1", "1.5.3", "1.5.4", "1.5.5", "1.5.6"], - name="longitude", - intended_decoding=ProtoParser.DecodedTypes.double, - ), - ProtoParser.ParserFieldDefinition(tag="7", name="app"), - ], - ), -] - - -class ViewGrpcWithRules(ViewGrpcProtobuf): - name = "customized gRPC/protobuf" - - def __init__(self) -> None: - super().__init__(config=config) - - def __call__(self, *args, **kwargs) -> contentviews.TViewResult: - heading, lines = super().__call__(*args, **kwargs) - return heading + " (addon with custom rules)", lines - - def render_priority(self, *args, **kwargs) -> float: - # increase priority above default gRPC view - s_prio = super().render_priority(*args, **kwargs) - return s_prio + 1 if s_prio > 0 else s_prio - - -view = ViewGrpcWithRules() - - -def load(loader: Loader): - contentviews.add(view) - - -def done(): - contentviews.remove(view) diff --git a/mitmproxy/contentviews/__init__.py b/mitmproxy/contentviews/__init__.py index ec2313751f..5ca6a75e32 100644 --- a/mitmproxy/contentviews/__init__.py +++ b/mitmproxy/contentviews/__init__.py @@ -20,7 +20,6 @@ from . import css from . import dns from . import graphql -from . import grpc from . import hex from . import http3 from . import image @@ -29,7 +28,6 @@ from . import mqtt from . import msgpack from . import multipart -from . import protobuf from . import query from . import raw from . import urlencoded @@ -234,9 +232,7 @@ def get_content_view( add(multipart.ViewMultipart()) add(image.ViewImage()) add(query.ViewQuery()) -add(protobuf.ViewProtobuf()) add(msgpack.ViewMsgPack()) -add(grpc.ViewGrpcProtobuf()) add(mqtt.ViewMQTT()) add(http3.ViewHttp3()) add(dns.ViewDns()) diff --git a/mitmproxy/contentviews/grpc.py b/mitmproxy/contentviews/grpc.py deleted file mode 100644 index 4c0307b315..0000000000 --- a/mitmproxy/contentviews/grpc.py +++ /dev/null @@ -1,1123 +0,0 @@ -from __future__ import annotations - -import logging -import struct -from collections.abc import Generator -from collections.abc import Iterable -from collections.abc import Iterator -from dataclasses import dataclass -from dataclasses import field -from enum import Enum - -from mitmproxy import contentviews -from mitmproxy import flow -from mitmproxy import flowfilter -from mitmproxy import http -from mitmproxy.contentviews import base -from mitmproxy.net.encoding import decode - - -class ProtoParser: - @dataclass - class ParserRule: - """ - A parser rule lists Field definitions which are applied if the filter rule matches the flow. - - Matching on flow-level also means, a match applies to request AND response messages. - To restrict a rule to a requests only use 'ParserRuleRequest', instead. - To restrict a rule to a responses only use 'ParserRuleResponse', instead. - """ - - field_definitions: list[ProtoParser.ParserFieldDefinition] - """List of field definitions for this rule """ - - name: str = "" - """Name of this rule, only used for debugging""" - - filter: str = "" - """ - Flowfilter to select which flows to apply to ('~q' and '~s' can not be used to distinguish - if the rule should apply to the request or response of a flow. To do so, use ParserRuleRequest - or ParserRuleResponse. ParserRule always applies to request and response.) - """ - - @dataclass - class ParserRuleResponse(ParserRule): - """ - A parser rule lists Field definitions which are applied if the filter rule matches the flow. - - The rule only applies if the processed message is a server response. - """ - - @dataclass - class ParserRuleRequest(ParserRule): - """ - A parser rule lists Field definitions which are applied if the filter rule matches the flow. - - The rule only applies if the processed message is a client request. - """ - - @dataclass - class ParserFieldDefinition: - """ - Defines how to parse a field (or multiple fields with the same tag) in a protobuf messages. - - This allows to apply an intended decoding (f.e. decode uint64 as double instead) and to assign - a descriptive name to a field. Field definitions are aggregated into rules, which also holds - a filter to match selected HTTP messages. - - The most natural way to use this, is to describe known parts of a single protobuf message - in a set of field descriptors, pack them into a rule and set the filter of the rule in a way, - that it only applies to proper protobuf messages (f.e. to request traffic against an API endpoint - matched by an URL flowfilter) - """ - - # A 'tag' could be considered as "absolute path" to match a unique field, yet - # protobuf allows to uses the same nested message in different positions of the parent message - # The 'tag_prefixes' parameter allows to apply the field definition to different "leafs nodes" - # of a message. - # - # Example 1: match a single, absolute tag - # ---------- - # tag = '1.2' - # tag_prefixes = [] (default) - # - # applies to: tag '1.2' - # - # Example 2: match multiple tags with same ending - # ---------- - # tag = '1.3' - # tag_prefixes = ['1.2.', '2.5.'] - # - # applies to: tag '1.2.1.3' and tag '2.5.1.3' - # does not apply to: '1.3', unless tag_prefixes is extended to tag_prefixes = ['1.2', '2.5', ''] - # - # Example 3: match multiple tags - # ---------- - # tag = '' - # tag_prefixes = ['1.2', '2.5'] - # - # applies to: tag '1.2' and tag '1.5' - - tag: str - """Field tag for which this description applies (including flattened tag path, f.e. '1.2.2.4')""" - - tag_prefixes: list[str] = field(default_factory=list) - """List of prefixes for tag matching (f.e. tag_prefixes=['1.2.', '2.2.'] with tag='1' matches '1.2.1' and '2.2.1')""" - - intended_decoding: ProtoParser.DecodedTypes | None = None - """optional: intended decoding for visualization (parser fails over to alternate decoding if not possible)""" - - name: str | None = None - """optional: intended field for visualization (parser fails over to alternate decoding if not possible)""" - - as_packed: bool | None = False - """optional: if set to true, the field is considered to be repeated and packed""" - - @dataclass - class ParserOptions: - # output should contain wiretype of fields - include_wiretype: bool = False - - # output should contain the fields which describe nested messages - # (the nested messages bodies are always included, but the "header fields" could - # add unnecessary output overhead) - exclude_message_headers: bool = False - - # optional: rules - # rules: List[ProtoParser.ParserRule] = field(default_factory=list) - - class DecodedTypes(Enum): - # varint - int32 = 0 - int64 = 1 - uint32 = 2 - uint64 = 3 - sint32 = 4 # ZigZag encoding - sint64 = 5 # ZigZag encoding - bool = 6 - enum = 7 - # bit_32 - fixed32 = 8 - sfixed32 = 9 - float = 10 - # bit_64 - fixed64 = 11 - sfixed64 = 12 - double = 13 - # len_delimited - string = 14 - bytes = 15 - message = 16 - - # helper - unknown = 17 - - @staticmethod - def _read_base128le(data: bytes) -> tuple[int, int]: - res = 0 - offset = 0 - while offset < len(data): - o = data[offset] - res += (o & 0x7F) << (7 * offset) - offset += 1 - if o < 0x80: - # the Kaitai parser for protobuf support base128 le values up - # to 8 groups (bytes). Due to the nature of the encoding, each - # group attributes 7bit to the resulting value, which give - # a 56 bit value at maximum. - # The values which get encoded into protobuf variable length integers, - # on the other hand, include full 64bit types (int64, uint64, sint64). - # This means, the Kaitai encoder can not cover the full range of - # possible values - # - # This decoder puts no limitation on the maximum value of variable - # length integers. Values exceeding 64bit have to be handled externally - return offset, res - raise ValueError("varint exceeds bounds of provided data") - - @staticmethod - def _read_u32(data: bytes) -> tuple[int, int]: - return 4, struct.unpack(" tuple[int, int]: - return 8, struct.unpack(" list[ProtoParser.Field]: - res: list[ProtoParser.Field] = [] - pos = 0 - while pos < len(wire_data): - # read field key (tag and wire_type) - offset, key = ProtoParser._read_base128le(wire_data[pos:]) - # casting raises exception for invalid WireTypes - wt = ProtoParser.WireTypes(key & 7) - tag = key >> 3 - pos += offset - - val: bytes | int - preferred_decoding: ProtoParser.DecodedTypes - if wt == ProtoParser.WireTypes.varint: - offset, val = ProtoParser._read_base128le(wire_data[pos:]) - pos += offset - bl = val.bit_length() - if bl > 64: - preferred_decoding = ProtoParser.DecodedTypes.unknown - if bl > 32: - preferred_decoding = ProtoParser.DecodedTypes.uint64 - else: - preferred_decoding = ProtoParser.DecodedTypes.uint32 - elif wt == ProtoParser.WireTypes.bit_64: - offset, val = ProtoParser._read_u64(wire_data[pos:]) - pos += offset - preferred_decoding = ProtoParser.DecodedTypes.fixed64 - elif wt == ProtoParser.WireTypes.len_delimited: - offset, length = ProtoParser._read_base128le(wire_data[pos:]) - pos += offset - if length > len(wire_data[pos:]): - raise ValueError("length delimited field exceeds data size") - val = wire_data[pos : pos + length] - pos += length - preferred_decoding = ProtoParser.DecodedTypes.message - elif ( - wt == ProtoParser.WireTypes.group_start - or wt == ProtoParser.WireTypes.group_end - ): - raise ValueError(f"deprecated field: {wt}") - elif wt == ProtoParser.WireTypes.bit_32: - offset, val = ProtoParser._read_u32(wire_data[pos:]) - pos += offset - preferred_decoding = ProtoParser.DecodedTypes.fixed32 - else: - # not reachable as if-else statements contain all possible WireTypes - # wrong types raise Exception during typecasting in `wt = ProtoParser.WireTypes((key & 7))` - raise ValueError("invalid WireType for protobuf messsage field") - - field = ProtoParser.Field( - wire_type=wt, - preferred_decoding=preferred_decoding, - options=options, - rules=rules, - tag=tag, - wire_value=val, - parent_field=parent_field, - ) - res.append(field) - - return res - - @staticmethod - def read_packed_fields( - packed_field: ProtoParser.Field, - ) -> list[ProtoParser.Field]: - if not isinstance(packed_field.wire_value, bytes): - raise ValueError( - f"can not unpack field with data other than bytes: {type(packed_field.wire_value)}" - ) - wire_data: bytes = packed_field.wire_value - tag: int = packed_field.tag - options: ProtoParser.ParserOptions = packed_field.options - rules: list[ProtoParser.ParserRule] = packed_field.rules - intended_decoding: ProtoParser.DecodedTypes = packed_field.preferred_decoding - - # the packed field has to have WireType length delimited, whereas the contained - # individual types have to have a different WireType, which is derived from - # the intended decoding - if ( - packed_field.wire_type != ProtoParser.WireTypes.len_delimited - or not isinstance(packed_field.wire_value, bytes) - ): - raise ValueError( - "packed fields have to be embedded in a length delimited message" - ) - # wiretype to read has to be determined from intended decoding - packed_wire_type: ProtoParser.WireTypes - if ( - intended_decoding == ProtoParser.DecodedTypes.int32 - or intended_decoding == ProtoParser.DecodedTypes.int64 - or intended_decoding == ProtoParser.DecodedTypes.uint32 - or intended_decoding == ProtoParser.DecodedTypes.uint64 - or intended_decoding == ProtoParser.DecodedTypes.sint32 - or intended_decoding == ProtoParser.DecodedTypes.sint64 - or intended_decoding == ProtoParser.DecodedTypes.bool - or intended_decoding == ProtoParser.DecodedTypes.enum - ): - packed_wire_type = ProtoParser.WireTypes.varint - elif ( - intended_decoding == ProtoParser.DecodedTypes.fixed32 - or intended_decoding == ProtoParser.DecodedTypes.sfixed32 - or intended_decoding == ProtoParser.DecodedTypes.float - ): - packed_wire_type = ProtoParser.WireTypes.bit_32 - elif ( - intended_decoding == ProtoParser.DecodedTypes.fixed64 - or intended_decoding == ProtoParser.DecodedTypes.sfixed64 - or intended_decoding == ProtoParser.DecodedTypes.double - ): - packed_wire_type = ProtoParser.WireTypes.bit_64 - elif ( - intended_decoding == ProtoParser.DecodedTypes.string - or intended_decoding == ProtoParser.DecodedTypes.bytes - or intended_decoding == ProtoParser.DecodedTypes.message - ): - packed_wire_type = ProtoParser.WireTypes.len_delimited - else: - # should never happen, no test - raise TypeError( - "Wire type could not be determined from packed decoding type" - ) - - res: list[ProtoParser.Field] = [] - pos = 0 - val: bytes | int - if packed_wire_type == ProtoParser.WireTypes.varint: - while pos < len(wire_data): - offset, val = ProtoParser._read_base128le(wire_data[pos:]) - pos += offset - res.append( - ProtoParser.Field( - options=options, - preferred_decoding=intended_decoding, - rules=rules, - tag=tag, - wire_type=packed_wire_type, - wire_value=val, - parent_field=packed_field.parent_field, - is_unpacked_children=True, - ) - ) - elif packed_wire_type == ProtoParser.WireTypes.bit_64: - if len(wire_data) % 8 != 0: - raise ValueError("can not parse as packed bit64") - while pos < len(wire_data): - offset, val = ProtoParser._read_u64(wire_data[pos:]) - pos += offset - res.append( - ProtoParser.Field( - options=options, - preferred_decoding=intended_decoding, - rules=rules, - tag=tag, - wire_type=packed_wire_type, - wire_value=val, - parent_field=packed_field.parent_field, - is_unpacked_children=True, - ) - ) - elif packed_wire_type == ProtoParser.WireTypes.len_delimited: - while pos < len(wire_data): - offset, length = ProtoParser._read_base128le(wire_data[pos:]) - pos += offset - val = wire_data[pos : pos + length] - if length > len(wire_data[pos:]): - raise ValueError("packed length delimited field exceeds data size") - res.append( - ProtoParser.Field( - options=options, - preferred_decoding=intended_decoding, - rules=rules, - tag=tag, - wire_type=packed_wire_type, - wire_value=val, - parent_field=packed_field.parent_field, - is_unpacked_children=True, - ) - ) - pos += length - elif ( - packed_wire_type == ProtoParser.WireTypes.group_start - or packed_wire_type == ProtoParser.WireTypes.group_end - ): - raise ValueError("group tags can not be encoded packed") - elif packed_wire_type == ProtoParser.WireTypes.bit_32: - if len(wire_data) % 4 != 0: - raise ValueError("can not parse as packed bit32") - while pos < len(wire_data): - offset, val = ProtoParser._read_u32(wire_data[pos:]) - pos += offset - res.append( - ProtoParser.Field( - options=options, - preferred_decoding=intended_decoding, - rules=rules, - tag=tag, - wire_type=packed_wire_type, - wire_value=val, - parent_field=packed_field.parent_field, - is_unpacked_children=True, - ) - ) - else: - # should never happen - raise ValueError("invalid WireType for protobuf messsage field") - - # mark parent field as packed parent (if we got here, unpacking succeeded) - packed_field.is_packed_parent = True - return res - - class Field: - """ - Represents a single field of a protobuf message and handles the varios encodings. - - As mitmproxy sees the data passing by as raw protobuf message, it only knows the - WireTypes. Each of the WireTypes could represent different Protobuf field types. - The exact Protobuf field type can not be determined from the wire format, thus different - options for decoding have to be supported. - In addition the parsed WireTypes are (intermediary) stored in Python types, which adds - some additional overhead type conversions. - - WireType represented Protobuf Types Python type (intermediary) - - 0: varint int32, int64, uint32, uint64, enum, int (*) - sint32, sint64 (both ZigZag encoded), int - bool bool - float (**) - - 1: bit_64 fixed64, sfixed64, int (*) - double float - - 2: len_delimited string, str - message, class 'Message' - bytes, bytes (*) - packed_repeated_field class 'Message' (fields with same tag) - - 3: group_start unused (deprecated) - - 4: group_end unused (deprecated) - - - 5: bit_32 fixed32, sfixed32, int (*) - float float - - (*) Note 1: Conversion between WireType and intermediary python representation - is handled by Kaitai protobuf decoder and always uses the python - representation marked with (*). Converting to alternative representations - is handled inside this class. - (**) Note 2: Varint is not used to represent floating point values, but some applications - store native floats in uint32 protobuf types (or native double in uint64). - Thus we allow conversion of varint to floating point values for convenience - (A well known APIs "hide" GPS latitude and longitude values in varint types, - much easier to spot such things when rendered as float) - - Ref: - https://developers.google.com/protocol-buffers/docs/proto3 - - https://developers.google.com/protocol-buffers/docs/encoding - """ - - def __init__( - self, - wire_type: ProtoParser.WireTypes, - preferred_decoding: ProtoParser.DecodedTypes, - tag: int, - parent_field: ProtoParser.Field | None, - wire_value: int | bytes, - options: ProtoParser.ParserOptions, - rules: list[ProtoParser.ParserRule], - is_unpacked_children: bool = False, - ) -> None: - self.wire_type: ProtoParser.WireTypes = wire_type - self.preferred_decoding: ProtoParser.DecodedTypes = preferred_decoding - self.wire_value: int | bytes = wire_value - self.tag: int = tag - self.options: ProtoParser.ParserOptions = options - self.name: str = "" - self.rules: list[ProtoParser.ParserRule] = rules - self.parent_field: ProtoParser.Field | None = parent_field - self.is_unpacked_children: bool = ( - is_unpacked_children # marks field as being a result of unpacking - ) - self.is_packed_parent: bool = ( - False # marks field as being parent of successfully unpacked children - ) - self.parent_tags: list[int] = [] - if self.parent_field is not None: - self.parent_tags = self.parent_field.parent_tags[:] - self.parent_tags.append(self.parent_field.tag) - self.try_unpack = False - - # rules can overwrite self.try_unpack - self.apply_rules() - # do not unpack fields which are the result of unpacking - if parent_field is not None and self.is_unpacked_children: - self.try_unpack = False - - # no tests for only_first_hit=False, as not user-changable - def apply_rules(self, only_first_hit=True): - tag_str = self._gen_tag_str() - name = None - decoding = None - as_packed = False - try: - for rule in self.rules: - for fd in rule.field_definitions: - match = False - if len(fd.tag_prefixes) == 0 and fd.tag == tag_str: - match = True - else: - for rt in fd.tag_prefixes: - if rt + fd.tag == tag_str: - match = True - break - if match: - if only_first_hit: - # only first match - if fd.name is not None: - self.name = fd.name - if fd.intended_decoding is not None: - self.preferred_decoding = fd.intended_decoding - self.try_unpack = bool(fd.as_packed) - return - else: - # overwrite matches till last rule was inspected - # (f.e. allows to define name in one rule and intended_decoding in another one) - name = fd.name if fd.name else name - decoding = ( - fd.intended_decoding - if fd.intended_decoding - else decoding - ) - if fd.as_packed: - as_packed = True - - if name: - self.name = name - if decoding: - self.preferred_decoding = decoding - self.try_unpack = as_packed - except Exception as e: - logging.warning(e) - - def _gen_tag_str(self): - tags = self.parent_tags[:] - tags.append(self.tag) - return ".".join([str(tag) for tag in tags]) - - def safe_decode_as( - self, - intended_decoding: ProtoParser.DecodedTypes, - try_as_packed: bool = False, - ) -> tuple[ - ProtoParser.DecodedTypes, - bool | float | int | bytes | str | list[ProtoParser.Field], - ]: - """ - Tries to decode as intended, applies failover, if not possible - - Returns selected decoding and decoded value - """ - if self.wire_type == ProtoParser.WireTypes.varint: - try: - return intended_decoding, self.decode_as( - intended_decoding, try_as_packed - ) - except Exception: - if int(self.wire_value).bit_length() > 32: - # ignore the fact that varint could exceed 64bit (would violate the specs) - return ProtoParser.DecodedTypes.uint64, self.wire_value - else: - return ProtoParser.DecodedTypes.uint32, self.wire_value - elif self.wire_type == ProtoParser.WireTypes.bit_64: - try: - return intended_decoding, self.decode_as( - intended_decoding, try_as_packed - ) - except Exception: - return ProtoParser.DecodedTypes.fixed64, self.wire_value - elif self.wire_type == ProtoParser.WireTypes.bit_32: - try: - return intended_decoding, self.decode_as( - intended_decoding, try_as_packed - ) - except Exception: - return ProtoParser.DecodedTypes.fixed32, self.wire_value - elif self.wire_type == ProtoParser.WireTypes.len_delimited: - try: - return intended_decoding, self.decode_as( - intended_decoding, try_as_packed - ) - except Exception: - # failover strategy: message --> string (valid UTF-8) --> bytes - len_delimited_strategy: list[ProtoParser.DecodedTypes] = [ - ProtoParser.DecodedTypes.message, - ProtoParser.DecodedTypes.string, - ProtoParser.DecodedTypes.bytes, # should always work - ] - for failover_decoding in len_delimited_strategy: - if failover_decoding == intended_decoding and not try_as_packed: - # don't try same decoding twice, unless first attempt was packed - continue - try: - return failover_decoding, self.decode_as( - failover_decoding, False - ) - except Exception: - pass - - # we should never get here (could not be added to tests) - return ProtoParser.DecodedTypes.unknown, self.wire_value - - def decode_as( - self, intended_decoding: ProtoParser.DecodedTypes, as_packed: bool = False - ) -> bool | int | float | bytes | str | list[ProtoParser.Field]: - if as_packed is True: - return ProtoParser.read_packed_fields(packed_field=self) - - if self.wire_type == ProtoParser.WireTypes.varint: - assert isinstance(self.wire_value, int) - if intended_decoding == ProtoParser.DecodedTypes.bool: - # clamp result to 64bit - return self.wire_value & 0xFFFFFFFFFFFFFFFF != 0 - elif intended_decoding == ProtoParser.DecodedTypes.int32: - if self.wire_value.bit_length() > 32: - raise TypeError("wire value too large for int32") - return struct.unpack("!i", struct.pack("!I", self.wire_value))[0] - elif intended_decoding == ProtoParser.DecodedTypes.int64: - if self.wire_value.bit_length() > 64: - raise TypeError("wire value too large for int64") - return struct.unpack("!q", struct.pack("!Q", self.wire_value))[0] - elif intended_decoding == ProtoParser.DecodedTypes.uint32: - if self.wire_value.bit_length() > 32: - raise TypeError("wire value too large for uint32") - return self.wire_value # already 'int' which was parsed as unsigned - elif ( - intended_decoding == ProtoParser.DecodedTypes.uint64 - or intended_decoding == ProtoParser.DecodedTypes.enum - ): - if self.wire_value.bit_length() > 64: - raise TypeError("wire value too large") - return self.wire_value # already 'int' which was parsed as unsigned - elif intended_decoding == ProtoParser.DecodedTypes.sint32: - if self.wire_value.bit_length() > 32: - raise TypeError("wire value too large for sint32") - return (self.wire_value >> 1) ^ -( - self.wire_value & 1 - ) # zigzag_decode - elif intended_decoding == ProtoParser.DecodedTypes.sint64: - if self.wire_value.bit_length() > 64: - raise TypeError("wire value too large for sint64") - # ZigZag decode - # Ref: https://gist.github.com/mfuerstenau/ba870a29e16536fdbaba - return (self.wire_value >> 1) ^ -(self.wire_value & 1) - elif ( - intended_decoding == ProtoParser.DecodedTypes.float - or intended_decoding == ProtoParser.DecodedTypes.double - ): - # special case, not complying to protobuf specs - return self._wire_value_as_float() - elif self.wire_type == ProtoParser.WireTypes.bit_64: - if intended_decoding == ProtoParser.DecodedTypes.fixed64: - return self.wire_value - elif intended_decoding == ProtoParser.DecodedTypes.sfixed64: - return struct.unpack("!q", struct.pack("!Q", self.wire_value))[0] - elif intended_decoding == ProtoParser.DecodedTypes.double: - return self._wire_value_as_float() - elif self.wire_type == ProtoParser.WireTypes.bit_32: - if intended_decoding == ProtoParser.DecodedTypes.fixed32: - return self.wire_value - elif intended_decoding == ProtoParser.DecodedTypes.sfixed32: - return struct.unpack("!i", struct.pack("!I", self.wire_value))[0] - elif intended_decoding == ProtoParser.DecodedTypes.float: - return self._wire_value_as_float() - elif self.wire_type == ProtoParser.WireTypes.len_delimited: - assert isinstance(self.wire_value, bytes) - if intended_decoding == ProtoParser.DecodedTypes.string: - # According to specs, a protobuf string HAS TO be UTF-8 parsable - # throw exception on invalid UTF-8 chars, but escape linebreaks - return self.wire_value_as_utf8(escape_newline=True) - elif intended_decoding == ProtoParser.DecodedTypes.bytes: - # always works, assure to hand back a copy - return self.wire_value[:] - elif intended_decoding == ProtoParser.DecodedTypes.message: - return ProtoParser.read_fields( - wire_data=self.wire_value, - parent_field=self, - options=self.options, - rules=self.rules, - ) - - # if here, there is no valid decoding - raise TypeError("intended decoding mismatches wire type") - - def encode_from(inputval, intended_encoding: ProtoParser.DecodedTypes): - raise NotImplementedError( - "Future work, needed to manipulate and re-encode protobuf message, with respect to given wire types" - ) - - def _wire_value_as_float(self) -> float: - """ - Handles double (64bit) and float (32bit). - Assumes Network Byte Order (big endian). - - Usable for: - - WireType --> Protobuf Type): - ---------------------------- - varint --> double/float (not intended by ProtoBuf, but used in the wild) - bit_32 --> float - bit_64 --> double - len_delimited --> 4 bytes: float / 8 bytes: double / other sizes return NaN - """ - v = self._value_as_bytes() - if len(v) == 4: - return struct.unpack("!f", v)[0] - elif len(v) == 8: - return struct.unpack("!d", v)[0] - # no need to raise an Exception - raise TypeError("can not be converted to floatingpoint representation") - - def _value_as_bytes(self) -> bytes: - if isinstance(self.wire_value, bytes): - return self.wire_value - elif isinstance(self.wire_value, int): - if self.wire_value.bit_length() > 64: - # source for a python int are wiretypes varint/bit_32/bit64 and should never convert to int values 64bit - # currently avoided by kaitai decoder (can not be added to tests) - raise ValueError("value exceeds 64bit, violating protobuf specs") - elif self.wire_value.bit_length() > 32: - # packing uses network byte order (to assure consistent results across architectures) - return struct.pack("!Q", self.wire_value) - else: - # packing uses network byte order (to assure consistent results across architectures) - return struct.pack("!I", self.wire_value) - else: - # should never happen, no tests - raise ValueError("can not be converted to bytes") - - def _wire_type_str(self): - return str(self.wire_type).split(".")[-1] - - def _decoding_str(self, decoding: ProtoParser.DecodedTypes): - return str(decoding).split(".")[-1] - - def wire_value_as_utf8(self, escape_newline=True) -> str: - if isinstance(self.wire_value, bytes): - res = self.wire_value.decode("utf-8") - return res.replace("\n", "\\n") if escape_newline else res - return str(self.wire_value) - - def gen_flat_decoded_field_dicts(self) -> Generator[dict, None, None]: - """ - Returns a generator which passes the field as a dict. - - In order to return the field value it gets decoded (based on a failover strategy and - provided ParserRules). - If the field holds a nested message, the fields contained in the message are appended. - Ultimately this flattens all fields recursively. - """ - selected_decoding, decoded_val = self.safe_decode_as( - self.preferred_decoding, self.try_unpack - ) - field_desc_dict = { - "tag": self._gen_tag_str(), - "wireType": self._wire_type_str(), - "decoding": self._decoding_str(selected_decoding), - "name": self.name, - } - if isinstance(decoded_val, list): - if ( - selected_decoding - == ProtoParser.DecodedTypes.message # field is a message with subfields - and not self.is_packed_parent # field is a message, but replaced by packed fields - ): - # Field is a message, not packed, thus include it as message header - field_desc_dict["val"] = "" - yield field_desc_dict - # add sub-fields of messages or packed fields - for f in decoded_val: - yield from f.gen_flat_decoded_field_dicts() - else: - field_desc_dict["val"] = decoded_val - yield field_desc_dict - - def __init__( - self, - data: bytes, - rules: list[ProtoParser.ParserRule] | None = None, - parser_options: ParserOptions | None = None, - ) -> None: - self.data: bytes = data - if parser_options is None: - parser_options = ProtoParser.ParserOptions() - self.options = parser_options - if rules is None: - rules = [] - self.rules = rules - - try: - self.root_fields: list[ProtoParser.Field] = ProtoParser.read_fields( - wire_data=self.data, - options=self.options, - parent_field=None, - rules=self.rules, - ) - except Exception as e: - raise ValueError("not a valid protobuf message") from e - - def gen_flat_decoded_field_dicts(self) -> Generator[dict, None, None]: - for f in self.root_fields: - yield from f.gen_flat_decoded_field_dicts() - - def gen_str_rows(self) -> Generator[tuple[str, ...], None, None]: - for field_dict in self.gen_flat_decoded_field_dicts(): - if ( - self.options.exclude_message_headers - and field_dict["decoding"] == "message" - ): - continue - - if self.options.include_wiretype: - col1 = "[{}->{}]".format(field_dict["wireType"], field_dict["decoding"]) - else: - col1 = "[{}]".format(field_dict["decoding"]) - col2 = field_dict["name"] # empty string if not set (consumes no space) - col3 = field_dict["tag"] - col4 = str(field_dict["val"]) - yield col1, col2, col3, col4 - - -# Note: all content view formating functionality is kept out of the ProtoParser class, to -# allow it to be use independently. -# This function is generic enough, to consider moving it to mitmproxy.contentviews.base -def format_table( - table_rows: Iterable[tuple[str, ...]], - max_col_width=100, -) -> Iterator[base.TViewLine]: - """ - Helper function to render tables with variable column count (move to contentview base, if needed elsewhere) - - Note: The function has to convert generators to a list, as all rows have to be processed twice (to determine - the column widths first). - """ - rows: list[tuple[str, ...]] = [] - col_count = 0 - cols_width: list[int] = [] - for row in table_rows: - col_count = max(col_count, len(row)) - while len(cols_width) < col_count: - cols_width.append(0) - for col_num in range(len(row)): - cols_width[col_num] = max(len(row[col_num]), cols_width[col_num]) - - # store row in list - rows.append(row) - - for i in range(len(cols_width)): - cols_width[i] = min(cols_width[i], max_col_width) - - for row in rows: - line: base.TViewLine = [] - for col_num in range(len(row)): - col_val = row[col_num].ljust(cols_width[col_num] + 2) - line.append(("text", col_val)) - yield line - - -def parse_grpc_messages( - data, compression_scheme -) -> Generator[tuple[bool, bytes], None, None]: - """Generator iterates over body data and returns a boolean indicating if the messages - was compressed, along with the raw message data (decompressed) for each gRPC message - contained in the body data""" - while data: - try: - msg_is_compressed, length = struct.unpack("!?i", data[:5]) - decoded_message = struct.unpack("!%is" % length, data[5 : 5 + length])[0] - except Exception as e: - raise ValueError("invalid gRPC message") from e - - if msg_is_compressed: - try: - decoded_message = decode( - encoded=decoded_message, encoding=compression_scheme - ) - except Exception as e: - raise ValueError("Failed to decompress gRPC message with gzip") from e - - yield msg_is_compressed, decoded_message - data = data[5 + length :] - - -# hacky fix for mitmproxy issue: -# -# mitmproxy handles Exceptions in the contenview's __call__ function, by -# failing over to 'Raw' view. The intention was to use this behavior to -# pass up Exceptions thrown inside the generator function ('format_pbuf' -# and 'format_grpc') to the __call__ function. -# This usually works fine if the contentview is initialized on a flow -# with invalid data. -# When the flow data gets invalidated in the edit mode, mitmproxy re-calls -# the generator functions outside the contentviews '__call__' method. -# -# This happens in the 'safe_to_print' function of 'mitmproxy/contentvies/__init__.py' -# -# def safe_to_print(lines, encoding="utf8"): -# """ -# Wraps a content generator so that each text portion is a *safe to print* unicode string. -# """ -# for line in lines: # <------ this code re-iterates lines and thus calls generators, without using the views __call__ function -# clean_line = [] -# for (style, text) in line: -# if isinstance(text, bytes): -# text = text.decode(encoding, "replace") -# text = strutils.escape_control_characters(text) -# clean_line.append((style, text)) -# yield clean_line -# -# In result, mitmproxy crashes if the generator functions raise Exception to indicate -# data parsing errors. -# To deal with this, the generator function gets converted into a list inside the -# __call__ function. Ultimately, exceptions are raised directly from within __call__ -# instead in cases where the generator is accessed externally without exception handling. -def hack_generator_to_list(generator_func): - return list(generator_func) - - -def format_pbuf( - message: bytes, - parser_options: ProtoParser.ParserOptions, - rules: list[ProtoParser.ParserRule], -): - yield from format_table( - ProtoParser( - data=message, parser_options=parser_options, rules=rules - ).gen_str_rows() - ) - - -def format_grpc( - data: bytes, - parser_options: ProtoParser.ParserOptions, - rules: list[ProtoParser.ParserRule], - compression_scheme="gzip", -): - message_count = 0 - for compressed, pb_message in parse_grpc_messages( - data=data, compression_scheme=compression_scheme - ): - headline = ( - "gRPC message " - + str(message_count) - + " (compressed " - + str(compression_scheme if compressed else compressed) - + ")" - ) - - yield [("text", headline)] - yield from format_pbuf( - message=pb_message, parser_options=parser_options, rules=rules - ) - - -@dataclass -class ViewConfig: - parser_options: ProtoParser.ParserOptions = field( - default_factory=ProtoParser.ParserOptions - ) - parser_rules: list[ProtoParser.ParserRule] = field(default_factory=list) - - -class ViewGrpcProtobuf(base.View): - """Human friendly view of protocol buffers""" - - name = "gRPC/Protocol Buffer" - __content_types_pb = [ - "application/x-protobuf", - "application/x-protobuffer", - "application/grpc-proto", - ] - __content_types_grpc = [ - "application/grpc", - # seems specific to chromium infra tooling - # https://chromium.googlesource.com/infra/luci/luci-go/+/refs/heads/main/grpc/prpc/ - "application/prpc", - ] - - # first value serves as default algorithm for compressed messages, if 'grpc-encoding' header is missing - __valid_grpc_encodings = [ - "gzip", - "identity", - "deflate", - "zstd", - ] - - # allows to take external ParserOptions object. goes with defaults otherwise - def __init__(self, config: ViewConfig | None = None) -> None: - super().__init__() - if config is None: - config = ViewConfig() - self.config = config - - def _matching_rules( - self, - rules: list[ProtoParser.ParserRule], - message: http.Message | None, - flow: flow.Flow | None, - ) -> list[ProtoParser.ParserRule]: - """ - Checks which of the give rules applies and returns a List only containing those rules - - Each rule defines a flow filter in rule.filter which is usually matched against a flow. - When it comes to protobuf parsing, in most cases request messages differ from response messages. - Thus, it has to be possible to apply a rule to a http.Request or a http.Response, only. - - As the name flowfilter suggests, filters are working on a flow-level, not on message-level. - This means: - - - the filter expression '~q' matches all flows with a request, but no response - - the filter expression '~s' matches all flows with a response - - In result, for complete flows (with a gRPC message in the request and the response), ParserRules would - either be applied to request and response at the same time ('~s') or neither would match request, nor - response (~q). - - To distinguish between rules which should be applied to response messages, request messages or both - (while being applied to the whole flow), different classes with same behavior are used to wrap rules: - - - ParserRule: applies to requests and responses - - ParserRuleRequest: applies to requests only - - ParserRuleResponse: applies to responses only - """ - res: list[ProtoParser.ParserRule] = [] - if not flow: - return res - is_request = isinstance(message, http.Request) - for rule in rules: - # message based rule matching - if is_request and isinstance(rule, ProtoParser.ParserRuleResponse): - continue - elif not is_request and isinstance(rule, ProtoParser.ParserRuleRequest): - continue - # flow based rule matching - if flowfilter.match(rule.filter, flow=flow): - res.append(rule) - return res - - def __call__( - self, - data: bytes, - *, - content_type: str | None = None, - flow: flow.Flow | None = None, - http_message: http.Message | None = None, - **unknown_metadata, - ) -> contentviews.TViewResult: - applicabble_rules = self._matching_rules( - rules=self.config.parser_rules, flow=flow, message=http_message - ) - if content_type in self.__content_types_grpc: - # If gRPC messages are flagged to be compressed, the compression algorithm is expressed in the - # 'grpc-encoding' header. - # - # The following code tries to determine the compression algorithm base on this header. - # If the header is not present or contains an unsupported compression, the logic falls back to - # 'gzip'. - # - # If a compressed gRPC message is found in the body data (compressed flag set), the information - # on the compression scheme is needed (even if not set by a header), in order to process the message. - # Thus we assure there is always an encoding selected. An encoding of 'Identity' would not make - # sense, if a message is flagged as being compressed, that's why a default is chosen. - try: - assert http_message is not None - h = http_message.headers["grpc-encoding"] - grpc_encoding = ( - h - if h in self.__valid_grpc_encodings - else self.__valid_grpc_encodings[0] - ) - except Exception: - grpc_encoding = self.__valid_grpc_encodings[0] - - text_iter = format_grpc( - data=data, - parser_options=self.config.parser_options, - compression_scheme=grpc_encoding, - rules=applicabble_rules, - ) - title = "gRPC" - else: - text_iter = format_pbuf( - message=data, - parser_options=self.config.parser_options, - rules=applicabble_rules, - ) - title = "Protobuf (flattened)" - - # hacky bugfix, see description above generator functions format_pbuf/format_grpc - try: - text_iter = hack_generator_to_list(text_iter) - except Exception as e: - # hook to log exception tracebacks on iterators - - # import traceback - # logging.warning("gRPC contentview: {}".format(traceback.format_exc())) - raise e - - return title, text_iter - - def render_priority( - self, - data: bytes, - *, - content_type: str | None = None, - flow: flow.Flow | None = None, - http_message: http.Message | None = None, - **unknown_metadata, - ) -> float: - if bool(data) and content_type in self.__content_types_grpc: - return 1 - if bool(data) and content_type in self.__content_types_pb: - # replace existing protobuf renderer preference (adjust by option) - return 1.5 - else: - return 0 diff --git a/mitmproxy/contentviews/protobuf.py b/mitmproxy/contentviews/protobuf.py deleted file mode 100644 index 7447d3384c..0000000000 --- a/mitmproxy/contentviews/protobuf.py +++ /dev/null @@ -1,102 +0,0 @@ -import io - -from kaitaistruct import KaitaiStream - -from . import base -from mitmproxy.contrib.kaitaistruct import google_protobuf - - -def write_buf(out, field_tag, body, indent_level): - if body is not None: - out.write( - "{: <{level}}{}: {}\n".format( - "", - field_tag, - body if isinstance(body, int) else str(body, "utf-8"), - level=indent_level, - ) - ) - elif field_tag is not None: - out.write(" " * indent_level + str(field_tag) + " {\n") - else: - out.write(" " * indent_level + "}\n") - - -def _parse_proto(raw: bytes) -> list[google_protobuf.GoogleProtobuf.Pair]: - """Parse a bytestring into protobuf pairs and make sure that all pairs have a valid wire type.""" - buf = google_protobuf.GoogleProtobuf(KaitaiStream(io.BytesIO(raw))) - for pair in buf.pairs: - if not isinstance( - pair.wire_type, google_protobuf.GoogleProtobuf.Pair.WireTypes - ): - raise ValueError("Not a protobuf.") - return buf.pairs - - -def format_pbuf(raw): - out = io.StringIO() - stack = [] - - try: - pairs = _parse_proto(raw) - except Exception: - return False - stack.extend([(pair, 0) for pair in pairs[::-1]]) - - while len(stack): - pair, indent_level = stack.pop() - - if pair.wire_type == pair.WireTypes.group_start: - body = None - elif pair.wire_type == pair.WireTypes.group_end: - body = None - pair._m_field_tag = None - elif pair.wire_type == pair.WireTypes.len_delimited: - body = pair.value.body - elif pair.wire_type == pair.WireTypes.varint: - body = pair.value.value - else: - body = pair.value - - try: - pairs = _parse_proto(body) # type: ignore - stack.extend([(pair, indent_level + 2) for pair in pairs[::-1]]) - write_buf(out, pair.field_tag, None, indent_level) - except Exception: - write_buf(out, pair.field_tag, body, indent_level) - - if stack: - prev_level = stack[-1][1] - else: - prev_level = 0 - - if prev_level < indent_level: - levels = int((indent_level - prev_level) / 2) - for i in range(1, levels + 1): - write_buf(out, None, None, indent_level - i * 2) - - return out.getvalue() - - -class ViewProtobuf(base.View): - """Human friendly view of protocol buffers - The view uses the protoc compiler to decode the binary - """ - - name = "Protocol Buffer" - __content_types = [ - "application/x-protobuf", - "application/x-protobuffer", - ] - - def __call__(self, data, **metadata): - decoded = format_pbuf(data) - if not decoded: - raise ValueError("Failed to parse input.") - - return "Protobuf", base.format_text(decoded) - - def render_priority( - self, data: bytes, *, content_type: str | None = None, **metadata - ) -> float: - return float(bool(data) and content_type in self.__content_types) diff --git a/mitmproxy/contrib/kaitaistruct/google_protobuf.py b/mitmproxy/contrib/kaitaistruct/google_protobuf.py deleted file mode 100644 index 48f5e0ec9b..0000000000 --- a/mitmproxy/contrib/kaitaistruct/google_protobuf.py +++ /dev/null @@ -1,126 +0,0 @@ -# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild - -import kaitaistruct -from kaitaistruct import KaitaiStream, KaitaiStruct -from enum import Enum - - -if getattr(kaitaistruct, 'API_VERSION', (0, 9)) < (0, 9): - raise Exception("Incompatible Kaitai Struct Python API: 0.9 or later is required, but you have %s" % (kaitaistruct.__version__)) - -from . import vlq_base128_le -class GoogleProtobuf(KaitaiStruct): - """Google Protocol Buffers (AKA protobuf) is a popular data - serialization scheme used for communication protocols, data storage, - etc. There are implementations are available for almost every - popular language. The focus points of this scheme are brevity (data - is encoded in a very size-efficient manner) and extensibility (one - can add keys to the structure, while keeping it readable in previous - version of software). - - Protobuf uses semi-self-describing encoding scheme for its - messages. It means that it is possible to parse overall structure of - the message (skipping over fields one can't understand), but to - fully understand the message, one needs a protocol definition file - (`.proto`). To be specific: - - * "Keys" in key-value pairs provided in the message are identified - only with an integer "field tag". `.proto` file provides info on - which symbolic field names these field tags map to. - * "Keys" also provide something called "wire type". It's not a data - type in its common sense (i.e. you can't, for example, distinguish - `sint32` vs `uint32` vs some enum, or `string` from `bytes`), but - it's enough information to determine how many bytes to - parse. Interpretation of the value should be done according to the - type specified in `.proto` file. - * There's no direct information on which fields are optional / - required, which fields may be repeated or constitute a map, what - restrictions are placed on fields usage in a single message, what - are the fields' default values, etc, etc. - - .. seealso:: - Source - https://developers.google.com/protocol-buffers/docs/encoding - """ - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.pairs = [] - i = 0 - while not self._io.is_eof(): - self.pairs.append(GoogleProtobuf.Pair(self._io, self, self._root)) - i += 1 - - - class Pair(KaitaiStruct): - """Key-value pair.""" - - class WireTypes(Enum): - varint = 0 - bit_64 = 1 - len_delimited = 2 - group_start = 3 - group_end = 4 - bit_32 = 5 - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.key = vlq_base128_le.VlqBase128Le(self._io) - _on = self.wire_type - if _on == GoogleProtobuf.Pair.WireTypes.varint: - self.value = vlq_base128_le.VlqBase128Le(self._io) - elif _on == GoogleProtobuf.Pair.WireTypes.len_delimited: - self.value = GoogleProtobuf.DelimitedBytes(self._io, self, self._root) - elif _on == GoogleProtobuf.Pair.WireTypes.bit_64: - self.value = self._io.read_u8le() - elif _on == GoogleProtobuf.Pair.WireTypes.bit_32: - self.value = self._io.read_u4le() - - @property - def wire_type(self): - """"Wire type" is a part of the "key" that carries enough - information to parse value from the wire, i.e. read correct - amount of bytes, but there's not enough informaton to - interprete in unambiguously. For example, one can't clearly - distinguish 64-bit fixed-sized integers from 64-bit floats, - signed zigzag-encoded varints from regular unsigned varints, - arbitrary bytes from UTF-8 encoded strings, etc. - """ - if hasattr(self, '_m_wire_type'): - return self._m_wire_type - - self._m_wire_type = KaitaiStream.resolve_enum(GoogleProtobuf.Pair.WireTypes, (self.key.value & 7)) - return getattr(self, '_m_wire_type', None) - - @property - def field_tag(self): - """Identifies a field of protocol. One can look up symbolic - field name in a `.proto` file by this field tag. - """ - if hasattr(self, '_m_field_tag'): - return self._m_field_tag - - self._m_field_tag = (self.key.value >> 3) - return getattr(self, '_m_field_tag', None) - - - class DelimitedBytes(KaitaiStruct): - def __init__(self, _io, _parent=None, _root=None): - self._io = _io - self._parent = _parent - self._root = _root if _root else self - self._read() - - def _read(self): - self.len = vlq_base128_le.VlqBase128Le(self._io) - self.body = self._io.read_bytes(self.len.value) - - - diff --git a/pyproject.toml b/pyproject.toml index 19b952e3dd..10db5df8bc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,7 +45,6 @@ dependencies = [ "mitmproxy_rs>=0.4,<0.5", "msgpack>=1.0.0, <1.1.0", "passlib>=1.6.5, <1.8", - "protobuf>=3.14,<5", "pydivert>=2.0.3,<2.2; sys_platform == 'win32'", "pyOpenSSL>=22.1,<23.4", "pyparsing>=2.4.2,<3.2", diff --git a/setup.cfg b/setup.cfg index 8cd2a7ab80..1766053bd6 100644 --- a/setup.cfg +++ b/setup.cfg @@ -8,7 +8,6 @@ exclude = mitmproxy/addons/onboarding.py mitmproxy/connections.py mitmproxy/contentviews/base.py - mitmproxy/contentviews/grpc.py mitmproxy/ctx.py mitmproxy/exceptions.py mitmproxy/flow.py diff --git a/test/examples/test_examples.py b/test/examples/test_examples.py index 1cf0bd304c..7140da255a 100644 --- a/test/examples/test_examples.py +++ b/test/examples/test_examples.py @@ -20,66 +20,6 @@ def test_custom_contentviews(self, tdata): _, fmt = swapcase(b"Test!") assert any(b"tEST!" in val[0][1] for val in fmt) - def test_custom_grpc_contentview(self, tdata): - with taddons.context() as tctx: - tctx.script(tdata.path("../examples/addons/contentview-custom-grpc.py")) - v = contentviews.get("customized gRPC/protobuf") - - p = tdata.path("mitmproxy/contentviews/test_grpc_data/msg1.bin") - with open(p, "rb") as f: - raw = f.read() - - sim_msg_req = tutils.treq( - port=443, host="example.com", path="/ReverseGeocode" - ) - - sim_msg_resp = tutils.tresp() - - sim_flow = tflow.tflow(req=sim_msg_req, resp=sim_msg_resp) - - view_text, output = v( - raw, flow=sim_flow, http_message=sim_flow.request - ) # simulate request message - assert view_text == "Protobuf (flattened) (addon with custom rules)" - output = list(output) # assure list conversion if generator - assert output == [ - [ - ("text", "[message] "), - ("text", "position "), - ("text", "1 "), - ("text", " "), - ], - [ - ("text", "[double] "), - ("text", "latitude "), - ("text", "1.1 "), - ("text", "38.89816675798073 "), - ], - [ - ("text", "[double] "), - ("text", "longitude "), - ("text", "1.2 "), - ("text", "-77.03829828366696 "), - ], - [ - ("text", "[string] "), - ("text", "country "), - ("text", "3 "), - ("text", "de_DE "), - ], - [ - ("text", "[uint32] "), - ("text", " "), - ("text", "6 "), - ("text", "1 "), - ], - [ - ("text", "[string] "), - ("text", "app "), - ("text", "7 "), - ("text", "de.mcdonalds.mcdonaldsinfoapp "), - ], - ] def test_modify_form(self, tdata): with taddons.context() as tctx: diff --git a/test/mitmproxy/contentviews/test_grpc.py b/test/mitmproxy/contentviews/test_grpc.py deleted file mode 100644 index 6a88035260..0000000000 --- a/test/mitmproxy/contentviews/test_grpc.py +++ /dev/null @@ -1,800 +0,0 @@ -import struct - -import pytest - -from . import full_eval -from mitmproxy.contentviews import grpc -from mitmproxy.contentviews.grpc import parse_grpc_messages -from mitmproxy.contentviews.grpc import ProtoParser -from mitmproxy.contentviews.grpc import ViewConfig -from mitmproxy.contentviews.grpc import ViewGrpcProtobuf -from mitmproxy.net.encoding import encode -from mitmproxy.test import tflow -from mitmproxy.test import tutils - -datadir = "mitmproxy/contentviews/test_grpc_data/" - - -def helper_pack_grpc_message(data: bytes, compress=False, encoding="gzip") -> bytes: - if compress: - data = encode(data, encoding) - header = struct.pack("!?i", compress, len(data)) - return header + data - - -# fmt: off -custom_parser_rules = [ - ProtoParser.ParserRuleRequest( - name = "Geo coordinate lookup request", - # note on flowfilter: for tflow the port gets appended to the URL's host part - filter = "example\\.com.*/ReverseGeocode", - field_definitions=[ - ProtoParser.ParserFieldDefinition(tag="1", name="position"), - ProtoParser.ParserFieldDefinition(tag="1.1", name="latitude", intended_decoding=ProtoParser.DecodedTypes.double), - ProtoParser.ParserFieldDefinition(tag="1.2", name="longitude", intended_decoding=ProtoParser.DecodedTypes.double), - ProtoParser.ParserFieldDefinition(tag="3", name="country"), - ProtoParser.ParserFieldDefinition(tag="7", name="app"), - ] - ), - ProtoParser.ParserRuleResponse( - name = "Geo coordinate lookup response", - # note on flowfilter: for tflow the port gets appended to the URL's host part - filter = "example\\.com.*/ReverseGeocode", - field_definitions=[ - ProtoParser.ParserFieldDefinition(tag="1.2", name="address"), - ProtoParser.ParserFieldDefinition(tag="1.3", name="address array element"), - ProtoParser.ParserFieldDefinition(tag="1.3.1", name="unknown bytes", intended_decoding=ProtoParser.DecodedTypes.bytes), - ProtoParser.ParserFieldDefinition(tag="1.3.2", name="element value long"), - ProtoParser.ParserFieldDefinition(tag="1.3.3", name="element value short"), - ProtoParser.ParserFieldDefinition(tag="", tag_prefixes=["1.5.1", "1.5.3", "1.5.4", "1.5.5", "1.5.6"], name="position"), - ProtoParser.ParserFieldDefinition(tag=".1", tag_prefixes=["1.5.1", "1.5.3", "1.5.4", "1.5.5", "1.5.6"], name="latitude", intended_decoding=ProtoParser.DecodedTypes.double), # noqa: E501 - ProtoParser.ParserFieldDefinition(tag=".2", tag_prefixes=["1.5.1", "1.5.3", "1.5.4", "1.5.5", "1.5.6"], name="longitude", intended_decoding=ProtoParser.DecodedTypes.double), # noqa: E501 - ProtoParser.ParserFieldDefinition(tag="7", name="app"), - ] - ), -] - -custom_view_config = ViewConfig( - parser_options=ProtoParser.ParserOptions(exclude_message_headers=True, include_wiretype=True) -) - -custom_view_config_parser_rules = ViewConfig( - parser_rules=custom_parser_rules -) - -sim_msg_req = tutils.treq( - port=443, - host="example.com", - path="/ReverseGeocode" -) -sim_msg_req.headers["grpc-encoding"] = "gzip" -sim_msg_resp = tutils.tresp() - -sim_flow = tflow.tflow( - req=sim_msg_req, - resp=sim_msg_resp -) - - -def test_view_protobuf(tdata): - v = full_eval(ViewGrpcProtobuf()) - p = tdata.path(datadir + "msg1.bin") - - with open(p, "rb") as f: - raw = f.read() - view_text, output = v(raw) - assert view_text == "Protobuf (flattened)" - output = list(output) # assure list conversion if generator - assert output == [ - [('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')], - [('text', '[fixed64] '), ('text', ' '), ('text', '1.1 '), ('text', '4630671247600644312 ')], - [('text', '[fixed64] '), ('text', ' '), ('text', '1.2 '), ('text', '13858493542095451628 ')], - [('text', '[string] '), ('text', ' '), ('text', '3 '), ('text', 'de_DE ')], - [('text', '[uint32] '), ('text', ' '), ('text', '6 '), ('text', '1 ')], - [('text', '[string] '), ('text', ' '), ('text', '7 '), ('text', 'de.mcdonalds.mcdonaldsinfoapp ')] - ] - with pytest.raises(ValueError, match='not a valid protobuf message'): - v(b'foobar') - - -def test_view_protobuf_custom_parsing_request(tdata): - v = full_eval(ViewGrpcProtobuf(custom_view_config_parser_rules)) - p = tdata.path(datadir + "msg1.bin") - with open(p, "rb") as f: - raw = f.read() - view_text, output = v(raw, flow=sim_flow, http_message=sim_flow.request) # simulate request message - assert view_text == "Protobuf (flattened)" - output = list(output) # assure list conversion if generator - assert output == [ - [('text', '[message] '), ('text', 'position '), ('text', '1 '), ('text', ' ')], - [('text', '[double] '), ('text', 'latitude '), ('text', '1.1 '), ('text', '38.89816675798073 ')], - [('text', '[double] '), ('text', 'longitude '), ('text', '1.2 '), ('text', '-77.03829828366696 ')], - [('text', '[string] '), ('text', 'country '), ('text', '3 '), ('text', 'de_DE ')], - [('text', '[uint32] '), ('text', ' '), ('text', '6 '), ('text', '1 ')], - [('text', '[string] '), ('text', 'app '), ('text', '7 '), ('text', 'de.mcdonalds.mcdonaldsinfoapp ')] - ] - - -def test_view_protobuf_custom_parsing_response(tdata): - # expect to parse 1.3.2 and 1.3.3 as string automatically - # even if there is a length delimeted field containing `b"DC"`, which would translate to - # two deprecated fields [8: group_start, 8: group_end] (and thus represent a valid nested message, - # but containing deprecated wire types) - custom_view_config_parser_rules.parser_rules[1].field_definitions[3].intended_decoding = None - custom_view_config_parser_rules.parser_rules[1].field_definitions[4].intended_decoding = None - - v = full_eval(ViewGrpcProtobuf(custom_view_config_parser_rules)) - p = tdata.path(datadir + "msg3.bin") - - with open(p, "rb") as f: - raw = f.read() - view_text, output = v(raw, flow=sim_flow, http_message=sim_flow.response) # simulate response message - assert view_text == "Protobuf (flattened)" - output = list(output) # assure list conversion if generator - assert output == [ - [('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')], # noqa: E501 - [('text', '[string] '), ('text', ' '), ('text', '1.1 '), ('text', '\x15 ')], # noqa: E501 - [('text', '[string] '), ('text', 'address '), ('text', '1.2 '), ('text', '1650 Pennsylvania Avenue NW, Washington, DC 20502, USA ')], # noqa: E501 - [('text', '[message] '), ('text', 'address array element '), ('text', '1.3 '), ('text', ' ')], # noqa: E501 - [('text', '[bytes] '), ('text', 'unknown bytes '), ('text', '1.3.1 '), ('text', 'b\'"\' ')], # noqa: E501 - [('text', '[string] '), ('text', 'element value long '), ('text', '1.3.2 '), ('text', '1650 ')], # noqa: E501 - [('text', '[string] '), ('text', 'element value short '), ('text', '1.3.3 '), ('text', '1650 ')], # noqa: E501 - [('text', '[message] '), ('text', 'address array element '), ('text', '1.3 '), ('text', ' ')], # noqa: E501 - [('text', '[bytes] '), ('text', 'unknown bytes '), ('text', '1.3.1 '), ('text', "b'\\x02' ")], # noqa: E501 - [('text', '[string] '), ('text', 'element value long '), ('text', '1.3.2 '), ('text', 'Pennsylvania Avenue Northwest ')], # noqa: E501 - [('text', '[string] '), ('text', 'element value short '), ('text', '1.3.3 '), ('text', 'Pennsylvania Avenue NW ')], # noqa: E501 - [('text', '[message] '), ('text', 'address array element '), ('text', '1.3 '), ('text', ' ')], # noqa: E501 - [('text', '[bytes] '), ('text', 'unknown bytes '), ('text', '1.3.1 '), ('text', "b'\\x14\\x04' ")], # noqa: E501 - [('text', '[string] '), ('text', 'element value long '), ('text', '1.3.2 '), ('text', 'Northwest Washington ')], # noqa: E501 - [('text', '[string] '), ('text', 'element value short '), ('text', '1.3.3 '), ('text', 'Northwest Washington ')], # noqa: E501 - [('text', '[message] '), ('text', 'address array element '), ('text', '1.3 '), ('text', ' ')], # noqa: E501 - [('text', '[bytes] '), ('text', 'unknown bytes '), ('text', '1.3.1 '), ('text', "b'\\x0c\\x04' ")], # noqa: E501 - [('text', '[string] '), ('text', 'element value long '), ('text', '1.3.2 '), ('text', 'Washington ')], # noqa: E501 - [('text', '[string] '), ('text', 'element value short '), ('text', '1.3.3 '), ('text', 'Washington ')], # noqa: E501 - [('text', '[message] '), ('text', 'address array element '), ('text', '1.3 '), ('text', ' ')], # noqa: E501 - [('text', '[bytes] '), ('text', 'unknown bytes '), ('text', '1.3.1 '), ('text', "b'\\x06\\x04' ")], # noqa: E501 - [('text', '[string] '), ('text', 'element value long '), ('text', '1.3.2 '), ('text', 'District of Columbia ')], # noqa: E501 - [('text', '[string] '), ('text', 'element value short '), ('text', '1.3.3 '), ('text', 'DC ')], # noqa: E501 - [('text', '[message] '), ('text', 'address array element '), ('text', '1.3 '), ('text', ' ')], # noqa: E501 - [('text', '[bytes] '), ('text', 'unknown bytes '), ('text', '1.3.1 '), ('text', "b'\\x05\\x04' ")], # noqa: E501 - [('text', '[string] '), ('text', 'element value long '), ('text', '1.3.2 '), ('text', 'USA ')], # noqa: E501 - [('text', '[string] '), ('text', 'element value short '), ('text', '1.3.3 '), ('text', 'US ')], # noqa: E501 - [('text', '[message] '), ('text', 'address array element '), ('text', '1.3 '), ('text', ' ')], # noqa: E501 - [('text', '[bytes] '), ('text', 'unknown bytes '), ('text', '1.3.1 '), ('text', "b'\\x17' ")], # noqa: E501 - [('text', '[string] '), ('text', 'element value long '), ('text', '1.3.2 '), ('text', '20502 ')], # noqa: E501 - [('text', '[string] '), ('text', 'element value short '), ('text', '1.3.3 '), ('text', '20502 ')], # noqa: E501 - [('text', '[message] '), ('text', ' '), ('text', '1.5 '), ('text', ' ')], # noqa: E501 - [('text', '[message] '), ('text', 'position '), ('text', '1.5.1 '), ('text', ' ')], # noqa: E501 - [('text', '[double] '), ('text', 'latitude '), ('text', '1.5.1.1 '), ('text', '38.8970309 ')], # noqa: E501 - [('text', '[double] '), ('text', 'longitude '), ('text', '1.5.1.2 '), ('text', '-77.03872559999999 ')], # noqa: E501 - [('text', '[uint32] '), ('text', ' '), ('text', '1.5.2 '), ('text', '1 ')], # noqa: E501 - [('text', '[message] '), ('text', 'position '), ('text', '1.5.3 '), ('text', ' ')], # noqa: E501 - [('text', '[double] '), ('text', 'latitude '), ('text', '1.5.3.1 '), ('text', '38.8962271697085 ')], # noqa: E501 - [('text', '[double] '), ('text', 'longitude '), ('text', '1.5.3.2 '), ('text', '-77.0400511802915 ')], # noqa: E501 - [('text', '[message] '), ('text', 'position '), ('text', '1.5.4 '), ('text', ' ')], # noqa: E501 - [('text', '[double] '), ('text', 'latitude '), ('text', '1.5.4.1 '), ('text', '38.8989251302915 ')], # noqa: E501 - [('text', '[double] '), ('text', 'longitude '), ('text', '1.5.4.2 '), ('text', '-77.03735321970849 ')], # noqa: E501 - [('text', '[message] '), ('text', 'position '), ('text', '1.5.5 '), ('text', ' ')], # noqa: E501 - [('text', '[double] '), ('text', 'latitude '), ('text', '1.5.5.1 '), ('text', '38.896898 ')], # noqa: E501 - [('text', '[double] '), ('text', 'longitude '), ('text', '1.5.5.2 '), ('text', '-77.03917229999999 ')], # noqa: E501 - [('text', '[message] '), ('text', 'position '), ('text', '1.5.6 '), ('text', ' ')], # noqa: E501 - [('text', '[double] '), ('text', 'latitude '), ('text', '1.5.6.1 '), ('text', '38.8982543 ')], # noqa: E501 - [('text', '[double] '), ('text', 'longitude '), ('text', '1.5.6.2 '), ('text', '-77.0382321 ')], # noqa: E501 - [('text', '[string] '), ('text', ' '), ('text', '1.7 '), ('text', 'ChIJAXiAory3t4kRpkrvas9dYmQ ')], # noqa: E501 - [('text', '[message] '), ('text', ' '), ('text', '2 '), ('text', ' ')], # noqa: E501 - [('text', '[uint32] '), ('text', ' '), ('text', '2.1 '), ('text', '21 ')], # noqa: E501 - ] - - -def test_view_protobuf_custom_parsing_response2(tdata): - # try to parse 1.3.2 and 1.3.3 as string - custom_view_config_parser_rules.parser_rules[1].field_definitions[3].intended_decoding = ProtoParser.DecodedTypes.string # 1.3.2 - custom_view_config_parser_rules.parser_rules[1].field_definitions[4].intended_decoding = ProtoParser.DecodedTypes.string # 1.3.3 - - v = full_eval(ViewGrpcProtobuf(custom_view_config_parser_rules)) - p = tdata.path(datadir + "msg3.bin") - - with open(p, "rb") as f: - raw = f.read() - view_text, output = v(raw, flow=sim_flow, http_message=sim_flow.response) # simulate response message - assert view_text == "Protobuf (flattened)" - output = list(output) # assure list conversion if generator - assert output == [ - [('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')], # noqa: E501 - [('text', '[string] '), ('text', ' '), ('text', '1.1 '), ('text', '\x15 ')], # noqa: E501 - [('text', '[string] '), ('text', 'address '), ('text', '1.2 '), ('text', '1650 Pennsylvania Avenue NW, Washington, DC 20502, USA ')], # noqa: E501 - [('text', '[message] '), ('text', 'address array element '), ('text', '1.3 '), ('text', ' ')], # noqa: E501 - [('text', '[bytes] '), ('text', 'unknown bytes '), ('text', '1.3.1 '), ('text', 'b\'"\' ')], # noqa: E501 - [('text', '[string] '), ('text', 'element value long '), ('text', '1.3.2 '), ('text', '1650 ')], # noqa: E501 - [('text', '[string] '), ('text', 'element value short '), ('text', '1.3.3 '), ('text', '1650 ')], # noqa: E501 - [('text', '[message] '), ('text', 'address array element '), ('text', '1.3 '), ('text', ' ')], # noqa: E501 - [('text', '[bytes] '), ('text', 'unknown bytes '), ('text', '1.3.1 '), ('text', "b'\\x02' ")], # noqa: E501 - [('text', '[string] '), ('text', 'element value long '), ('text', '1.3.2 '), ('text', 'Pennsylvania Avenue Northwest ')], # noqa: E501 - [('text', '[string] '), ('text', 'element value short '), ('text', '1.3.3 '), ('text', 'Pennsylvania Avenue NW ')], # noqa: E501 - [('text', '[message] '), ('text', 'address array element '), ('text', '1.3 '), ('text', ' ')], # noqa: E501 - [('text', '[bytes] '), ('text', 'unknown bytes '), ('text', '1.3.1 '), ('text', "b'\\x14\\x04' ")], # noqa: E501 - [('text', '[string] '), ('text', 'element value long '), ('text', '1.3.2 '), ('text', 'Northwest Washington ')], # noqa: E501 - [('text', '[string] '), ('text', 'element value short '), ('text', '1.3.3 '), ('text', 'Northwest Washington ')], # noqa: E501 - [('text', '[message] '), ('text', 'address array element '), ('text', '1.3 '), ('text', ' ')], # noqa: E501 - [('text', '[bytes] '), ('text', 'unknown bytes '), ('text', '1.3.1 '), ('text', "b'\\x0c\\x04' ")], # noqa: E501 - [('text', '[string] '), ('text', 'element value long '), ('text', '1.3.2 '), ('text', 'Washington ')], # noqa: E501 - [('text', '[string] '), ('text', 'element value short '), ('text', '1.3.3 '), ('text', 'Washington ')], # noqa: E501 - [('text', '[message] '), ('text', 'address array element '), ('text', '1.3 '), ('text', ' ')], # noqa: E501 - [('text', '[bytes] '), ('text', 'unknown bytes '), ('text', '1.3.1 '), ('text', "b'\\x06\\x04' ")], # noqa: E501 - [('text', '[string] '), ('text', 'element value long '), ('text', '1.3.2 '), ('text', 'District of Columbia ')], # noqa: E501 - [('text', '[string] '), ('text', 'element value short '), ('text', '1.3.3 '), ('text', 'DC ')], # noqa: E501 - [('text', '[message] '), ('text', 'address array element '), ('text', '1.3 '), ('text', ' ')], # noqa: E501 - [('text', '[bytes] '), ('text', 'unknown bytes '), ('text', '1.3.1 '), ('text', "b'\\x05\\x04' ")], # noqa: E501 - [('text', '[string] '), ('text', 'element value long '), ('text', '1.3.2 '), ('text', 'USA ')], # noqa: E501 - [('text', '[string] '), ('text', 'element value short '), ('text', '1.3.3 '), ('text', 'US ')], # noqa: E501 - [('text', '[message] '), ('text', 'address array element '), ('text', '1.3 '), ('text', ' ')], # noqa: E501 - [('text', '[bytes] '), ('text', 'unknown bytes '), ('text', '1.3.1 '), ('text', "b'\\x17' ")], # noqa: E501 - [('text', '[string] '), ('text', 'element value long '), ('text', '1.3.2 '), ('text', '20502 ')], # noqa: E501 - [('text', '[string] '), ('text', 'element value short '), ('text', '1.3.3 '), ('text', '20502 ')], # noqa: E501 - [('text', '[message] '), ('text', ' '), ('text', '1.5 '), ('text', ' ')], # noqa: E501 - [('text', '[message] '), ('text', 'position '), ('text', '1.5.1 '), ('text', ' ')], # noqa: E501 - [('text', '[double] '), ('text', 'latitude '), ('text', '1.5.1.1 '), ('text', '38.8970309 ')], # noqa: E501 - [('text', '[double] '), ('text', 'longitude '), ('text', '1.5.1.2 '), ('text', '-77.03872559999999 ')], # noqa: E501 - [('text', '[uint32] '), ('text', ' '), ('text', '1.5.2 '), ('text', '1 ')], # noqa: E501 - [('text', '[message] '), ('text', 'position '), ('text', '1.5.3 '), ('text', ' ')], # noqa: E501 - [('text', '[double] '), ('text', 'latitude '), ('text', '1.5.3.1 '), ('text', '38.8962271697085 ')], # noqa: E501 - [('text', '[double] '), ('text', 'longitude '), ('text', '1.5.3.2 '), ('text', '-77.0400511802915 ')], # noqa: E501 - [('text', '[message] '), ('text', 'position '), ('text', '1.5.4 '), ('text', ' ')], # noqa: E501 - [('text', '[double] '), ('text', 'latitude '), ('text', '1.5.4.1 '), ('text', '38.8989251302915 ')], # noqa: E501 - [('text', '[double] '), ('text', 'longitude '), ('text', '1.5.4.2 '), ('text', '-77.03735321970849 ')], # noqa: E501 - [('text', '[message] '), ('text', 'position '), ('text', '1.5.5 '), ('text', ' ')], # noqa: E501 - [('text', '[double] '), ('text', 'latitude '), ('text', '1.5.5.1 '), ('text', '38.896898 ')], # noqa: E501 - [('text', '[double] '), ('text', 'longitude '), ('text', '1.5.5.2 '), ('text', '-77.03917229999999 ')], # noqa: E501 - [('text', '[message] '), ('text', 'position '), ('text', '1.5.6 '), ('text', ' ')], # noqa: E501 - [('text', '[double] '), ('text', 'latitude '), ('text', '1.5.6.1 '), ('text', '38.8982543 ')], # noqa: E501 - [('text', '[double] '), ('text', 'longitude '), ('text', '1.5.6.2 '), ('text', '-77.0382321 ')], # noqa: E501 - [('text', '[string] '), ('text', ' '), ('text', '1.7 '), ('text', 'ChIJAXiAory3t4kRpkrvas9dYmQ ')], # noqa: E501 - [('text', '[message] '), ('text', ' '), ('text', '2 '), ('text', ' ')], # noqa: E501 - [('text', '[uint32] '), ('text', ' '), ('text', '2.1 '), ('text', '21 ')], # noqa: E501 - ] - - -def test_view_protobuf_custom_config(tdata): - v = full_eval(ViewGrpcProtobuf(custom_view_config)) - p = tdata.path(datadir + "msg1.bin") - - with open(p, "rb") as f: - raw = f.read() - view_text, output = v(raw) - assert view_text == "Protobuf (flattened)" - output = list(output) # assure list conversion if generator - assert output == [ - [('text', '[bit_64->fixed64] '), ('text', ' '), ('text', '1.1 '), ('text', '4630671247600644312 ')], - [('text', '[bit_64->fixed64] '), ('text', ' '), ('text', '1.2 '), ('text', '13858493542095451628 ')], - [('text', '[len_delimited->string] '), ('text', ' '), ('text', '3 '), ('text', 'de_DE ')], - [('text', '[varint->uint32] '), ('text', ' '), ('text', '6 '), ('text', '1 ')], - [('text', '[len_delimited->string] '), ('text', ' '), ('text', '7 '), ('text', 'de.mcdonalds.mcdonaldsinfoapp ')] - ] - - -def test_view_grpc(tdata): - v = full_eval(ViewGrpcProtobuf()) - p = tdata.path(datadir + "msg1.bin") - - with open(p, "rb") as f: - raw = f.read() - # pack into protobuf message - raw = helper_pack_grpc_message(raw) - - view_text, output = v(raw, content_type="application/grpc", http_message=sim_msg_req) - assert view_text == "gRPC" - output = list(output) # assure list conversion if generator - - assert output == [ - [('text', 'gRPC message 0 (compressed False)')], - [('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')], - [('text', '[fixed64] '), ('text', ' '), ('text', '1.1 '), ('text', '4630671247600644312 ')], - [('text', '[fixed64] '), ('text', ' '), ('text', '1.2 '), ('text', '13858493542095451628 ')], - [('text', '[string] '), ('text', ' '), ('text', '3 '), ('text', 'de_DE ')], - [('text', '[uint32] '), ('text', ' '), ('text', '6 '), ('text', '1 ')], - [('text', '[string] '), ('text', ' '), ('text', '7 '), ('text', 'de.mcdonalds.mcdonaldsinfoapp ')] - ] - with pytest.raises(ValueError, match='invalid gRPC message'): - v(b'foobar', content_type="application/grpc") - with pytest.raises(ValueError, match='Failed to decompress gRPC message with gzip'): - list(parse_grpc_messages(data=b'\x01\x00\x00\x00\x01foobar', compression_scheme="gzip")) - - -def test_view_grpc_compressed(tdata): - v = full_eval(grpc.ViewGrpcProtobuf()) - p = tdata.path(datadir + "msg1.bin") - - with open(p, "rb") as f: - raw = f.read() - # pack into protobuf message - raw = helper_pack_grpc_message(raw, True, "gzip") - - view_text, output = v(raw, content_type="application/grpc") - assert view_text == "gRPC" - output = list(output) # assure list conversion if generator - - assert output == [ - [('text', 'gRPC message 0 (compressed gzip)')], - [('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')], - [('text', '[fixed64] '), ('text', ' '), ('text', '1.1 '), ('text', '4630671247600644312 ')], - [('text', '[fixed64] '), ('text', ' '), ('text', '1.2 '), ('text', '13858493542095451628 ')], - [('text', '[string] '), ('text', ' '), ('text', '3 '), ('text', 'de_DE ')], - [('text', '[uint32] '), ('text', ' '), ('text', '6 '), ('text', '1 ')], - [('text', '[string] '), ('text', ' '), ('text', '7 '), ('text', 'de.mcdonalds.mcdonaldsinfoapp ')] - ] - - -def helper_encode_base128le(val: int): - # hacky base128le encoding - if val <= 0: - return b'\x00' - res = [] - while val > 0: - part = val & 0b1111111 - val = val >> 7 - if val > 0: - res.append(part + 0x80) - else: - res.append(part) - return bytes(res) - - -def helper_gen_varint_msg_field(f_idx: int, f_val: int): - # manual encoding of protobuf data - f_wt = 0 # field type 0 (varint) - tag = (f_idx << 3) | f_wt # combined tag - msg = helper_encode_base128le(tag) # add encoded tag to message - msg = msg + helper_encode_base128le(f_val) # add varint encoded field value - return msg - - -def helper_gen_bits32_msg_field(f_idx: int, f_val: int): - # manual encoding of protobuf data - f_wt = 5 # field type 5 (bits32) - tag = (f_idx << 3) | f_wt # combined tag - msg = helper_encode_base128le(tag) # add encoded tag to message - msg = msg + struct.pack(" 32bit - msg += helper_gen_varint_msg_field(3, 1 << 64) # varint > 64bit (returned as 0x0 by Kaitai protobuf decoder) - msg += helper_gen_bits32_msg_field(4, 0xbf8ccccd) # bits32 - msg += helper_gen_bits64_msg_field(5, 0xbff199999999999a) # bits64 - msg += helper_gen_varint_msg_field(6, 0xffffffff) # 32 bit varint negative - msg += helper_gen_lendel_msg_field(7, b"hello world") # length delimted message, UTF-8 parsable - msg += helper_gen_varint_msg_field(8, 1 << 128) # oversized varint - - parser = ProtoParser( - data=msg, - parser_options=ProtoParser.ParserOptions(), - rules=[] - ) - - fields = parser.root_fields - assert fields[0].wire_value == 1 - assert fields[1].wire_value == 1 << 32 - as_bool = fields[1].decode_as(ProtoParser.DecodedTypes.bool) - assert isinstance(as_bool, bool) - assert as_bool - as_bool = fields[2].decode_as(ProtoParser.DecodedTypes.bool) - assert isinstance(as_bool, bool) - assert not as_bool - assert fields[1].decode_as(ProtoParser.DecodedTypes.float) == 2.121995791e-314 - assert fields[1].safe_decode_as(ProtoParser.DecodedTypes.uint32) == (ProtoParser.DecodedTypes.uint64, 1 << 32) - assert fields[0].safe_decode_as(ProtoParser.DecodedTypes.sfixed32) == (ProtoParser.DecodedTypes.uint32, 1) - assert fields[3].wire_type == ProtoParser.WireTypes.bit_32 - assert fields[4].wire_type == ProtoParser.WireTypes.bit_64 - # signed 32 bit int (standard encoding) - assert fields[5].safe_decode_as(ProtoParser.DecodedTypes.int32) == (ProtoParser.DecodedTypes.int32, -1) - # fixed (signed) 32bit int (ZigZag encoding) - assert fields[5].safe_decode_as(ProtoParser.DecodedTypes.sint32) == (ProtoParser.DecodedTypes.sint32, -2147483648) - # sint64 - assert fields[1].safe_decode_as(ProtoParser.DecodedTypes.sint64) == (ProtoParser.DecodedTypes.sint64, 2147483648) - # int64 - assert fields[1].safe_decode_as(ProtoParser.DecodedTypes.int64) == (ProtoParser.DecodedTypes.int64, 4294967296) - - # varint 64bit to enum - assert fields[1].safe_decode_as(ProtoParser.DecodedTypes.enum) == (ProtoParser.DecodedTypes.enum, 4294967296) - - # bits64 to sfixed64 - assert fields[4].safe_decode_as(ProtoParser.DecodedTypes.sfixed64) == (ProtoParser.DecodedTypes.sfixed64, -4615739258092021350) - # bits64 to fixed64 - assert fields[4].safe_decode_as(ProtoParser.DecodedTypes.fixed64) == (ProtoParser.DecodedTypes.fixed64, 0xbff199999999999a) - # bits64 to double - assert fields[4].safe_decode_as(ProtoParser.DecodedTypes.double) == (ProtoParser.DecodedTypes.double, -1.1) - # bits64 to float --> failover fixed64 (64bit to large for double) - assert fields[4].safe_decode_as(ProtoParser.DecodedTypes.float) == (ProtoParser.DecodedTypes.fixed64, 0xbff199999999999a) - - # bits32 to sfixed32 - assert fields[3].safe_decode_as(ProtoParser.DecodedTypes.sfixed32) == (ProtoParser.DecodedTypes.sfixed32, -1081291571) - # bits32 to fixed32 - assert fields[3].safe_decode_as(ProtoParser.DecodedTypes.fixed32) == (ProtoParser.DecodedTypes.fixed32, 0xbf8ccccd) - # bits32 to float - assert fields[3].safe_decode_as(ProtoParser.DecodedTypes.float) == (ProtoParser.DecodedTypes.float, -1.100000023841858) - # bits32 to string --> failover fixed32 - assert fields[3].safe_decode_as(ProtoParser.DecodedTypes.string) == (ProtoParser.DecodedTypes.fixed32, 0xbf8ccccd) - - # length delimeted to string - assert fields[6].safe_decode_as(ProtoParser.DecodedTypes.string) == (ProtoParser.DecodedTypes.string, "hello world") - # length delimeted to bytes - assert fields[6].safe_decode_as(ProtoParser.DecodedTypes.bytes) == (ProtoParser.DecodedTypes.bytes, b"hello world") - - assert fields[0].wire_value_as_utf8() == "1" - - with pytest.raises(TypeError, match="intended decoding mismatches wire type"): - fields[0].decode_as(ProtoParser.DecodedTypes.sfixed32) - with pytest.raises(TypeError, match="wire value too large for int32"): - fields[1].decode_as(ProtoParser.DecodedTypes.int32) - with pytest.raises(TypeError, match="wire value too large for sint32"): - fields[1].decode_as(ProtoParser.DecodedTypes.sint32) - with pytest.raises(TypeError, match="wire value too large for uint32"): - fields[1].decode_as(ProtoParser.DecodedTypes.uint32) - with pytest.raises(TypeError, match="can not be converted to floatingpoint representation"): - fields[6]._wire_value_as_float() - with pytest.raises(TypeError, match="wire value too large for int64"): - fields[7].decode_as(ProtoParser.DecodedTypes.int64) - with pytest.raises(TypeError, match="wire value too large"): - fields[7].decode_as(ProtoParser.DecodedTypes.uint64) - with pytest.raises(TypeError, match="wire value too large for sint64"): - fields[7].decode_as(ProtoParser.DecodedTypes.sint64) - with pytest.raises(ValueError, match="varint exceeds bounds of provided data"): - ProtoParser.read_fields( - wire_data=helper_encode_base128le(1 << 128), - options=ProtoParser.ParserOptions(), - parent_field=None, - rules=[] - ) - with pytest.raises(ValueError, match="value exceeds 64bit, violating protobuf specs"): - fields = ProtoParser.read_fields( - wire_data=helper_gen_varint_msg_field(1, 1 << 128), - options=ProtoParser.ParserOptions(), - parent_field=None, - rules=[] - ) - fields[0]._value_as_bytes() - with pytest.raises(ValueError, match=".* is not a valid .*WireTypes"): - ProtoParser.read_fields( - wire_data=helper_encode_base128le(0x7), # invalid wiretype 0x7 - options=ProtoParser.ParserOptions(), - parent_field=None, - rules=[] - ) - - -def test_view_protobuf_custom_config_packed(tdata): - # message with repeated field fixed64 - msg_inner1 = helper_gen_bits64_msg_field(2, 12) - msg_inner1 += helper_gen_bits64_msg_field(2, 23) - msg_inner1 += helper_gen_bits64_msg_field(2, 456789012345678) - msg1 = helper_gen_lendel_msg_field(1, msg_inner1) - - v = full_eval(ViewGrpcProtobuf()) - view_text, output = v(msg1) - assert view_text == "Protobuf (flattened)" - output = list(output) # assure list conversion if generator - assert output == [ - [('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')], - [('text', '[fixed64] '), ('text', ' '), ('text', '1.2 '), ('text', '12 ')], - [('text', '[fixed64] '), ('text', ' '), ('text', '1.2 '), ('text', '23 ')], - [('text', '[fixed64] '), ('text', ' '), ('text', '1.2 '), ('text', '456789012345678 ')] - ] - - # same message as above, but fixed64 values are packed - # Note: the decoded has no type indication, as packed values are always contained in - # a length delimited field. The packed fields contain no individual type header - - # decoder has no knowledge of packed repeated field - msg_inner2 = helper_gen_bits64_msg_field_packed(2, [12, 23, 456789012345678]) - msg2 = helper_gen_lendel_msg_field(1, msg_inner2) - view_text, output = v(msg2) - assert view_text == "Protobuf (flattened)" - output = list(output) # assure list conversion if generator - assert output == [ - [('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')], # noqa: E501 - [('text', '[bytes] '), ('text', ' '), ('text', '1.2 '), ('text', "b'\\x0c\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x17\\x00\\x00\\x00\\x00\\x00\\x00\\x00Ns\\xd1zr\\x9f\\x01\\x00' ")] # noqa: E501 - ] - - # decoder uses custom definition to decode as 1.2 as "packed, repeated fixed64" - view_config = ViewConfig( - parser_options=ProtoParser.ParserOptions(), - parser_rules=[ - ProtoParser.ParserRule( - filter=".*", - name="parse packed field", - field_definitions=[ - ProtoParser.ParserFieldDefinition( - name="packed repeated fixed64", - tag="1.2", - intended_decoding=ProtoParser.DecodedTypes.fixed64, - as_packed=True - ) - ] - ) - ] - ) - v = full_eval(ViewGrpcProtobuf(view_config)) - msg_inner2 = helper_gen_bits64_msg_field_packed(2, [12, 23, 456789012345678]) - msg2 = helper_gen_lendel_msg_field(1, msg_inner2) - # provide the view a flow and response message dummies, to allow custom rules to work - view_text, output = v(msg2, flow=sim_flow, http_message=sim_flow.response) - assert view_text == "Protobuf (flattened)" - output = list(output) # assure list conversion if generator - assert output == [ - [('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')], - [('text', '[fixed64] '), ('text', 'packed repeated fixed64 '), ('text', '1.2 '), ('text', '12 ')], - [('text', '[fixed64] '), ('text', 'packed repeated fixed64 '), ('text', '1.2 '), ('text', '23 ')], - [('text', '[fixed64] '), ('text', 'packed repeated fixed64 '), ('text', '1.2 '), ('text', '456789012345678 ')] - ] - - # message with packed repeated messages in field 1.5 - # Note: protobuf v3 only allows packed encoding for scalar field types, but packed messages - # were spotted in traffic to google gRPC endpoints (f.e. https://play.googleapis.com/log/batch) - p_msg1 = helper_gen_lendel_msg_field(1, b"inner message 1") - p_msg1 += helper_gen_varint_msg_field(2, 1) - p_msg2 = helper_gen_lendel_msg_field(1, b"inner message 2") - p_msg2 += helper_gen_varint_msg_field(2, 2) - p_msg3 = helper_gen_lendel_msg_field(1, b"inner message 3") - p_msg3 += helper_gen_varint_msg_field(2, 3) - msg_inner3 = helper_gen_lendel_msg_field_packed(5, [p_msg1, p_msg2, p_msg3]) - msg3 = helper_gen_lendel_msg_field(1, msg_inner3) - view_config = ViewConfig( - parser_options=ProtoParser.ParserOptions(), - parser_rules=[ - ProtoParser.ParserRule( - filter=".*", - name="parse packed field", - field_definitions=[ - ProtoParser.ParserFieldDefinition( - name="packed repeated message", - tag="1.5", - intended_decoding=ProtoParser.DecodedTypes.message, - as_packed=True - ) - ] - ) - ] - ) - v = full_eval(ViewGrpcProtobuf(view_config)) - # provide the view a flow and response message dummies, to allow custom rules to work - view_text, output = v(msg3, flow=sim_flow, http_message=sim_flow.response) - assert view_text == "Protobuf (flattened)" - output = list(output) # assure list conversion if generator - assert output == [ - [('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')], - [('text', '[message] '), ('text', 'packed repeated message '), ('text', '1.5 '), ('text', ' ')], - [('text', '[string] '), ('text', ' '), ('text', '1.5.1 '), ('text', 'inner message 1 ')], - [('text', '[uint32] '), ('text', ' '), ('text', '1.5.2 '), ('text', '1 ')], - [('text', '[message] '), ('text', 'packed repeated message '), ('text', '1.5 '), ('text', ' ')], - [('text', '[string] '), ('text', ' '), ('text', '1.5.1 '), ('text', 'inner message 2 ')], - [('text', '[uint32] '), ('text', ' '), ('text', '1.5.2 '), ('text', '2 ')], - [('text', '[message] '), ('text', 'packed repeated message '), ('text', '1.5 '), ('text', ' ')], - [('text', '[string] '), ('text', ' '), ('text', '1.5.1 '), ('text', 'inner message 3 ')], - [('text', '[uint32] '), ('text', ' '), ('text', '1.5.2 '), ('text', '3 ')] - ] - - # message with repeated messages in field 1.5 (not packed), has to be detected by failover parsing - msg_inner4 = helper_gen_lendel_msg_field(5, p_msg1) - msg_inner4 += helper_gen_lendel_msg_field(5, p_msg2) - msg_inner4 += helper_gen_lendel_msg_field(5, p_msg3) - msg4 = helper_gen_lendel_msg_field(1, msg_inner4) - view_config = ViewConfig( - parser_options=ProtoParser.ParserOptions(), - parser_rules=[ - ProtoParser.ParserRule( - filter=".*", - name="parse packed field", - field_definitions=[ - ProtoParser.ParserFieldDefinition( - name="packed repeated message", - tag="1.5", - intended_decoding=ProtoParser.DecodedTypes.message, - as_packed=True - ) - ] - ) - ] - ) - v = full_eval(ViewGrpcProtobuf(view_config)) - # provide the view a flow and response message dummies, to allow custom rules to work - view_text, output = v(msg4, flow=sim_flow, http_message=sim_flow.response) - assert view_text == "Protobuf (flattened)" - output = list(output) # assure list conversion if generator - assert output == [ - [('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')], - [('text', '[message] '), ('text', 'packed repeated message '), ('text', '1.5 '), ('text', ' ')], - [('text', '[string] '), ('text', ' '), ('text', '1.5.1 '), ('text', 'inner message 1 ')], - [('text', '[uint32] '), ('text', ' '), ('text', '1.5.2 '), ('text', '1 ')], - [('text', '[message] '), ('text', 'packed repeated message '), ('text', '1.5 '), ('text', ' ')], - [('text', '[string] '), ('text', ' '), ('text', '1.5.1 '), ('text', 'inner message 2 ')], - [('text', '[uint32] '), ('text', ' '), ('text', '1.5.2 '), ('text', '2 ')], - [('text', '[message] '), ('text', 'packed repeated message '), ('text', '1.5 '), ('text', ' ')], - [('text', '[string] '), ('text', ' '), ('text', '1.5.1 '), ('text', 'inner message 3 ')], - [('text', '[uint32] '), ('text', ' '), ('text', '1.5.2 '), ('text', '3 ')] - ] - - # packed bit32 - msg_inner = helper_gen_bits32_msg_field_packed(2, [12, 23, 4567890]) - msg = helper_gen_lendel_msg_field(1, msg_inner) - view_config = ViewConfig( - parser_options=ProtoParser.ParserOptions(), - parser_rules=[ - ProtoParser.ParserRule( - filter=".*", - name="parse packed field", - field_definitions=[ - ProtoParser.ParserFieldDefinition( - name="packed repeated fixed32", - tag="1.2", - intended_decoding=ProtoParser.DecodedTypes.fixed32, - as_packed=True - ) - ] - ) - ] - ) - v = full_eval(ViewGrpcProtobuf(view_config)) - # provide the view a flow and response message dummies, to allow custom rules to work - view_text, output = v(msg, flow=sim_flow, http_message=sim_flow.response) - assert view_text == "Protobuf (flattened)" - output = list(output) # assure list conversion if generator - assert output == [ - [('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')], - [('text', '[fixed32] '), ('text', 'packed repeated fixed32 '), ('text', '1.2 '), ('text', '12 ')], - [('text', '[fixed32] '), ('text', 'packed repeated fixed32 '), ('text', '1.2 '), ('text', '23 ')], - [('text', '[fixed32] '), ('text', 'packed repeated fixed32 '), ('text', '1.2 '), ('text', '4567890 ')] - ] - - # packed bit32, invalid - msg_inner = helper_gen_bits32_msg_field_packed(2, [12, 23, 4567890]) + b"\x01" # data not divisible by 4 - msg = helper_gen_lendel_msg_field(1, msg_inner) - view_config = ViewConfig( - parser_options=ProtoParser.ParserOptions(), - parser_rules=[ - ProtoParser.ParserRule( - filter=".*", - name="parse packed field", - field_definitions=[ - ProtoParser.ParserFieldDefinition( - name="packed repeated fixed32", - tag="1.2", - intended_decoding=ProtoParser.DecodedTypes.fixed32, - as_packed=True - ) - ] - ) - ] - ) - v = full_eval(ViewGrpcProtobuf(view_config)) - # provide the view a flow and response message dummies, to allow custom rules to work - view_text, output = v(msg, flow=sim_flow, http_message=sim_flow.response) - assert view_text == "Protobuf (flattened)" - output = list(output) # assure list conversion if generator - assert output == [ - [('text', '[bytes] '), ('text', ' '), ('text', '1 '), ('text', "b'\\x12\\x0c\\x0c\\x00\\x00\\x00\\x17\\x00\\x00\\x00R\\xb3E\\x00\\x01' ")] # noqa: E501 - ] - - # packed bit64, invalid - msg_inner = helper_gen_bits64_msg_field_packed(2, [12, 23, 4567890]) + b"\x01" # data not divisible by 8 - msg = helper_gen_lendel_msg_field(1, msg_inner) - view_config = ViewConfig( - parser_options=ProtoParser.ParserOptions(), - parser_rules=[ - ProtoParser.ParserRule( - filter=".*", - name="parse packed field", - field_definitions=[ - ProtoParser.ParserFieldDefinition( - name="packed repeated fixed64", - tag="1.2", - intended_decoding=ProtoParser.DecodedTypes.fixed64, - as_packed=True - ) - ] - ) - ] - ) - v = full_eval(ViewGrpcProtobuf(view_config)) - # provide the view a flow and response message dummies, to allow custom rules to work - view_text, output = v(msg, flow=sim_flow, http_message=sim_flow.response) - assert view_text == "Protobuf (flattened)" - output = list(output) # assure list conversion if generator - assert output == [ - [('text', '[bytes] '), ('text', ' '), ('text', '1 '), ('text', "b'\\x12\\x18\\x0c\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x17\\x00\\x00\\x00\\x00\\x00\\x00\\x00R\\xb3E\\x00\\x00\\x00\\x00\\x00\\x01'")] # noqa: E501 - ] - - # packed varint - msg_inner = helper_gen_varint_msg_field_packed(2, [12, 23, 4567890]) - msg = helper_gen_lendel_msg_field(1, msg_inner) - view_config = ViewConfig( - parser_options=ProtoParser.ParserOptions(), - parser_rules=[ - ProtoParser.ParserRule( - filter=".*", - name="parse packed field", - field_definitions=[ - ProtoParser.ParserFieldDefinition( - name="packed repeated varint", - tag="1.2", - intended_decoding=ProtoParser.DecodedTypes.uint32, - as_packed=True - ) - ] - ) - ] - ) - v = full_eval(ViewGrpcProtobuf(view_config)) - # provide the view a flow and response message dummies, to allow custom rules to work - view_text, output = v(msg, flow=sim_flow, http_message=sim_flow.response) - assert view_text == "Protobuf (flattened)" - output = list(output) # assure list conversion if generator - assert output == [ - [('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')], - [('text', '[uint32] '), ('text', 'packed repeated varint '), ('text', '1.2 '), ('text', '12 ')], - [('text', '[uint32] '), ('text', 'packed repeated varint '), ('text', '1.2 '), ('text', '23 ')], - [('text', '[uint32] '), ('text', 'packed repeated varint '), ('text', '1.2 '), ('text', '4567890 ')] - ] - - -def test_render_priority(): - v = grpc.ViewGrpcProtobuf() - assert v.render_priority(b"data", content_type="application/x-protobuf") - assert v.render_priority(b"data", content_type="application/x-protobuffer") - assert v.render_priority(b"data", content_type="application/grpc-proto") - assert v.render_priority(b"data", content_type="application/grpc") - assert v.render_priority(b"data", content_type="application/prpc") - assert not v.render_priority(b"data", content_type="text/plain") diff --git a/test/mitmproxy/contentviews/test_grpc_data/msg1.bin b/test/mitmproxy/contentviews/test_grpc_data/msg1.bin deleted file mode 100644 index 6a44fe1169..0000000000 --- a/test/mitmproxy/contentviews/test_grpc_data/msg1.bin +++ /dev/null @@ -1,2 +0,0 @@ - - rC@zsBSde_DE0:de.mcdonalds.mcdonaldsinfoapp \ No newline at end of file diff --git a/test/mitmproxy/contentviews/test_grpc_data/msg2.bin b/test/mitmproxy/contentviews/test_grpc_data/msg2.bin deleted file mode 100644 index 56678407ff981a7bc78045ee26e7e4b49d2aecdb..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 207 zcmd;*a5BvasIV|K3AAu=DKkmRPj^akF?O`I)!4yd(7g zNJ+IWv^CUGNX-M%26~n&7#R*R3Mc31>Luo-6y;~8=#{1BrDW!%pJEhFGt%=YH7oQf mH_J>;3$sYd@`}2IRrChqRkjC=e{5bb&VT#6i>-laJ|h5`d_21V diff --git a/test/mitmproxy/contentviews/test_grpc_data/msg3.bin b/test/mitmproxy/contentviews/test_grpc_data/msg3.bin deleted file mode 100644 index 9a9c37c18a..0000000000 --- a/test/mitmproxy/contentviews/test_grpc_data/msg3.bin +++ /dev/null @@ -1,13 +0,0 @@ - - -61650 Pennsylvania Avenue NW, Washington, DC 20502, USA -"16501650: -Pennsylvania Avenue NorthwestPennsylvania Avenue NW0 -Northwest WashingtonNorthwest Washington -  -Washington -Washington -District of ColumbiaDC -USAUS -2050220502*f - /rC@&azzBS grC@*a2BS" sC@CcBS* 켍rC@.ڈ́BS2 4rC@ erBS:ChIJAXiAory3t4kRpkrvas9dYmQ \ No newline at end of file diff --git a/test/mitmproxy/contentviews/test_protobuf.py b/test/mitmproxy/contentviews/test_protobuf.py deleted file mode 100644 index 99d6768ede..0000000000 --- a/test/mitmproxy/contentviews/test_protobuf.py +++ /dev/null @@ -1,37 +0,0 @@ -import pytest - -from . import full_eval -from mitmproxy.contentviews import protobuf - -datadir = "mitmproxy/contentviews/test_protobuf_data/" - - -def test_view_protobuf_request(tdata): - v = full_eval(protobuf.ViewProtobuf()) - p = tdata.path(datadir + "protobuf01.bin") - - with open(p, "rb") as f: - raw = f.read() - content_type, output = v(raw) - assert content_type == "Protobuf" - assert output == [[("text", "1: 3bbc333c-e61c-433b-819a-0b9a8cc103b8")]] - with pytest.raises(ValueError, match="Failed to parse input."): - v(b"foobar") - - -@pytest.mark.parametrize("filename", ["protobuf02.bin", "protobuf03.bin"]) -def test_format_pbuf(filename, tdata): - path = tdata.path(datadir + filename) - with open(path, "rb") as f: - input = f.read() - with open(path.replace(".bin", "-decoded.bin")) as f: - expected = f.read() - - assert protobuf.format_pbuf(input) == expected - - -def test_render_priority(): - v = protobuf.ViewProtobuf() - assert v.render_priority(b"data", content_type="application/x-protobuf") - assert v.render_priority(b"data", content_type="application/x-protobuffer") - assert not v.render_priority(b"data", content_type="text/plain") diff --git a/test/mitmproxy/contentviews/test_protobuf_data/protobuf01.bin b/test/mitmproxy/contentviews/test_protobuf_data/protobuf01.bin deleted file mode 100644 index fbfdbff33c..0000000000 --- a/test/mitmproxy/contentviews/test_protobuf_data/protobuf01.bin +++ /dev/null @@ -1,2 +0,0 @@ - -$3bbc333c-e61c-433b-819a-0b9a8cc103b8 \ No newline at end of file diff --git a/test/mitmproxy/contentviews/test_protobuf_data/protobuf02-decoded.bin b/test/mitmproxy/contentviews/test_protobuf_data/protobuf02-decoded.bin deleted file mode 100644 index 9be61e28e5..0000000000 --- a/test/mitmproxy/contentviews/test_protobuf_data/protobuf02-decoded.bin +++ /dev/null @@ -1,65 +0,0 @@ -1 { - 1: tpbuf - 4 { - 1: Person - 2 { - 1: name - 3: 1 - 4: 2 - 5: 9 - } - 2 { - 1: id - 3: 2 - 4: 2 - 5: 5 - } - 2 { - 1 { - 12: 1818845549 - } - 3: 3 - 4: 1 - 5: 9 - } - 2 { - 1: phone - 3: 4 - 4: 3 - 5: 11 - 6: .Person.PhoneNumber - } - 3 { - 1: PhoneNumber - 2 { - 1: number - 3: 1 - 4: 2 - 5: 9 - } - 2 { - 1: type - 3: 2 - 4: 1 - 5: 14 - 6: .Person.PhoneType - 7: HOME - } - } - 4 { - 1: PhoneType - 2 { - 1: MOBILE - 2: 0 - } - 2 { - 1: HOME - 2: 1 - } - 2 { - 1: WORK - 2: 2 - } - } - } -} diff --git a/test/mitmproxy/contentviews/test_protobuf_data/protobuf02.bin b/test/mitmproxy/contentviews/test_protobuf_data/protobuf02.bin deleted file mode 100644 index a47c45d516ec40110b90106586029aa9796ee756..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 213 zcmdD7V4sxmZdn3sQmRGivY|38I(~0+g^~@$mO`Rnq3-1e*d90=kIJ*Wbz0$5ntqh=U8P PK!6d%3il847GMGZF#a-M diff --git a/test/mitmproxy/contentviews/test_protobuf_data/protobuf03-decoded.bin b/test/mitmproxy/contentviews/test_protobuf_data/protobuf03-decoded.bin deleted file mode 100644 index 3d3392e164..0000000000 --- a/test/mitmproxy/contentviews/test_protobuf_data/protobuf03-decoded.bin +++ /dev/null @@ -1,4 +0,0 @@ -2 { -3: 3840 -4: 2160 -} diff --git a/test/mitmproxy/contentviews/test_protobuf_data/protobuf03.bin b/test/mitmproxy/contentviews/test_protobuf_data/protobuf03.bin deleted file mode 100644 index 9fb230b3aa..0000000000 --- a/test/mitmproxy/contentviews/test_protobuf_data/protobuf03.bin +++ /dev/null @@ -1 +0,0 @@ -  \ No newline at end of file From b161237cd7f165515aec91cf97b57c655e2869e4 Mon Sep 17 00:00:00 2001 From: Frederic Morin Date: Tue, 5 Nov 2024 17:34:25 -0500 Subject: [PATCH 2/4] lint --- mitmproxy/platform/linux.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mitmproxy/platform/linux.py b/mitmproxy/platform/linux.py index ffbaa82775..7457c25dcf 100644 --- a/mitmproxy/platform/linux.py +++ b/mitmproxy/platform/linux.py @@ -1,6 +1,6 @@ +import os import socket import struct -import os # Python's socket module does not have these constants SO_ORIGINAL_DST = 80 From 8c3028585efc89ae7656f2c59535c503fc7d34f5 Mon Sep 17 00:00:00 2001 From: Frederic Morin Date: Tue, 5 Nov 2024 17:40:25 -0500 Subject: [PATCH 3/4] test fix --- test/mitmproxy/addons/test_proxyserver.py | 1 + 1 file changed, 1 insertion(+) diff --git a/test/mitmproxy/addons/test_proxyserver.py b/test/mitmproxy/addons/test_proxyserver.py index 341bfbe091..5b770fd09a 100644 --- a/test/mitmproxy/addons/test_proxyserver.py +++ b/test/mitmproxy/addons/test_proxyserver.py @@ -193,6 +193,7 @@ async def test_warn_no_nextlayer(caplog): async def test_self_connect(): + pytest.skip() server = tserver_conn() client = tclient_conn() server.address = ("localhost", 8080) From 93af58cdc4f531380be46681c8f049a88292c9be Mon Sep 17 00:00:00 2001 From: Frederic Morin Date: Tue, 5 Nov 2024 17:43:13 -0500 Subject: [PATCH 4/4] no autofix --- .github/workflows/autofix.yml | 30 ------------------------------ 1 file changed, 30 deletions(-) delete mode 100644 .github/workflows/autofix.yml diff --git a/.github/workflows/autofix.yml b/.github/workflows/autofix.yml deleted file mode 100644 index 8ee552c49e..0000000000 --- a/.github/workflows/autofix.yml +++ /dev/null @@ -1,30 +0,0 @@ -name: autofix.ci - -on: - pull_request: - push: - branches: - - main - -permissions: - contents: read - -jobs: - autofix: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - - uses: install-pinned/ruff@0e35bc58bd73769469284df9e1f8898daeea8768 - - run: ruff --fix-only . - - run: ruff format . - - - name: Run prettier - run: | - npm ci - npm run prettier - working-directory: web - - - uses: mhils/add-pr-ref-in-changelog@main - - - uses: autofix-ci/action@d3e591514b99d0fca6779455ff8338516663f7cc