STREAM

INTRO

The STREAM module provides chain accumulation, traversal and persistence.

  • writer/1 — creates writer cursor to db.
  • reader/1 — creates reader cursor to db.
  • save/1 — stores cursor to db.
  • load_reader/1 — loads reader cursor.
  • load_writer/1 — loads writer cursor.
  • top/1 — returns top of the chain.
  • bot/1 — returns bottom of the chain.
  • up/1 — up from bottom (default).
  • down/1 — down from top.
  • next/1 — moves reader next.
  • prev/1 — moves reader prev.
  • take/1 — takes N elements from reader.
  • drop/1 — skips N elements from reader.
  • add/1 — adds element to list.

You can grab kvs_stream and use it in your applications without importing synrc/kvx dependency, as this module is self-containing. The possible applications are: public and private feeds, FIFO queues, unread messages, chat applications, blockchain, etc.

SESSION

> kvs:join(). [[{table,id_seq,false,set,[thing,id],[thing],disc_copies}], [{table,writer,false,set,[id,count,cache,args,first],[],disc_copies}, {table,emails,false,set,[id,email,next,prev],[],disc_copies}, {table,reader,false,set,[id,pos,cache,args,feed,dir],[],disc_copies}]] > kvs:check(). ok > kvs:all(reader). [{reader,1,0,[],[],{emails,1},0}, {reader,2,0,[],[],{emails,1},0}] > rr(kvs). [emails,id_seq,iter,kvs,reader,schema,table,writer] > kvs:save(kvs:reader({emails,1})). #reader{id = 3,pos = 0, cache = {emails,1}, args = [], feed = {emails,1}, dir = 0} > kvs:take(kvs:bot((kvs:load_reader(3))#reader{args=-1})). #reader{id = 3,pos = 5, cache = {emails,5}, args = [#emails{id = 5,email = [],next = 4,prev = []}, #emails{id = 4,email = 5,next = 3,prev = []}, #emails{id = 3,email = 4,next = 2,prev = []}, #emails{id = 2,email = 3,next = 1,prev = []}, #emails{id = 1,email = 2,next = [],prev = []}], feed = {emails,1}, dir = 0}

WRITER

Writer cursor represents append list chain with some cached values. E.g., chain size, first element of the chain, cached value of previous written message and field for passing arguments for stream functions, like add.

-record(writer, { id = [] :: term(), count = 0 :: integer(), cache = [] :: [] | tuple(), args = [] :: term(), first = [] :: [] | tuple() } ).

For adding data to database you need first create writer cursor, set the args field with record from metainfo and call save function.

writer(term()) -> #writer{}.

Creates writer cursor.

add(#writer{}) -> #writer{}.

Adds element to list declared by writer cursor.

load_writer(#writer{}) -> #writer{}.

Loads writer cursor.

save(#writer{}) -> #writer{}.

Flushes writer cursor to database.

READER

Reader Cursor
-record(reader, { id = [] :: integer(), pos = 0 :: [] | integer(), cache = [] :: [] | integer(), args = [] :: term(), feed = [] :: term(), dir = 0 :: 0 | 1 } ).

reader(integer()) -> #reader{}.

Creates reader cursor.

load_reader(#reader{}) -> #reader{}.

Loads reader cursor from database.

save(#reader{}) -> #reader{}.

Flushes cursor to database.

top(#reader{}) -> #reader{}.

Moves cursor to top of the list.

bot(#reader{}) -> #reader{}.

Moves cursor to bottom of the list.

ITER

KVS Stream Iterator
-record(iter, { id = [] :: [] | integer(), next = [] :: [] | integer(), prev = [] :: [] | integer() } ).

next(#reader{}) -> #reader{}.

Moves cursor to next. Consume data down from top. Reutrn error if lists is empty, otherwise next element or last.

prev(#reader{}) -> #reader{}.

Moves cursor to prev. Consume data up from bottom. Reutrn error if lists is empty, otherwise next element or last.

drop(#reader{}) -> #reader{}.

Drops N elements starting from reader.

take(#reader{}) -> #reader{}.

Trying to consume N records from stream using its current value and direction. Returns consumed data. Usually you seek to some position and then consume some data.

This module may refer to: kvs.