Difference between revisions of "Programming Reference:SignalSharingDemo Signal Processing"

From BCI2000 Wiki
Jump to: navigation, search
(Declaration of internal variables)
(Filter Initialize() function)
 
(2 intermediate revisions by the same user not shown)
Line 21: Line 21:
  
 
==Filter Code==
 
==Filter Code==
 +
The code example uses a pointer to an internal private <tt>struct</tt> to hide implementation details from the outer header file of the filter class (PIMPL idiom).
 
===Declaration of internal variables===
 
===Declaration of internal variables===
<syntaxhiglight lang="cpp">
+
<syntaxhighlight lang="cpp">
 
#include "Thread.h"
 
#include "Thread.h"
 
#include "WaitableEvent.h"
 
#include "WaitableEvent.h"
Line 105: Line 106:
 
       p->mSharedSignal.ShareAcrossModules();
 
       p->mSharedSignal.ShareAcrossModules();
 
       bciwarn << "Locally connected to " << addr << ", using shared memory";
 
       bciwarn << "Locally connected to " << addr << ", using shared memory";
     } // otherwise, full data will be transmitted over network (slower)
+
     }  
    else
+
    else // otherwise, full data will be transmitted over network (slower)
 
     {
 
     {
 
       bciwarn << "Remotely connected to " << addr << ", transmitting data through network";
 
       bciwarn << "Remotely connected to " << addr << ", transmitting data through network";

Latest revision as of 15:14, 8 February 2019

Location

src/contrib/SignalProcessing/SignalSharingDemo

Synopsis

The SignalSharingDemo signal processing module demonstrates how to share signal data with an external application, using shared memory.

Inheritance

The SignalSharingDemoFilter signal processing filter derives from GenericFilter.

Function

The SignalSharingDemoFilter shares its input signal through a GenericSignal object which has been linked to a shared memory block using GenericSignal::ShareAcrossModules(). A dedicated thread waits for signal updates, and sends signal data out to a separate application waiting on a TCP/IP connection.

When the client application is running on a separate machine, full signal data are sent over the network. When the client is running on the same machine, only a reference to a shared memory block is sent. On the application side, unserializing the signal will transparently bind it to the shared memory block if available.

The client application visualizes signal data by plotting normalized signals on a circle.

Filter Code

The code example uses a pointer to an internal private struct to hide implementation details from the outer header file of the filter class (PIMPL idiom).

Declaration of internal variables

#include "Thread.h"
#include "WaitableEvent.h"
#include "Sockets.h"
#include "Streambuf.h"

struct SignalSharingDemoFilter::Private
{
  // A socket for a connection to the client application.
  ClientTCPSocket mSocket;
  // A signal to be shared.
  GenericSignal mSharedSignal;
  // An event to notify the sender thread when new signal data is available.
  WaitableEvent mSignalAvailable;

  // An object to wrap a call to SenderThreadFunc().
  MemberCall<void(Private*)> mSenderThreadCall;
  // The sender thread.
  Thread mSenderThread;

  Private();
  void SenderThreadFunc();
};

Initialization of internal variables

SignalSharingDemoFilter::Private::Private()
: mSenderThreadCall(&Private::SenderThreadFunc, this),
  mSenderThread(&mSenderThreadCall, "Signal Sharing Demo Sender"),
  mSignalAvailable(true)
{
  mSocket.SetTCPNodelay(true);
  mSocket.SetFlushAfterWrite(true);
}

Sender thread

The sender thread waits for the "signal available" event to be triggered, and then calls Serialize() on the shared signal in order to notify the client.

void
SignalSharingDemoFilter::Private::SenderThreadFunc()
{
  if(mSocket.Connected())
  {
    UnbufferedIO buf;
    buf.SetOutput(&mSocket.Output());
    std::ostream clientConnection(&buf);
    GenericSignal().Serialize(clientConnection); // empty signal to indicate beginning of data
    while(mSignalAvailable.Wait()) // will return false when Thread::Terminate() has been called
    {
      if(!mSharedSignal.Serialize(clientConnection)) // will transmit memory reference, or signal, depending on whether ShareAcrossModules() has been called
      {
        bciwarn << "Could not send signal, giving up";
        mSenderThread.Terminate();
      }
    }
    GenericSignal().Serialize(clientConnection); // empty signal to indicate end of data
  }
}

Filter Initialize() function

The filter connects to the local or remote network address in the SignalSharingDemoClientAddress parameter and allocates signal data according to input signal properties. If the connection is through a local socket, the filter switches the signal to shared memory by calling GenericSignal::ShareAcrossModules(). Otherwise, it proceeds with a standard GenericSignal, which is transmitted in full rather than as a shared memory reference.

void
SignalSharingDemoFilter::Initialize(const SignalProperties& Input, const SignalProperties& Output)
{
  std::string addr = Parameter("SignalSharingDemoClientAddress");
  if(!addr.empty())
  { // connect to the client side
    p->mSocket.Open(addr);
    // initialize shared signal from input signal properties
    p->mSharedSignal = GenericSignal(Input);
    if(!p->mSocket.Connected())
    {
      bciwarn << "Cannot connect to " << addr;
    }
    else if(p->mSocket.Connected() == Socket::local) // only use shared memory when connected locally
    {
      p->mSharedSignal.ShareAcrossModules();
      bciwarn << "Locally connected to " << addr << ", using shared memory";
    } 
    else // otherwise, full data will be transmitted over network (slower)
    {
      bciwarn << "Remotely connected to " << addr << ", transmitting data through network";
    }
  }
}

Filter StartRun() and StopRun() functions

StartRun() and StopRun() start resp. stop the sender thread.

void
SignalSharingDemoFilter::StartRun()
{
  p->mSenderThread.Start();
}

void
SignalSharingDemoFilter::StopRun()
{
  p->mSenderThread.Terminate();
}

Filter Process() function

The Process() function calls GenericSignal::AssignValues() to assign the shared signal's entries without affecting its shared memory representation. Then, it sets the "signal available" event to the signaled state.

void
SignalSharingDemoFilter::Process(const GenericSignal& Input, GenericSignal& Output)
{
  Output = Input;
  p->mSharedSignal.AssignValues(Input);
  p->mSignalAvailable.Set();
}

Client application code

Declaration of internal variables

#include "SignalSharingDemoWidget.h"

#include "Sockets.h"
#include "StringUtils.h"
#include "Streambuf.h"
#include "Thread.h"
#include "GenericSignal.h"
#include "Synchronized.h"
#include "Runnable.h"

#include <QPaintEvent>
#include <QPainter>

struct SignalSharingDemoWidget::Private
{
  SignalSharingDemoWidget* mpSelf;
  std::vector<QColor> mSignalColors;

  ServerTCPSocket mListeningSocket;
  Synchronized<bool> mConnected;
  SynchronizedObject<GenericSignal> mpSignal;

  Thread mThread;
  void ThreadFunc();
  MemberCall<void(Private*)> mThreadCall;
  void Invalidate();
  Private();
};

Initialization of internal variables

SignalSharingDemoWidget::Private::Private()
: mThreadCall(&Private::ThreadFunc, this),
  mThread(&mThreadCall, "SignalSharingDemoWidget listening/receiving thread")
{
  mSignalColors.resize(8);
  for(int i = 0; i < mSignalColors.size(); ++i)
    mSignalColors[i].setHsvF(i*1.0/mSignalColors.size(), 1, 0.9);
}

Receiving thread function

void
SignalSharingDemoWidget::Private::ThreadFunc()
{
  while(!mThread.Terminating())
  { // wait for a connection
    while(mListeningSocket.Input().Wait())
    { // accept connection
      ClientTCPSocket clientSocket;
      if(mListeningSocket.WaitForAccept(clientSocket, 0))
      {
        mConnected = true;
        Invalidate(); // calls the widget's update() method
        UnbufferedIO buf;
        buf.SetInput(&clientSocket.Input());
        std::istream stream(&buf);
        while(stream && clientSocket.Input().Wait()) // will be interrupted by Thread::Terminate()
        {
          mpSignal.Mutable()->Unserialize(stream); // get current signal content
          Invalidate(); // call update() on the widget
        }
        *mpSignal.Mutable() = GenericSignal();
        mConnected = false;
        Invalidate();
      }
    }
  }
}

The widget's paintEvent()

void
SignalSharingDemoWidget::paintEvent(QPaintEvent* ev)
{
  ev->accept();
  WithLocked(pSignal = p->mpSignal.Const()) // lock the signal while reading from it
  {
    if(pSignal->Empty())
    {
      QPainter painter(this);
      painter.fillRect(ev->rect(), Qt::gray);
      painter.setPen(Qt::white);
      painter.drawText(geometry(), Qt::AlignCenter, 
        p->mConnected ? "Waiting for signal ..." : "Waiting for connection ...");
    }
    else
    {
       // draw some visualization into the widget
       ...
    }
  }
}

Parameters

SignalSharingDemoClientAddress

IP address and port number of the client application.

See also

Programming Reference:GenericSignal Class