Sunday, May 29, 2016

A C++ Pool class to reuse connections or pointers


 #include <list> 
 #include <string> 
 #include <iostream> 
 #include <pthread.h> 
 #include <thread> 
 #include <mutex> 

/**
 * A pool for caching connections for HTTP requests and MySQL queries.
 * This class has been adapted from http://www.codeproject.com/Articles/8108/Template-based-Generic-Pool-using-C
 */
template <class T>  
class Pool
{
private:
    typedef std::shared_ptr
<T> ObjHolder_t;///< typedef for ObjHolder_t which is actually an std::shared_ptr
    typedef std::list
<ObjHolder_t> ObjList_t;///< typedef for ObjList_t

    unsigned m_size;///< pool size : default 0
    unsigned m_waitTimeSec;///< wait time: How long calling function can wait to find object
    bool m_isTempObjAllowed;///< if pool is full, is temp object allowed
    ObjList_t m_reservedList;///< reserved object list
    ObjList_t m_freeList;///< free object list
    std::mutex m_dataMutex;///< mutex for Pool data
    std::shared_ptr m_nullptr;///< a convenient nullptr std::shared_ptr
    long m_checkAbandonedIntervalSec;///< how often we should check the abandoned objects because some borrowers may fail to checkin the objects they borrowed (default: 3600 seconds)
    long m_lastCheckTimestampForAbandonedObjs;///< the last timestamp when we checked for abandoned objects
    std::function()> m_constructFunc;
    std::function&)> m_checkHealthFunc;
    std::function&)> m_reactiveFunc;
    std::function&)> m_destructFunc;

    /**
     * Initialize this instance with default member variables.
     */
    void initialize()
    {
        std::lock_guard scopelock(m_dataMutex);
        for (auto &it: m_freeList)
        {
            m_destructFunc(it);
            it.reset();
        }
        for (auto &it: m_reservedList)
        {
            m_destructFunc(it);
            it.reset();
        }
        m_reservedList.clear();
        m_freeList.clear();
        m_size = 0;
        m_isTempObjAllowed = true;
        m_waitTimeSec = 3;
        m_checkAbandonedIntervalSec=3600;
    }
public:
    /**
     * A default constructor.
     */
    Pool()
    {
        initialize();
    }
    /**
     * A default deconstructor.
     */
    ~Pool()
    {
        initialize();
    }
    /**
     * Reset the Pool.
     */
    void reset()
    {
        initialize();
    }
    /**
     * Initialize the pool with specific parameters.
     * This method could be only called once per instance.
     * @param nPoolSize
     * @param nExpirationTime
     * @param bTempObjAllowed
     * @param nWaitTime
     */
    void initialize(const unsigned nPoolSize,
            std::function()> constructFunc,
            std::function&)> checkHealthFunc,
            std::function&)> reactiveFunc,
            std::function&)> destructFunc,
            const bool bTempObjAllowed=true,
            const unsigned nWaitTime = 3)
    {
        std::lock_guard scopelock(m_dataMutex);
        if (m_size == 0)
        {
            m_size = nPoolSize;
            m_isTempObjAllowed = bTempObjAllowed;
            m_waitTimeSec = nWaitTime;
            m_constructFunc=constructFunc;
            m_checkHealthFunc=checkHealthFunc;
            m_reactiveFunc=reactiveFunc;
            m_destructFunc=destructFunc;
        }
        else
            throw FailureException("can't Initialize the pool again");
    }

    /**
     * Borrow an object from the Pool.
     * This method promises finding a new object.
     * @return the object pointer
     */
    std::shared_ptr& checkout()
    {
        while (true)
        {
            {
                std::lock_guard scopelock(m_dataMutex);
                std::shared_ptr &pObj=findFreeObject();
                if (pObj!=nullptr)
                {
                    return pObj;
                }
                // did not find a free one
                if (m_freeList.size() + m_reservedList.size() < m_size)
                    return createObject();
                else if ((long)time(NULL) - m_lastCheckTimestampForAbandonedObjs > m_checkAbandonedIntervalSec)
                {
                    collectAbandonedObjects();
                    std::shared_ptr &pObj = findFreeObject();
                    if (pObj!=nullptr)
                        return pObj;
                }
                else if (m_isTempObjAllowed)
                    return createObject();
                collectAbandonedObjects();
                {
                    std::shared_ptr &pObj = findFreeObject();
                    if (pObj!=nullptr)
                        return pObj;
                }
            }
            sleep(m_waitTimeSec);
        }
    }
    /**
     * Return an object to this Pool.
     * This method will first validate the returned object, then put it in the free object list, and finally remove it from the reserved object list.
     * @param pObj the object to return
     */
    void checkin(std::shared_ptr& pObj)
    {
        std::lock_guard scopelock(m_dataMutex);
        if (validateObject(pObj))
        {
            m_freeList.push_back(pObj);
            // Todo: why?
            //oTemp.setObject(NULL);
        }
        else
        {// the object is bad, so deconstruct it
            m_destructFunc(pObj);
        }
        // remove the object from the reserved list
        for (typename ObjList_t::iterator i=m_reservedList.begin(); i!=m_reservedList.end(); ++i)
        {
            if (*i==pObj)
            {
                i = m_reservedList.erase(i);
                break;
            }
        }
    }

