Source code for compressor.core

"""
``compressor.core``

Low-level functionality implementing the core of the compression
process, used by the main program.

It contains auxiliary functions.
"""
import binascii
import heapq
from collections import Counter
from functools import total_ordering
from typing import List, Sequence, io  # type: ignore

from compressor.constants import BUFF_SIZE, BYTE, ENC, LEFT, RIGHT
from compressor.util import (StreamFile, default_filename, pack, tobinary,
                             unpack)


@total_ordering
class CharNode:
    """
    Wrapper for a single character (or merged pseudo-character) of the
    text being processed.

    Nodes order by their frequency, which lets them live directly in a
    heap while the prefix-code tree is being built.
    """

    def __init__(self, value, freq, left=None, right=None) -> None:
        """
        Represent a character as a node in a tree.

        :param value: the original character
        :param freq: float with the occurrence average of `value` in
                     the processed text.
        :param left: left child of this node.
        :param right: right child of this node in the tree.
        """
        self._value = value
        self.freq = freq
        self.left = left
        self.right = right

    def __le__(self, other) -> bool:
        """
        Order this character against another one of the same kind by
        frequency.

        :param other: Another CharNode with properties.
        :return: self <= other
        :rtype: bool
        """
        if self.__class__ is not other.__class__:
            return NotImplemented
        return self.freq <= other.freq

    def __eq__(self, other) -> bool:
        if self.__class__ is not other.__class__:
            return NotImplemented
        return self.freq == other.freq

    def __hash__(self):
        # Hash tracks __eq__, which compares frequencies only.
        return hash(self.freq)

    @property
    def value(self):
        """Expose the value being hold as read-only."""
        return self._value

    @property
    def leaf(self) -> bool:
        """
        Check whether the current node is a leaf in the tree, i.e. it
        has no children at all (neither left nor right).

        :return: True if this node has no children, False otherwise.
        """
        return not (self.left is not None or self.right is not None)
def create_tree_code(charset: List[CharNode]) -> CharNode:
    """
    Receives a :list: of :CharNode: (characters) charset, namely leaves
    in the tree, and returns a tree with the corresponding prefix-free
    code.

    :param charset: iterable with all the characters to process.
    :return: iterable with a tree of the prefix-free code for the
             charset.
    """
    heap = charset
    heapq.heapify(heap)
    # n leaves collapse into a single root after exactly n - 1 merges.
    for _ in range(len(charset) - 1):
        first = heapq.heappop(heap)
        second = heapq.heappop(heap)
        merged = CharNode(
            value='{0.value}{1.value}'.format(first, second),
            freq=first.freq + second.freq,
            left=first,
            right=second,
        )
        heapq.heappush(heap, merged)
    return heapq.heappop(heap)
def parse_tree_code(tree: CharNode, table: dict = None,
                    code: bytes = b'') -> dict:
    """
    Given the tree with the chars-frequency processed, return a table
    that maps each character with its binary representation on the new
    code:

        left --> 0
        right --> 1

    :param tree: iterable with the tree as returned by
                 `create_tree_code`
    :param table: Map with the translation for the characters to its
                  code in the new system (prefix-free).
    :param code: The code prefix so far.
    :return: Mapping with the original char to its new code.
    """
    # A missing (or empty) table means we start a fresh mapping.
    if not table:
        table = {}
    if tree.leaf:
        table[tree.value] = code
        return table
    # Recurse left first, then right, extending the prefix accordingly.
    for child, branch in ((tree.left, LEFT), (tree.right, RIGHT)):
        table.update(parse_tree_code(child, table, code + branch))
    return table
def process_frequencies(stream: Sequence[str]) -> List[CharNode]:
    """
    Given a stream of text, return a list of CharNode with the
    frequencies for each character.

    :param stream: sequence with all the characters.
    """
    return [
        CharNode(value=char, freq=count)
        for char, count in Counter(stream).items()
    ]
def save_table(dest_file: io, table: dict) -> None:
    """
    Store the table in the destination file.

    Layout: i: number of entries
            c: each char (encoded with ENC)
            L: each code as unsigned Long, with a leading sentinel bit

    :param dest_file: opened file where to write the `table`.
    :param table: Mapping table with the chars and their codes.
    """
    offset = len(table)
    chars = []
    codecs = []
    for char, codec in table.items():
        chars.append(char.encode(ENC))
        # The leading b'1' sentinel preserves any leading zeros of the
        # code once it is stored as an integer.
        codecs.append(int(b'1' + codec, base=2))
    # Building the two lists directly (instead of zip(*pairs)) avoids a
    # ValueError when the table is empty: an empty table now simply
    # writes offset 0 followed by two empty packs.
    dest_file.write(pack('i', offset))
    dest_file.write(pack('{0}c'.format(offset), *chars))
    dest_file.write(pack('{0}L'.format(offset), *codecs))
def process_line_compression(buffer_line: str, output_file: io,
                             table: dict) -> None:
    """
    Transform `buffer_line` into the new code, per-byte, based on
    `table` and save the new byte-stream into `output_file`.

    Block layout: I: number of bytes in the compressed block
                  I: number of original characters in the block
                  <compressed block bytes>

    :param buffer_line: a chunk of the text to process.
    :param output_file: The opened file where to write the result.
    :param table: Translation table for the characters in
                  `buffer_line`.
    """
    # Concatenate the bit-codes of every character.  The leading '1'
    # sentinel guarantees no leading zeros are lost when the bit string
    # is converted through int().
    bits = '1' + ''.join(
        table[char].decode(ENC) for char in buffer_line)
    # 0-pad up to a byte boundary.  NOTE: kept identical to the
    # original format — when len(bits) is already a multiple of BYTE a
    # full extra padding byte is appended.
    bits += '0' * (BYTE - (len(bits) % BYTE))
    block_length = len(bits) // BYTE
    # Equivalent to the former hex()/a2b_hex round-trip, without the
    # dead intermediate buffers.
    block = int(bits, 2).to_bytes(block_length, byteorder='big')
    original_length = len(buffer_line)
    output_file.write(pack('I', block_length))
    output_file.write(pack('I', original_length))
    output_file.write(block)
