Module psfdata.psfbin

Classes

class PsfBinFile (path: pathlib._local.Path)
Expand source code
class PsfBinFile(PsfFile):
    """Reader for binary PSF files and PSF-XL index files.

    The complete file is read into memory on construction. A file whose
    trailer contains the ``Clarissa`` marker carries a table of contents
    (TOC) and is parsed section by section. A file without that marker is
    treated as a PSF-XL index file: its header, type, sweep and trace
    sections are parsed back to back, and waveforms are read on demand
    from the sibling ``<file name>.psfxl`` file.
    """
    _path: Path
    _is_sweep: bool
    # True when no TOC footer was found and the file is treated as a
    # PSF-XL index file (assigned in __init__).
    is_psfxl_index: bool
    _data: MemoryViewAbs

    _toc: dict[SectionType, SectionInfo]
    _header: dict[str, str | int | float | tuple]
    _sweep_info: dict[str, Any] | None
    _signal_info: dict[str, dict]
    _value_section: SimpleValueSection | SweepValueSection | None

    def __init__(self, path: Path) -> None:
        """Load and parse the PSF file at *path*.

        Raises:
            AssertionError: if the first int32 of the file is not one of
                ``VALID_SIGNATURES``.
        """
        logger.info(f"Loading PSF file: {path}")

        self._header = {}
        self._sweep_info = None
        self._signal_info = {}

        self._is_sweep = False
        self._value_section = None

        self._path = path

        with open(path, "rb") as f:
            self._data = MemoryViewAbs(f.read())
        logger.info(f"Size is {len(self._data)} bytes.")

        signature = self._data.read_int32(peek=True)
        # Raised explicitly (rather than via ``assert``) so the check also
        # runs under ``python -O``; the exception type is kept so existing
        # callers that catch AssertionError keep working.
        if signature not in VALID_SIGNATURES:
            raise AssertionError(f"{path}: invalid PSF signature {signature!r}")

        self._toc = {}
        if self._validate(self._data.data):
            self._read_toc()
            self.is_psfxl_index = False
        else:
            logger.info("No TOC found, assuming this is a PSFXL index file.")
            self.is_psfxl_index = True

        self._read_header()
        self._read_defs()
        if not self.is_psfxl_index:
            self._read_traces()

    def __str__(self) -> str:
        """Return a one-line summary: class, file name and signal/sweep info."""
        cl = self.__class__.__name__

        if self.is_psfxl_index:
            s = f"{cl}:{self._path.name}: PSF-XL index file"
        else:
            s = f"{cl}:{self._path.name}: {len(self.names)} signals"
        if self._is_sweep:
            s += f" (sweep: {self._npoints} points)"
        return s

    @property
    def header(self) -> dict[str, Any]:
        """Properties read from the header section."""
        return self._header

    @property
    def sweep_info(self) -> SignalDef | None:
        """Definition of the swept variable, or None if there is no sweep section."""
        if self._sweep_section is None:
            return None
        return self._sweep_section.sweep_def

    @property
    def names(self) -> list[str]:
        """Names of the signals stored in (or indexed by) this file."""
        if SectionType.TRACE in self._toc:
            return [t.name for t in self._trace_section.flattened()]
        if isinstance(self._value_section, SimpleValueSection):
            return list(self._value_section.values.keys())
        return list(self._trace_section.traces_by_name.keys())

    def signal_info(self, name: str) -> SignalDef | Group | None:
        """Return the trace definition for *name*; None for non-sweep files."""
        if self._is_sweep:
            return self._trace_section.traces_by_name[name]
        else:
            # TODO:
            return None

    def get_signal(self, name: str) -> Waveform | int | float | dict:
        """Return the data stored for signal *name*.

        PSF-XL index files read the waveform from the sibling ``.psfxl``
        file; sweep files build a Waveform from the already loaded sweep
        and trace values; otherwise the value section is queried directly.
        """
        if self.is_psfxl_index:
            # Read while the file is still open, so this does not rely on
            # DataBuffer having consumed the whole file in its constructor.
            with open(self._psfxl_path(), 'rb') as f:
                rdr = DataBuffer(f)
                return self._read_xl_waveform(rdr, name)
        elif self.sweep_info is not None:
            xunits = self.sweep_info.properties.get('units', '-')
            info = self.signal_info(name)
            yunits = info.properties.get('units', '-')
            wfm = Waveform(self._swept_values, xunits, self._trace_values[name], yunits, name)
            return wfm
        else:
            return self._value_section.get_signal(name)

    def get_signals(self, names: Iterable[str]) -> dict[str, Waveform]:
        """Return a ``{name: Waveform}`` dict for *names*.

        NOTE: only implemented for PSF-XL index files; for every other
        file an empty dict is returned.
        """
        if not self.is_psfxl_index:
            return {}

        # One open file/buffer is shared by all requested signals, and all
        # reads happen before the file is closed.
        with open(self._psfxl_path(), 'rb') as f:
            rdr = DataBuffer(f)
            return {name: self._read_xl_waveform(rdr, name) for name in names}

    def _psfxl_path(self) -> Path:
        """Path of the PSF-XL data file: this file's name with ``.psfxl`` appended."""
        return self._path.parent / (self._path.name + ".psfxl")

    def _read_xl_waveform(self, rdr, name: str):
        """Read the waveform for trace *name* from the PSF-XL buffer *rdr*."""
        psfxl_idx = self._trace_section.traces_by_name[name].properties['psfxl_idx']
        assert isinstance(psfxl_idx, tuple)
        assert isinstance(psfxl_idx[0], int)
        wfm = read_xl_signal(rdr, psfxl_idx[0])
        wfm.name = name
        return wfm

    def _section_data(self, section_type: SectionType):
        """Return the slice of the file data described by the TOC entry *section_type*.

        Raises KeyError if the TOC has no such section.
        """
        s = self._toc[section_type]
        return self._data[s.offset:s.offset+s.size]

    @staticmethod
    def _validate(data: memoryview) -> bool:
        """Return True if the 8 bytes preceding the final 4 bytes are ``Clarissa``."""
        return data[-12:-4].tobytes() == b"Clarissa"

    def _read_toc(self):
        """Populate ``self._toc`` from the table of contents at the end of the file."""
        logger.info("Read table of contents...")
        # last int32 is the combined size of the sections, excluding the TOC
        datasize = self._data.get_int32(-4)
        # TOC is a number of 8-byte blocks following the last section,
        # and before the 12-byte footer:
        nsections = (self._data.abspos + len(self._data) - 12 - datasize) // 8
        toc_start = (len(self._data) - 12 - 8 * nsections)

        # Each TOC entry is an int32 section type followed by an int32 offset.
        entries = [
            (self._data.get_int32(toc_start + 8 * s), self._data.get_int32(toc_start + 8*s + 4))
            for s in range(nsections)
        ]
        # A section ends where the next one starts; the last one ends at the TOC.
        offsets = [offset for _, offset in entries] + [toc_start]

        logger.info("Sections are:")
        for s, (type_id, _) in enumerate(entries):
            t = SectionType(type_id)
            self._toc[t] = SectionInfo(t, offsets[s], offsets[s+1] - offsets[s])

            logger.info(f"    {self._toc[t]}")

    def _read_header(self) -> None:
        """Parse the header section and cache the sweep-related flags."""
        if self.is_psfxl_index:
            # No TOC: the header starts right after the 4-byte signature.
            header_section = HeaderSection(self._data[4:])
        else:
            header_section = HeaderSection(self._section_data(SectionType.HEADER))
        # Data following the header; index files parse the next section from it.
        self._rest = header_section._tail
        self._header = header_section.props
        self._is_sweep = self._header["PSF sweeps"] != 0
        self._npoints = self._header["PSF sweep points"]
        self._is_windowed = "PSF window size" in self._header

    def _read_defs(self) -> None:
        """Read type/sweep/trace sections"""

        if self.is_psfxl_index:
            self._type_section = TypeSection(self._rest)
        else:
            self._type_section = TypeSection(self._section_data(SectionType.TYPE))

        self._rest = self._type_section._tail

        self._sweep_section = None
        if self._is_sweep:
            if self.is_psfxl_index:
                self._sweep_section = SweepSection(self._rest, self._type_section.typedefs)
            else:
                self._sweep_section = SweepSection(self._section_data(SectionType.SWEEP),
                                                   self._type_section.typedefs)
            self._rest = self._sweep_section._tail

        # NOTE: a TOC file without a TRACE section leaves _trace_section unset.
        if self.is_psfxl_index:
            self._trace_section = TraceSection(self._rest, self._type_section.typedefs)
        elif SectionType.TRACE in self._toc:
            self._trace_section = TraceSection(self._section_data(SectionType.TRACE),
                                               self._type_section.typedefs)

    def _read_traces(self) -> None:
        """Read ValueSection"""
        data = self._section_data(SectionType.VALUE)
        if self._is_sweep:
            assert self._sweep_section is not None
            self._value_section = SweepValueSection(data, self._sweep_section, self._trace_section,
                                                    is_windowed=self._is_windowed)

            # NOTE(review): with this end position no sweep/trace values are
            # extracted, so _swept_values/_trace_values stay unset; presumably
            # it marks a value section without usable data - TODO confirm.
            if self._value_section.endpos == 0xFFFFFFFF:
                return

            self._swept_values, self._trace_values = self._value_section.get_data(self._npoints)
        else:
            self._value_section = SimpleValueSection(data, self._type_section.typedefs)

