Module psfdata.psfxl

Functions

def hex2signed(h: str) -> int
Expand source code
def hex2signed(h: str) -> int:
    """Interpret a hexadecimal string as a signed 64-bit big-endian integer.

    The string is left-padded with ``'0'`` to 16 hex digits (8 bytes) before
    conversion, so any input shorter than 16 digits yields a non-negative
    result; only a full 16-digit string with its top bit set is negative.

    Args:
        h: At most 16 hexadecimal digits (no ``0x`` prefix).

    Returns:
        The two's-complement value of the padded 8-byte word.

    Raises:
        ValueError: If ``h`` is longer than 16 characters or is rejected by
            ``bytes.fromhex``.
    """
    # Explicit check rather than ``assert``: assertions are removed under
    # ``python -O``, which would let over-long input through unchecked.
    if len(h) > 16:
        raise ValueError(f"Hex string longer than 16 digits: {h!r}")
    padded = h.rjust(16, '0')
    return int.from_bytes(bytes.fromhex(padded), byteorder='big', signed=True)
def read_xl_chunk(f: DataBuffer, offset: int)
Expand source code
def read_xl_chunk(f: DataBuffer, offset: int):
    """Read one XL chunk and return its x data, y data and the next position.

    A chunk starts with a NUL-terminated text marker, beginning one byte
    after ``offset``, which ``xl_marker`` parses into named hexadecimal
    fields. The fields used here are ``type``, ``xlen`` and ``previous``,
    plus ``xoffset``, ``csize`` and ``ylen`` depending on the chunk.

    Layout handled, as implemented below:

    * x data is ``xlen`` bytes of blosc2-compressed floats, located at
      ``offset - xoffset`` when ``xoffset`` is present, otherwise at the
      first 8-byte boundary after the marker.
    * type ``0xa0``: y is stored uncompressed at that boundary, either as a
      single double (``csize == 8``, repeated to the shape of x) or as
      ``csize`` bytes of raw floats.
    * any other type: y is blosc2-compressed. For type ``0x22`` it follows
      ``xlen`` bytes after the boundary. Its length is ``ylen`` for types
      ``0x22`` and ``0xa2``, and ``xlen`` otherwise.

    Args:
        f: Buffer to read from; its position is overwritten.
        offset: Position of the chunk inside ``f``.

    Returns:
        ``(x_value, y_value, nextpos)`` where the first two are NumPy arrays
        of dtype ``float`` and ``nextpos`` is the marker's ``previous``
        field (``read_xl_signal`` feeds it back in as the next ``offset``).

    Raises:
        ValueError: If the marker is not NUL-terminated before the end of
            the data, contains non-ASCII bytes, or does not match
            ``xl_marker``.
    """
    # The marker text begins one byte past the chunk offset.
    f.seek(offset + 1)
    marker = bytearray()
    while (c := f.read(1)) != b'\0':
        if not c:
            # End of data: an empty read never equals the terminator, so
            # without this check a truncated file would loop forever.
            raise ValueError(f"Unterminated chunk marker at offset {offset}")
        marker += c
    # Decoding one byte at a time as UTF-8 can only ever succeed for ASCII,
    # so a single ASCII decode accepts and rejects exactly the same input.
    desc_str = marker.decode('ascii').rstrip('\n')

    m = xl_marker.match(desc_str)
    if m is None:
        raise ValueError(f"Invalid chunk marker: {desc_str}")
    chunk_props: dict[str, int] = {
        k: hex2signed(v) for k, v in m.groupdict().items() if v is not None
    }

    # Payload starts at the next multiple of 8 after the marker terminator.
    value_start = 8 * ((f.tell() + 7) // 8)

    # --- x vector ---
    if 'xoffset' in chunk_props:
        # x data lives at a position given relative to this chunk's offset.
        x_start = offset - chunk_props['xoffset']
    else:
        x_start = value_start

    f.seek(x_start)
    x_bytes = f.read(chunk_props['xlen'])
    x_uncomp = blosc2.decompress(x_bytes, as_bytearray=True)
    x_value = np.frombuffer(x_uncomp, dtype='float')

    # --- y vector (or single value) ---
    f.seek(value_start)
    if chunk_props['type'] == 0xa0:
        if chunk_props['csize'] == 8:
            # One double stands in for the whole vector.
            y_single, = struct.unpack('d', f.read(8))
            y_value = np.full(x_value.shape, y_single)
        else:
            y_bytes = f.read(chunk_props['csize'])
            y_value = np.frombuffer(y_bytes, dtype='float')
    else:
        y_start = value_start
        if chunk_props['type'] == 0x22:
            # y data is stored directly after the x data.
            y_start += chunk_props['xlen']
        f.seek(y_start)
        if chunk_props['type'] in (0x22, 0xa2):
            y_bytes = f.read(chunk_props['ylen'])
        else:
            y_bytes = f.read(chunk_props['xlen'])
        try:
            y_uncomp = blosc2.decompress(y_bytes, as_bytearray=True)
        except Exception:
            # Diagnostic dump before re-raising. The preview is trimmed to
            # whole items so np.frombuffer cannot fail here and hide the
            # decompression error that is about to propagate.
            print(f"{hex(len(y_bytes))}")
            item_size = np.dtype('float').itemsize
            preview = y_bytes[:64]
            preview = preview[:len(preview) - len(preview) % item_size]
            if preview:
                print(np.frombuffer(preview, dtype='float'))
            raise
        y_value = np.frombuffer(y_uncomp, dtype='float')

    nextpos = chunk_props['previous']

    return x_value, y_value, nextpos
def read_xl_signal(f: DataBuffer, offset: int)
Expand source code
def read_xl_signal(f: DataBuffer, offset: int):
    """Assemble a full signal by walking its chain of XL chunks.

    Starting at ``offset``, every call to ``read_xl_chunk`` returns the
    position of the next chunk to read; the walk ends when that position is
    ``-1``. The per-chunk arrays are concatenated in the reverse of the
    order in which they were read.

    Args:
        f: Buffer holding the file contents.
        offset: Position of the first chunk to read.

    Returns:
        A ``psfdata.Waveform`` built from the concatenated x and y arrays,
        with ``'s'`` and ``'V'`` passed alongside them (presumably the axis
        units; confirm against ``Waveform``).

    Raises:
        ValueError: From ``np.concatenate`` when ``offset`` is already
            ``-1`` and no chunk is read.
    """
    xs = []
    ys = []
    nextpos = offset

    while nextpos != -1:
        x, y, nextpos = read_xl_chunk(f, nextpos)
        xs.append(x)
        ys.append(y)

    # Chunks were collected in read order; the signal is stored reversed.
    x = np.concatenate(xs[::-1])
    y = np.concatenate(ys[::-1])
    # Function-level import kept from the original (presumably to avoid a
    # circular import with the package; confirm before hoisting).
    from psfdata import Waveform
    return Waveform(x, 's', y, 'V')

Classes

class DataBuffer (f: _io.BufferedReader)
Expand source code
class DataBuffer:
    """In-memory, seekable reader over the full contents of a binary file.

    The source is read once at construction; ``seek``, ``tell`` and ``read``
    then operate on the cached bytes through a single cursor.
    """

    def __init__(self, f: BufferedReader):
        """Cache everything ``f.read()`` returns and put the cursor at 0."""
        self._data = f.read()
        self._pos = 0

    def seek(self, pos: int):
        """Set the cursor to the absolute position ``pos``."""
        self._pos = pos

    def tell(self) -> int:
        """Return the current cursor position."""
        return self._pos

    def read(self, nbytes: int) -> bytes:
        """Return up to ``nbytes`` bytes from the cursor and advance past them.

        Fewer bytes (possibly none) are returned when the request reaches
        the end of the data.
        """
        chunk = self._data[self._pos: self._pos + nbytes]
        # Advance by what was actually returned, not by what was asked for,
        # so tell() never drifts past the end of the data on a short read.
        self._pos += len(chunk)
        return chunk

Methods

def read(self, nbytes: int) -> bytes
Expand source code
def read(self, nbytes: int) -> bytes:
    """Return up to ``nbytes`` bytes from the cursor and advance past them.

    Fewer bytes (possibly none) are returned when the request reaches the
    end of the data.
    """
    chunk = self._data[self._pos: self._pos + nbytes]
    # Advance by what was actually returned, not by what was asked for, so
    # tell() never drifts past the end of the data on a short read.
    self._pos += len(chunk)
    return chunk
def seek(self, pos: int)
Expand source code
def seek(self, pos: int) -> None:
    """Set the cursor to the absolute position ``pos`` (no bounds check)."""
    self._pos = pos
def tell(self) -> int
Expand source code
def tell(self) -> int:
    """Return the current cursor position."""
    return self._pos