Module tau2.hysteresis

Functions

def load_and_aggregate_rates(input_files, tau_dynamics=None)
Expand source code
def load_and_aggregate_rates(input_files, tau_dynamics=None):
    """
    Load and aggregate rate data from one or more Taupy rate HDF5 files.

    Performs a strict check to ensure all input files were generated
    with identical orientation grids, field magnitudes, and temperatures
    before aggregating the relaxation rates.

    Args:
        input_files (list): A list of paths to HDF5 files.
        tau_dynamics (list, optional): List of rate types to include
            (e.g., ['raman', 'orbach']). Defaults to ['all'], which
            includes every available mechanism.

    Returns:
        tuple: A tuple containing:
            - list: A list of data dictionaries, one for each orientation.
            - list: A list of orientation weights.

    Raises:
        ValueError: If no input files are given, or if input files are
            not compatible (e.g., different orientation grids, field
            lists, or temperatures).
    """
    if tau_dynamics is None:
        tau_dynamics = ['all']

    if not input_files:
        raise ValueError("No input files provided.")

    aggregated_data = {}
    orientation_weights = []
    reference_params = {}

    def _accumulate_rates(field_group, total_rates, f_idx):
        # Add the requested relaxation mechanisms for one field point.
        # Datasets absent from the file are silently skipped (best-effort
        # aggregation, matching the original behaviour).
        for mechanism in ('raman', 'orbach'):
            if 'all' in tau_dynamics or mechanism in tau_dynamics:
                dset_name = f'{mechanism}_rates'
                if dset_name in field_group:
                    total_rates[:, f_idx] += field_group[dset_name][:]

    # --- 1. Load and initialise from the FIRST file ---
    first_file_path = input_files[0]
    with h5py.File(first_file_path, 'r') as hf:
        orient_keys = sorted(
            [k for k in hf.keys() if k.startswith('orientation_')],
            key=lambda x: int(x.split('_')[1])
        )
        if not orient_keys:
            raise ValueError(f"File {first_file_path} contains no 'orientation_' groups.")

        # Set reference parameters from orientation_0
        try:
            params_group = hf['orientation_0/parameters']
            reference_params['fields'] = params_group['field_magnitudes'][:]
            reference_params['temps'] = params_group['temperatures'][:]
            reference_params['num_orient'] = len(orient_keys)
            reference_params['vectors'] = [
                hf[f'{ok}/parameters/B_unit_vector'][:] for ok in orient_keys
            ]
        except KeyError as e:
            raise ValueError(f"File {first_file_path} (orientation_0) is missing required parameter data: {e}")

        # Initialise aggregated_data with static info and rates from the first file
        for ok in orient_keys:
            orient_idx = int(ok.split('_')[1])
            orientation_group = hf[ok]

            # Orientation indices may be out of order; grow the weight list
            # so that orientation_weights[orient_idx] is always assignable.
            weight = orientation_group.attrs.get('weight', 1.0)
            if not orientation_weights or orient_idx >= len(orientation_weights):
                orientation_weights.extend([0.0] * (orient_idx + 1 - len(orientation_weights)))
            orientation_weights[orient_idx] = weight

            results_group = orientation_group['results']
            num_states = results_group['field_0']['electronic_energies'].shape[0]
            num_fields = len(reference_params['fields'])
            num_temps = len(reference_params['temps'])

            energies = np.zeros((num_states, num_fields))
            moments = np.zeros((num_states, num_fields))

            # Summed relaxation rates (1/tau), shape (n_temps, n_fields)
            total_rates = np.zeros((num_temps, num_fields))

            for f_idx in range(num_fields):
                field_group = results_group[f'field_{f_idx}']
                energies[:, f_idx] = field_group['electronic_energies'][:]
                moments[:, f_idx] = field_group['magnetic_moments'][:]
                _accumulate_rates(field_group, total_rates, f_idx)

            aggregated_data[orient_idx] = {
                'fields': reference_params['fields'],
                'temperatures': reference_params['temps'],
                'energies': energies,
                'magnetic_moments': moments,
                'total_rates': total_rates
            }

    # --- 2. Validate and aggregate rates from REMAINING files ---
    for file_path in input_files[1:]:
        with h5py.File(file_path, 'r') as hf:
            orient_keys = sorted(
                [k for k in hf.keys() if k.startswith('orientation_')],
                key=lambda x: int(x.split('_')[1])
            )
            if not orient_keys:
                print(f"Warning: File {file_path} contains no 'orientation_' groups. Skipping.")
                continue

            # --- Validation Check ---
            try:
                current_num_orient = len(orient_keys)
                if current_num_orient != reference_params['num_orient']:
                    raise ValueError(f"has {current_num_orient} orientations, but expected {reference_params['num_orient']}")

                params_group = hf['orientation_0/parameters']
                if not np.allclose(params_group['field_magnitudes'][:], reference_params['fields']):
                    raise ValueError("has incompatible 'field_magnitudes'")
                if not np.allclose(params_group['temperatures'][:], reference_params['temps']):
                    raise ValueError("has incompatible 'temperatures'")

                current_vectors = [hf[f'{ok}/parameters/B_unit_vector'][:] for ok in orient_keys]
                for i in range(current_num_orient):
                    if not np.allclose(current_vectors[i], reference_params['vectors'][i]):
                        raise ValueError(f"has a mismatch in orientation vector for orientation_{i}")
            except Exception as e:
                # Catch both KeyErrors and ValueErrors
                raise ValueError(
                    f"File {file_path} is not compatible with {input_files[0]}: {e}. "
                    "All files must be from the same orientation grid."
                )

            # --- Aggregation: rates are additive across files ---
            for ok in orient_keys:
                orient_idx = int(ok.split('_')[1])
                results_group = hf[ok]['results']

                current_tau_rates = aggregated_data[orient_idx]['total_rates']
                num_fields = len(reference_params['fields'])

                for f_idx in range(num_fields):
                    field_group = results_group[f'field_{f_idx}']
                    _accumulate_rates(field_group, current_tau_rates, f_idx)

    # Convert dict to a sorted list of data dictionaries
    data_list = [aggregated_data[i] for i in sorted(aggregated_data.keys())]

    return data_list, orientation_weights
