eventkit
Release 1.0.2.
Event
- class eventkit.event.Event(name='', _with_error_done_events=True)[source]
Enable event passing between loosely coupled components. The event emits values to connected listeners and has a selection of operators to create general data flow pipelines.
- Parameters:
name (str) – Name to use for this event.
- __await__()[source]
Asynchronously await the next emit of an event:
    async def coro():
        args = await event
        ...
If the event does an empty emit(), then the value of args is set to util.NO_VALUE.
wait() and __await__() are each other’s inverse.
- async __aiter__(skip_to_last=False, tuples=False)
Synonym for aiter() with default arguments:
    async def coro():
        async for args in event:
            ...
aiterate() and __aiter__() are each other’s inverse.
- error_event: Optional[Event]
Sub event that emits errors from this event as emit(source, exception).
- done()[source]
True if event has ended with no more emits coming, False otherwise.
- Return type:
bool
- connect(listener, error=None, done=None, keep_ref=False)[source]
Connect a listener to this event. If the listener is added multiple times then it is invoked just as many times on emit.
The += operator can be used as a synonym for this method:
    import eventkit as ev

    def f(a, b):
        print(a * b)

    def g(a, b):
        print(a / b)

    event = ev.Event()
    event += f
    event += g
    event.emit(10, 5)
- Parameters:
listener – The callback to invoke on emit of this event. It gets the *args from an emit as arguments. If the listener is a coroutine function, or a function that returns an awaitable, the awaitable is run in the asyncio event loop.
error – The callback to invoke on error of this event. It gets (this event, exception) as two arguments.
done – The callback to invoke on ending of this event. It gets this event as single argument.
keep_ref (bool) –
  - True: A strong reference to the callable is kept.
  - False: If the callable allows weak refs and it is garbage collected, then it is automatically disconnected from this event.
- Return type:
- disconnect(listener, error=None, done=None)[source]
Disconnect a listener from this event.
The -= operator can be used as a synonym for this method.
- Parameters:
listener – The callback to disconnect. The callback is removed at most once. Disconnecting a callback that is not connected is valid and has no effect.
error – The error callback to disconnect.
done – The done callback to disconnect.
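The connect and disconnect rules above (a listener connected several times is invoked that many times; a disconnect removes at most one occurrence and tolerates unknown callbacks) can be illustrated with a plain-Python sketch. MiniEvent is a hypothetical stand-in written for this illustration only; it is not eventkit’s implementation and ignores error/done callbacks, weak references and coroutines.

```python
class MiniEvent:
    """Hypothetical stand-in for an event; not eventkit's implementation."""

    def __init__(self):
        self._listeners = []

    def connect(self, listener):
        # Every connect appends, so duplicates are kept.
        self._listeners.append(listener)
        return self

    def disconnect(self, listener):
        # Remove at most one occurrence; ignore listeners that are not connected.
        if listener in self._listeners:
            self._listeners.remove(listener)
        return self

    def emit(self, *args):
        # Iterate over a copy so listeners may disconnect during an emit.
        for listener in list(self._listeners):
            listener(*args)


calls = []


def record(a, b):
    calls.append(a * b)


event = MiniEvent()
event.connect(record).connect(record)  # connected twice
event.emit(10, 5)                      # record runs twice
event.disconnect(record)               # removes one of the two connections
event.disconnect(print)                # never connected: no error
event.emit(2, 3)                       # record runs once
```

After this runs, calls holds two results from the first emit and one from the second.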
- disconnect_obj(obj)[source]
Disconnect all listeners on the given object (including the error and done listeners).
- Parameters:
obj – The target object that is to be completely removed from this event.
- emit(*args)[source]
Emit a new value to all connected listeners.
- Parameters:
args – Argument values to emit to listeners.
- emit_threadsafe(*args)[source]
Threadsafe version of emit() that doesn’t invoke the listeners directly but via the event loop of the main thread.
- run()[source]
Start the asyncio event loop, run this event to completion and return all values as a list:
    import eventkit as ev

    ev.Timer(0.25, count=10).run()
    ->
    [0.25, 0.5, 0.75, 1.0, 1.25, 1.5, 1.75, 2.0, 2.25, 2.5]
- Return type:
List
Note
When running inside a Jupyter notebook this will give an error that the asyncio event loop is already running. This can be remedied by applying nest_asyncio or by using the top-level await statement of Jupyter:
    await event.list()
- pipe(*targets)[source]
Form several events into a pipe:
    import eventkit as ev

    e1 = ev.Sequence('abcde')
    e2 = ev.Enumerate().map(lambda i, c: (i, i + ord(c)))
    e3 = ev.Star().pluck(1).map(chr)

    e1.pipe(e2, e3)  # or: ev.Event.Pipe(e1, e2, e3)
    ->
    ['a', 'c', 'e', 'g', 'i']
- Parameters:
targets (Event) – One or more Events that have no source yet, or Event constructors that need no arguments.
- fork(*targets)[source]
Fork this event into one or more target events. Square brackets can be used as a synonym:
    import eventkit as ev

    ev.Range(2, 5)[ev.Min, ev.Max, ev.Sum].zip()
    ->
    [(2, 2, 2), (2, 3, 5), (2, 4, 9)]
The events in the fork can be combined by one of the join methods of Fork.
- Parameters:
targets (Event) – One or more events that have no source yet, or Event constructors that need no arguments.
- Return type:
Fork
- async aiter(skip_to_last=False, tuples=False)[source]
Create an asynchronous iterator that yields the emitted values from this event:
    async def coro():
        async for args in event.aiter():
            ...
__aiter__() is a synonym for aiter() with default arguments.
- Parameters:
skip_to_last (bool) –
  - True: Backlogged source values are skipped over to yield only the latest value. Can be used as a slipper clutch between a source that produces too fast and the handling that can’t keep up.
  - False: All events are yielded.
tuples (bool) –
  - True: Always yield arguments as a tuple.
  - False: Unpack single argument tuples.
- static init(obj, event_names)[source]
Convenience function for initializing multiple events as members of the given object.
- Parameters:
event_names (Iterable) – Names to use for the created events.
- static create(obj)[source]
Create an event from an async iterator, awaitable, or event constructor without arguments.
- Parameters:
obj – The source object. If it’s already an event then it is passed as-is.
- static wait(future)[source]
Create a new event that emits the value of the awaitable when it becomes available and then sets this event done.
wait() and __await__() are each other’s inverse.
- Parameters:
future (Awaitable) – Future to wait on.
- Return type:
Wait
- static aiterate(ait)[source]
Create a new event that emits the yielded values from the asynchronous iterator.
The asynchronous iterator serves as a source for both the time and value of emits.
aiterate() and __aiter__() are each other’s inverse.
- Parameters:
ait (AsyncIterable) – The asynchronous source iterator. It must await at least once; if necessary use:
    await asyncio.sleep(0)
- Return type:
Aiterate
- static sequence(values, interval=0, times=None)[source]
Create a new event that emits the given values. Supply at most one of interval or times.
- Parameters:
values (Iterable) – The source values.
interval (float) – Time interval in seconds between values.
times (Optional[Iterable[float]]) – Relative times for individual values, in seconds since start of event. The sequence should match values.
- Return type:
Sequence
- static repeat(value=<NoValue>, count=1, interval=0, times=None)[source]
Create a new event that repeats value count times.
- Parameters:
value – The value to emit.
count – Number of times to emit.
interval (float) – Time interval in seconds between values.
times (Optional[Iterable[float]]) – Relative times for individual values, in seconds since start of event. The sequence should match values.
- Return type:
Repeat
- static range(*args, interval=0, times=None)[source]
Create a new event that emits the values from a range.
- Parameters:
args – Same as for built-in range.
interval (float) – Time interval in seconds between values.
times (Optional[Iterable[float]]) – Relative times for individual values, in seconds since start of event. The sequence should match the range.
- Return type:
Range
- static timerange(start=0, end=None, step=1)[source]
Create a new event that emits the datetime value, at that datetime, from a range of datetimes.
- Parameters:
start – Start time, can be specified as:
  - datetime.datetime.
  - datetime.time: Today is used as date.
  - int or float: Number of seconds relative to now. Values will be quantized to the given step.
end – End time, can be specified as:
  - datetime.datetime.
  - datetime.time: Today is used as date.
  - None: No end limit.
step – Number of seconds, or datetime.timedelta, to space between values.
- Return type:
Timerange
- static timer(interval, count=None)[source]
Create a new timer event that emits at regularly paced intervals the number of seconds since starting it.
- Parameters:
interval (float) – Time interval in seconds between emits.
count (Optional[int]) – Number of times to emit, or None for no limit.
- Return type:
Timer
- static marble(s, interval=0, times=None)[source]
Create a new event that emits the values from a Rx-type marble string.
- Parameters:
s (str) – The string with characters that are emitted.
interval (float) – Time interval in seconds between values.
times (Optional[Iterable[float]]) – Relative times for individual values, in seconds since start of event. The sequence should match the marble string.
- Return type:
Marble
- filter(predicate=<class 'bool'>)[source]
For every source value, apply predicate and re-emit when True.
- Parameters:
predicate – The function to test every source value with. The default is to test the general truthiness with bool().
- Return type:
Filter
- skip(count=1)[source]
Drop the first count values from source and follow the source after that.
- Parameters:
count (int) – Number of source values to drop.
- Return type:
Skip
- take(count=1)[source]
Re-emit the first count values from the source and then end.
- Parameters:
count (int) – Number of source values to re-emit.
- Return type:
Take
- takewhile(predicate=<class 'bool'>)[source]
Re-emit values from the source until the predicate becomes False and then end.
- Parameters:
predicate – The function to test every source value with. The default is to test the general truthiness with bool().
- Return type:
TakeWhile
- dropwhile(predicate=<function Event.<lambda>>)[source]
Drop source values until the predicate becomes False and after that re-emit everything from the source.
- Parameters:
predicate – The function to test every source value with. The default is to test the inverted general truthiness.
- Return type:
DropWhile
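For plain iterables, the standard library functions itertools.takewhile and itertools.dropwhile behave analogously to the predicate semantics described for takewhile() and dropwhile(). The sketch below uses them only as a synchronous illustration, with the default predicates named in the text (bool, and the inverted truthiness test); it does not use eventkit.

```python
from itertools import dropwhile, takewhile

values = [3, 2, 1, 0, 4, 5]

# takewhile with bool: pass values through until the first falsy one, then stop.
taken = list(takewhile(bool, values))

# dropwhile with the inverted truthiness test: drop the leading falsy values,
# then pass everything through, including later falsy values.
dropped = list(dropwhile(lambda v: not v, [0, 0, 7, 0, 8]))

print(taken)    # [3, 2, 1]
print(dropped)  # [7, 0, 8]
```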
- takeuntil(notifier)[source]
Re-emit values from the source until the notifier emits and then end. If the notifier ends without any emit then keep passing source values.
- Parameters:
notifier (Event) – Event that signals to end this event.
- Return type:
TakeUntil
- constant(constant)[source]
On emit of the source emit a constant value:
emit(value) -> emit(constant)
- Parameters:
constant – The constant value to emit.
- Return type:
Constant
- iterate(it)[source]
On emit of the source, emit the next value from an iterator:
emit(a, b, ...) -> emit(next(it))
The time of events follows the source and the values follow the iterator.
- Parameters:
it – The source iterator to use for generating values. When the iterator is exhausted the event is set to be done.
- Return type:
Iterate
- count(start=0, step=1)[source]
Count and emit the number of source emits:
emit(a, b, ...) -> emit(count)
- Parameters:
start – Start count.
step – Increase count by this amount for every new source value.
- Return type:
Count
- enumerate(start=0, step=1)[source]
Add a count to every source value:
emit(a, b, ...) -> emit(count, a, b, ...)
- Parameters:
start – Start count.
step – Increase by this amount for every new source value.
- Return type:
Enumerate
- timestamp()[source]
Add a timestamp (from time.time()) to every source value:
emit(a, b, ...) -> emit(timestamp, a, b, ...)
The timestamp is a float: the number of seconds since the epoch (midnight Jan 1, 1970).
- Return type:
Timestamp
- partial(*left_args)[source]
Pad source values with extra arguments on the left:
emit(a, b, ...) -> emit(*left_args, a, b, ...)
- Parameters:
left_args – Arguments to inject.
- Return type:
Partial
- partial_right(*right_args)[source]
Pad source values with extra arguments on the right:
emit(a, b, ...) -> emit(a, b, ..., *right_args)
- Parameters:
right_args – Arguments to inject.
- Return type:
PartialRight
- star()[source]
Unpack a source tuple into positional arguments, similar to the star operator:
emit((a, b, ...)) -> emit(a, b, ...)
star() and pack() are each other’s inverse.
- Return type:
Star
- pack()[source]
Pack positional arguments into a tuple:
emit(a, b, ...) -> emit((a, b, ...))
star() and pack() are each other’s inverse.
- Return type:
Pack
- pluck(*selections)[source]
Extract arguments or nested properties from the source values.
Select which argument positions to keep:
emit(a, b, c, d).pluck(1, 2) -> emit(b, c)
Re-order arguments:
emit(a, b, c).pluck(2, 1, 0) -> emit(c, b, a)
To do an empty emit leave selections empty:
emit(a, b).pluck() -> emit()
Select nested properties from positional arguments:
emit(person, account).pluck('1.number', '0.address.street') -> emit(account.number, person.address.street)
If no value can be extracted then NO_VALUE is emitted in its place.
- Parameters:
selections (Union[int, str]) – The values to extract.
- Return type:
Pluck
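The selection rules above can be sketched as a plain function over one tuple of emitted arguments. This is an illustration only: the pluck function, the NO_VALUE placeholder object and the sample person/account objects are hypothetical, and the handling of out-of-range positions is an assumption rather than eventkit’s documented behaviour.

```python
from types import SimpleNamespace

NO_VALUE = object()  # hypothetical placeholder standing in for eventkit's NO_VALUE


def pluck(args, *selections):
    """Return the selected arguments or nested attributes as a tuple."""
    result = []
    for sel in selections:
        if isinstance(sel, int):
            # Integer selection: pick the argument at that position.
            value = args[sel] if -len(args) <= sel < len(args) else NO_VALUE
        else:
            # '1.number' means: argument 1, then attribute 'number'.
            index, *names = sel.split('.')
            try:
                value = args[int(index)]
                for name in names:
                    value = getattr(value, name)
            except (IndexError, AttributeError):
                value = NO_VALUE
        result.append(value)
    return tuple(result)


person = SimpleNamespace(address=SimpleNamespace(street='Main St'))
account = SimpleNamespace(number=42)

reordered = pluck(('a', 'b', 'c'), 2, 1, 0)
nested = pluck((person, account), '1.number', '0.address.street')
missing = pluck((person, account), '1.owner')
empty = pluck(('a', 'b'))
```

Here reordered is ('c', 'b', 'a'), nested is (42, 'Main St'), missing holds the placeholder, and empty is the empty tuple that corresponds to an empty emit.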
- map(func, timeout=None, ordered=True, task_limit=None)[source]
Apply a sync or async function to source values using positional arguments:
emit(a, b, ...) -> emit(func(a, b, ...))
or if func returns an awaitable then it will be awaited:
emit(a, b, ...) -> emit(await func(a, b, ...))
In case of timeout or other failure, NO_VALUE is emitted.
- Parameters:
func – The function or coroutine constructor to apply.
timeout – Timeout in seconds, counted from the start of the coroutine.
ordered –
  - True: The order of emitted results preserves the order of the source values.
  - False: Results are in order of completion.
task_limit – Max number of concurrent tasks, or None for no limit.
- Return type:
Map
timeout, ordered and task_limit apply to async functions only.
- emap(constr, joiner)[source]
Higher-order event map that creates a new Event instance for every source value:
emit(a, b, ...) -> new Event constr(a, b, ...)
- Parameters:
constr – Constructor function for creating a new event. Apart from returning an Event, the constructor may also return an awaitable or an asynchronous iterator, in which case an Event will be created.
joiner (AddableJoinOp) – Join operator to combine the emits of nested events.
- Return type:
Emap
- mergemap(constr)[source]
emap() that uses merge() to combine the nested events:
    marbles = [
        'A B C D',
        '_1 2 3 4',
        '__K L M N']
    ev.Range(3).mergemap(lambda v: ev.Marble(marbles[v]))
    ->
    ['A', '1', 'K', 'B', '2', 'L', '3', 'C', 'M', '4', 'D', 'N']
- Return type:
Mergemap
- concatmap(constr)[source]
emap() that uses concat() to combine the nested events:
    marbles = [
        'A B C D',
        '_ 1 2 3 4',
        '__ K L M N']
    ev.Range(3).concatmap(lambda v: ev.Marble(marbles[v]))
    ->
    ['A', 'B', '1', '2', '3', 'K', 'L', 'M', 'N']
- Return type:
Concatmap
- chainmap(constr)[source]
emap() that uses chain() to combine the nested events:
    marbles = [
        'A B C D ',
        '_ 1 2 3 4',
        '__ K L M N']
    ev.Range(3).chainmap(lambda v: ev.Marble(marbles[v]))
    ->
    ['A', 'B', 'C', 'D', '1', '2', '3', '4', 'K', 'L', 'M', 'N']
- Return type:
Chainmap
- switchmap(constr)[source]
emap() that uses switch() to combine the nested events:
    marbles = [
        'A B C D ',
        '_ K L M N',
        '__ 1 2 3 4'
    ]
    ev.Range(3).switchmap(lambda v: ev.Event.marble(marbles[v]))
    ->
    ['A', 'B', '1', '2', 'K', 'L', 'M', 'N']
- Return type:
Switchmap
- reduce(func, initializer=<NoValue>)[source]
Apply a two-argument reduction function to the previous reduction result and the current value and emit the new reduction result.
- Parameters:
func – Reduction function:
emit(args) -> emit(func(prev_args, args))
initializer – First argument of first reduction:
first_result = func(initializer, first_value)
If no initializer is given, then the first result is emitted on the second source emit.
- Return type:
Reduce
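The effect of the initializer on when the first result appears can be sketched with a generator over a plain list. The names reduce_stream and _NO_VALUE are hypothetical and exist only for this illustration; the generator mimics the described behaviour and is not eventkit’s code.

```python
import operator

_NO_VALUE = object()  # hypothetical sentinel standing in for NoValue


def reduce_stream(func, values, initializer=_NO_VALUE):
    """Yield the running reduction, one result per reduced value."""
    prev = initializer
    for value in values:
        if prev is _NO_VALUE:
            # No initializer: the first value only seeds the reduction,
            # so nothing is produced for it.
            prev = value
            continue
        prev = func(prev, value)
        yield prev


with_init = list(reduce_stream(operator.add, [1, 2, 3, 4], 0))
without_init = list(reduce_stream(operator.add, [1, 2, 3, 4]))

print(with_init)     # [1, 3, 6, 10]
print(without_init)  # [3, 6, 10]
```

With an initializer there is one result per source value; without one, the results start at the second source value.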
- product(start=1)[source]
Total product.
- Parameters:
start – Initial start value.
- Return type:
Product
- ema(n=None, weight=None)[source]
Exponential moving average.
- Parameters:
n (Optional[int]) – Number of periods.
weight (Optional[float]) – Weight of new value.
- Return type:
Ema
Give either n or weight. The relation is weight = 2 / (n + 1).
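A small sketch of the relation between n and weight, using the common recurrence average = weight * value + (1 - weight) * average. The function name ema_stream is hypothetical, and seeding the average with the first value is an assumption made for this illustration; the text above does not specify how the first value is treated.

```python
def ema_stream(values, n=None, weight=None):
    """Yield an exponential moving average for each value."""
    if (n is None) == (weight is None):
        raise ValueError('give either n or weight')
    if weight is None:
        weight = 2 / (n + 1)
    average = None
    for value in values:
        if average is None:
            average = value  # assumption: the first value seeds the average
        else:
            average = weight * value + (1 - weight) * average
        yield average


by_periods = list(ema_stream([10, 20, 30], n=3))       # weight = 2 / 4 = 0.5
by_weight = list(ema_stream([10, 20, 30], weight=0.5))

print(by_periods)  # [10, 15.0, 22.5]
```

Both calls give the same series, since n=3 corresponds to weight=0.5.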
- previous(count=1)[source]
For every source value, emit the count-th previous value:
source: -ab---c--d-e-
output: --a---b--c-d-
Starts emitting on the count + 1-th source emit.
- Parameters:
count (int) – Number of periods to go back.
- Return type:
Previous
- pairwise()[source]
Emit (previous_source_value, current_source_value) tuples. Starts emitting on the second source emit:
source: -a----b------c--------d-----
output: ------(a,b)--(b,c)----(c,d)-
- Return type:
Pairwise
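The lead-in behaviour of previous() and pairwise() (no output until enough source values have arrived) can be sketched with a bounded deque over a plain list. The two generator functions below are hypothetical illustrations of the described semantics, not eventkit code.

```python
from collections import deque


def previous(values, count=1):
    """Yield the count-th previous value, starting at value count + 1."""
    window = deque(maxlen=count + 1)
    for value in values:
        window.append(value)
        if len(window) == count + 1:
            yield window[0]


def pairwise(values):
    """Yield (previous, current) tuples, starting at the second value."""
    window = deque(maxlen=2)
    for value in values:
        window.append(value)
        if len(window) == 2:
            yield tuple(window)


source = ['a', 'b', 'c', 'd', 'e']
prev_1 = list(previous(source))           # ['a', 'b', 'c', 'd']
prev_2 = list(previous(source, count=2))  # ['a', 'b', 'c']
pairs = list(pairwise(source))
```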
- changes()[source]
Emit only source values that have changed from the previous value.
- Return type:
Changes
- unique(key=None)[source]
Emit only unique values, dropping values that have already been emitted.
- Parameters:
key – The callable key(value) is used to group values. The default of None groups values by equality. The resulting group must be hashable.
- Return type:
Unique
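The difference between changes() (compare with the previous value only) and unique() (compare with everything seen so far, optionally through key) can be sketched over plain lists. Both generator functions are hypothetical illustrations of the described semantics.

```python
def changes(values):
    """Yield values that differ from the value directly before them."""
    last = object()  # sentinel that is unequal to any real value
    for value in values:
        if value != last:
            last = value
            yield value


def unique(values, key=None):
    """Yield values whose group has not been seen before."""
    seen = set()
    for value in values:
        group = value if key is None else key(value)
        if group not in seen:  # the group must be hashable
            seen.add(group)
            yield value


changed = list(changes([1, 1, 2, 2, 1]))   # [1, 2, 1]
distinct = list(unique([1, 2, 1, 3, 2]))   # [1, 2, 3]
by_initial = list(unique(
    ['apple', 'avocado', 'banana', 'blueberry', 'cherry'],
    key=lambda s: s[0]))                   # ['apple', 'banana', 'cherry']
```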
- deque(count=0)[source]
Emit a deque with the last count values from the source (or fewer in the lead-in phase).
- Parameters:
count – Number of last periods to use, or 0 to use all.
- Return type:
Deque
- array(count=0)[source]
Emit a numpy array with the last count values from the source (or fewer in the lead-in phase).
- Parameters:
count – Number of last periods to use, or 0 to use all.
- Return type:
Array
- chunk(size)[source]
Chunk values up in lists of equal size. The last chunk can be shorter.
- Parameters:
size (int) – Chunk size.
- Return type:
Chunk
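As an illustration of equal-sized chunks with a possibly shorter final chunk, here is a sketch over a plain iterable. The chunk generator is a hypothetical stand-in for the described behaviour; it assumes the leftover values are emitted when the source ends.

```python
def chunk(values, size):
    """Yield lists of `size` values; the last list can be shorter."""
    batch = []
    for value in values:
        batch.append(value)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch  # final, shorter chunk


chunks = list(chunk(range(7), 3))
print(chunks)  # [[0, 1, 2], [3, 4, 5], [6]]
```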
- chunkwith(timer, emit_empty=True)[source]
Emit a chunked list of values when the timer emits.
- Parameters:
timer (Event) – Event to use for timing the chunks.
emit_empty (bool) – Emit empty list if no values present since last emit.
- Return type:
ChunkWith
- chain(*sources)[source]
Re-emit from a source until it ends, then move to the next source. Repeat until all sources have ended, ending the chain. Emits from pending sources are queued up:
source 1: -a----b---c|
source 2: --2-----3--4|
source 3: ------------x---------y--|
output:   -a----b---c2--3--4x---y--|
- Parameters:
sources (Event) – Source events.
- Return type:
Chain
- merge(*sources)[source]
Re-emit everything from the source events:
source 1: -a----b-------------c------d-|
source 2:    ------1-----2------3--4-|
source 3:     --------x----y--|
output:   -a----b--1--x--2-y--c-3--4-d-|
- Parameters:
sources – Source events.
- Return type:
Merge
- concat(*sources)[source]
Re-emit everything from one source until it ends and then move to the next source:
source 1: -a----b-----|
source 2:   --1-----2-----3----4--|
source 3:                -----------x--y--|
output:   -a----b---------3----4----x--y--|
- Parameters:
sources – Source events.
- Return type:
Concat
- switch(*sources)[source]
Re-emit everything from one source and move to another source as soon as that other source starts to emit:
source 1: -a----b---c-----d---|
source 2:       -----------x---y-|
source 3: ---------1----2----3-----|
output:   -a----b--1----2--x---y---|
- Parameters:
sources – Source events.
- Return type:
Switch
- zip(*sources)[source]
Zip sources together: The i-th emit has the i-th value from each source as positional arguments. Only emits when each source has emitted its i-th value and ends when any source ends:
source 1:    -a----b------------------c------d---e--f---|
source 2:    --------1-------2-------3---------4-----|
output emit: --------(a,1)---(b,2)----(c,3)----(d,4)-|
- Parameters:
sources – Source events.
- Return type:
Zip
- ziplatest(*sources, partial=True)[source]
Emit zipped values with the latest value from each of the source events. Emits every time when a source emits:
source 1:    -a-------------------b-------c---|
source 2:    ---------------1--------------------2------|
output emit: (a,NoValue)---(a,1)-(b,1)---(c,1)--(c,2)--|
- Parameters:
sources – Source events.
partial (bool) –
  - True: Use NoValue for sources that have not emitted yet.
  - False: Wait until all sources have emitted.
- Return type:
Ziplatest
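The contrast between zip() and ziplatest() can be sketched by replaying a merged timeline of (source_index, value) pairs in emit order. This is a plain-Python illustration under stated assumptions: zip_events, ziplatest and the NO_VALUE placeholder are hypothetical names, and ending of sources is not modelled.

```python
from collections import deque

NO_VALUE = object()  # hypothetical placeholder standing in for NoValue


def zip_events(timeline, n_sources):
    """Emit the i-th values together once every source has produced one."""
    queues = [deque() for _ in range(n_sources)]
    for index, value in timeline:
        queues[index].append(value)
        if all(queues):
            yield tuple(q.popleft() for q in queues)


def ziplatest(timeline, n_sources, partial=True):
    """Emit the latest value of every source each time any source emits."""
    latest = [NO_VALUE] * n_sources
    for index, value in timeline:
        latest[index] = value
        if partial or NO_VALUE not in latest:
            yield tuple(latest)


# Source 0 emits a, b, c; source 1 emits 1, 2 (same order as the diagram above).
timeline = [(0, 'a'), (1, 1), (0, 'b'), (0, 'c'), (1, 2)]

zipped = list(zip_events(timeline, 2))
latest_partial = list(ziplatest(timeline, 2))
latest_full = list(ziplatest(timeline, 2, partial=False))
```

Here zipped is [('a', 1), ('b', 2)], latest_partial starts with ('a', NO_VALUE), and latest_full skips that first incomplete tuple.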
- delay(delay)[source]
Time-shift all source events by a delay:
source: -abc-d-e---f---|
output: ---abc-d-e---f---|
This applies to the source errors and the source done event as well.
- Parameters:
delay – Time delay of all events (in seconds).
- Return type:
Delay
- timeout(timeout)[source]
When the source doesn’t emit for longer than the timeout period, do an empty emit and set this event as done.
- Parameters:
timeout – Timeout value.
- Return type:
Timeout
- throttle(maximum, interval, cost_func=None)[source]
Limit number of emits per time without dropping values. Values that come in too fast are queued and re-emitted as soon as allowed by the limits.
A nested status_event emits True when throttling starts and False when throttling ends.
The limit can be dynamically changed with set_limit.
- Parameters:
maximum – Maximum payload per interval.
interval – Time interval (in seconds).
cost_func – Cost function. The sum of cost_func(value) over the source values inside the interval is to remain under the maximum. The default is to count every source value as 1.
- Return type:
Throttle
- debounce(delay, on_first=False)[source]
Filter out values from the source that happen in rapid succession.
- Parameters:
delay – Maximal time difference (in seconds) between successive values before debouncing kicks in.
on_first (bool) –
  - True: First value is sent immediately and following values in the rapid succession are dropped:
source: -abcd----efg-
output: -a-------e---
  - False: Last value of a rapid succession is sent after the delay and the values before that are dropped:
source: -abcd----efg--
output: ----d------g-
- Return type:
Debounce
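The two debounce modes can be sketched offline over a list of timestamped values. This is an illustration under stated assumptions, not eventkit’s implementation: the debounce function is hypothetical, a "rapid succession" is taken to mean that the gap to the previous value is strictly smaller than delay, and with on_first=False the emit time is taken as the time of the last value plus delay.

```python
def debounce(timed_values, delay, on_first=False):
    """Return (emit_time, value) pairs for (time, value) pairs sorted by time."""
    # Group the input into bursts: a value joins the current burst when it
    # follows the previous value by less than `delay` (assumed boundary).
    bursts = []
    for time, value in timed_values:
        if bursts and time - bursts[-1][-1][0] < delay:
            bursts[-1].append((time, value))
        else:
            bursts.append([(time, value)])
    if on_first:
        # The first value of each burst passes immediately.
        return [burst[0] for burst in bursts]
    # The last value of each burst passes once `delay` has elapsed.
    return [(burst[-1][0] + delay, burst[-1][1]) for burst in bursts]


source = [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'),
          (9, 'e'), (10, 'f'), (11, 'g')]

leading = debounce(source, delay=3, on_first=True)
trailing = debounce(source, delay=3)

print(leading)   # [(1, 'a'), (9, 'e')]
print(trailing)  # [(7, 'd'), (14, 'g')]
```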