c++ - How to implement lock-free counter with std::atomic? -
in program multiple threads (checkers) requests webpages , if these pages contain data, threads (consumers) process data. need predefined count of consumers start processing (not all). try use std::atomic counter , fetch_add limit working consumers count. although counter stay in bounds, consumers identical counter values , real processing consumers count exceed limit. behavior depend on processing duration. simplified code contains sleep_for instead getting page , processing page functions.
#include <iostream> #include <thread> #include <atomic> #include <chrono> class cconsumer { public: cconsumer::cconsumer( const size_t anumber, std::atomic<bool> &afire, std::atomic<size_t> &acounter) : mnumber(anumber), mfire(afire), mcounter(acounter){} void cconsumer::operator ()() { while (true) { while (!mfire.load()) std::this_thread::sleep_for(mmillisecond); size_t vcounter = mcounter.fetch_add(1); if (vcounter < 5) { std::cout << " fire! consumer " << mnumber << ", counter " << vcounter << "\n"; std::this_thread::sleep_for(mworkduration); } if (vcounter == 5) { mfire.store(false); mcounter.store(0); } } } private: static const std::chrono::milliseconds mmillisecond, mworkduration; const size_t mnumber; std::atomic<bool> &mfire; std::atomic<size_t> &mcounter; }; const std::chrono::milliseconds cconsumer::mmillisecond(1), cconsumer::mworkduration(1300); class cchecker { public: cchecker( const size_t anumber, std::atomic<bool> &afire) : mnumber(anumber), mfire(afire), mstep(1){ } void cchecker::operator ()() { while (true) { while (mfire.load()) std::this_thread::sleep_for(mmillisecond); std::cout << "checker " << mnumber << " step " << mstep << "\n"; std::this_thread::sleep_for(mcheckduration); if (mstep % 20 == 1) mfire.store(true); mstep++; } } private: static const std::chrono::milliseconds mmillisecond, mcheckduration; const size_t mnumber; size_t mstep; std::atomic<bool> &mfire; }; const std::chrono::milliseconds cchecker::mmillisecond(1), cchecker::mcheckduration(500); void main() { std::atomic<bool> vfire(false); std::atomic<size_t> vcounter(0); std::thread vconsumerthreads[16]; (size_t = 0; < 16; i++) { std::thread vconsumerthread((cconsumer(i, vfire, vcounter))); vconsumerthreads[i] = std::move(vconsumerthread); } std::chrono::milliseconds vnextcheckerdelay(239); std::thread vcheckerthreads[3]; (size_t = 0; < 3; i++) { std::thread vcheckerthread((cchecker(i, vfire))); vcheckerthreads[i] = std::move(vcheckerthread); std::this_thread::sleep_for(vnextcheckerdelay); } (size_t = 0; < 16; i++) vconsumerthreads[i].join(); (size_t = 0; < 3; i++) vcheckerthreads[i].join(); }
output example (partial)
... checker 1 step 19 checker 0 step 20 checker 2 step 19 checker 1 step 20 checker 0 step 21 checker 2 step 20 checker 1 step 21 fire! consumer 10, counter 0 fire! consumer 13, counter 4 fire! consumer 6, counter 1 fire! consumer 0, counter 2 fire! consumer 2, counter 3 checker 0 step 22 checker 2 step 21 fire! consumer 5, counter 3 fire! consumer 7, counter 4 fire! consumer 4, counter 1 fire! consumer 15, counter 2 fire! consumer 8, counter 0 checker 1 step 22 fire! consumer 9, counter 0 fire! consumer 11, counter 1 fire! consumer 3, counter 2 fire! consumer 14, counter 3 fire! consumer 1, counter 4 checker 0 step 23 checker 2 step 22 checker 1 step 23 checker 2 step 23 checker 0 step 24 checker 1 step 24
i found 1 solution working not elegant: wait consumers try work , understand fire off.
#include <iostream> #include <thread> #include <atomic> #include <chrono> class cconsumer { public: cconsumer::cconsumer( const size_t anumber, const size_t aconsumercount, std::atomic<bool> &afire, std::atomic<size_t> &acounter) : mnumber(anumber), mconsumercount(aconsumercount), mfire(afire), mcounter(acounter){} void cconsumer::operator ()() { while (true) { while (!mfire.load()) std::this_thread::sleep_for(mmillisecond); const size_t vcounter = mcounter.fetch_add(1); if (vcounter < 5) { std::cout << " fire! consumer " << mnumber << ", counter " << vcounter << "\n"; std::this_thread::sleep_for(mworkduration); //stub process function } if (vcounter >= 5) { std::this_thread::sleep_for(mworkduration); //wait other threads increase counter std::this_thread::sleep_for(mworkduration); //double wait long processing mfire.store(false); } if (vcounter == mconsumercount) { mcounter.store(0); } } } private: static const std::chrono::milliseconds mmillisecond, mworkduration; const size_t mnumber, mconsumercount; std::atomic<bool> &mfire; std::atomic<size_t> &mcounter; }; const std::chrono::milliseconds cconsumer::mmillisecond(1), cconsumer::mworkduration(1300); class cchecker { public: cchecker( const size_t anumber, std::atomic<bool> &afire) : mnumber(anumber), mfire(afire), mstep(1){ } void cchecker::operator ()() { while (true) { while (mfire.load()) std::this_thread::sleep_for(mmillisecond); std::cout << "checker " << mnumber << " step " << mstep << "\n"; std::this_thread::sleep_for(mcheckduration); if (mstep % 20 == 1) mfire.store(true); mstep++; } } private: static const std::chrono::milliseconds mmillisecond, mcheckduration; const size_t mnumber; size_t mstep; std::atomic<bool> &mfire; }; const std::chrono::milliseconds cchecker::mmillisecond(1), cchecker::mcheckduration(500); void main() { std::atomic<bool> vfire(false); std::atomic<size_t> vcouter(0); std::thread vconsumerthreads[16]; (size_t = 0; < 16; i++) { vconsumerthreads[i] = std::move(std::thread(cconsumer(i, 16, vfire, vcouter))); } std::chrono::milliseconds vnextcheckerdelay(239); std::thread vcheckerthreads[3]; (size_t = 0; < 3; i++) { vcheckerthreads[i] = std::move(std::thread(cchecker(i, vfire))); std::this_thread::sleep_for(vnextcheckerdelay); } (size_t = 0; < 16; i++) vconsumerthreads[i].join(); (size_t = 0; < 3; i++) vcheckerthreads[i].join();
i think better solution exists.
what happens here ?
with little luck, once set fire, there many more worker 5 passing line:
while(!mfire.load()) std::this_thread::sleep_for(mmillisecond);
suppose there 10 workers awake, , counter 0. every 10 workers execute this:
size_t vcounter = mcouter.fetch_add(1);
and every of 10 workers has different counter between 1 , 11. 5 first execute if clause:
if(vcounter < 5)
any thread having higher counter continue. among them 6th thread, reset fire , reset counter:
if(vcounter == 5) { mfire.store(false); mcouter.store(0); cout << "reset!!!!!! consume "<<mnumber << endl; // useful understand }
all these iddle threads continue loop waiting next fire.
but bad things can happen, because have workers still working, , have bunch of checkers waiting set fire again:
while(mfire.load()) std::this_thread::sleep_for(mmillisecond); ... // fire reset, go on
and reach following line:
if(mstep % 20 == 1) { mfire.store(true); cout << "set fire" << endl; // make problem visual }
as atomic counter 0, you'll have 5 new workers start new job in addition ones still running.
what can ?
it's not clear me intend do:
- do want have 5 workers active each new fire ? in case, it's ok did. total number of workers exceed 5 in total.
do want have max 5 workers active @ moment in time ? in case should never reset number of workers 0 did, should decrement counter threads have incremented it. conter contain number of threads in fire processing section:
while(true) { while(!mfire.load()) std::this_thread::sleep_for(mmillisecond); size_t vcounter = mcouter.fetch_add(1); // fire processing: increment counter if(vcounter < 5) { std::cout << " fire! consumer " << mnumber << ", counter " << vcounter << "\n"; std::this_thread::sleep_for(mworkduration); std::cout << " finished consumer "<< mnumber<<endl; } if(vcounter == 5) { mfire.store(false); //mcouter.store(0); cout << "reset!!!!!! consumer "<<mnumber << endl; } mcouter.fetch_sub(1); // end of processing: decrement counter
Comments
Post a Comment