Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion javaobj.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import struct
import sys

from modifiedutf8 import decode_modified_utf8

try:
# Python 2
from StringIO import StringIO as BytesIO
Expand Down Expand Up @@ -111,7 +113,10 @@ def to_str(data, encoding="UTF-8"):
if type(data) is str:
# Nothing to do
return data
return str(data, encoding)
try:
return str(data, encoding)
except UnicodeDecodeError:
return decode_modified_utf8(data)[0]

def read_to_str(data):
"""
Expand Down
169 changes: 169 additions & 0 deletions modifiedutf8.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
# Migrated from
# https://github.com/swstephe/py2jdbc/blob/master/py2jdbc/mutf8.py

class DecodeMap(object):
    """
    Describes how a single byte of a multi-byte sequence is validated and
    which of its bits are merged into the character being decoded.

    A byte is accepted when ``byte & mask == value``; its lowest ``bits``
    bits are then appended to the accumulated code point. A rejected byte
    is reported with a UnicodeDecodeError, so decoding handles bad input
    the same way the standard codecs do.
    """
    def __init__(self, count, mask, value, bits):
        """
        Store the description of one byte position.

        The payload mask (``mask2``) is derived from ``bits``: the payload
        is always taken from the low end of the byte.

        :param count: The number of bytes in the entire sequence.
        :param mask: The mask selecting the marker bits of the byte.
        :param value: The expected marker bits (masked, not shifted).
        :param bits: The number of payload bits carried by the byte.
        """
        self.count = count
        self.mask = mask
        self.value = value
        self.bits = bits
        # Selects the `bits` least significant bits (0 when bits == 0)
        self.mask2 = (1 << bits) - 1

    def apply(self, byte, value, data, i, count):
        """
        Validate a byte and merge its payload into the accumulated value.

        :param byte: The byte to check.
        :param value: The value accumulated so far.
        :param data: The data buffer (only used for error reporting).
        :param i: The position in the buffer where the sequence starts.
        :param count: The offset of this byte within the sequence.
        :return: The accumulated value shifted left by ``bits`` with the
                 payload of this byte in its low bits.
        :raises: UnicodeDecodeError if the masked bits don't match.
        """
        # Guard clause: reject the byte before touching the accumulator
        if byte & self.mask != self.value:
            raise UnicodeDecodeError(
                NAME, data, i, i + count,
                "invalid {}-byte sequence".format(self.count)
            )

        payload = byte & self.mask2
        return (value << self.bits) | payload

    def __repr__(self):
        names = ('count', 'mask', 'value', 'bits', 'mask2')
        fields = [
            '{}=0x{:02x}'.format(name, getattr(self, name))
            for name in names
        ]
        return "DecodeMap({})".format(', '.join(fields))


# Encoding name reported in every UnicodeDecodeError raised by this module.
# DecodeMap.apply() and decoder() both reference it; without this definition
# their error paths would fail with a NameError instead.
NAME = "mutf8"

# Raw description of the bytes that follow the lead byte of a sequence.
# The key is the total length of the sequence in bytes; each entry is a
# (mask, expected value, payload bits) triple for one following byte, in
# order. In the 6-byte form (an encoded surrogate pair) the third entry
# matches the literal 0xED lead byte of the second half and carries no
# payload bits.
DECODER_MAP = {
    2: (
        (0xc0, 0x80, 6),
    ),
    3: (
        (0xc0, 0x80, 6),
        (0xc0, 0x80, 6)
    ),
    6: (
        (0xf0, 0xa0, 4),
        (0xc0, 0x80, 6),
        (0xff, 0xed, 0),
        (0xf0, 0xb0, 4),
        (0xc0, 0x80, 6),
    )
}

# Same table, with each triple turned into a ready-to-use DecodeMap
DECODE_MAP = {
    length: tuple(DecodeMap(length, *entry) for entry in entries)
    for length, entries in DECODER_MAP.items()
}


def decoder(data):
    """
    This generator processes a sequence of bytes in Modified UTF-8 encoding
    and produces a sequence of unicode string characters.

    The lead byte of each sequence selects its form; the following bytes are
    validated and merged using the `DecodeMap` entries of `DECODE_MAP`:

    * ``0xxxxxxx``: 1-byte form (the zero byte itself is rejected)
    * ``110xxxxx``: 2-byte form
    * ``1110xxxx``: 3-byte form
    * ``0xED 0xA0..0xAF`` ...: 6-byte form, a surrogate pair encoding a
      character above U+FFFF

    An unpaired surrogate is rejected as an invalid 6-byte sequence.

    :param data: a string of bytes in Modified UTF-8 encoding.
    :return: a generator producing a string of unicode characters
    :raises: `UnicodeDecodeError` if unrecognized byte in sequence is
             encountered.
    """
    def next_byte(_it, start, count):
        # Fetch the next byte, reporting a truncated sequence properly
        try:
            return next(_it)[1]
        except StopIteration:
            raise UnicodeDecodeError(
                NAME, data, start, start + count,
                "incomplete byte sequence"
            )

    def read_tail(_it, start, value, maps, first=1):
        # Merge the bytes following the lead byte into `value`, one
        # DecodeMap per byte. `first` is the offset (relative to `start`)
        # of the first byte still to be fetched.
        for offset, dm in enumerate(maps, first):
            following = next_byte(_it, start, offset)
            value = dm.apply(following, value, data, start, offset)
        return value

    it = iter(enumerate(data))
    for i, d in it:
        if d == 0x00:  # 00000000
            raise UnicodeDecodeError(
                NAME, data, i, i + 1,
                "embedded zero-byte not allowed"
            )
        elif not d & 0x80:  # 0xxxxxxx
            value = d
        elif not d & 0x40:  # 10xxxxxx
            raise UnicodeDecodeError(
                NAME, data, i, i + 1,
                "misplaced continuation character"
            )
        elif not d & 0x20:  # 110xxxxx
            value = read_tail(it, i, d & 0x1f, DECODE_MAP[2])
        elif d & 0x10:  # 1111xxxx
            raise UnicodeDecodeError(
                NAME, data, i, i + 1,
                "invalid encoding character"
            )
        elif d == 0xed:
            # 0xED starts either an ordinary 3-byte character in
            # U+D000..U+D7FF (second byte 0x80..0x9F) or an encoded
            # surrogate pair (second byte 0xA0..0xAF): the second byte
            # decides which table applies.
            second = next_byte(it, i, 1)
            if second & 0xe0 == 0x80:
                maps = DECODE_MAP[3]
                value = maps[0].apply(second, d & 0x0f, data, i, 1)
                value = read_tail(it, i, value, maps[1:], 2)
            else:
                maps = DECODE_MAP[6]
                value = maps[0].apply(second, 0, data, i, 1)
                # The pair carries the 20 bits of (code point - 0x10000)
                value = 0x10000 + read_tail(it, i, value, maps[1:], 2)
        else:  # 1110xxxx
            value = read_tail(it, i, d & 0x0f, DECODE_MAP[3])
        # noinspection PyCompatibility
        yield mutf8_unichr(value)


def decode_modified_utf8(data, errors='strict'):
    """
    Decodes a sequence of bytes to a unicode text and length using
    Modified UTF-8.

    With ``errors='ignore'`` or ``errors='replace'`` the bytes reported as
    invalid are skipped and decoding resumes right after them, so the rest
    of the input is not lost. Any other value than 'strict' and 'replace'
    is handled like 'ignore'.

    :param data: a string of bytes in Modified UTF-8
    :param errors: handle decoding errors: 'strict' raises, 'ignore' drops
                   the invalid bytes, 'replace' substitutes U+FFFD for them
    :return: a tuple (unicode text, length), where the length is the number
             of characters in the text (not the number of bytes consumed)
    :raises: `UnicodeDecodeError` if sequence is invalid and `errors` is
             'strict'.
    """
    chars = []
    offset = 0
    while True:
        # The first pass hands `data` over untouched, so error positions
        # reported in strict mode refer to the caller's buffer.
        remaining = data if offset == 0 else data[offset:]
        try:
            for char in decoder(remaining):
                chars.append(char)
        except UnicodeDecodeError as exc:
            if errors == 'strict':
                raise
            if errors == 'replace':
                chars.append(u'\uFFFD')
            # A generator that raised is finished: restart the decoder
            # after the rejected bytes. `exc.end` is relative to
            # `remaining`; always advance to guarantee termination.
            offset += max(exc.end, 1)
        else:
            break
    return u''.join(chars), len(chars)

def mutf8_unichr(value):
    """
    Converts a decoded code point to its unicode character.

    :param value: the code point, as an integer
    :return: the one-character string for this code point
    """
    character = chr(value)
    return character