The Data Server

The data server hosts experimental data. It has a collection of data sets, each of which contains a “source” and one or more “sinks”. The “source” provides data to a data set and the “sinks” collect data from the data set.

To start the data server, simply run:

$ nspyre-dataserv

The data server can be controlled through its CLI (command-line interface). Type help for a list of available commands:

$ nspyre-dataserv
2022-11-21 11:42:25.647 [INFO] (dataserv_cli.py:151) starting data server...
dataserv > 2022-11-21 11:42:25.654 [INFO] (base_events.py:1539) <Server sockets=(<asyncio.TransportSocket fd=7, family=2, type=1, proto=6, laddr=('127.0.0.1', 30000)>,)> is serving
2022-11-21 11:42:25.654 [INFO] (dataserv.py:497) Serving on ('127.0.0.1', 30000)

dataserv > help

Documented commands (type help <topic>):
========================================
debug  help  list  quit

dataserv > help list

...

Data can be pushed to the data server like so:

# source.py

from nspyre import DataSource

with DataSource('MyDataSet') as data_source:
    data_source.push({'some_data1': [1, 2, 3], 'some_data2': [4, 5, 6]})

The pushed data can be any pickleable Python object. While it is not strictly required, the argument to push() should be a Python dictionary so that each entry can be accessed as an attribute of the data sink, like so:

# sink.py

from nspyre import DataSink

with DataSink('MyDataSet') as data_sink:
    if data_sink.pop():
        print(data_sink.some_data1)
        print(data_sink.some_data2)
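
source.py and sink.py are separate scripts that connect to the same data set ('MyDataSet') on the data server. With nspyre-dataserv running, run each script in its own process; the sink reads back the data pushed by the source.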

The data server is designed for use with streaming data. Each call to push() creates a packet that is sent to the data server. The server does not guarantee that any given packet pushed by a source will be delivered to a sink calling pop(). This is by design, and it can make your software more robust if used correctly: as long as newer packets strictly contain more data than older packets, your application should run smoothly.

This is an example of BAD data server usage:

from nspyre import DataSource

with DataSource('MyDataSet') as data_source:
    for i in range(100):
        # get_data() is a placeholder for whatever data acquisition the experiment performs
        data = get_data()
        data_source.push({'mydata': data})

The problem is that each push() call sends only that iteration's newly acquired data to the data server. If any packets are dropped, a connected sink could permanently lose some potentially important data! A better implementation is:

from nspyre import DataSource

data = []
with DataSource('MyDataSet') as data_source:
    for i in range(100):
        data.append(get_data())
        data_source.push({'mydata': data})

In this example, every packet sent to the data server with push() contains some new data, but also the data taken in previous push() calls. This guarantees that any dropped packets will be of no consequence to any connected sinks.

This may seem like an unintuitive design, but imagine the following situation: the source program calling push() is producing data faster than the sink program calling pop() can process it. An alternative data server implementation might block push() calls in the source until the sink has processed the previous data. This could introduce timing variation and uncertainty in the source, which is very undesirable if the source is a scientific experiment. Instead, if a sink is not calling pop() fast enough to keep up with the source, the data server will start throwing away older packets (only for that specific sink). This design decouples the code running an experiment from any user interaction with, or viewing of, the resulting data.
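
To make this concrete, here is a sketch of a sink written against the cumulative-data pattern above. It assumes the 'MyDataSet' name and 'mydata' key from the earlier examples, and plot_data() is a hypothetical placeholder for whatever processing the sink actually does:

from nspyre import DataSink

def plot_data(values):
    # hypothetical placeholder for whatever plotting or analysis the sink performs
    print(f'received {len(values)} points')

with DataSink('MyDataSet') as data_sink:
    for _ in range(100):
        # wait for the next packet from the data server; if this loop runs
        # slower than the source, older packets are thrown away for this
        # sink and the next pop() returns the most recent one
        if data_sink.pop():
            # every packet contains the full 'mydata' list accumulated so far,
            # so skipping intermediate packets does not lose any data
            plot_data(data_sink.mydata)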

One disadvantage of this design is that on every call to push(), the DataSource re-sends data that the data server has already received. To solve this problem, there are special “Streaming” data types that can be used for storing data. These streaming objects solve the redundant-data problem by automatically tracking how they have been modified since the last call to push(). They store these modifications as “diffs” and send only the diffs to the data server. For example, this code will have poor performance because the pushed data grows larger with every call to push():

import numpy as np
from nspyre import DataSource

with DataSource('my_dataset') as src:
    data = []
    for i in range(10_000):
        print(i)
        data.append(np.random.random(5))
        # this will take longer and longer with each call because the
        # entire (growing) list is re-sent every time
        src.push({'mydata': data})

This code will have much higher performance:

import numpy as np
from nspyre import DataSource
from nspyre import StreamingList

with DataSource('my_dataset') as src:
    data = StreamingList([])
    for i in range(10_000):
        print(i)
        data.append(np.random.random(5))
        # this will take about the same amount of time for each call because
        # only the diff (the newly appended element) is sent
        src.push({'mydata': data})

See the API reference on streaming objects for more details.
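
For completeness, here is a minimal sketch of a sink reading the streamed data back, assuming the same 'my_dataset' name and 'mydata' key used in the source example above, and assuming the object the sink receives behaves like an ordinary list:

from nspyre import DataSink

with DataSink('my_dataset') as sink:
    # retrieve the latest state of the data set; the diffs sent by the source
    # are reassembled so that 'mydata' reflects the full list pushed so far
    if sink.pop():
        print(len(sink.mydata))   # number of arrays appended by the source
        print(sink.mydata[-1])    # the most recently appended array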