Build pipeline: openpipelines-bio.openpipeline-spatial.update-spatialdata-sdzfd
Source commit: a58c202dcc
Source message: update changelog
88 lines
3.1 KiB
Python
88 lines
3.1 KiB
Python
import shutil
|
|
from anndata import AnnData
|
|
from mudata import write_h5ad
|
|
from h5py import File as H5File
|
|
from h5py import Group, Dataset
|
|
from pathlib import Path
|
|
from typing import Union, Literal
|
|
from functools import partial
|
|
|
|
|
|
def compress_h5mu(
|
|
input_path: Union[str, Path],
|
|
output_path: Union[str, Path],
|
|
compression: Union[Literal["gzip"], Literal["lzf"]],
|
|
):
|
|
input_path, output_path = str(input_path), str(output_path)
|
|
|
|
def copy_attributes(in_object, out_object):
|
|
for key, value in in_object.attrs.items():
|
|
out_object.attrs[key] = value
|
|
|
|
def visit_path(
|
|
output_h5: H5File,
|
|
compression: Union[Literal["gzip"], Literal["lzf"]],
|
|
name: str,
|
|
object: Union[Group, Dataset],
|
|
):
|
|
if isinstance(object, Group):
|
|
new_group = output_h5.create_group(name)
|
|
copy_attributes(object, new_group)
|
|
elif isinstance(object, Dataset):
|
|
# Compression only works for non-scalar Dataset objects
|
|
# Scalar objects dont have a shape defined
|
|
if not object.compression and object.shape not in [None, ()]:
|
|
new_dataset = output_h5.create_dataset(
|
|
name, data=object, compression=compression
|
|
)
|
|
copy_attributes(object, new_dataset)
|
|
else:
|
|
output_h5.copy(object, name)
|
|
else:
|
|
raise NotImplementedError(
|
|
f"Could not copy element {name}, "
|
|
f"type has not been implemented yet: {type(object)}"
|
|
)
|
|
|
|
with (
|
|
H5File(input_path, "r") as input_h5,
|
|
H5File(output_path, "w", userblock_size=512) as output_h5,
|
|
):
|
|
copy_attributes(input_h5, output_h5)
|
|
input_h5.visititems(partial(visit_path, output_h5, compression))
|
|
|
|
with open(input_path, "rb") as input_bytes:
|
|
# Mudata puts metadata like this in the first 512 bytes:
|
|
# MuData (format-version=0.1.0;creator=muon;creator-version=0.2.0)
|
|
# See mudata/_core/io.py, read_h5mu() function
|
|
starting_metadata = input_bytes.read(100)
|
|
# The metadata is padded with extra null bytes up until 512 bytes
|
|
truncate_location = starting_metadata.find(b"\x00")
|
|
starting_metadata = starting_metadata[:truncate_location]
|
|
with open(output_path, "br+") as f:
|
|
nbytes = f.write(starting_metadata)
|
|
f.write(b"\0" * (512 - nbytes))
|
|
|
|
|
|
def write_h5ad_to_h5mu_with_compression(
|
|
output_file: Union[str, Path],
|
|
h5mu: Union[str, Path],
|
|
modality_name: str,
|
|
modality_data: AnnData,
|
|
output_compression=None,
|
|
):
|
|
output_file = Path(output_file)
|
|
h5mu = Path(h5mu)
|
|
output_file_uncompressed = (
|
|
output_file.with_name(output_file.stem + "_uncompressed.h5mu")
|
|
if output_compression
|
|
else output_file
|
|
)
|
|
shutil.copyfile(h5mu, output_file_uncompressed)
|
|
write_h5ad(filename=output_file_uncompressed, mod=modality_name, data=modality_data)
|
|
if output_compression:
|
|
compress_h5mu(
|
|
output_file_uncompressed, output_file, compression=output_compression
|
|
)
|
|
output_file_uncompressed.unlink()
|