Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
9103fde
DRR - Cpptraj: Initial attempt to add '-ya' command line flag for inp…
Jun 18, 2018
94f4d7b
DRR - Cpptraj: Use ArgList to process command line args instead of ar…
Jun 18, 2018
eaa1cb5
DRR - Cpptraj: Change automatic detection of topology and input files…
Jun 18, 2018
77687b2
DRR - Cpptraj: Add command line test.
Jun 18, 2018
8de83e6
DRR - Cpptraj: Simplify NotFinalArg()
Jun 18, 2018
f5f018f
DRR - Cpptraj: Fix memory leaks during automatic detection of traj/pa…
Jun 18, 2018
dfc8dbf
DRR - Cpptraj: With MPI, only have master autodetect files on command…
Jun 18, 2018
05f87f2
DRR - Cpptraj: Test is good up to 8 threads
Jun 18, 2018
60095ec
DRR - Cpptraj: Correct some references to MPI threads to MPI processe…
Jun 18, 2018
d3f760b
DRR - Cpptraj: Middle version number bump; command line functionality…
Jun 18, 2018
4796bab
DRR - Cpptraj: Make resizing of trajectory arguments a separate funct…
Jun 18, 2018
f96e7c1
DRR - Cpptraj: Process '-xa' flags.
Jun 18, 2018
50e23c8
DRR - Cpptraj: Hide debug info. Print which trajin/trajout arguments …
Jun 18, 2018
010fde2
DRR - Cpptraj: Test output args from command line
Jun 18, 2018
11975ca
DRR - Cpptraj: Test printing host name. Hopefully this works on the w…
Jun 18, 2018
0f3eb1a
DRR - Cpptraj: I think I need this include on windows for HostName() …
Jun 18, 2018
c073615
DRR - Cpptraj: Decided the last number should be incremented instead …
Jun 19, 2018
65dbec1
DRR - Cpptraj: Fix up help text.
Jun 19, 2018
9bf2e8c
DRR - Cpptraj: Warn if less than 3 atoms selected for RMSD, and skip …
Jun 19, 2018
3a1cb40
DRR - Cpptraj: Fix tests in parallel at higher process counts (#599).…
Jun 19, 2018
81d80ad
DRR - Cpptraj: Only warn about assuming file contains cpptraj input i…
Jun 19, 2018
22d30e6
DRR - Cpptraj: Add command line wildcard test
Jun 19, 2018
2a83d29
DRR - Cpptraj: Update test name.
Jun 19, 2018
c088e4e
DRR - Cpptraj: Remove hostname printout for now. I don't have the tim…
Jun 20, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/Action_Rmsd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,14 @@ Action::RetType Action_Rmsd::Setup(ActionSetup& setup) {
mprintf("Warning: No atoms in mask '%s'.\n", tgtMask_.MaskString());
return Action::SKIP;
}
// Guard: with fewer than 3 selected atoms the 3x3 coordinate covariance
// matrix used for the RMSD best-fit cannot be fully populated.
if ( tgtMask_.Nselected() < 3 ) {
  // BUGFIX: the first literal was missing its trailing '\n', so the two
  // adjacent string literals concatenated into one garbled line:
  // "...Cannot fullyWarning: populate...". Each warning line should start
  // with "Warning: " on its own line.
  mprintf("Warning: Less than 3 atoms selected for RMSD. Cannot fully\n"
          "Warning: populate the coordinate covariance matrix.\n");
  // In debug mode keep going so the user can inspect the behavior;
  // otherwise skip this setup entirely.
  if (debug_ == 0) {
    mprintf("Warning: Skipping.\n");
    return Action::SKIP;
  }
}
// Allocate space for selected atoms in the frame. This will also put the
// correct masses in based on the mask.
tgtFrame_.SetupFrameFromMask(tgtMask_, setup.Top().Atoms());
Expand Down
241 changes: 175 additions & 66 deletions src/Cpptraj.cpp

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion src/Cpptraj.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ class Cpptraj {
static void Usage();
static void Intro();
static void Finalize();
static inline void AddArgs(Sarray&, ArgList const&, int&);
static inline void ResizeArgs(Sarray const&, Sarray&, const char*);
int ProcessMask(Sarray const&, Sarray const&, std::string const&, bool,bool) const;
static inline void AddFiles(Sarray&, int, char**, int&);
Mode ProcessCmdLineArgs(int, char**);
int Interactive();

Expand Down
60 changes: 30 additions & 30 deletions src/CpptrajState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ int CpptrajState::RunEnsemble() {
// In parallel only two frames needed; one for reading, one for receiving.
FramePtrArray SortedFrames( 2 );
FrameArray FrameEnsemble( 2 );
// Each thread will process one member of the ensemble, so local ensemble
// Each process will handle one member of the ensemble, so local ensemble
// size is effectively 1.
ensembleSize = 1;
# else
Expand All @@ -533,7 +533,7 @@ int CpptrajState::RunEnsemble() {
ActionEnsemble[0] = &actionList_;
for (int member = 1; member < ensembleSize; member++)
ActionEnsemble[member] = new ActionList();
// If we are on a single thread, give each member its own copy of the
// If we are on a single process, give each member its own copy of the
// current topology address. This way if topology is modified by a member,
// e.g. in strip or closest, subsequent members won't be trying to modify
// an already-modified topology.
Expand Down Expand Up @@ -739,19 +739,19 @@ int CpptrajState::RunEnsemble() {
}
#ifdef MPI
// -----------------------------------------------------------------------------
void CpptrajState::DivideFramesAmongThreads(int& my_start, int& my_stop, int& my_frames,
void CpptrajState::DivideFramesAmongProcesses(int& my_start, int& my_stop, int& my_frames,
int maxFrames, Parallel::Comm const& commIn) const
{
my_frames = commIn.DivideAmongThreads(my_start, my_stop, maxFrames);
std::vector<int> frames_per_thread( commIn.Size() );
commIn.GatherMaster(&my_frames, 1, MPI_INT, &frames_per_thread[0]);
my_frames = commIn.DivideAmongProcesses(my_start, my_stop, maxFrames);
std::vector<int> frames_per_process( commIn.Size() );
commIn.GatherMaster(&my_frames, 1, MPI_INT, &frames_per_process[0]);
// Print how many frames each rank will process.
if (commIn.Master()) {
mprintf("\nPARALLEL INFO:\n");
if (Parallel::EnsembleComm().Size() > 1)
mprintf(" %i threads per ensemble member.\n", commIn.Size());
mprintf(" %i processes per ensemble member.\n", commIn.Size());
for (int rank = 0; rank != commIn.Size(); rank++)
mprintf(" Thread %i will process %i frames.\n", rank, frames_per_thread[rank]);
mprintf(" Process %i will handle %i frames.\n", rank, frames_per_process[rank]);
}
commIn.Barrier();
if (debug_ > 0) rprintf("Start %i Stop %i Frames %i\n", my_start+1, my_stop, my_frames);
Expand All @@ -773,13 +773,13 @@ int CpptrajState::PreloadCheck(int my_start, int my_frames,
return 1;
} else if (my_frames == n_previous_frames) {
rprinterr("Error: Number of preload frames is same as number of processed frames.\n"
"Error: Try reducing the number of threads.\n");
"Error: Try reducing the number of processes.\n");
return 1;
}
if (n_previous_frames > (my_frames / 2))
rprintf("Warning: Number of preload frames is greater than half the "
"number of processed frames.\n"
"Warning: Try reducing the number of threads.\n");
"Warning: Try reducing the number of processes.\n");
rprintf("Warning: Preloading %i frames. These frames will NOT have Actions performed on them.\n",
n_previous_frames);
return 0;
Expand All @@ -805,19 +805,19 @@ int CpptrajState::RunParaEnsemble() {
int err = NAV.AddEnsembles(trajinList_.ensemble_begin(), trajinList_.ensemble_end());
if (Parallel::World().CheckError( err )) return 1;

// Divide frames among threads
// Divide frames among processes
int my_start, my_stop, my_frames;
DivideFramesAmongThreads(my_start, my_stop, my_frames, NAV.IDX().MaxFrames(), TrajComm);
// Ensure at least 1 frame per thread, otherwise some ranks could cause hangups.
DivideFramesAmongProcesses(my_start, my_stop, my_frames, NAV.IDX().MaxFrames(), TrajComm);
// Ensure at least 1 frame per process, otherwise some ranks could cause hangups.
if (my_frames > 0)
err = 0;
else {
rprinterr("Error: Thread is processing less than 1 frame. Try reducing # threads.\n");
rprinterr("Error: Process is handling less than 1 frame. Try reducing # processes.\n");
err = 1;
}
if (Parallel::World().CheckError( err )) return 1;

// Allocate DataSets in the master DataSetList based on # frames to be read by this thread.
// Allocate DataSets in the master DataSetList based on # frames to be read by this process.
DSL_.AllocateSets( my_frames );
// Any DataSets added to the DataSetList during run will need to be synced.
DSL_.SetNewSetsNeedSync( true );
Expand Down Expand Up @@ -917,9 +917,9 @@ int CpptrajState::RunParaEnsemble() {
ensembleOut_.CloseEnsembleOut();
DSL_.SetNewSetsNeedSync( false );
sync_time_.Start();
// Sync Actions to master thread
// Sync Actions to master process
actionList_.SyncActions();
// Sync data sets to master thread
// Sync data sets to master process
if (DSL_.SynchronizeData( TrajComm )) return 1;
sync_time_.Stop();
mprintf("\nACTION OUTPUT:\n");
Expand Down Expand Up @@ -968,19 +968,19 @@ int CpptrajState::RunParallel() {
if (input_traj.AddInputTraj( *traj )) { err = 1; break; }
if (TrajComm.CheckError( err )) return 1;

// Divide frames among threads.
// Divide frames among processes.
int my_start, my_stop, my_frames;
DivideFramesAmongThreads(my_start, my_stop, my_frames, input_traj.Size(), TrajComm);
// Ensure at least 1 frame per thread, otherwise some ranks could cause hangups.
DivideFramesAmongProcesses(my_start, my_stop, my_frames, input_traj.Size(), TrajComm);
// Ensure at least 1 frame per process, otherwise some ranks could cause hangups.
if (my_frames > 0)
err = 0;
else {
rprinterr("Error: Thread is processing less than 1 frame. Try reducing # threads.\n");
rprinterr("Error: Process is handling less than 1 frame. Try reducing # processes.\n");
err = 1;
}
if (TrajComm.CheckError( err )) return 1;

// Allocate DataSets in DataSetList based on # frames read by this thread.
// Allocate DataSets in DataSetList based on # frames read by this process.
DSL_.AllocateSets( my_frames );
// Any DataSets added to the DataSetList during run will need to be synced.
DSL_.SetNewSetsNeedSync( true );
Expand Down Expand Up @@ -1093,9 +1093,9 @@ int CpptrajState::RunParallel() {
trajoutList_.CloseTrajout();
DSL_.SetNewSetsNeedSync( false );
sync_time_.Start();
// Sync Actions to master thread
// Sync Actions to master process
actionList_.SyncActions();
// Sync data sets to master thread
// Sync data sets to master process
if (DSL_.SynchronizeData( TrajComm )) return 1;
sync_time_.Stop();
post_time_.Start();
Expand All @@ -1120,10 +1120,10 @@ int CpptrajState::RunSingleTrajParallel() {
// Set up single trajectory for parallel read.
Trajin* trajin = *(trajinList_.trajin_begin());
trajin->ParallelBeginTraj( Parallel::World() );
// Divide frames among threads.
// Divide frames among processes.
int total_read_frames = trajin->Traj().Counter().TotalReadFrames();
int my_start, my_stop, my_frames;
std::vector<int> rank_frames = DivideFramesAmongThreads(my_start, my_stop, my_frames,
std::vector<int> rank_frames = DivideFramesAmongProcesses(my_start, my_stop, my_frames,
total_read_frames,
Parallel::World().Size(),
Parallel::World().Rank(),
Expand All @@ -1135,7 +1135,7 @@ int CpptrajState::RunSingleTrajParallel() {
rprintf("Start and stop adjusted for offset: %i to %i\n", traj_start, traj_stop);
Parallel::World().Barrier();

// Allocate DataSets in DataSetList based on # frames read by this thread.
// Allocate DataSets in DataSetList based on # frames read by this process.
DSL_.AllocateSets( my_frames );

// ----- SETUP PHASE ---------------------------
Expand Down Expand Up @@ -1189,11 +1189,11 @@ int CpptrajState::RunSingleTrajParallel() {
mprintf("TIME: Avg. throughput= %.4f frames / second.\n",
(double)total_read_frames / frames_time.Total());
trajoutList_.CloseTrajout();
// Sync data sets to master thread
// Sync data sets to master process
Timer time_sync;
time_sync.Start();
if (DSL_.SynchronizeData( total_read_frames, rank_frames, Parallel::World() )) return 1;
// Sync Actions to master thread
// Sync Actions to master process
actionList_.SyncActions();
time_sync.Stop();
time_sync.WriteTiming(1, "Data set/actions sync");
Expand Down Expand Up @@ -1368,7 +1368,7 @@ int CpptrajState::RunAnalyses() {
# ifdef MPI
// Only master performs analyses currently.
if (Parallel::TrajComm().Size() > 1)
mprintf("Warning: Analysis does not currently use multiple MPI threads.\n");
mprintf("Warning: Analysis does not currently use multiple MPI processes.\n");
if (Parallel::TrajComm().Master())
# endif
err = analysisList_.DoAnalyses();
Expand Down
2 changes: 1 addition & 1 deletion src/CpptrajState.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class CpptrajState {
int RunNormal();
int RunEnsemble();
# ifdef MPI
void DivideFramesAmongThreads(int&, int&, int&, int, Parallel::Comm const&) const;
void DivideFramesAmongProcesses(int&, int&, int&, int, Parallel::Comm const&) const;
int PreloadCheck(int, int, int&, int&) const;
int RunParallel();
int RunParaEnsemble();
Expand Down
2 changes: 1 addition & 1 deletion src/Exec_CrdAction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ Exec::RetType Exec_CrdAction::Execute(CpptrajState& State, ArgList& argIn) {
//rprintf("DEBUG: About to create new comm, ID= %i\n", ID);
trajComm_ = Parallel::World().Split( ID );
if (ID != MPI_UNDEFINED) {
mprintf("Warning: '%s' command does not yet use multiple MPI threads.\n", argIn.Command());
mprintf("Warning: '%s' command does not yet use multiple MPI processes.\n", argIn.Command());
ret = ProcessArgs(State, argIn);
if (ret != CpptrajState::OK)
err = 1;
Expand Down
2 changes: 1 addition & 1 deletion src/Exec_CrdOut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Exec::RetType Exec_CrdOut::Execute(CpptrajState& State, ArgList& argIn) {
//rprintf("DEBUG: About to create new comm, ID= %i\n", ID);
trajComm_ = Parallel::World().Split( ID );
if (ID != MPI_UNDEFINED) {
mprintf("Warning: '%s' command does not yet use multiple MPI threads.\n", argIn.Command());
mprintf("Warning: '%s' command does not yet use multiple MPI processes.\n", argIn.Command());
ret = WriteCrd(State, argIn);
if (ret != CpptrajState::OK)
err = 1;
Expand Down
2 changes: 1 addition & 1 deletion src/Exec_RunAnalysis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Exec::RetType Exec_RunAnalysis::Execute(CpptrajState& State, ArgList& argIn) {
# ifdef MPI
// Only master performs analyses currently.
if (Parallel::TrajComm().Size() > 1)
mprintf("Warning: Analysis does not currently use multiple MPI threads.\n");
mprintf("Warning: Analysis does not currently use multiple MPI processes.\n");
if (Parallel::TrajComm().Master())
# endif
err = DoRunAnalysis(State, argIn);
Expand Down
44 changes: 22 additions & 22 deletions src/Parallel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ void Parallel::dbgprintf(const char* format, ...) {
return;
}

/** Open a file named Thread.worldrank for this thread */
/** Open a file named Process.worldrank for this process */
int Parallel::debug_init() {
char outfilename[32];
sprintf(outfilename, "Thread.%03i", world_.Rank());
sprintf(outfilename, "Process.%03i", world_.Rank());
mpidebugfile_ = fopen(outfilename, "w");
if (mpidebugfile_ == NULL) {
fprintf(stderr,"[%i]\tCould not open debug file:\n", world_.Rank());
Expand All @@ -74,7 +74,7 @@ int Parallel::debug_init() {
return 0;
}

/** Close Thread.worldrank file. */
/** Close Process.worldrank file. */
int Parallel::debug_end() {
if (mpidebugfile_ != 0)
fclose(mpidebugfile_);
Expand All @@ -97,7 +97,7 @@ int Parallel::Init(int argc, char** argv) {
//char processor_name[MPI_MAX_PROCESSOR_NAME];
//int namelen;
//MPI_Get_processor_name(processor_name, &namelen);
//printf("DEBUG: Thread %i of %i on %s\n", world_.Rank(), world_.Size(), processor_name);
//printf("DEBUG: Process %i of %i on %s\n", world_.Rank(), world_.Size(), processor_name);
return 0;
}

Expand Down Expand Up @@ -125,13 +125,13 @@ int Parallel::Abort(int errcode) {
/** Trajectory and Ensemble communicators are set up orthogonal to one
* another. In row-major notation, Trajectory communicators are set up
* across rows, and Ensemble communicators are set up down columns. For
* example, if reading in 2 ensemble members with 3 threads per member
* example, if reading in 2 ensemble members with 3 processes per member
* (world size 6), the layout would be:
* 0 1 2 (member 0)
* 3 4 5 (member 1)
* Threads 0 and 3 would read the first third of the trajectories, etc.
* Processes 0 and 3 would read the first third of the trajectories, etc.
*/
int Parallel::SetupComms(int ngroups, bool allowFewerThreadsThanGroups) {
int Parallel::SetupComms(int ngroups, bool allowFewerProcessesThanGroups) {
if (ngroups < 1) {
// If ngroups < 1 assume we want to reset comm info
//fprintf(stdout, "DEBUG: Resetting ensemble/traj comm info.\n");
Expand All @@ -152,32 +152,32 @@ int Parallel::SetupComms(int ngroups, bool allowFewerThreadsThanGroups) {
return 1;
}
} else if (world_.Size() < ngroups) {
// Fewer threads than groups. Make sure that # threads is a
// Fewer processes than groups. Make sure that # processes is a
// multiple of ngroups. This is required for things like AllGather to work
// properly.
if (!allowFewerThreadsThanGroups) {
fprintf(stderr,"Error: Fewer threads than groups currently not allowed.\n");
if (!allowFewerProcessesThanGroups) {
fprintf(stderr,"Error: Fewer processes than groups currently not allowed.\n");
return 1;
}
trajComm_.Reset();
ensembleComm_.Reset();
if ( (ngroups % world_.Size()) != 0 ) {
fprintf(stderr,"Error: # of replicas (%i) must be a multiple of # threads (%i)\n",
fprintf(stderr,"Error: # of replicas (%i) must be a multiple of # processes (%i)\n",
ngroups, world_.Size());
return 1;
}
ensemble_size_ = ngroups;
n_ens_members_ = world_.DivideAmongThreads( ensemble_beg_, ensemble_end_, ensemble_size_ );
n_ens_members_ = world_.DivideAmongProcesses( ensemble_beg_, ensemble_end_, ensemble_size_ );
int ID = world_.Rank();
trajComm_ = world_.Split( ID );
// NOTE: This effectively duplicates World
ensembleComm_ = world_.Split( 0 );
world_.Barrier();
} else {
// Threads >= groups. Make sure that ngroups is a multiple of total # threads.
// Processes >= groups. Make sure that ngroups is a multiple of total # processes.
if ( (world_.Size() % ngroups) != 0 ) {
if ( world_.Master() )
fprintf(stderr,"Error: # of threads (%i) must be a multiple of # replicas (%i)\n",
fprintf(stderr,"Error: # of processes (%i) must be a multiple of # replicas (%i)\n",
world_.Size(), ngroups);
return 1;
}
Expand Down Expand Up @@ -233,7 +233,7 @@ int Parallel::SetupComms(int ngroups, bool allowFewerThreadsThanGroups) {

/** Can be placed inside the code so debugger can be attached. */
void Parallel::Lock() {
fprintf(stdout,"[%i] Thread is locked. Waiting for debugger.\n", world_.Rank());
fprintf(stdout,"[%i] Process is locked. Waiting for debugger.\n", world_.Rank());
int PleaseWait = 1;
while (PleaseWait == 1)
PleaseWait *= 1;
Expand Down Expand Up @@ -300,20 +300,20 @@ void Parallel::Comm::Reset() {
}

/** Split given number of elements as evenly as possible among ranks.
* \return Number of elements this thread is responsible for.
* \return Number of elements this process is responsible for.
*/
int Parallel::Comm::DivideAmongThreads(int& my_start, int& my_stop, int maxElts) const
int Parallel::Comm::DivideAmongProcesses(int& my_start, int& my_stop, int maxElts) const
{
int frames_per_thread = maxElts / size_;
int frames_per_process = maxElts / size_;
int remainder = maxElts % size_;
int my_frames = frames_per_thread + (int)(rank_ < remainder);
// Figure out where this thread starts and stops
int my_frames = frames_per_process + (int)(rank_ < remainder);
// Figure out where this process starts and stops
my_start = 0;
for (int rnk = 0; rnk != rank_; rnk++)
if (rnk < remainder)
my_start += (frames_per_thread + 1);
my_start += (frames_per_process + 1);
else
my_start += (frames_per_thread);
my_start += (frames_per_process);
my_stop = my_start + my_frames;
return my_frames;
}
Expand Down
Loading