jeudi 16 juin 2016

C++ Multithreading with a fixed number of threads and a threadsafe queue [on hold]

I am trying to write a C++11/14 program in which a fixed number of threads (say 4) continuously take work items off a thread-safe queue until there is no work left in the queue.

Threadsafe queue implementation:

/// Unbounded FIFO queue whose operations are serialised by one internal mutex,
/// so it can be shared between producer and consumer threads.
///
/// Values are moved (not copied) into and out of the queue, so T only needs to
/// be movable for push/pop; the copy constructor additionally needs T to be
/// copyable. Copy assignment is not available (the mutex is not assignable).
///
/// Note: the pop overloads that write into a caller-supplied `T &` move-assign
/// from the front element before removing it. If T's move assignment throws,
/// the front element stays in the queue in whatever state the failed move left.
template<typename T>
class threadsafe_queue
{
private:
  mutable std::mutex mut;             // mutable: locked from const members too
  std::queue<T> data_queue;
  std::condition_variable data_cond;  // notified once per push
public:
  threadsafe_queue() = default;

  /// Copies the contents of `other` while holding other's lock.
  threadsafe_queue(threadsafe_queue const &other)
  {
    std::lock_guard<std::mutex> lk(other.mut);
    data_queue = other.data_queue;
  }

  threadsafe_queue &operator=(threadsafe_queue const &) = delete;

  /// Appends `new_value` and wakes one thread blocked in wait_and_pop().
  void push(T new_value)
  {
    std::lock_guard<std::mutex> lk(mut);
    data_queue.push(std::move(new_value));  // parameter is already our own copy
    data_cond.notify_one();
  }

  /// Blocks until an element is available, then moves it into `value`.
  void wait_and_pop(T &value)
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk, [this]{return !data_queue.empty();});
    value = std::move(data_queue.front());
    data_queue.pop();
  }

  /// Blocks until an element is available and returns it in a shared_ptr.
  std::shared_ptr<T> wait_and_pop()
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk, [this]{return !data_queue.empty();});
    // The element is only moved from once the allocation has succeeded.
    std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
    data_queue.pop();
    return res;
  }

  /// Non-blocking pop. Returns false and leaves `value` untouched when empty.
  bool try_pop(T &value)
  {
    std::lock_guard<std::mutex> lk(mut);
    if (data_queue.empty())
      return false;
    value = std::move(data_queue.front());
    data_queue.pop();
    return true;
  }

  /// Non-blocking pop. Returns a null shared_ptr when the queue is empty.
  std::shared_ptr<T> try_pop()
  {
    std::lock_guard<std::mutex> lk(mut);
    if (data_queue.empty())
      return std::shared_ptr<T>();
    std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
    data_queue.pop();
    return res;
  }

  /// Snapshot of emptiness; may be stale as soon as the lock is released.
  bool empty() const
  {
    std::lock_guard<std::mutex> lk(mut);
    return data_queue.empty();
  }
};

Function each thread runs:

// Worker routine that each thread runs; the body is elided ("/.../") in this excerpt.
void insertintobidask(std::string connstring, std::string ziparchivename, OFStreamWriter &errlog) { /.../ }

The main function, in which the threads are supposed to keep taking work items off the work queue until there is no work left in the queue:

int main()
{
  std::ofstream errlog
  errlog.open("/home/vorlket/Desktop/Project/Code/Test/errlog.txt", std::ofstream::out);
  OFStreamWriter ofsw(&errlog);

  threadsafe_queue<std::string> wqueue;
  boost::filesystem::path fx_dir("/home/vorlket/Desktop/Project/Code/Test/Data");
  std::regex pattern_fx("HISTDATA_COM_ASCII_.*.zip");
  for (boost::filesystem::recursive_directory_iterator iter(fx_dir), end; iter != end; ++iter)
  {
    std::string name = iter->path().filename().string();
    if (std::regex_match(name, pattern_fx))
    {
      wqueue.push(name);
    }
  }

  /* Each thread below would run once, how do I modify it to make it continuously take a work off the queue and run until there is no work left in the queue?
  std::thread consumer1 (insertintobidask, "hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw);
  std::thread consumer2 (insertintobidask, "hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw);
  std::thread consumer3 (insertintobidask, "hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw);
  std::thread consumer4 (insertintobidask, "hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw);

  consumer1.join();
  consumer2.join();
  consumer3.join();
  consumer4.join();
  */

  errlog.close();
  return 0;
}

