Basic interface to riak_pipe.
Clients of riak_pipe are most likely to be interested in exec/2, wait_first_fitting/1, receive_result/1, and collect_results/1.
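    %% Requires the record definitions from riak_pipe.hrl, e.g.
    %% -include_lib("riak_pipe/include/riak_pipe.hrl").

    %% define the pipeline: a single fitting that passes inputs through
    PipelineSpec = [#fitting_spec{name="passer", module=riak_pipe_w_pass}],
    %% start things up (no options: the calling process becomes the sink)
    {ok, Pipe} = riak_pipe:exec(PipelineSpec, []),
    %% send in some work
    ok = riak_pipe:queue_work(Pipe, "work item 1"),
    ok = riak_pipe:queue_work(Pipe, "work item 2"),
    ok = riak_pipe:queue_work(Pipe, "work item 3"),
    riak_pipe:eoi(Pipe),
    %% wait for results (alternatively use receive_result/1 repeatedly)
    {eoi, Results, _Logs} = riak_pipe:collect_results(Pipe).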
Many examples are included in the source code, and exported as functions named example*.
result/3, eoi/1, and log/3 are used by workers and fittings to deliver messages to the sink.
exec_option() = {sink, fitting()} | {trace, all | list() | set() | ordsets:ordset()} | {log, sink | sasl}
exec_opts() = [exec_option()]
fitting() = #fitting{pid = undefined | pid(), ref = undefined | reference(), chashfun = undefined | riak_pipe_vnode:chashfun(), nval = undefined | riak_pipe_vnode:nval()}
fitting_spec() = #fitting_spec{name = undefined | term(), module = undefined | atom(), arg = undefined | term(), chashfun = riak_pipe_vnode:chashfun(), nval = riak_pipe_vnode:nval(), q_limit = pos_integer()}
pipe() = #pipe{builder = undefined | pid(), fittings = undefined | [{Name::term(), #fitting{pid = undefined | pid(), ref = undefined | reference(), chashfun = undefined | riak_pipe_vnode:chashfun(), nval = undefined | riak_pipe_vnode:nval()}}], sink = undefined | #fitting{pid = undefined | pid(), ref = undefined | reference(), chashfun = undefined | riak_pipe_vnode:chashfun(), nval = undefined | riak_pipe_vnode:nval()}}
stat() = {atom(), term()}
active_pipelines/1 | Get all active pipelines hosted on Node. |
collect_results/1 | Receive all results and log messages, up to end-of-inputs (unless receive_result times out before the eoi arrives). |
collect_results/2 | |
destroy/1 | Brutally kill a pipeline. |
eoi/1 | Send an end-of-inputs message to the head of the pipe. |
example/0 | An example run of a simple pipe. |
example_receive/1 | An example of receiving data from a pipeline. |
example_reduce/0 | Another example pipeline use. |
example_send/1 | An example of sending data into a pipeline. |
example_start/0 | An example of starting a simple pipe. |
example_tick/3 | |
example_tick/4 | |
example_transform/0 | Another example pipeline use. |
exec/2 | Set up a pipeline. |
generic_transform/4 | |
queue_work/2 | Equivalent to queue_work(Pipe, Input, infinity). |
queue_work/3 | Send inputs to the head of the pipe. |
receive_result/1 | Pull the next pipeline result out of the sink's mailbox. |
receive_result/2 | |
status/1 | Retrieve details about the status of the workers in this pipeline. |
active_pipelines(Node::node() | global) -> [pipe()] | error | [{node(), [pipe()] | error}]
Get all active pipelines hosted on Node. Pass the atom global instead of a node name to get all pipelines hosted on all nodes.
The return value is a list of #pipe{} records. When global is used, the return value is a list of {Node, [#pipe{}]} tuples.
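As an illustration of the global return shape described above, the following sketch summarizes a list of {Node, [#pipe{}]} tuples, also tolerating the error values that the spec allows. The module and function names (pipe_census, count_by_node/1) are hypothetical, not part of riak_pipe, and the code never looks inside the #pipe{} records, so it runs without riak_pipe loaded.

```erlang
-module(pipe_census).
-export([count_by_node/1]).

%% Summarise the value returned by riak_pipe:active_pipelines(global).
%% Input: error, or a list of {Node, Pipes} where Pipes is a list of
%% #pipe{} records or the atom error.
%% Output: error, or {Counts, FailedNodes}, where Counts is [{Node, NumPipes}]
%% for nodes that answered and FailedNodes lists nodes that returned error.
count_by_node(error) ->
    error;
count_by_node(PerNode) when is_list(PerNode) ->
    Counts = [{Node, length(Pipes)} || {Node, Pipes} <- PerNode, is_list(Pipes)],
    Failed = [Node || {Node, error} <- PerNode],
    {Counts, Failed}.
```

On a node running riak_pipe, one might call pipe_census:count_by_node(riak_pipe:active_pipelines(global)).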
collect_results(Pipe::pipe()) -> {eoi | timeout, Results::[{From::term(), Result::term()}], Logs::[{From::term(), Message::term()}]}
Receive all results and log messages, up to end-of-inputs (unless receive_result times out before the eoi arrives).
If end-of-inputs was the last message received, the first element of the returned tuple will be the atom eoi. If the receive timed out before receiving end-of-inputs, the first element of the returned tuple will be the atom timeout.
The second element will be a list of all result messages received, while the third element will be a list of all log messages received.
This function assumes that it is called in the sink's process. Passing the #fitting{} structure is only needed for reference to weed out misdirected messages from forgotten pipelines. A static inter-message timeout of five seconds is hard-coded (TODO).
collect_results(Pipe::pipe(), Timeout::integer() | infinity) -> {eoi | timeout, Results::[{From::term(), Result::term()}], Logs::[{From::term(), Message::term()}]}
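The collect_results contract above (accumulate result and log messages until an eoi or a timeout, then report which one ended the loop) can be sketched in plain Erlang on top of any zero-arity function that returns the same values as receive_result/1. This is a hypothetical illustration (collect_sketch is not part of riak_pipe), not the library's implementation; in real use the function passed in would be something like fun() -> riak_pipe:receive_result(Pipe) end.

```erlang
-module(collect_sketch).
-export([collect/1]).

%% Next() must return one of the values receive_result/1 can return:
%%   {result, {From, Result}} | {log, {From, Message}} | eoi | timeout
%% Returns {eoi | timeout, Results, Logs}, with both lists in arrival order.
collect(Next) ->
    collect(Next, [], []).

collect(Next, Results, Logs) ->
    case Next() of
        {result, R} -> collect(Next, [R | Results], Logs);
        {log, L}    -> collect(Next, Results, [L | Logs]);
        eoi         -> {eoi, lists:reverse(Results), lists:reverse(Logs)};
        timeout     -> {timeout, lists:reverse(Results), lists:reverse(Logs)}
    end.
```

Accumulating in reverse and reversing once at the end keeps each step constant-time while still returning messages in the order they were received.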
destroy(Pipe::pipe()) -> ok
Brutally kill a pipeline. Use this when it is necessary to stop all parts of a pipeline as quickly as possible, instead of waiting for an eoi to propagate through.
eoi(Pipe::pipe()) -> ok
Send an end-of-inputs message to the head of the pipe.
example() -> {eoi | timeout, list(), list()}
An example run of a simple pipe. Uses example_start/0, example_send/1, and example_receive/1 to send nonsense through a pipe.
If everything behaves correctly, this function should return {eoi, [{empty_pass, "hello"}], _LogMessages}.
example_receive(Pipe::pipe()) -> {eoi | timeout, list(), list()}
An example of receiving data from a pipeline. Reads all results sent to the given sink.
example_reduce() -> any()
Another example pipeline use. This one sets up a simple "reduce" fitting, which expects tuples of the form {Key::term(), Value::number()}, and produces results of the same form, where the output value is the sum of all of the input values for a given key.
If everything behaves correctly, this function should return {eoi, [{"sum reduce", {a, [55]}}, {"sum reduce", {b, [155]}}], []}.
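To make the arithmetic of the "sum reduce" example concrete, the sketch below performs the same per-key summation in a single process with orddict. It is not riak_pipe code: sum_reduce_sketch is a hypothetical name, it returns bare sums rather than the list-wrapped values ({a, [55]}) shown above, and the sample inputs in the usage note are chosen only so that the totals match that expected output.

```erlang
-module(sum_reduce_sketch).
-export([sum_by_key/1]).

%% Single-process sketch of the "sum reduce" computation: fold over
%% {Key, Number} inputs and keep a running total per key in an orddict.
%% Returns [{Key, Sum}] sorted by key.
sum_by_key(Inputs) ->
    Totals = lists:foldl(
               fun({Key, Value}, Acc) -> orddict:update_counter(Key, Value, Acc) end,
               orddict:new(),
               Inputs),
    orddict:to_list(Totals).
```

For example, sum_by_key([{a, N} || N <- lists:seq(1, 10)] ++ [{b, N} || N <- lists:seq(11, 20)]) returns [{a, 55}, {b, 155}]. In riak_pipe the same work is split across vnodes by key, which is why the fitting's chashfun matters for reduce-style fittings.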
example_send(Pipe::pipe()) -> ok
An example of sending data into a pipeline. Queues the string "hello" for the fitting provided, then signals end-of-inputs to that fitting.
example_start() -> {ok, Pipe::pipe()}
An example of starting a simple pipe. Starts a pipe with one "pass" fitting. Sink is pointed at the current process. Logging is pointed at the sink. All tracing is enabled.
example_tick(TickLen, NumTicks, ChainLen) -> any()
example_tick(TickLen, BatchSize, NumTicks, ChainLen) -> any()
example_transform() -> {eoi | timeout, list(), list()}
Another example pipeline use. This one sets up a simple "transform" fitting, which expects lists of numbers as input, and produces the sum of that list as output.
If everything behaves correctly, this function should return {eoi, [{"sum transform", 55}], []}.
exec(Spec::[fitting_spec()], Options::exec_opts()) -> {ok, Pipe::pipe()}
Set up a pipeline. This function starts up fitting and monitoring processes according to the given fitting specs, returning a handle to the pipeline. Inputs may then be sent to vnodes, tagged with the pipeline's head fitting.
The pipeline is specified as an ordered list of #fitting_spec{} records. Each record has the fields:
name
    Any term. It identifies the fitting, and appears as the From element of the result and log messages the fitting generates.
module
    Atom. The name of the module implementing the fitting. This module must implement the riak_pipe_vnode_worker behavior.
arg
    Any term, passed along to the fitting module as its argument.
chashfun
    A function of arity 1. The consistent-hashing function used to determine which vnode should receive an input. This function will be evaluated as Fun(Input). The result of that evaluation should be a binary, 160 bits in length, which will be used to choose the working vnode from a riak_core_ring. (Very similar to the chash_keyfun bucket property used in riak_kv.)
    The default is fun chash:key_of/1, which will distribute inputs according to the SHA-1 hash of the input.
nval
    Either a positive integer, or a function of arity 1 that returns a positive integer. This field determines the maximum number of vnodes that might be asked to handle the input. If a worker is unable to process an input on a given vnode, it can ask to have the input sent to a different vnode. Up to nval vnodes will be tried in this manner.
    If nval is an integer, that static number is used for all inputs. If nval is a function, the function is evaluated as Fun(Input) (much like chashfun), and is expected to return a positive integer.
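The chashfun and nval rules above can be illustrated with two small functions. key_of/1 follows the documented default behavior (a 160-bit SHA-1 digest of the input); serializing the input with term_to_binary/1 is an assumption of this sketch. nval_for/2 applies the integer-or-fun rule for nval. The module fitting_hash_sketch is hypothetical and independent of riak_core; it only needs OTP's crypto application.

```erlang
-module(fitting_hash_sketch).
-export([key_of/1, nval_for/2]).

%% chashfun-style function: map any input term to a 160-bit binary.
%% Uses OTP's crypto module (SHA-1 yields a 20-byte digest).
key_of(Input) ->
    crypto:hash(sha, term_to_binary(Input)).

%% Resolve the nval for a single input.
%% A positive integer is used for every input; an arity-1 fun is called as
%% Fun(Input) and must return a positive integer (otherwise this crashes
%% with a case_clause error).
nval_for(N, _Input) when is_integer(N), N > 0 ->
    N;
nval_for(Fun, Input) when is_function(Fun, 1) ->
    case Fun(Input) of
        N when is_integer(N), N > 0 -> N
    end.
```

Because key_of/1 is deterministic, equal inputs always hash to the same key and therefore land on the same preferred vnode, which is what lets per-key fittings such as a reducer see all values for a key.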
Recognized elements of the Options list are:
{sink, Sink}
    If no sink option is provided, one will be created, such that the calling process will receive all messages sent to the sink (all output, logging, and trace messages). If specified, Sink should be a #fitting{} record, filled with the pid of the process prepared to receive these messages.
{trace, TraceMatches}
    If no trace option is provided, tracing will be disabled for this pipeline. If specified, TraceMatches should be either the atom all, in which case all trace messages will be delivered, or a list of trace tags to match, in which case only messages with matching tags will be delivered.
{log, LogTarget}
    If no log option is provided, logging will be disabled for this pipeline. If specified, LogTarget should be one of the following atoms:
    sink
        Log messages are delivered to the sink.
    sasl
        Log messages are sent to error_logger, and thus to the SASL log.
    lager
        Log messages are sent to lager.
Other elements may also be included in Options. The value of Options is provided to all fitting modules during initialization, so it can be a good vector for global configuration of general fittings.
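The {trace, TraceMatches} rule above amounts to a simple delivery predicate. The sketch below is a hypothetical helper, not riak_pipe's own filter: the module name, the use of the atom undefined to stand for "no trace option given", and the tag atoms in the usage note are all assumptions. It accepts the forms listed in the exec_option() type: all, a plain list or ordset, or a sets set.

```erlang
-module(trace_filter_sketch).
-export([wants/2]).

%% Decide whether a trace message tagged Tag should be delivered under a
%% given TraceMatches value. The atom undefined stands in for
%% "no trace option given".
wants(undefined, _Tag) ->
    false;                                   % tracing disabled
wants(all, _Tag) ->
    true;                                    % every trace message
wants(Matches, Tag) when is_list(Matches) ->
    lists:member(Tag, Matches);              % plain list or ordset
wants(Matches, Tag) ->
    %% anything else must be a sets set; other values match nothing
    sets:is_set(Matches) andalso sets:is_element(Tag, Matches).
```

For instance, wants([eoi, queue], queue) is true, while wants(undefined, queue) is false.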
generic_transform(MsgFun, DriverFun, ExecOpts, NumFittings) -> any()
queue_work(Pipe, Input) -> any()
Equivalent to queue_work(Pipe, Input, infinity).
queue_work(Pipe::pipe(), Input::term(), Timeout::riak_pipe_vnode:qtimeout()) -> ok | {error, riak_pipe_vnode:qerror()}
Send inputs to the head of the pipe.
Note that the only Timeout values currently supported are infinity and noblock, not generic durations.
receive_result(Pipe::pipe()) -> {result, {From::term(), Result::term()}} | {log, {From::term(), Message::term()}} | eoi | timeout
Pull the next pipeline result out of the sink's mailbox.
The From element of the result and log messages will be the name of the fitting that generated them, as specified in the #fitting_spec{} record used to start the pipeline.
This function assumes that it is called in the sink's process. Passing the #fitting{} structure is only needed for reference to weed out misdirected messages from forgotten pipelines. A static timeout of five seconds is hard-coded (TODO).
receive_result(Pipe::pipe(), Timeout::integer() | infinity) -> {result, {From::term(), Result::term()}} | {log, {From::term(), Message::term()}} | eoi | timeout
status(Pipe::pipe())
Retrieve details about the status of the workers in this pipeline. The form of the return is a list with one entry per fitting in the pipe. Each fitting's entry is a 2-tuple of the form {FittingName, WorkerDetails}, where FittingName is the name that was given to the fitting in the call to riak_pipe:exec/2, and WorkerDetails is a list with one entry per worker. Each worker entry is a proplist, of the form returned by riak_pipe_vnode:status/1, with two properties added: node, the node on which the worker is running, and partition, the index of the vnode that the worker belongs to.
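Given the return shape described above, a common first step is to see how a pipeline's workers are spread across the cluster. The sketch below counts workers per {FittingName, Node}; it relies only on the documented node property and treats every other property as opaque. The module and function names are hypothetical, and the sample data in the usage note is illustrative, not real status output.

```erlang
-module(status_summary_sketch).
-export([workers_per_node/1]).

%% Count workers per {FittingName, Node} in a value shaped like the
%% documented output of riak_pipe:status/1:
%%   [{FittingName, [WorkerProplist]}]
%% Each WorkerProplist is expected to carry a {node, Node} property; a worker
%% without one is counted under the node 'undefined'.
workers_per_node(Status) ->
    Keys = [{Name, proplists:get_value(node, Worker)}
            || {Name, Workers} <- Status, Worker <- Workers],
    Counts = lists:foldl(fun(Key, Acc) -> orddict:update_counter(Key, 1, Acc) end,
                         orddict:new(), Keys),
    orddict:to_list(Counts).
```

For a single fitting "passer" with two workers on n1 and one on n2, the result is [{{"passer", n1}, 2}, {{"passer", n2}, 1}].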
Generated by EDoc, Oct 20 2012, 18:22:55.