Myria REST API

Query plan

MyriaX organizes its computation in units of query plans. A query plan has a three-layer hierarchical structure.

At the very top of a computation is the QueryPlan. A query plan has a globally unique long-valued queryID, a set of properties that may control the execution of the query plan (for example, how to react when a worker executing the query fails), and a mapping from workerIDs to sub-queries. Mapping a workerID to a SubQueryPlan means that the SubQueryPlan should be executed by the worker with that workerID.
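
Conceptually, a QueryPlan can be pictured as in the following sketch (a rough illustration with made-up field names, not Myria’s actual class definitions):

    import java.util.Map;

    // Conceptual sketch only; field names are illustrative, not Myria's actual classes.
    class SubQueryPlan { /* a set of LocalFragments; see the SubQuery section below */ }

    class QueryPlanSketch {
      long queryId;                            // globally unique query ID
      Map<String, String> properties;          // execution properties, e.g. failure handling
      Map<Integer, SubQueryPlan> workerPlans;  // workerID -> SubQueryPlan run by that worker
    }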

A QueryPlan can be generated in many ways. The simplest way is to create the QueryPlan Java object completely by hand. A better way is to compose a JSON execution plan. Better still is to use a higher-level language such as MyriaL or Datalog.

When a QueryPlan is submitted, the system verifies its correctness and then dispatches the SubQueries to workers. A QueryPlan is considered running if any of its SubQueries is running. Currently, if any of the SubQueries encounters an error during execution, the whole QueryPlan is killed. Note that the errors here are limited to data processing errors. If a system error happens, for example a worker machine goes down, the handling is governed by the fault-tolerance policies.

+----------------------------------------------------------------+
|  Query                                                         |
| +-----------------------+         +--------------------------+ |
| |   Sub Query           |         |     Sub Query            | |
| |                       |         |                          | |
| | +------------------+  |         | +---------------------+  | |
| | |  LocalFragment 1 |  |         | |  LocalFragment 1    |  | |
| | +------------------+  |         | +---------------------+  | |
| | +------------------+  |         | +---------------------+  | |
| | |  LocalFragment 2 |  |         | |  LocalFragment 3    |  | |
| | +------------------+  |         | +---------------------+  | |
| +-+------------------+--+         +-+---------------------+--+ |
|        Worker 1                           Worker 2             |
+----------------------------------------------------------------+

SubQuery

A SubQuery is the computation task that one worker is assigned in a QueryPlan. It contains a set of LocalFragments. The execution state of a SubQuery is similar to that of a QueryPlan. When a SubQuery starts execution, all of its LocalFragments start execution at the same time. A SubQuery is considered completed when all of its LocalFragments are completed. If any of the LocalFragments fails, the whole SubQuery is marked as failed and killed.

Query Fragment

A query fragment (also called a LocalFragment) is the basic execution unit in MyriaX. Each LocalFragment is the driver of a single Operator tree.

                          +------------+                
                          |            |                
                          |  DbInsert  |                
                          |            |                
                          +------+-----+                
                                 |                      
                          +------+-----+                
                          |            |                
                +------>  |   Merge    | <------+       
                |         |            |        |       
                |         +------------+        |       
                |                               |       
           +----+-----+                  +------+------+
           |          |                  |             |
    +----> |   Join   +--+               |   Filter    |
    |      |          |  |               |             |
    |      +----------+  |               +------+------+
    |                    |                      |       
+---+------+        +----+---+           +------+------+
|          |        |        |           |             |
| Postgres |        |  HDFS  |           |  Local file |
|          |        |        |           |             |
+----------+        +--------+           +-------------+

A sample query fragment.

An operator tree is driven by a query fragment (operators themselves are discussed in the next section). A query fragment maintains the execution environment of an Operator tree.

Roughly, a query fragment runs an Operator tree in the following way (the actual implementation is much more complex because of all the concurrent state management):

while (executionCondition is satisfied)
{
  if (operatorTree.fetchNextReady() is null and no new data has arrived)
    break; // no output can be produced right now; yield
}

Operators

Operators are MyriaX’s data processing units. Each operator has a set of child operators (possibly empty), which form the inputs of the operator; relational data (in the form of TupleBatches) is drawn from them. Output relational data can be drawn by calling “Operator.fetchNextReady()”. An operator can have any number of children but at most one parent, so operators can be linked together to form an operator tree.

Initialization and cleanup

Before data processing, an Operator must be initialized by calling “Operator.open()”, which in turn calls “Operator.init()”. Operator developers are required to implement “init()” to do the actual initialization, which may include memory allocation, system resource reservation, etc.

Once an operator is opened by the MyriaX system, it is guaranteed that the Operator’s “Operator.close()” method will be called after the execution of the Operator, whether it finishes successfully or erroneously. “Operator.close()” in turn calls “Operator.cleanup()”. In the cleanup method, the Operator developer should do exactly the opposite of init.
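
The open/close contract can be sketched roughly as follows (an illustration of the lifecycle only, not the real Operator base class):

    // Rough sketch of the Operator lifecycle contract; not the real base class.
    abstract class OperatorSketch {
      // Called by the system before any data processing.
      final void open() throws Exception {
        init();     // developer-provided: allocate memory, reserve resources, etc.
      }

      // Guaranteed to be called after execution, whether it succeeded or failed.
      final void close() throws Exception {
        cleanup();  // developer-provided: exactly the opposite of init()
      }

      protected abstract void init() throws Exception;
      protected abstract void cleanup() throws Exception;
    }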

RootOperator and LeafOperator

A RootOperator is an operator without a parent. Each operator tree must be rooted at a RootOperator. The RootOperator is the single place where relational data leaves the computing system. Currently, there are SinkRoot, which simply drops everything; Insert, which inserts the data drawn from its child into a database; and Producer (and its subclasses), which transfers data to remote operators.

LeafOperators are the leaves of operator trees. There are also several implementations of LeafOperator, including DbScan, which scans relational data from database tables, and FileScan, which scans relational data from local data files.
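
For illustration, an operator tree combining these pieces could be assembled roughly like this (the constructor signatures here are hypothetical, not Myria’s actual API):

    // Hypothetical constructor signatures, shown only to illustrate the tree shape.
    Operator scan = new FileScan("/path/to/input");      // LeafOperator: local file
    Operator filter = new Filter(scan, somePredicate);   // unary operator over the scan
    RootOperator root = new SinkRoot(filter);            // RootOperator: discards output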

States

Each Operator has an EOS (end of stream) state variable. An Operator should execute and process data only while the EOS state is not true. Once the EOS state becomes true, the Operator no longer processes any data; it may still return data, however, because it may maintain an output data buffer.
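
For example, a caller may keep pulling buffered output even after EOS is reached, along these lines (a sketch assuming an eos() accessor; names are illustrative):

    // Sketch: even after EOS, the operator may still return buffered output.
    TupleBatch tb;
    while ((tb = op.fetchNextReady()) != null) {
      consume(tb);  // drain whatever is left in the output buffer
    }
    // EOS set and the buffer drained: this operator is finished.
    boolean finished = op.eos();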

Scheduling

The MyriaX execution layer is modeled after the Actor Model, but it is not a general-purpose Actor Model implementation: it does not allow existing actors to create new actors, it allows only relational data as messages, and it restricts message processing to the Operator interface.

Currently there are no schedulers in Myria. Once a query fragment starts executing, it keeps executing until it yields. Also, if a set of query fragments is waiting for execution and an execution thread becomes free, it is nondeterministic which pending query fragment will be executed next.

Execution mode and threading model

There are two execution modes for LocalFragments: non-blocking and blocking. The blocking mode is obsolete and has not been maintained for a long time.

The two modes differ mainly in their threading models.

In the non-blocking mode, a query executor has a pool with a fixed number of threads for executing LocalFragments. The execution threads are similar to the slots in Hadoop. A LocalFragment is designed so that when it finds that no progress can be made, for example because no more input data is available, it tells the execution system that it is willing to yield its execution resources to other LocalFragments.

In the code block shown earlier, the yield of a LocalFragment is implemented by the executionCondition becoming unsatisfied and by the break when no output can be produced.

The blocking mode has no such execution pool. Each time a LocalFragment is created, a new Java Thread is created to execute its operator tree. The executionConditions and the break are ignored: execution keeps going as long as any of the LocalFragments is not EOS and no errors occur.

Execution condition

Each LocalFragment has a long state variable recording the current execution condition. Each bit of the long variable is a state indicator. The state bits referenced below include STATE_INITIALIZED, STATE_STARTED, STATE_INPUT_AVAILABLE, STATE_OUTPUT_AVAILABLE, STATE_EXECUTION_REQUESTED, and STATE_IN_EXECUTION.

To start executing a LocalFragment, the execution condition must be: “EXECUTION_PRE_START = STATE_INITIALIZED | STATE_OUTPUT_AVAILABLE | STATE_INPUT_AVAILABLE”.
After execution starts, if the LocalFragment is ready to actually be executed, the execution condition is: “EXECUTION_READY = EXECUTION_PRE_START | STATE_STARTED”.
When a LocalFragment has executed a round (i.e. a call of fetchNextReady on the root Operator), it needs to check whether another round of execution is currently needed. The execution condition is: “EXECUTION_READY | STATE_EXECUTION_REQUESTED | STATE_IN_EXECUTION”.
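
As a sketch of how such bit conditions can be combined and tested (the constant values and the EXECUTION_CONTINUE name here are illustrative; the actual constants live in Myria’s LocalFragment):

    // Illustrative bit values; the actual constants are defined in Myria's code.
    final long STATE_INITIALIZED         = 1L << 0;
    final long STATE_STARTED             = 1L << 1;
    final long STATE_INPUT_AVAILABLE     = 1L << 2;
    final long STATE_OUTPUT_AVAILABLE    = 1L << 3;
    final long STATE_EXECUTION_REQUESTED = 1L << 4;
    final long STATE_IN_EXECUTION        = 1L << 5;

    final long EXECUTION_PRE_START =
        STATE_INITIALIZED | STATE_OUTPUT_AVAILABLE | STATE_INPUT_AVAILABLE;
    final long EXECUTION_READY = EXECUTION_PRE_START | STATE_STARTED;
    final long EXECUTION_CONTINUE =                       // hypothetical name
        EXECUTION_READY | STATE_EXECUTION_REQUESTED | STATE_IN_EXECUTION;

    long executionCondition = EXECUTION_READY;  // the fragment's state variable

    // A condition holds when all of its bits are set in the state variable.
    boolean ready = (executionCondition & EXECUTION_READY) == EXECUTION_READY;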

IPC

The IPC layer is the module that controls all the inter-process communications in MyriaX, including control message delivery and data message delivery.

All the source code of the IPC layer is in the edu.washington.escience.myria.parallel.ipc package, and all the functionality of the IPC layer is provided through the IPCConnectionPool class.

Typical usage of the IPC layer looks like the following code example:

    final IPCConnectionPool c =
        new IPCConnectionPool(myID, computingUnits, ..., payloadSerializer,
            messageProcessor, ...);
    c.start(...);
    ...
    c.sendShortMessage(receiver, message);
    ...
    StreamOutputChannel o = c.reserveLongTermConnection(receiver, dataStreamID);
    try {
      for (Object d : data) {
        o.write(d);
      }
    } finally {
      o.release();
      ...
    }

The IPCEntities

The IPC layer is designed to support not only inter-process communication but also intra-process communication. To provide this flexibility, the IPC layer abstracts the various senders and receivers as IPCEntities.

Each IPCEntity has an IPCID, which is currently an integer. Within a set of IPCEntities, the IPCID of each one must be distinct and non-negative for them to communicate with each other. In the current MyriaX, the IPCID value 0 is reserved for the master. An IPCEntity can also specify a negative IPCID as the destination of a communication connection; in this case it means connecting to itself locally.

Each IPCEntity also has a SocketInfo recording the address of the IPCEntity. Currently, only IPv4 addresses/host names together with port numbers are supported.

Each IPCEntity is mapped to a single instance of IPCConnectionPool. If a Java process has several IPCConnectionPool instances, each of them is a separate IPCEntity. They are able to talk to each other as long as their IPCIDs are unique and their SocketInfo addresses are also unique.

Services

The IPC layer tries to hide all the complex and error-prone concurrency/parallelism issues behind a clean and easy-to-use interface. It provides two services to its users.

Standalone message delivery service

This service is accessed through “IPCConnectionPool.sendShortMessage(ipcID, message)” and is suitable for control message delivery. Given two calls of sendShortMessage, there is no guarantee that the message sent by the first call is delivered and processed by the recipient before the message sent by the second call.

Stream data delivery service

To use this service, first call “streamOChannel = IPCConnectionPool.reserveLongTermConnection(ipcID, streamID, …)” to get a “StreamOutputChannel” instance. This establishes a data transfer channel (using TCP connections). Data transfer is then achieved by calling “streamOChannel.write(message)” once for each message to be transferred. Given two calls of the write method on the same “streamOChannel” instance, the message written by the first call is guaranteed to be delivered and processed before the second one.

After all the messages are written, call “streamOChannel.release()” to release the connection.

IPC messages

The data unit carried around by the IPC layer is the IPCMessage.

IPC Header

Each IPCMessage has a header denoting the type of the IPCMessage. Currently there are 6 headers: BOS, EOS, CONNECT, DISCONNECT, PING, and DATA.

IPC Payload

For a DATA IPCMessage, the payload of the message is user defined. The IPC layer does not know the Java type of the payload; it sees only the binary representation, i.e. the serialized byte array of the payload.

PayloadSerializer is the interface for implementing IPC message payload serialization/deserialization. When a user message is written into the IPC layer, either through sendShortMessage or through StreamOutputChannel.write(), the PayloadSerializer.serialize method is invoked and the payload is serialized into byte arrays. When a user message is received by the IPC layer, the PayloadSerializer.deserialize method is invoked to create a structured Java object from the byte arrays.

In the current implementation, Myria uses Google ProtoBuf for data serialization and deserialization.
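
A payload serializer pair might look roughly like the following (a sketch only; the exact interface signatures in the edu.washington.escience.myria.parallel.ipc package may differ). Here the payload is just a String serialized as UTF-8 bytes:

    import java.nio.charset.StandardCharsets;

    // Sketch of payload (de)serialization; exact Myria signatures may differ.
    class StringPayloadSerializer {
      byte[] serialize(final String payload) {
        return payload.getBytes(StandardCharsets.UTF_8);   // invoked on send
      }

      String deserialize(final byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);  // invoked on receive
      }
    }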

Data Stream

In the stream data delivery service, the IPC layer has a notion of a data stream. Unlike the data stream in point-to-point data transfer protocols such as TCP, a data stream in the MyriaX IPC layer may have multiple input streams, and the actual distribution of the input streams over processes or physical machines can be arbitrary.

+---------------------------+                                      
| Worker 1                  |                                      
|                           |                                      
|     +-------------------+ |                                      
|     |   (wID:1, sID:2)  +-----------+                            
|     +-------------------+ |         |                            
|                           |         |                            
|     +-------------------+ |         |          +----------------+
|     |   (wID:1, sID:3)  +-----------+          |                |
|     +-------------------+ |         |          |  Data Stream:  |
+-----+-------------------+-+         |          |                |
                                      +--------> |  | wID | sID|  |
                                      |          |     1     2    |
+-----+-------------------+-+         |          |     1     3    |
|     +-------------------+ |         |          |     2     2    |
|     |   (wID:2, sID:2)  +-----------+          +----------------+
|     +-------------------+ |                                      
|                           |                                      
| Worker 2                  |                                      
+---------------------------+                                      

For example, the above diagram illustrates a data stream that has three input streams: the first two are from worker 1 and the third is from worker 2.

The input streams can follow many possible semantics, for example set, bag, or ordered. Currently, only bag-semantic data streams are implemented in MyriaX.

StreamInputBuffers

MyriaX uses an input buffer mechanism as a way of decoupling the IO module and the query execution module.

Input stream 1  +--------> +---------------+
                           |               |
                           |     Stream    |
Input stream 2  +--------> |  Input Buffer | -------> QueryExecution
                           |               |
                           |               |
Input stream 3  +--------> +---------------+

A stream input buffer collects the data arriving on all input streams of a data stream and holds it until the query execution side pulls it out.

Currently only bag-semantic input buffers are implemented in MyriaX.

Threading model

Data inputs: the input buffers are filled by IO workers. There is no restriction on the threading model of the input streams. Currently, the mapping between input streams and IO threads is n:1, i.e. data from one input stream is always put into an input buffer by the same thread, but one IO thread may handle data from multiple input streams.

Data outputs: the threading model for pulling data out of an input buffer depends on the input buffer implementation. The current major implementation, FlowControlInputBuffer, requires that only a single thread pulls data.

Connection pooling

MyriaX relies on Java NIO for actual connection creation and data transfer; more specifically, we use Netty.

MyriaX maintains a one-to-one mapping between logical connections (stream data connections or standalone message connections) and Netty connections.

For remote Netty connections, the IPC layer does not release them immediately when they are released by the user, whether in stream data transfer or standalone message transfer. Instead, the connections are pooled and reused for subsequent transfers.

Flow control

Netty abstracts all data transfer operations through a Channel interface. For each Channel, there is a bit controlling whether the channel should currently read data from the underlying system network stack. The bit can be set by calling “Channel.setReadable(readable)”.

If data reading is paused, the system network layer at the recipient side notifies the sender side to stop sending data once the system receive buffer is full. This mechanism of flow control is called network back pressure.

MyriaX adopts a push-based data transfer model: the sender keeps sending data until the recipient cannot process it quickly enough. The flow control in MyriaX is exactly this back-pressure mechanism.

Currently the flow control is implemented in the input buffers, more specifically in the FlowControlInputBuffer class. The mechanism is, roughly, the following: when an input buffer fills up, reading is paused on the channels feeding it, and the resulting network back pressure pauses the sender; once the buffer has free space again, reading resumes.
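
A minimal sketch of this buffer-driven flow control, assuming a bounded buffer and a hypothetical hook wrapping Channel.setReadable:

    import java.util.ArrayDeque;
    import java.util.Queue;

    // Minimal sketch; the capacity policy and hook names are illustrative.
    class FlowControlBufferSketch<T> {
      interface ChannelHook { void setReadable(boolean readable); }

      private final Queue<T> buffer = new ArrayDeque<>();
      private final int capacity;
      private final ChannelHook channel;  // wraps Channel.setReadable

      FlowControlBufferSketch(final int capacity, final ChannelHook channel) {
        this.capacity = capacity;
        this.channel = channel;
      }

      // Called by IO threads when a message arrives from the network.
      synchronized void offer(final T msg) {
        buffer.add(msg);
        if (buffer.size() >= capacity) {
          channel.setReadable(false);  // buffer full: back pressure pauses the sender
        }
      }

      // Called by the query execution side to pull data out.
      synchronized T poll() {
        final T msg = buffer.poll();
        if (buffer.size() < capacity) {
          channel.setReadable(true);   // space available: resume reading
        }
        return msg;
      }
    }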