To give a brief overview, the program (1) opens a connection to the socket and reads data, (2) splits the data on a line-break character, and (3) pushes the resulting data segments onto a queue for processing in a separate thread.
I use the curlpp library because it handles authentication and DNS. The queue is just a deque guarded by a mutex to ensure thread safety.
This is the method I'm using right now.
std::string input;
// Data callback: appends the received bytes to the `input` buffer and pushes
// every complete '\r'-terminated record onto `queueObject`.
//
// ptr   - start of the received bytes (not NUL-terminated)
// size  - size of one element
// nmemb - number of elements; size * nmemb bytes are read from ptr
//
// Returns size * nmemb, i.e. the number of bytes consumed.
size_t socketIO::dataCallBack(char* ptr, size_t size, size_t nmemb) {
    const size_t realsize = size * nmemb;
    input.append(ptr, realsize);

    size_t recordStart = 0;
    size_t delimiter = input.find('\r', recordStart);
    while (delimiter != std::string::npos) {
        // substr() takes (position, LENGTH), not (position, end). Passing the
        // end index as the length made every record after the first one drag
        // in the data that follows it.
        queueObject.push(input.substr(recordStart, delimiter - recordStart));
        recordStart = delimiter + 1;
        delimiter = input.find('\r', recordStart);
    }

    // Keep only the trailing, still-incomplete record for the next call.
    input.erase(0, recordStart);
    return realsize;
}
I have a few issues. I am running into memory problems, and valgrind reports a serious leak originating from this function.
==12867== 813,287,102 bytes in 390,337 blocks are possibly lost in loss record 359 of 359
==12867== at 0x4C27CC1: operator new(unsigned long) (vg_replace_malloc.c:261)
==12867== by 0x5AA8D98: std::string::_Rep::_S_create(unsigned long, unsigned long, std::allocator<char> const&) (in /usr/lib/libstdc++.so.6.0.13)
==12867== by 0x5AA9B64: ??? (in /usr/lib/libstdc++.so.6.0.13)
==12867== by 0x5AA9D38: std::basic_string<char, std::char_traits<char>, std::allocator<char> >::basic_string(std::string const&, unsigned long, unsigned long) (in /usr/lib/libstdc++.so.6.0.13)
==12867== by 0x41E4F5: socketIO::write(char*, unsigned long, unsigned long) (basic_string.h:2006)
==12867== by 0x509C657: utilspp::Functor<unsigned long, utilspp::tl::TypeList<char*, utilspp::tl::TypeList<unsigned long, utilspp::tl::TypeList<unsigned long, utilspp::NullType> > > >::operator()(char*, unsigned long, unsigned long) (Functor.hpp:106)
==12867== by 0x509B6E4: curlpp::internal::CurlHandle::executeWriteFunctor(char*, unsigned long, unsigned long) (CurlHandle.cpp:171)
==12867== by 0x509F509: curlpp::internal::Callbacks::WriteCallback(char*, unsigned long, unsigned long, curlpp::internal::CurlHandle*) (OptionSetter.cpp:47)
==12867== by 0x4E3D667: ??? (in /usr/lib/libcurl-gnutls.so.4.1.1)
==12867== by 0x4E5407B: ??? (in /usr/lib/libcurl-gnutls.so.4.1.1)
==12867== by 0x4E505A1: ??? (in /usr/lib/libcurl-gnutls.so.4.1.1)
==12867== by 0x4E51A8F: ??? (in /usr/lib/libcurl-gnutls.so.4.1.1)
==12867== by 0x509A78B: curlpp::internal::CurlHandle::perform() (CurlHandle.cpp:52)
==12867== by 0x5093A6B: curlpp::Easy::perform() (Easy.cpp:48)
==12867== by 0x41EDC3: socketIO::processLoop() (socketIO.cpp:126)
. istringstream, , , , . , , , .
UPDATE
. , .
main.cpp
#include "types.h"
#include "threadBase.h"
#include "socketIO.h"
#include "processor.h"
#include "dbTweetQueue.h"
#include "dbUserQueue.h"
#include <vector>
// Queue handed to socketIO (which pushes received records into it) and to
// every processor instance.
stringQueue twitToProc;
// Queue shared between the processor instances and dbTweetQueue.
tweetQueue tweetQ;
// Queue shared between the processor instances and dbUserQueue.
userQueue userQ;
// Queue shared between the processor instances and dbTweetQueue.
deleteQueue deleteQ;
// Every worker object created in main(); sigquit() walks this list to
// interrupt them.
std::vector<ThreadBase *> threadGroup;
// Definitions of dbBase's static members, taken from the DBUSER / DBURL /
// DBPASS macros.
std::string dbBase::dbUser(DBUSER);
std::string dbBase::dbURL(DBURL);
std::string dbBase::dbPass(DBPASS);
void sigquit(int param)
{
std::cout<<"Received sigquit"<<std::endl;
for(unsigned int i = 0; i < threadGroup.size(); i++)
{
threadGroup[i]->interupt();
}
}
// Program entry point.
//
// Installs the SIGQUIT handler, builds the worker set (one socketIO reader,
// MaxThreads processors, one dbTweetQueue and one dbUserQueue), starts every
// worker, waits for all of them to finish, and finally destroys them.
//
// argv[1] / argv[2] are used as the Twitter user name and password when exactly
// two arguments are supplied; otherwise socketIO falls back to its built-in
// defaults.
int main(int argc, char* argv[])
{
    try {
        struct sigaction act;
        act.sa_handler = sigquit;
        sigemptyset(&act.sa_mask);
        act.sa_flags = 0;
        sigaction(SIGQUIT, &act, 0);

        int MaxThreads = 5;
        if (argc < 3)
        {
            std::cout<<"Usage: >"<<argv[0]<<" TwitterUserName TwitterPassWord"<<std::endl;
            std::cout<<"Using Defaults: "<<TWITTERACCT<<" "<<TWITTERPASS<<std::endl;
        }

        if (argc == 3)
        {
            threadGroup.push_back(new socketIO(twitToProc, argv[1], argv[2]));
        }
        else
        {
            threadGroup.push_back(new socketIO(twitToProc));
        }

        for (int i = 0; i < MaxThreads; i++)
        {
            threadGroup.push_back(new processor(twitToProc, tweetQ, deleteQ, userQ));
        }
        threadGroup.push_back(new dbTweetQueue(tweetQ, deleteQ));
        threadGroup.push_back(new dbUserQueue(userQ));

        for (unsigned int i = 0; i < threadGroup.size(); i++)
        {
            threadGroup[i]->start();
        }
        for (unsigned int i = 0; i < threadGroup.size(); i++)
        {
            threadGroup[i]->join();
        }
    } catch (std::exception & e) {
        std::cerr << e.what() << std::endl;
    }

    // Release every worker allocated above. The original statement here was
    // not valid C++ (`threadGroup[i]->();`).
    // NOTE: deleting through ThreadBase* is only well-defined when ThreadBase
    // declares a virtual destructor.
    for (unsigned int i = 0; i < threadGroup.size(); i++)
    {
        delete threadGroup[i];
    }
    // Drop the now-dangling pointers so a late sigquit() cannot touch them.
    threadGroup.clear();
    return 0;
}
threadBase.h
#ifndef _THREADBASE_H
#define _THREADBASE_H
#include <boost/thread.hpp>
// Abstract base for the program's worker objects. Each worker owns one
// boost::thread; derived classes decide what that thread runs.
class ThreadBase
{
public:
    // Virtual so that deleting a worker through a ThreadBase* runs the derived
    // destructor; without it such a delete is undefined behavior.
    virtual ~ThreadBase() {}

