c++ - How to implement lock-free counter with std::atomic? -


In my program, multiple threads (checkers) request web pages, and if those pages contain data, other threads (consumers) process that data. I need only a predefined number of consumers to start processing (not all of them). I tried to use a std::atomic counter and fetch_add to limit the number of working consumers. Although the counter stays within bounds, consumers end up with identical counter values, and the real number of processing consumers exceeds the limit. The behavior depends on the processing duration. The simplified code below uses sleep_for in place of the page-fetching and page-processing functions.

#include <iostream>
#include <thread>
#include <atomic>
#include <chrono>

// Worker that polls the shared "fire" flag and, once it is raised, draws a
// ticket from a shared atomic counter. Tickets 0..4 do the (simulated) work;
// ticket 5 closes the round by clearing the flag and resetting the counter.
class cconsumer
{
public:

    // anumber  - identifier printed in the log output.
    // afire    - flag shared with the checkers; true while a round is open.
    // acounter - ticket counter shared by all consumers.
    cconsumer(
        const size_t anumber,
        std::atomic<bool> &afire,
        std::atomic<size_t> &acounter) :
        mnumber(anumber),
        mfire(afire),
        mcounter(acounter) {}

    // Thread entry point; loops forever.
    void operator ()()
    {
        while (true)
        {
            // Poll until a checker raises the flag.
            while (!mfire.load()) std::this_thread::sleep_for(mmillisecond);

            // fetch_add returns the value held BEFORE the increment, so tickets
            // are distinct only until the counter is stored back to 0 below.
            const size_t vcounter = mcounter.fetch_add(1);
            if (vcounter < 5)
            {
                std::cout << "      fire! consumer " << mnumber << ", counter " << vcounter << "\n";
                std::this_thread::sleep_for(mworkduration); // stand-in for real processing
            }
            if (vcounter == 5)
            {
                // NOTE: the counter is reset while the consumers holding
                // tickets 0..4 may still be inside sleep_for(mworkduration).
                // If a checker raises the flag again before they finish, a new
                // batch of five starts next to them, so more than five
                // consumers can be processing at once. Consumers that already
                // passed the flag check but increment after this reset also
                // draw fresh tickets below five. This is the behaviour the
                // sample demonstrates, so it is intentionally left in place.
                mfire.store(false);
                mcounter.store(0);
            }
        }
    }

private:

    static const std::chrono::milliseconds
        mmillisecond,   // polling interval while waiting for the flag
        mworkduration;  // simulated processing time

    const size_t mnumber;

    std::atomic<bool> &mfire;
    std::atomic<size_t> &mcounter;
};

const std::chrono::milliseconds
    cconsumer::mmillisecond(1),
    cconsumer::mworkduration(1300);

// Producer side: while the flag is down it performs a (simulated) check per
// step and raises the flag on every step whose number satisfies step % 20 == 1.
class cchecker
{
public:

    // anumber - identifier printed in the log output.
    // afire   - flag shared with the consumers.
    cchecker(
        const size_t anumber,
        std::atomic<bool> &afire) :
        mnumber(anumber),
        mstep(1),       // initializers listed in declaration order
        mfire(afire) {}

    // Thread entry point; loops forever.
    void operator ()()
    {
        while (true)
        {
            // Stay idle while a round is open.
            while (mfire.load()) std::this_thread::sleep_for(mmillisecond);

            std::cout << "checker " << mnumber << " step " << mstep << "\n";
            std::this_thread::sleep_for(mcheckduration); // stand-in for fetching a page
            if (mstep % 20 == 1) mfire.store(true);
            mstep++;
        }
    }

private:

    static const std::chrono::milliseconds
        mmillisecond,    // polling interval while the flag is raised
        mcheckduration;  // simulated page-check time

    const size_t mnumber;

    size_t mstep;

    std::atomic<bool> &mfire;
};

const std::chrono::milliseconds
    cchecker::mmillisecond(1),
    cchecker::mcheckduration(500);

// Starts 16 consumers and 3 staggered checkers, then joins them. The worker
// loops never terminate, so the joins block forever.
int main()
{
    std::atomic<bool> vfire(false);
    std::atomic<size_t> vcounter(0);

    std::thread vconsumerthreads[16];

    for (size_t i = 0; i < 16; i++)
    {
        vconsumerthreads[i] = std::thread(cconsumer(i, vfire, vcounter));
    }

    // Offset between checker start-ups so their steps do not line up.
    std::chrono::milliseconds vnextcheckerdelay(239);

    std::thread vcheckerthreads[3];

    for (size_t i = 0; i < 3; i++)
    {
        vcheckerthreads[i] = std::thread(cchecker(i, vfire));
        std::this_thread::sleep_for(vnextcheckerdelay);
    }

    for (size_t i = 0; i < 16; i++) vconsumerthreads[i].join();

    for (size_t i = 0; i < 3; i++) vcheckerthreads[i].join();
}

output example (partial)

... checker 1 step 19 checker 0 step 20 checker 2 step 19 checker 1 step 20 checker 0 step 21 checker 2 step 20 checker 1 step 21       fire! consumer 10, counter 0       fire! consumer 13, counter 4       fire! consumer 6, counter 1       fire! consumer 0, counter 2       fire! consumer 2, counter 3 checker 0 step 22 checker 2 step 21       fire! consumer 5, counter 3       fire! consumer 7, counter 4       fire! consumer 4, counter 1       fire! consumer 15, counter 2       fire! consumer 8, counter 0 checker 1 step 22       fire! consumer 9, counter 0       fire! consumer 11, counter 1       fire! consumer 3, counter 2       fire! consumer 14, counter 3       fire! consumer 1, counter 4 checker 0 step 23 checker 2 step 22 checker 1 step 23 checker 2 step 23 checker 0 step 24 checker 1 step 24 

I found one solution that works but is not elegant: wait until all consumers have tried to start working and have seen that fire is off.

#include <iostream>
#include <thread>
#include <atomic>
#include <chrono>

// Worker for the "wait it out" workaround: tickets 0..4 do the (simulated)
// work, every later ticket waits two work durations before clearing the flag,
// and the ticket equal to the consumer count resets the counter.
class cconsumer
{
public:

    // anumber        - identifier printed in the log output.
    // aconsumercount - ticket value at which the counter is reset to 0.
    // afire          - flag shared with the checkers; true while a round is open.
    // acounter       - ticket counter shared by all consumers.
    cconsumer(
        const size_t anumber,
        const size_t aconsumercount,
        std::atomic<bool> &afire,
        std::atomic<size_t> &acounter) :
        mnumber(anumber),
        mconsumercount(aconsumercount),
        mfire(afire),
        mcounter(acounter) {}

    // Thread entry point; loops forever.
    void operator ()()
    {
        while (true)
        {
            // Poll until a checker raises the flag.
            while (!mfire.load()) std::this_thread::sleep_for(mmillisecond);

            // fetch_add returns the value held before the increment.
            const size_t vcounter = mcounter.fetch_add(1);

            if (vcounter < 5)
            {
                std::cout << "      fire! consumer " << mnumber << ", counter " << vcounter << "\n";
                std::this_thread::sleep_for(mworkduration); // stub for the processing function
            }

            if (vcounter >= 5)
            {
                std::this_thread::sleep_for(mworkduration); // let the other threads increment the counter
                std::this_thread::sleep_for(mworkduration); // second wait to cover long processing
                mfire.store(false);
            }

            if (vcounter == mconsumercount)
            {
                mcounter.store(0);
            }
        }
    }

private:

    static const std::chrono::milliseconds
        mmillisecond,   // polling interval while waiting for the flag
        mworkduration;  // simulated processing time

    const size_t
        mnumber,
        mconsumercount;

    std::atomic<bool> &mfire;
    std::atomic<size_t> &mcounter;
};

const std::chrono::milliseconds
    cconsumer::mmillisecond(1),
    cconsumer::mworkduration(1300);

// Producer side: while the flag is down it performs a (simulated) check per
// step and raises the flag on every step whose number satisfies step % 20 == 1.
class cchecker
{
public:

    // anumber - identifier printed in the log output.
    // afire   - flag shared with the consumers.
    cchecker(
        const size_t anumber,
        std::atomic<bool> &afire) :
        mnumber(anumber),
        mstep(1),       // initializers listed in declaration order
        mfire(afire) {}

    // Thread entry point; loops forever.
    void operator ()()
    {
        while (true)
        {
            // Stay idle while a round is open.
            while (mfire.load()) std::this_thread::sleep_for(mmillisecond);

            std::cout << "checker " << mnumber << " step " << mstep << "\n";
            std::this_thread::sleep_for(mcheckduration); // stand-in for fetching a page
            if (mstep % 20 == 1) mfire.store(true);
            mstep++;
        }
    }

private:

    static const std::chrono::milliseconds
        mmillisecond,    // polling interval while the flag is raised
        mcheckduration;  // simulated page-check time

    const size_t mnumber;

    size_t mstep;

    std::atomic<bool> &mfire;
};

const std::chrono::milliseconds
    cchecker::mmillisecond(1),
    cchecker::mcheckduration(500);

// Starts the consumers and 3 staggered checkers, then joins them. The worker
// loops never terminate, so the joins block forever.
int main()
{
    const size_t vconsumercount = 16;
    const size_t vcheckercount = 3;

    std::atomic<bool> vfire(false);
    std::atomic<size_t> vcounter(0);

    std::thread vconsumerthreads[vconsumercount];

    for (size_t i = 0; i < vconsumercount; i++)
    {
        vconsumerthreads[i] = std::thread(cconsumer(i, vconsumercount, vfire, vcounter));
    }

    // Offset between checker start-ups so their steps do not line up.
    std::chrono::milliseconds vnextcheckerdelay(239);

    std::thread vcheckerthreads[vcheckercount];

    for (size_t i = 0; i < vcheckercount; i++)
    {
        vcheckerthreads[i] = std::thread(cchecker(i, vfire));
        std::this_thread::sleep_for(vnextcheckerdelay);
    }

    for (size_t i = 0; i < vconsumercount; i++) vconsumerthreads[i].join();

    for (size_t i = 0; i < vcheckercount; i++) vcheckerthreads[i].join();
}

I think a better solution exists.

What happens here?

With a little luck, once fire is set, there are many more than 5 workers passing this line:

    while(!mfire.load()) std::this_thread::sleep_for(mmillisecond); 

Suppose there are 10 workers awake, and the counter is 0. Each of the 10 workers executes this:

    size_t vcounter = mcouter.fetch_add(1); 

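Each of the 10 workers then gets a different value of vcounter, from 0 to 9 (fetch_add returns the value held before the increment, leaving the shared counter at 10). The first 5 of them execute the if clause: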

        if(vcounter < 5) 

Any thread with a higher counter value skips that clause and continues. Among them is the 6th thread, which resets fire and resets the counter:

        // Branch taken only by the consumer whose ticket is exactly 5: it
        // closes the round and puts the ticket counter back to 0.
        if(vcounter == 5)
        {
            mfire.store(false);
            mcouter.store(0);
            std::cout << "reset!!!!!! consume " << mnumber << std::endl; // trace output that makes the reset visible
        }

All these idle threads continue their loop, waiting for the next fire.

But bad things can happen here, because you have workers that are still working, and you have a bunch of checkers waiting to set fire again:

while(mfire.load()) std::this_thread::sleep_for(mmillisecond); ...   // fire reset, go on 

and they reach the following line:

        // Checker side: raise the flag on every step where step % 20 == 1.
        if(mstep % 20 == 1) {
            mfire.store(true);
            std::cout << "set fire" << std::endl;   // trace output that makes the problem visible
        }

As the atomic counter is 0 again, you'll have 5 new workers that start a new job in addition to the ones still running.

What can you do?

It's not clear to me what you intend to do:

  • Do you want to have 5 workers activated for each new fire? In that case, what you did is OK, but the total number of workers running at once may exceed 5.
  • Do you want to have at most 5 workers active at any moment in time? In that case you should never reset the counter to 0 as you did; instead, each thread should decrement the counter it has incremented. The counter then contains the number of threads currently in the fire-processing section:

    // Body of the consumer's operator(): the shared counter now tracks how
    // many threads are currently inside the fire-processing section, because
    // every increment is paired with a decrement instead of a reset to 0.
    while(true) {
        while(!mfire.load()) std::this_thread::sleep_for(mmillisecond);

        size_t vcounter = mcouter.fetch_add(1);   // fire processing: increment counter
        if(vcounter < 5)
        {
            std::cout << "      fire! consumer " << mnumber << ", counter " << vcounter << "\n";
            std::this_thread::sleep_for(mworkduration);
            std::cout << "         finished consumer " << mnumber << std::endl;
        }
        if(vcounter == 5)
        {
            mfire.store(false);
            //mcouter.store(0);   // no longer reset: the decrement below keeps the count accurate
            std::cout << "reset!!!!!! consumer " << mnumber << std::endl;
        }
        mcouter.fetch_sub(1);                    // end of processing: decrement counter
    }

Comments

Popular posts from this blog

facebook - android ACTION_SEND to share with specific application only -

python - Creating a new virtualenv gives a permissions error -

javascript - cocos2d-js draw circle not instantly -