Sunday, September 30, 2012

Another example of streaming functions

Now I want to show an example that is fundamentally kind of dumb. The same thing is easier to do in Triceps with templates. And the whole premise is not exactly great either. But it provides an opportunity to show more of the streaming functions, in a set-up that is closer to the SQL-based systems.

The background is as follows: There happen to be multiple ways to identify the securities (stock shares etc.). "RIC" is the identifier used by Reuters (and quite often by the other data suppliers), consisting of the ticker symbol on an exchange and the code of the exchange. ISIN is the international standard identifier. A security (and some of its creative derivatives) might happen to be listed on multiple exchanges, each listing having its own RIC, but all translating to the same ISIN (there might be multiple ISINs too, but that's another story). A large financial company would want to track a security all around the world. To aggregate the data on the security worldwide, it has to identify it by ISIN, but the data feed might be coming in with RICs only. The translation of RIC to ISIN is then done by a reference table during processing. The RIC is not thrown away either; it shows the detail of what happened. But the ISIN is added for the aggregation.

The data might be coming from multiple feeds, and there are multiple kinds of data: trades, quotes, lending quotes and so on, each with its own schema and its own aggregations. However the step of RIC-to-ISIN translation is the same for all of them, is done by the same table, and can be done in one place.

An extra complexity is that in the real world the translation table might be incomplete. However, some feeds might provide both RICs and ISINs in their records, so the pairs that aren't in the reference table yet can be inserted there and used for the following translations. This is actually not such a great idea, because it means that there might be previous records that have gone through before the translation became available. A much better way would be to do the translation as a join, where an update to the reference table would update any previous records as well. But then there would not be much use for a streaming function in it. As I've said before, it's a rather dumb example.

The streaming function will work like this: It will get an argument pair of (RIC, ISIN) from an incoming record. Either component of this pair might be empty. Since the rest of the record is wildly different for different feeds, it is left off at this point, and the uniform argument of (RIC, ISIN) is given to the function. The function will consult its table, see if it can add more information from there, or add more information from the argument into the table, and return to the caller the hopefully enriched pair (RIC, ISIN), with an empty ISIN field replaced by the right value.

The function is defined like this:

my $rtIsin = Triceps::RowType->new(
    ric => "string",
    isin => "string",
) or confess "$!";

my $ttIsin = Triceps::TableType->new($rtIsin)
    ->addSubIndex("byRic", Triceps::IndexType->newHashed(key => [ "ric" ])
) or confess "$!";
$ttIsin->initialize() or confess "$!";

my $tIsin = $unit->makeTable($ttIsin, "EM_CALL", "tIsin") or confess "$!";

# the results will come from here
my $fretLookupIsin = Triceps::FnReturn->new(
    name => "fretLookupIsin",
    unit => $unit,
    labels => [
        result => $rtIsin,
    ],
);

# The function argument: the input data will be sent here.
my $lbLookupIsin = $unit->makeLabel($rtIsin, "lbLookupIsin", undef, sub {
    my $row = $_[1]->getRow();
    if ($row->get("ric")) {
        my $argrh = $tIsin->makeRowHandle($row);
        my $rh = $tIsin->find($argrh);
        if ($rh->isNull()) {
            if ($row->get("isin")) {
                $tIsin->insert($argrh);
            }
        } else {
            $row = $rh->getRow();
        }
    }
    $unit->call($fretLookupIsin->getLabel("result")
        ->makeRowop("OP_INSERT", $row));
}) or confess "$!";

The $fretLookupIsin is the function result; $lbLookupIsin is the function input. In this example the result label in FnReturn is defined differently from the previous one: not by a source label but by a row type. This label doesn't get chained to anything; instead the procedural code finds it as $fretLookupIsin->getLabel("result") and sends the rowops directly to it.

Then the ISIN translation code for some trades feed would look as follows (remember, supposedly there would be many feeds, each with its own schema, but for the example I show only one):

my $rtTrade = Triceps::RowType->new(
    ric => "string",
    isin => "string",
    size => "float64",
    price => "float64",
) or confess "$!";

my $lbTradeEnriched = $unit->makeDummyLabel($rtTrade, "lbTradeEnriched");
my $lbTrade = $unit->makeLabel($rtTrade, "lbTrade", undef, sub {
    my $rowop = $_[1];
    my $row = $rowop->getRow();
    Triceps::FnBinding::call(
        name => "callTradeLookupIsin",
        on => $fretLookupIsin,
        unit => $unit,
        rowop => $lbLookupIsin->makeRowopHash("OP_INSERT",
            ric => $row->get("ric"),
            isin => $row->get("isin"),
        ),
        labels => [
            result => sub { # a label will be created from this sub
                $unit->call($lbTradeEnriched->makeRowop($rowop->getOpcode(),
                    $row->copymod(
                        isin => $_[1]->getRow()->get("isin")
                    )
                ));
            },
        ],
    );
});

The label $lbTrade receives the incoming trades, calls the streaming function to enrich them with the ISIN data, and forwards the enriched data to the label $lbTradeEnriched. The function call is done differently in this example. Rather than create an FnBinding object and then use it with a scoped AutoFnBind, it uses the convenience function FnBinding::call() that wraps all that logic. It's simpler to use, without all these extra objects, but the price is efficiency: it ends up creating a new FnBinding object for every call. That's where a compiler would be very useful: it could take a call like this, translate it to the internal objects once, and then keep reusing them.

FnBinding::call() gets a name that is used for the error messages and also to give names to the temporary objects it creates. The option "on" tells which streaming function is being called (by specifying its FnReturn). The arguments of the streaming function are given with one of several options: "rowop" for a single rowop, "rowops" for an array of rowops, "tray" for a tray, and "code" for a procedural function that would send the inputs. And "labels" as usual connects the results of the function, either to the labels, or by creating labels automatically from the snippets of code.
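For instance, a call with several pre-built argument rowops at once might look like this sketch ($rowop1 and $rowop2 are hypothetical pre-built rowops for $lbLookupIsin, and the array-reference shape of "rowops" is my assumption here):

Triceps::FnBinding::call(
    name => "callLookupIsinBatch",
    on => $fretLookupIsin,
    unit => $unit,
    rowops => [ $rowop1, $rowop2 ], # assumed: a reference to an array of rowops
    labels => [
        result => sub { print($_[1]->printP(), "\n"); },
    ],
);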

The result handling here demonstrates the technique that I call the "implicit join": The function gets a portion of data from an original row, does some transformation and returns the data back. This data is then joined with the original row. The code knows what this original row was: it gets remembered in the variable $row. The semantics of the call guarantees that nothing else has happened during the function call, and that $row is still the current row. Then the function result gets joined with $row, and the produced data is sent further on its way. The variable $row could be either a global one, or, as shown here, a scoped variable that gets embedded into a closure function.
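Since the whole point is that the same translation function serves any number of feeds, here is a sketch of how a second, hypothetical quotes feed would call the very same function (the row type and label names here are made up for illustration; the calling pattern mirrors the trades feed above):

my $rtQuote = Triceps::RowType->new(
    ric => "string",
    isin => "string",
    bid => "float64",
    ask => "float64",
) or confess "$!";

my $lbQuoteEnriched = $unit->makeDummyLabel($rtQuote, "lbQuoteEnriched");
my $lbQuote = $unit->makeLabel($rtQuote, "lbQuote", undef, sub {
    my $rowop = $_[1];
    my $row = $rowop->getRow();
    Triceps::FnBinding::call(
        name => "callQuoteLookupIsin",
        on => $fretLookupIsin,
        unit => $unit,
        rowop => $lbLookupIsin->makeRowopHash("OP_INSERT",
            ric => $row->get("ric"),
            isin => $row->get("isin"),
        ),
        labels => [
            result => sub { # the same "implicit join", now with the quote row
                $unit->call($lbQuoteEnriched->makeRowop($rowop->getOpcode(),
                    $row->copymod(
                        isin => $_[1]->getRow()->get("isin")
                    )
                ));
            },
        ],
    );
});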

The rest of the example, the dispatcher part, is:

# print what is going on
my $lbPrintIsin = makePrintLabel("printIsin", $tIsin->getOutputLabel());
my $lbPrintTrade = makePrintLabel("printTrade", $lbTradeEnriched);

# the main loop
my %dispatch = (
    isin => $tIsin->getInputLabel(),
    trade => $lbTrade,
);

while(<STDIN>) {
    chomp;
    my @data = split(/,/); # starts with a command, then string opcode
    my $type = shift @data;
    my $lb = $dispatch{$type};
    my $rowop = $lb->makeRowopArray(@data);
    $unit->call($rowop);
    $unit->drainFrame(); # just in case, for completeness
}

And here is an example of a run; following the new convention, the input lines are shown preceded by "> ":

> isin,OP_INSERT,ABC.L,US0000012345
tIsin.out OP_INSERT ric="ABC.L" isin="US0000012345"
> isin,OP_INSERT,ABC.N,US0000012345
tIsin.out OP_INSERT ric="ABC.N" isin="US0000012345"
> isin,OP_INSERT,DEF.N,US0000054321
tIsin.out OP_INSERT ric="DEF.N" isin="US0000054321"
> trade,OP_INSERT,ABC.L,,100,10.5
lbTradeEnriched OP_INSERT ric="ABC.L" isin="US0000012345" size="100" price="10.5"
> trade,OP_DELETE,ABC.N,,200,10.5
lbTradeEnriched OP_DELETE ric="ABC.N" isin="US0000012345" size="200" price="10.5"
> trade,OP_INSERT,GHI.N,,300,10.5
lbTradeEnriched OP_INSERT ric="GHI.N" isin="" size="300" price="10.5"
> trade,OP_INSERT,,XX0000012345,400,10.5
lbTradeEnriched OP_INSERT ric="" isin="XX0000012345" size="400" price="10.5"
> trade,OP_INSERT,GHI.N,XX0000012345,500,10.5
tIsin.out OP_INSERT ric="GHI.N" isin="XX0000012345"
lbTradeEnriched OP_INSERT ric="GHI.N" isin="XX0000012345" size="500" price="10.5"
> trade,OP_INSERT,GHI.N,,600,10.5
lbTradeEnriched OP_INSERT ric="GHI.N" isin="XX0000012345" size="600" price="10.5"

The table gets pre-populated with a few translations, and the first few trades use them. Then come the examples of a missing translation, which eventually gets added from the incoming data (see how the trade with GHI.N,XX0000012345 both updates the ISIN table and sends the trade record through), and the following trades can then use this newly added translation.

Saturday, September 29, 2012

More of Collapse with functions

The Collapse as shown before sends all the collected deletes before all the collected inserts. For example, if it has collected the updates for four rows, the output will be (assuming that the Collapse element is named "collapse" and the data set in it is named "idata"):

collapse.idata.out OP_DELETE local_ip="3.3.3.3" remote_ip="7.7.7.7" bytes="100"
collapse.idata.out OP_DELETE local_ip="2.2.2.2" remote_ip="6.6.6.6" bytes="100"
collapse.idata.out OP_DELETE local_ip="4.4.4.4" remote_ip="8.8.8.8" bytes="100"
collapse.idata.out OP_DELETE local_ip="1.1.1.1" remote_ip="5.5.5.5" bytes="100"
collapse.idata.out OP_INSERT local_ip="3.3.3.3" remote_ip="7.7.7.7" bytes="300"
collapse.idata.out OP_INSERT local_ip="2.2.2.2" remote_ip="6.6.6.6" bytes="300"
collapse.idata.out OP_INSERT local_ip="4.4.4.4" remote_ip="8.8.8.8" bytes="300"
collapse.idata.out OP_INSERT local_ip="1.1.1.1" remote_ip="5.5.5.5" bytes="300"

What if you want the deletes followed directly by the matching inserts? Like this:

collapse.idata.out OP_DELETE local_ip="3.3.3.3" remote_ip="7.7.7.7" bytes="100"
collapse.idata.out OP_INSERT local_ip="3.3.3.3" remote_ip="7.7.7.7" bytes="300"
collapse.idata.out OP_DELETE local_ip="2.2.2.2" remote_ip="6.6.6.6" bytes="100"
collapse.idata.out OP_INSERT local_ip="2.2.2.2" remote_ip="6.6.6.6" bytes="300"
collapse.idata.out OP_DELETE local_ip="4.4.4.4" remote_ip="8.8.8.8" bytes="100"
collapse.idata.out OP_INSERT local_ip="4.4.4.4" remote_ip="8.8.8.8" bytes="300"
collapse.idata.out OP_DELETE local_ip="1.1.1.1" remote_ip="5.5.5.5" bytes="100"
collapse.idata.out OP_INSERT local_ip="1.1.1.1" remote_ip="5.5.5.5" bytes="300"

The procedural version would require a look-up in the insert table after processing each row in the delete table, and handling the row if found. So I've left it out to avoid complicating the example. But in the streaming function form it becomes easy, just change the binding a little bit:

        my $lbInsInput = $dataset->{tbInsert}->getInputLabel();

        my $fbind = Triceps::FnBinding->new(
            name => $self->{name} . "." . $dataset->{name} . ".bndTbl",
            on => $fret,
            unit => $unit,
            labels => [
                del => sub {
                    if ($_[1]->isDelete()) {
                        $unit->call($lbOut->adopt($_[1]));
                        # If the INSERT is available after this DELETE, this
                        # will produce it.
                        $unit->call($lbInsInput->adopt($_[1]));
                    }
                },
                ins => sub {
                    if ($_[1]->isDelete()) {
                        $unit->call($lbOut->makeRowop($OP_INSERT, $_[1]->getRow()));
                    }
                },
            ],
        );

The "del" binding first sends the result out as usual and then forwards the DELETE rowop to the insert table's input. Which then causes the insert rowop to be sent of a match is found. Mind you, the look-up and conditional processing still happens. But now it all happens inside the table machinery, all you need to do is add one more line to invoke it.

Let's discuss in a little more detail what happens when the clearing of the Delete table deletes the row with (local_ip="3.3.3.3" remote_ip="7.7.7.7").

  • The Delete table sends a rowop with this row and OP_DELETE to its output label collapse.idata.tbDelete.out.
  • Which then gets forwarded to a chained label in the FnReturn, collapse.idata.retTbl.del.
  • FnReturn has a FnBinding pushed into it, so the rowop passes to the matching label in the binding, collapse.idata.bndTbl.del.
  • The Perl handler of that label gets called, first forwards the rowop to the Collapse output label collapse.idata.out, and then to the Insert table's input label collapse.idata.tbInsert.in.
  • The Insert table looks up the row by the key, finds it, removes it from the table, and sends an OP_DELETE rowop to its output label collapse.idata.tbInsert.out.
  • Which then gets forwarded to a chained label in the FnReturn, collapse.idata.retTbl.ins.
  • FnReturn has a FnBinding pushed into it, so the rowop passes to the matching label in the binding, collapse.idata.bndTbl.ins.
  • The Perl handler of that label gets called and sends the rowop with the opcode changed to OP_INSERT to the Collapse output label collapse.idata.out.
It's a fairly complicated sequence but all you needed to do was to add one line of code. The downside of course is that if something doesn't go the way you expected, you'd have to trace and understand this whole long sequence.

Since sending the DELETEs to the Insert table removes the matched rows from it, the following clear() of the Insert table won't find them any more and won't send any duplicates; it will send only the inserts for which there were no matching deletes.

You may notice that the code in the "del" handler only forwards the rows around, and that can be replaced by a chaining:

        my $lbDel = $unit->makeDummyLabel(
            $dataset->{tbDelete}->getOutputLabel()->getRowType(),
            $self->{name} . "." . $dataset->{name} . ".lbDel");
        $lbDel->chain($lbOut);
        $lbDel->chain($lbInsInput);

        my $fbind = Triceps::FnBinding->new(
            name => $self->{name} . "." . $dataset->{name} . ".bndTbl",
            on => $fret,
            unit => $unit,
            labels => [
                del => $lbDel,
                ins => sub {
                    $unit->call($lbOut->makeRowop($OP_INSERT, $_[1]->getRow()));
                },
            ],
        );

This shows another way of label definition in FnBinding: an actual label is created first and then given to FnBinding, instead of letting it automatically create a label from the code. The "if ($_[1]->isDelete())" condition has been removed from the "ins", since it's really redundant, and the delete part with its chaining doesn't do the same check anyway.

This code works just as well and even more efficiently than the previous version, since no Perl code needs to be invoked for "del": it all propagates internally through the chaining. However the price is that the DELETE rowops going to the output will carry the head-of-the-chain label in them:

collapse.idata.lbDel OP_DELETE local_ip="3.3.3.3" remote_ip="7.7.7.7" bytes="100"
collapse.idata.out OP_INSERT local_ip="3.3.3.3" remote_ip="7.7.7.7" bytes="300"
collapse.idata.lbDel OP_DELETE local_ip="2.2.2.2" remote_ip="6.6.6.6" bytes="100"
collapse.idata.out OP_INSERT local_ip="2.2.2.2" remote_ip="6.6.6.6" bytes="300"
collapse.idata.lbDel OP_DELETE local_ip="4.4.4.4" remote_ip="8.8.8.8" bytes="100"
collapse.idata.out OP_INSERT local_ip="4.4.4.4" remote_ip="8.8.8.8" bytes="300"
collapse.idata.lbDel OP_DELETE local_ip="1.1.1.1" remote_ip="5.5.5.5" bytes="100"
collapse.idata.out OP_INSERT local_ip="1.1.1.1" remote_ip="5.5.5.5" bytes="300"

The "ins" side can't be handled just by chaining because it has to replace the opcode in the rowops. A potential different way to handle this would be to define various label types in C++ for many primitive operations, like replacing the opcode, and then build by combining them.

The final item is that the code shown in this post involved a recursive call of the streaming function. Its output from the "del" label got fed back to the function, producing more output on the "ins" label. This worked because it invoked a different code path in the streaming function than the one that produced the "del" data. If it were to form a topological loop back to the same path with the same labels, that would have been an error. The recursion will be discussed in more detail later.

Hello streaming functions, or a functional Collapse

Coming up with good examples of the streaming function usage in Triceps is surprisingly difficult. Ironically, the flexibility of Triceps is the problem. If all you have is SQL, the streaming functions become pretty much a must. But if you can write the procedural code, most things are easier that way. For a streaming function to become beneficial, it has to be written in SQLy primitives (such as tables and joins) and not be easily reducible to the procedural code.

The most distilled example I've come up with is in the implementation of Collapse. The original implementation of Collapse is described in the manual section "Collapsed updates". The flush() there goes in a loop, deleting all the rows from the state tables and sending them as rowops to the output.

The deletion of all the rows can nowadays be done more easily with the Table method clear(). However by itself it doesn't solve the problem of sending the output. It sends the deleted rows to the table's output label, but we can't just connect the output of the state tables to the Collapse output: then it would also pick up all the intermediate changes! The data needs to be picked up from the tables' output selectively, only in flush().

This makes it a good streaming function: the body of the function consists of running clear() on the state tables, and its result is whatever comes on the output labels of the tables.

Since most of the logic remains unchanged, I've implemented this new version of Collapse as a subclass that extends and replaces some of the code with its own:

package FnCollapse;

our @ISA=qw(Triceps::Collapse);

sub new # ($class, $optName => $optValue, ...)
{
    my $class = shift;
    my $self = $class->SUPER::new(@_);
    # Now add an FnReturn to the output of the dataset's tables.
    # One return is enough for both.
    # Also create the bindings for sending the data.
    foreach my $dataset (values %{$self->{datasets}}) {
        my $fret = Triceps::FnReturn->new(
            name => $self->{name} . "." . $dataset->{name} . ".retTbl",
            labels => [
                del => $dataset->{tbDelete}->getOutputLabel(),
                ins => $dataset->{tbInsert}->getOutputLabel(),
            ],
        );
        $dataset->{fret} = $fret;

        # these variables will be compiled into the binding snippets
        my $lbOut = $dataset->{lbOut};
        my $unit = $self->{unit};
        my $OP_INSERT = &Triceps::OP_INSERT;
        my $OP_DELETE = &Triceps::OP_DELETE;

        my $fbind = Triceps::FnBinding->new(
            name => $self->{name} . "." . $dataset->{name} . ".bndTbl",
            on => $fret,
            unit => $unit,
            labels => [
                del => sub {
                    if ($_[1]->isDelete()) {
                        $unit->call($lbOut->adopt($_[1]));
                    }
                },
                ins => sub {
                    if ($_[1]->isDelete()) {
                        $unit->call($lbOut->makeRowop($OP_INSERT, $_[1]->getRow()));
                    }
                },
            ],
        );
        $dataset->{fbind} = $fbind;
    }
    bless $self, $class;
    return $self;
}

# Override the base-class flush with a different implementation.
sub flush # ($self)
{
    my $self = shift;
    foreach my $dataset (values %{$self->{datasets}}) {
        # The binding takes care of producing and directing
        # the output. AutoFnBind will unbind when the block ends.
        my $ab = Triceps::AutoFnBind->new(
            $dataset->{fret} => $dataset->{fbind}
        );
        $dataset->{tbDelete}->clear();
        $dataset->{tbInsert}->clear();
    }
}

new() adds the streaming function elements in each data set. They consist of two parts: FnReturn defines the return value of a streaming function (there is no formal definition of the body or the entry point since they are quite flexible), and FnBinding defines a call of the streaming function. In this case the function is called in only one place, so one FnBinding is defined. If called from multiple places, there would be multiple FnBindings.

When a normal procedural function is called, the return address provides the connection to get the result back from it to the caller. In a streaming function, the FnBinding connects the result labels to the caller's further processing of the returned data. Unlike the procedural functions, the data is not returned in one step: run the function, compute the value, return it. Instead the return value of a streaming function is a stream of rowops. As each of them is sent to a return label, it goes through the binding and to the caller's further processing. Then the streaming function continues, producing the next rowop, and so on.

If this sounds complicated, please realize that here we're dealing with the assembly language equivalent for streaming functions. I expect that over time it will become easier.

The second source of complexity is that the arguments of a streaming function are not computed in one step either. You don't normally have a full set of rows to send to a streaming function in one go. Instead you set up the streaming call to bind the result, then you pump the rowops to the function's input, creating them in whatever way.
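In the simplest form, the calling pattern looks like this sketch, where $fret, $fbind and the input label $lbInput stand in for the parts of whatever concrete function is being called (the field "key" is made up for illustration):

{
    my $ab = Triceps::AutoFnBind->new($fret => $fbind);
    # pump the argument rowops into the function's input;
    # the results flow through $fbind as each rowop gets processed
    $unit->call($lbInput->makeRowopHash("OP_INSERT", key => "x"));
    $unit->call($lbInput->makeRowopHash("OP_INSERT", key => "y"));
} # leaving the block undoes the binding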

Getting back to the definition of a streaming function, FnReturn defines a set of labels, each with a logical name. In this case the names are "del" and "ins". The labels inside FnReturn are a special variety of dummy labels, but they are chained from some real labels that send the result of the function. The snippet

   del => $dataset->{tbDelete}->getOutputLabel(),

says "create a return label named 'del' and chain it from the tbDelete's output label". There is more details to the naming and label creation but let's not get bogged in it now.

The FnBinding defines a matching set of labels, with the same logical names. It's like a receptacle and plug: you put the plug into the receptacle and get the data flowing, you unplug it and the data flow stops. The Perl version of FnBinding provides a convenience: when it gets a code reference instead of a label, it automatically creates a label with that code for its handler.

In this case both binding labels forward the data to the Collapse's output label. Only the one for the insert table has to change the opcodes to OP_INSERT. The check

if ($_[1]->isDelete()) ...

is really redundant, there just to be on the safe side: we know that when the data flows, all of it will be coming from the table clearing and will have the opcode OP_DELETE.

The actual call happens in flush(): Triceps::AutoFnBind does the "plug into the receptacle" thing, with automatic unplugging when leaving the block scope. If you want to do things manually, FnReturn has the methods push() and pop(), but the scoped binding is safer and easier. Once the binding is done, the data is sent through the function by calling clear() on both tables. And then the block ends, AutoFnBind undoes the binding, and the life goes on.
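For the curious, a manual version of flush() might look like this sketch (assuming that push() takes the binding and pop() verifies that the expected binding gets popped):

sub flush # ($self)
{
    my $self = shift;
    foreach my $dataset (values %{$self->{datasets}}) {
        my $fret = $dataset->{fret};
        $fret->push($dataset->{fbind});
        eval {
            $dataset->{tbDelete}->clear();
            $dataset->{tbInsert}->clear();
        };
        $fret->pop($dataset->{fbind}); # unbind even if clear() confessed
        die $@ if $@; # re-throw the error after unbinding
    }
}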

The result produced by this version of Collapse is exactly the same as by the original version. And even when we get down to grits, it's produced with the exact same logical sequence: the rows are sent out as they are deleted from the state tables. But it's structured differently: instead of the procedural deletion and sending of the rows, the internal machinery of the tables gets invoked, and the results of that machinery are then converted to the form suitable for the collapse results and propagated to the output.

Philosophically, it could be argued: what is the body of this function? Is it just the internal logic of the table deletion that gets triggered by clear() in the caller? Or are the clear() calls also a part of the function body? But in practice it just doesn't matter.

Friday, September 28, 2012

Streaming functions introduction

Now for a moment let's take a break from the C++ API description (especially since it's a good spot, with all the types described), and talk about something new for version 1.1 that I've been working on in the background.

This new thing is the streaming functions. It's a cool and advanced concept, I've never seen it anywhere before, and for all I know I have invented it.

First let's look at the differences between the common functions and macros (or templates and such). Please turn your attention to the illustration below:

What happens during a function call? Some code (marked with the light bluish color) is happily zooming along when it decides to call a function. It prepares some arguments and jumps to the function code (reddish). The function executes, computes its result and jumps back to the point right after it has been called from. Then the original code continues from there (the slightly darker bluish color).

What happens during a macro (or template) invocation? It starts with some code zooming along in the same way, however when the macro call time comes, it prepares the arguments and then does nothing. It gets away with it because the compiler has done the work: it has placed the macro code right where it's called, so there is no need for jumps. After the macro is done, again it does nothing: the compiler has placed the next code to execute right after it, so it just continues on its way.

So far it's pretty equivalent. An interesting difference appears when the function or macro is called from more than one place. With a macro, another copy of the macro is created, inserted between its call and return points. That's why in the figure the macro is shown twice. But with the function, the same function code is executed and then returns back to the caller. That's why in the figure there are two function callers with their paths through the same function. But how does the function know where to jump on return? The caller tells it by pushing the return address onto the stack. When the function is done, it pops this address from the stack and jumps there.

Still, it looks all the same. A macro call is a bit more efficient, except when a large complex macro is called from many places; then it becomes more efficient as a function. However there is another difference if the function or macro holds some context (say, a static variable): each invocation of the macro will get its own context, but all the function calls will share the same context. The only way to share the context with a macro is to pass some global context as its argument.
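A rough Perl analogy of this context difference, with closures playing the role of the macro copies (all the names are made up):

# a single function shares one context among all its callers
my $sharedCount = 0;
sub countCall { return ++$sharedCount; }

# each "instantiation" gets its own context, like a macro copy
sub makeCounter {
    my $n = 0;
    return sub { return ++$n; };
}
my $c1 = makeCounter();
my $c2 = makeCounter(); # $c1 and $c2 count independently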

Now let's jump to the CEP world. The Sybase or StreamBase modules are essentially macros, and so are the Triceps templates. When such a macro gets instantiated, a whole new copy of it gets created, with its tables/windows and streams/labels. Its input and output streams/labels get all connected in a fixed way. The limitation is that if the macro contains any tables, each instantiation gets its own copy of them. Well, in Triceps you can use a table as an argument to a template. In the other systems I think you still can't, so if you want to work with a common table in a module, you have to make up the query-response patterns, like the one described in the manual section "Comparative modularity".

In a query-response pattern there is some common sub-model, with a stream (in Triceps terms, a label, but here we're talking about the other systems) for the queries to come in and a stream for the results to come out (either side might have not just one but multiple streams). There are multiple inputs connected, from all the request sources, and the outputs are connected back to all the request sources. All the request sources (i.e. callers) get back the whole output of the pattern, so they need to identify which output came from their own input and ignore the rest. They do this by adding unique ids to their queries and filtering the results. In the end, it looks almost like a function but with much pain involved.
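In rough pseudo-Perl, the filtering on the caller's side amounts to something like this sketch (all the names are made up):

my $myId = "caller1"; # the unique id of this particular caller

sub sendQuery { # tag the query before sending it to the shared sub-model
    my ($query) = @_;
    $query->{id} = $myId;
    # ... send $query to the sub-model's input stream ...
}

sub onResult { # runs for every result the sub-model produces
    my ($result) = @_;
    return unless $result->{id} eq $myId; # ignore the other callers' results
    # ... process our own result ...
}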

To make it look quite like a function, one thing is needed: the selective connection of the result streams (or, returning to the Triceps terminology, labels) to the caller. Connect the output labels, send some input, have it processed and send the result through the connection, disconnect the output labels. And what you get is a streaming function. It's very much like a common function but working on the streaming data arguments and results.

The next figure highlights the similarity and differences between the query patterns and the streaming functions.

The thick lines show where the data goes during one concrete call. The thin lines show the connections that do exist but without the data going through them at the moment (they will be used during the other calls, from those other callers). The dashed thin line shows the connection that doesn't exist at the moment. It will be created when needed (and at that time the thick arrow from the streaming function to what is now the current return would disappear).

The particular beauty of the streaming functions for Triceps is that the other callers don't even need to exist yet. They can be created and connected dynamically, do their job, call the function, use its result, and then be disposed of. The calling side in Triceps doesn't have to be streaming either: it could as well be procedural.

Tuesday, September 25, 2012

Table clearing

The table clearing has been coming up repeatedly, so I've made a convenience method for it. In Perl it's called as:

$table->clear();
$table->clear($limit);

If $limit is absent or 0, the whole table gets cleared. If it's greater than 0, no more than that number of rows will be deleted. A negative limit is an error. The deletion happens in the usual order of the first leaf index, and the rowops are sent to the table's output label as usual. It's really the same thing as running a loop over all the row handles and removing them, only the C++ implementation is more efficient than a loop in Perl. There is no return value; the errors cause a confess().
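The limit comes in handy for clearing a huge table in chunks, to give the other processing a chance to run in between. A sketch:

# clear a large table 1000 rows at a time
while ($table->size() > 0) {
    $table->clear(1000);
    # ... an opportunity to do the other processing here ...
}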

The C++ API has this method in the Table as well:

void clear(size_t limit = 0);

Rhref constructor directly from fields

I've been writing more C++ unit tests, and I've come up with one more convenience constructor for Rhref:

Rhref(Table *t, FdataVec &data);

It takes the field values directly, constructs a row out of them, and then constructs a handle for that row.

The usage is pretty easy:

Rhref rh1(t, dv);

Saturday, September 22, 2012

AggregatorType and BasicAggregatorType, part 3

The purpose of the Aggregator object created by makeAggregator() is to keep the state of the group. If you're doing an additive aggregation, it allows you to keep the previous results. If you're doing the optimization of the deletes, it allows you to keep the previously sent row.

What if your aggregator keeps no state? You still have to make an Aggregator for every group, and no, you can't just return NULL, and no, they are not reference-countable, so you have to make a new copy of it for every group (i.e. for every call of makeAggregator()). This looks decidedly sub-optimal, and eventually I'll get around to straightening it out. The good news though is that most of the real aggregators keep the state anyway, so it doesn't matter much.

More of the AggregatorType workings can't be explained without going into the workings of the aggregators, which requires looking at the tables first, so it will all be discussed later.

The class BasicAggregatorType (defined in type/BasicAggregatorType.h) provides for a simple case: the stateless aggregation, where the aggregation is done by a single simple C function. This C function has all the arguments of Aggregator::handle forwarded to it:

typedef void Callback(Table *table, AggregatorGadget *gadget, Index *index,
    const IndexType *parentIndexType, GroupHandle *gh, Tray *dest,
    Aggregator::AggOp aggop, Rowop::Opcode opcode, RowHandle *rh, Tray *copyTray);

If you have a function like this, you just give it to the BasicAggregatorType constructor, and you don't need to worry about the rest of it:

BasicAggregatorType(const string &name, const RowType *rt, Callback *cb);

BasicAggregatorType takes care of the rest of the infrastructure: gadgets, aggregators etc.

Thursday, September 20, 2012

AggregatorType, part 2

The other method that you can re-define or leave alone is printTo():

virtual void printTo(string &res, const string &indent = "", const string &subindent = "  ") const;

The default one prints "aggregator (<result row type>) <name>". If you want to print more information, such as the name of the aggregator class and its arguments, you can define your own.

Finally, there are methods that will produce objects that do the actual work:

virtual AggregatorGadget *makeGadget(Table *table, IndexType *intype) const;
virtual Aggregator *makeAggregator(Table *table, AggregatorGadget *gadget);

This exposes quite a bit of the inherent complexity of the aggregators. For the simpler cases you can use the subclass BasicAggregatorType that handles most of this complexity for you and just skip these "make" methods. By the way, the IndexType has a "make" method of this kind too, but it was not discussed because unless you define a completely new IndexType, you don't need to worry about it: it just happens under the hood. The SortedIndexType just asks you to define a condition and takes care of the rest, like the BasicAggregatorType does for aggregators.

Gadget is a concept that has not been mentioned yet. It's not present in the Perl API, only in C++. Fundamentally it's a general base class that means "something with an output label". It doesn't have to be limited to one label; it just has one "default" output label, and then the subclasses can add anything they want. A table is a gadget. Each aggregator type in a table is a gadget too. So whenever a table is created from a table type, each aggregator type in that table type is called to produce its gadget, and these gadgets are collected in the table. When you call table->getAggregatorLabel("name"), you get the output label from the appropriate gadget.

The gadget construction gets the pointers to the concrete table and concrete index type to which it will be connected. It can store these pointers in the gadget, but it must not make them into references: that would create cyclic references, because the table already references all its aggregator gadgets. There is normally no need to worry that the table will disappear: when the table is destroyed, it will never call the aggregator gadget again, and the dereferencing of the aggregator gadget will likely cause it to be destroyed too (unless you hold another reference to it, which you normally should not).

Once again, short version: one AggregatorGadget per table per aggregator type.

On the other hand, an Aggregator represents a concrete aggregation on a concrete index (not on an index type, on an index!). Whenever an index of some type is created, an aggregator of its connected type is created with it. A table with a complicated tree structure of indexes can have lots of aggregators of a single type. The difference between an index type and an index is explained in http://triceps.sourceforge.net/docs-latest/guide.html#sc_table_indextree. In short, it's one index per group.

The way it works, whenever some row in the table gets deleted or inserted, the table determines for each index type which actual index in the tree (i.e. which group) got changed. Then, for aggregation purposes, if that index has an aggregator on it, that aggregator is called to do its work on the group. It produces an output row or two (or maybe none) for that group and sends it to the aggregator gadget of the same type.

Once again, short version: one Aggregator object per group, produces the updates when asked, sends them to the single common gadget.

The pointers to the Table and Gadget are given for convenience; the Aggregator doesn't need to remember them. Whenever it gets called, it will be given these pointers as arguments again. This is done in an attempt to reduce the amount of data stored per aggregator.

Sunday, September 16, 2012

AggregatorType, part 1

The AggregatorType is a base class in which you define the concrete aggregator types, very much like the sorted index type. It has a chunk of functionality common for all the aggregator types and a bunch of virtual functions that compute the actual aggregation in the subclasses.

AggregatorType(const string &name, const RowType *rt);

The constructor provides a name and the result row type. Remember that AggregatorType is an abstract class and will never be instantiated directly. Instead your subclass that performs a concrete aggregation will invoke this constructor as a part of its own constructor.

As has been described in the Perl part of the manual, the aggregator type is unique in the fact that it has a name. And it's a bit of a weird name: each aggregator type is kind of by itself and can be reused in multiple table types, but all the aggregator types in a table type must have different names. This is the name that is used to generate the name of the aggregator's output label in a table: '<table_name>.<aggregator_type_name>'. Fundamentally, the aggregator type itself should not have a name; it should be given a name when connected to an index in the table type. But at the time the current idea looked good enough: it's easy, convenient for error messages, and doesn't get much in the way.

The result row type might not be known at the time of the aggregator type creation. All the constructor does with it is place the value into a field, so if the right type is not known yet, just make up some (as long as it's not NULL!) and use it, then change it later at the initialization time.

For 1.1 I've changed this code to accept a NULL result row type until the initialization is completed. If it's still NULL after initialization, this will be reported as an error.

AggregatorType(const AggregatorType &agg);
virtual AggregatorType *copy() const;

An aggregator type must provide a copy constructor that does the deep copy and the virtual method copy() that invokes it. It's the same as with the index types: when an aggregator type gets connected into a table type, it actually gets copied, and the copy must always be uninitialized.

Speaking of the fields, the fields in the AggregatorType available to the subclasses are:

    const_Autoref<RowType> rowType_; // row type of result
    Erref errors_; // errors from initialization
    string name_; // name inside the table's dotted namespace
    int pos_; // a table has a flat vector of AggregatorGadgets in it, this is the index for this one (-1 if not set)
    bool initialized_; // flag: already initialized, no future changes

rowType_ is the row type of the result. The constructor puts the argument value there, but it can be changed at any time later (until the initialization is completed).

errors_ is a place to put the errors during initialization. It comes set to NULL, so if you want to report any errors, you have to create an Errors object first.

name_ is where the name is kept. Generally, don't change it, treat it as read-only.

pos_ has to do with management of the aggregator types in a table type. Before initialization it's -1, after initialization each aggregator type (that becomes tied to its table type) will be assigned a sequential number. Again, treat it as read-only, and you probably would never need to even read it.

initialized_ shows that the initialization has already happened. Your initialization should call the initialization of the base class, which would set this flag. No matter whether the initialization succeeded or failed, this flag gets set. It never gets reset in the original AggregatorType object; it gets reset only in the copies.

const string &getName() const;
const RowType *getRowType() const;
bool isInitialized() const;
virtual Erref getErrors() const;

These are the convenience getters that return the data from the fields. You can override getErrors() but there is probably no point in it.



virtual bool equals(const Type *t) const;
virtual bool match(const Type *t) const;

The equality and match comparisons are as usual. The defaults provided in the base AggregatorType check that the result row type is equal or matching (or, in version 1.1, that both result row types are NULL), and that the typeid of both types is the same. So if your aggregator type has no parameters, this is good enough and you don't need to redefine these methods. If you do have parameters, you call the base class method first; if it returns false, you return false, otherwise you check the parameters. Like this:

bool MyAggregatorType::equals(const Type *t) const
{
    if (!AggregatorType::equals(t))
        return false;

    // the typeid matched, so safe to cast
    const MyAggregatorType *at = static_cast<const MyAggregatorType *>(t);
    // check the type-specific parameters; fieldName_ stands here for
    // whatever parameters your type actually has (a made-up example)
    return fieldName_ == at->fieldName_;
}


Friday, September 14, 2012

Map-Reduce in database terms

Taking a break from the regular Triceps documentation writing, I want to talk a little about the generalities. I kind of started thinking of this subject for the next edition of my book on parallel programming, but then it seemed to make sense for this blog too.

Map-Reduce is a fashionable word popularized by Google, but what does it really mean? Fundamentally, it's an aggregation. The data gets split into groups, and then an aggregation is performed on each group. The "map" is just a different word for "group by", and "reduce" is just an aggregation on a group. Really nothing new, same old stuff.

The interesting part starts when the processing gets parallelized. Mind you, map-reduce doesn't have to be parallelized, it can work within a single machine, but the parallel execution is really the interesting part of it. So when people say "map-reduce", they usually mean the parallel aggregation.

Each group is completely independent from the others, so the aggregation of all of them can be computed in parallel. In reality though you usually don't have as many machines (nor CPUs) as groups, so each CPU handles a subset of the groups. That's your Reduce step: take the data collected into groups, give a subset of the groups to each individual reducer thread, and have them send the results.

The splitting of the original rows into the groups is the Map step: take the row, compute the aggregation key (the "group by" clause), and put it into the appropriate group. There may be multiple mappers working in parallel: just split the input data arbitrarily into chunks, each containing approximately the same number of records.

Between Map and Reduce there is an intermediate unnamed step: the collation. After you've decided that a row goes into a certain group, something has to collect all the rows of the group before they can be aggregated. The collation is usually done in the reducers: the mapper determines the key, takes a hash of it modulo the number of reducer threads, and thus computes the identity of the reducer handling this key. Then it sends the record to that reducer. The reducer then groups the records by the full keys and holds them until all the mappers tell it that they are done. Then it can proceed to the aggregation ("reduction") and output of the result.
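In Perl, the routing step in the mapper amounts to a few lines; here is a sketch with a simple checksum standing in for the hash function:

sub pickReducer # ($key, $nReducers) - decide which reducer gets this key
{
    my ($key, $nReducers) = @_;
    my $h = unpack("%32C*", $key); # a simple checksum of the key's bytes
    return $h % $nReducers;
}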

Of course, if the aggregation performed is additive, the reducer needs to keep only the running sum for each group, not the whole group contents.
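In code, the additive version of the reducer state shrinks to a hash of running sums, as in this sketch:

my %sum; # the running sum per group key
sub reduceRow { # called for every record routed to this reducer
    my ($key, $value) = @_;
    $sum{$key} += $value;
}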

Now, with this knowledge, implementing map-reduce on CEP looks straightforward, doesn't it?

Wednesday, September 12, 2012

SortedIndexType, row handle section and sequences

Now we get to an advanced feature that has been mentioned before in the description of the row handles but is not accessible from Perl. A row handle contains a chunk of memory for every index type in the table. It is called a "row handle section". At the very least this chunk of memory contains the iterator in an index of that type, which allows navigating through the table and deleting the row handles from the table efficiently.

But an index may request more memory (the same fixed amount for each row handle) to store some row-specific information. For example, the Hashed index stores the value of the hash in its section, and uses this value for the efficient comparisons.

A sort condition may request and use memory in this section of a SortedIndexType. It is done by defining a few more virtual methods that handle the row handle section.

I could have shown an example of the Hashed index re-implemented through the Sorted interface, but it's kind of boring, since you could as well look directly at the source code of the HashedIndexType. Instead I want to show a different kind of index that doesn't use the data in the rows for comparison at all but keeps the rows in the order they were inserted. Like a more expensive variety of the FIFO index type. It's also a bit of a preview of a future feature. It assigns a new auto-generated sequence number to each row handle, and uses that sequence number for ordering. Later you can find a row handle quickly if you know its sequence number. If a table may contain multiple copies of a row, the sequence numbers allow you to tell which copy you are dealing with. It comes in handy for such things as the results of joins without a natural primary key. Of course, the usefulness of this preview is limited by the fact that there is no place for the sequence numbers in the rowops, and thus there is no way to propagate the sequence numbers through the model. That would have to be addressed before it becomes a full feature.

Now, you might ask, why not just add an extra field and put the sequence number in there? Sure, that would work too, and also solve the issue with the propagation in the rowops. However this means that as a row goes through a table, it gets copied to set the sequence number in it, which is less efficient. So ultimately keeping the sequence numbers "on the side" is more beneficial.

Now, to the implementation:

class SeqSortCondition : public SortedIndexCondition
{
protected:
    class SeqRhSection : public TreeIndexType::BasicRhSection
    {
    public:
        SeqRhSection(int64_t val) :
            seq_(val)
        { }

        int64_t seq_; // the sequence number of this row handle
    };

public:
    SeqSortCondition() :
        seq_(0)
    { }

    virtual void initialize(Erref &errors, TableType *tabtype, SortedIndexType *indtype)
    {
        SortedIndexCondition::initialize(errors, tabtype, indtype);
        seq_ = 0;
    }

    virtual bool equals(const SortedIndexCondition *sc) const
    {
        return true;
    }

    virtual bool match(const SortedIndexCondition *sc) const
    {
        return true;
    }

    virtual void printTo(string &res, const string &indent = "", const string &subindent = "  ") const
    {
        res.append("Sequenced");
    }

    virtual SortedIndexCondition *copy() const
    {
        return new SeqSortCondition(*this);
    }

    virtual size_t sizeOfRhSection() const
    {
        return sizeof(SeqRhSection);
    }

    virtual void initRowHandleSection(RowHandle *rh) const
    {
        // initialize the Seq part, the general Sorted index
        // will initialize the iterator
        SeqRhSection *rs = rh->get<SeqRhSection>(rhOffset_);
        new(rs) SeqRhSection(seq_++);
    }

    virtual void clearRowHandleSection(RowHandle *rh) const
    {
        // clear the iterator by calling its destructor
        SeqRhSection *rs = rh->get<SeqRhSection>(rhOffset_);
        rs->~SeqRhSection();
    }

    virtual void copyRowHandleSection(RowHandle *rh, const RowHandle *fromrh) const
    {
        SeqRhSection *rs = rh->get<SeqRhSection>(rhOffset_);
        SeqRhSection *fromrs = fromrh->get<SeqRhSection>(rhOffset_);

        // initialize the iterator by calling its copy constructor inside the placement,
        // the sequence number gets copied too
        new(rs) SeqRhSection(*fromrs);
    }

    // Helper method to read the sequence from the row handle,
    // can also be used by the end-user. The row handle must as usual
    // belong to this table.
    int64_t getSeq(const RowHandle *rh) const
    {
        return rh->get<SeqRhSection>(rhOffset_)->seq_;
    }
    // Helper method to set the sequence in the row handle.
    // May be used only on the rows that are not in table.
    void setSeq(const RowHandle *rh, int64_t val) const
    {
        if (rh->isInTable()) {
            throw Exception("Attempted to change the sequence on a row in table.", true);
        }
        rh->get<SeqRhSection>(rhOffset_)->seq_ = val;
    }

    virtual bool operator() (const RowHandle *rh1, const RowHandle *rh2) const
    {
        return getSeq(rh1) < getSeq(rh2);
    }

    mutable int64_t seq_; // the next sequence number to assign
};

    ...
    Autoref<IndexType> it = new SortedIndexType(new SeqSortCondition());
    ...

The nested class SeqRhSection defines the structure of this index's section. For the sort condition it must always inherit from TreeIndexType::BasicRhSection, to get the iterator part from it. Any extra fields are owned by the sort condition.

The SeqSortCondition contains the sequence number generator seq_ (not to be confused with the same-named field seq_ in SeqRhSection), that gets initialized to 0, and will be incremented from there. Since each sorted index type has its own copy of the condition, and each table type gets its own sorted index type, each of them will be counting independently. However there is a bit of a catch when multiple tables get created from the same table type: they will all share the same copy of the sort condition, and thus the same sequence number generator. In practice it should not be a problem, as long as all of the tables are in the same thread. If they are in different threads, a synchronization would be needed around the sequence generator increment. Or better, make a copy of the table type for each thread and avoid the synchronization issues.

The equals() and match() always return true because there is nothing configurable in this sort condition.

The new features start at sizeOfRhSection(). The size of each row handle in a table type is the same, and is computed by asking every index type in it at initialization time and adding up the totals (plus alignment and some fixed amount of basic data). sizeOfRhSection() does its part by telling the caller the size of SeqRhSection.

Then each row handle section must provide the ways to construct and destruct it. Naturally, to save space, a section must have no virtual table, so, like for the rows, a separate method in the index type acts as its virtual destructor. And there is no such thing as a virtual constructor in C++, which gets simulated through more methods in the index type. The SortedIndexType delegates most of this work to the sort condition in it. The basic constructor is initRowHandleSection(), the copy constructor is copyRowHandleSection(), and the destructor is clearRowHandleSection().

Each of them gets the location of this index type's section in the row handle with

SeqRhSection *rs = rh->get<SeqRhSection>(rhOffset_);

The field rhOffset_ gets initialized by the SortedIndexType machinery before either of these methods gets ever called. Here rs points to the raw bytes, on which the placement constructors and the explicit destructor are called.

The methods getSeq() and setSeq() are not virtual, they are unique to this SeqSortCondition. They allow reading the sequence from a row handle or setting it. Naturally, the sequence number may be changed only when the row handle is not in the table yet, or it would mess up the indexing horribly. It's OK to throw the exceptions from setSeq() and getSeq() since they are called directly from the user code and won't confuse any Triceps code along the way.

If you want to find a row handle in the table by its sequence number, you start by creating a new row handle (which can even use an empty row). That new row handle will have a new sequence number assigned to it, but it doesn't matter, because next you call setSeq() and overwrite it with your desired number. Then you use this row handle to call find() or delete() on the table as usual. Like this:

Rhref rh1(table, r1);
sc->setSeq(rh1, desired_number);

Or to read the number, you do:

int64_t seq = sc->getSeq(rh);

Here sc is the exact initialized sort condition from the actual table type. If you use a wrong or uninitialized one, the rhOffset_ in it will likely be wrong, and will cause all kinds of memory corruption. You can get the sort condition from a table type like this:

    Autoref<SortedIndexType> ixt = dynamic_cast<SortedIndexType *>(tt->findSubIndex("primary"));
    Autoref<SeqSortCondition> sc = dynamic_cast<SeqSortCondition *>(ixt->getCondition());

You don't have to use the dynamic cast but it's safer, and since you'd normally do it once at the model setup time and then just keep using the value, there is no noticeable performance penalty for it.

The full example can be found in svn in cpp/type/test/t_xSortedIndex.cpp, and will be also included in version 1.1.

Monday, September 10, 2012

SortedIndexType example with getKey

The method getKey() is just a little optional addition to the sort condition. If you leave it unimplemented in your subclass, the basic implementation will just return NULL. I wanted to show a few more techniques, so I've done a fairly big extension of the previous example. Now it can compare multiple fields, and the fields are specified by names. The only part missing from it being a general OrderedIndexType is the support of all the field types, not just int32.

Here it goes, method by method, with commentary.

class MultiInt32SortCondition : public SortedIndexCondition
{  
public:  
    // @param key - the key fields specification
    MultiInt32SortCondition(NameSet *key):
        key_(key)
    { }
  
The key is specified as a name set. Unlike the HashedIndexType, there is no changing the key later: it must be specified in the constructor, and must not be NULL. The same as with the HashedIndexType, the original name set becomes referenced by this sort condition and all its copies. So don't change, and don't even use, the original name set any more after you've passed it to the sort condition.



    virtual void initialize(Erref &errors, TableType *tabtype, SortedIndexType *indtype)
    {  
        SortedIndexCondition::initialize(errors, tabtype, indtype);
        idxs_.clear();
   
        for (int i = 0; i < key_->size(); i++) {
            const string &s = (*key_)[i];
            int n = rt_->findIdx(s);
            if (n < 0) {
                errors->appendMsg(true, strprintf("No such field '%s'.", s.c_str()));
                continue;
            }
            const RowType::Field &fld = rt_->fields()[n];
            if (fld.type_->getTypeId() != Type::TT_INT32) {
                errors->appendMsg(true, strprintf("The field '%s' must be an int32.", s.c_str()));
                continue;
            }
            if (fld.arsz_ != RowType::Field::AR_SCALAR) {
                errors->appendMsg(true, strprintf("The field '%s' must not be an array.", s.c_str()));
                continue;
            }
            idxs_.push_back(n);
        }
    }

The initialization translates the field names to indexes (um, that's a confusing double usage of the word "index", here it's like "array indexes") in the row type, and checks that the fields are as expected.

    virtual bool equals(const SortedIndexCondition *sc) const
    {
        // the cast is safe to do because the caller has checked the typeid
        MultiInt32SortCondition *other = (MultiInt32SortCondition *)sc;

        // names must be the same
        if (!key_->equals(other->key_))
            return false;

        // and if initialized, the indexes must be the same too
        if (!rt_.isNull()) {
            if (idxs_.size() != other->idxs_.size())
                return false;

            for (int i = 0; i < idxs_.size(); i++) {
                if (idxs_[i] != other->idxs_[i])
                    return false;
            }
        }

        return true;
    }

    virtual bool match(const SortedIndexCondition *sc) const
    {
        MultiInt32SortCondition *other = (MultiInt32SortCondition *)sc;
        if (rt_.isNull()) {
            // not initialized, check by names
            return key_->equals(other->key_);
        } else {
            // initialized, check by indexes
            if (idxs_.size() != other->idxs_.size())
                return false;

            for (int i = 0; i < idxs_.size(); i++) {
                if (idxs_[i] != other->idxs_[i])
                    return false;
            }
            return true;
        }
    }

The equality and match checks follow the fashion of HashedIndexType in 1.1: if not initialized, they rely on the field names; if initialized, they take the field indexes into consideration (for equality both the names and indexes must be equal; for matching, only the indexes need to be equal).

    virtual void printTo(string &res, const string &indent = "", const string &subindent = "  ") const
    {
        res.append("MultiInt32Sort(");
        for (NameSet::iterator i = key_->begin(); i != key_->end(); ++i) {
            res.append(*i);
            res.append(", "); // extra comma after last field doesn't hurt
        }
        res.append(")");
    }

    virtual SortedIndexCondition *copy() const
    {
        return new MultiInt32SortCondition(*this);
    }

    virtual const_Onceref<NameSet> getKey() const
    {
        return key_;
    }

The printing and copying are nothing particularly new or fancy. getKey() simply returns the key back. This feels a bit like an anti-climax, a whole big example for this little one-liner, but again, that's not the only thing that this example shows.

    virtual bool operator() (const RowHandle *rh1, const RowHandle *rh2) const
    {
        const Row *row1 = rh1->getRow();
        const Row *row2 = rh2->getRow();

        int sz = idxs_.size();
        for (int i = 0; i < sz; i++) {
            int idx = idxs_[i];
            {
                bool v1 = rt_->isFieldNull(row1, idx);
                bool v2 = rt_->isFieldNull(row2, idx);
                if (v1 > v2) // isNull at true goes first, so the direction is opposite
                    return true;
                if (v1 < v2)
                    return false;
            }
            {
                int32_t v1 = rt_->getInt32(row1, idx);
                int32_t v2 = rt_->getInt32(row2, idx);
                if (v1 < v2)
                    return true;
                if (v1 > v2)
                    return false;
            }
        }
        return false; // falls through on equality, which is not less
    }

    vector<int> idxs_;
    Autoref<NameSet> key_;
};

The "less" comparison function now loops through all the fields in the key. It can't do the shortcuts in the int32 comparison part any more, so that has been expanded to the full condition. If the whole loop falls through without returning, it means that the key fields in both rows are equal, so it returns false.

    ...
    Autoref<IndexType> it = new SortedIndexType(new MultiInt32SortCondition(
        NameSet::make()->add("b")->add("c")
    ));
    ...

The key argument is constructed very much like for the hashed index.
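
For comparison, the hashed index with the same key would be constructed like this (as in the HashedIndexType post):

Autoref<IndexType> ith = new HashedIndexType(
    NameSet::make()->add("b")->add("c")
);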


Sunday, September 9, 2012

SortedIndexType

The SortedIndexType is defined in type/SortedIndexType.h, and provides a way to define any custom sorting criteria. That is done by defining your own condition class, derived from SortedIndexCondition, and passing it to the SortedIndexType. Because of this the index type itself is simple; all the complex things are in the condition:

SortedIndexType(Onceref<SortedIndexCondition> sc);
static SortedIndexType *make(Onceref<SortedIndexCondition> sc);
SortedIndexCondition *getCondition() const;

The examples of the sort conditions can be found in type/test/t_TableType.cpp and in perl/Triceps/PerlSortCondition.*. SortedIndexCondition provides a set of virtual methods that can be re-defined in the subclass to create a custom condition. Indeed, some of them must be re-defined, since they are pure virtual.

I've also added a simple example that will be included in 1.1, but can just as well be compiled with 1.0. It's located in type/test/t_xSortedIndex.cpp. Here is the subclass:

class Int32SortCondition : public SortedIndexCondition
{
public:
    // @param idx - index of field to use for comparison (starting from 0)
    Int32SortCondition(int idx) :
        idx_(idx)
    { }

    virtual void initialize(Erref &errors, TableType *tabtype, SortedIndexType *indtype)
    {
        SortedIndexCondition::initialize(errors, tabtype, indtype);
        if (idx_ < 0)
            errors->appendMsg(true, "The index must not be negative.");
        if (rt_->fieldCount() <= idx_)
            errors->appendMsg(true, strprintf("The row type must contain at least %d fields.", idx_+1));

        if (!errors->hasError()) { // can be checked only if index is within range
            const RowType::Field &fld = rt_->fields()[idx_];
            if (fld.type_->getTypeId() != Type::TT_INT32)
                errors->appendMsg(true, strprintf("The field at index %d must be an int32.", idx_));
            if (fld.arsz_ != RowType::Field::AR_SCALAR)
                errors->appendMsg(true, strprintf("The field at index %d must not be an array.", idx_));
        }
    }

    virtual bool equals(const SortedIndexCondition *sc) const
    {
        // the cast is safe to do because the caller has checked the typeid
        Int32SortCondition *other = (Int32SortCondition *)sc;
        return idx_ == other->idx_;
    }
    virtual bool match(const SortedIndexCondition *sc) const
    {
        return equals(sc);
    }
    virtual void printTo(string &res, const string &indent = "", const string &subindent = "  ") const
    {
        res.append(strprintf("Int32Sort(%d)", idx_));
    }
    virtual SortedIndexCondition *copy() const
    {
        return new Int32SortCondition(*this);
    }

    virtual bool operator() (const RowHandle *rh1, const RowHandle *rh2) const
    {
        const Row *row1 = rh1->getRow();
        const Row *row2 = rh2->getRow();
        {
            bool v1 = rt_->isFieldNull(row1, idx_);
            bool v2 = rt_->isFieldNull(row2, idx_);
            if (v1 > v2) // isNull at true goes first, so the direction is opposite
                return true;
            if (v1 < v2)
                return false;
        }
        {
            int32_t v1 = rt_->getInt32(row1, idx_);
            int32_t v2 = rt_->getInt32(row2, idx_);
            return (v1 < v2);
        }
    }

    int idx_;
};

...

Autoref<IndexType> it = new SortedIndexType(new Int32SortCondition(1));

It's a very basic example that defines only the absolute minimum of methods. It sorts by an int32 field, whose index (starting as usual from 0) is specified in the constructor.

The method initialize() is called at the table type initialization time. The argument errors is an already allocated Errors object to return the error indications, tabtype is the table type where the initialization is happening, and indtype is the index type that owns this condition. Also the field rt_ gets magically initialized to the table's row type reference before the sort condition initialization is called. This method is expected to do all the initialization of the internal state, check for all the errors, and return these errors if found.

equals() and match() compare two conditions for equality and match. Before they get called, the caller checks that both conditions are of the same type (i.e. have the same C++ typeid), so it's safe to cast the second condition's pointer to our type. The easiest way to define match() is to make it the same as equals(). These methods may be called on both uninitialized and initialized conditions; if not initialized then the field rt_ will be NULL.

printTo() appends the printout of this index's description to a string. For the simple single-line printouts it just appends to the result string. The multi-line prints have to handle the indenting correctly, as will be shown later.

copy() creates a deep copy of this object. In particular, the SortedIndexType constructor makes a private copy of the condition, and remembers that copy, not the original.

Finally, the operator() implements the comparison for "Less": it gets two row handles, and returns true if the first one contains a row that is "less" (i.e. goes before in the sorting order) than the second one. The reason why it's done like this is that the SortedIndexCondition is really a "Less" comparator class for the STL tree that has grown a few extra methods.

This example shows how to compare a value consisting of multiple elements. Even though this sort condition sorts by only one field, it first compares separately for NULL in that field, and then for the actual value. For each element you must:

  • find the values of the element in both rows
  • compare if "<", and if so, return true
  • compare if ">", and if so, return false
  • otherwise (if they are equal), fall through to the next element
The last element doesn't have to go through the whole procedure, it can just return the result of "<". And in this case the comparison for NULL wants the NULL value to go before all the non-NULL values, so the result of true must go before false, and the comparison signs are reversed. It's really important that the second comparison, normally for ">", cannot be skipped (except for the last element). If you skip it, you will get your data into a mess and will spend a lot of time trying to figure out what is going on.
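
In skeleton form, with the hypothetical values a1/a2 and b1/b2 of two key elements already extracted from the rows, the pattern looks like this:

// element 1: not the last one, so both comparisons are needed
if (a1 < a2)
    return true;
if (a1 > a2)
    return false;
// the elements are equal, fall through to element 2, the last one
return (b1 < b2);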

Another important point is that none of the methods, especially operator(), may throw any exceptions. Print an error message and either call abort() or return false. This might be handled better in the future versions.

That's it for the basics, the minimal subset of the methods that has to be defined.

Saturday, September 8, 2012

HashedIndexType

The HashedIndexType is defined in type/HashedIndexType.h. It also allows you to specify its only argument, the selection of the key fields, in the constructor, or to set it later with a chainable call:

HashedIndexType(NameSet *key = NULL);
static HashedIndexType *make(NameSet *key = NULL);
HashedIndexType *setKey(NameSet *key);

One way or the other, the key has to be set, or a missing key will be detected as an error at the initialization time. Obviously, all the conditions described in the Perl API apply: the key fields must be present in the table's row type, and so on.
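
For example, these two ways produce an equivalent index type:

Autoref<HashedIndexType> h1 = new HashedIndexType(
    NameSet::make()->add("b")->add("c"));
Autoref<HashedIndexType> h2 = HashedIndexType::make()
    ->setKey(NameSet::make()->add("b")->add("c"));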

The key can be read back using the parent class method IndexType::getKey(). The value returned there is a const_Onceref<NameSet>, telling you that the returned NameSet must not be changed afterward.

The checks for equals() and match() of HashedIndexType are the same in 1.0: the list of key fields must be the same (not necessarily the exact same NameSet object, but with the same contents).

However this has a tricky effect: if two table types have matching row types with different field names, and the same Hashed indexes differing only in the key field names (following the difference in the row types), these table types will be considered non-matching because their hashed indexes are non-matching.

So for version 1.1 I've updated the semantics. If the index types have been initialized, they can find their table types, and from there the row types, and then check whether the keys refer to the matching fields, even if the field names differ. If the index types are not initialized, everything works the old way. This may lead to slightly surprising effects, when two indexes match inside their initialized table types but their uninitialized copies don't. However, the comparison of uninitialized index types is probably not that useful anyway.
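
Here is a sketch of the effect (rt1 and rt2 are assumed to be matching row types with the fields named ("a", "b") and ("x", "y") respectively):

Autoref<TableType> tt1 = (new TableType(rt1)
    )->addSubIndex("primary", new HashedIndexType(NameSet::make()->add("a")));
Autoref<TableType> tt2 = (new TableType(rt2)
    )->addSubIndex("primary", new HashedIndexType(NameSet::make()->add("x")));
tt1->initialize();
tt2->initialize();
bool m = tt1->match(tt2); // true in 1.1: "a" and "x" are the same field by position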

And of course this semantics change in 1.1 propagates to Perl as well.

FifoIndexType

The FifoIndexType works the same way as in Perl, except that it provides two ways to set the configuration values: either as the constructor arguments or as chainable methods:

FifoIndexType(size_t limit = 0, bool jumping = false, bool reverse = false);
static FifoIndexType *make(size_t limit = 0, bool jumping = false, bool reverse = false);

FifoIndexType *setLimit(size_t limit);
FifoIndexType *setJumping(bool jumping);
FifoIndexType *setReverse(bool reverse);

So the following are equivalent:

Autoref<IndexType> it1 = new FifoIndexType(100, true, true);
Autoref<IndexType> it2 = FifoIndexType::make()
    ->setLimit(100)
    ->setJumping(true)
    ->setReverse(true);

As usual, the settings can be changed only until the initialization. The settings can be read back at any time with

size_t getLimit() const;
bool isJumping() const;
bool isReverse() const;

Note that the limit is unsigned, and setting it to negative values results in it being set to very large positive values. The limit of 0 means "unlimited".
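
For example, here is a sketch of the gotcha:

Autoref<FifoIndexType> fifo = FifoIndexType::make();
fifo->setLimit(-1); // silently converts to a huge positive value
size_t lim = fifo->getLimit(); // 18446744073709551615 on a 64-bit machine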

All the common methods inherited from IndexType and Type work as usual.

The equals() and match() are equivalent for the FifoIndexType, and are true when all the parameters are set to the same values.

Types, aborts and exceptions

And by the way, in version 1.0 the attempts to modify a TableType or IndexType after initialization caused an error message and abort(). For 1.1 I've changed them to throw an Exception instead. There is no more direct abort() anywhere in the code.

Though of course unless you set the Triceps Exception to be interceptable, it will amount to the same error message and abort().
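
A sketch of intercepting it (the static flag Exception::abort_ is how I remember the API, treat it as an assumption; tt is some already initialized TableType):

Exception::abort_ = false; // assumed flag: intercept instead of aborting
try {
    tt->addSubIndex("more", new FifoIndexType()); // modifying an initialized type throws
} catch (Exception &e) {
    printf("caught: %s\n", e.what());
}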

NameSet

NameSet is fundamentally a reference-counted vector of strings that can be constructed from a sequence of chained calls. It's used to build such things as the field lists for the index keys. Previously I've said that the names in the set must be different, and that should normally be the case, but NameSet itself doesn't check for that. The order of values in it usually matters. So its name is slightly misleading: it's not really a set but a vector, the name has just stuck historically. And in the future it might grow the set functionality too, by adding a quick look-up of an element's index by name.

It's defined in type/NameSet.h as

class NameSet : public Starget, public vector<string> { ... }

All the vector methods are also directly accessible.
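
For example, a NameSet can be read like any vector of strings:

Autoref<NameSet> ns = NameSet::make()->add("b")->add("c");
size_t n = ns->size();           // 2, straight from vector<string>
const string &first = ns->at(0); // "b"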

NameSet();
NameSet(const NameSet &other);
static NameSet *make();
static NameSet *make(const NameSet &other);

The constructors are also duplicated as make(), for more convenient operator precedence than with new().

NameSet *add(const string &s);

The method for the chained construction, such as:

Autoref<NameSet> ns1 = (new NameSet())->add("b")->add("c");
Autoref<NameSet> ns2 = NameSet::make()->add("b")->add("c");

It's possible to combine the copy constructor and the addition of extra fields:

Autoref<NameSet> ns3 = NameSet::make(*ns2)->add("x");

One more feature of the NameSet is the comparison for equality:

bool equals(const NameSet *other) const;

As I've been writing this, it has struck me that the constructors are not particularly convenient, so for the version 1.1.0 I've changed the copy constructors:

NameSet(const NameSet *other);
NameSet(const vector<string> &other);
static NameSet *make(const NameSet *other);
static NameSet *make(const vector<string> &other);

Now you can construct from a plain vector too, and since NameSet is its subclass, that replaces the old copy constructor. The constructor from a pointer makes the use of Autorefs more convenient, now you can do:

Autoref<NameSet> ns3 = NameSet::make(ns2)->add("x");


without dereferencing ns2.

IndexType

Very much like in the Perl API, the IndexType is an abstract class: you can't create its objects directly, you create them through its concrete subclasses. It holds the methods common to all the index types, and is defined in type/IndexType.h.

The index type id is defined here, as enum IndexType::IndexId. The supported index types are still IT_HASHED, IT_FIFO, IT_SORTED. There is also the semi-hidden type IT_ROOT: you can't create it directly but every table type creates one implicitly, as the root of its index tree. There is also IT_LAST defined past the last actual type, so if you ever need to iterate through the types, you can do it as

for (int i = 0; i < IndexType::IT_LAST; i++) { ... }

The conversion between the index type id and name can be done with the methods:

static const char *indexIdString(int enval, const char *def = "???");
static int stringIndexId(const char *str);

As usual with the constant-and-name conversions, if the numeric id is invalid, the string def is returned, by default "???". If the string name is unknown, -1 is returned.
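
For example (the exact spelling of the returned names here is my assumption, in the style of the other constant-and-name conversions):

const char *n = IndexType::indexIdString(IndexType::IT_FIFO); // "IT_FIFO"
int id = IndexType::stringIndexId("IT_HASHED"); // IndexType::IT_HASHED
int bad = IndexType::stringIndexId("whatever"); // -1, the name is unknown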

IndexId getIndexId() const;

Returns the id of an index type object. It can't be changed, it gets hardcoded in the subclass constructor.

IndexType *addSubIndex(const string &name, Onceref<IndexType> index);

Adds a sub-index, in exactly the same way as adding an index type to the TableType. It also adds a copy of the argument, not the argument itself. It's also designed for chaining, like:

Autoref<TableType> tt = (new TableType(rt1)
    )->addSubIndex("primary", new HashedIndexType(
        (new NameSet())->add("b")->add("c"))
    )->addSubIndex("limit", (new FifoIndexType())
        ->setLimit(2) // will policy-delete 2 rows
    ); 

The key can be read back with:

const_Onceref<NameSet> getKey() const;

It will work with any kind of index, but will return a NULL if the index doesn't support the key. The NameSet is an ordered list of unique names, and will be described in detail soon.
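
So a safe reading of the key would look like this:

const_Onceref<NameSet> key = it->getKey();
if (key.isNull()) {
    // this index type (for example, a FIFO index) has no key
}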

IndexType *setAggregator(Onceref<AggregatorType> agg);
const AggregatorType *getAggregator() const;

Sets or gets an aggregator type for this index type. As usual, any setting can be done only until the index type is initialized.

IndexType *copy() const;

Creates an un-initialized deep copy of this index (with all the sub-index and aggregator types also copied). This method is used by addSubIndex(). By the way, the usual copy constructor could theoretically be used on the index types but usually doesn't make a whole lot of sense, because the sub-types and such would end up shared by reference.

bool isLeaf() const;

Returns true if this index type has no sub-indexes. Of course, if this type is not initialized yet, more sub-types can be added to it to make it non-leaf later.

IndexType *findSubIndex(const string &name) const;
IndexType *findSubIndexById(IndexId it) const;

Find the sub-index by name or id, works in the same way as for TableType. A special feature is that it can be applied on a NULL object reference, like this:

Autoref<IndexType> it; // NULL by default
Autoref<IndexType> itsub = it->findSubIndex("xxx"); // doesn't crash, returns NULL

The idea here is to allow the safe chaining of findSubIndex() for the look-ups of the nested types:

Autoref<IndexType> it = tt->findSubIndex("level1")->findSubIndex("level2");
if (it.isNull()) {
    // not found
}

If any of the elements in the path are missing, the end result will be NULL, conveniently. But it won't tell you which one was missing, inconveniently.

const IndexTypeVec &getSubIndexes() const;

Returns back the whole set of sub-indexes.

IndexType *getFirstLeaf() const;

Returns the first leaf index type (if a leaf itself, will return itself).

bool isInitialized() const;

Checks whether the index type has been initialized.

TableType *getTabtype() const;

Returns the table type to which this index type is tied. The tying-together happens at the initialization time, so for an uninitialized index type this method will return NULL.

There are a great many more methods on the IndexType that are used to maintain the index trees, but you don't need to look at them unless you are interested in the inner workings of the Triceps tables.