Expand source code
def read_xl_chunk(f: DataBuffer, offset: int):
    """Read one chunk (an x vector and its y data) located at ``offset``.

    The chunk begins with a NUL-terminated text marker starting one byte
    past ``offset``. The marker is matched against ``xl_marker`` and every
    named group that matched is converted with ``hex2signed`` to build the
    chunk properties. The payload starts at the first multiple of 8 at or
    after the position following the marker's terminator.

    Args:
        f: Buffer supporting ``seek``, ``read`` and ``tell``.
        offset: Position of the chunk within ``f``.

    Returns:
        A tuple ``(x_value, y_value, nextpos)`` where ``x_value`` and
        ``y_value`` are NumPy arrays of doubles and ``nextpos`` is the
        ``previous`` property taken from the marker.

    Raises:
        ValueError: If the data ends before the marker's NUL terminator,
            or the marker text does not match ``xl_marker``.
        KeyError: If the marker lacks a property needed for its chunk type
            (for example ``xlen``, ``type`` or ``previous``).
    """
    # --- marker ------------------------------------------------------------
    f.seek(offset + 1)
    raw_desc = bytearray()
    while (c := f.read(1)) != b'\0':
        if not c:
            # An empty read means the data ended before the terminator;
            # without this guard the loop would never finish.
            raise ValueError(f"Unterminated chunk marker at offset {offset}")
        raw_desc += c
    # Decode once, after all bytes are collected, so multi-byte characters
    # are not split across separate decode calls.
    desc_str = raw_desc.decode().rstrip('\n')

    m = xl_marker.match(desc_str)
    if m is None:
        raise ValueError(f"Invalid chunk marker: {desc_str}")
    chunk_props: dict[str, int] = {
        k: hex2signed(v) for k, v in m.groupdict().items() if v is not None
    }

    # Value start is at the next 8-byte boundary after the marker.
    value_start = 8 * ((f.tell() + 7) // 8)

    # --- x vector ----------------------------------------------------------
    # When the marker carries 'xoffset', the x data is not stored in this
    # chunk's payload but 'xoffset' bytes before the chunk offset.
    if 'xoffset' in chunk_props:
        x_start = offset - chunk_props['xoffset']
    else:
        x_start = value_start
    f.seek(x_start)
    x_bytes = f.read(chunk_props['xlen'])
    x_uncomp = blosc2.decompress(x_bytes, as_bytearray=True)
    x_value = np.frombuffer(x_uncomp, dtype='float')

    # --- y vector (or single value) ----------------------------------------
    f.seek(value_start)
    if chunk_props['type'] == 0xa0:
        # Type 0xa0 stores y uncompressed.
        if chunk_props['csize'] == 8:
            # A single double: repeat it for every x point.
            y_single, = struct.unpack('d', f.read(8))
            y_value = np.full(x_value.shape, y_single)
        else:
            y_bytes = f.read(chunk_props['csize'])
            y_value = np.frombuffer(y_bytes, dtype='float')
    else:
        y_start = value_start
        if chunk_props['type'] in [0x22]:
            # For type 0x22 the y data follows the x data in the payload.
            y_start += chunk_props['xlen']
        f.seek(y_start)
        if chunk_props['type'] in [0x22, 0xa2]:
            y_bytes = f.read(chunk_props['ylen'])
        else:
            y_bytes = f.read(chunk_props['xlen'])
        try:
            y_uncomp = blosc2.decompress(y_bytes, as_bytearray=True)
        except Exception:
            # Dump the payload size and the first few raw doubles to help
            # diagnose the failure, then let the original error propagate.
            print(f"{hex(len(y_bytes))}")
            preview = y_bytes[:64]
            # Keep only whole doubles so this dump cannot itself raise and
            # hide the decompression error.
            preview = preview[:len(preview) - len(preview) % 8]
            print(np.frombuffer(preview, dtype='float'))
            raise
        y_value = np.frombuffer(y_uncomp, dtype='float')

    nextpos = chunk_props['previous']
    return x_value, y_value, nextpos