Module tau2.hysteresis
Functions
def load_and_aggregate_rates(input_files, tau_dynamics=None)-
Expand source code
def load_and_aggregate_rates(input_files, tau_dynamics=None): if tau_dynamics is None: tau_dynamics = ['all'] """ Load and aggregate rate data from one or more Taupy rate HDF5 files. Performs a strict check to ensure all input files were generated with identical orientation grids, field magnitudes, and temperatures before aggregating the relaxation rates. Args: input_files (list): A list of paths to HDF5 files. tau_dynamics (list): List of rate types to include (e.g., ['raman', 'orbach']). Returns: tuple: A tuple containing: - list: A list of data dictionaries, one for each orientation. - list: A list of orientation weights. Raises: ValueError: If input files are not compatible (e.g., different orientation grids, field lists, or temperatures). """ if not input_files: raise ValueError("No input files provided.") aggregated_data = {} orientation_weights = [] reference_params = {} # --- 1. Load and initialise from the FIRST file --- first_file_path = input_files[0] with h5py.File(first_file_path, 'r') as hf: orient_keys = sorted( [k for k in hf.keys() if k.startswith('orientation_')], key=lambda x: int(x.split('_')[1]) ) if not orient_keys: raise ValueError(f"File {first_file_path} contains no 'orientation_' groups.") # Set reference parameters from orientation_0 try: params_group = hf['orientation_0/parameters'] reference_params['fields'] = params_group['field_magnitudes'][:] reference_params['temps'] = params_group['temperatures'][:] reference_params['num_orient'] = len(orient_keys) reference_params['vectors'] = [ hf[f'{ok}/parameters/B_unit_vector'][:] for ok in orient_keys ] except KeyError as e: raise ValueError(f"File {first_file_path} (orientation_0) is missing required parameter data: {e}") # Initialise aggregated_data with static info and rates from the first file for i, ok in enumerate(orient_keys): orient_idx = int(ok.split('_')[1]) orientation_group = hf[ok] weight = orientation_group.attrs.get('weight', 1.0) if not orientation_weights or orient_idx >= len(orientation_weights): orientation_weights.extend([0.0] * (orient_idx + 1 - len(orientation_weights))) orientation_weights[orient_idx] = weight results_group = orientation_group['results'] num_states = results_group['field_0']['electronic_energies'].shape[0] num_fields = len(reference_params['fields']) num_temps = len(reference_params['temps']) energies = np.zeros((num_states, num_fields)) moments = np.zeros((num_states, num_fields)) # Initialize storage for both tau rates and gamma matrices total_rates = np.zeros((num_temps, num_fields)) for f_idx in range(num_fields): field_group = results_group[f'field_{f_idx}'] energies[:, f_idx] = field_group['electronic_energies'][:] moments[:, f_idx] = field_group['magnetic_moments'][:] # Helper to accumulate tau rates def add_tau(source_group, rate_name): if rate_name in source_group: total_rates[:, f_idx] += source_group[rate_name][:] if 'all' in tau_dynamics or 'raman' in tau_dynamics: if 'raman' in field_group: pass if 'raman_rates' in field_group: add_tau(field_group, 'raman_rates') if 'all' in tau_dynamics or 'orbach' in tau_dynamics: if 'orbach' in field_group: pass if 'orbach_rates' in field_group: add_tau(field_group, 'orbach_rates') aggregated_data[orient_idx] = { 'fields': reference_params['fields'], 'temperatures': reference_params['temps'], 'energies': energies, 'magnetic_moments': moments, 'total_rates': total_rates } # --- 2. Validate and aggregate rates from REMAINING files --- for file_path in input_files[1:]: with h5py.File(file_path, 'r') as hf: orient_keys = sorted( [k for k in hf.keys() if k.startswith('orientation_')], key=lambda x: int(x.split('_')[1]) ) if not orient_keys: print(f"Warning: File {file_path} contains no 'orientation_' groups. Skipping.") continue # --- Validation Check --- try: current_num_orient = len(orient_keys) if current_num_orient != reference_params['num_orient']: raise ValueError(f"has {current_num_orient} orientations, but expected {reference_params['num_orient']}") params_group = hf['orientation_0/parameters'] if not np.allclose(params_group['field_magnitudes'][:], reference_params['fields']): raise ValueError("has incompatible 'field_magnitudes'") if not np.allclose(params_group['temperatures'][:], reference_params['temps']): raise ValueError("has incompatible 'temperatures'") current_vectors = [hf[f'{ok}/parameters/B_unit_vector'][:] for ok in orient_keys] for i in range(current_num_orient): if not np.allclose(current_vectors[i], reference_params['vectors'][i]): raise ValueError(f"has a mismatch in orientation vector for orientation_{i}") except Exception as e: # Catch both KeyErrors and ValueErrors raise ValueError( f"File {file_path} is not compatible with {input_files[0]}: {e}. " "All files must be from the same orientation grid." ) # --- Aggregation --- for ok in orient_keys: orient_idx = int(ok.split('_')[1]) orientation_group = hf[ok] results_group = orientation_group['results'] current_tau_rates = aggregated_data[orient_idx]['total_rates'] num_fields = len(reference_params['fields']) for f_idx in range(num_fields): field_group = results_group[f'field_{f_idx}'] def add_tau(source_group, rate_name): if rate_name in source_group: current_tau_rates[:, f_idx] += source_group[rate_name][:] if 'all' in tau_dynamics or 'raman' in tau_dynamics: if 'raman' in field_group: pass if 'raman_rates' in field_group: add_tau(field_group, 'raman_rates') if 'all' in tau_dynamics or 'orbach' in tau_dynamics: if 'orbach' in field_group: pass if 'orbach_rates' in field_group: add_tau(field_group, 'orbach_rates') # Convert dict to a sorted list of data dictionaries data_list = [aggregated_data[i] for i in sorted(aggregated_data.keys())] return data_list, orientation_weights def print_hysteresis_parameters(args)-
Expand source code
def print_hysteresis_parameters(args): """ Prints a formatted summary of the hysteresis simulation parameters. Args: args (argparse.Namespace): The parsed command-line arguments. """ print("\n--- Calculation Parameters ---") p = vars(args).copy() def _print_param(key, display_name=None): if key in p and p[key] is not None: if not display_name: display_name = key.replace('_', ' ').capitalize() value = p[key] # Use _format_list for lists, except for input_files if isinstance(value, list) and key != 'input_files': value = _format_list(value) # Special handling for input_files list elif key == 'input_files': value = f"[{', '.join(value)}]" print(f" {display_name:<28}: {value}") _print_param('mode') _print_param('input_files') _print_param('output', 'Output prefix') _print_param('temperatures') _print_param('init_field', 'Initial field(s)') _print_param('sweep_rate') _print_param('time_step') _print_param('population_propagation') _print_param('n_cores') _print_param('state_limit') _print_param('tau_dynamics', 'Dynamics included')Prints a formatted summary of the hysteresis simulation parameters.
Args
args:argparse.Namespace- The parsed command-line arguments.
def run_hysteresis_for_orient_temp(args_tuple)-
Expand source code
def run_hysteresis_for_orient_temp(args_tuple): """ Run a single hysteresis simulation for one orientation and temperature. """ orientation_data, temp, args = args_tuple sim = HysteresisSim( orientation_data, init_field=args.init_field, sweep_rate=args.sweep_rate, time_step=args.time_step, state_limit=args.state_limit, population_propagation=args.population_propagation ) mag_result = sim.run_sweep(temp) return sim.field_list, mag_resultRun a single hysteresis simulation for one orientation and temperature.
def run_hysteresis_simulation(args)-
Expand source code
def run_hysteresis_simulation(args): """ Main entry point for the 'tau2 hyst' command. """ print_hysteresis_parameters(args) print("\n--- Starting Hysteresis Simulation ---") # 1. Load and aggregate data print(f"Loading and aggregating data from: {', '.join(args.input_files)}") all_orient_data, orientation_weights = load_and_aggregate_rates( args.input_files, args.tau_dynamics ) num_orientations = len(all_orient_data) print(f"Found {num_orientations} orientation(s).") # 2. Create Dask tasks tasks = [] for i, orient_data in enumerate(all_orient_data): for temp in args.temperatures: tasks.append((orient_data, temp, args)) print(f"Executing {len(tasks)} simulations ({len(args.temperatures)} temperatures x " f"{num_orientations} orientations) on {args.n_cores} core(s).") if args.n_cores > 1: lazy_results = [dask.delayed(run_hysteresis_for_orient_temp)(task) for task in tasks] results_flat = dask.compute(*lazy_results, scheduler='processes', num_workers=args.n_cores) else: results_flat = [run_hysteresis_for_orient_temp(task) for task in tasks] # 3. Process and average results if not results_flat: print("No simulation data was generated. Exiting.") return # Unflatten results fields = results_flat[0][0] mag_results_by_temp = {temp: [] for temp in args.temperatures} task_idx = 0 for i in range(num_orientations): for temp in args.temperatures: mag_results_by_temp[temp].append(results_flat[task_idx][1]) task_idx += 1 # Weighted average magnetisation_average = {} total_weight = sum(orientation_weights) for temp, mag_lists in mag_results_by_temp.items(): weighted_sum = np.zeros_like(mag_lists[0]) for i, mag_list in enumerate(mag_lists): weighted_sum += np.array(mag_list) * orientation_weights[i] magnetisation_average[temp] = weighted_sum / total_weight # 4. Downsample, save, and plot if len(fields) > 2000: hysteresis_data = {} fieldpoints = np.linspace(fields[0], fields[-1], num=1001, endpoint=True) for temp in args.temperatures: interp_func = interp1d(fields, magnetisation_average[temp], bounds_error=False, fill_value='extrapolate') hysteresis_data[temp] = interp_func(fieldpoints) else: fieldpoints = fields hysteresis_data = magnetisation_average if args.save_fig or args.show_plots: plot_hysteresis(fieldpoints, hysteresis_data, args.temperatures, output_filename=args.output, xlim=args.xlim, ylim=args.ylim, xlim_zoom=args.xlim_zoom, ylim_zoom=args.ylim_zoom, xaxis_units=args.xaxis_units, show_plots=args.show_plots) print(f"Hysteresis plot saved to '{args.output}.pdf'.") if args.save_csv: save_hysteresis(fieldpoints, hysteresis_data, output_filename=args.output) print(f"Hysteresis data saved to '{args.output}.csv'.") # 5. Calculate and print coercive field and remnant magnetisation print("\n--- Coercivity and Remnance ---") print(f"{'T (K)':<7} {'H_c (Oe)':>10} {'M_r (μ_B)':>10}") print(f"{'-'*7:<7} {'-'*10:>10} {'-'*10:>10}") for temp in args.temperatures: mag_at_temp = magnetisation_average[temp] # Calculate Remnant Magnetisation (M at H=0) try: # Create M(H) interpolator mr_interp = interp1d(fields, mag_at_temp, bounds_error=False, fill_value="extrapolate") remnant_mag = mr_interp(0.0) mr_str = f"{remnant_mag:>10.4f}" except ValueError: mr_str = f"{'not found':>10}" # Calculate Coercive Field (H at M=0) try: # Create H(M) interpolator hc_interp = interp1d(mag_at_temp, fields, bounds_error=False, fill_value="extrapolate") coercive_field_T = hc_interp(0.0) hc_str = f"{abs(coercive_field_T * 10000):>10.3f}" except ValueError: hc_str = f"{'(not found)':>10}" print(f"{temp:<7.1f} {hc_str} {mr_str}")Main entry point for the 'tau2 hyst' command.
Classes
class HysteresisSim (data_dict,
init_field=-7,
sweep_rate=0.0022,
time_step=1,
state_limit=4,
population_propagation='implicit-euler')-
Expand source code
class HysteresisSim: """ Manages data and execution for the simulation of a single orientation. """ def __init__(self, data_dict, init_field=-7, sweep_rate=22/10000, time_step=1, state_limit=4, population_propagation='implicit-euler'): """ Initialise the Hysteresis Simulation: load data, and construct data at negative fields. Args: data_dict (dict): Aggregated data (fields, temps, energies, moments, rates). init_field (float/list): Initial field or sweep bounds. sweep_rate (float/list): Sweep rate(s). time_step (float): Time step. state_limit (int): Number of states. population_propagation (str): 'implicit-euler' or 'fixed'. """ self.population_propagation = population_propagation self.time_step = time_step # Use full available states for initial tracking to avoid edge effects self.fields_array = data_dict['fields'] self.temp_array = data_dict['temperatures'] # Determine number of states to track (use all available) total_states = data_dict['energies'].shape[0] raw_energies = data_dict['energies'] raw_moments = data_dict['magnetic_moments'] # Ensure fields are monotonic 0 -> B_max for tracking sort_indices = np.argsort(self.fields_array) sorted_fields = self.fields_array[sort_indices] energies = raw_energies[:, sort_indices] moments = raw_moments[:, sort_indices] # Track states linearly to fix crossing issues energies, moments = self._track_states(energies, moments, sorted_fields) # Construct Negative Branch using Time-Reversal Symmetry tr_map = np.arange(total_states) for i in range(0, total_states, 2): if i + 1 < total_states: tr_map[i] = i + 1 tr_map[i+1] = i neg_energies = energies[tr_map, :][:, ::-1] neg_moments = -moments[tr_map, :][:, ::-1] neg_fields = -sorted_fields[::-1] # Concatenate and Handle Overlap at 0 if len(neg_fields) > 0 and len(sorted_fields) > 0 and \ np.isclose(neg_fields[-1], sorted_fields[0]): # Remove the last point from the negative side arrays neg_fields = neg_fields[:-1] neg_energies = neg_energies[:, :-1] neg_moments = neg_moments[:, :-1] has_overlap_0 = True else: has_overlap_0 = False self.neg_fields_array = np.concatenate((neg_fields, sorted_fields)) full_energies = np.concatenate((neg_energies, energies), axis=1) full_moments = np.concatenate((neg_moments, moments), axis=1) self.full_energies = full_energies # Tau Model: Use 'total_rates' (1/τ) raw_rates = data_dict['total_rates'][:, sort_indices] # (n_temps, n_fields) neg_rates = raw_rates[:, ::-1] if has_overlap_0: neg_rates = neg_rates[:, :-1] full_rates = np.concatenate((neg_rates, raw_rates), axis=1) # Logarithmic interpolation self.log_neg_total_rates = np.log10(full_rates, where=full_rates > 0) self.log_neg_total_rates[np.isneginf(self.log_neg_total_rates)] = -99 self.rate_interpolation = RegularGridInterpolator( (self.temp_array, self.neg_fields_array), self.log_neg_total_rates, method='linear', bounds_error=False, fill_value=None) # Slice output arrays to state_limit self.neg_field_evals = full_energies[:state_limit, :] self.neg_field_magmom = full_moments[:state_limit, :] # Interpolators self.eval_interpolation = [interp1d( self.neg_fields_array, self.neg_field_evals[i], bounds_error=False, fill_value='extrapolate') for i in np.arange(0, state_limit)] self.magmom_interpolation = [interp1d( self.neg_fields_array, self.neg_field_magmom[i], bounds_error=False, fill_value='extrapolate') for i in np.arange(0, state_limit)] # Set up the field sweep list self.field_list = self._create_field_sweep(init_field, sweep_rate, time_step) self.eval_list = np.array([interp(self.field_list) for interp in self.eval_interpolation]) self.magmom_list = np.array([interp(self.field_list) for interp in self.magmom_interpolation]) self.magnetisation_results = {} def _track_states(self, energies, moments, fields): """ Track states across field steps to maintain diabatic continuity. Uses the Zeeman linearity (E ~ E_0 - mu*B) to predict the energy order at the next step and reorders the actual diagonalised states to match. """ n_states, n_fields = energies.shape # Arrays to hold tracked data tracked_energies = np.zeros_like(energies) tracked_moments = np.zeros_like(moments) # Initialise with the first field point tracked_energies[:, 0] = energies[:, 0] tracked_moments[:, 0] = moments[:, 0] for k in range(1, n_fields): dB = fields[k] - fields[k-1] # Predict next energies based on current moments: E(B + dB) ~ E(B) - mu * dB pred_E = tracked_energies[:, k-1] - tracked_moments[:, k-1] * MU_B_CM_T * dB # Actual states at k (sorted by energy) curr_E = energies[:, k] # Cost matrix for assignment cost = np.abs(pred_E[:, np.newaxis] - curr_E[np.newaxis, :]) # Find best matching _, col_inds = linear_sum_assignment(cost) # Reorder current step data tracked_energies[:, k] = energies[col_inds, k] tracked_moments[:, k] = moments[col_inds, k] return tracked_energies, tracked_moments def _create_field_sweep(self, init_field, sweep_rate, time_step): # Handle single value convenience case if isinstance(sweep_rate, list) and len(sweep_rate) == 1: sweep_rate = sweep_rate[0] if isinstance(init_field, list) and len(init_field) == 1: init_field = init_field[0] start_field = init_field if isinstance(init_field, (int, float)) else init_field[0] if isinstance(sweep_rate, (int, float)): if not isinstance(init_field, (int, float)): raise ValueError("Mismatch in init_field/sweep_rate args.") end_field = -init_field num_steps = int(np.ceil(abs(end_field - init_field) / (sweep_rate * time_step))) if num_steps == 0: return [init_field] return list(np.linspace(init_field, end_field, num=num_steps + 1)) else: if isinstance(init_field, (int, float)) or len(init_field) != len(sweep_rate): raise ValueError("Mismatch in number of init_fields and sweep_rates.") sweep_rate_list = list(sweep_rate) sweep_field_list = list(init_field) sweep_rate_list.extend(sweep_rate_list[::-1]) sweep_field_list.append(0.0) sweep_field_list.extend([-field for field in init_field[::-1]]) final_field_list = np.array([]) current_field = sweep_field_list[0] for i in range(len(sweep_rate_list)): rate = sweep_rate_list[i] end_field = sweep_field_list[i+1] num_steps = int(np.ceil(abs(end_field - current_field) / (rate * time_step))) if num_steps > 0: start_idx = 1 if i > 0 else 0 points = np.linspace(current_field, end_field, num=num_steps + 1)[start_idx:] final_field_list = np.concatenate((final_field_list, points)) current_field = end_field return list(final_field_list) def run_sweep(self, temp): """ Perform a hysteresis sweep simulation at a specified temperature. """ # Find index of starting field to get all energies for partition function start_field = self.field_list[0] start_field_idx = np.argmin(np.abs(self.neg_fields_array - start_field)) all_energies_at_start = self.full_energies[:, start_field_idx] # Calculate partition function over ALL states to get correct normalisation Z_full = np.sum(np.exp(-all_energies_at_start / (kB * temp))) # Initial populations for the truncated subspace initial_populations = np.exp(-self.eval_list[:, 0] / (kB * temp)) / Z_full num_fields = len(self.field_list) use_implicit_euler = (self.population_propagation == 'implicit-euler') point_grid = np.array([[temp] * num_fields, self.field_list]).T # Clip field values to the grid boundaries to prevent extrapolation errors. field_grid_bounds = [self.neg_fields_array[0], self.neg_fields_array[-1]] point_grid[:, 1] = np.clip(point_grid[:, 1], field_grid_bounds[0], field_grid_bounds[1]) rates = 10**self.rate_interpolation(point_grid) # Calculate Equilibrium Populations (needed for tau relaxation target) Z = np.sum(np.exp(-self.eval_list / (kB * temp)), axis=0) eq_pop_list = np.exp(-self.eval_list / (kB * temp)) / Z magnetisation_list = _hysteresis_loop_tau_jit( num_fields, self.time_step, rates, self.magmom_list, eq_pop_list, initial_populations, use_implicit_euler ) self.magnetisation_results[temp] = magnetisation_list return magnetisation_listManages data and execution for the simulation of a single orientation.
Initialise the Hysteresis Simulation: load data, and construct data at negative fields.
Args
data_dict:dict- Aggregated data (fields, temps, energies, moments, rates).
- init_field (float/list): Initial field or sweep bounds.
- sweep_rate (float/list): Sweep rate(s).
time_step:float- Time step.
state_limit:int- Number of states.
population_propagation:str- 'implicit-euler' or 'fixed'.
Methods
def run_sweep(self, temp)-
Expand source code
def run_sweep(self, temp): """ Perform a hysteresis sweep simulation at a specified temperature. """ # Find index of starting field to get all energies for partition function start_field = self.field_list[0] start_field_idx = np.argmin(np.abs(self.neg_fields_array - start_field)) all_energies_at_start = self.full_energies[:, start_field_idx] # Calculate partition function over ALL states to get correct normalisation Z_full = np.sum(np.exp(-all_energies_at_start / (kB * temp))) # Initial populations for the truncated subspace initial_populations = np.exp(-self.eval_list[:, 0] / (kB * temp)) / Z_full num_fields = len(self.field_list) use_implicit_euler = (self.population_propagation == 'implicit-euler') point_grid = np.array([[temp] * num_fields, self.field_list]).T # Clip field values to the grid boundaries to prevent extrapolation errors. field_grid_bounds = [self.neg_fields_array[0], self.neg_fields_array[-1]] point_grid[:, 1] = np.clip(point_grid[:, 1], field_grid_bounds[0], field_grid_bounds[1]) rates = 10**self.rate_interpolation(point_grid) # Calculate Equilibrium Populations (needed for tau relaxation target) Z = np.sum(np.exp(-self.eval_list / (kB * temp)), axis=0) eq_pop_list = np.exp(-self.eval_list / (kB * temp)) / Z magnetisation_list = _hysteresis_loop_tau_jit( num_fields, self.time_step, rates, self.magmom_list, eq_pop_list, initial_populations, use_implicit_euler ) self.magnetisation_results[temp] = magnetisation_list return magnetisation_listPerform a hysteresis sweep simulation at a specified temperature.