Module tau2.files
Functions
def load_from_hdf5(input_file)-
Expand source code
def load_from_hdf5(input_file): """ Loads calculation data from a unified Taupy HDF5 file. Args: input_file (str): The path to the input HDF5 file. Returns: dict: A dictionary containing the full data structure of the HDF5 file, typically with a single top-level 'orientation_0' key. """ data = {} with h5py.File(input_file, 'r') as hf: for orient_key in hf.keys(): if not orient_key.startswith('orientation_'): continue orientation_group = hf[orient_key] orient_data = {'parameters': {}, 'results': {}} # Load parameters if 'parameters' in orientation_group: params_group = orientation_group['parameters'] for key, val in params_group.attrs.items(): orient_data['parameters'][key] = val for key in params_group.keys(): orient_data['parameters'][key] = params_group[key][...] # Load results if 'results' in orientation_group: results_group = orientation_group['results'] field_keys = sorted(results_group.keys(), key=lambda x: int(x.split('_')[1])) for field_key in field_keys: field_group = results_group[field_key] field_data = {} # Load all datasets and subgroups from the field_group for key, item in field_group.items(): if isinstance(item, h5py.Dataset): field_data[key] = item[...] elif isinstance(item, h5py.Group): # Use the new recursive helper for any subgroups field_data[key] = _recursive_load_group(item) orient_data['results'][field_key] = field_data data[orient_key] = orient_data return dataLoads calculation data from a unified Taupy HDF5 file.
Args
input_file:str- The path to the input HDF5 file.
Returns
dict- A dictionary containing the full data structure of the HDF5 file, typically with a single top-level 'orientation_0' key.
def load_initial_data(input_file,
kramers,
max_state=None,
states=None,
max_modes=None,
modes=None,
defer_coupling_loading=False)-
Expand source code
def load_initial_data(input_file, kramers, max_state=None, states=None, max_modes=None, modes=None, defer_coupling_loading=False): """ Loads and prepares initial electronic structure data from a Tau format HDF5 file. Args: input_file (str): The path to the input HDF5 file. kramers (bool): True if the system is a Kramers ion, False otherwise. max_state (int, optional): The maximum number of electronic states to load. max_modes (int, optional): The number of lowest-indexed phonon modes to load. modes (list, optional): List of specific modes to load. defer_coupling_loading (bool): If True, stores HDF5 keys instead of loading matrices into memory. Returns: dict: A dictionary containing the prepared, 0-indexed initial data. """ print("\n--- Loading and Preparing Data ---") with h5py.File(input_file, 'r') as hf: # Define a slice or list to load only the required number of states. # Note: h5py requires indices to be in increasing order for fancy indexing. # We assume 'states' has already been sorted by cli.py if provided. if states is not None: s_load = states elif max_state is not None: s_load = slice(0, max_state) else: num_total_states = hf['energies'].shape[0] s_load = slice(0, num_total_states) # Helper to extract and optionally reorder vectors def get_vector(dset, indices): return dset[indices] # Helper to extract and optionally reorder submatrices def get_submatrix(dset, indices): # Load the submatrix (h5py returns rows sorted by index) # We slice columns after loading rows matrix = dset[indices][:, indices] return matrix # Step 1: Efficiently get all keys and sort them numerically. all_keys = sorted(hf['couplings'].keys(), key=int) # Step 2: Truncate the list of keys if max_modes is specified. if modes is not None: keys_to_load = [str(i) for i in modes] elif max_modes is not None: if len(all_keys) < max_modes: print(f"Warning: --max-modes ({max_modes}) is larger than the number of " f"available coupling matrices ({len(all_keys)}). Using all available.") keys_to_load = all_keys[:max_modes] else: keys_to_load = all_keys # Load other arrays data = { 'initial_energies': get_vector(hf['energies'], s_load), 'angmom': np.array([ symmetrise_op(get_submatrix(hf['angmom'][d], s_load), is_kramers=kramers, is_tr_even=False) for d in 'xyz']), 'spin': np.array([ symmetrise_op(get_submatrix(hf['spin'][d], s_load), is_kramers=kramers, is_tr_even=False) for d in 'xyz']), } # Step 3 & 4: Handle couplings and mode energies if defer_coupling_loading: # Store info to load couplings later, on-demand data['coupling_info'] = { 'file': input_file, 'kramers': kramers, 's_load': s_load, 'keys': {new_idx: key for new_idx, key in enumerate(keys_to_load)} } else: # Load couplings into memory now (for Orbach) raw_couplings_remapped = { new_idx: get_submatrix(hf['couplings'][key], s_load) for new_idx, key in enumerate(keys_to_load) } data['symmetrised_couplings'] = symmetrise_couplings(raw_couplings_remapped, kramers) all_mode_energies = hf['mode_energies'][...] original_indices = [int(key) for key in keys_to_load] energy_indices = [idx - 1 for idx in original_indices] data['all_mode_energies'] = all_mode_energies[energy_indices] return dataLoads and prepares initial electronic structure data from a Tau format HDF5 file.
Args
input_file:str- The path to the input HDF5 file.
kramers:bool- True if the system is a Kramers ion, False otherwise.
max_state:int, optional- The maximum number of electronic states to load.
max_modes:int, optional- The number of lowest-indexed phonon modes to load.
modes:list, optional- List of specific modes to load.
defer_coupling_loading:bool- If True, stores HDF5 keys instead of loading matrices into memory.
Returns
dict- A dictionary containing the prepared, 0-indexed initial data.
def print_calculation_parameters(args)-
Expand source code
def print_calculation_parameters(args): """Prints a formatted and context-aware summary of calculation parameters. This function reads the argparse.Namespace object and prints a neat summary of the key parameters that will be used for the calculation, providing a clear record in the output log. Args: args (argparse.Namespace): The parsed command-line arguments. """ print("\n--- Calculation Parameters ---") p = vars(args).copy() def _format_list(items): if not isinstance(items, list) or len(items) <= 6: return items return f"[{', '.join(map(str, items[:3]))}, ..., {', '.join(map(str, items[-3:]))}] ({len(items)} total)" def _print_param(key, display_name=None): if key in p and p[key] is not None: if not display_name: display_name = key.replace('_', ' ').capitalize() value = p[key] if isinstance(value, list) and key != 'input_files': value = _format_list(value) print(f" {display_name:<28}: {value}") _print_param('mode') if 'input_files' in p: _print_param('input_files') else: _print_param('input_file') _print_param('output') if 'output_csv' in p and p['output_csv']: _print_param('output_csv', 'Output CSV') _print_param('max_state') _print_param('raman_max_state') _print_param('max_modes') _print_param('n_cores') _print_param('kramers') _print_param('erange') _print_param('fwhm') _print_param('lineshape') _print_param('integration_width') _print_param('grid_points') _print_param('grid_resolution') if 'augment_with_modes' in p and (p['augment_with_modes'] or p['augment_with_resonances']): _print_param('augment_with_modes') _print_param('augment_with_resonances') _print_param('augment_window_fwhm') _print_param('augment_resolution') _print_param('field_direction') _print_param('field_magnitudes') _print_param('orientations') _print_param('quadrature') _print_param('no_reorient') if 'temperatures' in p and p['temperatures']: _print_param('temperatures') if 'rate_integrator' in p: _print_param('rate_integrator') if 'interpolation_kind' in p and p.get('rate_integrator') != 'trapz': _print_param('interpolation_kind') if 'debug_summary' in p and (p['debug_summary'] or p['debug_verbose']): _print_param('debug_summary') _print_param('debug_verbose') _print_param('debug_top_n') if 'profile' in p and p['profile']: _print_param('profile')Prints a formatted and context-aware summary of calculation parameters.
This function reads the argparse.Namespace object and prints a neat summary of the key parameters that will be used for the calculation, providing a clear record in the output log.
Args
args:argparse.Namespace- The parsed command-line arguments.
def print_electronic_structure(initial_data, B_unit, field_mag)-
Expand source code
def print_electronic_structure(initial_data, B_unit, field_mag): """ Calculates and prints a formatted table of the electronic structure for a given field. Returns: dict: The calculated Zeeman data {energies, state_mu, state_mJ, U}. """ B_vec = B_unit * field_mag # Calculate the electronic structure for this field. # Note: apply_zeeman_splitting returns (new_energies, new_couplings, state_mu, state_mJ, U) energies, _, state_mu, state_mJ, U = apply_zeeman_splitting( initial_data['initial_energies'], initial_data['angmom'], initial_data['spin'], B_vec ) # Print the formatted table. print(f"\n {'State':<5} {'Energy (cm-1)':>15} {'Mag. Moment (μ_B)':>20} {'<mJ>':>16}") print(f" {'-'*5:<5} {'-'*15:>15} {'-'*20:>20} {'-'*16:>16}") for i, energy in enumerate(energies): print(f" {i:<5d} {(energy - energies[0]):>15.3f} {state_mu[i]:>20.3f} {state_mJ[i]:>16.3f}") print("") return {'energies': energies, 'state_mu': state_mu, 'state_mJ': state_mJ, 'U': U}Calculates and prints a formatted table of the electronic structure for a given field.
Returns
dict- The calculated Zeeman data {energies, state_mu, state_mJ, U}.
def save_calculation_to_hdf5(all_orient_results, params, orientation_weights, output_filename)-
Expand source code
def save_calculation_to_hdf5(all_orient_results, params, orientation_weights, output_filename): """ Saves all calculation results to the final, unified HDF5 file format. Args: all_orient_results (dict): A dictionary of results, where keys are orientation indices and values are lists of field results. params (object): Dataclass object (RamanParams or OrbachParams). orientation_weights (list): List of weights for each orientation. output_filename (str): The path to the output HDF5 file. """ with h5py.File(output_filename, 'w') as f: for orient_idx, all_field_results in all_orient_results.items(): if not all_field_results: continue # Create the top-level group for this single orientation orientation_group = f.create_group(f'orientation_{orient_idx}') orientation_group.attrs['weight'] = orientation_weights[orient_idx] # --- Parameters Subgroup --- params_group = orientation_group.create_group('parameters') # Save all calculation parameters as attributes of this group for param_name, param_value in vars(params).items(): if param_value is not None: try: params_group.attrs[param_name] = param_value except TypeError: # Handle non-standard types like lists params_group.attrs[param_name] = str(param_value) # Reconstruct the B_unit_vector from the first result of this orientation first_result = all_field_results[0] if first_result.get('field_mag', 0) > 1e-9: B_unit_vector = np.array(first_result.get('B_vec', [0,0,1])) / first_result['field_mag'] else: B_unit_vector = np.array(first_result.get('B_vec', [0,0,1])) params_group.create_dataset('B_unit_vector', data=B_unit_vector) field_magnitudes = sorted([res['field_mag'] for res in all_field_results]) params_group.create_dataset('field_magnitudes', data=np.array(field_magnitudes)) if hasattr(params, 'temperatures') and params.temperatures is not None: params_group.create_dataset('temperatures', data=np.array(params.temperatures)) # --- Results Subgroup --- results_group = orientation_group.create_group('results') # Sort results by field magnitude to ensure field_j corresponds to the j-th field sorted_results = sorted(all_field_results, key=lambda x: x['field_mag']) for i, result in enumerate(sorted_results): field_group = results_group.create_group(f'field_{i}') # Save Zeeman-split state information field_group.create_dataset('electronic_energies', data=result['energies']) field_group.create_dataset('mJ_values', data=result['state_mJ']) field_group.create_dataset('magnetic_moments', data=result['state_mu']) # Determine if the calculation was Raman or Orbach is_orbach = isinstance(params, OrbachParams) rates_dataset_name = 'orbach_rates' if is_orbach else 'raman_rates' if 'relaxation_rates' in result: field_group.create_dataset(rates_dataset_name, data=result['relaxation_rates']) # Save calculation data to a specific subgroup calc_details_group = field_group.create_group('orbach' if is_orbach else 'raman') if 'gamma_matrices' in result: gamma_group = calc_details_group.create_group('gamma_matrices') for temp, matrix in result['gamma_matrices'].items(): gamma_group.create_dataset(f"gamma_{temp:.2f}K", data=matrix) if 'det_balance' in result: det_balance_group = calc_details_group.create_group('det_balance') for temp, matrix in result['det_balance'].items(): det_balance_group.create_dataset(f"det_balance_{temp:.2f}K", data=matrix) if not is_orbach: # Raman-specific detailed data if 'omega_grid' in result: calc_details_group.create_dataset('omega_grid', data=result['omega_grid']) if 'integrands' in result: for (i_state, j_state), procs in result['integrands'].items(): transition_group = calc_details_group.create_group(f"transition_{i_state}_{j_state}") for proc_name, J_array in procs.items(): transition_group.create_dataset(f"J_{proc_name}", data=J_array) print(f"\nCalculation data saved to {output_filename}")Saves all calculation results to the final, unified HDF5 file format.
Args
all_orient_results:dict- A dictionary of results, where keys are orientation indices and values are lists of field results.
params:object- Dataclass object (RamanParams or OrbachParams).
orientation_weights:list- List of weights for each orientation.
output_filename:str- The path to the output HDF5 file.
def save_hysteresis(fields, magnetisation_dict, output_filename='hysteresis')-
Expand source code
def save_hysteresis(fields, magnetisation_dict, output_filename='hysteresis'): """ Save hysteresis data to a CSV file. """ import csv with open(output_filename+'.csv', 'w') as csv_file: writer = csv.writer(csv_file, lineterminator="\n") row = ['temp'] row.extend(fields) writer.writerow(row) for temp, mag_values in magnetisation_dict.items(): row = [temp] row.extend(mag_values) writer.writerow(row)Save hysteresis data to a CSV file.
def write_debug_files(field_mag,
summary_data,
verbose_data,
params: RamanParams)-
Expand source code
def write_debug_files(field_mag, summary_data, verbose_data, params: RamanParams): """Writes detailed debug CSV files for significant phonon pairs. If --debug-summary is enabled, this function writes a CSV file containing a summary of the most significant phonon pair contributions. If --debug-verbose is enabled, it writes detailed CSVs showing the integrand for each of these top contributing pairs. Args: field_mag (float): The magnetic field magnitude (T) for this data. summary_data (list): A list of dictionaries with summary data. verbose_data (list): A list of dictionaries with verbose integrand data. params (RamanParams): The dataclass containing calculation parameters. """ if not summary_data: return sorted_summary = sorted(summary_data, key=lambda x: x['Contribution'], reverse=True) if params.debug_summary: debug_filename = f"debug_summary_{field_mag:.4f}T.csv" csv_summary = [{k: (f"{v:.6e}" if isinstance(v, float) else v) for k, v in item.items()} for item in sorted_summary] if csv_summary: with open(debug_filename, 'w', newline='') as f: writer = csv.DictWriter(f, fieldnames=csv_summary[0].keys()) writer.writeheader() writer.writerows(csv_summary) if params.debug_verbose and verbose_data: top_n_pairs = {(d['Process'], d['Transition'], d['Pair(q,r)']) for d in sorted_summary[:params.debug_top_n]} for item in verbose_data: pair_key = (item['process'], item['transition'].replace('_', '->'), item['pair_id']) if pair_key in top_n_pairs: p, t, pair = item['process'], item['transition'], item['pair_id'] filename = f"debug_verbose_{field_mag:.4f}T_proc_{p}_trans_{t}_pair_{pair.replace('(','').replace(')','').replace(',','_')}.csv" data_dict = item['data'] rows = [dict(zip(data_dict.keys(), values)) for values in zip(*data_dict.values())] if rows: with open(filename, 'w', newline='') as f: writer = csv.DictWriter(f, fieldnames=rows[0].keys()) writer.writeheader() writer.writerows(rows)Writes detailed debug CSV files for significant phonon pairs.
If –debug-summary is enabled, this function writes a CSV file containing a summary of the most significant phonon pair contributions. If –debug-verbose is enabled, it writes detailed CSVs showing the integrand for each of these top contributing pairs.
Args
field_mag:float- The magnetic field magnitude (T) for this data.
summary_data:list- A list of dictionaries with summary data.
verbose_data:list- A list of dictionaries with verbose integrand data.
params:RamanParams- The dataclass containing calculation parameters.
def write_rates_to_csv(filename, temperatures, all_rates_data)-
Expand source code
def write_rates_to_csv(filename, temperatures, all_rates_data): """Writes the relaxation rates to a CSV file. Args: filename (str): The path to the output CSV file. temperatures (list): A list of temperatures (K). all_rates_data (dict): A dictionary where keys are field magnitudes and values are the 1D arrays of relaxation rates. """ fields = sorted(all_rates_data.keys()) csv_header = ["Temperature (K)"] + [f"{f:.8f} T" for f in fields] # Create a matrix of rates: rows are temperatures, columns are fields rates_matrix = np.zeros((len(temperatures), len(fields))) for j, field in enumerate(fields): rates_for_field = all_rates_data[field] # Ensure the rates array is not longer than the temperatures list num_rates = min(len(rates_for_field), len(temperatures)) rates_matrix[:num_rates, j] = rates_for_field[:num_rates] output_rows = [] for i, temp in enumerate(temperatures): row = [f"{temp:.2f}"] + [f"{rate:.6e}" for rate in rates_matrix[i, :]] output_rows.append(row) try: with open(filename, 'w', newline='') as f: writer = csv.writer(f) writer.writerow(csv_header) writer.writerows(output_rows) print(f"Relaxation rates saved to {filename}") except IOError as e: print(f"Error writing to {filename}: {e}")Writes the relaxation rates to a CSV file.
Args
filename:str- The path to the output CSV file.
temperatures:list- A list of temperatures (K).
all_rates_data:dict- A dictionary where keys are field magnitudes and values are the 1D arrays of relaxation rates.