def print_hysteresis_parameters(args)
Expand source code
def print_hysteresis_parameters(args):
    """
    Print a formatted summary of the hysteresis simulation parameters.

    Args:
        args (argparse.Namespace): The parsed command-line arguments.
    """
    print("\n--- Calculation Parameters ---")
    param_map = vars(args).copy()

    def _show(key, display_name=None):
        # Skip parameters that are absent or unset.
        if key not in param_map or param_map[key] is None:
            return
        label = display_name if display_name else key.replace('_', ' ').capitalize()
        value = param_map[key]
        if key == 'input_files':
            # Input file list gets a bracketed, comma-separated rendering.
            value = f"[{', '.join(value)}]"
        elif isinstance(value, list):
            value = _format_list(value)
        print(f"  {label:<28}: {value}")

    for key, label in (
        ('mode', None),
        ('input_files', None),
        ('output', 'Output prefix'),
        ('temperatures', None),
        ('init_field', 'Initial field(s)'),
        ('sweep_rate', None),
        ('time_step', None),
        ('population_propagation', None),
        ('n_cores', None),
        ('state_limit', None),
        ('tau_dynamics', 'Dynamics included'),
    ):
        _show(key, label)

Prints a formatted summary of the hysteresis simulation parameters.

Args

args : argparse.Namespace
The parsed command-line arguments.
def run_hysteresis_for_orient_temp(args_tuple)
Expand source code
def run_hysteresis_for_orient_temp(args_tuple):
    """
    Run a single hysteresis simulation for one orientation and temperature.

    Args:
        args_tuple (tuple): (orientation_data, temperature, parsed args),
            packed as one argument for pool/delayed execution.

    Returns:
        tuple: (field_list, magnetisation_result) for this sweep.
    """
    orientation_data, temperature, opts = args_tuple

    simulation = HysteresisSim(
        orientation_data,
        init_field=opts.init_field,
        sweep_rate=opts.sweep_rate,
        time_step=opts.time_step,
        state_limit=opts.state_limit,
        population_propagation=opts.population_propagation,
    )

    magnetisation = simulation.run_sweep(temperature)
    return simulation.field_list, magnetisation