private:
    /**
     * Create a new object and add it to the reserved object list.
     * @return the newly created object
     */
    std::shared_ptr& createObject()
    {
        std::shared_ptr newObj=m_constructFunc();
        if (newObj!=nullptr && m_checkHealthFunc(newObj))
        {
            m_reservedList.push_back(newObj);
            return m_reservedList.back();
        }
        else
        {
            throw FailureException("could not create Object");
        }
    }
    /**
     * It will move abandoned objects to the free object list from the reserved object list,
     * if they could be active.
     */
    void collectAbandonedObjects()
    {
        for (typename ObjList_t::iterator it=m_reservedList.begin(); it!=m_reservedList.end(); ++it)
        {
            ObjHolder_t &oHolder = *it;
            if (oHolder.unique())
            {// checks whether the managed object is managed only by the current shared_ptr instance
                if (validateObject(oHolder)==true)
                {
                    m_freeList.push_back(oHolder);
                }
                it = m_reservedList.erase(it);
            }
        }
        m_lastCheckTimestampForAbandonedObjs=(long)time(NULL);
    }

    /**
     * Validate object if it is still usable.
     * If not, try to make it usable.
     * @param obj the pointer to the object which needs check
     * @return true if obj is good; false otherwise
     */
    bool validateObject(std::shared_ptr &obj)
    {
        if (obj==nullptr)
        {
            return false;
        }
        else if (m_checkHealthFunc(obj) || m_reactiveFunc(obj))
        {
            return true;
        }
        return false;
    }

    /**
     * Find a free object which could be active from the free object list.
     * The free object list is checked. If any free object is inactive, we will try to reactive it.
     * If reactiving it failed, we will drop the object.
     * @return nullptr if no free object available which could be active
     */
    std::shared_ptr &findFreeObject()
    {
        // find existing free Object
        while (!m_freeList.empty())
        {
            ObjHolder_t &obj=m_freeList.front();
            if (validateObject(obj))
            {
                m_reservedList.push_back(obj);
                m_freeList.pop_front();
                return m_reservedList.back();
            }
            else// delete the Object
            {
                m_freeList.pop_front();
                m_destructFunc(obj);
            }
        }
        return m_nullptr;
    }

public:
    /**
     * Print the info of this Pool to a string.
     * @return the string representation of this Pool
     */
    std::string toString() const
    {
        std::stringstream ss;
        ss << "Pool(size=" << m_size
                << " isTempObjAllowed=" << m_isTempObjAllowed
                << " reservedList=" << m_reservedList.size()
                << " freeList=" << m_freeList.size();
        ss << ")";
        return ss.str();
    }
};




/////////////////////////////////////////////////////////////////////////////////////////////////////////
// How to use Pool

 #include <iostream> 
 #include <curl/curl.h> 

int main(int argc, char *argv[])
{

    typedef CURL T;
    const unsigned nPoolSize=2;
    std::string url="https://datamarket.accesscontrol.windows.net/v2/OAuth2-13/";
    std::function()> constructFunc(
    [url]() -> std::shared_ptr
    {
        CURL *curl=curl_easy_init();
        CURLcode res;
        if (curl==NULL)
            throw MZFailureException("could not get a curl handle in "
                    +std::string(__FILE__)+"("+std::string(__FUNCTION__)+") on line "+std::to_string(__LINE__));

        /* First set the URL that is about to receive our POST. This URL can
           just as well be a https:// URL if that is what should receive the
           data. */
        curl_easy_setopt(curl, CURLOPT_URL, url.c_str());

        curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0);
        // we will delete the pointer by ourselves. Otherwise, shared_ptr would call delete
        std::shared_ptr p(curl, [](T*){});
        return p;
    });
    std::function&)> checkHealthFunc=
    [](std::shared_ptr& curlConn) -> bool
    {
        return true;
    };
    std::function&)> reactiveFunc=
    [](std::shared_ptr& curlConn) -> bool
    {
        return true;
    };
    std::function&)> destructFunc=
    [](std::shared_ptr& curlConn)
    {
        curl_easy_cleanup(curlConn.get());
        curlConn.reset();
    };
    const bool bTempObjAllowed=true;
    const unsigned nWaitTime = 1;
    // get the pool instance
    Pool
<T>  pool;
    // initialize the pool
    pool.initialize(nPoolSize,
                constructFunc,
                checkHealthFunc,
                reactiveFunc,
                destructFunc,
                bTempObjAllowed,
                nWaitTime);
    // checkout the object
    std::shared_ptr pObj=pool.checkout();
    std::cerr << "after checkout, the pool is: " << pool.toString() << std::endl;
    pObj=pool.checkout();
    std::cerr << "after checkout, the pool is: " << pool.toString() << std::endl;
    pObj=pool.checkout();
    std::cerr << "after checkout, the pool is: " << pool.toString() << std::endl;
    pObj=pool.checkout();
    std::cerr << "after checkout, the pool is: " << pool.toString() << std::endl;
    pObj=pool.checkout();
    std::cerr << "after checkout, the pool is: " << pool.toString() << std::endl;
    if(pObj!=nullptr)
    {
        std::cerr << "got an object which is not nullptr" << std::endl;
        pool.checkin(pObj); // checkin the object
        std::cerr << "after checkin, the pool is: " << pool.toString() << std::endl;
    }
    else
        std::cerr << "got an object which is nullptr" << std::endl;
    // reset the pool
    pool.reset();
    std::cerr << "after reset, the pool is: " << pool.toString() << std::endl;
}

No comments: