I am trying to write a C++11/14 program in which a fixed number of threads (say 4) continuously take work items off a thread-safe queue until no work is left in the queue.
Threadsafe queue implementation:
/// FIFO queue whose every operation is serialized by one internal mutex.
///
/// Offers blocking pops (`wait_and_pop`) that sleep on a condition variable
/// until an element is available, and non-blocking pops (`try_pop`) that
/// report an empty queue instead of waiting.
///
/// Elements are moved into and out of the underlying std::queue, so T only has
/// to be movable; the copy constructor additionally requires T to be copyable.
template<typename T>
class threadsafe_queue
{
private:
    mutable std::mutex mut;             // mutable: locked by empty() and when copied from
    std::queue<T> data_queue;
    std::condition_variable data_cond;  // notified by push() after an element is added

public:
    threadsafe_queue() {}

    /// Copies the elements of `other` while holding other's mutex.
    threadsafe_queue(threadsafe_queue const &other)
    {
        std::lock_guard<std::mutex> lk(other.mut);
        data_queue = other.data_queue;
    }

    // Assignment was already unavailable (std::mutex is not assignable);
    // spelling it out gives a clearer diagnostic.
    threadsafe_queue &operator=(threadsafe_queue const &) = delete;

    /// Appends `new_value` and wakes one thread blocked in wait_and_pop().
    void push(T new_value)
    {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(std::move(new_value));  // move: the by-value parameter is ours to consume
        data_cond.notify_one();
    }

    /// Blocks until the queue is non-empty, then moves the front element into `value`.
    void wait_and_pop(T &value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]{ return !data_queue.empty(); });
        value = std::move(data_queue.front());
        data_queue.pop();
    }

    /// Blocks until the queue is non-empty, then returns the front element.
    /// @return a non-null shared_ptr owning the popped element.
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]{ return !data_queue.empty(); });
        // Allocate before pop(): if make_shared throws, the queue is left unchanged.
        std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        return res;
    }

    /// Non-blocking pop.
    /// @return false (leaving `value` untouched) when the queue is empty,
    ///         true after moving the front element into `value`.
    bool try_pop(T &value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return false;
        value = std::move(data_queue.front());
        data_queue.pop();
        return true;
    }

    /// Non-blocking pop.
    /// @return an empty shared_ptr when the queue is empty, otherwise a
    ///         shared_ptr owning the popped element.
    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return std::shared_ptr<T>();
        std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        return res;
    }

    /// @return whether the queue held no elements at the moment of the call.
    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
};
Function each thread runs:
// Worker routine that each consumer thread is meant to run; its body is elided in this post.
// Takes a connection string, the name of a zip archive, and a reference to an OFStreamWriter
// (presumably the shared error log -- confirm against the real implementation).
void insertintobidask(std::string connstring, std::string ziparchivename, OFStreamWriter &errlog) { /.../ }
The main function, in which the threads are supposed to keep taking work off the work queue until no work is left:
int main()
{
std::ofstream errlog
errlog.open("/home/vorlket/Desktop/Project/Code/Test/errlog.txt", std::ofstream::out);
OFStreamWriter ofsw(&errlog);
threadsafe_queue<std::string> wqueue;
boost::filesystem::path fx_dir("/home/vorlket/Desktop/Project/Code/Test/Data");
std::regex pattern_fx("HISTDATA_COM_ASCII_.*.zip");
for (boost::filesystem::recursive_directory_iterator iter(fx_dir), end; iter != end; ++iter)
{
std::string name = iter->path().filename().string();
if (std::regex_match(name, pattern_fx))
{
wqueue.push(name);
}
}
/* Each thread below would run once, how do I modify it to make it continuously take a work off the queue and run until there is no work left in the queue?
std::thread consumer1 (insertintobidask, "hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw);
std::thread consumer2 (insertintobidask, "hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw);
std::thread consumer3 (insertintobidask, "hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw);
std::thread consumer4 (insertintobidask, "hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw);
consumer1.join();
consumer2.join();
consumer3.join();
consumer4.join();
*/
errlog.close();
return 0;
}
I tried another approach based on Nim's answer below, but the threads terminate without doing any work, even though the insertintobidask function works fine with a single thread. Furthermore, presuming this approach can be made to work, I wonder what needs to be done to use multiple network cards for a possible speedup.
/* g++ -std=gnu++11 fxetl.cxx -o fxetl -lboost_system -lboost_filesystem -lzip -lpqxx -lpq -pthread */
#include <zip.h>

#include <cstddef>
#include <exception>
#include <fstream>
#include <iostream>
#include <memory>
#include <regex>
#include <string>
#include <thread>
#include <vector>

