Source code for gavo.grammars.directhdf5

"""
Helpers for generating boosters for HDF5 data.

HDF5 is fairly complex, and directgrammar is too long as is.  Also, I don't
want to require h5py as a fixed dependency of DaCHS; if it's not
there, you should still be able to use other sorts of direct grammars.

TODO: Most of the HDF interface functions return 0 on success and a
non-zero value on failure; we would like a macro to catch these.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import h5py

from gavo import base
from gavo import utils

# See directgrammar.HDF5vaexCodeGenerator for how this import works
from gavo.grammars import directgrammar
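
# A generic sketch of the kind of deferred import the comment above alludes
# to (illustration only, not DaCHS's actual code; the point is that h5py is
# only imported once an HDF5 code generator is actually requested):
#
#   def _getHDF5Generator():        # hypothetical helper
#       from gavo.grammars import directhdf5
#       return directhdf5.HDF5vaexCodeGenerator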


class BaseHDF5CodeGenerator(directgrammar._NumpyMetaCodeGenerator):
    """An abstract base for generating boosters for various sorts of
    HDF5 files.

    Our strategy when parsing from them is to read CHUNK_SIZE items at a
    time into the corresponding arrays and then to iterate over these
    chunks, building the records.

    The difference between the various flavours is where the column
    metadata is taken from and where the data is then pulled from.
    """
    _h5typemap = {
        "byte": "H5T_NATIVE_CHAR",
        "int8": "H5T_NATIVE_CHAR",
        "ubyte": "H5T_NATIVE_UCHAR",
        "uint8": "H5T_NATIVE_UCHAR",
        "short": "H5T_NATIVE_SHORT",
        "int16": "H5T_NATIVE_SHORT",
        "ushort": "H5T_NATIVE_USHORT",
        "uint16": "H5T_NATIVE_USHORT",
        "cint": "H5T_NATIVE_INT",
        "int32": "H5T_NATIVE_INT",
        "uint": "H5T_NATIVE_UINT",
        "uintc": "H5T_NATIVE_UINT",
        "int64": "H5T_NATIVE_LLONG",
        "uint64": "H5T_NATIVE_ULLONG",
        "ulonglong": "H5T_NATIVE_ULLONG",
        "single": "H5T_NATIVE_FLOAT",
        "float16": "H5T_NATIVE_FLOAT",
        "float32": "H5T_NATIVE_FLOAT",
        "double": "H5T_NATIVE_DOUBLE",
        "float64": "H5T_NATIVE_DOUBLE",
    }

    def __init__(self, grammar, tableDef):
        directgrammar._NumpyMetaCodeGenerator.__init__(self, grammar, tableDef)

        if self.grammar.parent.sources is None:
            raise base.StructureError("Cannot make HDF5 vaex booster without"
                " a sources element on the embedding data.")

        try:
            self.sampleSource = next(self.grammar.parent.sources.iterSources())
            hdf = h5py.File(self.sampleSource)
        except StopIteration:
            raise base.StructureError("Building an HDF5 booster requires"
                " at least one matching source.")

        cols = self.getColumns(hdf)
        self.inputColumns = []
        for name, numpyType in cols:
            if numpyType.ndim!=0:
                raise NotImplementedError("Cannot handle arrays in vaex"
                    " HDF5 yet. Please contact the authors.")
            try:
                cType = self.numpyTypes[str(numpyType)]
            except KeyError:
                raise NotImplementedError("Cannot handle the numpy type"
                    f" {numpyType} yet. Please contact the authors.")
            self.inputColumns.append((name, numpyType, cType))
        hdf.close()

        nameMap = {}
        if self.grammar.mapKeys:
            nameMap = self.grammar.mapKeys.maps
        self.byName = utils.CaseSemisensitiveDict(
            (nameMap.get(t[0], t[0]), t) for t in self.inputColumns)

    def _getStructDefinition(self):
        """returns a definition for a C structure holding one record.
        """
        res = ["typedef struct InRec_s {"]
        for name, _, cType in self.inputColumns:
            res.append(f"  {cType} {name};")
        res.append("} InRec;")
        return "\n".join(res)

    def _getArrayDecls(self):
        """returns the declarations for the buffer arrays.
        """
        return "\n".join(
            f"  {cType} arr_{name}[CHUNK_SIZE];"
            for name, _, cType in self.inputColumns)

    def _getRefillCode(self):
        """returns code to read (up to) CHUNK_SIZE items into the column
        arrays.
        """
        res = [
            "offsets[0] = total_read;",
            "if (total_read+chunk_size[0]>nrecs) {",
            "  chunk_size[0] = nrecs-total_read;",
            "  HDFGUARD(H5Sselect_hyperslab(memspace, H5S_SELECT_SET,"
                " null_offset, NULL, chunk_size, NULL));",
            "}",
            "HDFGUARD(H5Sselect_hyperslab(colspace, H5S_SELECT_SET,"
                " offsets, NULL, chunk_size, NULL));",]
        for index, (name, npType, cType) in enumerate(self.inputColumns):
            memTypeId = self._h5typemap[str(npType)]
            res.append(
                f"HDFGUARD(H5Dread(datasets[{index}], {memTypeId}, memspace,"
                f" colspace, H5P_DEFAULT, arr_{name}));")
        return "\n".join(res)

    def _getTupleBuilder(self):
        """returns code to build the InRec struct passed to getTuple.
        """
        res = []
        for index, (name, _, cType) in enumerate(self.inputColumns):
            res.append(f"cur_rec.{name} = arr_{name}[index_in_chunk];")
        return "\n".join(res)

    def getPreamble(self):
        n_cols = len(self.inputColumns)
        return directgrammar._NumpyMetaCodeGenerator.getPreamble(self)+[
            "#include <inttypes.h>",
            "#include <stdlib.h>",
            "#include <hdf5.h>",
            f"#define N_COLS {n_cols}",
            # how many rows to process in one go?
            "#define CHUNK_SIZE {}".format(
                self.grammar.getProperty("chunkSize", 5000)),
            self._getStructDefinition(),
            "#define HDFGUARD(x) if ((x)<0) abort();",]

    def getPrototype(self):
        return "Field *getTuple(InRec *data, int rowIndex)"

    def getFooter(self):
        return "\n".join([
            directgrammar.COMMON_MAIN_HEADER,
            "  hid_t input_id;",
            "  hid_t datasets[N_COLS];",
            "  hid_t colspace;",
            "  size_t nrecs;",
            "  int rank;",
            "  hsize_t dims[1], offsets[1];",
            "  hsize_t null_offset[1] = {0};",
            "  hsize_t chunk_size[1] = {CHUNK_SIZE};",
            "  hid_t memspace = H5Screate_simple(1, chunk_size, NULL);",
            # index_in_chunk set so we immediately read a chunk at startup
            "  size_t index_in_chunk = CHUNK_SIZE;",
            "  size_t total_read = 0;",
            "  InRec cur_rec;",
            "",
            directgrammar.COMMON_MAIN_INTRO,
            "  input_id = H5Fopen(argv[1], H5F_ACC_RDONLY, H5P_DEFAULT);",
            "  if (input_id<0) abort();",
            "  HDFGUARD(H5Sselect_hyperslab(memspace, H5S_SELECT_SET,"
                " null_offset, NULL, chunk_size, NULL));",
            self._getArrayDecls(),
            self._getDatasetsCode(),
            "  colspace = H5Dget_space(datasets[0]);",
            "  rank = H5Sget_simple_extent_ndims(colspace);",
            "  if (rank>1) abort();",
            "  H5Sget_simple_extent_dims(colspace, dims, NULL);",
            "  nrecs = dims[0];",
            "  while (total_read<nrecs) {",
            directgrammar.LOOP_BODY_INTRO,
            "    if (index_in_chunk>=CHUNK_SIZE) {",
            utils.fixIndentation(self._getRefillCode(), "      "),
            "      index_in_chunk = 0;",
            "    }",
            utils.fixIndentation(self._getTupleBuilder(), "    "),
            "    total_read += 1;",
            "    index_in_chunk += 1;",
            "    tuple = getTuple(&cur_rec, total_read-1);",
            directgrammar.LOOP_BODY_FOOT,
            "  }",
            directgrammar.COMMON_MAIN_FOOT,])

    def getItemParser(self, item, index):
        nameForItem = directgrammar.getNameForItem(item)
        typeMacro = directgrammar._getMakeMacro(item)
        res = [
            f"/* {item.description} ({item.type}) */",]

        if item.name not in self.byName:
            res.append(
                f"MAKE_NULL({nameForItem});"
                f" /* {typeMacro}({nameForItem}, FILL IN VALUE); */")
        else:
            origName, _, castTo = self.byName[item.name]
            # there is a source column in the HDF5, make a default map
            res.append(
                f"{typeMacro}({nameForItem}, (({castTo})(data->{origName})));")

        return res

    def getColumns(self, hdf):
        """has to return a sequence of (name, type) pairs for the columns
        of the relation encoded in the HDF5 file.
        """
        raise NotImplementedError("HDF5 getColumns")

    def _getDatasetsCode(self):
        """has to return C code filling datasets[0...N_COLS] with the
        datasets the C code should process.
        """
        raise NotImplementedError("HDF5 _getDatasetsCode")
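
# For orientation, a rough Python/h5py analogue of the chunked reading
# strategy the generated C code implements with hyperslab selections
# (illustration only; srcName and the "table/columns/ra/data" path are
# made up, and 5000 is just the default of the chunkSize property):
#
#   with h5py.File(srcName) as hdf:
#       ds = hdf["table/columns/ra/data"]
#       nrecs = ds.shape[0]
#       for start in range(0, nrecs, 5000):
#           chunk = ds[start:start+5000]    # one buffer refill
#           for val in chunk:               # one record per item
#               pass                        # build and emit the row here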


class HDF5vaexCodeGenerator(BaseHDF5CodeGenerator):
    """A code generator for boosters importing HDF5 files in VAEX convention.

    These have one array per column in a "columns" group; the actual data
    is in a "data" group.

    Our strategy when parsing from them is to read CHUNK_SIZE items at a
    time into the corresponding arrays and then to iterate over these
    chunks, building the records.
    """

    def getColumns(self, hdf):
        # SIDE EFFECT: self.dsName is the dataset name to read from after
        # this function has run.
        try:
            self.dsName = self.grammar.getProperty("dataset", "table")
            cols = hdf[self.dsName]["columns"]
        except KeyError:
            raise base.StructureError(f"Cannot access dataset {self.dsName} in"
                f" {self.sampleSource}. Override the grammar's dataset property"
                " to point it to the right dataset.")

        return [(name, col["data"].dtype) for name, col in cols.items()]

    def _getDatasetsCode(self):
        """returns code opening the datasets for the columns; the handles
        are kept in the datasets array.
        """
        res = []
        for index, (name, _, cType) in enumerate(self.inputColumns):
            col_ds_name = "/".join([self.dsName, "columns", name, "data"])
            res.extend([
                f"  datasets[{index}] = H5Dopen2("
                    f'input_id, "{col_ds_name}", H5P_DEFAULT);',
                f"  if (datasets[{index}]<0) abort();"])
        return "\n".join(res)
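
# For orientation, roughly what a VAEX-convention file might look like when
# inspected with h5py; the column names are made up, and "table" is just the
# default value of the grammar's dataset property:
#
#   with h5py.File("example.hdf5") as hdf:
#       list(hdf["table/columns"])          # -> ['dec', 'mag', 'ra', ...]
#       hdf["table/columns/ra/data"].dtype  # -> dtype('float64')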


class HDF5rootarrCodeGenerator(BaseHDF5CodeGenerator):
    """A code generator for boosters importing HDF5 files having their
    columns in arrays in the root group.
    """

    def getColumns(self, hdf):
        return [(name, col.dtype) for name, col in hdf.items()]

    def _getDatasetsCode(self):
        """returns code opening the datasets for the columns; the handles
        are kept in the datasets array.
        """
        res = []
        for index, (name, _, cType) in enumerate(self.inputColumns):
            res.extend([
                f"  datasets[{index}] = H5Dopen2("
                    f'input_id, "{name}", H5P_DEFAULT);',
                f"  if (datasets[{index}]<0) abort();"])
        return "\n".join(res)
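
# By contrast, a file in the root-array convention simply has one dataset
# per column at the top level (again with made-up names):
#
#   with h5py.File("example.hdf5") as hdf:
#       list(hdf)          # -> ['dec', 'mag', 'ra', ...]
#       hdf["ra"].dtype    # -> dtype('float64')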