/*
* Copyright (c) 2004-2010 The Regents of the University of California.
* All rights reserved.
*
* '$Author$'
* '$Date$'
* '$Revision$'
*
* Permission is hereby granted, without written agreement and without
* license or royalty fees, to use, copy, modify, and distribute this
* software and its documentation for any purpose, provided that the above
* copyright notice and the following two paragraphs appear in all copies
* of this software.
*
* IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
* FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
* ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
* THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
* PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
* CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
* ENHANCEMENTS, OR MODIFICATIONS.
*
*/
/*
* '$Author$'
* '$Date$'
* '$Revision$'
*/
package org.kepler.distributed;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Vector;
import org.kepler.configuration.ConfigurationManager;
import org.kepler.configuration.ConfigurationProperty;
import org.kepler.distributed.gui.DCAEditorPane;
import ptolemy.actor.Director;
import ptolemy.actor.IOPort;
import ptolemy.actor.QueueReceiver;
import ptolemy.actor.Receiver;
import ptolemy.actor.TypedCompositeActor;
import ptolemy.actor.TypedIOPort;
import ptolemy.data.IntToken;
import ptolemy.data.Token;
import ptolemy.data.expr.Variable;
import ptolemy.data.type.Type;
import ptolemy.domains.pn.kernel.PNDirector;
import ptolemy.domains.pn.kernel.PNQueueReceiver;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.Attribute;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.InternalErrorException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.Settable;
import ptolemy.kernel.util.Workspace;
import ptolemy.util.MessageHandler;
//////////////////////////////////////////////////////////////////////////
//// DistributedCompositeActor
/**
* '$Author$' '$Date$'
* '$Revision$'
*/
public class DistributedCompositeActor extends TypedCompositeActor implements
Remote {
private Slave currentSlave;
private Vector threadVector = new Vector();
public static File LOGFILE = new File("master-slave/distributed-output.log");
/**
* Construct an actor in the default workspace with no container and an
* empty string as its name. Add the actor to the workspace directory. You
* should set the local director or executive director before attempting to
* send data to the actor or to execute it. Increment the version number of
* the workspace.
*/
public DistributedCompositeActor() {
super();
// By default, when exporting MoML, the class name is whatever
// the Java class is, which in this case is DistributedCompositeActor.
// In derived classes, however, we usually do not want to identify
// the class name as that of the derived class, but rather want
// to identify it as DistributedCompositeActor. This way, the MoML
// that is exported does not depend on the presence of the
// derived class Java definition. Thus, we force the class name
// here to be DistributedCompositeActor.
setClassName("org.kepler.distributed.DistributedCompositeActor");
}
/**
* Construct a DistributedCompositeActor in the specified workspace with no
* container and an empty string as a name. You can then change the name
* with setName(). If the workspace argument is null, then use the default
* workspace. You should set the local director or executive director before
* attempting to send data to the actor or to execute it. Add the actor to
* the workspace directory. Increment the version number of the workspace.
*
*@param workspace
* The workspace that will list the actor.
*/
public DistributedCompositeActor(Workspace workspace) {
super(workspace);
// By default, when exporting MoML, the class name is whatever
// the Java class is, which in this case is DistributedCompositeActor.
// In derived classes, however, we usually do not want to identify
// the class name as that of the derived class, but rather want
// to identify it as DistributedCompositeActor. This way, the MoML
// that is exported does not depend on the presence of the
// derived class Java definition. Thus, we force the class name
// here to be DistributedCompositeActor.
setClassName("org.kepler.distributed.DistributedCompositeActor");
}
/**
* Construct a DistributedCompositeActor with a name and a container. The
* container argument must not be null, or a NullPointerException will be
* thrown. This actor will use the workspace of the container for
* synchronization and version counts. If the name argument is null, then
* the name is set to the empty string. Increment the version of the
* workspace. This actor will have no local director initially, and its
* executive director will be simply the director of the container.
*
*@param container
* The container.
*@param name
* The name of this actor.
*@exception IllegalActionException
* If the container is incompatible with this actor.
*@exception NameDuplicationException
* If the name coincides with an actor already in the
* container.
*/
public DistributedCompositeActor(CompositeEntity container, String name)
throws IllegalActionException, NameDuplicationException {
super(container, name);
// By default, when exporting MoML, the class name is whatever
// the Java class is, which in this case is DistributedCompositeActor.
// In derived classes, however, we usually do not want to identify
// the class name as that of the derived class, but rather want
// to identify it as DistributedCompositeActor. This way, the MoML
// that is exported does not depend on the presence of the
// derived class Java definition. Thus, we force the class name
// here to be DistributedCompositeActor.
setClassName("org.kepler.distributed.DistributedCompositeActor");
initConfigDialog();
}
/**
* create the configuration dialog for the DCA. This allows us to add
* configuration options for sending the execution from this DCA to a
* specific slave or slaves.
*/
private void initConfigDialog() throws IllegalActionException,
NameDuplicationException {
// enable the custom config dialog
DCAEditorPane dcapane = new DCAEditorPane(this, "Editor Pane");
}
// /////////////////////////////////////////////////////////////////
// // public methods ////
/**
* Run a complete execution of the contained model. A complete execution
* consists of invocation of super.initialize(), repeated invocations of
* super.prefire(), super.fire(), and super.postfire(), followed by
* super.wrapup(). The invocations of prefire(), fire(), and postfire() are
* repeated until either the model indicates it is not ready to execute
* (prefire() returns false), or it requests a stop (postfire() returns
* false or stop() is called). Before running the complete execution, this
* method calls the director's transferInputs() method to read any available
* inputs. After running the complete execution, it calls transferOutputs().
* The subclass of this can set the
* _isSubclassOfDistributedCompositeActor to be true to call the fire
* method of the superclass of this.
*
*@exception IllegalActionException
* If there is no director, or if the director's action
* methods throw it.
*/
public void fire() throws IllegalActionException {
try {
print("%%%%%%%%%%%%DCA fire start%%%%%%%%%%%%%%%%");
boolean readInputOK = true;
do {
while (readInputOK && masterController.slavesAvailable()) {
if (readInputOK) {
currentSlave = masterController.getSlave();
readInputOK = _readInputs(masterController,
currentSlave);
print("read input to slave? " + readInputOK);
print("%%%%%%%%%%%%DCA read input to slave: %%%%%%%%%%%%%%%%"
+ currentSlave.getServer().getHostname());
} else {
//print("%%%%%%%%%%%%no input token available: %%%%%%%%%%%%%%%%");
}
}
//} while (readInputOK && masterController.slavesAvailable());
//print("%%%%%%%%%%%%DCA firing%%%%%%%%%%%%%%%%");
//print("Check all slaves to see whether they finish execution");
Vector removedVec = new Vector();
for (int i = 0; i < threadVector.size(); i++) {
ExecuteModelThread emt = (ExecuteModelThread) threadVector
.elementAt(i);
if (!emt.dataTransferred) {
//print("$$$$$$$$$$$$$$ Waiting 1000 millisecond for threads to finish during fire $$$$$$$$$$$$$$ ");
// Thread.currentThread().sleep(1000);
// block here until all threads are done
// we don't want to complete wrapup until all current
// threads finish
} else {
print("begin to write output to slave:"
+ emt.slave);
//_writeOutputs(emt.slave); // write the outputs
//emt.dataTransferred = true;
print("write output finished at ExecuteModelThread.");
emt.master.releaseSlave(emt.slave); // release the slave
print("releaseSlave finished at ExecuteModelThread.");
removedVec.add(emt);
}
}
for (int i = 0; i < removedVec.size(); i++) {
ExecuteModelThread emt = (ExecuteModelThread) removedVec
.elementAt(i);
print("ExecuteModelThread " + emt.getName() + " running at slave: " + emt.slave.getServer().getHostname()
+ " is removed from thread vector");
threadVector.remove(emt);
}
} while (readInputOK || threadVector.size() > 0);
} catch (IllegalActionException e) {
e.printStackTrace();
throw new IllegalActionException("Error executing the "
+ "DistributedCompositeActor: " + e.getMessage());
} catch (RemoteException e) {
e.printStackTrace();
print("RemoteException");
throw new IllegalActionException("Error executing the "
+ "DistributedCompositeActor: " + e.getMessage());
} catch (IOException e) {
e.printStackTrace();
print("IOException");
throw new IllegalActionException("Error executing the "
+ "DistributedCompositeActor: " + e.getMessage());
}
// }catch (InterruptedException e)
// {
// e.printStackTrace();
// print("InterruptedException");
// throw new IllegalActionException("Error executing the " +
// "DistributedCompositeActor: " + e.getMessage());
// }
print("finishing fire().");
// try get input again to realize PN block
// NOTE: This is an essentially exact copy of the code in
// ModelReference,
// but this class and that one can't easily share a common base class.
print("begin to read input again to get read block for PN Director.");
Iterator ports = inputPortList().iterator();
if (ports.hasNext()) {
IOPort port = (IOPort) ports.next();
if ((port.getWidth() > 0) && port.hasToken(0)) {
print("port " + port.getName() + " has token");
Token token = port.get(0);
// TODO be careful if file pointer token
print("_readInputs getting token from port and putting token to the master: "
+ token);
}
}
}
/**
* Initialize this actor, which in this case, does nothing. The
* initialization of the submodel is accomplished in fire(). The subclass of
* this can set the _isSubclassOfDistributedCompositeActor to be true
* to call the initialize method of the superclass of this.
*
*@exception IllegalActionException
* Not thrown in this base class, but declared so the
* subclasses can throw it.
*/
public void initialize() throws IllegalActionException {
super.initialize();
try {
// java.io.File configFile = new java.io.File(CONFIG_FILE);
// // getParameter("distributed config file")
// print("config file exists: " + configFile.exists());
// print("config file path: " + configFile.getPath());
// URL fileURL = configFile.toURL();
// look in the DCA to see if there are DistributedServerAttributes.
// If there
// are, use the hosts defined in those attributes to execute. If
// not,
// use the default host list from the config file.
String hostList = "";
boolean hasAtt = false;
Iterator attlistitt = this.attributeList(
DistributedServerAttribute.class).iterator();
while (attlistitt.hasNext()) {
hasAtt = true;
DistributedServerAttribute att = (DistributedServerAttribute) attlistitt
.next();
print("att: " + att.getName());
print("attexpr: " + att.getExpression());
hostList += att.getExpression() + "\n";
}
// try the top-level attributes
// List attList = this.getContainer().attributeList();
Iterator attIt = this.getContainer().attributeList().iterator();
Vector newAttIt = new Vector();
while (attIt.hasNext()) {
Attribute att = (Attribute) attIt.next();
// Parameter para = (Parameter)att;
// System.out.println("att:"+att.exportMoML());
// print("att in print:"+att.description());
// if(!att.getName().endsWith("Director"))
if(!att.exportMoML().equalsIgnoreCase(""))
newAttIt.add(att.exportMoML());
}
// try the types of input ports
Iterator inputPortList = this.inputPortList().iterator();
Map inputPortMap = new HashMap();
while (inputPortList.hasNext()) {
TypedIOPort inputPort = (TypedIOPort) inputPortList.next();
String strInputName = inputPort.getName();
Type inputType = inputPort.getType();
// Parameter para = (Parameter)att;
// System.out.println("att:"+att.exportMoML());
print("input :" + strInputName + " whose type is "
+ inputType.toString());
inputPortMap.put(strInputName, inputType.toString());
}
// try the types of output ports
Iterator outputPortList = this.outputPortList().iterator();
Map outputPortMap = new HashMap();
while (outputPortList.hasNext()) {
TypedIOPort outputPort = (TypedIOPort) outputPortList.next();
String strOutputName = outputPort.getName();
Type outputType = outputPort.getType();
// Parameter para = (Parameter)att;
// System.out.println("att:"+att.exportMoML());
print("output :" + strOutputName + " whose type is "
+ outputType.toString());
outputPortMap.put(strOutputName, outputType.toString());
}
if (hasAtt) { // use the attributes
print("using DSA attributes for host selection.");
print("hostList: " + hostList);
byte[] b = hostList.getBytes();
ByteArrayInputStream bais = new ByteArrayInputStream(b);
masterController = new MasterController(this, newAttIt,
inputPortMap, outputPortMap, bais);
// masterController = new MasterController(this, newAttIt, null,
// bais);
} else { // use the config file
print("using config file for host selection.");
ConfigurationManager confManager = ConfigurationManager.getInstance();
List slavePropList = confManager.getProperties(
ConfigurationManager.getModule("master-slave"), "usedSlaveList.slave");
ConfigurationProperty slaveProp = null;
String[] hosts = new String [slavePropList.size()];
String oneSlave = null;
for (int i=0; i_isSubclassOfDistributedCompositeActor to be true to
* call the postfire method of the superclass of this.
*
*@return Description of the Return Value
*@exception IllegalActionException
* Not thrown in this base class, but declared so the
* subclasses can throw it.
*/
public boolean postfire() throws IllegalActionException {
print("calling postfire()");
// tell the master we're done executing the wf
try {
print("Waiting for current slaves to finish execution before finishing postfire()");
for (int i = 0; i < threadVector.size(); i++) {
ExecuteModelThread emt = (ExecuteModelThread) threadVector
.elementAt(i);
while (!emt.dataTransferred) {
// print(
// "$$$$$$$$$$$$$$ Waiting for threads to finish before wrapping up"
// );
Thread.currentThread().sleep(1000);
// block here until all threads are done
// we don't want to complete wrapup until all current
// threads finish
}
}
} catch (Exception e) {
throw new IllegalActionException(
"Could not release slave. You may "
+ "want to do this manually. : " + e.getMessage());
}
print("finish of postfire()");
print("super.postfire():" + super.postfire());
// return super.postfire();
Director director = this.getExecutiveDirector();
print("director of this actor:" + director.getName());
print("director class of this actor:" + director.getClass());
if (director instanceof PNDirector)
return false;
else
return true;
// return true;
}
/**
* Return true, indicating that this actor is always ready to fire.
*
*@return Description of the Return Value
*@exception IllegalActionException
* Not thrown in this base class, but declared so the
* subclasses can throw it.
*/
public boolean prefire() throws IllegalActionException {
print("waiting in prefire");
// currentSlave = masterController.getSlave(); //block here if there is
// no slave available
print("done waiting in prefire");
return true;
}
/**
* Override the base class to set type constraints between the output ports
* and parameters of this actor whose name matches the output port. If there
* is no such parameter, then create an instance of Variable with a matching
* name and set up the type constraints to that instance. The type of the
* output port is constrained to be at least that of the parameter or
* variable.
*
*@exception IllegalActionException
* If there is no director, or if the director's
* preinitialize() method throws it, or if this actor is not
* opaque.
*/
// public void preinitialize()
// throws IllegalActionException
// {
// }
/**
* Override the base class to release the slaves
*
*@exception IllegalActionException
* Not thrown in this base class, but declared so the
* subclasses can throw it.
*/
public void wrapup() throws IllegalActionException {
print("calling wrapup()");
// tell the master we're done executing the wf
try {
if (masterController != null)
masterController.done();
print("empty ExecutionModelThread Vector...");
threadVector = new Vector();
print("calling super.wrapup()");
super.wrapup();
} catch (Exception e) {
e.printStackTrace();
throw new IllegalActionException(
"Could not release slave. You may "
+ "want to do this manually. : " + e.getMessage());
}
}
// /////////////////////////////////////////////////////////////////
// // protected methods ////
/**
* Iterate over input ports and read any available values into the
* referenced model parameters. Then start the slave thread.
*
*@param master
* The distributed connection manager.
*@exception IllegalActionException
* If reading the ports or setting the parameters causes it.
*@exception RemoteException
* Description of the Exception
*@exception IOException
* Description of the Exception
*/
protected boolean _readInputs(MasterController master, Slave slave)
throws IllegalActionException {
try {
print("Reading Inputs...");
// NOTE: This is an essentially exact copy of the code in
// ModelReference,
// but this class and that one can't easily share a common base
// class.
Iterator ports = inputPortList().iterator();
boolean gotToken = false;
boolean returnBoolean = false;
while (ports.hasNext()) {
IOPort port = (IOPort) ports.next();
if (slave == null) {
print("SLAVE IS NULL");
}
print("reading input from port: " + port + " sending to slave "
+ slave.toString());
Receiver[][] inputReceiverArray = port.getReceivers();
Receiver inputReceiver = inputReceiverArray[0][0];
if (inputReceiver instanceof PNQueueReceiver) {
print("the receiver is a PNQueueReceiver");
int size = ((PNQueueReceiver) inputReceiver).size();
print("PNQueueReceiver " + inputReceiver + " with its size:" + size);
if ( size > 0){
//if (((QueueReceiver) inputReceiver).size() > 0) {
print("port " + port.getName() + " has token");
print("try to get input token from " + port.getName());
Token token = inputReceiver.get();
// TODO be careful if file pointer token
print("_readInputs from Queue Receiver getting token from port and putting token to the master: "
+ token);
gotToken = true;
returnBoolean = true;
// put the token to the specific port on the specific
// slave
master
.put(slave, new Token[] { token }, port
.getName());
//}
}else{
print("in DCA: " + this);
print("the PNQueueReceiver's size is 0, first check whether having running slave threads. If have, wait for their finish first.");
int j = 0;
// int threadNumber = threadVector.size();
int receiverSize = 0;
while (threadVector.size() > j && receiverSize == 0){
j = 0;
print("receiverSize is :" + receiverSize);
print("threadVector.size() is :" + threadVector.size() + ", and j is:" + j);
for (int i = 0; i < threadVector.size(); i++) {
ExecuteModelThread emt = (ExecuteModelThread) threadVector
.elementAt(i);
print("ExecuteModelThread:" + emt);
print("emt.dataTransferred:" + emt.dataTransferred);
if (emt.dataTransferred){
j++;
} else {
print("$$$$$$$$$$$$$$ No data anymore. Waiting for the finishing of current running slave threads. $$$$$$$$$$$$$$ ");
Thread.currentThread().sleep(1000);
receiverSize = ((PNQueueReceiver) inputReceiver).size();
}
}
}
// while (threadVector.size() > 0 && receiverSize == 0){
// receiverSize = ((PNQueueReceiver) inputReceiver).size();
// for (int i = 0; i < threadVector.size(); i++) {
// ExecuteModelThread emt = (ExecuteModelThread) threadVector
// .elementAt(i);
// if (!emt.done){
// print("$$$$$$$$$$$$$$ No data anymore. Waiting for the finishing of current running slave threads. $$$$$$$$$$$$$$ ");
// Thread.currentThread().sleep(1000);
// }
// }
// }
receiverSize = ((PNQueueReceiver) inputReceiver).size();
print("After all running slave finished or new input token is available, the PNQueueReceiver's size is :" + receiverSize);
print("threadVector.size() is :" + threadVector.size() + ", and j is:" + j);
//if (receiverSize > 0 || threadVector.size() == j){
Token token = inputReceiver.get();
// TODO be careful if file pointer token
print("_readInputs from Queue Receiver getting token from port and putting token to the master: "
+ token);
gotToken = true;
returnBoolean = true;
// put the token to the specific port on the specific
// slave
master
.put(slave, new Token[] { token }, port
.getName());
//}
}
}
else if ((port.getWidth() > 0) && port.hasToken(0)) {
print("port " + port.getName() + " has token");
print("port.getWidth():" + port.getWidth()
+ ", and port.hasToken(0):" + port.hasToken(0));
gotToken = true;
returnBoolean = false;
Token token = port.get(0);
// TODO be careful if file pointer token
print("_readInputs getting token from port and putting token to the master: "
+ token);
// put the token to the specific port on the specific slave
master.put(slave, new Token[] { token }, port.getName());
}
}
if (gotToken) {
// tell the master we're ready to execute
ExecuteModelThread emt = new ExecuteModelThread(master, slave);
print("a new ExecuteModelThread: " + emt + " is created in DCA:" + this);
threadVector.addElement(emt);
emt.start();
}
return returnBoolean;
} catch (RemoteException e) {
e.printStackTrace();
print("RemoteException");
throw new IllegalActionException("RMI error while getting data: "
+ e.getMessage());
} catch (IOException e) {
e.printStackTrace();
print("IOException");
throw new IllegalActionException("RMI error while getting data: "
+ e.getMessage());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
print("InterruptedException");
throw new IllegalActionException("RMI error while getting data: "
+ e.getMessage());
}
}
/**
* print a message with the name of this class prepended
*/
private void print(String message) {
try {
DistributedLogger.print("DistributedCompositeActor: " + message,
DistributedLogger.HIGH);
} catch (Exception e) {
e.fillInStackTrace().toString();
e.printStackTrace();
}
}
private void printException(String slaveName, Exception ex) {
String exceptionMessage;
if (slaveName != null)
{
exceptionMessage = "Error executing the workflow at " + slaveName + ".";
}
else
{
exceptionMessage = "Error executing the workflow at slave side.";
}
MessageHandler.error(exceptionMessage, ex);
}
// ///////////////////////////////////////////////////////////////////
// // protected variables ////
/**
* Iterate over output ports and read any available values from the
* referenced model parameters and produce them on the outputs.
*
*@exception IllegalActionException
* If reading the parameters or writing to the ports causes
* it.
* @throws IOException
* @throws RemoteException
*/
protected void _writeOutputs(Slave slave) throws IllegalActionException {
try {
print("_writeOutputs");
Iterator ports = outputPortList().iterator();
// doesn't run in this while
while (ports.hasNext()) {
IOPort port = (IOPort) ports.next();
String portName = port.getName();
print("writing from port " + port.getName());
if (port.getWidth() > 0) {
Attribute attribute = getAttribute(portName);
// Use the token directly rather than a string if possible.
if (attribute instanceof Variable) {
// TODO which? how many?
print("_writeOutputs masterController.get variable");
// get the token from the specific slave on the specific
// port
Token[] temp = masterController.get(slave, portName);
if (temp == null) {
return;
}
// send the data on it's way
port.send(0, temp, temp.length);
} else if (attribute instanceof Settable) {
print("_writeOutputs masterController.get settable");
// get the token from the specific slave on the specific
// port
Token[] temp = masterController.get(slave, portName);
if (temp == null) {
return;
}
// send the data on it's way
port.send(0, temp, temp.length);
} else {
print("_writeOutputs masterController.get other");
// get the token from the specific slave on the specific
// port
Token[] temp = masterController.get(slave, portName);
if (temp == null) {
return;
}
print("get token from slave: "
+ slave.getServer().getHostname()
+ " at port: " + portName);
// Token.toString() is called when temp[i] is called. So
// DataToken.toString() is called which call
// DataToken.stringValue().
for (int i = 0; i < temp.length; i++) {
print("token " + i + ": " + temp[i]);
}
Receiver[][] localReceivers = port.getInsideReceivers();
for (int t = 0; t < localReceivers.length; t++) {
for (int j = 0; j < localReceivers[t].length; j++) {
print("input port:"
+ t
+ "."
+ j
+ ":"
+ localReceivers[t][j].getContainer()
.getFullName());
}
}
Receiver[][] localReceivers2 = port
.getRemoteReceivers();
for (int t = 0; t < localReceivers2.length; t++) {
for (int j = 0; j < localReceivers2[t].length; j++) {
print("remote input port:"
+ t
+ "."
+ j
+ ":"
+ localReceivers2[t][j].getContainer()
.getFullName());
}
}
// send the data on it's way
port.send(0, temp, temp.length);
// port.broadcast(temp, temp.length);
print("finish token sending");
}
}
}
} catch (RemoteException e) {
e.printStackTrace();
print("RemoteException");
throw new IllegalActionException("RMI error while getting data: "
+ e.getMessage());
} catch (IOException e) {
e.printStackTrace();
print("IOException");
throw new IllegalActionException("RMI error while getting data: "
+ e.getMessage());
}
print("Done writing outputs");
}
/** Description of the Field */
// protected static MasterController masterController;
MasterController masterController;
// public static String CONFIG_FILE = "master-slave/resources/system.properties/DistributedKepler.config";
// public static String USER_CONFIG_FILE = "master-slave/resources/system.properties/DistributedKeplerUser.config";
// ////////////////////////////////////////////////////////////////////
// // private classes /////
/**
* a thread to do the execution. The reason for using Thread is to trigger a
* slave execution once one input token is ready, without waiting for the output
* of previous input. It is useful in PN director.
*/
private class ExecuteModelThread extends Thread {
MasterController master;
DistributedCompositeActor dca;
Slave slave;
public boolean done = false;
public boolean dataTransferred = false;
String slaveName = null;
/**
* constructor
*/
public ExecuteModelThread(MasterController mc, Slave slave) {
// this.dca = dc;
this.slave = slave;
this.master = mc;
try {
slaveName = slave.getServer().getHostname();
} catch (RemoteException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* run this when the thread is executed
*/
public void run() {
try {
// Make sure that change requests are not executed when
// requested,
// but rather only executed when executeChangeRequests() is
// called.
setDeferringChangeRequests(true);
// _readInputs(master, slave); //read inputs and execute
master.readyToRun(slave); // block on execution
print("remote execution finished at slave: "
+ slave.getServer().getHostname() + ", for DCA: " + DistributedCompositeActor.this);
} catch (Exception e) {
print("Error running ExecuteModelThread: " + e.getMessage());
e.printStackTrace();
printException(slaveName, e);
}
done = true;
print("begin to write output from DCA " + DistributedCompositeActor.this + " to slave:"
+ slave);
try {
DistributedCompositeActor.this._writeOutputs(slave);
dataTransferred = true;
} catch (IllegalActionException e) {
print("Error running ExecuteModelThread: " + e.getMessage());
// TODO Auto-generated catch block
e.printStackTrace();
printException(slaveName, e);
} // write the outputs
print("write output finished at ExecuteModelThread: " + this);
}
/**
* tell the data has been transferred back to master.
*/
}
}