Module tau2.files

Functions

def load_from_hdf5(input_file)
Expand source code
def load_from_hdf5(input_file):
    """
    Loads calculation data from a unified Taupy HDF5 file.

    Args:
        input_file (str): The path to the input HDF5 file.

    Returns:
        dict: A dictionary containing the full data structure of the HDF5 file,
              typically with a single top-level 'orientation_0' key.
    """
    data = {}
    with h5py.File(input_file, 'r') as hf:
        for orient_key in hf.keys():
            if not orient_key.startswith('orientation_'): continue
            
            orientation_group = hf[orient_key]
            orient_data = {'parameters': {}, 'results': {}}

            # Load parameters
            if 'parameters' in orientation_group:
                params_group = orientation_group['parameters']
                for key, val in params_group.attrs.items():
                    orient_data['parameters'][key] = val
                for key in params_group.keys():
                     orient_data['parameters'][key] = params_group[key][...]

            # Load results
            if 'results' in orientation_group:
                results_group = orientation_group['results']
                field_keys = sorted(results_group.keys(), key=lambda x: int(x.split('_')[1]))
                for field_key in field_keys:
                    field_group = results_group[field_key]
                    field_data = {}
                    
                    # Load all datasets and subgroups from the field_group
                    for key, item in field_group.items():
                        if isinstance(item, h5py.Dataset):
                            field_data[key] = item[...]
                        elif isinstance(item, h5py.Group):
                            # Use the new recursive helper for any subgroups
                            field_data[key] = _recursive_load_group(item)

                    orient_data['results'][field_key] = field_data
            data[orient_key] = orient_data
            
    return data

Loads calculation data from a unified Taupy HDF5 file.

Args

input_file : str
The path to the input HDF5 file.

Returns

dict
A dictionary containing the full data structure of the HDF5 file, typically with a single top-level 'orientation_0' key.
def load_initial_data(input_file,
kramers,
max_state=None,
states=None,
max_modes=None,
modes=None,
defer_coupling_loading=False)
Expand source code
def load_initial_data(input_file, kramers, max_state=None, states=None, max_modes=None, modes=None, defer_coupling_loading=False):
    """
    Loads and prepares initial electronic structure data from a Tau format HDF5 file.

    Args:
        input_file (str): The path to the input HDF5 file.
        kramers (bool): True if the system is a Kramers ion, False otherwise.
        max_state (int, optional): The maximum number of electronic states to load.
        max_modes (int, optional): The number of lowest-indexed phonon modes to load.
        modes (list, optional): List of specific modes to load.
        defer_coupling_loading (bool): If True, stores HDF5 keys instead of loading
                                     matrices into memory.

    Returns:
        dict: A dictionary containing the prepared, 0-indexed initial data.
    """
    print("\n--- Loading and Preparing Data ---")
    with h5py.File(input_file, 'r') as hf:
        # Define a slice or list to load only the required number of states.
        # Note: h5py requires indices to be in increasing order for fancy indexing.
        # We assume 'states' has already been sorted by cli.py if provided.
        if states is not None:
            s_load = states
        elif max_state is not None:
            s_load = slice(0, max_state)
        else:
            num_total_states = hf['energies'].shape[0]
            s_load = slice(0, num_total_states)

        # Helper to extract and optionally reorder vectors
        def get_vector(dset, indices):
            return dset[indices]

        # Helper to extract and optionally reorder submatrices
        def get_submatrix(dset, indices):
            # Load the submatrix (h5py returns rows sorted by index)
            # We slice columns after loading rows
            matrix = dset[indices][:, indices]
            return matrix

        # Step 1: Efficiently get all keys and sort them numerically.
        all_keys = sorted(hf['couplings'].keys(), key=int)

        # Step 2: Truncate the list of keys if max_modes is specified.
        if modes is not None:
            keys_to_load = [str(i) for i in modes]
        elif max_modes is not None:
            if len(all_keys) < max_modes:
                print(f"Warning: --max-modes ({max_modes}) is larger than the number of "
                      f"available coupling matrices ({len(all_keys)}). Using all available.")
            keys_to_load = all_keys[:max_modes]
        else:
            keys_to_load = all_keys
        
        # Load other arrays
        data = {
            'initial_energies': get_vector(hf['energies'], s_load),
            'angmom': np.array([ symmetrise_op(get_submatrix(hf['angmom'][d], s_load), 
                                               is_kramers=kramers, is_tr_even=False) for d in 'xyz']),
            'spin': np.array([ symmetrise_op(get_submatrix(hf['spin'][d], s_load), 
                                               is_kramers=kramers, is_tr_even=False) for d in 'xyz']),
        }

        # Step 3 & 4: Handle couplings and mode energies
        if defer_coupling_loading:
            # Store info to load couplings later, on-demand
            data['coupling_info'] = {
                'file': input_file,
                'kramers': kramers,
                's_load': s_load,
                'keys': {new_idx: key for new_idx, key in enumerate(keys_to_load)}
            }
        else:
            # Load couplings into memory now (for Orbach)
            raw_couplings_remapped = {
                new_idx: get_submatrix(hf['couplings'][key], s_load)
                for new_idx, key in enumerate(keys_to_load)
            }
            data['symmetrised_couplings'] = symmetrise_couplings(raw_couplings_remapped, kramers)

        all_mode_energies = hf['mode_energies'][...]
        original_indices = [int(key) for key in keys_to_load]
        energy_indices = [idx - 1 for idx in original_indices]
        data['all_mode_energies'] = all_mode_energies[energy_indices]
    
    return data

