2
0
mirror of https://github.com/openkmip/pykmip synced 2025-12-20 10:13:18 +00:00

Updating support for the BigInteger primitive

This change fixes various bugs with the original BigInteger
implementation, adding in a working version of the primitive. A full
unit test suite is included.
This commit is contained in:
Peter Hamilton
2015-08-27 09:00:00 -04:00
parent 9c7edd65d2
commit 89a6e21a06
2 changed files with 359 additions and 271 deletions

View File

@@ -15,6 +15,7 @@
import logging
import six
import struct
import sys
from struct import pack, unpack
@@ -357,105 +358,131 @@ class LongInteger(Base):
class BigInteger(Base):
BLOCK_SIZE = 8
SHIFT_SIZE = 64
"""
An encodeable object representing a big integer value.
def __init__(self, value=None, tag=Tags.DEFAULT):
A BigInteger is one of the KMIP primitive object types. It is encoded as
a signed, big-endian, integer of arbitrary size. For more information, see
Section 9.1 of the KMIP 1.1 specification.
"""
def __init__(self, value=0, tag=Tags.DEFAULT):
super(BigInteger, self).__init__(tag, type=Types.BIG_INTEGER)
self.value = value
if self.value is not None:
self.real_length = utils.count_bytes(self.value)
self.padding_length = self.BLOCK_SIZE - (self.length %
self.BLOCK_SIZE)
if self.padding_length == self.BLOCK_SIZE:
self.padding_length = 0
else:
self.length = None
self.padding_length = None
self.validate()
def read_value(self, istream):
if (self.length < self.BLOCK_SIZE) or (self.length % self.BLOCK_SIZE):
raise errors.InvalidLengthError(BigInteger.__name__,
('multiple'
'of {0}'.format(self.BLOCK_SIZE)),
self.length)
self.value = 0
num_blocks = self.length / self.BLOCK_SIZE
# Read first block as signed data
self.value = unpack('!q', str(istream.read(self.BLOCK_SIZE)))[0]
# Shift current value and add on next unsigned block
for _ in range(num_blocks - 1):
self.value = self.value << self.SHIFT_SIZE
stream_data = istream.read(self.BLOCK_SIZE)
self.value += unpack('!Q', stream_data)[0]
self.validate()
def read(self, istream):
"""
Read the encoding of the BigInteger from the input stream.
Args:
istream (stream): A buffer containing the encoded bytes of the
value of a BigInteger. Usually a BytearrayStream object.
Required.
Raises:
InvalidPrimitiveLength: if the big integer encoding read in has
an invalid encoded length.
"""
super(BigInteger, self).read(istream)
self.read_value(istream)
def write_value(self, ostream):
# 1. Determine the sign of the value (+/-); save it.
# 2. Extend hex of value with 0s until encoding is right size (8x).
# 3. Write out each block of the encoding as signed, 2s complement:
# pack('!q', sign * block)
# Check for a valid length before even trying to parse the value.
if self.length % 8:
raise exceptions.InvalidPrimitiveLength(
"invalid big integer length read; "
"expected: multiple of 8, observed: {0}".format(self.length))
# Determine sign for padding
pad_byte = 0x00
pad_nybl = 0x0
sign = 1
binary = ''
if self.value < 0:
pad_byte = 0xff
pad_nybl = 0xf
# Read the value byte by byte and convert it into binary, padding each
# byte as needed.
for _ in range(self.length):
byte = struct.unpack('!B', istream.read(1))[0]
bits = "{0:b}".format(byte)
pad = len(bits) % 8
if pad:
bits = ('0' * (8 - pad)) + bits
binary += bits
# Compose padding bytes
pad = ''
for _ in range(self.padding_length):
pad += hex(pad_byte)[2:]
# If the value is negative, convert via two's complement.
if binary[0] == '1':
sign = -1
binary = binary.replace('1', 'i')
binary = binary.replace('0', '1')
binary = binary.replace('i', '0')
str_rep = hex(self.value).rstrip("Ll")[2:]
if len(str_rep) % 2:
pad += hex(pad_nybl)[2]
pivot = binary.rfind('0')
binary = binary[0:pivot] + '1' + ('0' * len(binary[pivot + 1:]))
# Compose value for block-based write
str_rep = pad + str_rep
num_blocks = len(str_rep) / self.BLOCK_SIZE
# Write first block as signed data
block = int(str_rep[0:self.BLOCK_SIZE], 16)
ostream.write(pack('!q', block))
# Write remaining blocks as unsigned data
for i in range(1, num_blocks):
block = str_rep[(self.BLOCK_SIZE * i):(self.BLOCK_SIZE * (i + 1))]
block = int(block, 16)
ostream.write(pack('!Q', block))
# Convert the value back to an integer and reapply the sign.
self.value = int(binary, 2) * sign
def write(self, ostream):
"""
Write the encoding of the BigInteger to the output stream.
Args:
ostream (Stream): A buffer to contain the encoded bytes of a
BigInteger object. Usually a BytearrayStream object.
Required.
"""
# Convert the value to binary and pad it as needed.
binary = "{0:b}".format(abs(self.value))
binary = ("0" * (64 - (len(binary) % 64))) + binary
# If the value is negative, convert via two's complement.
if self.value < 0:
binary = binary.replace('1', 'i')
binary = binary.replace('0', '1')
binary = binary.replace('i', '0')
pivot = binary.rfind('0')
binary = binary[0:pivot] + '1' + ('0' * len(binary[pivot + 1:]))
# Convert each byte to hex and build the hex string for the value.
hexadecimal = b''
for i in range(0, len(binary), 8):
byte = binary[i:i + 8]
byte = int(byte, 2)
hexadecimal += struct.pack('!B', byte)
self.length = len(hexadecimal)
super(BigInteger, self).write(ostream)
self.write_value(ostream)
ostream.write(hexadecimal)
def validate(self):
self.__validate()
"""
Verify that the value of the BigInteger is valid.
def __validate(self):
Raises:
TypeError: if the value is not of type int or long
"""
if self.value is not None:
data_type = type(self.value)
if data_type not in six.integer_types:
raise errors.StateTypeError(
BigInteger.__name__, "{0}".format(six.integer_types),
data_type)
num_bytes = utils.count_bytes(self.length)
if num_bytes > self.LENGTH_SIZE:
raise errors.StateOverflowError(
BigInteger.__name__, 'length', self.LENGTH_SIZE,
num_bytes)
if not isinstance(self.value, six.integer_types):
raise TypeError('expected (one of): {0}, observed: {1}'.format(
six.integer_types, type(self.value)))
def __repr__(self):
return "BigInteger(value={0}, tag={1})".format(self.value, self.tag)
def __str__(self):
return str(self.value)
def __eq__(self, other):
if isinstance(other, BigInteger):
if self.value == other.value:
return True
else:
return False
else:
return NotImplemented
def __ne__(self, other):
if isinstance(other, BigInteger):
return not self.__eq__(other)
else:
return NotImplemented
class Enumeration(Integer):