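# Page metadata captured together with this file (not part of the program):
#   author avatar caption: "carmelog's picture"
#   commit 830a558 - "init: magnetohydrodynamics with physicsnemo"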
# SPDX-FileCopyrightText: Copyright (c) 2023 - 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch
from IPython.display import display
from torch.utils.data import DataLoader, Dataset
try:
    # Imported as part of a package: use the sibling module.
    from .datasets import Dedalus2DDataset
except ImportError:
    # Run as a script (no parent package): fall back to a top-level import.
    # Only ImportError is caught so that unrelated failures raised while
    # importing ``datasets`` are not silently masked.
    from datasets import Dedalus2DDataset
class MHDDataloader(Dataset):
    """Dataset wrapper for MHD samples made of velocity and magnetic field.

    Every item is an ``(inputs, outputs)`` pair of tensors:

    * ``outputs`` has shape ``(nt, nx, ny, 4)`` and stacks the sub-sampled
      fields ``u, v, Bx, By`` along the last axis.
    * ``inputs`` has shape ``(nt, nx, ny, 7)``: the ``t``, ``x`` and ``y``
      coordinate grids followed by the first time slice of ``outputs``
      repeated over all ``nt`` time steps.
    """

    def __init__(
        self,
        # Quoted so the annotation is not evaluated when the class is defined.
        dataset: "Dedalus2DDataset",
        sub_x=1,
        sub_t=1,
        ind_x=None,
        ind_t=None,
    ):
        """Store the sub-sampling settings and the sub-sampled coordinates.

        Args:
            dataset: Source of samples. It must support ``len()``, integer
                indexing that returns a mapping with the keys ``"velocity"``
                and ``"magnetic field"``, and ``get_coords(index)`` returning
                ``(t, x, y)`` coordinate arrays.
            sub_x: Stride applied to both spatial axes.
            sub_t: Stride applied to the time axis.
            ind_x: Exclusive upper index for both spatial axes
                (``None`` keeps everything).
            ind_t: Exclusive upper index for the time axis
                (``None`` keeps everything).
        """
        self.dataset = dataset
        self.sub_x = sub_x
        self.sub_t = sub_t
        self.ind_x = ind_x
        self.ind_t = ind_t

        # Coordinates are read once, from the first sample only.
        t, x, y = dataset.get_coords(0)
        # Note: y is restricted with the same bound/stride as x.
        self.x = x[:ind_x:sub_x]
        self.y = y[:ind_x:sub_x]
        self.t = t[:ind_t:sub_t]
        self.nx = len(self.x)
        self.ny = len(self.y)
        self.nt = len(self.t)
        self.num = len(self.dataset)
        self.x_slice = slice(0, self.ind_x, self.sub_x)
        self.t_slice = slice(0, self.ind_t, self.sub_t)

    def __len__(self):
        """Return the number of samples in the wrapped dataset."""
        return len(self.dataset)

    def _component(self, field, component):
        """Return one sub-sampled component of ``field`` as a tensor.

        ``field`` is indexed as ``[time, component, x, y]``; the time axis is
        restricted with ``ind_t``/``sub_t`` and both spatial axes with
        ``ind_x``/``sub_x``.
        """
        t_sel = slice(None, self.ind_t, self.sub_t)
        x_sel = slice(None, self.ind_x, self.sub_x)
        return torch.from_numpy(field[t_sel, component, x_sel, x_sel])

    def __getitem__(self, index):
        """Return the ``(inputs, outputs)`` tensors for sample ``index``.

        Returns:
            tuple: ``inputs`` of shape ``(nt, nx, ny, 7)`` holding
            ``t, x, y`` and the initial ``u, v, Bx, By``; ``outputs`` of
            shape ``(nt, nx, ny, 4)`` holding ``u, v, Bx, By`` at every
            retained time step.
        """
        fields = self.dataset[index]

        # Data includes velocity and magnetic field
        velocity = fields["velocity"]
        magnetic_field = fields["magnetic field"]

        u = self._component(velocity, 0)
        v = self._component(velocity, 1)
        Bx = self._component(magnetic_field, 0)
        By = self._component(magnetic_field, 1)

        # shape is now (nt, nx, ny, nfields)
        data = torch.stack([u, v, Bx, By], dim=-1)
        # Initial condition, repeated so it is available at every time step.
        data0 = data[0].reshape(1, self.nx, self.ny, -1).repeat(self.nt, 1, 1, 1)

        # Broadcast each 1-D coordinate to a full (nt, nx, ny, 1) grid.
        grid_t = (
            torch.from_numpy(self.t)
            .reshape(self.nt, 1, 1, 1)
            .repeat(1, self.nx, self.ny, 1)
        )
        grid_x = (
            torch.from_numpy(self.x)
            .reshape(1, self.nx, 1, 1)
            .repeat(self.nt, 1, self.ny, 1)
        )
        grid_y = (
            torch.from_numpy(self.y)
            .reshape(1, 1, self.ny, 1)
            .repeat(self.nt, self.nx, 1, 1)
        )

        inputs = torch.cat([grid_t, grid_x, grid_y, data0], dim=-1)
        outputs = data
        return inputs, outputs

    def create_dataloader(
        self,
        batch_size=1,
        shuffle=False,
        num_workers=0,
        pin_memory=False,
        distributed=False,
    ):
        """Build a ``DataLoader`` over this dataset, plus its sampler.

        Args:
            batch_size: Number of samples per batch.
            shuffle: Whether samples are drawn in random order. In
                distributed mode this is forwarded to the sampler.
            num_workers: Number of loader worker processes.
            pin_memory: Forwarded to ``DataLoader``.
            distributed: When true, a ``DistributedSampler`` is used; this
                relies on ``torch.distributed`` having been set up by the
                caller.

        Returns:
            tuple: ``(dataloader, sampler)``; ``sampler`` is ``None`` when
            ``distributed`` is false.
        """
        if distributed:
            # DistributedSampler shuffles by default, so the caller's choice
            # has to be passed explicitly or ``shuffle=False`` is ignored.
            sampler = torch.utils.data.DistributedSampler(self, shuffle=shuffle)
            # DataLoader rejects ``shuffle=True`` combined with a sampler.
            loader_shuffle = False
        else:
            sampler = None
            loader_shuffle = shuffle

        dataloader = DataLoader(
            self,
            batch_size=batch_size,
            shuffle=loader_shuffle,
            sampler=sampler,
            num_workers=num_workers,
            pin_memory=pin_memory,
        )
        return dataloader, sampler
class MHDDataloaderVecPot(MHDDataloader):
    """Dataset wrapper for MHD samples made of velocity and vector potential.

    Every item is an ``(inputs, outputs)`` pair of tensors:

    * ``outputs`` has shape ``(nt, nx, ny, 3)`` and stacks the sub-sampled
      fields ``u, v, A`` along the last axis.
    * ``inputs`` has shape ``(nt, nx, ny, 6)``: the ``t``, ``x`` and ``y``
      coordinate grids followed by the first time slice of ``outputs``
      repeated over all ``nt`` time steps.

    The constructor, ``__len__`` and ``create_dataloader`` are inherited from
    ``MHDDataloader``; only the per-sample assembly differs, so only
    ``__getitem__`` is overridden here.
    """

    def __getitem__(self, index):
        """Return the ``(inputs, outputs)`` tensors for sample ``index``.

        The wrapped dataset item must be a mapping with the keys
        ``"velocity"`` (indexed as ``[time, component, x, y]``) and
        ``"vector potential"`` (indexed as ``[time, x, y]``).
        """
        fields = self.dataset[index]

        # Data includes velocity and vector potential
        velocity = fields["velocity"]
        vector_potential = fields["vector potential"]

        # Same restriction on time and on both spatial axes for every field.
        t_sel = slice(None, self.ind_t, self.sub_t)
        x_sel = slice(None, self.ind_x, self.sub_x)

        u = torch.from_numpy(velocity[t_sel, 0, x_sel, x_sel])
        v = torch.from_numpy(velocity[t_sel, 1, x_sel, x_sel])
        # The vector potential has no component axis.
        A = torch.from_numpy(vector_potential[t_sel, x_sel, x_sel])

        # shape is now (self.nt, self.nx, self.ny, nfields)
        data = torch.stack([u, v, A], dim=-1)
        # Initial condition, repeated so it is available at every time step.
        data0 = data[0].reshape(1, self.nx, self.ny, -1).repeat(self.nt, 1, 1, 1)

        # Broadcast each 1-D coordinate to a full (nt, nx, ny, 1) grid.
        grid_t = (
            torch.from_numpy(self.t)
            .reshape(self.nt, 1, 1, 1)
            .repeat(1, self.nx, self.ny, 1)
        )
        grid_x = (
            torch.from_numpy(self.x)
            .reshape(1, self.nx, 1, 1)
            .repeat(self.nt, 1, self.ny, 1)
        )
        grid_y = (
            torch.from_numpy(self.y)
            .reshape(1, 1, self.ny, 1)
            .repeat(self.nt, self.nx, 1, 1)
        )

        inputs = torch.cat([grid_t, grid_x, grid_y, data0], dim=-1)
        outputs = data
        return inputs, outputs
if __name__ == "__main__":
    # Manual smoke test: load the simulation outputs, wrap them with both
    # dataset classes and show the first magnetic-field sample.
    dataset = Dedalus2DDataset(
        data_path="../mhd_data/simulation_outputs_Re250",
        output_names="output-????",
        field_names=["magnetic field", "velocity", "vector potential"],
    )

    mhd_dataloader, mhd_vec_pot_dataloader = (
        MHDDataloader(dataset),
        MHDDataloaderVecPot(dataset),
    )

    # ``data`` is the (inputs, outputs) pair of the first sample.
    data = mhd_dataloader[0]
    display(data)