Loads and prepares initial electronic structure data from a Tau format HDF5 file.

Args

input_file : str
The path to the input HDF5 file.
kramers : bool
True if the system is a Kramers ion, False otherwise.
max_state : int, optional
The maximum number of electronic states to load.
max_modes : int, optional
The number of lowest-indexed phonon modes to load.
modes : list, optional
List of specific modes to load.
defer_coupling_loading : bool
If True, stores HDF5 keys instead of loading matrices into memory.

Returns

dict
A dictionary containing the prepared, 0-indexed initial data.
def print_calculation_parameters(args)
Expand source code
def print_calculation_parameters(args):
    """Prints a formatted and context-aware summary of calculation parameters.

    This function reads the argparse.Namespace object and prints a neat summary
    of the key parameters that will be used for the calculation, providing a
    clear record in the output log.

    Args:
        args (argparse.Namespace): The parsed command-line arguments.
    """
    print("\n--- Calculation Parameters ---")
    p = vars(args).copy()

    def _format_list(items):
        if not isinstance(items, list) or len(items) <= 6:
            return items
        return f"[{', '.join(map(str, items[:3]))}, ..., {', '.join(map(str, items[-3:]))}] ({len(items)} total)"

    def _print_param(key, display_name=None):
        if key in p and p[key] is not None:
            if not display_name:
                display_name = key.replace('_', ' ').capitalize()
            value = p[key]
            if isinstance(value, list) and key != 'input_files':
                value = _format_list(value)
            print(f"  {display_name:<28}: {value}")

    _print_param('mode')
    if 'input_files' in p:
        _print_param('input_files')
    else:
        _print_param('input_file')
    _print_param('output')
    if 'output_csv' in p and p['output_csv']:
         _print_param('output_csv', 'Output CSV')
    _print_param('max_state')
    _print_param('raman_max_state')
    _print_param('max_modes')
    _print_param('n_cores')
    _print_param('kramers')
    _print_param('erange')
    _print_param('fwhm')
    _print_param('lineshape')
    _print_param('integration_width')
    _print_param('grid_points')
    _print_param('grid_resolution')
    if 'augment_with_modes' in p and (p['augment_with_modes'] or p['augment_with_resonances']):
        _print_param('augment_with_modes')
        _print_param('augment_with_resonances')
        _print_param('augment_window_fwhm')
        _print_param('augment_resolution')
    _print_param('field_direction')
    _print_param('field_magnitudes')
    _print_param('orientations')
    _print_param('quadrature')
    _print_param('no_reorient')
    if 'temperatures' in p and p['temperatures']:
        _print_param('temperatures')
        if 'rate_integrator' in p:
            _print_param('rate_integrator')
        if 'interpolation_kind' in p and p.get('rate_integrator') != 'trapz':
            _print_param('interpolation_kind')
    if 'debug_summary' in p and (p['debug_summary'] or p['debug_verbose']):
        _print_param('debug_summary')
        _print_param('debug_verbose')
        _print_param('debug_top_n')
    if 'profile' in p and p['profile']:
        _print_param('profile')

