Ohm Solver: Magnetic Reconnection

Hybrid-PIC codes are often used to simulate magnetic reconnection in space plasmas. An example of magnetic reconnection from a force-free sheet is provided, based on the simulation described in Le et al. [14].

Run

The following Python script configures and launches the simulation.

Script inputs_test_2d_ohm_solver_magnetic_reconnection_picmi.py
Listing 51 You can copy this file from Examples/Tests/ohm_solver_magnetic_reconnection/inputs_test_2d_ohm_solver_magnetic_reconnection_picmi.py.
#!/usr/bin/env python3
#
# --- Test script for the kinetic-fluid hybrid model in WarpX wherein ions are
# --- treated as kinetic particles and electrons as an isothermal, inertialess
# --- background fluid. The script demonstrates the use of this model to
# --- simulate magnetic reconnection in a force-free sheet. The setup is based
# --- on the problem described in Le et al. (2016)
# --- https://aip.scitation.org/doi/10.1063/1.4943893.

import argparse
import shutil
import sys
from pathlib import Path

import dill
import numpy as np
from mpi4py import MPI as mpi

from pywarpx import callbacks, libwarpx, picmi
from pywarpx.LoadThirdParty import load_cupy

constants = picmi.constants

comm = mpi.COMM_WORLD

simulation = picmi.Simulation(warpx_serialize_initial_conditions=True, verbose=0)


def get_xp():
    xp, _ = load_cupy()
    return xp