Base class for ASCII and binary PSF files (and other file formats?)

Ancestors

Instance variables

prop header : dict[str, typing.Any]
Expand source code
@property
def header(self) -> dict[str, Any]:
    """Return the stored header properties dictionary."""
    props = self._header
    return props
prop names : list[str]
Expand source code
@property
def names(self) -> list[str]:
    """Return the list of signal names known to this file.

    Names come from the flattened trace section when the TOC lists one,
    otherwise from a simple value section, and as a last resort from the
    trace section's name lookup table.
    """
    if SectionType.TRACE in self._toc:
        return [trace.name for trace in self._trace_section.flattened()]

    value_section = self._value_section
    if isinstance(value_section, SimpleValueSection):
        return list(value_section.values.keys())

    return list(self._trace_section.traces_by_name.keys())
prop sweep_info : SignalDef | None
Expand source code
@property
def sweep_info(self) -> SignalDef | None:
    """Return the sweep definition, or None when no sweep section exists."""
    section = self._sweep_section
    return None if section is None else section.sweep_def

Methods

def get_signal(self, name: str) ‑> Waveform | int | float | dict
Expand source code
def get_signal(self, name: str) -> Waveform | int | float | dict:
    """Return the data stored for signal *name*.

    For a PSF-XL index file the waveform is read from the sibling
    ``<file name>.psfxl`` file. For a sweep file a Waveform is built from
    the loaded sweep and trace values. Otherwise the value section is
    queried directly.
    """
    if self.is_psfxl_index:
        fn = self._path.parent / (self._path.name + ".psfxl")
        # Read while the file is still open, so this does not rely on
        # DataBuffer having consumed the whole file in its constructor.
        with open(fn, 'rb') as f:
            rdr = DataBuffer(f)
            psfxl_idx = self._trace_section.traces_by_name[name].properties['psfxl_idx']
            assert isinstance(psfxl_idx, tuple)
            assert isinstance(psfxl_idx[0], int)
            wfm = read_xl_signal(rdr, psfxl_idx[0])
        wfm.name = name
        return wfm
    elif self.sweep_info is not None:
        xunits = self.sweep_info.properties.get('units', '-')
        info = self.signal_info(name)
        yunits = info.properties.get('units', '-')
        wfm = Waveform(self._swept_values, xunits, self._trace_values[name], yunits, name)
        return wfm
    else:
        return self._value_section.get_signal(name)
