# Source code for gavo.grammars.hdf5grammar

"""
A grammar producing rows from a table within an HDF5 file.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import h5py

from gavo.grammars import common
from gavo import base


class AstropyHDF5TableIterator(common.RowIterator):
	"""A row iterator generating rawdicts from Astropy-serialised HDF5 tables.

	The table is assumed to contain record arrays; NULL values are
	properly handled through associated .mask columns.
	"""
	def _makeRowBuilder(self, sourceDS):
		"""returns a function that builds rawdicts from a h5py dataset
		sourceDS.

		It is aware of the astropy convention of adding boolean .mask columns
		and uses these to produce Nones.
		"""
		# names holds the rawdict key for each record slot; mask slots get
		# None here so they can be dropped from the row dict below.
		names, nullMap = [], {}
		for index, name in enumerate(sourceDS.dtype.names):
			if name.endswith(".mask"):
				# remember which data column this mask slot controls
				nullMap[index] = name[:-5]
				names.append(None)
			else:
				names.append(name)

		if nullMap:
			def makeRow(row):
				res = dict(zip(names, row))
				# all mask values collapsed onto the single None key;
				# discard them before applying the masks
				del res[None]
				for index, name in nullMap.items():
					if row[index]:
						res[name] = None
				return res
			return makeRow
		else:
			# no mask columns: a plain zip of names and values suffices
			return lambda row: dict(zip(names, row))

	def _iterRows(self):
		# Use the file as a context manager so the handle is closed even
		# when the consumer abandons the generator early or the dataset
		# lookup fails (the previous code leaked the open file).
		with h5py.File(self.sourceToken, "r") as hdf:
			try:
				sourceDS = hdf[self.grammar.dataset]
			except KeyError:
				raise base.ReportableError(f"Dataset {self.grammar.dataset} not"
					f" found in {self.sourceToken}. The following datasets"
					" are visible in the root: "+(", ".join(hdf.keys())))

			buildRow = self._makeRowBuilder(sourceDS)
			for row in sourceDS:
				yield buildRow(row)
class VaexHDF5TableIterator(common.RowIterator):
	"""A row iterator generating rawdicts from Vaex-serialised HDF5 tables.

	Here, the columns come in separate arrays, much like FITS tables.
	"""
	# number of rows fetched from each column array at a time; bounds
	# memory use while avoiding per-row HDF5 reads
	_chunkSize = 10000

	def _iterRows(self):
		# Context manager closes the file even when the consumer abandons
		# the generator or the dataset lookup fails (the previous code
		# leaked the open file handle).
		with h5py.File(self.sourceToken, "r") as hdf:
			try:
				sourceDS = hdf[self.grammar.dataset]
			except KeyError:
				raise base.ReportableError(f"Dataset {self.grammar.dataset} not"
					f" found in {self.sourceToken}. Note that we want the"
					" parent of the columns group here. The following datasets"
					" are visible in the root: "+(", ".join(hdf.keys())))

			cols = sourceDS["columns"]
			names = list(cols.keys())
			arrs = [c["data"] for c in cols.values()]
			if not arrs:
				# empty columns group: no columns means no rows (the
				# previous code raised IndexError on arrs[0] here)
				return

			for offset in range(0, arrs[0].shape[0], self._chunkSize):
				curArrs = [arr[offset:offset+self._chunkSize]
					for arr in arrs]
				for index in range(len(curArrs[0])):
					yield dict(zip(names, [arr[index] for arr in curArrs]))
class HDF5Grammar(common.Grammar):
	"""a grammar for parsing single tables from HDF5 files.

	These result in typed records, i.e., values normally come in the types
	they are supposed to have.  The keys in the rows are the column names
	as given in the HDF file.

	Regrettably, there are about as many conventions to serialise tables
	in HDF5 as there are programmes writing HDF5.  This grammar supports
	a few styles; ask to have more included.

	Styles currently implemented:

	:astropy:
		The table comes as a record array.  The grammar is aware of the
		astropy convention of using adding mask columns as name+".mask"
		and will turn masked values to Nones.
	:vaex:
		The table comes as a group with the columns as individual arrays
		in the group member's data dataset.  Put the parent of the columns
		group into the dataset attribute here.

	This class is not intended for ingesting large HDF5 files, as it will
	only process a few thousand rows per second on usual hardware.  Use
	`Element directgrammar`_ for large files.
	"""
	# element name under which this grammar appears in RDs
	name_ = "hdf5Grammar"

	# name of the dataset (astropy style) or of the parent of the
	# columns group (vaex style) within the HDF5 file
	_dataset = base.UnicodeAttribute("dataset", default=base.Undefined,
		description="The name of the HDF5 dataset/group containing the table."
		" At this point, only datasets that are children of root are"
		" supported.",
		copyable=True)

	# which serialisation convention to expect; selects the row iterator
	# in onElementComplete below
	_style = base.EnumeratedUnicodeAttribute("style", default="astropy",
		validValues=["astropy", "vaex"],
		description="Style of the table serialisation.",
		copyable=True)

	# class-level default; instances parsing vaex-style files get this
	# overridden per-instance in onElementComplete
	rowIterator = AstropyHDF5TableIterator

	def onElementComplete(self):
		# Switch to the vaex iterator when the style attribute says so;
		# the astropy iterator is the class default above.
		# NOTE(review): no chaining to the base class's completion hook
		# (e.g. _onElementCompleteNext) is visible here -- confirm against
		# gavo.base structure conventions whether one is required.
		if self.style=="vaex":
			self.rowIterator = VaexHDF5TableIterator
if __name__=="__main__":
	# Ad-hoc smoke test: dump shape, dtype, and all rows of a dataset.
	# File and dataset names may be passed on the command line; the
	# defaults reproduce the previous hard-coded developer values.
	import sys
	srcName = sys.argv[1] if len(sys.argv)>1 \
		else "/home/msdemlei/tmp/hdparse/hdex.hdf5"
	dsName = sys.argv[2] if len(sys.argv)>2 else "testdata"
	# with-statement closes the file (it previously stayed open)
	with h5py.File(srcName, "r") as f:
		table = f[dsName]
		print(table.shape)
		print(table.dtype)
		for row in table:
			print(row)