class ForceFreeSheetReconnection(object):
    # B0 is chosen with all other quantities scaled by it
    B0 = 0.1  # Initial magnetic field strength (T)

    # Physical parameters
    m_ion = 400.0  # Ion mass (electron masses)

    beta_e = 0.1
    Bg = 0.3  # times B0 - guiding field
    dB = 0.01  # times B0 - initial perturbation to seed reconnection

    T_ratio = 5.0  # T_i / T_e

    # Domain parameters
    LX = 40  # ion skin depths
    LZ = 20  # ion skin depths

    LT = 50  # ion cyclotron periods
    DT = 1e-3  # ion cyclotron periods

    # Resolution parameters
    NX = 512
    NZ = 512

    # Starting number of particles per cell
    NPPC = 400

    # Plasma resistivity - used to dampen the mode excitation
    eta = 6e-3  # normalized resistivity
    # Number of substeps used to update B
    substeps = 20

    def __init__(self, test, verbose):
        self.test = test
        self.verbose = verbose or self.test

        # calculate various plasma parameters based on the simulation input
        self.get_plasma_quantities()

        self.Lx = self.LX * self.l_i
        self.Lz = self.LZ * self.l_i

        self.dt = self.DT * self.t_ci

        # run very low resolution as a CI test
        if self.test:
            self.total_steps = 20
            self.diag_steps = self.total_steps // 5
            self.NX = 128
            self.NZ = 128
        else:
            self.total_steps = int(self.LT / self.DT)
            self.diag_steps = self.total_steps // 200

        # Initial magnetic field
        self.Bg *= self.B0
        self.dB *= self.B0
        self.Bx = (
            f"{self.B0}*tanh(z*{1.0 / self.l_i})"
            f"+{-self.dB * self.Lx / (2.0 * self.Lz)}*cos({2.0 * np.pi / self.Lx}*x)"
            f"*sin({np.pi / self.Lz}*z)"
        )
        self.By = (
            f"sqrt({self.Bg**2 + self.B0**2}-({self.B0}*tanh(z*{1.0 / self.l_i}))**2)"
        )
        self.Bz = f"{self.dB}*sin({2.0 * np.pi / self.Lx}*x)*cos({np.pi / self.Lz}*z)"

        self.J0 = self.B0 / constants.mu0 / self.l_i

        # dump all the current attributes to a dill pickle file
        if comm.rank == 0:
            with open("sim_parameters.dpkl", "wb") as f:
                dill.dump(self, f)

        # print out plasma parameters
        if comm.rank == 0:
            print(
                f"Initializing simulation with input parameters:\n"
                f"\tTi = {self.Ti * 1e-3:.1f} keV\n"
                f"\tn0 = {self.n_plasma:.1e} m^-3\n"
                f"\tB0 = {self.B0:.2f} T\n"
                f"\tM/m = {self.m_ion:.0f}\n"
            )
            print(
                f"Plasma parameters:\n"
                f"\tl_i = {self.l_i:.1e} m\n"
                f"\tt_ci = {self.t_ci:.1e} s\n"
                f"\tv_ti = {self.vi_th:.1e} m/s\n"
                f"\tvA = {self.vA:.1e} m/s\n"
            )
            print(
                f"Numerical parameters:\n"
                f"\tdz = {self.Lz / self.NZ:.1e} m\n"
                f"\tdt = {self.dt:.1e} s\n"
                f"\tdiag steps = {self.diag_steps:d}\n"
                f"\ttotal steps = {self.total_steps:d}\n"
            )

        self.setup_run()

    def get_plasma_quantities(self):
        """Calculate various plasma parameters based on the simulation input."""

        # Ion mass (kg)
        self.M = self.m_ion * constants.m_e

        # Cyclotron angular frequency (rad/s) and period (s)
        self.w_ce = constants.q_e * abs(self.B0) / constants.m_e
        self.w_ci = constants.q_e * abs(self.B0) / self.M
        self.t_ci = 2.0 * np.pi / self.w_ci

        # Electron plasma frequency: w_pe / omega_ce = 2 is given
        self.w_pe = 2.0 * self.w_ce

        # calculate plasma density based on electron plasma frequency
        self.n_plasma = self.w_pe**2 * constants.m_e * constants.ep0 / constants.q_e**2

        # Ion plasma frequency (Hz)
        self.w_pi = np.sqrt(constants.q_e**2 * self.n_plasma / (self.M * constants.ep0))

        # Ion skin depth (m)
        self.l_i = constants.c / self.w_pi

        # # Alfven speed (m/s): vA = B / sqrt(mu0 * n * (M + m)) = c * omega_ci / w_pi
        self.vA = abs(self.B0) / np.sqrt(
            constants.mu0 * self.n_plasma * (constants.m_e + self.M)
        )

        # calculate Te based on beta
        self.Te = (
            self.beta_e
            * self.B0**2
            / (2.0 * constants.mu0 * self.n_plasma)
            / constants.q_e
        )
        self.Ti = self.Te * self.T_ratio

        # calculate thermal speeds
        self.ve_th = np.sqrt(self.Te * constants.q_e / constants.m_e)
        self.vi_th = np.sqrt(self.Ti * constants.q_e / self.M)

        # Ion Larmor radius (m)
        self.rho_i = self.vi_th / self.w_ci

        # Reference resistivity (Malakit et al.)
        self.eta0 = self.l_i * self.vA / (constants.ep0 * constants.c**2)

    def setup_run(self):
        """Setup simulation components."""

        #######################################################################
        # Set geometry and boundary conditions                                #
        #######################################################################

        # Create grid
        self.grid = picmi.Cartesian2DGrid(
            number_of_cells=[self.NX, self.NZ],
            lower_bound=[-self.Lx / 2.0, -self.Lz / 2.0],
            upper_bound=[self.Lx / 2.0, self.Lz / 2.0],
            lower_boundary_conditions=["periodic", "dirichlet"],
            upper_boundary_conditions=["periodic", "dirichlet"],
            lower_boundary_conditions_particles=["periodic", "reflecting"],
            upper_boundary_conditions_particles=["periodic", "reflecting"],
            warpx_max_grid_size=self.NZ,
        )
        simulation.time_step_size = self.dt
        simulation.max_steps = self.total_steps
        simulation.current_deposition_algo = "direct"
        simulation.particle_shape = 1
        simulation.use_filter = False
        simulation.verbose = self.verbose

        #######################################################################
        # Field solver and external field                                     #
        #######################################################################

        self.solver = picmi.HybridPICSolver(
            grid=self.grid,
            gamma=1.0,
            Te=self.Te,
            n0=self.n_plasma,
            n_floor=0.1 * self.n_plasma,
            plasma_resistivity=self.eta * self.eta0,
            substeps=self.substeps,
        )
        simulation.solver = self.solver

        B_ext = picmi.AnalyticInitialField(
            Bx_expression=self.Bx, By_expression=self.By, Bz_expression=self.Bz
        )
        simulation.add_applied_field(B_ext)

        #######################################################################
        # Particle types setup                                                #
        #######################################################################

        self.ions = picmi.Species(
            name="ions",
            charge="q_e",
            mass=self.M,
            initial_distribution=picmi.UniformDistribution(
                density=self.n_plasma,
                rms_velocity=[self.vi_th] * 3,
            ),
        )
        simulation.add_species(
            self.ions,
            layout=picmi.PseudoRandomLayout(
                grid=self.grid, n_macroparticles_per_cell=self.NPPC
            ),
        )

        #######################################################################
        # Add diagnostics                                                     #
        #######################################################################

        callbacks.installafterEsolve(self.check_fields)

        if self.test:
            particle_diag = picmi.ParticleDiagnostic(
                name="diag1",
                period=self.total_steps,
                species=[self.ions],
                data_list=["ux", "uy", "uz", "x", "z", "weighting"],
                # warpx_format='openpmd',
                # warpx_openpmd_backend='h5',
            )
            simulation.add_diagnostic(particle_diag)
            field_diag = picmi.FieldDiagnostic(
                name="diag1",
                grid=self.grid,
                period=self.total_steps,
                data_list=["B", "E", "phi"],
                # warpx_format='openpmd',
                # warpx_openpmd_backend='h5',
            )
            simulation.add_diagnostic(field_diag)

            # set the solver convergence criteria low since phi is only
            # calculated for diagnostic output testing
            simulation.self_fields_required_precision = 1e-3
            simulation.self_fields_verbosity = 1

        # reduced diagnostics for reconnection rate calculation
        # create a 2 l_i box around the X-point on which to measure
        # magnetic flux changes
        plane = picmi.ReducedDiagnostic(
            diag_type="FieldProbe",
            name="plane",
            period=self.diag_steps,
            path="diags/",
            extension="dat",
            probe_geometry="Plane",
            resolution=60,
            x_probe=0.0,
            z_probe=0.0,
            detector_radius=self.l_i,
            target_up_x=0,
            target_up_z=1.0,
        )
        simulation.add_diagnostic(plane)

        #######################################################################
        # Initialize                                                          #
        #######################################################################

        if comm.rank == 0:
            if Path.exists(Path("diags")):
                shutil.rmtree("diags")
            Path("diags/fields").mkdir(parents=True, exist_ok=True)

        # Initialize inputs and WarpX instance
        simulation.initialize_inputs()
        simulation.initialize_warpx()

    def check_fields(self):
        step = simulation.extension.warpx.getistep(lev=0) - 1

        if not (step == 1 or step % self.diag_steps == 0):
            return

        get_xp()

        rho = simulation.fields.get("rho_fp", level=0)[...] / self.J0

        Jiy = simulation.fields.get("current_fp", dir="y", level=0)[...] / self.J0
        Jy = (
            simulation.fields.get("hybrid_current_fp_plasma", dir="y", level=0)[...]
            / self.J0
        )

        Bx = simulation.fields.get("Bfield_fp", dir="x", level=0)[...] / self.B0
        By = simulation.fields.get("Bfield_fp", dir="y", level=0)[...] / self.B0
        Bz = simulation.fields.get("Bfield_fp", dir="z", level=0)[...] / self.B0

        if libwarpx.amr.ParallelDescriptor.MyProc() != 0:
            return

        # save the fields to file
        with open(f"diags/fields/fields_{step:06d}.npz", "wb") as f:
            np.savez(f, rho=rho, Jiy=Jiy, Jy=Jy, Bx=Bx, By=By, Bz=Bz)


