Parsing large amounts of text

To give a brief overview of the program 1.) Opens a connection to the socket and reads data 2.) Separation of data on a new line char 3.) Pushes data segments into the queue for processing in a separate thread.

I use the curlpp library as it handles authentication and DNS. A queue is just a deque with mutexes to ensure thread safety.

This is the method I'm using right now.

std::string input;
size_t socketIO::dataCallBack(char* ptr, size_t size, size_t nmemb) {
    // Calculate the real size of the incoming buffer
    size_t realsize = size * nmemb;

    //Append the new input to the old input
    input.append(ptr, realsize);

    //Find all the complete strings and push them to the queue
    size_t oldPosition = 0;
    size_t position = 0;
    position = input.find('\r', oldPosition);
    while (position != std::string::npos) {
        queueObject.push(input.substr(oldPosition, position))
        oldPosition = position + 1;
        position = input.find('\r', oldPosition);
    }

    //Save off the partial string as you'll get the rest of it on the next data callback
    input = input.substr(oldPosition);

    return realsize;
}

I have a few issues. I am having problems with memory leaks and valgrind is showing a serious leak from this function.

==12867== 813,287,102 bytes in 390,337 blocks are possibly lost in loss record 359 of 359
==12867==    at 0x4C27CC1: operator new(unsigned long) (vg_replace_malloc.c:261)
==12867==    by 0x5AA8D98: std::string::_Rep::_S_create(unsigned long, unsigned long, std::allocator<char> const&) (in /usr/lib/libstdc++.so.6.0.13)
==12867==    by 0x5AA9B64: ??? (in /usr/lib/libstdc++.so.6.0.13)
==12867==    by 0x5AA9D38: std::basic_string<char, std::char_traits<char>, std::allocator<char> >::basic_string(std::string const&, unsigned long, unsigned long) (in /usr/lib/libstdc++.so.6.0.13)
==12867==    by 0x41E4F5: socketIO::write(char*, unsigned long, unsigned long) (basic_string.h:2006)
==12867==    by 0x509C657: utilspp::Functor<unsigned long, utilspp::tl::TypeList<char*, utilspp::tl::TypeList<unsigned long, utilspp::tl::TypeList<unsigned long, utilspp::NullType> > > >::operator()(char*, unsigned long, unsigned long) (Functor.hpp:106)
==12867==    by 0x509B6E4: curlpp::internal::CurlHandle::executeWriteFunctor(char*, unsigned long, unsigned long) (CurlHandle.cpp:171)
==12867==    by 0x509F509: curlpp::internal::Callbacks::WriteCallback(char*, unsigned long, unsigned long, curlpp::internal::CurlHandle*) (OptionSetter.cpp:47)
==12867==    by 0x4E3D667: ??? (in /usr/lib/libcurl-gnutls.so.4.1.1)
==12867==    by 0x4E5407B: ??? (in /usr/lib/libcurl-gnutls.so.4.1.1)
==12867==    by 0x4E505A1: ??? (in /usr/lib/libcurl-gnutls.so.4.1.1)
==12867==    by 0x4E51A8F: ??? (in /usr/lib/libcurl-gnutls.so.4.1.1)
==12867==    by 0x509A78B: curlpp::internal::CurlHandle::perform() (CurlHandle.cpp:52)
==12867==    by 0x5093A6B: curlpp::Easy::perform() (Easy.cpp:48)
==12867==    by 0x41EDC3: socketIO::processLoop() (socketIO.cpp:126)

. istringstream, , , , . , , , .

UPDATE . , .

main.cpp

/**
 * The main driver for the twitter capture app.  Starts multiple threads for processors, 1 io thread and 2 db threads. One for user
 * information and the other for tweet information 
 */

#include "types.h"
#include "threadBase.h"
#include "socketIO.h"
#include "processor.h"
#include "dbTweetQueue.h"
#include "dbUserQueue.h"

#include <vector>


