
5 MasterSlave Tutorial

Sections

  1. A simple example
  2. ParSquare
  3. ParInstallTOPCGlobalFunction() and TaskInputIterator() (ParSquare revisited)
  4. ParMultMat
  5. DefaultCheckTaskResult (as illustrated by ParSemiEchelonMatrix)
  6. Caching slave task outputs (ParSemiEchelonMat revisited)
  7. Agglomerating tasks for efficiency (ParSemiEchelonMat revisited again)
  8. Raw MasterSlave (ParMultMat revisited)

This chapter assumes the background knowledge in section Basic TOP-C (Master-Slave) commands. ParGAP must be invoked through a script like pargap.sh, generated in ParGAP's bin directory during installation. In addition, there must be a procgroup file in the current directory when ParGAP is called, or alternatively, -p4pg FULL_PATH/procgroup may be included on the pargap.sh command line. See section Running ParGAP for the details. You should find a sample procgroup file in ParGAP's bin directory after installation. See section Invoking ParGAP with Remote Slaves for further details. Many of the examples in this chapter can also be found in ParGAP's examples directory.

5.1 A simple example

A master-slave computation is invoked when a ParGAP program issues the command MasterSlave(). This command is an example of what is called ``collective communication'' in MPI (although the command is not part of MPI). It is also sometimes called SPMD (Single Program, Multiple Data), since all processes see the same code, although different processes may execute different parts of the code. The MasterSlave() command must be invoked on all processes before execution can begin. The following trivial example does this. (Note that the final \ on a line that is still inside a string allows continuation of a string to the next line.) We illustrate these principles first in their simplest form, making all variables global variables. Later, we introduce additional ParGAP utilities that allow one to write in better style.

#Shared Data: none
#TaskInput:   counter
#TaskOutput:  counter^2 (square of counter)
#Task:        compute counter^2 from counter
ParEval( "result := []" );
ParEval( "counter := 0; \
          SubmitTaskInput := function() \
            counter := counter + 1; \
            if counter <= 10 then return counter; else return NOTASK; fi; \
          end;");
ParEval( "DoTask := x->x^2" );
ParEval( "CheckTaskResultVers1 := function( input, output ) \
            result[input] := [input, output]; \
            return NO_ACTION; \
          end;" );
ParEval( "MasterSlave( SubmitTaskInput, DoTask, CheckTaskResultVers1, Print )" );
Print(result);

By default, ParTrace = true (see ParTrace), causing the execution to display each input, x, as it is passed from the master to a slave, and each output, x^2, as it is passed from the slave back to the master. This behavior can be turned off by setting ParTrace := false. The fourth argument of MasterSlave(), Print, is a dummy UpdateSharedData argument that is never invoked in this example, since CheckTaskResultVers1 always returns NO_ACTION.

Note that the result list is filled in only on the master process; it is never modified on the slave processes. To remedy this situation, we introduce the concept of shared data, a globally shared, application-defined data structure. A central principle of the TOP-C model in ParGAP is that any routine may ``read'' the shared data, but it may be modified only by the application-defined routine, UpdateSharedData(). Hence, if we wanted the result list to be recorded on all processes (perhaps as a lookup table), we could broadcast the final result:

ParBroadcast( PrintToString( "result := ", result ) );
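
This copies the finished result to the slaves only after the computation ends. The more idiomatic TOP-C solution is to route each update through UpdateSharedData(), so that result stays synchronized on all processes while MasterSlave() runs. Here is a minimal sketch, reusing SubmitTaskInput and DoTask from above (the same CheckTaskResultVers2 idiom reappears, as a local function, in the next section):

ParEval( "CheckTaskResultVers2 := function( input, output ) \
            return UPDATE_ACTION; \
          end;" );
ParEval( "UpdateSharedData := function( input, output ) \
            result[input] := [input, output]; \
          end;" );
ParEval( "counter := 0" ); # reset the task counter before re-running
ParEval( "MasterSlave( SubmitTaskInput, DoTask, CheckTaskResultVers2, \
                       UpdateSharedData )" );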

5.2 ParSquare

Until now, we have been using global variables. It is better style to use local variables, where possible. We rewrite the above routine in the improved style:

#Shared Data: result [ result is shared among all processes ]
#TaskInput:   counter
#TaskOutput:  counter^2 (square of counter)
#Task:        compute counter^2 from counter
#UpdateSharedData:  record [counter, counter^2] at index, counter, of result
MSSquare := function( range ) # expects as input a range, like [1..10]
  local counter, result,
      SubmitTaskInput, DoTask, CheckTaskResultVers2, UpdateSharedData;
  counter := range[1] - 1; # Initialize counter for use in SubmitTaskInput()
  result := [];
  SubmitTaskInput := function()
    counter := counter + 1;
    if counter <= range[Length(range)] then return counter;
    else return NOTASK;
    fi;
  end;
  DoTask := x->x^2;
  CheckTaskResultVers2 := function( input, output )
    return UPDATE_ACTION;
  end;
  UpdateSharedData := function( input, output )
    result[input] := [input, output];
  end;
  MasterSlave( SubmitTaskInput, DoTask, CheckTaskResultVers2, UpdateSharedData );
  return result;
end;

#ParSquare() is the main calling function;  It must define MSSquare on
#  all slaves before calling it in parallel.
ParSquare := function( range ) # expects as input a range, like [1..10]
  ParEval( PrintToString( "MSSquare := ", MSSquare ) );
  return ParEval( PrintToString( "MSSquare( ", range, ")" ) );
end;
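
A hypothetical session (assuming ParGAP was started with slave processes, and ParTrace := false to suppress the message trace):

gap> ParSquare( [1..4] );
[ [ 1, 1 ], [ 2, 4 ], [ 3, 9 ], [ 4, 16 ] ]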

5.3 ParInstallTOPCGlobalFunction() and TaskInputIterator() (ParSquare revisited)

This example can be written more compactly by using some of the convenience functions provided by ParGAP. Specifically, we would rewrite this as:

ParInstallTOPCGlobalFunction( "ParSquare", function( range )
  local result;
  result := [];
  MasterSlave( TaskInputIterator( range ),
               x->x^2,
               function( input, output ) return UPDATE_ACTION; end, 
               function( input, output ) result[input] := [input, output]; end
             );
  return result;
end );

The example above demonstrates two ParGAP utilities.

  • ParInstallTOPCGlobalFunction( string, function ) F

    This defines gvar, the global variable named by string, as a function on the master and on the slaves. On each slave, gvar is simply defined to be function. On the master, however, gvar is defined as a function that first calls gvar on all slaves with the arguments originally passed to gvar, and then calls function on the master with the same arguments. This compresses an invocation of MasterSlave() so that the right things happen both on the master and on the slaves; it is exactly what the hand-written wrapper around MSSquare() did in the previous definition of ParSquare, above.

  • TaskInputIterator( collection ) F

    This function provides the functionality of a common case of SubmitTaskInput(): it wraps a GAP iterator for collection (see iterators in the Reference Manual) in a function that returns the successive elements of collection, and returns NOTASK once the iterator is exhausted. Its meaning is best understood from its definition:

    TaskInputIterator := function( collection )
      local iter;
      iter := Iterator( collection );
      return function()
               if IsDoneIterator(iter) then return NOTASK;
               else return NextIterator(iter);
               fi;
             end;
    end;
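
    For example, here is a small illustration, independent of MasterSlave() (the variable name submit is ours):

    submit := TaskInputIterator( [ 10, 20, 30 ] );;
    submit();  # returns 10
    submit();  # returns 20
    submit();  # returns 30
    submit();  # returns NOTASK from now on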
    

5.4 ParMultMat

Let us now write a matrix-matrix multiplication routine in this style. Since matrix multiplication for dimension n requires n^3 operations, we can afford to spend n^2 time doing any sequential work. (A finer analysis would also consider the number of slaves, k, resulting in up to k*n^2 time to send all messages, depending on the MPI broadcast algorithm.) So, a sequential matrix multiplication program might be written as follows. (The style emphasizes clarity over efficiency.)

SeqMultMat := function(m1, m2)               # sequential code
  local i, j, k, n, m2t, sum, result;
  n := Length(m1);
  result := [];
  m2t := TransposedMat(m2);
  for i in [1..n] do
    result[i] := [];
    for j in [1..n] do
      sum := 0;
      for k in [1..n] do
        sum := sum + m1[i][k]*m2t[j][k];
      od;
      result[i][j] := sum;
    od;
  od;
  return result;
end;
    

We choose to define the task as the computation of a single row of the result matrix. This corresponds to the body of the outermost loop.

#Shared Data: m1, m2t, result (three matrices)
#TaskInput:   i (row index of result matrix)
#TaskOutput:  result[i] (row of result matrix)
#Task:        Compute result[i] from i, m1, and m2
#UpdateSharedData:  Given result[i] and i, modify result on all processes.

ParInstallTOPCGlobalFunction( "ParMultMat", function(m1, m2)
  local i, n, m2t, result, DoTask, CheckTaskResult, UpdateSharedData;
  n := Length(m1);
  result := [];
  m2t := TransposedMat(m2);

  DoTask := function(i) # i is task input
    local j, k, sum;
    result[i] := [];
    for j in [1..n] do
      sum := 0;
      for k in [1..n] do
        sum := sum + m1[i][k]*m2t[j][k];
      od;
      result[i][j] := sum;
    od;
    return result[i]; # return task output, row_i
  end;
  # CheckTaskResult executes only on the master
  CheckTaskResult := function(i, row_i) # task output is row_i
    return UPDATE_ACTION; # Pass on output and input to UpdateSharedData
  end;
  # UpdateSharedData executes on the master and on all slaves
  UpdateSharedData := function(i, row_i) # task output is row_i
    result[i] := row_i;
  end;
  # We're done defining the task.  Let's do it now.
  MasterSlave( TaskInputIterator([1..n]), DoTask, CheckTaskResult,
               UpdateSharedData );
  # result is defined on all processes;  return local copy of result
  return result;
end );
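
A quick sanity check is to compare the parallel routine against the sequential one on small random matrices. A hypothetical session (assuming slave processes are running; RandomMat is a standard GAP library function):

m1 := RandomMat( 8, 8, Integers );;
m2 := RandomMat( 8, 8, Integers );;
ParMultMat( m1, m2 ) = SeqMultMat( m1, m2 ); # should return true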
    

5.5 DefaultCheckTaskResult (as illustrated by ParSemiEchelonMatrix)

Now that the basic principles of the TOP-C model are clear, we investigate an example that requires most of the basic features of ParGAP, including the use of IsUpToDate() and REDO_ACTION. Recall the standard idiom for CheckTaskResult(). These issues were discussed in the section Efficient Parallelism in MasterSlave() using CheckTaskResult().

DefaultCheckTaskResult := function( taskInput, taskOutput )
  if taskOutput = fail then return NO_ACTION;
  elif not IsUpToDate() then return REDO_ACTION;
  else return UPDATE_ACTION;
  fi;
end;
    

The next example is a parallelization of the function SemiEchelonMat() (a form of Gaussian elimination) in the GAP library, lib/matrix.gi. Unlike the previous examples, parallelizing Gaussian elimination efficiently is a non-trivial undertaking. This is because a naive parallelization has poor load balancing. A slave executing a task in the middle will have to sift a row vector through many previous row vectors, while a slave executing a task toward the beginning or end will have little work to do. We will begin with a naive parallelization based on the sequential code, and then migrate the code in a natural manner toward a more efficient form, by analyzing the inefficiencies and applying the TOP-C model.

The reader may wish to stop and read the original code in lib/matrix.gi first. The logic of SemiEchelonMat() is to examine each row vector of an input matrix, in order, reduce it by the list of basis vectors stored in vectors, and then add the row to vectors. Upon completion, the number of leading zeroes of the row vectors in vectors may not increase monotonically, but each element of vectors will have a unique number of leading zeroes. Some rows of the input matrix may reduce to the zero vector, in which case they are not added to vectors.

For the reader's convenience, the original sequential code is reproduced here.

SemiEchelonMat := function( mat )

    local zero,      # zero of the field of <mat>
          nrows,     # number of rows in <mat>
          ncols,     # number of columns in <mat>
          vectors,   # list of basis vectors
          heads,     # list of pivot positions in 'vectors'
          i,         # loop over rows
          j,         # loop over columns
          x,         # a current element
          nzheads,   # list of non-zero heads
          row;       # the row of current interest

    mat:= List( mat, ShallowCopy );
    nrows:= Length( mat );
    ncols:= Length( mat[1] );

    zero:= Zero( mat[1][1] );

    heads:= ListWithIdenticalEntries( ncols, 0 );
    nzheads := [];
    vectors := [];

    for i in [ 1 .. nrows ] do
        row := mat[i];
        # Reduce the row with the known basis vectors.
        for j in [ 1 .. Length(nzheads) ] do
            x := row[nzheads[j]];
            if x <> zero then
                AddRowVector( row, vectors[ j ], - x );
            fi;
        od;
        j := PositionNot( row, zero );
        if j <= ncols then
            # We found a new basis vector.
            MultRowVector(row, Inverse(row[j]));
            Add( vectors, row );
            Add( nzheads, j);
            heads[j]:= Length( vectors );
        fi;
    od;

    return rec( heads   := heads,
                vectors := vectors );
    end;
    

Although GAP's Gaussian elimination algorithm appears to be awkward to parallelize (since the next row vector in vectors depends on row reduction by all previous vectors), we will see how the original code of SemiEchelonMat() can be modified in a natural manner to produce useful parallelism. This illustrates the general TOP-C paradigm for ``naturally'' parallelizing a sequential algorithm.

#Shared Data: vectors (basis vectors), heads, nzheads, mat (matrix)
#TaskInput:   i (row index of matrix)
#TaskOutput:  List of (1) j and (2) row i of matrix, mat, reduced by vectors
#               j is the position of the first non-zero entry of row i
#Task:        Compute reduced row i from mat, vectors, heads
#UpdateSharedData:  Given i, j, reduced row i, add new basis vector
#               to vectors and update heads[j] to point to it

ParInstallTOPCGlobalFunction( "ParSemiEchelonMat", function( mat )
    local zero,      # zero of the field of <mat>
          nrows,     # number of rows in <mat>
          ncols,     # number of columns in <mat>
          vectors,   # list of basis vectors
          heads,     # list of pivot positions in 'vectors'
          i,         # loop over rows
          nzheads,   # list of non-zero heads
          DoTask, UpdateSharedData;

    mat:= List( mat, ShallowCopy );
    nrows:= Length( mat );
    ncols:= Length( mat[1] );

    zero:= Zero( mat[1][1] );

    heads:= ListWithIdenticalEntries( ncols, 0 );
    nzheads := [];
    vectors := [];

    DoTask := function( i ) # taskInput = i
      local j,         # loop over columns
            x,         # a current element
            row;       # the row of current interest
      row := mat[i];
      # Reduce the row with the known basis vectors.
      for j in [ 1 .. Length(nzheads) ] do
          x := row[nzheads[j]];
          if x <> zero then
              AddRowVector( row, vectors[ j ], - x );
          fi;
      od;
      j := PositionNot( row, zero );
      if j <= ncols then return [j, row]; # return taskOutput
      else return fail; fi;
    end;
    UpdateSharedData := function( i, taskOutput )
      local j, row;
      j := taskOutput[1];
      row := taskOutput[2];
      # We found a new basis vector.
      MultRowVector(row, Inverse(row[j]));
      Add( vectors, row );
      Add( nzheads, j);
      heads[j]:= Length( vectors );
    end;

    MasterSlave( TaskInputIterator( [1..nrows] ), DoTask, DefaultCheckTaskResult,
                  UpdateSharedData );

    return rec( heads   := heads,
                vectors := vectors );
end );
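
Since slave results may arrive in a nondeterministic order, the basis vectors may be added to vectors in a different order than in the sequential version; the rank, however, must agree. A hypothetical sanity check (assuming slave processes are running):

mat := PseudoRandom( GL( 30, 5 ) );;
# the two bases may differ in order, but their lengths (the rank) agree
Length( ParSemiEchelonMat( mat ).vectors )
    = Length( SemiEchelonMat( mat ).vectors ); # should return true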
    

The next section describes how to make this code more efficient.

5.6 Caching slave task outputs (ParSemiEchelonMat revisited)

The code above is inefficient unless nrows >> ncols. This is because if nrows is comparable to ncols, it will be rare for DoTask() to return fail. If most slaves return a result distinct from fail, then DefaultCheckTaskResult() will return an UPDATE_ACTION upon receiving the output from the first slave, and it will return a REDO_ACTION to all other slaves until those slaves have executed UpdateSharedData(). The inefficiency arises because a REDO_ACTION causes the original slave process to re-compute DoTask() from the beginning.

In the case of a REDO_ACTION, we can fix this by taking advantage of information that was already computed. To accomplish this, a global variable should be defined on all processes:

ParEval("globalTaskOutput := [[-1]]");

and the routine DoTask() in the previous example should be modified to:

  DoTask := function( i )
      local j,         # loop over columns
            x,         # a current element
            row;       # the row of current interest
    if i = globalTaskOutput[1] then
      # then this is a REDO_ACTION
      row := globalTaskOutput[2]; # recover last row value
    else row := mat[i];
    fi;
    # Reduce the row with the known basis vectors.
    for j in [ 1 .. Length(nzheads) ] do
      x := row[nzheads[j]];
      if x <> zero then
        AddRowVector( row, vectors[ j ], - x );
      fi;
    od;
    j := PositionNot( row, zero );
    # save row in case of new REDO_ACTION
    globalTaskOutput[1] := i;
    globalTaskOutput[2] := row;
    if j <= ncols then return [j, row]; # return taskOutput
    else return fail; fi;
  end;
    

(A perceptive reader will have noticed that it was not necessary to also save and restore row from globalTaskOutput, since this can be found again based on the saved variable value i. However, the additional cost is small, and it illustrates potentially greater generality of the method.)

The next section describes how to make this code more efficient.

5.7 Agglomerating tasks for efficiency (ParSemiEchelonMat revisited again)

A more efficient parallelization would partition the matrix into sets of adjacent rows, and send an entire set as a single taskInput. This would minimize the communication overhead, since the total network latency varies only slowly with message size, but linearly with the number of messages. To minimize this latency, one adds an extra parameter to MasterSlave() in order to bundle, perhaps, up to 5 tasks at a time.

MasterSlave( TaskInputIterator( [1..nrows] ), DoTask, DefaultCheckTaskResult,
             UpdateSharedData, 5 );
    

Now the task input will be a list of the next 5 inputs returned by the SubmitTaskInput() function, in this case by TaskInputIterator( [1..nrows] ). If fewer than 5 inputs are produced before NOTASK is returned, then the list will be correspondingly shorter. If the first input is already NOTASK (which would yield a list of length 0), then this is interpreted as a traditional NOTASK. The routine DoTask() itself is unchanged: MasterSlave() arranges to call DoTask() repeatedly, once for each input task in the list, and collects the results into a corresponding list of task outputs.
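
Schematically, a slave receiving a bundled task input behaves as if it executed the following (a sketch of the semantics only, not of ParGAP's internals):

# with taskAgglom = 5, a bundled task input is a list such as:
taskInputs := [ 1, 2, 3, 4, 5 ];;
# one DoTask() call per input, with TaskAgglomIndex (see below) set to
# the position of the current input while each call runs:
taskOutputs := List( taskInputs, DoTask );;
# CheckTaskResult() and UpdateSharedData() then receive the pair
# ( taskInputs, taskOutputs ) instead of a single ( input, output ).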

Hence, this new variation requires us to rewrite UpdateSharedData() in the obvious manner, to handle lists of task inputs and outputs. Here is one solution to patch the earlier code; it makes use of the following new variable.

  • TaskAgglomIndex V

    This global variable is provided for use inside DoTask(). It allows the application code to inquire about the index of the current input task within the agglomerated list of tasks created when the taskAgglom parameter is used. The variable is most useful in the case of a REDO_ACTION or CONTINUATION_ACTION, as illustrated below.

ParEval("globalTaskOutput := [[-1]]");
ParEval("globalTaskOutputs := []");
    
#Shared Data: vectors (basis vectors), heads, mat (matrix)
#TaskInput:   i (row index of matrix)
#TaskOutput:  List of (1) j and (2) row i of matrix, mat, reduced by vectors
#               j is the position of the first non-zero entry of row i
#Task:        Compute reduced row i from mat, vectors, heads
#UpdateSharedData:  Given i, j, reduced row i, add new basis vector
#               to vectors and update heads[j] to point to it
    
ParInstallTOPCGlobalFunction( "ParSemiEchelonMat", function( mat )
  local zero,      # zero of the field of <mat>
        nrows,     # number of rows in <mat>
        ncols,     # number of columns in <mat>
        vectors,   # list of basis vectors
        heads,     # list of pivot positions in 'vectors'
        i,         # loop over rows
        nzheads,   # list of non-zero heads
        DoTask, UpdateSharedDataWithAgglom;

  mat:= List( mat, ShallowCopy );
  nrows:= Length( mat );
  ncols:= Length( mat[1] );

  zero:= Zero( mat[1][1] );

  heads:= ListWithIdenticalEntries( ncols, 0 );
  nzheads := [];
  vectors := [];

  DoTask := function( i )
      local j,         # loop over columns
            x,         # a current element
            row;       # the row of current interest
    if IsBound(globalTaskOutputs[TaskAgglomIndex])
        and i = globalTaskOutputs[TaskAgglomIndex][1] then
      # then this is a REDO_ACTION
      row := globalTaskOutputs[TaskAgglomIndex][2][2]; # recover last row value
    else row := mat[i];
    fi;
    # Reduce the row with the known basis vectors.
    for j in [ 1 .. Length(nzheads) ] do
      x := row[nzheads[j]];
      if x <> zero then
        AddRowVector( row, vectors[ j ], - x );
      fi;
    od;
    j := PositionNot( row, zero );

    # save [input, output] in case of new REDO_ACTION
    globalTaskOutputs[TaskAgglomIndex] := [ i, [j, row] ];

    if j <= ncols then return [j, row]; # return taskOutput
    else return fail; fi;
  end;
      
  # This version of UpdateSharedData() expects a list of taskOutput's
  UpdateSharedDataWithAgglom := function( listI, taskOutputs )
    local j, row, idx, tmp;
    for idx in [1..Length( taskOutputs )] do
      if taskOutputs[idx] <> fail then # fail means row idx reduced to zero
        j := taskOutputs[idx][1];
        row := taskOutputs[idx][2];

        if idx > 1 then
          # Later rows of the bundle were not reduced by the basis vectors
          # added earlier in this same bundle;  re-reduce them now.
          globalTaskOutputs[1] := [-1, [j, row] ];
          tmp := DoTask( -1 ); # Trick DoTask() into a REDO_ACTION
          if tmp = fail then j := fail; # row now reduces to zero
          else
            j := tmp[1];
            row := tmp[2];
          fi;
        fi;

        if j <> fail then
          # We found a new basis vector.
          MultRowVector(row, Inverse(row[j]));
          Add( vectors, row );
          Add( nzheads, j);
          heads[j]:= Length( vectors );
        fi;
      fi;
    od;
  end;
        
  MasterSlave( TaskInputIterator( [1..nrows] ), DoTask, DefaultCheckTaskResult,
                UpdateSharedDataWithAgglom, 5 ); #taskAgglom set to 5 tasks

  return rec( heads   := heads,
              vectors := vectors );
end );
    

Note that in this simple example, we were able to re-use most of the code from the previous version, at the cost of adding an additional global variable, globalTaskOutputs. In fact, the last DoTask() is backward compatible with the first version of the code, for which the taskAgglom parameter is not used. If we wanted to run the latest code without agglomeration of tasks, it would suffice either to set the taskAgglom parameter to 1, or else to remove it entirely and replace UpdateSharedDataWithAgglom() by UpdateSharedData().

It is useful to experiment with the above code by substituting a variable, taskAgglom, for the number 5, and trying it out with remote slaves on your own network for different values of taskAgglom and for different matrix sizes. You can call MasterSlaveStats() to see the effect of the different parameters. Suitable pseudo-random matrices can be generated quickly via commands like mat := PseudoRandom( GL( 30, 5 ) );.
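
For example, a hypothetical experiment (assuming slave processes are running):

mat := PseudoRandom( GL( 100, 5 ) );;
ParSemiEchelonMat( mat );;  # the taskAgglom = 5 version above
MasterSlaveStats();         # inspect the effect of the chosen parameters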

The paper Coo98 is suggested as further reading, for a still more efficient parallel implementation of ParSemiEchelonMat (i.e. Gaussian elimination) using the TOP-C model.

5.8 Raw MasterSlave (ParMultMat revisited)

Finally, we give an example of a variation of MasterSlave(), based on a ``raw'' MasterSlave(). These versions are designed for the common case of legacy code containing deeply nested loops. The taskInput may be generated inside several nested loops, making it awkward and error-prone to produce a function, SubmitTaskInput(), that will generate instances of taskInput in the appropriate sequence.

Effectively, when we wish to return successive values from several deeply nested loops, we are in the situation of programming the ``opposite of a GAP iterator'' (see iterators in the Reference Manual). We are already producing successive iterator values, and we wish to ``stuff them back into some iterator''. Until GAP develops such a language construct :-) , the following example of a ``raw'' MasterSlave() demonstrates a solution. Before studying this example, please review the sequential version, SeqMultMat(), near the beginning of section ParMultMat.

We make use of the following three new ParGAP functions.

  • BeginRawMasterSlave( DoTask, CheckTaskResult, UpdateSharedData ) F
  • RawSubmitTaskInput( taskInput ) F
  • EndRawMasterSlave() F

Their use will be obvious in the next example. This time, we parallelize SeqMultMat() by defining the task as the computation of a single entry in the result matrix. Hence, the task will be the computation of the appropriate inner product. For dimension n, n^2 tasks are now generated, and each task is generated inside a doubly nested loop.

#Shared Data: m1, m2t, result (three matrices)
#TaskInput:   [i,j] (indices of entry in result matrix)
#TaskOutput:  result[i][j] (value of entry in result matrix)
#Task:        Compute inner product of row i of m1 with column j of m2
#               ( Note that column j of m2 is also row j of m2t, the transpose )
#UpdateSharedData:  Given result[i][j] and [i,j], modify result everywhere
    
ParInstallTOPCGlobalFunction( "ParRawMultMat", function(m1, m2)
  local i, j, k, n, m2t, sum, result, DoTask, CheckTaskResult, UpdateSharedData;
  n := Length(m1);
  m2t := TransposedMat(m2);
  result := List( [1..n], x -> [] ); # n fresh rows; ListWithIdenticalEntries
                                     # would alias one shared row on the slaves

  DoTask := function( input )
    local i,j,k,sum;
    i:=input[1]; j:=input[2];
    sum := 0;
    for k in [1..n] do
      sum := sum + m1[i][k]*m2t[j][k];
    od;
    return sum;
  end;

  CheckTaskResult := function( input, output )
    return UPDATE_ACTION;
  end;

  UpdateSharedData := function( input, output )
    local i, j;
    i := input[1]; j := input[2];
    result[i][j] := output;
    # result[i][j] := sum;  in the sequential version
  end;

  BeginRawMasterSlave( DoTask, CheckTaskResult, UpdateSharedData );
  for i in [1..n] do
    result[i] := [];
    for j in [1..n] do
      RawSubmitTaskInput( [i,j] );
      # sum := 0;
      # for k in [1..n] do
      #   sum := sum + m1[i][k]*m2t[j][k];
      # od;
      # result[i][j] := sum;
    od;
  od;
  EndRawMasterSlave();

  return result;
end );
    


ParGAP manual
May 2002