##########################
# parse input parameters
##########################

parser = argparse.ArgumentParser()
parser.add_argument(
    "-t",
    "--test",
    help="toggle whether this script is run as a short CI test",
    action="store_true",
)
parser.add_argument(
    "-v",
    "--verbose",
    help="Verbose output",
    action="store_true",
)
args, left = parser.parse_known_args()
sys.argv = sys.argv[:1] + left

run = ForceFreeSheetReconnection(test=args.test, verbose=args.verbose)
simulation.step()

Running the full simulation should take about 4 hours if executed on 1 V100 GPU. For MPI-parallel runs, prefix these lines with mpiexec -n 4 ... or srun -n 4 ..., depending on the system.

python3 inputs_test_2d_ohm_solver_magnetic_reconnection_picmi.py

Analyze

The following script extracts the reconnection rate as a function of time and animates the evolution of the magnetic field (as shown below).

Script analysis.py
Listing 52 You can copy this file from Examples/Tests/ohm_solver_magnetic_reconnection/analysis.py.
#!/usr/bin/env python3
#
# --- Analysis script for the hybrid-PIC example of magnetic reconnection.

import glob

import dill
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import colors

plt.rcParams.update({"font.size": 20})

# load simulation parameters
with open("sim_parameters.dpkl", "rb") as f:
    sim = dill.load(f)