    // Waits for the worker to finish (implemented by the derived class).
    virtual void join() = 0;

    // Launches the worker (implemented by the derived class).
    virtual void start() = 0;

    // Forwards an interruption request to the owned thread.
    void interupt() { thread.interrupt(); }

protected:
    // Thread handle managed by the derived class's start()/join().
    boost::thread thread;
};
#endif
socketIO.h
#ifndef _SOCKETIO_H
#define _SOCKETIO_H
#include "types.h"
#include "threadBase.h"
#include <boost/bind.hpp>
#include <curlpp/cURLpp.hpp>
#include <curlpp/Multi.hpp>
#include <curlpp/Easy.hpp>
#include <curlpp/Options.hpp>
#include <curlpp/Exception.hpp>
#include <curlpp/Infos.hpp>
#include <curl/curl.h>
#include <signal.h>
#include <string>
#include <sstream>
#include <cstdlib>
// Initial reconnect delay in seconds. No trailing ';' here: the semicolon
// would be pasted into every expression that uses the macro.
#define defaultRepeatInterval 10
// Worker that performs a curlpp request on its own thread, splits the received
// byte stream into '\r'-delimited records and pushes them onto a stringQueue.
class socketIO: public ThreadBase {
private:
    int repeatInterval;           // delay in seconds before the next request attempt
    double previousDownloadSize;  // dlnow value seen by the last progress() call
    int failCount;                // progress() calls that saw no new data
    int writeRound;               // number of records pushed by write()
    std::string userPassword;     // credentials in "user:password" form
    stringQueue& queueObject;     // destination queue (owned by the caller)
    std::string input;            // bytes received but not yet forming a full record

public:
    // Uses the TWITTERACCT / TWITTERPASS defaults as credentials.
    // The counters are zero-initialized so no member is ever read while
    // indeterminate; repeatInterval is assigned by processLoop() before use.
    socketIO(stringQueue & messageQueue):
        repeatInterval(0),
        previousDownloadSize(0.0),
        failCount(0),
        writeRound(0),
        queueObject(messageQueue)
    {
        userPassword.append(TWITTERACCT);
        userPassword.append(":");
        userPassword.append(TWITTERPASS);
    }

