From 299674bf9bcf79aa2858f3a5c1f6acc39c10c9d5 Mon Sep 17 00:00:00 2001 From: Ge Dong Date: Tue, 12 Nov 2019 16:31:48 -0500 Subject: [PATCH 1/4] modified: plasma/conf_parser.py modified: plasma/models/builder.py modified: plasma/models/mpi_runner.py modified: plasma/utils/hashing.py plasma/models/torch_runner_dist.py --- examples/conf.yaml | 12 ++++++------ examples/slurm.cmd | 2 +- plasma/conf_parser.py | 22 ++++++++++++++++++++++ plasma/models/builder.py | 26 +++++++++++++++++++++++--- plasma/models/mpi_runner.py | 9 +++++++++ plasma/utils/hashing.py | 4 +--- 6 files changed, 62 insertions(+), 13 deletions(-) diff --git a/examples/conf.yaml b/examples/conf.yaml index dfda1145..25ccb6b5 100644 --- a/examples/conf.yaml +++ b/examples/conf.yaml @@ -10,7 +10,7 @@ paths: signal_prepath: '/signal_data/' # /signal_data/jet/ shot_list_dir: '/shot_lists/' tensorboard_save_path: '/Graph/' - data: d3d_data_0D # 'd3d_to_jet_data' # 'd3d_to_jet_data' # 'jet_to_d3d_data' # jet_data + data: d3d_data ##_0D # 'd3d_to_jet_data' # 'd3d_to_jet_data' # 'jet_to_d3d_data' # jet_data # if specific_signals: [] left empty, it will use all valid signals defined on a machine. Only use if need a custom set specific_signals: [] # ['q95','li','ip','betan','energy','lm','pradcore','pradedge','pradtot','pin','torquein','tmamp1','tmamp2','tmfreq1','tmfreq2','pechin','energydt','ipdirect','etemp_profile','edens_profile'] executable: "mpi_learn.py" @@ -74,10 +74,10 @@ model: mlp_regularization: 0.0001 skip_train: False # should a finished model be loaded if available # length of LSTM memory - pred_length: 200 + pred_length: 256 pred_batch_size: 128 # TODO(KGF): optimize length of LSTM memory - length: 128 + length: 256 skip: 1 # hidden layer size # TODO(KGF): optimize size of RNN layers @@ -85,12 +85,12 @@ model: # size 100 slight overfitting, size 20 no overfitting. 200 is not better than 100. Prediction much better with size 100, size 20 cannot capture the data. rnn_type: 'LSTM' # TODO(KGF): optimize number of RNN layers - rnn_layers: 2 + rnn_layers: 1 num_conv_filters: 128 size_conv_filters: 3 num_conv_layers: 3 pool_size: 2 - dense_size: 128 + dense_size: 64 extra_dense_input: False # have not found a difference yet optimizer: 'adam' @@ -103,7 +103,7 @@ model: lr: 0.00002 # 0.00001 # 0.0005 # for adam plots 0.0000001 # 0.00005 # 0.00005 # 0.00005 lr_decay: 0.97 # 0.98 # 0.9 stateful: True - return_sequences: True + return_sequences: True # True dropout_prob: 0.1 # only relevant if we want to do MPI training. 
The number of steps with a single replica warmup_steps: 0 diff --git a/examples/slurm.cmd b/examples/slurm.cmd index 3dcae884..2641707f 100644 --- a/examples/slurm.cmd +++ b/examples/slurm.cmd @@ -11,7 +11,7 @@ # Each node = 2.4 GHz Xeon Broadwell E5-2680 v4 + 4x 1328 MHz P100 GPU module load anaconda3 -conda activate my_env +conda activate pplori module load cudatoolkit module load cudnn module load openmpi/cuda-8.0/intel-17.0/3.0.0/64 diff --git a/plasma/conf_parser.py b/plasma/conf_parser.py index 9c338fb7..e32a9685 100644 --- a/plasma/conf_parser.py +++ b/plasma/conf_parser.py @@ -136,6 +136,7 @@ def parameters(input_file): sig.d3d, params['paths']['shot_list_dir'], ['d3d_clear_data_avail.txt', 'd3d_disrupt_data_avail.txt'], 'd3d data since shot 125500') + d3d_full_new = ShotListFiles(sig.d3d,params['paths']['shot_list_dir'],['shots_since_2016_clear.txt','shots_since_2016_disrupt.txt'],'d3d data since shot 125500') d3d_jenkins = ShotListFiles( sig.d3d, params['paths']['shot_list_dir'], ['jenkins_d3d_clear.txt', 'jenkins_d3d_disrupt.txt'], @@ -224,6 +225,27 @@ def parameters(input_file): 'etemp_profile': sig.etemp_profile, 'edens_profile': sig.edens_profile, } + elif params['paths']['data'] == 'd3d_data_new': + params['paths']['shot_files'] = [d3d_full_new] + params['paths']['shot_files_test'] = [] + params['paths']['use_signals_dict'] = { + 'q95': sig.q95, + 'li': sig.li, + 'ip': sig.ip, + 'lm': sig.lm, + 'betan': sig.betan, + 'energy': sig.energy, + 'dens': sig.dens, + 'pradcore': sig.pradcore, + 'pradedge': sig.pradedge, + 'pin': sig.pin, + 'torquein': sig.torquein, + 'ipdirect': sig.ipdirect, + 'iptarget': sig.iptarget, + 'iperr': sig.iperr, + 'etemp_profile': sig.etemp_profile, + 'edens_profile': sig.edens_profile, + } elif params['paths']['data'] == 'd3d_data_1D': params['paths']['shot_files'] = [d3d_full] params['paths']['shot_files_test'] = [] diff --git a/plasma/models/builder.py b/plasma/models/builder.py index e47f6588..ffce7632 100644 --- a/plasma/models/builder.py +++ b/plasma/models/builder.py @@ -159,8 +159,12 @@ def slicer_output_shape(input_shape, indices): # slicer_output_shape(s,indices_0d))(pre_rnn_input) pre_rnn_1D = Reshape((num_1D, len(indices_1d)//num_1D))(pre_rnn_1D) pre_rnn_1D = Permute((2, 1))(pre_rnn_1D) - - for i in range(model_conf['num_conv_layers']): + if 'simple_conv' in model_conf.keys() and model_conf['simple_conv']==True: + for i in range(model_conf['num_conv_layers']): + pre_rnn_1D = Convolution1D(num_conv_filters,size_conv_filters,padding='valid',activation='relu') (pre_rnn_1D) + pre_rnn_1D = MaxPooling1D(pool_size) (pre_rnn_1D) + else: + for i in range(model_conf['num_conv_layers']): div_fac = 2**i '''The first conv layer learns `num_conv_filters//div_fac` filters (aka kernels), each of size @@ -254,9 +258,25 @@ def slicer_output_shape(input_shape, indices): activity_regularizer=l2(dense_regularization))(pre_rnn) pre_rnn_model = Model(inputs=pre_rnn_input, outputs=pre_rnn) + from mpi4py import MPI + comm = MPI.COMM_WORLD + task_index = comm.Get_rank() + if not predict and task_index==0 : + print('Printingout pre_rnn model.........') + fr=open('model_architecture.log','w') + ori=sys.stdout + sys.stdout=fr + pre_rnn_model.summary() + sys.stdout=ori + fr.close() # pre_rnn_model.summary() x_input = Input(batch_shape=batch_input_shape) - x_in = TimeDistributed(pre_rnn_model)(x_input) + if num_1D>0 or ( + 'extra_dense_input' in model_conf.keys() + and model_conf['extra_dense_input']): + x_in = TimeDistributed(pre_rnn_model)(x_input) + else: + 
x_in=x_input for _ in range(model_conf['rnn_layers']): x_in = rnn_model( rnn_size, return_sequences=return_sequences, diff --git a/plasma/models/mpi_runner.py b/plasma/models/mpi_runner.py index 30e0caed..32eb1715 100644 --- a/plasma/models/mpi_runner.py +++ b/plasma/models/mpi_runner.py @@ -311,6 +311,9 @@ def train_on_batch_and_get_deltas(self, X_batch, Y_batch, verbose=False): ''' weights_before_update = self.model.get_weights() + return_sequences = self.conf['model']['return_sequences'] + if not return_sequences: + Y_batch=Y_batch[:,-1,:] loss = self.model.train_on_batch(X_batch, Y_batch) weights_after_update = self.model.get_weights() @@ -861,6 +864,12 @@ def mpi_train(conf, shot_list_train, shot_list_validate, loader, histogram_freq=1, write_graph=True, write_grads=write_grads) tensorboard.set_model(mpi_model.model) + fr=open('model_architecture.log','a') + ori=sys.stdout + sys.stdout=fr + mpi_model.model.summary() + sys.stdout=ori + fr.close() mpi_model.model.summary() if g.task_index == 0: diff --git a/plasma/utils/hashing.py b/plasma/utils/hashing.py index b5c5a1d4..49463875 100644 --- a/plasma/utils/hashing.py +++ b/plasma/utils/hashing.py @@ -74,9 +74,7 @@ def myhash_signals(signals): (descriptions), concatenate their hexadecimal hashes (converted to base-10), and hash the resulting str ''' - return myhash(''.join(tuple(map(lambda x: "{}".format(x.__hash__()), - sorted(signals))))) - + return myhash(''.join((map(lambda x: x.description, sorted(signals))))) def myhash(x): ''' From 176ea6bd8ecc56fbe9989f3bd0243fb20965d035 Mon Sep 17 00:00:00 2001 From: Ge Dong Date: Wed, 13 Nov 2019 13:49:03 -0500 Subject: [PATCH 2/4] modified: plasma/models/mpi_runner.py --- plasma/models/mpi_runner.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/plasma/models/mpi_runner.py b/plasma/models/mpi_runner.py index 32eb1715..a8431663 100644 --- a/plasma/models/mpi_runner.py +++ b/plasma/models/mpi_runner.py @@ -13,6 +13,7 @@ from plasma.conf import conf from mpi4py import MPI from pkg_resources import parse_version, get_distribution +import random ''' ######################################################### This file trains a deep learning model to predict @@ -466,7 +467,18 @@ def build_callbacks(self, conf, callbacks_list): pass return cbks.CallbackList(callbacks) - + def add_noise(self,X): + if self.conf['training']['noise']==True: + prob=0.05 + else: + prob=self.conf['training']['noise'] + for i in range(0,X.shape[0]): + for j in range(0,X.shape[2]): + a=random.randint(0,100) + if a Date: Wed, 13 Nov 2019 13:49:49 -0500 Subject: [PATCH 3/4] new file: plasma/models/torch_runner_dist.py --- plasma/models/torch_runner_dist.py | 489 +++++++++++++++++++++++++++++ 1 file changed, 489 insertions(+) create mode 100644 plasma/models/torch_runner_dist.py diff --git a/plasma/models/torch_runner_dist.py b/plasma/models/torch_runner_dist.py new file mode 100644 index 00000000..8db59f2a --- /dev/null +++ b/plasma/models/torch_runner_dist.py @@ -0,0 +1,489 @@ +from __future__ import print_function +import matplotlib +matplotlib.use('Agg') +import matplotlib.pyplot as plt + +import numpy as np +import sys +if sys.version_info[0] < 3: + from itertools import imap + +#leading to import errors: +#from hyperopt import hp, STATUS_OK +#from hyperas.distributions import conditional + +import time +import datetime +import os +from functools import partial +import pathos.multiprocessing as mp + +from plasma.conf import conf +from plasma.models.loader import Loader, ProcessGenerator 
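Stepping back to the add_noise hook added to mpi_runner.py in patch 2 above: only its setup is visible (prob falls back to 0.05 when conf['training']['noise'] is True, and a random integer in [0, 100] is drawn per example and channel), so the sketch below is purely illustrative of one channel-blanking completion consistent with that setup, not the author's implementation. The standalone signature and the prob*100 threshold are our assumptions.

    import random
    import numpy as np

    def add_noise(X, noise_setting):
        # Illustrative sketch only. noise_setting plays the role of
        # conf['training']['noise']: True means "use a 5% default", any other
        # value is taken as the blanking probability itself.
        prob = 0.05 if noise_setting is True else noise_setting
        for i in range(X.shape[0]):        # examples in the batch
            for j in range(X.shape[2]):    # signal channels
                if random.randint(0, 100) < prob * 100:
                    X[i, :, j] = 0.0       # blank this channel for the whole sequence
        return X

    X = add_noise(np.random.randn(16, 128, 14), True)   # (batch, time, channels)

Randomly blanked channels during training would also dovetail with the zero-filled channels that the 'd3d_data_garbage' handling in patch 4 introduces.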
+from plasma.utils.performance import PerformanceAnalyzer +from plasma.utils.evaluation import * +from plasma.utils.downloading import makedirs_process_safe + + +import hashlib + +import torch +import torch.nn as nn +from torch.autograd import Variable +import torch.optim as opt +from torch.nn.utils import weight_norm + +model_filename = 'torch_model.pt' + +class FTCN(nn.Module): + def __init__(self,n_scalars,n_profiles,profile_size,layer_sizes_spatial, + kernel_size_spatial,linear_size,output_size, + num_channels_tcn,kernel_size_temporal,dropout=0.1): + super(FTCN, self).__init__() + self.lin = InputBlock(n_scalars, n_profiles,profile_size, layer_sizes_spatial, kernel_size_spatial, linear_size, dropout) + self.input_layer = TimeDistributed(self.lin,batch_first=True) + self.tcn = TCN(linear_size, output_size, num_channels_tcn , kernel_size_temporal, dropout) + self.model = nn.Sequential(self.input_layer,self.tcn) + + def forward(self,x): + return self.model(x) + + +class InputBlock(nn.Module): + def __init__(self, n_scalars, n_profiles,profile_size, layer_sizes, kernel_size, linear_size, dropout=0.2): + super(InputBlock, self).__init__() + self.pooling_size = 2 + self.n_scalars = n_scalars + self.n_profiles = n_profiles + self.profile_size = profile_size + self.conv_output_size = profile_size + if self.n_profiles == 0: + self.net = None + self.conv_output_size = 0 + else: + self.layers = [] + for (i,layer_size) in enumerate(layer_sizes): + if i == 0: + input_size = n_profiles + else: + input_size = layer_sizes[i-1] + self.layers.append(weight_norm(nn.Conv1d(input_size, layer_size, kernel_size))) + self.layers.append(nn.ReLU()) + self.conv_output_size = calculate_conv_output_size(self.conv_output_size,0,1,1,kernel_size) + self.layers.append(nn.MaxPool1d(kernel_size=self.pooling_size)) + self.conv_output_size = calculate_conv_output_size(self.conv_output_size,0,1,self.pooling_size,self.pooling_size) + self.layers.append(nn.Dropout2d(dropout)) + self.net = nn.Sequential(*self.layers) + self.conv_output_size = self.conv_output_size*layer_sizes[-1] + self.linear_layers = [] + + print("Final feature size = {}".format(self.n_scalars + self.conv_output_size)) + self.linear_layers.append(nn.Linear(self.conv_output_size+self.n_scalars,linear_size)) + self.linear_layers.append(nn.ReLU()) + self.linear_layers.append(nn.Linear(linear_size,linear_size)) + self.linear_layers.append(nn.ReLU()) + print("Final output size = {}".format(linear_size)) + self.linear_net = nn.Sequential(*self.linear_layers) + +# def init_weights(self): +# self.conv1.weight.data.normal_(0, 0.01) +# self.conv2.weight.data.normal_(0, 0.01) +# if self.downsample is not None: +# self.downsample.weight.data.normal_(0, 0.01) + + def forward(self, x): + if self.n_profiles == 0: + full_features = x#x_scalars + else: + if self.n_scalars == 0: + x_profiles = x + else: + x_scalars = x[:,:self.n_scalars] + x_profiles = x[:,self.n_scalars:] + x_profiles = x_profiles.contiguous().view(x.size(0),self.n_profiles,self.profile_size) + profile_features = self.net(x_profiles).view(x.size(0),-1) + if self.n_scalars == 0: + full_features = profile_features + else: + full_features = torch.cat([x_scalars,profile_features],dim=1) + + out = self.linear_net(full_features) +# out = self.net(x) +# res = x if self.downsample is None else self.downsample(x) + return out + + +def calculate_conv_output_size(L_in,padding,dilation,stride,kernel_size): + return int(np.floor((L_in + 2*padding - dilation*(kernel_size-1) - 1)*1.0/stride + 1)) + + +class 
Chomp1d(nn.Module): + def __init__(self, chomp_size): + super(Chomp1d, self).__init__() + self.chomp_size = chomp_size + + def forward(self, x): + return x[:, :, :-self.chomp_size].contiguous() + + +class TemporalBlock(nn.Module): + def __init__(self, n_inputs, n_outputs, kernel_size, stride, dilation, padding, dropout=0.2): + super(TemporalBlock, self).__init__() + self.conv1 = weight_norm(nn.Conv1d(n_inputs, n_outputs, kernel_size, + stride=stride, padding=padding, dilation=dilation)) + self.chomp1 = Chomp1d(padding) + self.relu1 = nn.ReLU() + self.dropout1 = nn.Dropout2d(dropout) + + self.conv2 = weight_norm(nn.Conv1d(n_outputs, n_outputs, kernel_size, + stride=stride, padding=padding, dilation=dilation)) + self.chomp2 = Chomp1d(padding) + self.relu2 = nn.ReLU() + self.dropout2 = nn.Dropout2d(dropout) + + self.net = nn.Sequential(self.conv1, self.chomp1, self.relu1, self.dropout1, + self.conv2, self.chomp2, self.relu2, self.dropout2) + self.downsample = nn.Conv1d(n_inputs, n_outputs, 1) if n_inputs != n_outputs else None + self.relu = nn.ReLU() + self.init_weights() + + def init_weights(self): + self.conv1.weight.data.normal_(0, 0.01) + self.conv2.weight.data.normal_(0, 0.01) + if self.downsample is not None: + self.downsample.weight.data.normal_(0, 0.01) + + def forward(self, x): + out = self.net(x) + res = x if self.downsample is None else self.downsample(x) + return self.relu(out + res) + +#dimensions are batch,channels,length +class TemporalConvNet(nn.Module): + def __init__(self, num_inputs, num_channels, kernel_size=2, dropout=0.2): + super(TemporalConvNet, self).__init__() + layers = [] + num_levels = len(num_channels) + for i in range(num_levels): + dilation_size = 2 ** i + in_channels = num_inputs if i == 0 else num_channels[i-1] + out_channels = num_channels[i] + layers += [TemporalBlock(in_channels, out_channels, kernel_size, stride=1, dilation=dilation_size, + padding=(kernel_size-1) * dilation_size, dropout=dropout)] + + self.network = nn.Sequential(*layers) + + def forward(self, x): + return self.network(x) + + +class TCN(nn.Module): + def __init__(self, input_size, output_size, num_channels, kernel_size, dropout): + super(TCN, self).__init__() + self.tcn = TemporalConvNet(input_size, num_channels, kernel_size, dropout=dropout) + self.linear = nn.Linear(num_channels[-1], output_size) +# self.sig = nn.Sigmoid() + + def forward(self, x): + # x needs to have dimension (N, C, L) in order to be passed into CNN + output = self.tcn(x.transpose(1, 2)).transpose(1, 2) + output = self.linear(output)#.transpose(1,2)).transpose(1,2) + return output +# return self.sig(output) + + + + +# def train(model,data_gen,lr=0.001,iters = 100): +# log_step = int(round(iters*0.1)) +# optimizer = opt.Adam(model.parameters(),lr = lr) +# model.train() +# total_loss = 0 +# count = 0 +# loss_fn = nn.MSELoss(size_average=False) +# for i in range(iters): +# x_,y_,mask_ = data_gen() +# # print(y) +# x, y, mask = Variable(torch.from_numpy(x_).float()), Variable(torch.from_numpy(y_).float()),Variable(torch.from_numpy(mask_).byte()) +# # print(y) +# optimizer.zero_grad() +# # output = model(x.unsqueeze(0)).squeeze(0) +# output = model(x)#.unsqueeze(0)).squeeze(0) +# output_masked = torch.masked_select(output,mask) +# y_masked = torch.masked_select(y,mask) +# # print(y.shape,output.shape) +# loss = loss_fn(output_masked,y_masked) +# total_loss += loss.data[0] +# count += output.size(0) + +# # if args.clip > 0: +# # torch.nn.utils.clip_grad_norm(model.parameters(), args.clip) +# loss.backward() +# 
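A note on the TemporalBlock/Chomp1d pair defined above: each Conv1d pads both ends by (kernel_size - 1) * dilation and Chomp1d then drops the trailing padding, so every output sample depends only on inputs at the same or earlier time steps while the sequence length is preserved. A quick shape check with illustrative sizes (8 channels, dilation 4):

    import torch
    import torch.nn as nn
    from torch.nn.utils import weight_norm

    k, d = 3, 4                         # kernel size and dilation of one temporal block
    pad = (k - 1) * d                   # padding used by TemporalBlock above
    conv = weight_norm(nn.Conv1d(8, 8, k, padding=pad, dilation=d))
    x = torch.randn(2, 8, 100)          # (batch, channels, length)
    y = conv(x)[:, :, :-pad]            # Chomp1d step: cut the trailing pad -> causal
    print(y.shape)                      # torch.Size([2, 8, 100]), length unchanged

With kernel_size_temporal = 3 and n dilation levels (1, 2, 4, ...), the stacked receptive field is 1 + 4 * (2**n - 1) time steps, e.g. 61 steps for 4 levels.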
optimizer.step() +# if i > 0 and i % log_step == 0: +# cur_loss = total_loss / count +# print("Epoch {:2d} | lr {:.5f} | loss {:.5f}".format(0,lr, cur_loss)) +# total_loss = 0.0 +# count = 0 + + + + + + + + +class TimeDistributed(nn.Module): + def __init__(self, module,is_half=False, batch_first=False): + super(TimeDistributed, self).__init__() + self.module = module + self.batch_first = batch_first + self.is_half=is_half + def forward(self, x): + + if len(x.size()) <= 2: + return self.module(x) + x=x.float() + + # Squash samples and timesteps into a single axis + x_reshape = x.contiguous().view(-1, x.size(-1)) # (samples * timesteps, input_size) + + y = self.module(x_reshape) + + # We have to reshape Y + if self.batch_first: + y = y.contiguous().view(x.size(0), -1, y.size(-1)) # (samples, timesteps, output_size) + else: + y = y.view(-1, x.size(1), y.size(-1)) # (timesteps, samples, output_size) + if self.is_half: + y=y.half() + return y + + + + + +def build_torch_model(conf): + dropout = conf['model']['dropout_prob'] +# dim = 10 + + # lin = nn.Linear(input_size,intermediate_dim) + n_scalars, n_profiles, profile_size = get_signal_dimensions(conf) + print('n_scalars,n_profiles,profile_size=',n_scalars,n_profiles,profile_size) + dim = n_scalars+n_profiles*profile_size + input_size = dim + output_size = 1 + # intermediate_dim = 15 + + layer_sizes_spatial = conf['model']['layer_size_spatial']#[40,20,20] + kernel_size_spatial = conf['model']['kernel_size_spatial'] + linear_size = 5 + + num_channels_tcn = [conf['model']['tcn_hidden']]*conf['model']['tcn_layers']#[3]*5 + kernel_size_temporal = conf['model']['kernel_size_temporal'] #3 + model = FTCN(n_scalars,n_profiles,profile_size,layer_sizes_spatial, + kernel_size_spatial,linear_size,output_size,num_channels_tcn, + kernel_size_temporal,dropout) +# model.cuda() +# para_model=nn.DataParallel(model) + return model + +def get_signal_dimensions(conf): + #make sure all 1D indices are contiguous in the end! + use_signals = conf['paths']['use_signals'] + n_scalars = 0 + n_profiles = 0 + profile_size = 0 + is_1D_region = use_signals[0].num_channels > 1#do we have any 1D indices? + for sig in use_signals: + num_channels = sig.num_channels + if num_channels > 1: + profile_size = num_channels + n_profiles += 1 + is_1D_region = True + else: + assert(not is_1D_region), "make sure all use_signals are ordered such that 1D signals come last!" 
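To make the data flow through build_torch_model above concrete, here is a forward-pass sketch of the FTCN it assembles. All sizes are illustrative rather than taken from conf.yaml, and the import assumes the new plasma/models/torch_runner_dist.py module loads cleanly in your environment (it pulls in plasma.conf at import time):

    import torch
    from plasma.models.torch_runner_dist import FTCN   # new module added by this patch

    n_scalars, n_profiles, profile_size = 14, 2, 64     # illustrative signal dimensions
    model = FTCN(n_scalars, n_profiles, profile_size,
                 layer_sizes_spatial=[40, 20, 20], kernel_size_spatial=3,
                 linear_size=5, output_size=1,
                 num_channels_tcn=[30] * 4, kernel_size_temporal=3, dropout=0.1)

    # one batch of 8 shots, 128 time steps, scalars followed by the flattened profiles
    x = torch.randn(8, 128, n_scalars + n_profiles * profile_size)
    y = model(x)
    print(y.shape)   # torch.Size([8, 128, 1]): one prediction per time step

The TimeDistributed wrapper applies the InputBlock to every time slice independently; only the TCN stage mixes information across time, and only in the causal direction.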
+ assert(num_channels == 1) + n_scalars += 1 + is_1D_region = False + return n_scalars,n_profiles,profile_size + +def apply_model_to_np(model,x): + # return model(Variable(torch.from_numpy(x).float()).unsqueeze(0)).squeeze(0).data.numpy() + return model(Variable(torch.from_numpy(x).float())).data.numpy() + + + +def make_predictions(conf,shot_list,loader,custom_path=None): + generator = loader.inference_batch_generator_full_shot(shot_list) + inference_model = build_torch_model(conf) + + if custom_path == None: + model_path = get_model_path(conf) + else: + model_path = custom_path + print('model-path is: ',model_path) + a=torch.load(model_path) + print('tried loading model path',len(a)) + inference_model.load_state_dict(torch.load(model_path)) + #shot_list = shot_list.random_sublist(10) + + y_prime = [] + y_gold = [] + disruptive = [] + num_shots = len(shot_list) + + while True: + x,y,mask,disr,lengths,num_so_far,num_total = next(generator) + #x, y, mask = Variable(torch.from_numpy(x_).float()), Variable(torch.from_numpy(y_).float()),Variable(torch.from_numpy(mask_).byte()) + output = apply_model_to_np(inference_model,x) + for batch_idx in range(x.shape[0]): + curr_length = lengths[batch_idx] + y_prime += [output[batch_idx,:curr_length,0]] + y_gold += [y[batch_idx,:curr_length,0]] + disruptive += [disr[batch_idx]] + if len(disruptive) >= num_shots: + y_prime = y_prime[:num_shots] + y_gold = y_gold[:num_shots] + disruptive = disruptive[:num_shots] + break + return y_prime,y_gold,disruptive + +def make_predictions_and_evaluate_gpu(conf,shot_list,loader,custom_path = None): + y_prime,y_gold,disruptive = make_predictions(conf,shot_list,loader,custom_path) + analyzer = PerformanceAnalyzer(conf=conf) + roc_area = analyzer.get_roc_area(y_prime,y_gold,disruptive) + loss = get_loss_from_list(y_prime,y_gold,conf['data']['target']) + return y_prime,y_gold,disruptive,roc_area,loss + + +def get_model_path(conf): + return conf['paths']['model_save_path'] + model_filename #save_prepath + model_filename + + +def train_epoch(model,data_gen,optimizer,loss_fn): + loss = 0 + total_loss = 0 + num_so_far = 0 + x_,y_,mask_,num_so_far_start,num_total = next(data_gen) + num_so_far = num_so_far_start + step = 0 + while True: + # print(y) + if conf['data']['floatx'] == 'float16': + x, y, mask = Variable(torch.from_numpy(x_).half()), Variable(torch.from_numpy(y_).half()),Variable(torch.from_numpy(mask_).byte()) + else: + x, y, mask = Variable(torch.from_numpy(x_).float()), Variable(torch.from_numpy(y_).float()),Variable(torch.from_numpy(mask_).byte()) + # print(y) + # x,y,mask=x.cuda(),y.cuda(),mask.cuda() + optimizer.zero_grad() + # output = model(x.unsqueeze(0)).squeeze(0) + output = model(x)#.unsqueeze(0)).squeeze(0) + output_masked = torch.masked_select(output,mask) + y_masked = torch.masked_select(y,mask) + print('OUTPUTSHAPING::') + print('y.shape:',y.shape) + print('output.shape:',output.shape) + loss = loss_fn(output_masked,y_masked) + total_loss += loss.data.item() + # count += output.size(0) + + # if args.clip > 0: + # torch.nn.utils.clip_grad_norm(model.parameters(), args.clip) + loss.backward() + optimizer.step() + step += 1 + print("[{}] [{}/{}] loss: {:.3f}, ave_loss: {:.3f}".format(step,num_so_far-num_so_far_start,num_total,loss.data.item(),total_loss/step)) + if num_so_far-num_so_far_start >= num_total: + break + x_,y_,mask_,num_so_far,num_total = next(data_gen) + return step,loss.data.item(),total_loss,num_so_far,1.0*num_so_far/num_total + + +def train(conf,shot_list_train,shot_list_validate,loader): + 
+ np.random.seed(1) + + #data_gen = ProcessGenerator(partial(loader.training_batch_generator_full_shot_partial_reset,shot_list=shot_list_train)()) + data_gen = partial(loader.training_batch_generator_full_shot_partial_reset,shot_list=shot_list_train)() + + print('validate: {} shots, {} disruptive'.format(len(shot_list_validate),shot_list_validate.num_disruptive())) + print('training: {} shots, {} disruptive'.format(len(shot_list_train),shot_list_train.num_disruptive())) + + loader.set_inference_mode(False) + + train_model = build_torch_model(conf) + + if conf['data']['floatx'] == 'float16': + train_model.half() + #load the latest epoch we did. Returns -1 if none exist yet + # e = specific_builder.load_model_weights(train_model) + + num_epochs = conf['training']['num_epochs'] + patience = conf['callbacks']['patience'] + lr_decay = conf['model']['lr_decay'] + batch_size = conf['training']['batch_size'] + lr = conf['model']['lr'] + clipnorm = conf['model']['clipnorm'] + e = 0 + # warmup_steps = conf['model']['warmup_steps'] + # num_batches_minimum = conf['training']['num_batches_minimum'] + + # if 'adam' in conf['model']['optimizer']: + # optimizer = MPIAdam(lr=lr) + # elif conf['model']['optimizer'] == 'sgd' or conf['model']['optimizer'] == 'tf_sgd': + # optimizer = MPISGD(lr=lr) + # elif 'momentum_sgd' in conf['model']['optimizer']: + # optimizer = MPIMomentumSGD(lr=lr) + # else: + # print("Optimizer not implemented yet") + # exit(1) + + + + if conf['callbacks']['mode'] == 'max': + best_so_far = -np.inf + cmp_fn = max + else: + best_so_far = np.inf + cmp_fn = min + optimizer = opt.Adam(train_model.parameters(),lr = lr) + scheduler = opt.lr_scheduler.ExponentialLR(optimizer,lr_decay) + train_model.train() + not_updated = 0 + total_loss = 0 + count = 0 + loss_fn = nn.MSELoss(size_average=True) + model_path = get_model_path(conf) + makedirs_process_safe(os.path.dirname(model_path)) + while e < num_epochs-1: + print('{} epochs left to go'.format(num_epochs - 1 - e)) + scheduler.step() + print('\nTraining Epoch {}/{}'.format(e,num_epochs),'starting at',datetime.datetime.now()) + (step,ave_loss,curr_loss,num_so_far,effective_epochs) = train_epoch(train_model,data_gen,optimizer,loss_fn) + e = effective_epochs + print('\nFiniehsed Training'.format(e,num_epochs),'finishing at',datetime.datetime.now()) + loader.verbose=False #True during the first iteration + # if task_index == 0: + # specific_builder.save_model_weights(train_model,int(round(e))) + torch.save(train_model.state_dict(),model_path) + _,_,_,roc_area,loss = make_predictions_and_evaluate_gpu(conf,shot_list_validate,loader) + + best_so_far = cmp_fn(roc_area,best_so_far) + + stop_training = False + print('=========Summary======== for epoch{}'.format(step)) + print('Training Loss numpy: {:.3e}'.format(ave_loss)) + print('Validation Loss: {:.3e}'.format(loss)) + print('Validation ROC: {:.4f}'.format(roc_area)) + + if best_so_far != roc_area: #only save model weights if quantity we are tracking is improving + print("No improvement, still saving model") + not_updated += 1 + else: + print("Saving model") + # specific_builder.delete_model_weights(train_model,int(round(e))) + if not_updated > patience: + print("Stopping training due to early stopping") + break + From 35806ec935d9a7d8fdd048810cfac9c70fe58452 Mon Sep 17 00:00:00 2001 From: Ge Dong Date: Wed, 20 Nov 2019 14:07:46 -0500 Subject: [PATCH 4/4] modified: data/signals.py modified: plasma/conf_parser.py modified: plasma/models/builder.py modified: plasma/preprocessor/preprocess.py modified: 
plasma/primitives/data.py modified: plasma/primitives/shots.py modified: plasma/utils/batch_jobs.py --- data/signals.py | 80 ++++++++++++++++++++++++------- plasma/conf_parser.py | 43 +++++++++++++++-- plasma/models/builder.py | 25 +++++++++- plasma/preprocessor/preprocess.py | 3 ++ plasma/primitives/data.py | 6 +-- plasma/primitives/shots.py | 44 ++++++++++++++--- plasma/utils/batch_jobs.py | 18 +++---- 7 files changed, 180 insertions(+), 39 deletions(-) diff --git a/data/signals.py b/data/signals.py index 4c1c9b8e..f78ce566 100644 --- a/data/signals.py +++ b/data/signals.py @@ -174,6 +174,18 @@ def fetch_nstx_data(signal_path, shot_num, c): mapping_range=(0, 1), num_channels=profile_num_channels, data_avail_tolerances=[0.05, 0.02]) +etemp_profilet = ProfileSignal( + "Electron temperature profile tol", + ["ppf/hrts/te", "ZIPFIT01/PROFILES.ETEMPFIT"], [jet, d3d], + mapping_paths=["ppf/hrts/rho", None], causal_shifts=[0, 10], + mapping_range=(0, 1), num_channels=profile_num_channels, + data_avail_tolerances=[0.05, 0.029]) +edens_profilet = ProfileSignal( + "Electron density profile tol", + ["ppf/hrts/ne", "ZIPFIT01/PROFILES.EDENSFIT"], [jet, d3d], + mapping_paths=["ppf/hrts/rho", None], causal_shifts=[0, 10], + mapping_range=(0, 1), num_channels=profile_num_channels, + data_avail_tolerances=[0.05, 0.029]) # d3d only: # etemp_profile = ProfileSignal( # "Electron temperature profile", ["ZIPFIT01/PROFILES.ETEMPFIT"], [d3d], @@ -262,26 +274,45 @@ def fetch_nstx_data(signal_path, shot_num, c): "q95 safety factor", ['ppf/efit/q95', "EFIT01/RESULTS.AEQDSK.Q95"], [jet, d3d], causal_shifts=[15, 10], normalize=False, data_avail_tolerances=[0.03, 0.02]) +q95t = Signal( + "q95 safety factor tol", ['ppf/efit/q95', "EFIT01/RESULTS.AEQDSK.Q95"], + [jet, d3d], causal_shifts=[15, 10], normalize=False, + data_avail_tolerances=[0.03, 0.029]) # "d3d/ipsip" was used before, ipspr15V seems to be available for a # superset of shots. 
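The new '... tol' signal variants in the hunks above and below differ from the originals only in data_avail_tolerances, widened to 0.029 s (the existing q95 entry uses 0.02 s on d3d, the new q95t uses 0.029 s). Per the check in plasma/primitives/shots.py later in this patch, a disruptive shot is kept only if t_disrupt does not exceed the signal's last valid time plus this tolerance, so the wider value admits shots whose signals end slightly early. A small worked example with made-up times:

    # Illustrative numbers only: how the widened tolerance changes shot selection.
    t_last_sample = 5.971   # last valid time in some signal [s]
    t_disrupt = 5.995       # disruption time of the shot [s]
    gap = t_disrupt - t_last_sample   # ~24 ms of signal missing before the disruption
    print(gap <= 0.020)     # False: shot omitted under the original 20 ms tolerance
    print(gap <= 0.029)     # True:  shot kept by the new "tol" signal definitions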
ip = Signal("plasma current", ["jpf/da/c2-ipla", "d3d/ipspr15V"], [jet, d3d], is_ip=True) + +ipt = Signal("plasma current tol", ["jpf/da/c2-ipla", "d3d/ipspr15V"], + [jet, d3d], is_ip=True,data_avail_tolerances=[0.029, 0.029]) iptarget = Signal("plasma current target", ["d3d/ipsiptargt"], [d3d]) +iptargett = Signal("plasma current target tol", ["d3d/ipsiptargt"], [d3d],data_avail_tolerances=[0.029]) iperr = Signal("plasma current error", ["d3d/ipeecoil"], [d3d]) +iperrt = Signal("plasma current error tol", ["d3d/ipeecoil"], [d3d],data_avail_tolerances=[0.029]) li = Signal("internal inductance", ["jpf/gs/bl-liout'], [jet]) +pradtott = Signal("Radiated Power tol", ['jpf/db/b5r-ptot>out'], [jet],data_avail_tolerances=[0.029]) # pradtot = Signal("Radiated Power", ['jpf/db/b5r-ptot>out', # 'd3d/'+r'\prad_tot'], [jet,d3d]) # pradcore = ChannelSignal("Radiated Power Core", [ 'd3d/' + r'\bol_l15_p'] @@ -294,17 +325,27 @@ def fetch_nstx_data(signal_path, shot_num, c): pradedge = ChannelSignal("Radiated Power Edge", ['ppf/bolo/kb5h/channel10', 'd3d/' + r'\bol_l03_p'], [jet, d3d]) +pradcoret = ChannelSignal("Radiated Power Core tol", + ['ppf/bolo/kb5h/channel14', 'd3d/' + r'\bol_l15_p'], + [jet, d3d],data_avail_tolerances=[0.029, 0.029]) +pradedget = ChannelSignal("Radiated Power Edge tol" , + ['ppf/bolo/kb5h/channel10', 'd3d/' + r'\bol_l03_p'], + [jet, d3d],data_avail_tolerances=[0.029, 0.029]) # pechin = Signal("ECH input power, not always on", ['d3d/pcechpwrf'], [d3d]) pechin = Signal("ECH input power, not always on", ['RF/ECH.TOTAL.ECHPWRC'], [d3d]) +pechint = Signal("ECH input power, not always on tol", + ['RF/ECH.TOTAL.ECHPWRC'], [d3d],data_avail_tolerances=[0.029]) # betan = Signal("Normalized Beta", ['jpf/gs/bl-bndia= shot.machine.current_threshold)[0] if len(region) == 0: print('shot {} has no current'.format(shot.number)) - return None, None, False + return None, sig.shape, False first_idx = region[0] last_idx = region[-1] # add 50 ms to cover possible disruption event @@ -126,7 +126,7 @@ def load_data(self, prepath, shot, dtype='float32'): print( 'Signal {}, shot {} contains no data'.format( self.description, shot.number)) - return None, None, False + return None, sig.shape, False # make sure data doesn't contain nan if np.any(np.isnan(t)) or np.any(np.isnan(sig)): @@ -134,7 +134,7 @@ def load_data(self, prepath, shot, dtype='float32'): 'Signal {}, shot {} contains NAN'.format( self.description, shot.number)) - return None, None, False + return None, sig.shape, False return t, sig, True diff --git a/plasma/primitives/shots.py b/plasma/primitives/shots.py index cafb76ed..21c53049 100644 --- a/plasma/primitives/shots.py +++ b/plasma/primitives/shots.py @@ -397,15 +397,34 @@ def get_signals_and_times_from_file(self, conf): # t_thresh = -1 signal_arrays = [] time_arrays = [] - + garbage=False # disruptive = self.t_disrupt >= 0 - + if conf['paths']['data'] =='d3d_data_garbage': + garbage=True + non_valid_signals=0 signal_prepath = conf['paths']['signal_prepath'] + if self.number in [127613,129423,125726,126662]: + return None, None, None, None, False for (i, signal) in enumerate(self.signals): - t, sig, valid_signal = signal.load_data( + if isinstance(signal_prepath,list): + for prepath in signal_prepath: + t, sig, valid_signal = signal.load_data( + prepath, self, conf['data']['floatx']) + if valid_signal: + break + else: + t, sig, valid_signal = signal.load_data( signal_prepath, self, conf['data']['floatx']) if not valid_signal: - return None, None, None, None, False + if signal.is_ip or 
('q95' in signal.description) or garbage==False or sig==None: + ########################Not allow a shot if it is missing plasma current information, or q95 is missing + return None, None, None, None, False + else: + t=np.arange(0,20,0.001) + sig=np.zeros((t.shape[0],sig[1])) + non_valid_signals+=1 + signal_arrays.append(sig) + time_arrays.append(t) else: assert(len(sig.shape) == 2) assert(len(t.shape) == 1) @@ -413,18 +432,25 @@ def get_signals_and_times_from_file(self, conf): t_min = max(t_min, np.min(t)) signal_arrays.append(sig) time_arrays.append(t) + if self.is_disruptive and self.t_disrupt > np.max(t): t_max_total = ( np.max(t) + signal.get_data_avail_tolerance( self.machine) ) if (self.t_disrupt > t_max_total): - print('Shot {}: disruption event '.format(self.number), + if garbage==False: + print('Shot {}: disruption event '.format(self.number), 'is not contained in valid time region of ', 'signal {} by {}s, omitting.'.format( self.number, signal, self.t_disrupt - np.max(t))) - valid = False + valid = False + else: + non_valid_signals+=1 + t=np.arange(0,20,0.001) + sig=np.zeros((t.shape[0],sig.shape[1])) + #Setting the entire channel to zero to prevent any peeking into possible disruptions from this early ended channel else: t_max = np.max( t) + signal.get_data_avail_tolerance(self.machine) @@ -447,6 +473,12 @@ def get_signals_and_times_from_file(self, conf): if self.is_disruptive: assert(self.t_disrupt <= t_max or not valid) t_max = self.t_disrupt + if non_valid_signals>3: + print('Shot {}: '.format(self.number), + 'is REALLY garbabge (More than three channels are contaminated......) ') + + #Omit a shot if more than 3 channels are bad channels.... + valid=False return time_arrays, signal_arrays, t_min, t_max, valid diff --git a/plasma/utils/batch_jobs.py b/plasma/utils/batch_jobs.py index 292964c8..6010a04e 100644 --- a/plasma/utils/batch_jobs.py +++ b/plasma/utils/batch_jobs.py @@ -124,8 +124,8 @@ def create_slurm_script( idx, executable_name, use_mpi, - env_name="frnn", - env_type="anaconda"): + env_name="ppltest", + env_type="anaconda3"): filename = "run_{}_nodes.cmd".format(num_nodes) filepath = subdir+filename # user = getpass.getuser() @@ -134,14 +134,14 @@ def create_slurm_script( for line in sbatch_header: f.write(line) f.write('module load '+env_type+'\n') - f.write('source activate '+env_name+'\n') + f.write('conda activate '+env_name+'\n') + f.write(( + 'module load cudatoolkit \n')) f.write(( - 'module load cudatoolkit/8.0 cudnn/cuda-8.0/6.0 ' - 'openmpi/cuda-8.0/intel-17.0/2.1.0/64\n')) - f.write('module load intel/17.0/64/17.0.5.239 intel-mkl/2017.3/4/64\n') + 'module load openmpi/gcc/3.1.4/64 \n')) + f.write(( + 'module load hdf5/gcc/openmpi-3.1.4/1.10.5 \n')) # f.write('rm -f /tigress/{}/model_checkpoints/*.h5\n'.format(user)) - f.write('cd {}\n'.format(subdir)) - f.write('export OMPI_MCA_btl=\"tcp,self,sm\"\n') f.write('srun python {}\n'.format(executable_name)) f.write('echo "done."') @@ -183,7 +183,7 @@ def create_slurm_header(num_nodes, use_mpi, idx): assert(num_nodes == 1) lines = [] lines.append('#!/bin/bash\n') - lines.append('#SBATCH -t 20:00:00\n') + lines.append('#SBATCH -t 24:00:00\n') lines.append('#SBATCH -N '+str(num_nodes)+'\n') if use_mpi: lines.append('#SBATCH --ntasks-per-node=4\n')
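Finally, a compact restatement of the multi-prepath fallback added to get_signals_and_times_from_file above: conf['paths']['signal_prepath'] may now be either a single directory or a list of directories, and each signal is retried against the list in order until one copy loads. A minimal sketch of that pattern factored into a helper (the helper name is ours, not part of the patch):

    def load_with_fallback(signal, shot, signal_prepath, dtype='float32'):
        # Accept a single prepath string or a list of candidates, mirroring the
        # isinstance(signal_prepath, list) branch added in shots.py.
        prepaths = signal_prepath if isinstance(signal_prepath, list) else [signal_prepath]
        t, sig, valid = None, None, False
        for prepath in prepaths:
            t, sig, valid = signal.load_data(prepath, shot, dtype)
            if valid:
                break
        return t, sig, valid   # on total failure: the last attempt's result, valid=False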