Run a single hysteresis simulation for one orientation and temperature.

def run_hysteresis_simulation(args)
Expand source code
def run_hysteresis_simulation(args):
    """
    Main entry point for the 'tau2 hyst' command.

    Loads and aggregates rate data from the input HDF5 files, runs one
    hysteresis sweep per (orientation, temperature) pair — in parallel
    via Dask when ``args.n_cores > 1`` — averages the magnetisation over
    orientations using their weights, then optionally plots/saves the
    loops and prints coercive field and remnant magnetisation per
    temperature.

    Args:
        args (argparse.Namespace): The parsed command-line arguments.
    """

    print_hysteresis_parameters(args)

    print("\n--- Starting Hysteresis Simulation ---")

    # 1. Load and aggregate data
    print(f"Loading and aggregating data from: {', '.join(args.input_files)}")
    all_orient_data, orientation_weights = load_and_aggregate_rates(
        args.input_files, args.tau_dynamics
    )
    num_orientations = len(all_orient_data)
    print(f"Found {num_orientations} orientation(s).")

    # 2. Create Dask tasks — one simulation per (orientation, temperature)
    tasks = []
    for i, orient_data in enumerate(all_orient_data):
        for temp in args.temperatures:
            tasks.append((orient_data, temp, args))

    print(f"Executing {len(tasks)} simulations ({len(args.temperatures)} temperatures x "
          f"{num_orientations} orientations) on {args.n_cores} core(s).")

    if args.n_cores > 1:
        # Process scheduler: each task is independent and CPU-bound.
        lazy_results = [dask.delayed(run_hysteresis_for_orient_temp)(task) for task in tasks]
        results_flat = dask.compute(*lazy_results, scheduler='processes', num_workers=args.n_cores)
    else:
        results_flat = [run_hysteresis_for_orient_temp(task) for task in tasks]

    # 3. Process and average results
    if not results_flat:
        print("No simulation data was generated. Exiting.")
        return

    # Unflatten results. All sweeps share the same field list, so take it
    # from the first result. The loop order below must mirror the task
    # creation order above (orientation-major, temperature-minor).
    fields = results_flat[0][0]
    mag_results_by_temp = {temp: [] for temp in args.temperatures}

    task_idx = 0
    for i in range(num_orientations):
        for temp in args.temperatures:
            mag_results_by_temp[temp].append(results_flat[task_idx][1])
            task_idx += 1

    # Weighted average of the magnetisation over orientations
    magnetisation_average = {}
    total_weight = sum(orientation_weights)

    for temp, mag_lists in mag_results_by_temp.items():
        weighted_sum = np.zeros_like(mag_lists[0])
        for i, mag_list in enumerate(mag_lists):
            weighted_sum += np.array(mag_list) * orientation_weights[i]
        magnetisation_average[temp] = weighted_sum / total_weight

    # 4. Downsample, save, and plot. Dense sweeps (> 2000 points) are
    # resampled onto ~1000 evenly spaced points to keep output manageable.
    if len(fields) > 2000:
        hysteresis_data = {}
        fieldpoints = np.linspace(fields[0], fields[-1], num=1001, endpoint=True)
        for temp in args.temperatures:
            interp_func = interp1d(fields, magnetisation_average[temp],
                                   bounds_error=False, fill_value='extrapolate')
            hysteresis_data[temp] = interp_func(fieldpoints)
    else:
        fieldpoints = fields
        hysteresis_data = magnetisation_average

    if args.save_fig or args.show_plots:
        plot_hysteresis(fieldpoints, hysteresis_data, args.temperatures,
                        output_filename=args.output,
                        xlim=args.xlim, ylim=args.ylim,
                        xlim_zoom=args.xlim_zoom, ylim_zoom=args.ylim_zoom,
                        xaxis_units=args.xaxis_units, show_plots=args.show_plots)
        print(f"Hysteresis plot saved to '{args.output}.pdf'.")

    if args.save_csv:
        save_hysteresis(fieldpoints, hysteresis_data,
                        output_filename=args.output)
        print(f"Hysteresis data saved to '{args.output}.csv'.")

    # 5. Calculate and print coercive field and remnant magnetisation
    print("\n--- Coercivity and Remnance ---")
    print(f"{'T (K)':<7} {'H_c (Oe)':>10} {'M_r (μ_B)':>10}")
    print(f"{'-'*7:<7} {'-'*10:>10} {'-'*10:>10}")
    for temp in args.temperatures:

        mag_at_temp = magnetisation_average[temp]

        # Calculate Remnant Magnetisation (M at H=0)
        try:
            # Create M(H) interpolator
            mr_interp = interp1d(fields, mag_at_temp, bounds_error=False,
                                 fill_value="extrapolate")
            remnant_mag = mr_interp(0.0)
            mr_str = f"{remnant_mag:>10.4f}"
        except ValueError:
            mr_str = f"{'not found':>10}"

        # Calculate Coercive Field (H at M=0)
        # NOTE(review): a full hysteresis loop is multivalued in M, so
        # interpolating H(M) over the whole sweep picks an
        # interpolation-dependent zero crossing — confirm acceptable.
        try:
            # Create H(M) interpolator
            hc_interp = interp1d(mag_at_temp, fields, bounds_error=False,
                                 fill_value="extrapolate")
            coercive_field_T = hc_interp(0.0)
            # NOTE(review): assumes fields are in Tesla; x10000 converts T -> Oe.
            hc_str = f"{abs(coercive_field_T * 10000):>10.3f}"
        except ValueError:
            hc_str = f"{'(not found)':>10}"

        print(f"{temp:<7.1f} {hc_str} {mr_str}")

