LZSS in Python
This post walks through a simple LZSS compression implementation, written in Python.
Background
The LZSS code I’m presenting here is based on this GitHub project, but I’ve created my own fork with some improvements and optimizations.
For some background on LZSS, Wikipedia has a pretty good description.
The remainder of this post will walk through an implementation of compression followed by decompression.
Compression
Overview
The main function for compression is fairly short:
from bitarray import bitarray

def compress(data: bytes) -> bytes:
    output_buffer = bitarray(endian="big")
    i = 0
    while i < len(data):
        if match := find_longest_match(data, i):
            match_distance, match_length = match
            output_buffer.append(IS_MATCH_BIT)
            dist_hi, dist_lo = match_distance >> 4, match_distance & 0xF
            output_buffer.frombytes(bytes([dist_hi, (dist_lo << 4) | (match_length - LENGTH_OFFSET)]))
            i += match_length
        else:
            output_buffer.append(not IS_MATCH_BIT)
            output_buffer.frombytes(bytes([data[i]]))
            i += 1
    output_buffer.fill()  # Pad to complete last byte
    return output_buffer.tobytes()
This function takes in a bytes object of uncompressed data and returns a bytes object of compressed data.
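For instance, assuming the compress function above and the constants and helper functions shown later in this post are all defined in one module, a call looks like this (a quick sketch, not from the original repo):
data = b"zzzzz"
compressed = compress(data)
print(len(data), "->", len(compressed))  # 5 -> 4, as we'll see in Example 1 below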
To understand how this code works, we’ll walk through a few compression examples.
Example 1
For our first compression example, we’ll compress the bytes object b"zzzzz".
On the first iteration of our while i < len(data): loop, find_longest_match(data, i) will return None because we’re looking at the first byte in our raw data and there are no previous bytes to match against. We’ll walk through find_longest_match in more detail later.
So, we append a single bit, 0 (that is, not IS_MATCH_BIT, where IS_MATCH_BIT is a bool set to True), followed by the byte b"z".
On the second loop iteration, things are a bit more interesting. Since our next 4 bytes b"zzzz" share a common value with our first byte b"z", find_longest_match will return a non-None Tuple containing a match_distance equal to 1 and a match_length equal to 4. Let’s discuss what these values mean. A match_distance equal to 1 means that we found a match which starts just one position before our current position. A match_length equal to 4 means that we matched 4 bytes; in this case, it means that our match of b"z" is repeated 4 times.
Since we successfully found a match, we append a single bit value of 1, IS_MATCH_BIT, followed by the values for match_distance and match_length packed into 16 bits (12 bits for match_distance and 4 bits for match_length).
Overall, b"zzzzz" gets packed into 9 bits + 17 bits == 26 bits! Since we pad our output with output_buffer.fill(), our final output is 4 bytes, compressed down from 5 bytes.
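To make the bit layout concrete, here is a small standalone sketch (my own illustration, not code from the repo) that packs Example 1’s two tokens by hand with bitarray and reproduces the 4-byte result:
from bitarray import bitarray

buf = bitarray(endian="big")
buf.append(False)    # literal flag (not IS_MATCH_BIT)
buf.frombytes(b"z")  # the literal byte, 8 bits
buf.append(True)     # match flag (IS_MATCH_BIT)
distance, length = 1, 4
# High 8 of the 12 distance bits, then the low 4 distance bits packed with the length nibble
buf.frombytes(bytes([distance >> 4, ((distance & 0xF) << 4) | (length - 2)]))
buf.fill()           # pad the 26 content bits with 6 zeros
print(len(buf) // 8) # 4 (bytes)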
Example 2
For our second example, we’ll use a set of bytes which is a little more interesting, b"wxyzwxy" (although still pretty boring, I know).
For the first 4 bytes, b"w", b"x", b"y", and b"z", find_longest_match returns None for each byte, so we pack these 4 bytes into a total of 4 * 9 bits == 36 bits.
For our next byte, b"w", find_longest_match returns a match_distance of 4 and a match_length of 3. Thus, the remaining bytes b"wxy" are compressed to 17 bits.
Overall, b"wxyzwxy" compresses down to 36 + 17 == 53 bits, down from 56 bits (not a significant change, but hang on for the next example).
Example 3
For this third and final example, let’s suppose our bytes object is b"wxyzwxyzwxy".
As with the previous example, the first 4 bytes will require a total of 36 bits to store. However, the remaining b"wxyzwxy" will be summarized with a match_distance of 4 and a match_length of 7, so our entire bytes object compresses down to 36 + 17 == 53 bits (7 bytes after padding), from 88 bits (11 bytes)!
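If you want to check these numbers yourself, here is a quick sketch (assuming the compress function and its helpers from this post are all in scope):
for raw in (b"zzzzz", b"wxyzwxy", b"wxyzwxyzwxy"):
    compressed = compress(raw)
    print(len(raw), "->", len(compressed))
# 5 -> 4, 7 -> 7, and 11 -> 7 bytes, matching the bit counts above once padding is included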
Find longest match
Based on the previous 3 examples, you probably have an idea of what find_longest_match is doing, and now we’ll walk through the specifics of how it actually works. Below is its code:
from typing import Optional, Tuple

def find_longest_match(data: bytes, current_position: int) -> Optional[Tuple[int, int]]:
    end_of_buffer = min(current_position + MATCH_LENGTH_MASK + LENGTH_OFFSET, len(data))
    search_start = max(0, current_position - WINDOW_SIZE)
    # Try candidates from the longest encodable length down to LENGTH_OFFSET (2 bytes)
    for match_candidate_end in range(end_of_buffer, current_position + LENGTH_OFFSET - 1, -1):
        match_candidate = data[current_position:match_candidate_end]
        for search_position in range(search_start, current_position):
            if match_candidate == get_wrapped_slice(data[search_position:current_position], len(match_candidate)):
                return current_position - search_position, len(match_candidate)
    return None  # No match of length >= LENGTH_OFFSET was found
The outer loop of this function, for match_candidate_end in ..., is used to create match candidates, starting with the longest possible candidate and shrinking the candidate length by 1 after every iteration.
For example, if we are working with the input data b"ghxyz", and current_position is 1, corresponding to b"h", our first value for match_candidate will be b"hxyz". Since our search window contains only b"g", a match won’t be found and our next match_candidate will be b"hxy".
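As a standalone sketch of that candidate generation (my own illustration, using the constants defined later in this post), the outer loop for b"ghxyz" at position 1 produces:
data, current_position = b"ghxyz", 1
MATCH_LENGTH_MASK, LENGTH_OFFSET = 0xF, 2
end_of_buffer = min(current_position + MATCH_LENGTH_MASK + LENGTH_OFFSET, len(data))
for match_candidate_end in range(end_of_buffer, current_position + LENGTH_OFFSET - 1, -1):
    print(data[current_position:match_candidate_end])
# b'hxyz', then b'hxy', then b'hx'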
The inner loop of this function, for search_position in ..., checks to see if match_candidate is identical to any previous byte sequence. The function get_wrapped_slice allows us to find matches where the search sequence is actually shorter than match_candidate, as we saw in the first compression example involving b"zzzzz".
The code for get_wrapped_slice can be seen here:
def get_wrapped_slice(x: bytes, num_bytes: int) -> bytes:
    """
    Examples:
        f(b"1234567", 5) -> b"12345"
        f(b"123", 5) -> b"12312"
    """
    repetitions = num_bytes // len(x)
    remainder = num_bytes % len(x)
    return x * repetitions + x[:remainder]
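With that function in hand, the wrapping behavior behind the earlier examples looks like this (a couple of illustrative calls, not from the repo):
print(get_wrapped_slice(b"z", 4))     # b'zzzz'    -- Example 1: a 1-byte window covers a 4-byte match
print(get_wrapped_slice(b"wxyz", 7))  # b'wxyzwxy' -- Example 3: a 4-byte window covers a 7-byte match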
Also, if you’re wondering about the LENGTH_OFFSET constant, it exists because we only consider substrings of length 2 and greater and just output any substring of length 1 (9 bits uncompressed is better than a 17-bit reference for the flag, distance, and length). Since lengths 0 and 1 are unused, we can encode lengths 2-17 in only 4 bits.
Here is the declaration for LENGTH_OFFSET along with our other constants:
from typing import Final

MATCH_LENGTH_MASK: Final[int] = 0xF
WINDOW_SIZE: Final[int] = 0xFFF
IS_MATCH_BIT: Final[bool] = True
# We only consider substrings of length 2 and greater, and just
# output any substring of length 1 (9 bits uncompressed is better than a 17-bit
# reference for the flag, distance, and length).
# Since lengths 0 and 1 are unused, we can encode lengths 2-17 in only 4 bits.
LENGTH_OFFSET: Final[int] = 2
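As a quick sanity check on that offset trick (my own sketch, assuming the constants above are in scope), every encodable length round-trips through the 4-bit nibble:
for match_length in range(LENGTH_OFFSET, MATCH_LENGTH_MASK + LENGTH_OFFSET + 1):  # lengths 2-17
    nibble = match_length - LENGTH_OFFSET  # 0-15, fits in 4 bits
    assert (nibble & MATCH_LENGTH_MASK) + LENGTH_OFFSET == match_length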
That’s all there is to it for the compression code!
Decompression
The decompression code is much shorter overall and is covered by the single function shown below:
def decompress(compressed_bytes: bytes) -> bytes:
    data = bitarray(endian="big")
    data.frombytes(compressed_bytes)
    assert data, f"Cannot decompress {compressed_bytes}"
    output_buffer = []
    while len(data) >= 9:  # Anything less than 9 bits is padding
        if data.pop(0) != IS_MATCH_BIT:
            byte = data[:8].tobytes()
            del data[:8]
            output_buffer.append(byte)
        else:
            hi, lo = data[:16].tobytes()
            del data[:16]
            distance = (hi << 4) | (lo >> 4)
            length = (lo & MATCH_LENGTH_MASK) + LENGTH_OFFSET
            for _ in range(length):
                output_buffer.append(output_buffer[-distance])
    return b"".join(output_buffer)
Essentially, this function is just a while loop that decodes bytes until only padding remains.
For our first compression example of b"zzzzz", our compressed bits should look like 00111101 01000000 00000100 10000000. To make this a bit more readable, I’ll change the spacing between groups of bits: 0 01111010 1 00000000 00010010 000000.
In the order in which they appear: 0 corresponds to the no-match flag, not IS_MATCH_BIT; 01111010 corresponds to the binary representation of b"z"; 1 corresponds to the match flag, IS_MATCH_BIT; 00000000 corresponds to the most significant bits of match_distance; 00010010 corresponds to the least significant bits of match_distance followed by the 4 bits for match_length (which store match_length - LENGTH_OFFSET, i.e. 4 - 2 == 2); and 000000 corresponds to padding bits.
On the first iteration of our while loop, the leading 0 is popped, so the if branch is executed and 01111010 is interpreted/stored as b"z".
On the second iteration of our loop, 1 is popped, so the else branch is executed and the bits 00000000 00010010 are parsed into match distance and length values.
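Unpacking those 16 bits by hand (a standalone sketch of what the else branch computes):
hi, lo = 0b00000000, 0b00010010   # the two bytes following the match flag
distance = (hi << 4) | (lo >> 4)  # 1
length = (lo & 0xF) + 2           # stored nibble 2 + LENGTH_OFFSET == 4
print(distance, length)           # 1 4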
This simple loop:
for _ in range(length):
    output_buffer.append(output_buffer[-distance])
elegantly utilizes the match distance and length values just decoded.
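Here is that copy loop in isolation for Example 1’s token (distance 1, length 4), followed by a round-trip check that assumes compress and decompress are both in scope:
output_buffer = [b"z"]  # the literal decoded on the first iteration
distance, length = 1, 4
for _ in range(length):
    output_buffer.append(output_buffer[-distance])
print(b"".join(output_buffer))  # b'zzzzz'

assert decompress(compress(b"wxyzwxyzwxy")) == b"wxyzwxyzwxy"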
Summary
That’s all there is to it! If you check out the repo containing this code, you’ll see that everything fits cleanly into less than 100 lines of code.
If you notice any mistakes or have any feedback, please feel free to reach out.