stringQueue twitToProc;
tweetQueue tweetQ;
userQueue userQ;
deleteQueue deleteQ;
std::vector<ThreadBase *> threadGroup;

std::string dbBase::dbUser(DBUSER);
std::string dbBase::dbURL(DBURL);
std::string dbBase::dbPass(DBPASS);

/*
 * Handle the signal for interupt
 */
void sigquit(int param)
{
    std::cout<<"Received sigquit"<<std::endl;
    for(unsigned int i = 0; i < threadGroup.size(); i++)
    {
        threadGroup[i]->interupt();
    }
}


int main(int argc, char* argv[])
{
    try{
    //Setting the signal handler up.
    struct sigaction act;
    act.sa_handler = sigquit;
    sigemptyset(&act.sa_mask);
    act.sa_flags = 0;
    sigaction(SIGQUIT, &act, 0);


    int MaxThreads = 5;
    if(argc < 3)
    {
        std::cout<<"Usage: >"<<argv[0]<<" TwitterUserName TwitterPassWord"<<std::endl;
        std::cout<<"Using Defaults: "<<TWITTERACCT<<" "<<TWITTERPASS<<std::endl;
    }

    // Create socketIO, and add it to the thread group
    if(argc == 3)
    {
        threadGroup.push_back(new socketIO(twitToProc, argv[1], argv[2]));
    }
    else
    {
        threadGroup.push_back(new socketIO(twitToProc));
    }


   // Create processorThreads and add them to the thread group
    for(int i = 0; i < MaxThreads; i++)
    {
        threadGroup.push_back(new processor(twitToProc, tweetQ, deleteQ, userQ));
    }

    //Create DB Threads and add them to the thread group.
    threadGroup.push_back(new dbTweetQueue(tweetQ, deleteQ));
    threadGroup.push_back(new dbUserQueue(userQ));


    // Start the threads
    for(unsigned int i = 0; i < threadGroup.size(); i++)
    {
        threadGroup[i]->start();
    }

    // Join the threads
    for(unsigned int i = 0; i < threadGroup.size(); i++)
    {
        threadGroup[i]->join();
    }

           } catch (std::exception & e) {
            std::cerr << e.what() <<  std::endl;
        } 

    for(unsigned int i = 0; i < threadGroup.size(); i++)
    {
        threadGroup[i]->();
    }
    return 0;
}

threadBase.h

#ifndef _THREADBASE_H
#define _THREADBASE_H

#include <boost/thread.hpp>

class ThreadBase
{
public:
    virtual void join() = 0;
    virtual void start() = 0;
    void interupt(){thread.interrupt();}
protected:
    boost::thread thread;

};



#endif  /* _THREADBASE_H */

socketIO.h

#ifndef _SOCKETIO_H
#define _SOCKETIO_H

#include "types.h"
#include "threadBase.h"

#include <boost/bind.hpp>
#include <curlpp/cURLpp.hpp>
#include <curlpp/Multi.hpp>
#include <curlpp/Easy.hpp>
#include <curlpp/Options.hpp>
#include <curlpp/Exception.hpp>
#include <curlpp/Infos.hpp>
#include <curl/curl.h>

#include <signal.h>
#include <string>
#include <sstream>
#include <cstdlib>


#define defaultRepeatInterval 10;

class socketIO: public ThreadBase {
private:
    int repeatInterval;
    double previousDownloadSize;
    int failCount;
    int writeRound;
    std::string userPassword;
    stringQueue&  queueObject;
    std::string input;


public:
    socketIO(stringQueue & messageQueue):
                queueObject(messageQueue)
    {
        userPassword.append(TWITTERACCT);
        userPassword.append(":");
        userPassword.append(TWITTERPASS);
    }

    socketIO(stringQueue & messageQueue, char* userName, char* password):
                queueObject(messageQueue)
    {
        userPassword.append(userName);
        userPassword.append(":");
        userPassword.append(password);
    }

    virtual ~socketIO();

    void join();
    void start();
    std::auto_ptr<curlpp::Easy> createRequest(int);



