The Data Server
The data server hosts experimental data. It has a collection of data sets, each of which contains a “source” and one or more “sinks”. The “source” provides data to a data set and the “sinks” collect data from the data set.
To start the data server, simply run:
$ nspyre-dataserv
The data server can be controlled through its CLI (command-line interface).
Type help for a list of available commands:
$ nspyre-dataserv
2022-11-21 11:42:25.647 [INFO] (dataserv_cli.py:151) starting data server...
dataserv > 2022-11-21 11:42:25.654 [INFO] (base_events.py:1539) <Server sockets=(<asyncio.TransportSocket fd=7, family=2, type=1, proto=6, laddr=('127.0.0.1', 30000)>,)> is serving
2022-11-21 11:42:25.654 [INFO] (dataserv.py:497) Serving on ('127.0.0.1', 30000)
dataserv > help
Documented commands (type help <topic>):
========================================
debug help list quit
dataserv > help list
...
Data can be pushed to the data server like so:
# source.py
from nspyre import DataSource

with DataSource('MyDataSet') as data_source:
    data_source.push({'some_data1': [1, 2, 3], 'some_data2': [4, 5, 6]})
The pushed data can be any pickleable Python object. While it is not strictly required, the argument to push() should be a Python dictionary so that data can be accessed from the data sink like so:
# sink.py
from nspyre import DataSink

with DataSink('MyDataSet') as data_sink:
    if data_sink.pop():
        print(data_sink.some_data1)
        print(data_sink.some_data2)
The data server is designed for use with streaming data. Each time push() is called, it creates a packet that is sent to the data server. The server does not guarantee the delivery of any given packet sent with push() and retrieved with pop(). This is by design and can make your software more robust if used correctly: as long as newer packets strictly contain more data than older packets, your application should run smoothly.
This is an example of BAD data server usage:
from nspyre import DataSource

with DataSource('MyDataSet') as data_source:
    for i in range(100):
        data = get_data()
        data_source.push({'mydata': data})
The problem is that each push() call sends different data to the data server. If any packets are dropped, a connected sink could lose some potentially important data! A better implementation is:
from nspyre import DataSource

data = []
with DataSource('MyDataSet') as data_source:
    for i in range(100):
        data.append(get_data())
        data_source.push({'mydata': data})
In this example, every packet sent to the data server with push() contains some new data, but also the data taken in previous push() calls. This guarantees that any dropped packets will be of no consequence to any connected sinks.
This may seem like an unintuitive design, but imagine the following situation: the data source program calling push() is producing data faster than the data sink program calling pop() can process it. An alternative data server implementation might block push() calls in the source until previous data has been processed by the sink. This could introduce timing variation and uncertainty in the source, which is very undesirable if a scientific experiment is the source. Instead, if a sink is not calling pop() fast enough to keep up with the source, the data server will start throwing away older packets (just for that specific sink). This design decouples the code running an experiment from any user interaction with viewing the resulting data.
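The per-sink dropping policy can be illustrated with a minimal sketch. This is a simplified, hypothetical model (not nspyre's actual implementation): each sink is modeled as a bounded queue, so a source can always push without blocking, and a slow sink simply loses the oldest packets.

```python
from collections import deque

class SinkBuffer:
    """Hypothetical sketch of per-sink buffering: oldest packets are
    silently dropped when the sink falls behind the source."""

    def __init__(self, maxlen=2):
        # a deque with maxlen discards the oldest item when full
        self.queue = deque(maxlen=maxlen)

    def push(self, packet):
        # the source never blocks, even if the sink is slow
        self.queue.append(packet)

    def pop(self):
        # return the oldest buffered packet, or None if empty
        return self.queue.popleft() if self.queue else None

# a fast source pushes 10 packets before the slow sink reads any;
# following the "good" pattern above, each packet contains all data so far
buf = SinkBuffer(maxlen=2)
for i in range(10):
    buf.push({'mydata': list(range(i + 1))})

# only the two newest packets survive; the older ones were dropped,
# but since each packet is cumulative, no data is actually lost
print(buf.pop())  # {'mydata': [0, 1, 2, 3, 4, 5, 6, 7, 8]}
print(buf.pop())  # {'mydata': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]}
```

Because the surviving packets are cumulative, the sink recovers the full history from the newest packet alone, which is exactly why the "newer packets strictly contain more data" rule matters.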
One disadvantage of this design is that on every call to push(), the DataSource redundantly sends data to the data server that has already been sent. To solve this problem, there are special “Streaming” data types that can be used for storing data. These streaming objects solve the redundant-data problem by automatically calculating how they’ve been modified since the last call to push(). They store these modifications as “diffs” and send only the diffs to the data server. For example, this code will have poor performance because the pushed data grows larger with each call to push():
import numpy as np
from nspyre import DataSource

with DataSource('my_dataset') as src:
    data = []
    for i in range(10_000):
        print(i)
        data.append(np.random.random(5))
        # this will take longer and longer with each call
        src.push(data)
This code will have much higher performance:
import numpy as np
from nspyre import DataSource
from nspyre import StreamingList

with DataSource('my_dataset') as src:
    data = StreamingList([])
    for i in range(10_000):
        print(i)
        data.append(np.random.random(5))
        # this will take the same amount of time for each call
        src.push(data)
See the API reference on streaming objects for more details.
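The diff idea behind the streaming types can be sketched in a few lines. This is a hypothetical illustration of the concept only (the class name DiffList and its methods are invented here; they are not nspyre's StreamingList API): the list remembers which elements have already been sent, so each transfer carries only the new elements.

```python
class DiffList:
    """Hypothetical sketch of diff-based streaming: track which
    elements were already sent, and transfer only the new ones."""

    def __init__(self, items=None):
        self._items = list(items) if items else []
        # index of the first element not yet sent to the server
        self._sent = 0

    def append(self, item):
        self._items.append(item)

    def diff(self):
        # return only the elements added since the last diff(),
        # then mark everything as sent
        new = self._items[self._sent:]
        self._sent = len(self._items)
        return new

data = DiffList()
data.append(1)
data.append(2)
print(data.diff())  # [1, 2] -- the first transfer sends everything so far
data.append(3)
print(data.diff())  # [3] -- later transfers send only the new data
```

Under this scheme the cost of each push is proportional to the amount of new data rather than the total accumulated data, which is why the StreamingList example above runs in roughly constant time per iteration.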