MyriaL is an imperative-yet-declarative high-level data flow language based on the relational algebra that includes support for SQL syntax, iteration, user-defined functions, and familiar language constructs such as set comprehensions. The language is the flagship programming interface for the Myria big data management system, and it can also be compiled to a number of other back ends.
The language began as a “whiteboard language” for reasoning about the semantics of Datalog programs. At the time, we anticipated Datalog becoming our premier programming interface. But the fact that we were using an imperative-style language to reason about Datalog made us realize that we should just implement the imperative language directly.
MyriaL is imperative: each program is a sequence of assignment statements. However, it is also declarative in two ways. First, the optimizer is free to reorder blocks of code and apply other transformations as needed prior to execution, meaning that the programmer need not write the “perfect” program to get decent performance. Second, the right-hand side of each assignment statement may itself be a declarative expression: either a SQL query or a set comprehension. We find that this combination of features strikes a useful balance between programmer control and programmer convenience.
MyriaL was designed by the Database group at the University of Washington, led by Andrew Whitaker, now at Amazon.
Myria can read and store a CSV file from S3 via the load command:
T = load("s3://uwdb/sampleData/TwitterK.csv", csv(schema(src:int, dst:int), skip=0));
store(T, TwitterK, [src, dst]);
The skip option takes the number of lines at the beginning of the CSV file to skip over (such as column headers).
Here, Myria will create a relation T with the contents of TwitterK.csv and store it in a table called TwitterK. The third argument, [src, dst], is a list of attributes to partition the rows by.
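To make the partitioning argument concrete, here is a small plain-Python model of hash partitioning. It is only a sketch. The function name hash_partition, the worker count, and the use of Python's built-in hash are assumptions for illustration; they are not Myria's actual hash function or worker assignment. The property it shows is that rows with equal values in the partitioning attributes always land on the same worker.

```python
from collections import defaultdict


def hash_partition(rows, key_indexes, num_workers):
    """Assign each row to a worker by hashing its partitioning attributes.

    Hypothetical model: Myria's real hash function and worker ids differ.
    """
    partitions = defaultdict(list)
    for row in rows:
        key = tuple(row[i] for i in key_indexes)
        partitions[hash(key) % num_workers].append(row)
    return dict(partitions)


# Made-up (src, dst) edges, partitioned by [src, dst] as in the store call.
edges = [(1, 2), (2, 1), (1, 3), (3, 3)]
parts = hash_partition(edges, key_indexes=[0, 1], num_workers=2)
```

Partitioning by [src] alone instead (key_indexes=[0]) would co-locate all edges that share a source vertex.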
Note that the load command can also handle TSV data:
T = load("path/to/your/file.tsv", csv(schema(src:int, dst:int), skip=0, delimiter="\t"));
store(T, TwitterK, [src, dst]);
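Conceptually, skip drops leading lines, and delimiter chooses the field separator before each field is cast to its schema type. The plain-Python sketch below models only that behavior. load_csv and its arguments are hypothetical names, and Myria's own parser (quoting rules, error handling, type names) may differ.

```python
import csv


def load_csv(text, schema, skip=0, delimiter=","):
    """Parse delimited text into typed tuples.

    schema: list of (name, type) pairs, e.g. [("src", int), ("dst", int)].
    skip: number of leading lines (such as a header) to drop.
    """
    lines = text.splitlines()[skip:]
    reader = csv.reader(lines, delimiter=delimiter)
    return [
        tuple(cast(value) for (_, cast), value in zip(schema, record))
        for record in reader
        if record  # ignore blank lines
    ]


tsv_text = "src\tdst\n1\t2\n3\t4\n"
rows = load_csv(tsv_text, [("src", int), ("dst", int)], skip=1, delimiter="\t")
```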
The same pattern works for other schemas, for example a relation of 3D points:
T = load("s3://uwdb/sampleData/sampleCrossmatch/points.txt",
csv(schema(id:int,
x:float,
y:float,
z:float), skip=0));
store(T, points, [x,y,z]);
You can also load data from other sources, including your own local file system. To ingest from a local file source, you must deploy a local instance of Myria. Below is an example of loading smallTable from a local file.
T = load("file:///path/to/smallTable/file",
csv(schema(x:float,
y:float), skip=0));
store(T, points, [x,y]);
If your table is in HDFS, you can also run something like the following:
T = load("hdfs://server:port/path/to/file",
csv(schema(x:float,
y:float), skip=0));
store(T, points, [x,y]);
Once a relation is stored, Myria can use it in later queries with scan. This example simply repartitions the TwitterK relation by the src attribute alone.
T = scan(TwitterK);
store(T, TwitterK_Src, [src]);
--Create an empty relation with a particular schema
r = empty(x:int, y:float, z:string);
store(r, myrelation);
MyriaL performs fairly aggressive dead-code elimination: if you do not store a relation, Myria may not bother computing it at all.
This program, for example,
T = scan(TwitterK);
results in the following error message:
MyrialCompileException: Optimized program is empty
MyriaL provides the sink command to get around this. We often find sink useful when benchmarking Myria’s performance. The following program scans TwitterK from disk into memory and then throws the relation away.
T = scan(TwitterK);
sink(T);
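One way to picture the dead-code elimination described above is as a backward liveness pass. Starting from every store or sink, it keeps only the assignments whose results are eventually consumed. The sketch below is a simplified plain-Python model under that assumption, not Myria's optimizer. Each statement is a hypothetical (kind, target, inputs) triple, and scan(TwitterK) is modeled as an assignment with no inputs.

```python
def live_statements(program):
    """Keep only statements that feed a store or sink (backward liveness)."""
    needed = set()
    live = []
    for kind, target, inputs in reversed(program):
        if kind in ("store", "sink") or target in needed:
            live.append((kind, target, inputs))
            needed.discard(target)   # this assignment defines target
            needed.update(inputs)    # ...and needs its own inputs
    return list(reversed(live))


# T = scan(TwitterK);  nothing is stored, so every statement is dead.
scan_only = [("assign", "T", [])]
# T = scan(TwitterK); sink(T);  the sink keeps the scan alive.
with_sink = [("assign", "T", []), ("sink", None, ["T"])]
```

An empty result for scan_only corresponds to the "Optimized program is empty" error.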
Now for some real queries! MyriaL has two styles of syntax: SQL and comprehensions. If you’ve used list comprehensions in Python, then MyriaL’s comprehensions will look familiar. Use the style you prefer, or mix and match.
You can try all the examples in this section yourself by copy/pasting them into your allotted demo cluster.
Let’s find the Twitter relationships where the follower and followee are the same user.
T = scan(TwitterK);
-- SQL style syntax
s = select * from T where src = dst;
store(s, selfloops);
T = scan(TwitterK);
-- comprehension syntax
s = [from T where src = dst emit *];
store(s, selfloops);
from T means read tuples from relation T. where src = dst means only keep tuples where the value of src is equal to the value of dst. The * in emit * means the resulting relation should contain all the attributes from the relations in the from clause (in this case, the attributes of T: src and dst).
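Because the comprehension syntax borrows from Python, the same self-loop query can be written almost verbatim as a Python list comprehension over a made-up in-memory copy of TwitterK. This is an analogy for the semantics only. It runs locally, not on Myria.

```python
# A made-up in-memory stand-in for TwitterK: (src, dst) tuples.
T = [(1, 2), (3, 3), (2, 1), (5, 5)]

# [from T where src = dst emit *]
selfloops = [(src, dst) for (src, dst) in T if src == dst]
```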
Joins let us match two relations on one or more attributes. This query finds all the friend-of-friend relationships in TwitterK.
T1 = scan(TwitterK);
T2 = scan(TwitterK);
joined = select T1.src as src, T1.dst as link, T2.dst as dst
from T1, T2
where T1.dst = T2.src;
store(joined, TwoHopsInTwitter);
T1 = scan(TwitterK);
T2 = scan(TwitterK);
joined = [from T1, T2
where T1.dst = T2.src
emit T1.src AS src, T1.dst AS link, T2.dst AS dst];
store(joined, TwoHopsInTwitter);
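Read as a Python comprehension over small made-up data, the two-hop join is a pair of nested loops with an equality filter. The sketch only illustrates which tuples the query produces. Myria evaluates the join in parallel across workers rather than with nested loops.

```python
# Made-up (src, dst) edges standing in for both scans of TwitterK.
T1 = T2 = [(1, 2), (2, 3), (2, 4), (3, 1)]

# [from T1, T2 where T1.dst = T2.src
#  emit T1.src AS src, T1.dst AS link, T2.dst AS dst]
joined = [
    (src1, dst1, dst2)
    for (src1, dst1) in T1
    for (src2, dst2) in T2
    if dst1 == src2
]
```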
Aggregation lets us combine results from multiple tuples. This query counts the number of friends for user 821.
T = scan(TwitterK);
cnt = select count(*) from T where src=821;
store(cnt, user821);
T1 = scan(TwitterK);
cnt = [from T1 where src=821 emit count(*) as x];
store(cnt, user821);
We can also group the aggregation by attributes. This query counts the number of friends for each user.
T = scan(TwitterK);
T1 = select src as user, count(*) as degree from T;
store(T1, user_degrees);
T = scan(TwitterK);
T1 = [from T emit src as user, count(*) as degree];
store(T1, user_degrees);
Notice that MyriaL’s grouping syntax differs from SQL’s. MyriaL implicitly groups by all non-aggregate expressions in the select clause, without a group by clause. For clarity, the equivalent SQL query is:
select src as user, count(*) as degree from T group by src;
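The implicit grouping can be mimicked in Python. The non-aggregate output column (src) acts as the grouping key, and count(*) becomes a per-key counter. The sample tuples below are invented for illustration.

```python
from collections import Counter

# Made-up (src, dst) tuples standing in for TwitterK.
T = [(1, 2), (1, 3), (2, 1), (821, 5), (821, 7)]

# select count(*) from T where src=821;  (no grouping key: one count)
user821 = sum(1 for (src, _) in T if src == 821)

# select src as user, count(*) as degree from T;
# src is the only non-aggregate column, so it is the implicit group key.
user_degrees = sorted(Counter(src for (src, _) in T).items())
```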
We can also sort results with an order by clause:
T = scan(TwitterK);
T1 = select t1.src as src, t2.dst as dst, count(*) as numLinks from T as t1, T as t2 where t1.dst = t2.src order by src asc, numLinks desc limit 10;
store(T1, countLinksTwoHops);
Note: order by clauses must be accompanied by a limit clause.
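The following plain-Python model of countLinksTwoHops shows the order of operations. It forms two-hop paths, groups them by (src, dst) to count them, sorts by src ascending and numLinks descending, and then keeps the first 10 rows. The edge list is made up for illustration.

```python
from collections import Counter

# Made-up (src, dst) edges standing in for TwitterK.
T = [(1, 2), (1, 3), (2, 4), (3, 4), (2, 5), (4, 1)]

# Two-hop paths t1.src -> t1.dst = t2.src -> t2.dst, counted per (src, dst).
num_links = Counter(
    (src1, dst2) for (src1, dst1) in T for (src2, dst2) in T if dst1 == src2
)

# order by src asc, numLinks desc limit 10
rows = [(src, dst, n) for (src, dst), n in num_links.items()]
top = sorted(rows, key=lambda row: (row[0], -row[2]))[:10]
```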
The + operator, or equivalently unionall, concatenates two relations in MyriaL using bag semantics.
T1 = scan(TwitterK);
result = T1+T1;
result = unionall(result, T1);
store(result, threeTimes);
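In Python terms, bag union behaves like list concatenation: every duplicate is kept. Here is a small sketch with made-up tuples, mirroring the program above.

```python
from collections import Counter

T1 = [(1, 2), (2, 3), (1, 2)]  # made-up tuples; note the duplicate

result = T1 + T1               # result = T1 + T1;
result = result + T1           # result = unionall(result, T1);
counts = Counter(result)
```

The duplicated tuple (1, 2) appears six times in result, twice for each of the three copies of T1.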
As in SQL, most operations in MyriaL treat a relation as a bag rather than a set. However, MyriaL also has set operators: diff and distinct. The set-semantics intersect and union operators are not implemented, but they can be composed from diff, unionall, and distinct.
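The compositions mentioned above can be checked with a small Python model. It treats diff as set difference and distinct as duplicate removal; these semantics are an assumption, and the helper names mirror the MyriaL operators but are plain Python. In this model, union is distinct applied to unionall, and intersect(A, B) is diff(A, diff(A, B)).

```python
def distinct(bag):
    """Remove duplicates (returned sorted for a deterministic order)."""
    return sorted(set(bag))


def diff(a, b):
    """Set difference: distinct tuples of a that do not occur in b."""
    exclude = set(b)
    return distinct(t for t in a if t not in exclude)


def unionall(a, b):
    """Bag union: keep every duplicate."""
    return list(a) + list(b)


def union(a, b):
    return distinct(unionall(a, b))


def intersect(a, b):
    return diff(a, diff(a, b))


left = [1, 1, 2, 3]
right = [2, 3, 3, 4]
```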
edges = scan(TwitterK);
left = select src as vertex from edges;
right = select dst as vertex from edges;
dups = left + right;
vertices = select distinct vertex from dups;
store(vertices, users);
Note the query plan created by MyriaX for the above query! It computes left and right in parallel.
edges = scan(TwitterK);
left = select src as vertex from edges;
right = select dst as vertex from edges;
onlyleft = diff(left, right);
store(onlyleft, onlyAsSource);
Also notice how the query plan for the union-and-distinct query differs from the plan for diff!
Exercise: Find all users who follow someone and are followed by someone.
MyriaL allows users to define two kinds of functions: UDFs and UDAs. A user-defined function (UDF) takes one or more parameters and produces an output. The function foo below is a UDF, and it can be invoked directly in any expression.
def foo(a, b): a - int(a/(b+1))*b;
T1 = [from scan(TwitterK) as t emit foo(src, dst)];
store(T1, udf_result);
A user-defined aggregate function (sometimes called a UDAF or UDA) takes a series of inputs and produces a single output for the series. The syntax for defining a UDA is as follows:
uda func-name(args) {
initialization-expr(s);
update-expr(s);
result-expr(s);
};
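The three parts of a UDA behave like a fold: initialize state once, update it for every input, and compute the result from the final state. The plain-Python sketch below models that sequential reading. run_uda and the average example are hypothetical, and the sketch ignores how Myria evaluates aggregates across partitions.

```python
def run_uda(values, init, update, result):
    """Fold a sequence of inputs through init / update / result steps."""
    state = init()                    # initialization-expr(s)
    for value in values:
        state = update(state, value)  # update-expr(s), once per input
    return result(state)              # result-expr(s)


# Hypothetical example: an average, keeping (sum, count) as state.
avg = run_uda(
    [4, 8, 6],
    init=lambda: (0, 0),
    update=lambda s, v: (s[0] + v, s[1] + 1),
    result=lambda s: s[0] / s[1],
)
```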
The following user-defined aggregate computes an arg max. We’ll use it to find, for each user, the follower who has the most followers.
-- break ties by picking the first value
def pickBasedOnValue(value1, arg1, value2, arg2):
case
when value1 >= value2
then arg1
else arg2
end;
-- User defined aggregate that finds the argmax and max
uda argMaxAndMax(arg, val) {
-- init
[-1 as _arg, -1 as _val];
-- update
[pickBasedOnValue(val, arg, _val, _arg),
pickBasedOnValue(val, val, _val, _val)];
-- output
[_arg, _val];
};
T = scan(TwitterK);
degrees = select dst as vertex, count(*) as followers
from T;
most_followed_follower = select T.dst, argMaxAndMax(D.vertex, D.followers)
from T, degrees as D
where T.src = D.vertex;
store(most_followed_follower, MostFollowedFollower);
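The plain-Python model below mirrors argMaxAndMax and the MostFollowedFollower query on made-up edges, where (src, dst) means src follows dst. For each user, it pairs every follower's vertex id (the arg) with that follower's own follower count (the val). As in the inner join, followers who have no followers themselves are dropped. The tuple assignment evaluates both update expressions against the old state; that is an assumption about how MyriaL evaluates the update list.

```python
def pick_based_on_value(value1, arg1, value2, arg2):
    # Mirrors pickBasedOnValue: on a tie, the first pair wins.
    return arg1 if value1 >= value2 else arg2


def arg_max_and_max(pairs):
    """Mirror of argMaxAndMax over (arg, val) pairs."""
    _arg, _val = -1, -1                      # init
    for arg, val in pairs:                   # update, using the old state
        _arg, _val = (pick_based_on_value(val, arg, _val, _arg),
                      pick_based_on_value(val, val, _val, _val))
    return _arg, _val                        # output


# Made-up edges: (src, dst) means src follows dst.
T = [(1, 10), (2, 10), (3, 10), (1, 2), (3, 2), (4, 3)]

# degrees: follower count for every vertex that has followers.
followers = {}
for _, dst in T:
    followers[dst] = followers.get(dst, 0) + 1

# Join T.src = D.vertex, then aggregate per T.dst.
groups = {}
for src, dst in T:
    if src in followers:                     # inner join drops unmatched rows
        groups.setdefault(dst, []).append((src, followers[src]))

most_followed_follower = {
    dst: arg_max_and_max(pairs) for dst, pairs in groups.items()
}
```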
Standard SQL syntax only allows pure functions in expressions. Even the UDFs described above cannot express more complicated map functions, such as those that require internal state.
A good example is assigning session ids to a clickstream based on the interval between consecutive clicks. You need to remember when the last click arrived in order to decide whether the next click starts a new session or continues the current one.
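The session-id example can be sketched in plain Python to show why state is needed. The function assign_sessions and the gap threshold are hypothetical. The sketch assumes one user's click timestamps arrive sorted by time, and it illustrates the stateful logic, not a MyriaL apply.

```python
def assign_sessions(click_times, gap):
    """Give each click a session id; a new session starts when the time
    since the previous click exceeds `gap`. State: (last_time, session_id)."""
    last_time, session_id = None, -1
    out = []
    for t in click_times:
        if last_time is None or t - last_time > gap:
            session_id += 1          # too long since the last click
        last_time = t                # remember this click for the next one
        out.append((t, session_id))
    return out


sessions = assign_sessions([0, 5, 12, 100, 103, 400], gap=30)
```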
The following program assigns a sequential id to each tuple. Important: stateful apply is partition-local. That means every partition keeps its own state. The following program produces 0,1,2… for the tuples on every partition.
apply counter() {
[0 AS c];
[c + 1];
c;
};
T1 = scan(TwitterK);
T2 = [from T1 emit src, counter()];
store(T2, identified);
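The following is a plain-Python model of the partition-local behavior. Each partition runs its own copy of the counter state, so ids restart at 0 on every partition. Following the text above, the sketch emits the current value of c before incrementing it. The two-partition layout is made up.

```python
def apply_counter(partition):
    """Mirror of counter() on one partition: emit c, then c becomes c + 1."""
    c = 0                                  # [0 AS c]
    out = []
    for src, _dst in partition:
        out.append((src, c))               # emit src, counter()
        c = c + 1                          # [c + 1]
    return out


# Two made-up partitions of TwitterK; each keeps its own counter state.
partitions = [[(1, 2), (1, 3)], [(2, 1), (3, 3), (4, 1)]]
identified = [row for part in partitions for row in apply_counter(part)]
```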
Implementing a distributed counter would require coordination operators such as broadcast and collect. Myria has these operators, but they are not currently exposed in MyriaL.
MyriaL supports do...while loops. The loop condition must be an expression that evaluates to a singleton relation with one boolean attribute, i.e., either [true] or [false]. Here are two examples:
In this example, we will find the set of nodes that are reachable from a given node in the Twitter dataset. Note that there is a special syntax for a scalar constant in MyriaL.
edges = scan(TwitterK);
-- special syntax for scalar constants
source = [2 AS addr];
reachable = source;
do
before_size = select count(*) as B
from reachable;
new_reachable = select edges.dst as addr
from reachable, edges
where reachable.addr = edges.src;
reachable = new_reachable + reachable;
reachable = select distinct addr
from reachable;
after_size = select count(*) as A
from reachable;
while [from before_size, after_size emit A - B > 0];
store(reachable, Reachable);
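The reachability loop can be mirrored in plain Python, with sets standing in for the distinct relations. reachable_from is a hypothetical name, and the edges are made up. Each iteration adds one hop, and the loop stops when the reachable set stops growing, matching the A - B > 0 test.

```python
def reachable_from(edges, source):
    """Mirror of the do/while program using sets for distinct relations."""
    reachable = {source}                        # source = [2 AS addr]
    while True:
        before_size = len(reachable)
        new_reachable = {dst for (src, dst) in edges if src in reachable}
        reachable = reachable | new_reachable   # + then select distinct
        after_size = len(reachable)
        if not after_size - before_size > 0:    # while A - B > 0
            return reachable


edges = [(2, 3), (3, 4), (4, 2), (5, 6)]
```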
Here is another example of a do...while loop, used to find the connected components in the Twitter dataset.
edges = scan(TwitterK);
con_comp = select distinct src as nid, src as cid
from edges;
do
before_size = select count(*) as B, sum(cid) as SB
from con_comp;
new_con_comp = select edges.dst as nid, con_comp.cid as cid
from edges, con_comp
where edges.src = con_comp.nid;
new_con_comp = new_con_comp + con_comp;
new_con_comp = select nid, min(cid) as cid
from new_con_comp;
con_comp = new_con_comp;
after_size = select count(*) as A, sum(cid) as SA
from con_comp;
-- continue while new vertices appear or some component id decreases
while [from before_size, after_size emit A - B > 0 or SB - SA > 0];
comp_count = [from con_comp emit cid as id, count(*) as cnt];
store(comp_count, TwitterCC);
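Below is a plain-Python sketch of the same min-label propagation idea on made-up edges. It starts from the distinct source vertices and recomputes each vertex's label as the minimum over itself and its in-neighbors' previous labels. It stops only when no label changes between iterations. Because labels flow only along edge direction, this sketch uses symmetric edges. Computing connected components of a directed graph this way assumes each edge is present in both directions.

```python
from collections import Counter


def propagate_labels(edges):
    """Min-label propagation; returns {vertex: component id}."""
    labels = {src: src for (src, _) in edges}   # distinct src as nid, src as cid
    while True:
        new_labels = dict(labels)               # new_con_comp + con_comp
        for src, dst in edges:                  # join on edges.src = con_comp.nid
            cid = labels[src]
            new_labels[dst] = min(new_labels.get(dst, cid), cid)
        if new_labels == labels:                # no label changed: converged
            return labels
        labels = new_labels


# Made-up undirected graph stored as symmetric directed edges.
edges = [(1, 2), (2, 1), (2, 3), (3, 2), (4, 5), (5, 4)]
labels = propagate_labels(edges)
comp_count = Counter(labels.values())           # cid as id, count(*) as cnt
```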