#include <boost/asio.hpp>
#include <boost/filesystem.hpp>
#include <pqxx/pqxx>
/// Inserts the records of one AUDUSD archive into the fx.bidask table.
///
/// The year-month part is taken from characters [27, 33) of `ziparchivename`
/// and used to rebuild both the archive name and the name of the CSV member
/// inside it. The archive is opened by that bare name, i.e. relative to the
/// current working directory.
///
/// Records are read as fixed 39-byte chunks: timestamp at [0, 18), bid at
/// [19, 27), ask at [28, 36). All inserts run in one transaction that is
/// committed once the member has been read to its end.
///
/// @param connstring      Connection string handed to pqxx::connection.
/// @param ziparchivename  File name of the archive to load.
///
/// A name that is too short, an archive or member that cannot be opened, and a
/// read error are reported on std::cerr; the function then returns without
/// committing. Exceptions thrown by libpqxx propagate to the caller.
void insertintobidask(std::string connstring, std::string ziparchivename)
{
    const std::size_t kYearMonthOffset = 27;  // length of "HISTDATA_COM_ASCII_AUDUSD_T"
    const std::size_t kYearMonthLength = 6;
    const std::size_t kTimestampLength = 18;
    const std::size_t kBidOffset = 19;
    const std::size_t kAskOffset = 28;
    const std::size_t kPriceLength = 8;

    // substr() would throw std::out_of_range on a name shorter than the offset.
    if (ziparchivename.size() < kYearMonthOffset + kYearMonthLength)
    {
        std::cerr << "insertintobidask: unexpected archive name '" << ziparchivename << "'\n";
        return;
    }

    const std::string fileyearmonth = ziparchivename.substr(kYearMonthOffset, kYearMonthLength);
    const std::string ziparchivepath = "HISTDATA_COM_ASCII_AUDUSD_T" + fileyearmonth + ".zip";
    const std::string zipfilepath = "DAT_ASCII_AUDUSD_T_" + fileyearmonth + ".csv";

    // Both libzip handles are owned by unique_ptr so they are released on every
    // exit path, including an exception thrown by txn.exec(). The member handle
    // is declared second and is therefore closed before the archive.
    int err = 0;
    std::unique_ptr<struct zip, int (*)(struct zip *)> ziparchive(
        zip_open(ziparchivepath.c_str(), 0, &err), &zip_close);
    if (!ziparchive)
    {
        // Previously zip_close() and zip_fopen() were called on this null handle.
        std::cerr << "insertintobidask: cannot open archive '" << ziparchivepath
                  << "' (libzip error " << err << ")\n";
        return;
    }

    std::unique_ptr<struct zip_file, int (*)(struct zip_file *)> zipfile(
        zip_fopen(ziparchive.get(), zipfilepath.c_str(), 0), &zip_fclose);
    if (!zipfile)
    {
        std::cerr << "insertintobidask: cannot open '" << zipfilepath
                  << "' inside '" << ziparchivepath << "'\n";
        return;
    }

    pqxx::connection conn(connstring);
    pqxx::work txn(conn);

    char buffer[39]; // each line takes up 39 bytes
    for (;;)
    {
        const auto nread = zip_fread(zipfile.get(), buffer, sizeof(buffer));
        if (nread < 0)
        {
            // txn is aborted when it goes out of scope without commit().
            std::cerr << "insertintobidask: read error in '" << zipfilepath << "'\n";
            return;
        }
        if (nread == 0)
        {
            break;
        }

        // The buffer is not NUL-terminated, so the length must be given explicitly.
        const std::string str(buffer, static_cast<std::size_t>(nread));
        if (str.size() < kAskOffset + kPriceLength)
        {
            break;  // truncated trailing record: not enough bytes for all three fields
        }

        txn.exec("INSERT INTO fx.bidask VALUES('AUDUSD', to_timestamp(" +txn.quote(str.substr(0, kTimestampLength)) + ", 'YYYYMMDD HH24MISSMS'), " + txn.quote(str.substr(kBidOffset, kPriceLength)) + ", " + txn.quote(str.substr(kAskOffset, kPriceLength)) + ")");
    }
    txn.commit();
}
/// Posts one insertintobidask task per matching archive found under the data
/// directory to a boost::asio::io_service, then runs that service on four
/// threads and waits for them.
///
/// Every task is posted before any thread calls run(), so each run() returns
/// once the posted handlers have all been executed.
int main()
{
    boost::asio::io_service service; // queue
    const std::string connstring = "hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J";
    const unsigned kThreadCount = 4;

    boost::filesystem::path fx_dir("/home/vorlket/Desktop/Project/Code/Test/Data");
    // The dot before "zip" is escaped so that it matches a literal '.' only.
    std::regex pattern_fx("HISTDATA_COM_ASCII_.*\\.zip");
    for (boost::filesystem::recursive_directory_iterator iter(fx_dir), end; iter != end; ++iter)
    {
        std::string name = iter->path().filename().string();
        if (std::regex_match(name, pattern_fx))
        {
            service.post([name, connstring]() {
                // An exception leaving a handler would propagate out of run()
                // and, from there, out of the thread function (std::terminate).
                // Report it here so one failing archive does not stop the rest.
                try
                {
                    insertintobidask(connstring, name);
                }
                catch (const std::exception &e)
                {
                    std::cerr << "insertintobidask failed for '" << name << "': " << e.what() << '\n';
                }
            });
        }
    }

    std::vector<std::thread> workers;
    workers.reserve(kThreadCount);
    for (unsigned i = 0; i < kThreadCount; ++i)
    {
        workers.emplace_back([&service]() { service.run(); });
    }
    for (std::thread &worker : workers)
    {
        worker.join();
    }
    return 0;
}
Aucun commentaire:
Enregistrer un commentaire