Prints a formatted and context-aware summary of calculation parameters.

This function reads the argparse.Namespace object and prints a neat summary of the key parameters that will be used for the calculation, providing a clear record in the output log.

Args

args : argparse.Namespace
The parsed command-line arguments.
def print_electronic_structure(initial_data, B_unit, field_mag)
Expand source code
def print_electronic_structure(initial_data, B_unit, field_mag):
    """
    Calculates and prints a formatted table of the electronic structure for a given field.
    
    Returns:
        dict: The calculated Zeeman data {energies, state_mu, state_mJ, U}.
    """
    
    B_vec = B_unit * field_mag
    
    # Calculate the electronic structure for this field.
    # Note: apply_zeeman_splitting returns (new_energies, new_couplings, state_mu, state_mJ, U)
    energies, _, state_mu, state_mJ, U = apply_zeeman_splitting(
        initial_data['initial_energies'], initial_data['angmom'],
        initial_data['spin'], B_vec
    )

    # Print the formatted table.
    print(f"\n  {'State':<5} {'Energy (cm-1)':>15} {'Mag. Moment (μ_B)':>20} {'<mJ>':>16}")
    print(f"  {'-'*5:<5} {'-'*15:>15} {'-'*20:>20} {'-'*16:>16}")
    
    for i, energy in enumerate(energies):
        print(f"  {i:<5d} {(energy - energies[0]):>15.3f} {state_mu[i]:>20.3f} {state_mJ[i]:>16.3f}")
    print("")
    
    return {'energies': energies, 'state_mu': state_mu, 'state_mJ': state_mJ, 'U': U}

Calculates and prints a formatted table of the electronic structure for a given field.

Returns

