Spaces:
Paused
Paused
| # SPDX-FileCopyrightText: Copyright (c) 2023 - 2024 NVIDIA CORPORATION & AFFILIATES. | |
| # SPDX-FileCopyrightText: All rights reserved. | |
| # SPDX-License-Identifier: Apache-2.0 | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| import torch | |
| from IPython.display import display | |
| from torch.utils.data import DataLoader, Dataset | |
| try: | |
| from .datasets import Dedalus2DDataset | |
| except: | |
| from datasets import Dedalus2DDataset | |
| class MHDDataloader(Dataset): | |
| "Dataloader for MHD Dataset with magnetic field" | |
| def __init__( | |
| self, dataset: Dedalus2DDataset, sub_x=1, sub_t=1, ind_x=None, ind_t=None | |
| ): | |
| self.dataset = dataset | |
| self.sub_x = sub_x | |
| self.sub_t = sub_t | |
| self.ind_x = ind_x | |
| self.ind_t = ind_t | |
| t, x, y = dataset.get_coords(0) | |
| self.x = x[:ind_x:sub_x] | |
| self.y = y[:ind_x:sub_x] | |
| self.t = t[:ind_t:sub_t] | |
| self.nx = len(self.x) | |
| self.ny = len(self.y) | |
| self.nt = len(self.t) | |
| self.num = num = len(self.dataset) | |
| self.x_slice = slice(0, self.ind_x, self.sub_x) | |
| self.t_slice = slice(0, self.ind_t, self.sub_t) | |
| def __len__(self): | |
| length = len(self.dataset) | |
| return length | |
| def __getitem__(self, index): | |
| "Gets input of dataloader, including data, t, x, and y" | |
| fields = self.dataset[index] | |
| # Data includes velocity and magnetic field | |
| velocity = fields["velocity"] | |
| magnetic_field = fields["magnetic field"] | |
| u = torch.from_numpy( | |
| velocity[ | |
| : self.ind_t : self.sub_t, | |
| 0, | |
| : self.ind_x : self.sub_x, | |
| : self.ind_x : self.sub_x, | |
| ] | |
| ) | |
| v = torch.from_numpy( | |
| velocity[ | |
| : self.ind_t : self.sub_t, | |
| 1, | |
| : self.ind_x : self.sub_x, | |
| : self.ind_x : self.sub_x, | |
| ] | |
| ) | |
| Bx = torch.from_numpy( | |
| magnetic_field[ | |
| : self.ind_t : self.sub_t, | |
| 0, | |
| : self.ind_x : self.sub_x, | |
| : self.ind_x : self.sub_x, | |
| ] | |
| ) | |
| By = torch.from_numpy( | |
| magnetic_field[ | |
| : self.ind_t : self.sub_t, | |
| 1, | |
| : self.ind_x : self.sub_x, | |
| : self.ind_x : self.sub_x, | |
| ] | |
| ) | |
| # shape is now (nt, nx, ny, nfields) | |
| data = torch.stack([u, v, Bx, By], dim=-1) | |
| data0 = data[0].reshape(1, self.nx, self.ny, -1).repeat(self.nt, 1, 1, 1) | |
| grid_t = ( | |
| torch.from_numpy(self.t) | |
| .reshape(self.nt, 1, 1, 1) | |
| .repeat(1, self.nx, self.ny, 1) | |
| ) | |
| grid_x = ( | |
| torch.from_numpy(self.x) | |
| .reshape(1, self.nx, 1, 1) | |
| .repeat(self.nt, 1, self.ny, 1) | |
| ) | |
| grid_y = ( | |
| torch.from_numpy(self.y) | |
| .reshape(1, 1, self.ny, 1) | |
| .repeat(self.nt, self.nx, 1, 1) | |
| ) | |
| inputs = torch.cat([grid_t, grid_x, grid_y, data0], dim=-1) | |
| outputs = data | |
| return inputs, outputs | |
| def create_dataloader( | |
| self, | |
| batch_size=1, | |
| shuffle=False, | |
| num_workers=0, | |
| pin_memory=False, | |
| distributed=False, | |
| ): | |
| "Creates dataloader and sampler based on whether distributed training is on" | |
| if distributed: | |
| sampler = torch.utils.data.DistributedSampler(self) | |
| dataloader = DataLoader( | |
| self, | |
| batch_size=batch_size, | |
| shuffle=False, | |
| sampler=sampler, | |
| num_workers=num_workers, | |
| pin_memory=pin_memory, | |
| ) | |
| else: | |
| sampler = None | |
| dataloader = DataLoader( | |
| self, | |
| batch_size=batch_size, | |
| shuffle=shuffle, | |
| num_workers=num_workers, | |
| pin_memory=pin_memory, | |
| ) | |
| return dataloader, sampler | |
| class MHDDataloaderVecPot(MHDDataloader): | |
| "Dataloader for MHD Dataset with vector potential" | |
| def __init__( | |
| self, dataset: Dedalus2DDataset, sub_x=1, sub_t=1, ind_x=None, ind_t=None | |
| ): | |
| self.dataset = dataset | |
| self.sub_x = sub_x | |
| self.sub_t = sub_t | |
| self.ind_x = ind_x | |
| self.ind_t = ind_t | |
| t, x, y = dataset.get_coords(0) | |
| self.x = x[:ind_x:sub_x] | |
| self.y = y[:ind_x:sub_x] | |
| self.t = t[:ind_t:sub_t] | |
| self.nx = len(self.x) | |
| self.ny = len(self.y) | |
| self.nt = len(self.t) | |
| self.num = num = len(self.dataset) | |
| self.x_slice = slice(0, self.ind_x, self.sub_x) | |
| self.t_slice = slice(0, self.ind_t, self.sub_t) | |
| def __len__(self): | |
| length = len(self.dataset) | |
| return length | |
| def __getitem__(self, index): | |
| "Gets input of dataloader, including data, t, x, and y" | |
| fields = self.dataset[index] | |
| # Data includes velocity and vector potential | |
| velocity = fields["velocity"] | |
| vector_potential = fields["vector potential"] | |
| u = torch.from_numpy( | |
| velocity[ | |
| : self.ind_t : self.sub_t, | |
| 0, | |
| : self.ind_x : self.sub_x, | |
| : self.ind_x : self.sub_x, | |
| ] | |
| ) | |
| v = torch.from_numpy( | |
| velocity[ | |
| : self.ind_t : self.sub_t, | |
| 1, | |
| : self.ind_x : self.sub_x, | |
| : self.ind_x : self.sub_x, | |
| ] | |
| ) | |
| A = torch.from_numpy( | |
| vector_potential[ | |
| : self.ind_t : self.sub_t, | |
| : self.ind_x : self.sub_x, | |
| : self.ind_x : self.sub_x, | |
| ] | |
| ) | |
| # shape is now (self.nt, self.nx, self.ny, nfields) | |
| data = torch.stack([u, v, A], dim=-1) | |
| data0 = data[0].reshape(1, self.nx, self.ny, -1).repeat(self.nt, 1, 1, 1) | |
| grid_t = ( | |
| torch.from_numpy(self.t) | |
| .reshape(self.nt, 1, 1, 1) | |
| .repeat(1, self.nx, self.ny, 1) | |
| ) | |
| grid_x = ( | |
| torch.from_numpy(self.x) | |
| .reshape(1, self.nx, 1, 1) | |
| .repeat(self.nt, 1, self.ny, 1) | |
| ) | |
| grid_y = ( | |
| torch.from_numpy(self.y) | |
| .reshape(1, 1, self.ny, 1) | |
| .repeat(self.nt, self.nx, 1, 1) | |
| ) | |
| inputs = torch.cat([grid_t, grid_x, grid_y, data0], dim=-1) | |
| outputs = data | |
| return inputs, outputs | |
| def create_dataloader( | |
| self, | |
| batch_size=1, | |
| shuffle=False, | |
| num_workers=0, | |
| pin_memory=False, | |
| distributed=False, | |
| ): | |
| "Creates dataloader and sampler based on whether distributed training is on" | |
| if distributed: | |
| sampler = torch.utils.data.DistributedSampler(self) | |
| dataloader = DataLoader( | |
| self, | |
| batch_size=batch_size, | |
| shuffle=False, | |
| sampler=sampler, | |
| num_workers=num_workers, | |
| pin_memory=pin_memory, | |
| ) | |
| else: | |
| sampler = None | |
| dataloader = DataLoader( | |
| self, | |
| batch_size=batch_size, | |
| shuffle=shuffle, | |
| num_workers=num_workers, | |
| pin_memory=pin_memory, | |
| ) | |
| return dataloader, sampler | |
| if __name__ == "__main__": | |
| dataset = Dedalus2DDataset( | |
| data_path="../mhd_data/simulation_outputs_Re250", | |
| output_names="output-????", | |
| field_names=["magnetic field", "velocity", "vector potential"], | |
| ) | |
| mhd_dataloader = MHDDataloader(dataset) | |
| mhd_vec_pot_dataloader = MHDDataloaderVecPot(dataset) | |
| data = mhd_dataloader[0] | |
| display(data) | |