    // Uses the supplied NUL-terminated user name and password as credentials.
    socketIO(stringQueue & messageQueue, char* userName, char* password):
        repeatInterval(0),
        previousDownloadSize(0.0),
        failCount(0),
        writeRound(0),
        queueObject(messageQueue)
    {
        userPassword.append(userName);
        userPassword.append(":");
        userPassword.append(password);
    }

    virtual ~socketIO();

    // ThreadBase interface.
    void join();
    void start();

    // Builds a configured request; the int selects the endpoint.
    std::auto_ptr<curlpp::Easy> createRequest(int);

    // Thread body: issues requests repeatedly until interrupted.
    void processLoop();

    // Write callback: receives size * nmemb bytes starting at ptr.
    size_t write(char* ptr, size_t size, size_t nmemb);

    // Progress callback: (dltotal, dlnow, ultotal, ulnow).
    int progress(double, double, double, double);
};
#endif
socketIO.cpp
// Destructor: performs no explicit cleanup of its own.
socketIO::~socketIO() {}
void socketIO::start() {
thread = boost::thread(&socketIO::processLoop, this);
}
void socketIO::join() {
thread.join();
}
// Write callback registered with the request: appends the received bytes to
// the `input` buffer and pushes every complete '\r'-terminated record onto
// `queueObject`, counting each pushed record in `writeRound`.
//
// ptr   - start of the received bytes (not NUL-terminated)
// size  - size of one element
// nmemb - number of elements; size * nmemb bytes are read from ptr
//
// Returns size * nmemb, i.e. the number of bytes consumed.
size_t socketIO::write(char* ptr, size_t size, size_t nmemb) {
    const size_t realsize = size * nmemb;

    // Append straight into the carry-over buffer; no temporary copy is needed.
    input.append(ptr, realsize);

    size_t recordStart = 0;
    size_t delimiter = input.find('\r', recordStart);
    while (delimiter != std::string::npos) {
        // substr() takes (position, LENGTH), not (position, end). Passing the
        // end index as the length made every record after the first one drag
        // in the data that follows it, inflating the queue's memory use.
        queueObject.push(input.substr(recordStart, delimiter - recordStart));
        ++writeRound;
        recordStart = delimiter + 1;
        delimiter = input.find('\r', recordStart);
    }

    // Keep only the trailing, still-incomplete record for the next call.
    input.erase(0, recordStart);
    return realsize;
}
int socketIO::progress(double dltotal, double dlnow, double ultotal, double ulnow) {
if (boost::this_thread::interruption_requested())
return 1;
if (dlnow == previousDownloadSize) {
if (failCount < 15)
failCount++;
else {
repeatInterval = repeatInterval * 2;
return 1;
}
} else {
repeatInterval = 10;
previousDownloadSize = dlnow;
}
return 0;
}
// Builds a fresh curlpp request wired to this object's progress() and write()
// callbacks and carrying the stored credentials.
//
// source selects the endpoint:
//   1 - 127.0.0.1:17000
//   2 - statuses/filter.json, with the track parameters sent as POST fields
//   3 - statuses/sample.json
// Any other value leaves the URL unset.
//
// The carry-over buffer `input` is emptied first. Ownership of the request is
// returned to the caller.
std::auto_ptr<curlpp::Easy> socketIO::createRequest(int source) {
    input.clear();

    std::auto_ptr<curlpp::Easy> easy(new curlpp::Easy);

    // Callbacks bound to this instance.
    curlpp::types::ProgressFunctionFunctor onProgress(this, &socketIO::progress);
    easy->setOpt(new curlpp::options::ProgressFunction(onProgress));
    curlpp::types::WriteFunctionFunctor onData(this, &socketIO::write);
    easy->setOpt(new curlpp::options::WriteFunction(onData));

    // General transfer options.
    easy->setOpt(new curlpp::options::FailOnError(true));
    easy->setOpt(new curlpp::options::NoProgress(0));
    easy->setOpt(new curlpp::options::Verbose(true));
    easy->setOpt(new curlpp::options::UserPwd(userPassword));

    std::string trackQuery = "track=basketball,football,baseball,footy,soccer";

    if (source == 1) {
        easy->setOpt(new curlpp::options::Url("127.0.0.1:17000"));
    } else if (source == 2) {
        easy->setOpt(new curlpp::options::Url("http://stream.twitter.com/1/statuses/filter.json"));
        easy->setOpt(new curlpp::options::PostFields(trackQuery));
        easy->setOpt(new curlpp::options::PostFieldSize(trackQuery.size()));
    } else if (source == 3) {
        easy->setOpt(new curlpp::options::Url("http://stream.twitter.com/1/statuses/sample.json"));
    }

    return easy;
}
void socketIO::processLoop() {
repeatInterval = defaultRepeatInterval;
std::auto_ptr<curlpp::Easy> request;
while (true) {
try {
previousDownloadSize = 0;
failCount = 0;
request.reset(createRequest(3));
request->perform();
} catch (curlpp::UnknowException & e) {
std::cout << "Unknown Exception: " << e.what() << std::endl;
} catch (curlpp::RuntimeError & e) {
std::cout << "Runtime Exception: " << e.what() << std::endl;
} catch (curlpp::LogicError & e) {
std::cout << "Logic Exception: " << e.what() << std::endl;
}
if (boost::this_thread::interruption_requested())
break;
else
boost::this_thread::sleep(boost::posix_time::seconds(repeatInterval));
}
}
types.h
#ifndef _TYPES_H
#define _TYPES_H
#include <string>
#include <concurrent_queue.hpp>
#define DBUSER "****"
#define DBPASS "****"
#define DBURL "****"
#define TWITTERACCT "****"
#define TWITTERPASS "****"
typedef struct tweet {
...
} tweet;
typedef struct user {
...
} user;
typedef concurrent_queue<std::string> stringQueue;
typedef std::pair<int, std::string> dbPair;
typedef concurrent_queue<dbPair> dbQueue;
typedef concurrent_queue<tweet> tweetQueue;
typedef concurrent_queue<user> userQueue;
typedef concurrent_queue<boost::int64_t> deleteQueue;
#endif
concurrent_queue.hpp
#ifndef _CONCURRENT_QUEUE_
#define _CONCURRENT_QUEUE_
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include <deque>
template<typename Data>
class concurrent_queue
{
private:
std::deque<Data> the_queue;
mutable boost::mutex the_mutex;
boost::condition_variable the_condition_variable;
public:
void push(Data const& data)
{
boost::mutex::scoped_lock lock(the_mutex);
the_queue.push_back(data);
lock.unlock();
the_condition_variable.notify_one();
}
bool empty() const
{
boost::mutex::scoped_lock lock(the_mutex);
return the_queue.empty();
}
bool try_pop(Data& popped_value)
{
boost::mutex::scoped_lock lock(the_mutex);
if(the_queue.empty())
{
return false;
}
popped_value=the_queue.front();
the_queue.pop_front();
return true;
}
void wait_and_pop(Data& popped_value)
{
boost::mutex::scoped_lock lock(the_mutex);
while(the_queue.empty())
{
the_condition_variable.wait(lock);
}
popped_value=the_queue.front();
the_queue.pop_front();
}
};
#endif