x_idx = 2
z_idx = 4
Ey_idx = 6
Bx_idx = 8

plane_data = np.loadtxt("diags/plane.dat", skiprows=1)

steps = np.unique(plane_data[:, 0])
num_steps = len(steps)
num_cells = plane_data.shape[0] // num_steps

plane_data = plane_data.reshape((num_steps, num_cells, plane_data.shape[1]))

times = plane_data[:, 0, 1]
dt = np.mean(np.diff(times))

plt.plot(
    times / sim.t_ci,
    np.mean(plane_data[:, :, Ey_idx], axis=1) / (sim.vA * sim.B0),
    "o-",
)

plt.grid()
plt.xlabel(r"$t/\tau_{c,i}$")
plt.ylabel("$<E_y>/v_AB_0$")
plt.title("Reconnection rate")
plt.tight_layout()
plt.savefig("diags/reconnection_rate.png")

if not sim.test:
    from matplotlib.animation import FFMpegWriter, FuncAnimation
    from scipy import interpolate

    # Animate the magnetic reconnection
    fig, axes = plt.subplots(3, 1, sharex=True, figsize=(7, 9))

    for ax in axes.flatten():
        ax.set_aspect("equal")
        ax.set_ylabel("$z/l_i$")

    axes[2].set_xlabel("$x/l_i$")

    datafiles = sorted(glob.glob("diags/fields/*.npz"))
    num_steps = len(datafiles)

    data0 = np.load(datafiles[0])

    sX = axes[0].imshow(
        data0["Jy"].T,
        origin="lower",
        norm=colors.TwoSlopeNorm(vmin=-0.6, vcenter=0.0, vmax=1.6),
        extent=[0, sim.LX, -sim.LZ / 2, sim.LZ / 2],
        cmap=plt.cm.RdYlBu_r,
    )
    # axes[0].set_ylim(-5, 5)
    cb = plt.colorbar(sX, ax=axes[0], label="$J_y/J_0$")
    cb.ax.set_yscale("linear")
    cb.ax.set_yticks([-0.5, 0.0, 0.75, 1.5])

    sY = axes[1].imshow(
        data0["By"].T,
        origin="lower",
        extent=[0, sim.LX, -sim.LZ / 2, sim.LZ / 2],
        cmap=plt.cm.plasma,
    )
    # axes[1].set_ylim(-5, 5)
    cb = plt.colorbar(sY, ax=axes[1], label="$B_y/B_0$")
    cb.ax.set_yscale("linear")

    sZ = axes[2].imshow(
        data0["Bz"].T,
        origin="lower",
        extent=[0, sim.LX, -sim.LZ / 2, sim.LZ / 2],
        # norm=colors.TwoSlopeNorm(vmin=-0.02, vcenter=0., vmax=0.02),
        cmap=plt.cm.RdBu,
    )
    cb = plt.colorbar(sZ, ax=axes[2], label="$B_z/B_0$")
    cb.ax.set_yscale("linear")

    # plot field lines
    x_grid = np.linspace(0, sim.LX, data0["Bx"][:-1].shape[0])
    z_grid = np.linspace(-sim.LZ / 2.0, sim.LZ / 2.0, data0["Bx"].shape[1])

    n_lines = 10
    start_x = np.zeros(n_lines)
    start_x[: n_lines // 2] = sim.LX
    start_z = np.linspace(-sim.LZ / 2.0 * 0.9, sim.LZ / 2.0 * 0.9, n_lines)
    step_size = 1.0 / 100.0

    def get_field_lines(Bx, Bz):
        field_line_coords = []

        Bx_interp = interpolate.interp2d(x_grid, z_grid, Bx[:-1].T)
        Bz_interp = interpolate.interp2d(x_grid, z_grid, Bz[:, :-1].T)

        for kk, z in enumerate(start_z):
            path_x = [start_x[kk]]
            path_z = [z]

            ii = 0
            while ii < 10000:
                ii += 1
                Bx = Bx_interp(path_x[-1], path_z[-1])[0]
                Bz = Bz_interp(path_x[-1], path_z[-1])[0]

                # print(path_x[-1], path_z[-1], Bx, Bz)

                # normalize and scale
                B_mag = np.sqrt(Bx**2 + Bz**2)
                if B_mag == 0:
                    break

                dx = Bx / B_mag * step_size
                dz = Bz / B_mag * step_size

                x_new = path_x[-1] + dx
                z_new = path_z[-1] + dz

                if (
                    np.isnan(x_new)
                    or x_new <= 0
                    or x_new > sim.LX
                    or abs(z_new) > sim.LZ / 2
                ):
                    break

                path_x.append(x_new)
                path_z.append(z_new)

            field_line_coords.append([path_x, path_z])
        return field_line_coords

    field_lines = []
    for path in get_field_lines(data0["Bx"], data0["Bz"]):
        path_x = path[0]
        path_z = path[1]
        (ln,) = axes[2].plot(path_x, path_z, "--", color="k")
        # draws arrows on the field lines
        # if path_x[10] > path_x[0]:
        axes[2].arrow(
            path_x[50],
            path_z[50],
            path_x[250] - path_x[50],
            path_z[250] - path_z[50],
            shape="full",
            length_includes_head=True,
            lw=0,
            head_width=1.0,
            color="g",
        )

        field_lines.append(ln)

    def animate(i):
        data = np.load(datafiles[i])
        sX.set_array(data["Jy"].T)
        sY.set_array(data["By"].T)
        sZ.set_array(data["Bz"].T)
        sZ.set_clim(-np.max(abs(data["Bz"])), np.max(abs(data["Bz"])))

        for ii, path in enumerate(get_field_lines(data["Bx"], data["Bz"])):
            path_x = path[0]
            path_z = path[1]
            field_lines[ii].set_data(path_x, path_z)

    anim = FuncAnimation(fig, animate, frames=num_steps - 1, repeat=True)

    writervideo = FFMpegWriter(fps=14)
    anim.save("diags/mag_reconnection.mp4", writer=writervideo)
Magnetic reconnection.

Fig. 10 Magnetic reconnection from a force-free sheet.