dict
The calculated Zeeman data {energies, state_mu, state_mJ, U}.
def save_calculation_to_hdf5(all_orient_results, params, orientation_weights, output_filename)
Expand source code
def save_calculation_to_hdf5(all_orient_results, params, orientation_weights, output_filename):
    """
    Saves all calculation results to the final, unified HDF5 file format.

    Args:
        all_orient_results (dict): A dictionary of results, where keys are
            orientation indices and values are lists of field results.
        params (object): Dataclass object (RamanParams or OrbachParams).
        orientation_weights (list): List of weights for each orientation.
        output_filename (str): The path to the output HDF5 file.
    """
    with h5py.File(output_filename, 'w') as f:
        for orient_idx, all_field_results in all_orient_results.items():
            if not all_field_results:
                continue

            # Create the top-level group for this single orientation
            orientation_group = f.create_group(f'orientation_{orient_idx}')
            orientation_group.attrs['weight'] = orientation_weights[orient_idx]

            # --- Parameters Subgroup ---
            params_group = orientation_group.create_group('parameters')
            # Save all calculation parameters as attributes of this group
            for param_name, param_value in vars(params).items():
                if param_value is not None:
                    try:
                        params_group.attrs[param_name] = param_value
                    except TypeError: # Handle non-standard types like lists
                        params_group.attrs[param_name] = str(param_value)
            
            # Reconstruct the B_unit_vector from the first result of this orientation
            first_result = all_field_results[0]
            if first_result.get('field_mag', 0) > 1e-9:
                B_unit_vector = np.array(first_result.get('B_vec', [0,0,1])) / first_result['field_mag']
            else:
                B_unit_vector = np.array(first_result.get('B_vec', [0,0,1]))

            params_group.create_dataset('B_unit_vector', data=B_unit_vector)
            field_magnitudes = sorted([res['field_mag'] for res in all_field_results])
            params_group.create_dataset('field_magnitudes', data=np.array(field_magnitudes))
            if hasattr(params, 'temperatures') and params.temperatures is not None:
                params_group.create_dataset('temperatures', data=np.array(params.temperatures))

            # --- Results Subgroup ---
            results_group = orientation_group.create_group('results')
            # Sort results by field magnitude to ensure field_j corresponds to the j-th field
            sorted_results = sorted(all_field_results, key=lambda x: x['field_mag'])
            
            for i, result in enumerate(sorted_results):
                field_group = results_group.create_group(f'field_{i}')

                # Save Zeeman-split state information
                field_group.create_dataset('electronic_energies', data=result['energies'])
                field_group.create_dataset('mJ_values', data=result['state_mJ'])
                field_group.create_dataset('magnetic_moments', data=result['state_mu'])

                # Determine if the calculation was Raman or Orbach
                is_orbach = isinstance(params, OrbachParams)
                rates_dataset_name = 'orbach_rates' if is_orbach else 'raman_rates'
                if 'relaxation_rates' in result:
                    field_group.create_dataset(rates_dataset_name, data=result['relaxation_rates'])

                # Save calculation data to a specific subgroup
                calc_details_group = field_group.create_group('orbach' if is_orbach else 'raman')
                if 'gamma_matrices' in result:
                    gamma_group = calc_details_group.create_group('gamma_matrices')
                    for temp, matrix in result['gamma_matrices'].items():
                        gamma_group.create_dataset(f"gamma_{temp:.2f}K", data=matrix)
                
                if 'det_balance' in result:
                    det_balance_group = calc_details_group.create_group('det_balance')
                    for temp, matrix in result['det_balance'].items():
                        det_balance_group.create_dataset(f"det_balance_{temp:.2f}K", data=matrix)

                if not is_orbach: # Raman-specific detailed data
                    if 'omega_grid' in result:
                        calc_details_group.create_dataset('omega_grid', data=result['omega_grid'])
                    if 'integrands' in result:
                        for (i_state, j_state), procs in result['integrands'].items():
                            transition_group = calc_details_group.create_group(f"transition_{i_state}_{j_state}")
                            for proc_name, J_array in procs.items():
                                transition_group.create_dataset(f"J_{proc_name}", data=J_array)
    print(f"\nCalculation data saved to {output_filename}")

Saves all calculation results to the final, unified HDF5 file format.

Args

all_orient_results : dict
A dictionary of results, where keys are orientation indices and values are lists of field results.
params : object
Dataclass object (RamanParams or OrbachParams).
orientation_weights : list
List of weights for each orientation.
output_filename : str
The path to the output HDF5 file.
def save_hysteresis(fields, magnetisation_dict, output_filename='hysteresis')
Expand source code
def save_hysteresis(fields, magnetisation_dict, output_filename='hysteresis'):
    """
    Save hysteresis data to a CSV file.
    """
    import csv
    with open(output_filename+'.csv', 'w') as csv_file:
        writer = csv.writer(csv_file, lineterminator="\n")
        row = ['temp']
        row.extend(fields)
        writer.writerow(row)
        for temp, mag_values in magnetisation_dict.items():
            row = [temp]
            row.extend(mag_values)
            writer.writerow(row)

Save hysteresis data to a CSV file.