Main entry point for the 'tau2 hyst' command.

Classes

class HysteresisSim (data_dict,
init_field=-7,
sweep_rate=0.0022,
time_step=1,
state_limit=4,
population_propagation='implicit-euler')
Expand source code
class HysteresisSim:
    """
    Manages data and execution for the simulation of a single orientation.

    Construction builds the full field axis (negative branch reconstructed
    by time reversal), tracks states across field steps, and prepares
    interpolators for energies, moments, and relaxation rates; run_sweep
    then propagates populations along the sweep at a given temperature.
    """

    def __init__(self, data_dict, init_field=-7, sweep_rate=22/10000, time_step=1,
                 state_limit=4, population_propagation='implicit-euler'):
        """
        Initialise the Hysteresis Simulation: load data, and construct data at negative fields.

        Args:
            data_dict (dict): Aggregated data (fields, temps, energies, moments, rates).
            init_field (float/list): Initial field or sweep bounds.
            sweep_rate (float/list): Sweep rate(s).
            time_step (float): Time step.
            state_limit (int): Number of states kept in the dynamics.
            population_propagation (str): 'implicit-euler' or 'fixed'.
        """
        self.population_propagation = population_propagation
        self.time_step = time_step

        # Use full available states for initial tracking to avoid edge effects
        self.fields_array = data_dict['fields']
        self.temp_array = data_dict['temperatures']

        # Determine number of states to track (use all available)
        total_states = data_dict['energies'].shape[0]

        raw_energies = data_dict['energies']
        raw_moments = data_dict['magnetic_moments']

        # Ensure fields are monotonic 0 -> B_max for tracking
        # (assumes the input grid covers non-negative fields only — the
        # negative branch is reconstructed below; TODO confirm upstream)
        sort_indices = np.argsort(self.fields_array)
        sorted_fields = self.fields_array[sort_indices]

        energies = raw_energies[:, sort_indices]
        moments = raw_moments[:, sort_indices]

        # Track states linearly to fix crossing issues
        energies, moments = self._track_states(energies, moments, sorted_fields)

        # Construct Negative Branch using Time-Reversal Symmetry:
        # swap each consecutive pair of states (0<->1, 2<->3, ...) and
        # mirror the field axis, negating the moments.
        tr_map = np.arange(total_states)
        for i in range(0, total_states, 2):
            if i + 1 < total_states:
                tr_map[i] = i + 1
                tr_map[i+1] = i

        neg_energies = energies[tr_map, :][:, ::-1]
        neg_moments = -moments[tr_map, :][:, ::-1]
        neg_fields = -sorted_fields[::-1]

        # Concatenate and handle a duplicated grid point at B = 0
        if len(neg_fields) > 0 and len(sorted_fields) > 0 and \
           np.isclose(neg_fields[-1], sorted_fields[0]):
            # Remove the last point from the negative side arrays
            neg_fields = neg_fields[:-1]
            neg_energies = neg_energies[:, :-1]
            neg_moments = neg_moments[:, :-1]
            has_overlap_0 = True
        else:
            has_overlap_0 = False

        self.neg_fields_array = np.concatenate((neg_fields, sorted_fields))
        full_energies = np.concatenate((neg_energies, energies), axis=1)
        full_moments = np.concatenate((neg_moments, moments), axis=1)
        self.full_energies = full_energies

        # Tau Model: Use 'total_rates' (1/τ). Rates are mirrored onto the
        # negative branch (field-even).
        raw_rates = data_dict['total_rates'][:, sort_indices]  # (n_temps, n_fields)
        neg_rates = raw_rates[:, ::-1]

        if has_overlap_0:
            neg_rates = neg_rates[:, :-1]

        full_rates = np.concatenate((neg_rates, raw_rates), axis=1)

        # Logarithmic interpolation of the rates. Pre-fill the output with a
        # floor of -99 so that grid points with zero (or negative) rate get
        # a well-defined, effectively-zero rate (10**-99). BUGFIX: the
        # previous np.log10(full_rates, where=...) call without `out=` left
        # those entries UNINITIALISED, and the subsequent np.isneginf patch
        # did not catch arbitrary garbage values.
        self.log_neg_total_rates = np.full_like(full_rates, -99.0)
        np.log10(full_rates, out=self.log_neg_total_rates, where=full_rates > 0)

        # fill_value=None lets RegularGridInterpolator extrapolate linearly
        # outside the (temperature, field) grid.
        self.rate_interpolation = RegularGridInterpolator(
            (self.temp_array, self.neg_fields_array), self.log_neg_total_rates,
            method='linear', bounds_error=False, fill_value=None)

        # Slice output arrays to state_limit
        self.neg_field_evals = full_energies[:state_limit, :]
        self.neg_field_magmom = full_moments[:state_limit, :]

        # Per-state 1D interpolators over the full field axis
        self.eval_interpolation = [interp1d(
            self.neg_fields_array, self.neg_field_evals[i], bounds_error=False,
            fill_value='extrapolate') for i in range(state_limit)]

        self.magmom_interpolation = [interp1d(
            self.neg_fields_array, self.neg_field_magmom[i], bounds_error=False,
            fill_value='extrapolate') for i in range(state_limit)]

        # Set up the field sweep list
        self.field_list = self._create_field_sweep(init_field, sweep_rate, time_step)

        self.eval_list = np.array([interp(self.field_list) for interp in self.eval_interpolation])
        self.magmom_list = np.array([interp(self.field_list) for interp in self.magmom_interpolation])

        # Cache of run_sweep results keyed by temperature
        self.magnetisation_results = {}

    def _track_states(self, energies, moments, fields):
        """
        Track states across field steps to maintain diabatic continuity.

        Uses the Zeeman linearity (E ~ E_0 - mu*B) to predict the energy order
        at the next step and reorders the actual diagonalised states to match.
        """
        n_states, n_fields = energies.shape

        # Arrays to hold tracked data
        tracked_energies = np.zeros_like(energies)
        tracked_moments = np.zeros_like(moments)

        # Initialise with the first field point
        tracked_energies[:, 0] = energies[:, 0]
        tracked_moments[:, 0] = moments[:, 0]

        for k in range(1, n_fields):
            dB = fields[k] - fields[k-1]

            # Predict next energies based on current moments: E(B + dB) ~ E(B) - mu * dB
            pred_E = tracked_energies[:, k-1] - tracked_moments[:, k-1] * MU_B_CM_T * dB

            # Actual states at k (sorted by energy)
            curr_E = energies[:, k]

            # Cost matrix for assignment: |predicted - actual| for every pair
            cost = np.abs(pred_E[:, np.newaxis] - curr_E[np.newaxis, :])

            # Optimal one-to-one matching between predictions and actual states
            _, col_inds = linear_sum_assignment(cost)

            # Reorder current step data
            tracked_energies[:, k] = energies[col_inds, k]
            tracked_moments[:, k] = moments[col_inds, k]

        return tracked_energies, tracked_moments

    def _create_field_sweep(self, init_field, sweep_rate, time_step):
        """
        Build the list of field values visited during the sweep.

        Scalar init_field/sweep_rate produce a single linear ramp from
        init_field to -init_field. Lists define a piecewise sweep through
        the given fields down to 0 and mirrored back out, with a matching
        (reversed) rate for each segment.
        """
        # Handle single value convenience case
        if isinstance(sweep_rate, list) and len(sweep_rate) == 1:
            sweep_rate = sweep_rate[0]
        if isinstance(init_field, list) and len(init_field) == 1:
            init_field = init_field[0]

        if isinstance(sweep_rate, (int, float)):
            if not isinstance(init_field, (int, float)):
                raise ValueError("Mismatch in init_field/sweep_rate args.")
            end_field = -init_field
            num_steps = int(np.ceil(abs(end_field - init_field) / (sweep_rate * time_step)))
            if num_steps == 0: return [init_field]
            return list(np.linspace(init_field, end_field, num=num_steps + 1))

        else:
            if isinstance(init_field, (int, float)) or len(init_field) != len(sweep_rate):
                 raise ValueError("Mismatch in number of init_fields and sweep_rates.")

            # Mirror the segments: B0 ... Bn-1, 0, -Bn-1 ... -B0 with the
            # rates applied symmetrically on the way back out.
            sweep_rate_list = list(sweep_rate)
            sweep_field_list = list(init_field)
            sweep_rate_list.extend(sweep_rate_list[::-1])
            sweep_field_list.append(0.0)
            sweep_field_list.extend([-field for field in init_field[::-1]])

            final_field_list = np.array([])
            current_field = sweep_field_list[0]

            for i in range(len(sweep_rate_list)):
                rate = sweep_rate_list[i]
                end_field = sweep_field_list[i+1]
                num_steps = int(np.ceil(abs(end_field - current_field) / (rate * time_step)))
                if num_steps > 0:
                    # Skip the first point of each segment after the first
                    # to avoid duplicating the segment boundary.
                    start_idx = 1 if i > 0 else 0
                    points = np.linspace(current_field, end_field, num=num_steps + 1)[start_idx:]
                    final_field_list = np.concatenate((final_field_list, points))
                current_field = end_field

            return list(final_field_list)

    def run_sweep(self, temp):
        """
        Perform a hysteresis sweep simulation at a specified temperature.
        """
        # Find index of starting field to get all energies for partition function
        start_field = self.field_list[0]
        start_field_idx = np.argmin(np.abs(self.neg_fields_array - start_field))
        all_energies_at_start = self.full_energies[:, start_field_idx]

        # Calculate partition function over ALL states to get correct normalisation
        Z_full = np.sum(np.exp(-all_energies_at_start / (kB * temp)))

        # Initial populations for the truncated subspace
        initial_populations = np.exp(-self.eval_list[:, 0] / (kB * temp)) / Z_full
        num_fields = len(self.field_list)
        use_implicit_euler = (self.population_propagation == 'implicit-euler')

        # (temperature, field) query points for the rate interpolator
        point_grid = np.array([[temp] * num_fields, self.field_list]).T

        # Clip field values to the grid boundaries to prevent extrapolation errors.
        field_grid_bounds = [self.neg_fields_array[0], self.neg_fields_array[-1]]
        point_grid[:, 1] = np.clip(point_grid[:, 1], field_grid_bounds[0], field_grid_bounds[1])

        # Rates are stored as log10(1/tau); undo the log here
        rates = 10**self.rate_interpolation(point_grid)

        # Calculate Equilibrium Populations (needed for tau relaxation target)
        Z = np.sum(np.exp(-self.eval_list / (kB * temp)), axis=0)
        eq_pop_list = np.exp(-self.eval_list / (kB * temp)) / Z

        magnetisation_list = _hysteresis_loop_tau_jit(
            num_fields, self.time_step, rates, self.magmom_list,
            eq_pop_list, initial_populations, use_implicit_euler
        )

        self.magnetisation_results[temp] = magnetisation_list
        return magnetisation_list

