// Copyright 2005 Rutger E.W. van Beusekom.
// Distributed under the Boost Software License, Version 1.0. (See
// accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#ifndef __STREAM_HPP__
#define __STREAM_HPP__

#include "bounded_buffer.hpp"

class Active
{
  bool m_Running;
  boost::mutex m_Mutex;
  boost::condition m_Condition;
protected:
  Active()
  : m_Running(false)
  , m_Mutex()
  , m_Condition()
  {}
  virtual ~Active()
  {
    Run();
  }
  void Await()
  {
    boost::mutex::scoped_lock lock(m_Mutex);
    while(!m_Running)
    {
      m_Condition.wait(lock);
    }
  }
  void Start()
  {
    Run();
  }
private:
  void Run()
  {
    boost::mutex::scoped_lock lock(m_Mutex);
    if(!m_Running)
    {
      m_Running = true;
      m_Condition.notify_one();
    }
  }
public:
  void Resume()
  {
    Run();
  }
  void Pause()
  {
    boost::mutex::scoped_lock lock(m_Mutex);
    m_Running = false;
  }  
};

template <typename T>
class Source: public Active
{
public:
  typedef bounded_buffer <T> Pipe;
private:
  Pipe m_Out;
  boost::thread m_Thread;
public:
  Source(size_t size)
  : m_Out(size)
  , m_Thread(boost::ref(*this))
  {}
  virtual ~Source(){}
  void operator()()
  {
    T temp = T();
    while (m_Out.available())
    {
      Await();
      if (!Process(temp))
      {
        return;
      }
      m_Out.write(temp);
    }
  }
  Pipe & Out()
  {
    return m_Out;
  }
protected:
  void Stop()
  {
    m_Thread.join();
  }
  virtual bool Process(T & t) = 0;
};

template <typename T>
class Sink: public Active
{
public:
  typedef bounded_buffer <T> Pipe;
private:
  Pipe & m_In;
  boost::thread m_Thread;
public:
  Sink(Pipe & out)
  : m_In(out)
  , m_Thread(boost::ref(*this))
  {}
  virtual ~Sink(){}
  void operator()()
  {
    T temp = T();
    while (m_In.available())
    {
      Await();
      if (m_In.read(temp) && !Process(temp))
      {
        return;
      }
    }
  }
protected:
  void Stop()
  {
    m_In.clear();
    m_Thread.join();
  }
  virtual bool Process(T & t) = 0;
};

template <typename T>
class Filter: public Active
{
public:
  typedef bounded_buffer <T> Pipe;
private:
  Pipe & m_In;
  Pipe m_Out;
  boost::thread m_Thread;
public:
  Filter(Pipe & out)
  : m_In(out)
  , m_Out(m_In.capacity())
  , m_Thread(boost::ref(*this))
  {}
  virtual ~Filter(){}
  void operator()()
  {
    T temp = T();
    while (m_In.available() && m_Out.available())
    {
      Await();
      if (m_In.read(temp))
      {
        Process(temp);
        m_Out.write(temp);
      }
    }
  }
  Pipe & Out()
  {
    return m_Out;
  }
protected:
  void Stop()
  {
    m_In.clear();
    m_Thread.join();
  }
  virtual bool Process(T & t) = 0;
};

#endif