def get_signals(self, names: Iterable[str]) ‑> dict[str, Waveform]
Expand source code
def get_signals(self, names: Iterable[str]) -> dict[str, Waveform]:
    """Return a ``{name: Waveform}`` dict for the requested *names*.

    NOTE: only implemented for PSF-XL index files, where the waveforms
    are read from the sibling ``<file name>.psfxl`` file. Every other
    file yields an empty dict.
    """
    if not self.is_psfxl_index:
        return {}

    fn = self._path.parent / (self._path.name + ".psfxl")
    result = {}
    # All reads happen while the file is still open, so this does not rely
    # on DataBuffer having consumed the whole file in its constructor.
    with open(fn, 'rb') as f:
        rdr = DataBuffer(f)
        for name in names:
            psfxl_idx = self._trace_section.traces_by_name[name].properties['psfxl_idx']
            assert isinstance(psfxl_idx, tuple)
            assert isinstance(psfxl_idx[0], int)
            wfm = read_xl_signal(rdr, psfxl_idx[0])
            wfm.name = name
            result[name] = wfm

    return result
def signal_info(self, name: str) ‑> SignalDef | Group | None
Expand source code
def signal_info(self, name: str) -> SignalDef | Group | None:
    """Look up the trace definition for *name*.

    Only sweep files are supported so far; for any other file None is
    returned.
    """
    if not self._is_sweep:
        # TODO:
        return None
    traces = self._trace_section.traces_by_name
    return traces[name]

Inherited members

class SectionInfo (type: SectionType,
offset: int,
size: int)
Expand source code
@dataclass
class SectionInfo:
    """One table-of-contents entry: which section it is and where its data lies."""
    # Kind of section this entry describes.
    type: SectionType
    # Position of the section's first byte within the file data.
    offset: int
    # Length of the section in bytes.
    size: int

SectionInfo(type: psfdata.psfbin_defs.SectionType, offset: int, size: int)

Instance variables

var offset : int

Position of the section's first byte within the file data.

var size : int

Length of the section in bytes.

var type : SectionType

Kind of section this entry describes.