Manages data and execution for the simulation of a single orientation.

Initialise the Hysteresis Simulation: load data, and construct data at negative fields.

Args

data_dict : dict
Aggregated data (fields, temps, energies, moments, rates).
init_field : float or list
Initial field or sweep bounds.
sweep_rate : float or list
Sweep rate(s).
time_step : float
Time step.
state_limit : int
Number of states.
population_propagation : str
'implicit-euler' or 'fixed'.

Methods

def run_sweep(self, temp)
Expand source code
def run_sweep(self, temp):
    """
    Perform a hysteresis sweep simulation at a specified temperature.

    Args:
        temp (float): Temperature at which to run the sweep. Units match
            the temperature grid used to build ``self.rate_interpolation``
            (presumably Kelvin — TODO confirm).

    Returns:
        The magnetisation at each field point in ``self.field_list``;
        also cached in ``self.magnetisation_results[temp]``.
    """
    # Find index of starting field to get all energies for partition function
    start_field = self.field_list[0]
    start_field_idx = np.argmin(np.abs(self.neg_fields_array - start_field))
    all_energies_at_start = self.full_energies[:, start_field_idx]

    # Calculate partition function over ALL states to get correct normalisation
    Z_full = np.sum(np.exp(-all_energies_at_start / (kB * temp)))

    # Initial populations for the truncated subspace (Boltzmann at the
    # starting field, normalised by the full partition function)
    initial_populations = np.exp(-self.eval_list[:, 0] / (kB * temp)) / Z_full
    num_fields = len(self.field_list)
    use_implicit_euler = (self.population_propagation == 'implicit-euler')

    # (temperature, field) query points for the 2D rate interpolator
    point_grid = np.array([[temp] * num_fields, self.field_list]).T

    # Clip field values to the grid boundaries to prevent extrapolation errors.
    field_grid_bounds = [self.neg_fields_array[0], self.neg_fields_array[-1]]
    point_grid[:, 1] = np.clip(point_grid[:, 1], field_grid_bounds[0], field_grid_bounds[1])

    # Rates were stored as log10(1/tau); undo the log here
    rates = 10**self.rate_interpolation(point_grid)
    
    # Calculate Equilibrium Populations (needed for tau relaxation target)
    Z = np.sum(np.exp(-self.eval_list / (kB * temp)), axis=0)
    eq_pop_list = np.exp(-self.eval_list / (kB * temp)) / Z
    
    magnetisation_list = _hysteresis_loop_tau_jit(
        num_fields, self.time_step, rates, self.magmom_list,
        eq_pop_list, initial_populations, use_implicit_euler
    )

    self.magnetisation_results[temp] = magnetisation_list
    return magnetisation_list

Perform a hysteresis sweep simulation at a specified temperature.