    void processLoop();
    size_t write(char* ptr, size_t size, size_t nmemb);
    int progress(double, double, double, double);

};

#endif  /* _SOCKETIO_H */

socketIO.cpp

#include "socketIO.h"

socketIO::~socketIO() {
}

/*
 * This method starts a new thread with the processLoop method
 */
void socketIO::start() {
    thread = boost::thread(&socketIO::processLoop, this);
}

/*
 * This method blocks waiting for the thread to exit
 */
void socketIO::join() {
    thread.join();
}

/*
 * The datacall back function for the open twitter connection.\
 */
size_t socketIO::write(char* ptr, size_t size, size_t nmemb) {
    // Calculate the real size of the incoming buffer
    size_t realsize = size * nmemb;
    std::string temp;
    temp.append(input);
    temp.append(ptr, realsize);
    size_t oldPosition = 0;
    size_t position = 0;
    position = temp.find('\r', oldPosition);
    while (position != std::string::npos) {
        queueObject.push(temp.substr(oldPosition, position));
        ++writeRound;
        oldPosition = position + 1;
        position = temp.find('\r', oldPosition);
    }
    input = temp.substr(oldPosition);
    return realsize;
}

/*
 * The timed callback function, called every second, used to monitor that the connection is still receiving data
 * Return 1 if requesting break or data flow stops, 0 if continuing normally
 */
int socketIO::progress(double dltotal, double dlnow, double ultotal, double ulnow) {
    // Allows us to break out on interruption
    if (boost::this_thread::interruption_requested())
        return 1;

    if (dlnow == previousDownloadSize) {
        if (failCount < 15)
            failCount++;
        else {
            repeatInterval = repeatInterval * 2;
            return 1;
        }
    } else {
        repeatInterval = 10;
        previousDownloadSize = dlnow;
    }
    return 0;
}

/*
 * This method creates a new connection to the twitter service with the required settings
 */
std::auto_ptr<curlpp::Easy> socketIO::createRequest(int source) {
    //Reset the input buffer when the connection is made.
    input = std::string("");
    std::auto_ptr<curlpp::Easy> newRequest(new curlpp::Easy);

    curlpp::types::ProgressFunctionFunctor progressFunctor(this, &socketIO::progress);
    newRequest->setOpt(new curlpp::options::ProgressFunction(progressFunctor));

    curlpp::types::WriteFunctionFunctor functor(this, &socketIO::write);
    newRequest->setOpt(new curlpp::options::WriteFunction(functor));

    newRequest->setOpt(new curlpp::options::FailOnError(true));
    newRequest->setOpt(new curlpp::options::NoProgress(0));
    newRequest->setOpt(new curlpp::options::Verbose(true));
    newRequest->setOpt(new curlpp::options::UserPwd(userPassword));


    //Code for debugging and using alternate sources
    std::string params = "track=basketball,football,baseball,footy,soccer";

    switch (source) {
        case 1: // Testing Locally
            newRequest->setOpt(new curlpp::options::Url("127.0.0.1:17000"));
            break;
        case 2: // Filtered
            newRequest->setOpt(new curlpp::options::Url("http://stream.twitter.com/1/statuses/filter.json"));
            newRequest->setOpt(new curlpp::options::PostFields(params));
            newRequest->setOpt(new curlpp::options::PostFieldSize(params.size()));
            break;
        case 3: //Twitter Main Stream
            newRequest->setOpt(new curlpp::options::Url("http://stream.twitter.com/1/statuses/sample.json"));
            break;
    }

    return newRequest;
}


/*
 * The main method of the thread.  Creates a new instance of the request
 */