def compress_and_save_content(input_filename: str, output_file: io,
                              table: dict) -> None:
    """
    Opens and processes <input_filename>.  Iterates over the file and
    writes the contents on output_file.

    :param input_filename: the source to be compressed
    :param output_file: opened file where to write the outcome
    :param table: mapping table for the char encoding
    """
    with StreamFile(input_filename, BUFF_SIZE) as source:
        for chunk in source:
            process_line_compression(chunk, output_file, table)
def _sizeof(code: str) -> int: sizes = {'i': 4, 'c': 1, 'L': 4, 'I': 4} return sizes.get(code, 1)
def retrieve_table(dest_file: io) -> dict:
    """
    Read the binary file, and return the translation table as a
    reversed dictionary (binary-code key -> original character).
    """
    offset, *_ = unpack('i', dest_file.read(_sizeof('i')))
    raw_chars = dest_file.read(offset * _sizeof('c'))
    raw_codes = dest_file.read(offset * _sizeof('L'))
    chars = unpack('{}c'.format(offset), raw_chars)
    codes = unpack('{}L'.format(offset), raw_codes)
    table = {}
    for char, code in zip(chars, codes):
        table["b{0}".format(tobinary(code))] = str(char, encoding=ENC)
    return table
def _save_checksum(ofile: io, checksum: int):
    """Persist the number of bytes as the first byte in <ofile>."""
    ofile.write(pack('L', checksum))


def _retrieve_checksum(ifile: io) -> int:
    """Read back the checksum written by ``_save_checksum``."""
    raw = ifile.read(_sizeof('L'))
    checksum, *_ = unpack('L', raw)
    return checksum
def save_compressed_file(filename: str, table: dict, checksum: int,
                         dest_file: str = '') -> None:
    """
    Given the original file by its `filename`, save a new one.

    `table` contains the new codes for each character on `filename`.
    """
    target_name = dest_file or default_filename(filename)
    with open(target_name, 'wb') as target:
        # Metadata first (checksum + table), then the compressed body.
        _save_checksum(target, checksum)
        save_table(target, table)
        compress_and_save_content(filename, target, table)
def _decode_block(binary_content: bytes, table: dict, block_length: int) -> str:
    """Transform the compressed content of a block into the original text.

    :param binary_content: raw bytes of one compressed block.
    :param table: mapping from bit-string key to original character.
    :param block_length: number of original characters in this block.
    :return: the decoded text for the block.
    """
    newchars = []
    cont = binascii.hexlify(binary_content)
    # tobinary converts the hexlified content into a '0'/'1' string.
    bitarray = tobinary(cont)
    # Ignore first bit, sentinel
    bitarray = bitarray[1:]
    # Slide a growing window over the bit string: widen the window until
    # the bits seen so far match a key in <table>.  The code is
    # prefix-free, so the first match is the correct character.
    window_start, window_end = 0, 1
    part = bitarray[window_start:window_end]
    restored = 0  # bytes
    while part:
        char = table.get(bitarray[window_start:window_end], False)
        while not char and bitarray[window_start:window_end]:
            window_end += 1
            char = table.get(bitarray[window_start:window_end], False)
        # NOTE(review): if no prefix ever matches, `char` stays False and
        # the final join would raise — assumes <table> covers the stream.
        newchars.append(char)
        restored += 1
        # Stop once the expected character count is restored; any
        # remaining bits are only the zero-padding of the last byte.
        if restored == block_length:
            break
        window_start, window_end = window_end, window_end + 1
        part = bitarray[window_start:window_end]
    return ''.join(newchars)[:block_length]
def decode_file_content(compfile: io, table: dict, checksum: int) -> str:
    """
    Reconstruct the remaining part of the <compfile>, starting right
    after the metadata, decoding each bit according to the <table>.
    """
    decoded = ''
    # Each block starts with its byte size; an empty read means EOF.
    header = compfile.read(_sizeof('I'))
    while header and len(decoded) < checksum:
        block_size, *_ = unpack('I', header)
        block_length, *_ = unpack('I', compfile.read(_sizeof('I')))
        body = compfile.read(block_size)
        decoded += _decode_block(body, table, block_length)
        header = compfile.read(_sizeof('I'))
    return decoded
def _reorganize_table_keys(table: dict) -> dict: """Change the keys of the table to be more easily readable: bytes->str""" return {k[2:]: v for k, v in table.items()}
def retrieve_compressed_file(filename: str, dest_file: str = '') -> None:
    """
    EXTRACT - Reconstruct the original file from the compressed copy.

    Write the output in the indicated `dest_file`.
    """
    with open(filename, 'rb') as src:
        checksum = _retrieve_checksum(src)
        table = _reorganize_table_keys(retrieve_table(src))
        content = decode_file_content(src, table, checksum)
    target = dest_file or default_filename(filename, suffix='extr')
    # Dump the decoded extraction into its destination
    with open(target, 'w+') as out:
        out.write(content)