// Copyright 2004 Rutger E.W. van Beusekom.
// Distributed under the Boost Software License, Version 1.0. (See
// accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#ifndef BOUNDED_BUFFER_HPP
#define BOUNDED_BUFFER_HPP

#include "circular_buffer.hpp"

#include <boost/thread.hpp>

template <typename T, typename buffer = circular_buffer<T> >
class bounded_buffer
{
  buffer                   _buffer;	  
  bool                     _available;    
  mutable boost::mutex     _mutex;		   
  mutable boost::condition _read_condition;	   
  mutable boost::condition _write_condition;	   
  mutable boost::condition _clear_condition;     
  std::size_t              _read_waiters;	   
  std::size_t              _write_waiters;	   
public:
  bounded_buffer():
    _buffer(),
    _available(true),
    _mutex(),
    _read_condition(),
    _write_condition(),
    _clear_condition(),
    _read_waiters(0),
    _write_waiters(0)
  {
  }
  bounded_buffer(size_t size):
    _buffer(size),
    _available(true),
    _mutex(),
    _read_condition(),
    _write_condition(),
    _clear_condition(),
    _read_waiters(0),
    _write_waiters(0)
  {
  }
  ~bounded_buffer()
  {
    clear();
  }
  size_t size() const
  {
    boost::mutex::scoped_lock lock(_mutex);
    return _buffer.size();
  }
  size_t capacity() const
  {
    boost::mutex::scoped_lock lock(_mutex);
    return _buffer.capacity();
  }
  void reserve(size_t size)
  {
    boost::mutex::scoped_lock lock(_mutex);
    _buffer.reserve(size);
  }
  bool empty() const
  {
    boost::mutex::scoped_lock lock(_mutex);
    return _buffer.empty();
  }
  bool full() const
  {
    boost::mutex::scoped_lock lock(_mutex);
    return _buffer.full();
  }
  void clear()
  {
    boost::mutex::scoped_lock lock(_mutex);
    _available = false;
    
    _read_condition.notify_all();
    _write_condition.notify_all();
    
    while(_read_waiters + _write_waiters)
    {
      _clear_condition.wait(lock);
    }
  }
  bool available() const
  {
    boost::mutex::scoped_lock lock(_mutex);
    return _available;
  }
  bool read(T& t)
  {
    boost::mutex::scoped_lock lock(_mutex);
    ++_read_waiters;
    while(_available && _buffer.empty())
    {
      _read_condition.wait(lock);
    }
    if(!_buffer.empty())
    {
      t = _buffer.front();
      _buffer.pop_front();
      _write_condition.notify_one();
    }
    if(!_available)
    {
      _clear_condition.notify_one();
    }
    --_read_waiters;
    return _available;
  }
  bool write(const T& t)
  {
    boost::mutex::scoped_lock lock(_mutex);
    ++_write_waiters;
    while(_available && _buffer.full())
    {
      _write_condition.wait(lock);
    }
    if(!_buffer.full())
    {
      _buffer.push_back(t);
      if(_read_waiters > 0)
      _read_condition.notify_one();
    }
    if(!_available)
    {
      _clear_condition.notify_one();
    }
    --_write_waiters;
    return _available;
  }
private:
  bounded_buffer(const bounded_buffer&);
  bounded_buffer& operator = (const bounded_buffer&);
};

#endif