void socketIO::processLoop() {
    repeatInterval = defaultRepeatInterval;
    std::auto_ptr<curlpp::Easy> request;
    while (true) {
        try {
            previousDownloadSize = 0;
            failCount = 0;
            request.reset(createRequest(3));
            request->perform();
        } catch (curlpp::UnknowException & e) {
            std::cout << "Unknown Exception: " << e.what() << std::endl;
        } catch (curlpp::RuntimeError & e) {
            std::cout << "Runtime Exception: " << e.what() << std::endl;
        } catch (curlpp::LogicError & e) {
            std::cout << "Logic Exception: " << e.what() << std::endl;
        }


        if (boost::this_thread::interruption_requested())
            break;
        else
            boost::this_thread::sleep(boost::posix_time::seconds(repeatInterval));
    }
}

types.h

#ifndef _TYPES_H
#define _TYPES_H

#include <string>
#include <concurrent_queue.hpp>

#define DBUSER "****"
#define DBPASS "****"
#define DBURL "****"
#define TWITTERACCT "****"
#define TWITTERPASS "****"

typedef struct tweet {
...
} tweet;

typedef struct user {
...
} user;


typedef concurrent_queue<std::string> stringQueue;
typedef std::pair<int, std::string> dbPair;
typedef concurrent_queue<dbPair> dbQueue;

typedef concurrent_queue<tweet> tweetQueue;
typedef concurrent_queue<user> userQueue;
typedef concurrent_queue<boost::int64_t> deleteQueue;

#endif  /* _TYPES_H */

concurrent_queue.hpp

#ifndef _CONCURRENT_QUEUE_
#define _CONCURRENT_QUEUE_

#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include <deque>

template<typename Data>
class concurrent_queue
{
private:
    std::deque<Data> the_queue;
    mutable boost::mutex the_mutex;
    boost::condition_variable the_condition_variable;
public:
    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        the_queue.push_back(data);
        lock.unlock();
        the_condition_variable.notify_one();
    }

    bool empty() const
    {
        boost::mutex::scoped_lock lock(the_mutex);
        return the_queue.empty();
    }

    bool try_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        if(the_queue.empty())
        {
            return false;
        }

        popped_value=the_queue.front();
        the_queue.pop_front();
        return true;
    }

    void wait_and_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        while(the_queue.empty())
        {
            the_condition_variable.wait(lock);
        }

        popped_value=the_queue.front();
        the_queue.pop_front();
    }

};

#endif  /* _CONCURRENT_QUEUE_ */
+3
4

, , , .

valgrind, , :

==12867== 813,287,102 bytes in 390,337 blocks are possibly lost in loss record 359 of 359
==12867==    at 0x4C27CC1: operator new(unsigned long) (vg_replace_malloc.c:261)
==12867==    by 0x5AA8D98: std::string::_Rep::_S_create(unsigned long, unsigned long, std::allocator<char> const&) (in /usr/lib/libstdc++.so.6.0.13)
==12867==    by 0x5AA9B64: ??? (in /usr/lib/libstdc++.so.6.0.13)
==12867==    by 0x5AA9D38: std::basic_string<char, std::char_traits<char>, std::allocator<char> >::basic_string(std::string const&, unsigned long, unsigned long) (in /usr/lib/libstdc++.so.6.0.13)
==12867==    by 0x41E4F5: socketIO::write(char*, unsigned long, unsigned long) (basic_string.h:2006)

, write. std::string, stl, , , - .

, , std::string . , :

  • socketIO .
  • , - .
+3

ThreadBase .

delete to ThreadBase*, , , ThreadBase, undefined. , - ( ).

class ThreadBase
{
public:
    virtual ~ThreadBase() {} // <-- There you go!

    virtual void join() = 0;
    virtual void start() = 0;

    void interupt() { thread.interrupt(); }

protected:
    boost::thread thread;
};

:

  • protected, .
  • NVI ( ) , , public, virtual, (, ), , .
  • , ThreadBase boost::noncopyable, , .
+2

, ( ), , , , curlpp.

, ++, , . , RAII , ( ), - .

curlpp, . , , , main() - , , , Easy HTTPClient ( ), .

+2

.

, , . , SO, . , ( , , ), . : ? Mic , .

, , .

+1

Source: https://habr.com/ru/post/1785309/


All Articles