def write_debug_files(field_mag,
summary_data,
verbose_data,
params: RamanParams)
Expand source code
def write_debug_files(field_mag, summary_data, verbose_data, params: RamanParams):
    """Writes detailed debug CSV files for significant phonon pairs.

    If --debug-summary is enabled, this function writes a CSV file containing
    a summary of the most significant phonon pair contributions. If
    --debug-verbose is enabled, it writes detailed CSVs showing the integrand
    for each of these top contributing pairs.

    Args:
        field_mag (float): The magnetic field magnitude (T) for this data.
        summary_data (list): A list of dictionaries with summary data.
        verbose_data (list): A list of dictionaries with verbose integrand data.
        params (RamanParams): The dataclass containing calculation parameters.
    """
    if not summary_data:
        return

    sorted_summary = sorted(summary_data, key=lambda x: x['Contribution'], reverse=True)

    if params.debug_summary:
        debug_filename = f"debug_summary_{field_mag:.4f}T.csv"
        csv_summary = [{k: (f"{v:.6e}" if isinstance(v, float) else v) for k, v in item.items()} for item in sorted_summary]
        if csv_summary:
            with open(debug_filename, 'w', newline='') as f:
                writer = csv.DictWriter(f, fieldnames=csv_summary[0].keys())
                writer.writeheader()
                writer.writerows(csv_summary)

    if params.debug_verbose and verbose_data:
        top_n_pairs = {(d['Process'], d['Transition'], d['Pair(q,r)']) for d in sorted_summary[:params.debug_top_n]}
        for item in verbose_data:
            pair_key = (item['process'], item['transition'].replace('_', '->'), item['pair_id'])
            if pair_key in top_n_pairs:
                p, t, pair = item['process'], item['transition'], item['pair_id']
                filename = f"debug_verbose_{field_mag:.4f}T_proc_{p}_trans_{t}_pair_{pair.replace('(','').replace(')','').replace(',','_')}.csv"
                data_dict = item['data']
                rows = [dict(zip(data_dict.keys(), values)) for values in zip(*data_dict.values())]
                if rows:
                    with open(filename, 'w', newline='') as f:
                        writer = csv.DictWriter(f, fieldnames=rows[0].keys())
                        writer.writeheader()
                        writer.writerows(rows)

Writes detailed debug CSV files for significant phonon pairs.

If –debug-summary is enabled, this function writes a CSV file containing a summary of the most significant phonon pair contributions. If –debug-verbose is enabled, it writes detailed CSVs showing the integrand for each of these top contributing pairs.

Args

field_mag : float
The magnetic field magnitude (T) for this data.
summary_data : list
A list of dictionaries with summary data.
verbose_data : list
A list of dictionaries with verbose integrand data.
params : RamanParams
The dataclass containing calculation parameters.
def write_rates_to_csv(filename, temperatures, all_rates_data)
Expand source code
def write_rates_to_csv(filename, temperatures, all_rates_data):
    """Writes the relaxation rates to a CSV file.

    Args:
        filename (str): The path to the output CSV file.
        temperatures (list): A list of temperatures (K).
        all_rates_data (dict): A dictionary where keys are field magnitudes and
                               values are the 1D arrays of relaxation rates.
    """
    fields = sorted(all_rates_data.keys())
    csv_header = ["Temperature (K)"] + [f"{f:.8f} T" for f in fields]

    # Create a matrix of rates: rows are temperatures, columns are fields
    rates_matrix = np.zeros((len(temperatures), len(fields)))
    for j, field in enumerate(fields):
        rates_for_field = all_rates_data[field]
        # Ensure the rates array is not longer than the temperatures list
        num_rates = min(len(rates_for_field), len(temperatures))
        rates_matrix[:num_rates, j] = rates_for_field[:num_rates]

    output_rows = []
    for i, temp in enumerate(temperatures):
        row = [f"{temp:.2f}"] + [f"{rate:.6e}" for rate in rates_matrix[i, :]]
        output_rows.append(row)

    try:
        with open(filename, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(csv_header)
            writer.writerows(output_rows)
        print(f"Relaxation rates saved to {filename}")
    except IOError as e:
        print(f"Error writing to {filename}: {e}")

Writes the relaxation rates to a CSV file.

Args

filename : str
The path to the output CSV file.
temperatures : list
A list of temperatures (K).
all_rates_data : dict
A dictionary where keys are field magnitudes and values are the 1D arrays of relaxation rates.