OZ++ Class: SocketStream
/******************************************************************************
 *
 * Copyright (c) 2014 Antillia.com TOSHIYUKI ARAI. ALL RIGHTS RESERVED.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions, and the following disclaimer.
 *  
 * 2. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 
 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 *
 *  SocketStream.h
 *
 *****************************************************************************/

#pragma once

#include <oz++/Socket.h>
#include <oz++/SockAddrInet.h>
#include <oz++/SockAddrInet6.h>
#include <oz++/CharString.h>
#include <oz++/CharStringBuffer.h>

/**
 * SocketStream class
 */
namespace OZ {

class SocketStream :public Socket {
private:
  char*  buffer;

  static const int BUFFER_SIZE = 1024*8;

  
public:
  /**
   * Constructor
   */  
  SocketStream(ADDRESS_FAMILY family=INET) 
    :Socket(family, STREAM, 0),
    buffer(new char[BUFFER_SIZE])
  {
  }
  

public:
  /**
   * Constructor
   */
  SocketStream(int soc) 
    :Socket(),
  buffer(new char[BUFFER_SIZE])
  {
    setSocket(soc);
  }

public:
  /**
   * Constructor
   */
  ~SocketStream() 
  {
    if (this->buffer) {
      delete [] this->buffer; 
      this->buffer = NULL;
    }
  }


public:
  SocketStream* accept(SockAddrInet& address) 
  {
    int fd = Socket::accept(address);
    return new SocketStream(fd);
  }


public:
  SocketStream* accept(SockAddrInet6& address) 
  {
    int fd = Socket::accept(address);
    return new SocketStream(fd);
  }

public:
  /**
   *
   */
  int create(ADDRESS_FAMILY family=INET, int protocol=0) 
  {
    return Socket::create(family, STREAM, protocol);
  }


public:
  int getPeerName(sockaddr_in* addr) 
  {
    memset(addr, 0, sizeof(sockaddr_in));
    int soc = getSocket();
    socklen_t addrlen = sizeof(sockaddr_in);
    return ::getpeername(soc, (sockaddr*)addr, &addrlen);
  }


public:
  //
  bool isReadable(int timeout=100)
  {
    bool rc = false;

    int fd = getSocket();
    
    fd_set  readFD, writeFD;
  
    timeval tv;
    memset(&tv, 0, sizeof(tv));
    tv.tv_usec = timeout;
    
    FD_ZERO(&readFD);
    FD_ZERO(&writeFD);
  
    FD_SET(fd, &readFD);
    FD_SET(fd, &writeFD);

    if (::select(FD_SETSIZE, &readFD, &writeFD, 
        NULL,  &tv) ) {
  
      if (FD_ISSET(fd, &readFD)) {
        //OK. Readable
        rc = true;
      }
    }
    return rc;
  }

public:
  //
  bool isWritable(int timeout=100)
  {
    bool rc = false;
  
    int fd = getSocket();
    
    fd_set  readFD, writeFD;
    timeval tv;
    memset(&tv, 0, sizeof(tv));
    tv.tv_usec = timeout;

    FD_ZERO(&readFD);
    FD_ZERO(&writeFD);

    FD_SET(fd, &readFD);
    FD_SET(fd, &writeFD);

    if (::select(FD_SETSIZE, &readFD, &writeFD, 
        NULL,  &tv) ) {
      if (FD_ISSET(fd, &writeFD)) {
        //OK. Writable
        rc = true;
      }
    }
    return rc;
  }

public:
  int printf(const char* format,...)
  {
    va_list pos;
    va_start(pos, format);
    vsnprintf(buffer, BUFFER_SIZE, format, pos);
    va_end(pos);

    return sendAll(buffer);
  }


public:
  int sendAll(const char* buff, int len, int flag=0, long timeout=30)
  {
    
    int sentBytes = 0;
    
    int soc = getSocket();

    const char* ptr = buff;
    int  orglen = len;
    time_t startTime = time(NULL);
    
    const int RETRY_COUNT_MAX = 10;
    int   retryCount = 0;

    while (len >0) {
      
      time_t currentTime = time(NULL);

      if ((currentTime - startTime) > timeout) {
        if (sentBytes < orglen) { 
          // timeout;
          break;
        }
        if (sentBytes == orglen) {
          break;
        }
      }

      //Check if fd is writable
      if (isWritable() == false) {
        continue;
      }

      int size = ::send(soc, ptr, len, flag);
      
      if (size <0 && (errno== EWOULDBLOCK  || errno ==EAGAIN) ) {
        //Socket error has happened, and if it were caused by blocking,
        //retry to send the buff data.
        if (retryCount < RETRY_COUNT_MAX) {
          sleep(1);
          retryCount++;
          continue;
        } else {
          break;
        }
      }

      if (size < 0 && len <=0) {
        break;
      }
  
      if (size >0) {
        sentBytes += size;
        ptr += size;
        len -= size;
      }

      retryCount = 0;
    }
    return sentBytes;
  }

public:
  int sendAll(const char* string) 
  {
    int rc = 0;
    if (string) {
      rc = sendAll(string, strlen(string), 0);
    }
    return rc;
  }

public:    
  int sendAll(CharString& string) 
  {
    const char* text = (const char*)string;
    int rc = 0;
    if (text) {
      rc = sendAll(text, strlen(text), 0);
    }
    return rc;
  }
  
public:
  int sendAll(CharStringBuffer& buffer) 
  {
    const char* text = buffer.getBuffer();
    int rc = 0;
    if (text) {
      rc = sendAll(text, strlen(text), 0);
    }
    return rc;
  }

};

}