I tried another approach based on Nim's answer below, but the threads terminate without doing any work, even though the insertintobidask function works fine with a single thread. Furthermore, assuming this approach works, I wonder what needs to be done to use multiple network cards for a possible speedup.

/* g++ -std=gnu++11 fxetl.cxx -o fxetl -lboost_system -lboost_filesystem -lzip -lpqxx -lpq -pthread */

#include <boost/filesystem.hpp>
#include <regex>
#include <iostream>
#include <fstream>
#include <string>
#include <pqxx/pqxx>
#include <zip.h>
#include <thread>
#include <boost/asio.hpp>

// Loads the tick records of one HISTDATA AUDUSD zip archive into fx.bidask.
//
// connstring      connection string handed to pqxx::connection.
// ziparchivename  archive file name; the six characters starting at offset 27
//                 are used as the year-month part of the archive and CSV names
//                 (substr throws std::out_of_range for names shorter than 27).
//
// All rows go through a single transaction that is committed after the last
// complete record has been read. If the archive or its CSV entry cannot be
// opened, a message is written to std::cerr and nothing is committed.
// NOTE(review): the archive path is relative, so it is resolved against the
// current working directory, not the directory that was scanned.
void insertintobidask(std::string connstring, std::string ziparchivename)
{
  pqxx::connection conn(connstring);
  pqxx::work txn(conn);

  const std::string fileyearmonth = ziparchivename.substr(27, 6);
  const std::string ziparchivepath = "HISTDATA_COM_ASCII_AUDUSD_T" + fileyearmonth + ".zip";
  const std::string zipfilepath = "DAT_ASCII_AUDUSD_T_" + fileyearmonth + ".csv";
  int err = 0;
  char buffer[39];  // each line takes up 39 bytes

  struct zip *ziparchive = zip_open(ziparchivepath.c_str(), 0, &err);
  if (!ziparchive)
  {
    // Nothing was opened, so there is nothing to close; just report and stop.
    std::cerr << ("insertintobidask: cannot open archive " + ziparchivepath +
                  " (libzip error " + std::to_string(err) + ")\n");
    return;
  }

  struct zip_file *zipfile = zip_fopen(ziparchive, zipfilepath.c_str(), 0);
  if (!zipfile)
  {
    std::cerr << ("insertintobidask: cannot open " + zipfilepath + " in " + ziparchivepath + "\n");
    zip_close(ziparchive);
    return;
  }

  // zip_fread may hand back fewer bytes than requested, so keep reading until
  // a whole record is assembled. A trailing partial record is dropped.
  std::size_t filled = 0;
  for (;;)
  {
    const auto nread = zip_fread(zipfile, buffer + filled, sizeof(buffer) - filled);
    if (nread <= 0)
      break;
    filled += static_cast<std::size_t>(nread);
    if (filled < sizeof(buffer))
      continue;

    // The buffer holds no NUL terminator, so the length must be given explicitly.
    const std::string str(buffer, sizeof(buffer));
    filled = 0;
    txn.exec("INSERT INTO fx.bidask VALUES('AUDUSD', to_timestamp(" +txn.quote(str.substr(0, 18)) + ", 'YYYYMMDD HH24MISSMS'), " + txn.quote(str.substr(19, 8)) + ", " + txn.quote(str.substr(28, 8)) + ")");
  }
  zip_fclose(zipfile);
  txn.commit();

  zip_close(ziparchive);
}

// Posts one insertintobidask() job per HISTDATA zip archive found under the
// data directory, then runs the posted jobs on four threads.
int main()
{
  boost::asio::io_service service; // queue
  boost::filesystem::path fx_dir("/home/vorlket/Desktop/Project/Code/Test/Data");
  // The dot before "zip" is escaped so that it only matches a literal '.'.
  std::regex pattern_fx("HISTDATA_COM_ASCII_.*\\.zip");
  for (boost::filesystem::recursive_directory_iterator iter(fx_dir), end; iter != end; ++iter)
  {
    std::string name = iter->path().filename().string();
    if (std::regex_match(name, pattern_fx))
    {
      // name is captured by value: the job runs after this iteration has ended.
      service.post([name]() { insertintobidask("hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", name); });
    }
  }

  // All jobs are posted before the first run() call; each run() returns once
  // no posted job is left to execute.
  // NOTE(review): an exception escaping a job propagates out of run() and
  // terminates the process.
  std::thread workers[4];
  for (std::thread &worker : workers)
  {
    worker = std::thread([&service]() { service.run(); });
  }

  for (std::thread &worker : workers)
  {
    worker.join();
  }
}

Aucun commentaire:

Enregistrer un commentaire