# -*- coding: utf-8 -*-
# Copyright 2024 Matthew Fitzpatrick.
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, version 3.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <https://www.gnu.org/licenses/gpl-3.0.html>.
r"""For running STEM simulations.
"""
#####################################
## Load libraries/packages/modules ##
#####################################
# For performing deep copies of objects.
import copy
# For performing operations on file and directory paths.
import pathlib
# For explicit garbage collection.
import gc
# For general array handling.
import numpy as np
# For validating and converting objects.
import czekitout.check
import czekitout.convert
# For defining classes that support enforced validation, updatability,
# pre-serialization, and de-serialization.
import fancytypes
# For loading objects from and saving objects to HDF5 files.
import h5pywrappers
# For conveniently extracting certain properties of probe models.
import embeam
# For postprocessing ``hyperspy`` signals.
import empix
# For creating a :obj:`pyprismatic.Metadata` object that is responsible for
# running the ``prismatic`` simulation.
import pyprismatic
# For validating, pre-serializing, and de-pre-serializing instances of the
# classes :class:`prismatique.worker.Params`,
# :class:`prismatique.stem.system.ModelParams`, and
# :class:`prismatique.stem.output.Params`.
import prismatique.worker
import prismatique.stem.system
import prismatique.stem.output
# For validating instances of the classes
# :class:`prismatique.sample.ModelParams`,
# :class:`prismatique.sample.PotentialSliceSubsetIDs`,
# :class:`prismatique.sample.SMatrixSubsetIDs`, and
# :class:`prismatique.sample.PotentialSliceAndSMatrixSubsetIDs`; for calculating
# quantities related to the modelling of the sample; for validating certain
# filenames; and for importing various other helper functions.
import prismatique.sample
# For generating probe positions.
import prismatique.scan
# For generating and postprocessing CBED patterns.
import prismatique._signal
# For recycling helper functions and/or constants.
import prismatique.hrtem.sim
##################################
## Define classes and functions ##
##################################
# List of public objects in objects.
__all__ = ["Params",
"run"]
def _check_and_convert_stem_system_model_params(params):
module_alias = prismatique.stem.system
func_alias = module_alias._check_and_convert_stem_system_model_params
stem_system_model_params = func_alias(params)
return stem_system_model_params
def _pre_serialize_stem_system_model_params(stem_system_model_params):
obj_to_pre_serialize = stem_system_model_params
module_alias = prismatique.stem.system
func_alias = module_alias._pre_serialize_stem_system_model_params
serializable_rep = func_alias(obj_to_pre_serialize)
return serializable_rep
def _de_pre_serialize_stem_system_model_params(serializable_rep):
module_alias = prismatique.stem.system
func_alias = module_alias._de_pre_serialize_stem_system_model_params
stem_system_model_params = func_alias(serializable_rep)
return stem_system_model_params
def _check_and_convert_output_params(params):
module_alias = prismatique.stem.output
func_alias = module_alias._check_and_convert_output_params
output_params = func_alias(params)
return output_params
def _pre_serialize_output_params(output_params):
obj_to_pre_serialize = output_params
module_alias = prismatique.stem.output
func_alias = module_alias._pre_serialize_output_params
serializable_rep = func_alias(obj_to_pre_serialize)
return serializable_rep
def _de_pre_serialize_output_params(serializable_rep):
module_alias = prismatique.stem.output
func_alias = module_alias._de_pre_serialize_output_params
output_params = func_alias(serializable_rep)
return output_params
def _check_and_convert_worker_params(params):
module_alias = prismatique.worker
func_alias = module_alias._check_and_convert_worker_params
worker_params = func_alias(params)
return worker_params
def _pre_serialize_worker_params(worker_params):
obj_to_pre_serialize = worker_params
module_alias = prismatique.worker
func_alias = module_alias._pre_serialize_worker_params
serializable_rep = func_alias(obj_to_pre_serialize)
return serializable_rep
def _de_pre_serialize_worker_params(serializable_rep):
module_alias = prismatique.worker
func_alias = module_alias._de_pre_serialize_worker_params
worker_params = func_alias(serializable_rep)
return worker_params
_module_alias = \
prismatique.worker
_default_output_params = \
None
_default_worker_params = \
_module_alias._default_worker_params
_default_skip_validation_and_conversion = \
_module_alias._default_skip_validation_and_conversion
[docs]class Params(fancytypes.PreSerializableAndUpdatable):
r"""The STEM simulation parameters.
Parameters
----------
stem_system_model_params : :class:`prismatique.stem.system.ModelParams`
The simulation parameters related to the modelling of the STEM
system. See the documentation for the class
:class:`prismatique.stem.system.ModelParams` for a discussion on said
parameters.
output_params : :class:`prismatique.stem.output.Params` | `None`, optional
The output parameters for the STEM simulation. See the documentation for
the class :class:`prismatique.stem.output.Params` for a discussion on
said parameters. If ``output_params`` is set to `None` [i.e. the default
value], then the aforementioned simulation parameters are set to default
values.
worker_params : :class:`prismatique.worker.Params` | `None`, optional
The simulation parameters related to GPU and CPU workers. See the
documentation for the class :class:`prismatique.worker.Params` for a
discussion on said parameters. If ``worker_params`` is set to `None`
[i.e. the default value], then the aforementioned simulation parameters
are set to default values.
skip_validation_and_conversion : `bool`, optional
Let ``validation_and_conversion_funcs`` and ``core_attrs`` denote the
attributes :attr:`~fancytypes.Checkable.validation_and_conversion_funcs`
and :attr:`~fancytypes.Checkable.core_attrs` respectively, both of which
being `dict` objects.
Let ``params_to_be_mapped_to_core_attrs`` denote the `dict`
representation of the constructor parameters excluding the parameter
``skip_validation_and_conversion``, where each `dict` key ``key`` is a
different constructor parameter name, excluding the name
``"skip_validation_and_conversion"``, and
``params_to_be_mapped_to_core_attrs[key]`` would yield the value of the
constructor parameter with the name given by ``key``.
If ``skip_validation_and_conversion`` is set to ``False``, then for each
key ``key`` in ``params_to_be_mapped_to_core_attrs``,
``core_attrs[key]`` is set to ``validation_and_conversion_funcs[key]
(params_to_be_mapped_to_core_attrs)``.
Otherwise, if ``skip_validation_and_conversion`` is set to ``True``,
then ``core_attrs`` is set to
``params_to_be_mapped_to_core_attrs.copy()``. This option is desired
primarily when the user wants to avoid potentially expensive deep copies
and/or conversions of the `dict` values of
``params_to_be_mapped_to_core_attrs``, as it is guaranteed that no
copies or conversions are made in this case.
"""
ctor_param_names = ("stem_system_model_params",
"output_params",
"worker_params")
kwargs = {"namespace_as_dict": globals(),
"ctor_param_names": ctor_param_names}
_validation_and_conversion_funcs_ = \
fancytypes.return_validation_and_conversion_funcs(**kwargs)
_pre_serialization_funcs_ = \
fancytypes.return_pre_serialization_funcs(**kwargs)
_de_pre_serialization_funcs_ = \
fancytypes.return_de_pre_serialization_funcs(**kwargs)
del ctor_param_names, kwargs
def __init__(self,
stem_system_model_params,
output_params=\
_default_output_params,
worker_params=\
_default_worker_params,
skip_validation_and_conversion=\
_default_skip_validation_and_conversion):
ctor_params = {key: val
for key, val in locals().items()
if (key not in ("self", "__class__"))}
kwargs = ctor_params
kwargs["skip_cls_tests"] = True
fancytypes.PreSerializableAndUpdatable.__init__(self, **kwargs)
return None
[docs] @classmethod
def get_validation_and_conversion_funcs(cls):
validation_and_conversion_funcs = \
cls._validation_and_conversion_funcs_.copy()
return validation_and_conversion_funcs
[docs] @classmethod
def get_pre_serialization_funcs(cls):
pre_serialization_funcs = \
cls._pre_serialization_funcs_.copy()
return pre_serialization_funcs
[docs] @classmethod
def get_de_pre_serialization_funcs(cls):
de_pre_serialization_funcs = \
cls._de_pre_serialization_funcs_.copy()
return de_pre_serialization_funcs
def _check_and_convert_sim_params(params):
obj_name = "sim_params"
obj = params[obj_name]
accepted_types = (Params,)
kwargs = {"obj": obj,
"obj_name": obj_name,
"accepted_types": accepted_types}
czekitout.check.if_instance_of_any_accepted_types(**kwargs)
kwargs = obj.get_core_attrs(deep_copy=False)
sim_params = accepted_types[0](**kwargs)
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
stem_system_model_params = \
sim_params_core_attrs["stem_system_model_params"]
kwargs = {"params": dict()}
kwargs["params"]["stem_system_model_params"] = stem_system_model_params
_check_and_convert_stem_system_model_params(**kwargs)
_pre_save(sim_params)
_check_data_size(sim_params)
return sim_params
def _pre_save(sim_params):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
stem_system_model_params = \
sim_params_core_attrs["stem_system_model_params"]
output_params = \
sim_params_core_attrs["output_params"]
stem_system_model_params_core_attrs = \
stem_system_model_params.get_core_attrs(deep_copy=False)
sample_specification = \
stem_system_model_params_core_attrs["sample_specification"]
output_params_core_attrs = output_params.get_core_attrs(deep_copy=False)
base_output_params = output_params_core_attrs["base_params"]
alg_specific_output_params = output_params_core_attrs["alg_specific_params"]
base_output_params_core_attrs = \
base_output_params.get_core_attrs(deep_copy=False)
alg_specific_output_params_core_attrs = \
alg_specific_output_params.get_core_attrs(deep_copy=False)
kwargs = {"sample_specification": sample_specification,
"output_dirname": base_output_params_core_attrs["output_dirname"]}
if base_output_params_core_attrs["save_potential_slices"]:
kwargs["unformatted_basename"] = "potential_slices_of_subset_{}.h5"
prismatique.sample._pre_save(**kwargs)
if "save_S_matrices" in alg_specific_output_params_core_attrs:
if alg_specific_output_params_core_attrs["save_S_matrices"]:
kwargs["unformatted_basename"] = "S_matrices_of_subset_{}.h5"
prismatique.sample._pre_save(**kwargs)
filenames = tuple()
if _intensity_output_is_to_be_saved(sim_params):
filenames += (_intensity_output_filename(sim_params),)
if _wavefunction_output_is_to_be_saved(sim_params):
filenames += _wavefunction_output_filenames(sim_params)
for filename in filenames:
if pathlib.Path(filename).is_file():
pathlib.Path(filename).unlink(missing_ok=True)
prismatique.sample._check_hdf5_filenames(filenames)
return None
def _intensity_output_is_to_be_saved(sim_params):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
output_params = \
sim_params_core_attrs["output_params"]
output_params_core_attrs = \
output_params.get_core_attrs(deep_copy=False)
base_output_params = \
output_params_core_attrs["base_params"]
kwargs = \
{"base_output_params": base_output_params}
intensity_output_is_to_be_saved = \
prismatique.stem.output._intensity_output_is_to_be_saved(**kwargs)
return intensity_output_is_to_be_saved
def _intensity_output_filename(sim_params):
output_dirname = _output_param_subset(sim_params)["output_dirname"]
filename = output_dirname + "/stem_sim_intensity_output.h5"
return filename
def _output_param_subset(sim_params):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
output_params = \
sim_params_core_attrs["output_params"]
output_params_core_attrs = \
output_params.get_core_attrs(deep_copy=False)
base_output_params = \
output_params_core_attrs["base_params"]
alg_specific_output_params = \
output_params_core_attrs["alg_specific_params"]
base_output_params_core_attrs = \
base_output_params.get_core_attrs(deep_copy=False)
alg_specific_output_params_core_attrs = \
alg_specific_output_params.get_core_attrs(deep_copy=False)
output_dirname = \
base_output_params_core_attrs["output_dirname"]
save_potential_slices = \
base_output_params_core_attrs["save_potential_slices"]
save_S_matrices = \
(alg_specific_output_params_core_attrs["save_S_matrices"]
if ("save_S_matrices" in alg_specific_output_params_core_attrs)
else False)
output_param_subset = {"output_dirname": output_dirname,
"save_potential_slices": save_potential_slices,
"save_S_matrices": save_S_matrices}
return output_param_subset
def _wavefunction_output_is_to_be_saved(sim_params):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
output_params = \
sim_params_core_attrs["output_params"]
output_params_core_attrs = \
output_params.get_core_attrs(deep_copy=False)
base_output_params = \
output_params_core_attrs["base_params"]
kwargs = \
{"base_output_params": base_output_params}
wavefunction_output_is_to_be_saved = \
prismatique.stem.output._wavefunction_output_is_to_be_saved(**kwargs)
return wavefunction_output_is_to_be_saved
def _wavefunction_output_filenames(sim_params):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
output_params = \
sim_params_core_attrs["output_params"]
output_params_core_attrs = \
output_params.get_core_attrs(deep_copy=False)
base_output_params = \
output_params_core_attrs["base_params"]
base_output_params_core_attrs = \
base_output_params.get_core_attrs(deep_copy=False)
output_dirname = \
base_output_params_core_attrs["output_dirname"]
num_atomic_config_subsets = _num_atomic_config_subsets(sim_params)
filenames = tuple()
for atomic_config_subset_idx in range(num_atomic_config_subsets):
unformatted_basename = "stem_sim_wavefunction_output_of_subset_{}.h5"
basename = unformatted_basename.format(atomic_config_subset_idx)
filename = output_dirname + "/" + basename
filenames += (filename,)
return filenames
def _num_atomic_config_subsets(sim_params):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
stem_system_model_params = \
sim_params_core_attrs["stem_system_model_params"]
stem_system_model_params_core_attrs = \
stem_system_model_params.get_core_attrs(deep_copy=False)
sample_specification = \
stem_system_model_params_core_attrs["sample_specification"]
kwargs = \
{"sample_specification": sample_specification}
num_atomic_config_subsets = \
prismatique.sample._num_frozen_phonon_config_subsets(**kwargs)
return num_atomic_config_subsets
def _check_data_size(sim_params):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
stem_system_model_params = \
sim_params_core_attrs["stem_system_model_params"]
output_params = \
sim_params_core_attrs["output_params"]
output_params_core_attrs = \
output_params.get_core_attrs(deep_copy=False)
base_output_params = \
output_params_core_attrs["base_params"]
base_output_params_core_attrs = \
base_output_params.get_core_attrs(deep_copy=False)
max_data_size = \
base_output_params_core_attrs["max_data_size"]
kwargs = {"stem_system_model_params": stem_system_model_params,
"output_params": output_params}
output_data_size = prismatique.stem.output._data_size(**kwargs)
current_func_name = "_check_data_size"
if max_data_size < output_data_size:
unformatted_err_msg = globals()[current_func_name+"_err_msg_1"]
err_msg = unformatted_err_msg.format(output_data_size, max_data_size)
raise MemoryError(err_msg)
return None
[docs]def run(sim_params):
r"""Run STEM simulation.
Parameters
----------
sim_params : :class:`prismatique.stem.sim.Params`
The STEM simulation parameters. See the documentation for the class
:class:`prismatique.stem.sim.Params` for a discussion on said
parameters.
Returns
-------
"""
params = locals()
global_symbol_table = globals()
for param_name in params:
func_name = "_check_and_convert_" + param_name
func_alias = global_symbol_table[func_name]
params[param_name] = func_alias(params)
kwargs = params
_run(**kwargs)
return None
def _run(sim_params):
rng_seeds = _generate_thermal_rng_seeds_from_sim_params(sim_params)
num_atomic_config_subsets = len(rng_seeds)
_remove_temp_files(sim_params, subset_idx=0, first_or_last_call=True)
_initialize_output_files(sim_params)
for atomic_config_subset_idx in range(num_atomic_config_subsets):
kwargs = {"sim_params": sim_params,
"atomic_config_subset_idx": atomic_config_subset_idx,
"rng_seeds": rng_seeds}
_run_prismatic_sims_and_postprocess_output_for_subset(**kwargs)
_remove_temp_files(sim_params, subset_idx=0, first_or_last_call=True)
_serialize_sim_params(sim_params)
print("\n\n\n")
return None
def _generate_thermal_rng_seeds_from_sim_params(sim_params):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
stem_system_model_params = \
sim_params_core_attrs["stem_system_model_params"]
stem_system_model_params_core_attrs = \
stem_system_model_params.get_core_attrs(deep_copy=False)
sample_specification = \
stem_system_model_params_core_attrs["sample_specification"]
module_alias = \
prismatique.sample
kwargs = \
{"sample_specification": sample_specification}
rng_seeds = \
module_alias._generate_rng_seeds_from_sample_specification(**kwargs)
return rng_seeds
def _remove_temp_files(sim_params, subset_idx, first_or_last_call):
output_param_subset = _output_param_subset(sim_params)
prismatique.sample._remove_temp_files(output_param_subset,
subset_idx,
first_or_last_call)
intensity_output_is_not_to_be_saved = \
not _intensity_output_is_to_be_saved(sim_params)
if first_or_last_call:
if intensity_output_is_not_to_be_saved:
filename = _intensity_output_filename(sim_params)
if pathlib.Path(filename).is_file():
pathlib.Path(filename).unlink(missing_ok=True)
return None
def _initialize_output_files(sim_params):
_write_metadata_to_output_files(sim_params)
_initialize_data_in_output_files(sim_params)
return None
def _write_metadata_to_output_files(sim_params):
filenames = tuple()
filenames += (_intensity_output_filename(sim_params),)
if _wavefunction_output_is_to_be_saved(sim_params):
filenames += _wavefunction_output_filenames(sim_params)
for filename_idx, filename in enumerate(filenames):
kwargs = {"sim_params": sim_params, "filename": filename}
_write_probe_position_metadata_to_output_file(**kwargs)
_write_output_layer_depth_metadata_to_output_file(**kwargs)
_write_k_x_and_k_y_metadata_to_output_file(**kwargs)
if filename_idx == 0:
if _intensity_3d_stem_output_is_to_be_saved(sim_params):
_write_k_xy_metadata_to_output_file(**kwargs)
else:
_write_defocus_metadata_to_output_file(**kwargs)
return None
def _write_probe_position_metadata_to_output_file(sim_params, filename):
pattern_type = _scan_pattern_type(sim_params)
grid_dims = _grid_dims_in_units_of_probe_shifts(sim_params)
path_in_file = "/metadata"
group_id = h5pywrappers.obj.ID(filename, path_in_file)
h5pywrappers.group.save(None, group_id, write_mode="w")
group = h5pywrappers.group.load(group_id, read_only=False)
probe_positions = _generate_probe_positions(sim_params, save=False)
dataset = group.create_dataset(name="probe_positions",
data=probe_positions,
dtype="float32")
dataset.attrs["dim 1"] = "probe idx"
dataset.attrs["dim 2"] = "vector component idx [0->x, 1->y]"
dataset.attrs["units"] = "Ã…"
dataset.attrs["pattern type"] = pattern_type
dataset.attrs["grid dims in units of probe shifts"] = grid_dims
group.file.close()
return None
def _generate_probe_positions(sim_params, save):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
stem_system_model_params = \
sim_params_core_attrs["stem_system_model_params"]
output_params = \
sim_params_core_attrs["output_params"]
stem_system_model_params_core_attrs = \
stem_system_model_params.get_core_attrs(deep_copy=False)
sample_specification = \
stem_system_model_params_core_attrs["sample_specification"]
scan_config = \
stem_system_model_params_core_attrs["scan_config"]
output_params_core_attrs = \
output_params.get_core_attrs(deep_copy=False)
base_output_params = \
output_params_core_attrs["base_params"]
base_output_params_core_attrs = \
base_output_params.get_core_attrs(deep_copy=False)
output_dirname = \
base_output_params_core_attrs["output_dirname"]
temp_scan_config_filename = \
prismatique.sample._generate_temp_scan_config_filename(output_dirname)
filename = \
temp_scan_config_filename if save else None
kwargs = {"sample_specification": sample_specification,
"scan_config": scan_config,
"filename": filename}
probe_positions = prismatique.scan._generate_probe_positions(**kwargs)
return probe_positions
def _scan_pattern_type(sim_params):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
stem_system_model_params = \
sim_params_core_attrs["stem_system_model_params"]
stem_system_model_params_core_attrs = \
stem_system_model_params.get_core_attrs(deep_copy=False)
scan_config = \
stem_system_model_params_core_attrs["scan_config"]
scan_pattern_type = \
prismatique.scan._pattern_type(scan_config)
return scan_pattern_type
def _grid_dims_in_units_of_probe_shifts(sim_params):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
stem_system_model_params = \
sim_params_core_attrs["stem_system_model_params"]
stem_system_model_params_core_attrs = \
stem_system_model_params.get_core_attrs(deep_copy=False)
sample_specification = \
stem_system_model_params_core_attrs["sample_specification"]
scan_config = \
stem_system_model_params_core_attrs["scan_config"]
kwargs = {"sample_specification": sample_specification,
"scan_config": scan_config}
grid_dims = prismatique.scan._grid_dims_in_units_of_probe_shifts(**kwargs)
return grid_dims
def _write_output_layer_depth_metadata_to_output_file(sim_params, filename):
path_in_file = "/metadata"
group_id = h5pywrappers.obj.ID(filename, path_in_file)
group = h5pywrappers.group.load(group_id, read_only=False)
output_layer_depths = _output_layer_depths(sim_params)
dataset = group.create_dataset(name="output_layer_depths",
data=output_layer_depths,
dtype="float32")
dataset.attrs["dim 1"] = "output layer idx"
dataset.attrs["units"] = "Ã…"
group.file.close()
return None
def _output_layer_depths(sim_params):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
stem_system_model_params = \
sim_params_core_attrs["stem_system_model_params"]
output_params = \
sim_params_core_attrs["output_params"]
stem_system_model_params_core_attrs = \
stem_system_model_params.get_core_attrs(deep_copy=False)
sample_specification = \
stem_system_model_params_core_attrs["sample_specification"]
output_params_core_attrs = \
output_params.get_core_attrs(deep_copy=False)
alg_specific_output_params = \
output_params_core_attrs["alg_specific_params"]
kwargs = {"sample_specification": sample_specification,
"alg_specific_params": alg_specific_output_params}
output_layer_depths = prismatique.stem.output._layer_depths(**kwargs)
return output_layer_depths
def _write_k_x_and_k_y_metadata_to_output_file(sim_params, filename):
path_in_file = "/metadata"
group_id = h5pywrappers.obj.ID(filename, path_in_file)
group = h5pywrappers.group.load(group_id, read_only=False)
for_postprocessed_dp = \
(True
if (filename == _intensity_output_filename(sim_params))
else False)
k_x = _k_x(sim_params, for_postprocessed_dp)
dataset = group.create_dataset(name="k_x", data=k_x, dtype="float32")
dataset.attrs["dim 1"] = "k_x idx"
dataset.attrs["units"] = "1/Ã…"
k_y = _k_y(sim_params, for_postprocessed_dp)
dataset = group.create_dataset(name="k_y", data=k_y, dtype="float32")
dataset.attrs["dim 1"] = "k_y idx"
dataset.attrs["units"] = "1/Ã…"
group.file.close()
return None
def _k_x(sim_params, for_postprocessed_dp):
kwargs = {"sim_params": sim_params,
"navigation_dims": tuple(),
"signal_dtype": "float"}
dp_set_signal = (_blank_postprocessed_dp_set_signal(**kwargs)
if for_postprocessed_dp
else _blank_unprocessed_dp_set_signal(**kwargs))
offset = dp_set_signal.axes_manager[0].offset
size = dp_set_signal.axes_manager[0].size
scale = dp_set_signal.axes_manager[0].scale
k_x = offset + scale*np.arange(size)
return k_x
def _k_y(sim_params, for_postprocessed_dp):
kwargs = {"sim_params": sim_params,
"navigation_dims": tuple(),
"signal_dtype": "float"}
dp_set_signal = (_blank_postprocessed_dp_set_signal(**kwargs)
if for_postprocessed_dp
else _blank_unprocessed_dp_set_signal(**kwargs))
offset = dp_set_signal.axes_manager[1].offset
size = dp_set_signal.axes_manager[1].size
scale = dp_set_signal.axes_manager[1].scale
k_y = offset + scale*np.arange(size)
return k_y
def _blank_postprocessed_dp_set_signal(sim_params,
navigation_dims,
signal_dtype):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
stem_system_model_params = \
sim_params_core_attrs["stem_system_model_params"]
output_params = \
sim_params_core_attrs["output_params"]
stem_system_model_params_core_attrs = \
stem_system_model_params.get_core_attrs(deep_copy=False)
sample_specification = \
stem_system_model_params_core_attrs["sample_specification"]
output_params_core_attrs = \
output_params.get_core_attrs(deep_copy=False)
base_output_params = \
output_params_core_attrs["base_params"]
base_output_params_core_attrs = \
base_output_params.get_core_attrs(deep_copy=False)
cbed_params = \
base_output_params_core_attrs["cbed_params"]
cbed_params_core_attrs = \
cbed_params.get_core_attrs(deep_copy=False)
postprocessing_seq = \
cbed_params_core_attrs["postprocessing_seq"]
kwargs = \
{"sample_specification": sample_specification,
"postprocessing_seq": postprocessing_seq,
"navigation_dims": navigation_dims,
"signal_is_cbed_pattern_set": True,
"signal_dtype": signal_dtype}
blank_postprocessed_dp_set_signal = \
prismatique._signal._blank_postprocessed_2d_signal(**kwargs)
return blank_postprocessed_dp_set_signal
def _blank_unprocessed_dp_set_signal(sim_params, navigation_dims, signal_dtype):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
stem_system_model_params = \
sim_params_core_attrs["stem_system_model_params"]
stem_system_model_params_core_attrs = \
stem_system_model_params.get_core_attrs(deep_copy=False)
sample_specification = \
stem_system_model_params_core_attrs["sample_specification"]
kwargs = \
{"sample_specification": sample_specification,
"navigation_dims": navigation_dims,
"signal_is_cbed_pattern_set": True,
"signal_dtype": signal_dtype}
blank_unprocessed_dp_set_signal = \
prismatique._signal._blank_unprocessed_2d_signal(**kwargs)
return blank_unprocessed_dp_set_signal
def _write_k_xy_metadata_to_output_file(sim_params, filename):
path_in_file = "/metadata"
group_id = h5pywrappers.obj.ID(filename, path_in_file)
group = h5pywrappers.group.load(group_id, read_only=False)
k_xy = _k_xy(sim_params)
dataset = group.create_dataset(name="k_xy", data=k_xy, dtype="float32")
dataset.attrs["dim 1"] = "k_xy idx"
dataset.attrs["units"] = "1/Ã…"
group.file.close()
return None
def _k_xy(sim_params):
kwargs = {"sim_params": sim_params,
"navigation_dims": tuple(),
"signal_dtype": "float"}
blank_postprocessed_dp_signal = _blank_postprocessed_dp_set_signal(**kwargs)
kwargs = {"input_signal": blank_postprocessed_dp_signal,
"sim_params": sim_params}
integrated_signal = _integrate_dp_to_3d_stem_signal(**kwargs)
offset = integrated_signal.axes_manager[0].offset
size = integrated_signal.axes_manager[0].size
scale = integrated_signal.axes_manager[0].scale
k_xy = offset + scale*np.arange(size)
return k_xy
def _integrate_dp_to_3d_stem_signal(input_signal, sim_params):
pixel_area = np.abs(input_signal.axes_manager[-2].scale
* input_signal.axes_manager[-1].scale)
input_signal.data /= pixel_area
num_bins = _num_bins_in_3d_stem_integration(sim_params, input_signal)
radial_k_range = _radial_k_range_in_3d_stem_integration(sim_params,
input_signal)
kwargs = {"center": (0, 0),
"radial_range": radial_k_range,
"num_bins": num_bins}
optional_params = empix.OptionalAzimuthalIntegrationParams(**kwargs)
integrated_dp_signal = empix.azimuthally_integrate(input_signal,
optional_params)
return integrated_dp_signal
def _num_bins_in_3d_stem_integration(sim_params, input_signal):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
stem_system_model_params = \
sim_params_core_attrs["stem_system_model_params"]
output_params = \
sim_params_core_attrs["output_params"]
stem_system_model_params_core_attrs = \
stem_system_model_params.get_core_attrs(deep_copy=False)
probe_model_params = \
stem_system_model_params_core_attrs["probe_model_params"]
output_params_core_attrs = \
output_params.get_core_attrs(deep_copy=False)
base_output_params = \
output_params_core_attrs["base_params"]
kwargs = \
{"probe_model_params": probe_model_params,
"base_output_params": base_output_params,
"input_signal": input_signal}
num_bins = \
prismatique.stem.output._num_bins_in_3d_stem_integration(**kwargs)
return num_bins
def _radial_k_range_in_3d_stem_integration(sim_params, input_signal):
wavelength = _wavelength(sim_params)
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
output_params = \
sim_params_core_attrs["output_params"]
output_params_core_attrs = \
output_params.get_core_attrs(deep_copy=False)
base_output_params = \
output_params_core_attrs["base_params"]
base_output_params_core_attrs = \
base_output_params.get_core_attrs(deep_copy=False)
radial_angular_step_size = \
base_output_params_core_attrs["radial_step_size_for_3d_stem"]
radial_k_step_size = \
(radial_angular_step_size / 1000) / wavelength
num_bins = _num_bins_in_3d_stem_integration(sim_params, input_signal)
radial_k_range = (0, num_bins*radial_k_step_size)
return radial_k_range
def _wavelength(sim_params):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
stem_system_model_params = \
sim_params_core_attrs["stem_system_model_params"]
stem_system_model_params_core_attrs = \
stem_system_model_params.get_core_attrs(deep_copy=False)
probe_model_params = \
stem_system_model_params_core_attrs["probe_model_params"]
probe_model_params_core_attrs = \
probe_model_params.get_core_attrs(deep_copy=False)
gun_model_params = \
probe_model_params_core_attrs["gun_model_params"]
gun_model_params_core_attrs = \
gun_model_params.get_core_attrs(deep_copy=False)
mean_beam_energy = \
gun_model_params_core_attrs["mean_beam_energy"]
wavelength = \
embeam.wavelength(mean_beam_energy)
return wavelength
def _write_defocus_metadata_to_output_file(sim_params, filename):
path_in_file = "/metadata"
group_id = h5pywrappers.obj.ID(filename, path_in_file)
group = h5pywrappers.group.load(group_id, read_only=False)
defocii = _defocii(sim_params)
dataset = group.create_dataset(name="defocii",
data=defocii,
dtype="float32")
dataset.attrs["dim 1"] = "defocus idx"
dataset.attrs["units"] = "Ã…"
group.file.close()
return None
def _defocii(sim_params):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
stem_system_model_params = \
sim_params_core_attrs["stem_system_model_params"]
stem_system_model_params_core_attrs = \
stem_system_model_params.get_core_attrs(deep_copy=False)
probe_model_params = \
stem_system_model_params_core_attrs["probe_model_params"]
wavelength = _wavelength(sim_params)
C_2_0_mag = _get_C_2_0_mag_from_probe_model_params(probe_model_params)
Delta_f = wavelength * C_2_0_mag / np.pi
gauss_hermite_points, _ = \
_gauss_hermite_points_and_weights(probe_model_params)
sigma_f = \
_sigma_f(probe_model_params)
defocal_offsets = np.sqrt(2) * gauss_hermite_points * sigma_f
defocii = Delta_f + defocal_offsets # In Ã….
return defocii
def _get_C_2_0_mag_from_probe_model_params(probe_model_params):
module_alias = prismatique.hrtem.sim
func_alias = module_alias._get_C_2_0_mag_from_probe_model_params
C_2_0_mag = func_alias(probe_model_params)
return C_2_0_mag
def _gauss_hermite_points_and_weights(probe_model_params):
module_alias = prismatique.hrtem.sim
func_alias = module_alias._gauss_hermite_points_and_weights
gauss_hermite_points, gauss_hermite_weights = func_alias(probe_model_params)
return gauss_hermite_points, gauss_hermite_weights
def _sigma_f(probe_model_params):
module_alias = prismatique.hrtem.sim
func_alias = module_alias._sigma_f
sigma_f = func_alias(probe_model_params)
return sigma_f
def _initialize_data_in_output_files(sim_params):
filenames = tuple()
filenames += (_intensity_output_filename(sim_params),)
if _wavefunction_output_is_to_be_saved(sim_params):
filenames += _wavefunction_output_filenames(sim_params)
for filename_idx, filename in enumerate(filenames):
if filename_idx == 0:
_initialize_intensity_data_in_output_file(sim_params, filename)
else:
atomic_config_subset_idx = filename_idx - 1
kwargs = {"sim_params": sim_params,
"filename": filename,
"atomic_config_subset_idx": atomic_config_subset_idx}
_initialize_wavefunction_data_in_output_file(**kwargs)
return None
def _initialize_intensity_data_in_output_file(sim_params, filename):
_initialize_intensity_4d_stem_data_in_output_file(sim_params, filename)
if _com_output_is_to_be_saved(sim_params):
_initialize_com_data_in_output_file(sim_params, filename)
if _intensity_3d_stem_output_is_to_be_saved(sim_params):
_initialize_intensity_3d_stem_data_in_output_file(sim_params, filename)
if _intensity_2d_stem_output_is_to_be_saved(sim_params):
_initialize_intensity_2d_stem_data_in_output_file(sim_params, filename)
return None
def _initialize_intensity_4d_stem_data_in_output_file(sim_params, filename):
output_layer_depths = _output_layer_depths(sim_params)
probe_positions = _generate_probe_positions(sim_params, save=False)
k_x = _k_x(sim_params, for_postprocessed_dp=True)
k_y = _k_y(sim_params, for_postprocessed_dp=True)
dataset_shape = (len(output_layer_depths),
len(probe_positions),
len(k_y),
len(k_x))
path_in_file = "/data/4D_STEM"
group_id = h5pywrappers.obj.ID(filename, path_in_file)
h5pywrappers.group.save(None, group_id, write_mode="a")
group = h5pywrappers.group.load(group_id, read_only=False)
dataset = group.create_dataset(name="intensity_DPs",
shape=dataset_shape,
dtype="float32",
fillvalue=0)
dataset.attrs["dim 1"] = "output layer idx"
dataset.attrs["dim 2"] = "probe idx"
dataset.attrs["dim 3"] = "k_y idx"
dataset.attrs["dim 4"] = "k_x idx"
dataset.attrs["units"] = "dimensionless"
group.file.close()
return None
def _com_output_is_to_be_saved(sim_params):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
output_params = \
sim_params_core_attrs["output_params"]
output_params_core_attrs = \
output_params.get_core_attrs(deep_copy=False)
base_output_params = \
output_params_core_attrs["base_params"]
base_output_params_core_attrs = \
base_output_params.get_core_attrs(deep_copy=False)
com_output_is_to_be_saved = \
base_output_params_core_attrs["save_com"]
return com_output_is_to_be_saved
def _initialize_com_data_in_output_file(sim_params, filename):
output_layer_depths = _output_layer_depths(sim_params)
probe_positions = _generate_probe_positions(sim_params, save=False)
dataset_shape = (len(output_layer_depths), 2, len(probe_positions))
path_in_file = "/data"
group_id = h5pywrappers.obj.ID(filename, path_in_file)
group = h5pywrappers.group.load(group_id, read_only=False)
dataset = group.create_dataset(name="center_of_mass_momentum",
shape=dataset_shape,
dtype="float32",
fillvalue=0)
dataset.attrs["dim 1"] = "output layer idx"
dataset.attrs["dim 2"] = "vector component idx [0->x, 1->y]"
dataset.attrs["dim 3"] = "probe idx"
dataset.attrs["units"] = "1/Ã…"
group.file.close()
return None
def _initialize_intensity_3d_stem_data_in_output_file(sim_params, filename):
output_layer_depths = _output_layer_depths(sim_params)
probe_positions = _generate_probe_positions(sim_params, save=False)
k_xy = _k_xy(sim_params)
dataset_shape = (len(output_layer_depths), len(probe_positions), len(k_xy))
path_in_file = "/data/3D_STEM"
group_id = h5pywrappers.obj.ID(filename, path_in_file)
h5pywrappers.group.save(None, group_id, write_mode="a")
group = h5pywrappers.group.load(group_id, read_only=False)
dataset = group.create_dataset(name="integrated_intensities",
shape=dataset_shape,
dtype="float32",
fillvalue=0)
dataset.attrs["dim 1"] = "output layer idx"
dataset.attrs["dim 2"] = "probe idx"
dataset.attrs["dim 3"] = "k_xy idx"
dataset.attrs["units"] = "dimensionless"
group.file.close()
return None
def _intensity_2d_stem_output_is_to_be_saved(sim_params):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
output_params = \
sim_params_core_attrs["output_params"]
output_params_core_attrs = \
output_params.get_core_attrs(deep_copy=False)
base_output_params = \
output_params_core_attrs["base_params"]
base_output_params_core_attrs = \
base_output_params.get_core_attrs(deep_copy=False)
radial_range_for_2d_stem = \
base_output_params_core_attrs["radial_range_for_2d_stem"]
intensity_2d_stem_output_is_to_be_saved = \
(radial_range_for_2d_stem[0] != radial_range_for_2d_stem[1])
return intensity_2d_stem_output_is_to_be_saved
def _initialize_intensity_2d_stem_data_in_output_file(sim_params, filename):
output_layer_depths = _output_layer_depths(sim_params)
probe_positions = _generate_probe_positions(sim_params, save=False)
dataset_shape = (len(output_layer_depths), len(probe_positions))
path_in_file = "/data/2D_STEM"
group_id = h5pywrappers.obj.ID(filename, path_in_file)
h5pywrappers.group.save(None, group_id, write_mode="a")
group = h5pywrappers.group.load(group_id, read_only=False)
dataset = group.create_dataset(name="integrated_intensities",
shape=dataset_shape,
dtype="float32",
fillvalue=0)
dataset.attrs["dim 1"] = "output layer idx"
dataset.attrs["dim 2"] = "probe idx"
dataset.attrs["units"] = "dimensionless"
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
output_params = \
sim_params_core_attrs["output_params"]
output_params_core_attrs = \
output_params.get_core_attrs(deep_copy=False)
base_output_params = \
output_params_core_attrs["base_params"]
base_output_params_core_attrs = \
base_output_params.get_core_attrs(deep_copy=False)
radial_range_for_2d_stem = \
base_output_params_core_attrs["radial_range_for_2d_stem"]
dataset.attrs["lower integration limit in mrads"] = \
radial_range_for_2d_stem[0]
dataset.attrs["upper integration limit in mrads"] = \
radial_range_for_2d_stem[1]
group.file.close()
return None
def _initialize_wavefunction_data_in_output_file(sim_params,
filename,
atomic_config_subset_idx):
output_layer_depths = _output_layer_depths(sim_params)
kwargs = {"sim_params": sim_params,
"atomic_config_subset_idx": atomic_config_subset_idx}
num_atomic_configs_in_subset = _num_atomic_configs_in_subset(**kwargs)
defocii = _defocii(sim_params)
probe_positions = _generate_probe_positions(sim_params, save=False)
k_x = _k_x(sim_params, for_postprocessed_dp=False)
k_y = _k_y(sim_params, for_postprocessed_dp=False)
dataset_shape = (len(output_layer_depths),
num_atomic_configs_in_subset,
len(defocii),
len(probe_positions),
len(k_y),
len(k_x))
path_in_file = "/data/4D_STEM"
group_id = h5pywrappers.obj.ID(filename, path_in_file)
h5pywrappers.group.save(None, group_id, write_mode="a")
group = h5pywrappers.group.load(group_id, read_only=False)
dataset = group.create_dataset(name="complex_valued_DPs",
shape=dataset_shape,
dtype="complex64",
fillvalue=0j)
dataset.attrs["dim 1"] = "output layer idx"
dataset.attrs["dim 2"] = "atomic config idx"
dataset.attrs["dim 3"] = "defocus idx"
dataset.attrs["dim 4"] = "probe idx"
dataset.attrs["dim 5"] = "k_y idx"
dataset.attrs["dim 6"] = "k_x idx"
dataset.attrs["units"] = "dimensionless"
group.file.close()
return None
def _num_atomic_configs_in_subset(sim_params, atomic_config_subset_idx):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
stem_system_model_params = \
sim_params_core_attrs["stem_system_model_params"]
stem_system_model_params_core_attrs = \
stem_system_model_params.get_core_attrs(deep_copy=False)
sample_specification = \
stem_system_model_params_core_attrs["sample_specification"]
kwargs = \
{"sample_specification": sample_specification,
"subset_idx": atomic_config_subset_idx}
num_atomic_configs_in_subset = \
prismatique.sample._num_frozen_phonon_configs_in_subset(**kwargs)
return num_atomic_configs_in_subset
def _initialize_pyprismatic_sim_obj(sim_params):
pyprismatic_sim_obj = pyprismatic.Metadata()
sim_params_core_attrs = sim_params.get_core_attrs(deep_copy=False)
stem_system_model_params = sim_params_core_attrs["stem_system_model_params"]
output_params = sim_params_core_attrs["output_params"]
worker_params = sim_params_core_attrs["worker_params"]
output_params_core_attrs = output_params.get_core_attrs(deep_copy=False)
base_output_params = output_params_core_attrs["base_params"]
base_output_params_core_attrs = \
base_output_params.get_core_attrs(deep_copy=False)
output_dirname = \
base_output_params_core_attrs["output_dirname"]
module_alias = prismatique.sample
func_alias = module_alias._set_pyprismatic_sim_obj_attrs_to_default_values
func_alias(pyprismatic_sim_obj, output_dirname)
_ = _generate_probe_positions(sim_params, save=True)
stem_system_model_params_core_attrs = \
stem_system_model_params.get_core_attrs(deep_copy=False)
sample_specification = \
stem_system_model_params_core_attrs["sample_specification"]
kwargs = {"pyprismatic_sim_obj": pyprismatic_sim_obj,
"stem_system_model_params": stem_system_model_params,
"output_dirname": output_dirname}
_unpack_stem_system_model_params_into_pyprismatic_sim_obj(**kwargs)
unpack_output_params_into_pyprismatic_sim_obj = \
prismatique.sample._unpack_output_params_into_pyprismatic_sim_obj
unpack_worker_params_into_pyprismatic_sim_obj = \
prismatique.sample._unpack_worker_params_into_pyprismatic_sim_obj
kwargs = {"pyprismatic_sim_obj": pyprismatic_sim_obj,
"output_params": output_params,
"sample_specification": sample_specification}
unpack_output_params_into_pyprismatic_sim_obj(**kwargs)
kwargs = {"pyprismatic_sim_obj": pyprismatic_sim_obj,
"worker_params": worker_params}
unpack_worker_params_into_pyprismatic_sim_obj(**kwargs)
return pyprismatic_sim_obj
def _unpack_stem_system_model_params_into_pyprismatic_sim_obj(
pyprismatic_sim_obj, stem_system_model_params, output_dirname):
stem_system_model_params_core_attrs = \
stem_system_model_params.get_core_attrs(deep_copy=False)
sample_specification = \
stem_system_model_params_core_attrs["sample_specification"]
probe_model_params = \
stem_system_model_params_core_attrs["probe_model_params"]
specimen_tilt = \
stem_system_model_params_core_attrs["specimen_tilt"]
unpack_sample_specification_into_pyprismatic_sim_obj = \
prismatique.sample._unpack_sample_specification_into_pyprismatic_sim_obj
unpack_probe_model_params_into_pyprismatic_sim_obj = \
prismatique.sample._unpack_probe_model_params_into_pyprismatic_sim_obj
unpack_sample_specification_into_pyprismatic_sim_obj(pyprismatic_sim_obj,
output_dirname,
sample_specification)
unpack_probe_model_params_into_pyprismatic_sim_obj(pyprismatic_sim_obj,
probe_model_params,
output_dirname)
pyprismatic_sim_obj.probeXtilt = specimen_tilt[0]
pyprismatic_sim_obj.probeYtilt = specimen_tilt[1]
pyprismatic_sim_obj.probes_file = \
prismatique.sample._generate_temp_scan_config_filename(output_dirname)
return None
def _run_prismatic_sims_and_postprocess_output_for_subset(
sim_params,
atomic_config_subset_idx,
rng_seeds):
defocii = _defocii(sim_params) # In Ã….
num_defocii = len(defocii)
for defocus_idx in range(num_defocii):
func_alias = \
_run_prismatic_sims_and_postprocess_output_for_subset_and_defocus
kwargs = \
{"sim_params": sim_params,
"atomic_config_subset_idx": atomic_config_subset_idx,
"defocus_idx": defocus_idx,
"rng_seeds": rng_seeds}
_ = \
func_alias(**kwargs)
_remove_temp_files(sim_params,
subset_idx=atomic_config_subset_idx,
first_or_last_call=False)
num_atomic_config_subsets = len(rng_seeds)
if atomic_config_subset_idx == num_atomic_config_subsets-1:
kwargs = {"sim_params": sim_params,
"filename": _intensity_output_filename(sim_params)}
_calc_and_write_remaining_intensity_data_to_output_file(**kwargs)
_remove_any_temp_intensity_4d_stem_data_in_output_file(**kwargs)
return None
def _run_prismatic_sims_and_postprocess_output_for_subset_and_defocus(
sim_params,
atomic_config_subset_idx,
defocus_idx,
rng_seeds):
pyprismatic_sim_obj = _initialize_pyprismatic_sim_obj(sim_params)
_update_pyprismatic_sim_obj_for_next_prismatic_sim(pyprismatic_sim_obj,
sim_params,
atomic_config_subset_idx,
defocus_idx,
rng_seeds)
# Run prismatic simulation.
_call_pyprismatic_sim_obj_go(pyprismatic_sim_obj)
gc.collect()
kwargs = {"pyprismatic_sim_obj": pyprismatic_sim_obj,
"sim_params": sim_params,
"atomic_config_subset_idx": atomic_config_subset_idx,
"defocus_idx": defocus_idx}
_postprocess_and_reorganize_current_prismatic_sim_output(**kwargs)
return None
def _update_pyprismatic_sim_obj_for_next_prismatic_sim(pyprismatic_sim_obj,
sim_params,
atomic_config_subset_idx,
defocus_idx,
rng_seeds):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
stem_system_model_params = \
sim_params_core_attrs["stem_system_model_params"]
stem_system_model_params_core_attrs = \
stem_system_model_params.get_core_attrs(deep_copy=False)
sample_specification = \
stem_system_model_params_core_attrs["sample_specification"]
output_param_subset = _output_param_subset(sim_params)
defocii = _defocii(sim_params) # In Ã….
kwargs = {"pyprismatic_sim_obj": pyprismatic_sim_obj,
"sample_specification": sample_specification,
"output_param_subset": output_param_subset,
"subset_idx": atomic_config_subset_idx,
"defocus_idx": defocus_idx,
"defocii": defocii,
"rng_seeds": rng_seeds}
module_alias = prismatique.sample
module_alias._update_pyprismatic_sim_obj_for_next_prismatic_sim(**kwargs)
return None
def _call_pyprismatic_sim_obj_go(pyprismatic_sim_obj):
pyprismatic_sim_obj.go()
return None
def _postprocess_and_reorganize_current_prismatic_sim_output(
pyprismatic_sim_obj, sim_params, atomic_config_subset_idx, defocus_idx):
if defocus_idx == 0:
output_param_subset = _output_param_subset(sim_params)
module_alias = \
prismatique.sample
func_alias = \
module_alias._move_sample_specification_output_to_separate_file
kwargs = {"output_dirname": output_param_subset["output_dirname"],
"new_output_filename": ""}
if pyprismatic_sim_obj.savePotentialSlices:
kwargs["sample_specification_type"] = "potential_slice_subset"
func_alias(**kwargs)
gc.collect()
if pyprismatic_sim_obj.saveSMatrix:
kwargs["sample_specification_type"] = "S_matrix_subset"
func_alias(**kwargs)
gc.collect()
kwargs = {"sim_params": sim_params,
"atomic_config_subset_idx": atomic_config_subset_idx}
num_atomic_configs_in_subset = _num_atomic_configs_in_subset(**kwargs)
output_layer_depths = _output_layer_depths(sim_params)
for output_layer_idx, _ in enumerate(output_layer_depths):
for atomic_config_idx in range(num_atomic_configs_in_subset):
if pyprismatic_sim_obj.save4DOutput:
quartet_of_indices = \
{"atomic_config_subset_idx": atomic_config_subset_idx,
"output_layer_idx": output_layer_idx,
"atomic_config_idx": atomic_config_idx,
"defocus_idx": defocus_idx}
kwargs = {"sim_params": sim_params,
"quartet_of_indices": quartet_of_indices}
func_alias = _postprocess_and_reorganize_dp_subset
func_alias(**kwargs)
gc.collect()
return None
def _postprocess_and_reorganize_dp_subset(sim_params, quartet_of_indices):
kwargs = \
{"sim_params": sim_params,
"quartet_of_indices": quartet_of_indices}
unprocessed_complex_dp_subset_signal = \
_load_unprocessed_complex_dp_subset_signal(**kwargs)
if _wavefunction_output_is_to_be_saved(sim_params):
output_data = unprocessed_complex_dp_subset_signal.data
filenames = _wavefunction_output_filenames(sim_params)
filename = filenames[quartet_of_indices["atomic_config_subset_idx"]]
kwargs = {"sim_params": sim_params,
"unprocessed_complex_dp_subset": output_data,
"filename": filename,
"quartet_of_indices": quartet_of_indices}
_write_unprocessed_complex_dp_subset_to_output_file(**kwargs)
kwargs = \
{"sim_params": \
sim_params,
"unprocessed_intensity_dp_subset_signal": \
empix.abs_sq(unprocessed_complex_dp_subset_signal)}
postprocessed_intensity_dp_subset_signal = \
_postprocess_intensity_dp_subset_signal(**kwargs)
filename = _intensity_output_filename(sim_params)
path_in_file = "/data/4D_STEM/intensity_DPs"
dataset_id = h5pywrappers.obj.ID(filename, path_in_file)
dataset = h5pywrappers.dataset.load(dataset_id, read_only=False)
kwargs = {"quartet_of_indices": \
quartet_of_indices,
"dataset": \
dataset,
"postprocessed_intensity_dp_subset_signal": \
postprocessed_intensity_dp_subset_signal,
"sim_params": \
sim_params}
_update_intensity_4d_stem_data_in_output_dataset(**kwargs)
dataset.file.close()
return None
def _load_unprocessed_complex_dp_subset_signal(sim_params, quartet_of_indices):
output_dirname = _output_param_subset(sim_params)["output_dirname"]
filename = output_dirname + "/prismatic_output.h5"
output_layer_idx = quartet_of_indices["output_layer_idx"]
atomic_config_idx = quartet_of_indices["atomic_config_idx"]
atomic_config_subset_idx = quartet_of_indices["atomic_config_subset_idx"]
output_layer_idx_str = str(output_layer_idx).rjust(4, "0")
atomic_config_idx_str = str(atomic_config_idx).rjust(4, "0")
unformatted_path_in_file = ("4DSTEM_simulation/data/datacubes"
"/CBED_array_depth{}_fp{}/data")
path_in_file = unformatted_path_in_file.format(output_layer_idx_str,
atomic_config_idx_str)
dataset_id = h5pywrappers.obj.ID(filename, path_in_file)
dataset = h5pywrappers.dataset.load(dataset_id, read_only=True)
kwargs = {"sim_params": sim_params,
"atomic_config_subset_idx": atomic_config_subset_idx}
num_atomic_configs_in_subset = _num_atomic_configs_in_subset(**kwargs)
unprocessed_complex_dp_subset_data = \
np.transpose(dataset[:, 0, :, :], axes=(0, 2, 1))[:, ::-1, :]
unprocessed_complex_dp_subset_data *= \
num_atomic_configs_in_subset # Fixes prismatic bug.
dataset.file.close()
kwargs = \
{"sim_params": sim_params,
"navigation_dims": (unprocessed_complex_dp_subset_data.shape[0],),
"signal_dtype": "complex"}
unprocessed_complex_dp_subset_signal = \
_blank_unprocessed_dp_set_signal(**kwargs)
unprocessed_complex_dp_subset_signal.data = \
unprocessed_complex_dp_subset_data
return unprocessed_complex_dp_subset_signal
def _write_unprocessed_complex_dp_subset_to_output_file(
sim_params,
unprocessed_complex_dp_subset,
filename,
quartet_of_indices):
path_in_file = "/data/4D_STEM/complex_valued_DPs"
dataset_id = h5pywrappers.obj.ID(filename, path_in_file)
multi_dim_slice = (quartet_of_indices["output_layer_idx"],
quartet_of_indices["atomic_config_idx"],
quartet_of_indices["defocus_idx"],
slice(None),
slice(None),
slice(None))
datasubset_id = h5pywrappers.datasubset.ID(dataset_id, multi_dim_slice)
datasubset = unprocessed_complex_dp_subset
h5pywrappers.datasubset.save(datasubset, datasubset_id)
return None
def _postprocess_intensity_dp_subset_signal(
sim_params, unprocessed_intensity_dp_subset_signal):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
output_params = \
sim_params_core_attrs["output_params"]
output_params_core_attrs = \
output_params.get_core_attrs(deep_copy=False)
base_output_params = \
output_params_core_attrs["base_params"]
base_output_params_core_attrs = \
base_output_params.get_core_attrs(deep_copy=False)
cbed_params = \
base_output_params_core_attrs["cbed_params"]
cbed_params_core_attrs = \
cbed_params.get_core_attrs(deep_copy=False)
postprocessing_seq = \
cbed_params_core_attrs["postprocessing_seq"]
kwargs = \
{"input_signal": unprocessed_intensity_dp_subset_signal,
"postprocessing_seq": postprocessing_seq}
postprocessed_intensity_dp_subset_signal = \
prismatique._signal._postprocess_2d_signal(**kwargs)
return postprocessed_intensity_dp_subset_signal
def _update_intensity_4d_stem_data_in_output_dataset(
quartet_of_indices,
dataset,
postprocessed_intensity_dp_subset_signal,
sim_params):
output_layer_idx = quartet_of_indices["output_layer_idx"]
atomic_config_idx = quartet_of_indices["atomic_config_idx"]
atomic_config_subset_idx = quartet_of_indices["atomic_config_subset_idx"]
defocus_idx = quartet_of_indices["defocus_idx"]
dataset[output_layer_idx] += (postprocessed_intensity_dp_subset_signal.data
* _w_f_l(sim_params, l=defocus_idx)
/ _total_num_frozen_phonon_configs(sim_params)
/ np.sqrt(np.pi))
kwargs = {"sim_params": sim_params,
"atomic_config_subset_idx": atomic_config_subset_idx}
num_atomic_configs_in_subset = _num_atomic_configs_in_subset(**kwargs)
num_atomic_config_subsets = _num_atomic_config_subsets(sim_params)
num_defocii = len(_defocii(sim_params))
sim_params_core_attrs = sim_params.get_core_attrs(deep_copy=False)
output_params = sim_params_core_attrs["output_params"]
output_params_core_attrs = output_params.get_core_attrs(deep_copy=False)
base_output_params = output_params_core_attrs["base_params"]
base_output_params_core_attrs = \
base_output_params.get_core_attrs(deep_copy=False)
cbed_params = \
base_output_params_core_attrs["cbed_params"]
if ((atomic_config_subset_idx == num_atomic_config_subsets-1)
and (atomic_config_idx == num_atomic_configs_in_subset-1)
and (defocus_idx == num_defocii-1)):
cbed_params_core_attrs = \
cbed_params.get_core_attrs(deep_copy=False)
avg_num_electrons_per_postprocessed_dp = \
cbed_params_core_attrs["avg_num_electrons_per_postprocessed_dp"]
datasubset = dataset[output_layer_idx].clip(min=0)
datasubset *= (avg_num_electrons_per_postprocessed_dp
/ np.mean(np.sum(datasubset, axis=(-2, -1))))
if cbed_params_core_attrs["apply_shot_noise"]:
datasubset = np.random.poisson(datasubset)
dataset[output_layer_idx] = datasubset
return None
def _w_f_l(sim_params, l):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
stem_system_model_params = \
sim_params_core_attrs["stem_system_model_params"]
stem_system_model_params_core_attrs = \
stem_system_model_params.get_core_attrs(deep_copy=False)
probe_model_params = \
stem_system_model_params_core_attrs["probe_model_params"]
_, gauss_hermite_weights = \
_gauss_hermite_points_and_weights(probe_model_params)
w_f_l = gauss_hermite_weights[l]
return w_f_l
def _total_num_frozen_phonon_configs(sim_params):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
stem_system_model_params = \
sim_params_core_attrs["stem_system_model_params"]
stem_system_model_params_core_attrs = \
stem_system_model_params.get_core_attrs(deep_copy=False)
sample_specification = \
stem_system_model_params_core_attrs["sample_specification"]
kwargs = \
{"sample_specification": sample_specification}
total_num_frozen_phonon_configs = \
prismatique.sample._total_num_frozen_phonon_configs(**kwargs)
return total_num_frozen_phonon_configs
def _calc_and_write_remaining_intensity_data_to_output_file(sim_params,
filename):
func_alias_1 = \
_calc_and_write_com_data_of_output_layer_to_output_file
func_alias_2 = \
_calc_and_write_intensity_3d_stem_data_of_output_layer_to_output_file
func_alias_3 = \
_calc_and_write_intensity_2d_stem_data_of_output_layer_to_output_file
output_layer_depths = _output_layer_depths(sim_params)
for output_layer_idx, _ in enumerate(output_layer_depths):
kwargs = {"sim_params": sim_params,
"output_layer_idx": output_layer_idx,
"filename": filename}
func_aliases = tuple()
if _com_output_is_to_be_saved(sim_params):
func_aliases += (func_alias_1,)
if _intensity_3d_stem_output_is_to_be_saved(sim_params):
func_aliases += (func_alias_2,)
if _intensity_2d_stem_output_is_to_be_saved(sim_params):
func_aliases += (func_alias_3,)
for func_alias in func_aliases:
func_alias(**kwargs)
gc.collect()
return None
def _intensity_3d_stem_output_is_to_be_saved(sim_params):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
output_params = \
sim_params_core_attrs["output_params"]
output_params_core_attrs = \
output_params.get_core_attrs(deep_copy=False)
base_output_params = \
output_params_core_attrs["base_params"]
base_output_params_core_attrs = \
base_output_params.get_core_attrs(deep_copy=False)
radial_step_size_for_3d_stem = \
base_output_params_core_attrs["radial_step_size_for_3d_stem"]
intensity_3d_stem_output_is_to_be_saved = \
(radial_step_size_for_3d_stem > 0)
return intensity_3d_stem_output_is_to_be_saved
def _calc_and_write_com_data_of_output_layer_to_output_file(sim_params,
output_layer_idx,
filename):
k_mesh = _k_mesh(sim_params, for_postprocessed_dp=True)
intensity_4d_stem_signal = _load_intensity_4d_stem_signal(sim_params,
output_layer_idx,
filename)
kwargs = {"center": (0, 0), "radial_range": None}
optional_params = empix.OptionalAnnularIntegrationParams(**kwargs)
normalization_signal = empix.annularly_integrate(intensity_4d_stem_signal,
optional_params)
normalization_signal.data[normalization_signal.data == 0] = 1.0
integrand_signal = copy.deepcopy(intensity_4d_stem_signal)
integrand_signal.data *= k_mesh[0]
com_x_signal = empix.annularly_integrate(integrand_signal, optional_params)
com_x_signal.data /= normalization_signal.data
integrand_signal = copy.deepcopy(intensity_4d_stem_signal)
integrand_signal.data *= k_mesh[1]
com_y_signal = empix.annularly_integrate(integrand_signal, optional_params)
com_y_signal.data /= normalization_signal.data
com_data_of_output_layer = np.array([com_x_signal.data, com_y_signal.data])
path_in_file = "/data/center_of_mass_momentum"
dataset_id = h5pywrappers.obj.ID(filename, path_in_file)
multi_dim_slice = (output_layer_idx, slice(None), slice(None))
datasubset_id = h5pywrappers.datasubset.ID(dataset_id, multi_dim_slice)
datasubset = com_data_of_output_layer
h5pywrappers.datasubset.save(datasubset, datasubset_id)
return None
def _k_mesh(sim_params, for_postprocessed_dp):
k_x = _k_x(sim_params, for_postprocessed_dp)
k_y = _k_y(sim_params, for_postprocessed_dp)
k_mesh = np.meshgrid(k_x, k_y, indexing="xy")
return k_mesh
def _load_intensity_4d_stem_signal(sim_params, output_layer_idx, filename):
path_in_file = "/data/4D_STEM/intensity_DPs"
dataset_id = h5pywrappers.obj.ID(filename, path_in_file)
multi_dim_slice = (output_layer_idx, slice(None), slice(None), slice(None))
datasubset_id = h5pywrappers.datasubset.ID(dataset_id, multi_dim_slice)
intensity_4d_stem_data = h5pywrappers.datasubset.load(datasubset_id)
kwargs = {"sim_params": sim_params,
"navigation_dims": (intensity_4d_stem_data.shape[0],),
"signal_dtype": "float"}
intensity_4d_stem_signal = _blank_postprocessed_dp_set_signal(**kwargs)
intensity_4d_stem_signal.data = intensity_4d_stem_data
return intensity_4d_stem_signal
def _calc_and_write_intensity_3d_stem_data_of_output_layer_to_output_file(
sim_params, output_layer_idx, filename):
intensity_4d_stem_signal = \
_load_intensity_4d_stem_signal(sim_params, output_layer_idx, filename)
intensity_3d_stem_signal = \
_integrate_dp_to_3d_stem_signal(intensity_4d_stem_signal, sim_params)
intensity_3d_stem_data_of_output_layer = \
intensity_3d_stem_signal.data
path_in_file = "/data/3D_STEM/integrated_intensities"
dataset_id = h5pywrappers.obj.ID(filename, path_in_file)
multi_dim_slice = (output_layer_idx, slice(None), slice(None))
datasubset_id = h5pywrappers.datasubset.ID(dataset_id, multi_dim_slice)
datasubset = intensity_3d_stem_data_of_output_layer
h5pywrappers.datasubset.save(datasubset, datasubset_id)
return None
def _calc_and_write_intensity_2d_stem_data_of_output_layer_to_output_file(
sim_params, output_layer_idx, filename):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
output_params = \
sim_params_core_attrs["output_params"]
output_params_core_attrs = \
output_params.get_core_attrs(deep_copy=False)
base_output_params = \
output_params_core_attrs["base_params"]
base_output_params_core_attrs = \
base_output_params.get_core_attrs(deep_copy=False)
radial_angular_range_for_2d_stem = \
np.array(base_output_params_core_attrs["radial_range_for_2d_stem"])
wavelength = \
_wavelength(sim_params)
radial_k_range_for_2d_stem = \
(radial_angular_range_for_2d_stem / 1000) / wavelength
intensity_4d_stem_signal = _load_intensity_4d_stem_signal(sim_params,
output_layer_idx,
filename)
input_signal = intensity_4d_stem_signal
pixel_area = np.abs(input_signal.axes_manager[-2].scale
* input_signal.axes_manager[-1].scale)
input_signal.data /= pixel_area
kwargs = {"center": (0, 0), "radial_range": radial_k_range_for_2d_stem}
optional_params = empix.OptionalAnnularIntegrationParams(**kwargs)
intensity_2d_stem_signal = empix.annularly_integrate(input_signal,
optional_params)
intensity_2d_stem_data_of_output_layer = intensity_2d_stem_signal.data
path_in_file = "/data/2D_STEM/integrated_intensities"
dataset_id = h5pywrappers.obj.ID(filename, path_in_file)
multi_dim_slice = (output_layer_idx, slice(None))
datasubset_id = h5pywrappers.datasubset.ID(dataset_id, multi_dim_slice)
datasubset = intensity_2d_stem_data_of_output_layer
h5pywrappers.datasubset.save(datasubset, datasubset_id)
return None
def _remove_any_temp_intensity_4d_stem_data_in_output_file(sim_params,
filename):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
output_params = \
sim_params_core_attrs["output_params"]
output_params_core_attrs = \
output_params.get_core_attrs(deep_copy=False)
base_output_params = \
output_params_core_attrs["base_params"]
base_output_params_core_attrs = \
base_output_params.get_core_attrs(deep_copy=False)
cbed_params = \
base_output_params_core_attrs["cbed_params"]
cbed_params_core_attrs = \
cbed_params.get_core_attrs(deep_copy=False)
intensity_4d_stem_data_is_temp = \
not cbed_params_core_attrs["save_final_intensity"]
if intensity_4d_stem_data_is_temp:
path_in_file = "/data/4D_STEM"
group_id = h5pywrappers.obj.ID(filename, path_in_file)
group = h5pywrappers.group.load(group_id, read_only=False)
del group["intensity_DPs"]
group.file.close()
return None
def _serialize_sim_params(sim_params):
sim_params_core_attrs = \
sim_params.get_core_attrs(deep_copy=False)
output_params = \
sim_params_core_attrs["output_params"]
output_params_core_attrs = \
output_params.get_core_attrs(deep_copy=False)
base_output_params = \
output_params_core_attrs["base_params"]
base_output_params_core_attrs = \
base_output_params.get_core_attrs(deep_copy=False)
output_dirname = \
base_output_params_core_attrs["output_dirname"]
filename = \
output_dirname + "/stem_sim_params.json"
kwargs = {"filename": filename, "overwrite": True}
sim_params.dump(**kwargs)
return None
###########################
## Define error messages ##
###########################
_check_data_size_err_msg_1 = \
("The data size of the output of the STEM simulation to be performed is {} "
"bytes, exceeding the maximum allowed size of {} bytes specified by the "
"simulation parameters.")