Dynare++ multithreading: simplification of mutex interface

parent 752a02a3
......@@ -121,7 +121,7 @@ public:
smarter. */
void
operator()() override
operator()(std::mutex &mut) override
{
_Tpit beg = quad.begin(ti, tn, level);
_Tpit end = quad.begin(ti+1, tn, level);
......@@ -138,7 +138,7 @@ public:
}
{
sthread::synchro syn(&outvec, "IntegrationWorker");
std::unique_lock<std::mutex> lk{mut};
outvec.add(1.0, tmpall);
}
}
......
......@@ -480,12 +480,12 @@ IRFResults::writeMat(mat_t *fd, const char *prefix) const
}
void
SimulationWorker::operator()()
SimulationWorker::operator()(std::mutex &mut)
{
auto *esr = new ExplicitShockRealization(sr, np);
TwoDMatrix *m = dr.simulate(em, np, st, *esr);
{
sthread::synchro syn(&res, "simulation");
std::unique_lock<std::mutex> lk{mut};
res.addDataSet(m, esr);
}
}
......@@ -494,7 +494,7 @@ SimulationWorker::operator()()
corresponding control, add the impulse, and simulate. */
void
SimulationIRFWorker::operator()()
SimulationIRFWorker::operator()(std::mutex &mut)
{
auto *esr
= new ExplicitShockRealization(res.control.getShocks(idata));
......@@ -504,13 +504,13 @@ SimulationIRFWorker::operator()()
TwoDMatrix *m = dr.simulate(em, np, st, *esr);
m->add(-1.0, res.control.getData(idata));
{
sthread::synchro syn(&res, "simulation");
std::unique_lock<std::mutex> lk{mut};
res.addDataSet(m, esr);
}
}
void
RTSimulationWorker::operator()()
RTSimulationWorker::operator()(std::mutex &mut)
{
NormalConj nc(res.nc.getDim());
const PartitionY &ypart = dr.getYPart();
......@@ -546,7 +546,7 @@ RTSimulationWorker::operator()()
nc.update(y);
}
{
sthread::synchro syn(&res, "rtsimulation");
std::unique_lock<std::mutex> lk{mut};
res.nc.update(nc);
if (res.num_per-ip > 0)
{
......
......@@ -925,7 +925,7 @@ public:
: res(sim_res), dr(dec_rule), em(emet), np(num_per), st(start), sr(shock_r)
{
}
void operator()() override;
void operator()(std::mutex &mut) override;
};
/* This worker simulates a given impulse |imp| to a given shock
......@@ -951,7 +951,7 @@ public:
idata(id), ishock(ishck), imp(impulse)
{
}
void operator()() override;
void operator()(std::mutex &mut) override;
};
/* This class does the real time simulation job for
......@@ -977,7 +977,7 @@ public:
: res(sim_res), dr(dec_rule), em(emet), np(num_per), ystart(start), sr(shock_r)
{
}
void operator()() override;
void operator()(std::mutex &mut) override;
};
/* This class generates draws from Gaussian distribution with zero mean
......
......@@ -54,12 +54,12 @@ FoldedStackContainer::multAndAdd(int dim, const FGSContainer &c, FGSTensor &out)
code@>|. */
void
WorkerFoldMAADense::operator()()
WorkerFoldMAADense::operator()(std::mutex &mut)
{
Permutation iden(dense_cont.num());
IntSequence coor(sym, iden.getMap());
const FGSTensor *g = dense_cont.get(sym);
cont.multAndAddStacks(coor, *g, out, &out);
cont.multAndAddStacks(coor, *g, out, mut);
}
WorkerFoldMAADense::WorkerFoldMAADense(const FoldedStackContainer &container,
......@@ -94,7 +94,7 @@ FoldedStackContainer::multAndAddSparse1(const FSSparseTensor &t,
vertically narrow out accordingly. */
void
WorkerFoldMAASparse1::operator()()
WorkerFoldMAASparse1::operator()(std::mutex &mut)
{
const EquivalenceSet &eset = ebundle.get(out.dimen());
const PermutationSet &pset = tls.pbundle->get(t.dimen());
......@@ -121,7 +121,7 @@ WorkerFoldMAASparse1::operator()()
{
FPSTensor fps(out.getDims(), it, slice, kp);
{
sthread::synchro syn(&out, "WorkerUnfoldMAASparse1");
std::unique_lock<std::mutex> lk{mut};
fps.addTo(out);
}
}
......@@ -168,7 +168,7 @@ FoldedStackContainer::multAndAddSparse2(const FSSparseTensor &t,
rows. */
void
WorkerFoldMAASparse2::operator()()
WorkerFoldMAASparse2::operator()(std::mutex &mut)
{
GSSparseTensor slice(t, cont.getStackSizes(), coor,
TensorDimens(cont.getStackSizes(), coor));
......@@ -181,10 +181,10 @@ WorkerFoldMAASparse2::operator()()
int r2 = slice.getLastNonZeroRow();
FGSTensor dense_slice1(r1, r2-r1+1, dense_slice);
FGSTensor out1(r1, r2-r1+1, out);
cont.multAndAddStacks(coor, dense_slice1, out1, &out);
cont.multAndAddStacks(coor, dense_slice1, out1, mut);
}
else
cont.multAndAddStacks(coor, slice, out, &out);
cont.multAndAddStacks(coor, slice, out, mut);
}
}
......@@ -257,12 +257,12 @@ FoldedStackContainer::multAndAddSparse4(const FSSparseTensor &t, FGSTensor &out)
|multAndAddStacks|. */
void
WorkerFoldMAASparse4::operator()()
WorkerFoldMAASparse4::operator()(std::mutex &mut)
{
GSSparseTensor slice(t, cont.getStackSizes(), coor,
TensorDimens(cont.getStackSizes(), coor));
if (slice.getNumNonZero())
cont.multAndAddStacks(coor, slice, out, &out);
cont.multAndAddStacks(coor, slice, out, mut);
}
WorkerFoldMAASparse4::WorkerFoldMAASparse4(const FoldedStackContainer &container,
......@@ -284,7 +284,7 @@ WorkerFoldMAASparse4::WorkerFoldMAASparse4(const FoldedStackContainer &container
void
FoldedStackContainer::multAndAddStacks(const IntSequence &coor,
const FGSTensor &g,
FGSTensor &out, const void *ad) const
FGSTensor &out, std::mutex &mut) const
{
const EquivalenceSet &eset = ebundle.get(out.dimen());
......@@ -310,7 +310,7 @@ FoldedStackContainer::multAndAddStacks(const IntSequence &coor,
kp.optimizeOrder();
FPSTensor fps(out.getDims(), it, sort_per, ug, kp);
{
sthread::synchro syn(ad, "multAndAddStacks");
std::unique_lock<std::mutex> lk{mut};
fps.addTo(out);
}
}
......@@ -329,7 +329,7 @@ FoldedStackContainer::multAndAddStacks(const IntSequence &coor,
void
FoldedStackContainer::multAndAddStacks(const IntSequence &coor,
const GSSparseTensor &g,
FGSTensor &out, const void *ad) const
FGSTensor &out, std::mutex &mut) const
{
const EquivalenceSet &eset = ebundle.get(out.dimen());
UFSTensor dummy_u(0, numStacks(), g.dimen());
......@@ -351,7 +351,7 @@ FoldedStackContainer::multAndAddStacks(const IntSequence &coor,
KronProdStack<FGSTensor> kp(sp, coor);
FPSTensor fps(out.getDims(), it, sort_per, g, kp);
{
sthread::synchro syn(ad, "multAndAddStacks");
std::unique_lock<std::mutex> lk{mut};
fps.addTo(out);
}
}
......@@ -404,12 +404,12 @@ UnfoldedStackContainer::multAndAdd(int dim, const UGSContainer &c,
}
void
WorkerUnfoldMAADense::operator()()
WorkerUnfoldMAADense::operator()(std::mutex &mut)
{
Permutation iden(dense_cont.num());
IntSequence coor(sym, iden.getMap());
const UGSTensor *g = dense_cont.get(sym);
cont.multAndAddStacks(coor, *g, out, &out);
cont.multAndAddStacks(coor, *g, out, mut);
}
WorkerUnfoldMAADense::WorkerUnfoldMAADense(const UnfoldedStackContainer &container,
......@@ -476,7 +476,7 @@ UnfoldedStackContainer::multAndAddSparse1(const FSSparseTensor &t,
todo: vertically narrow slice and out according to the fill in t. */
void
WorkerUnfoldMAASparse1::operator()()
WorkerUnfoldMAASparse1::operator()(std::mutex &mut)
{
const EquivalenceSet &eset = ebundle.get(out.dimen());
const PermutationSet &pset = tls.pbundle->get(t.dimen());
......@@ -503,7 +503,7 @@ WorkerUnfoldMAASparse1::operator()()
{
UPSTensor ups(out.getDims(), it, slice, kp);
{
sthread::synchro syn(&out, "WorkerUnfoldMAASparse1");
std::unique_lock<std::mutex> lk{mut};
ups.addTo(out);
}
}
......@@ -561,7 +561,7 @@ UnfoldedStackContainer::multAndAddSparse2(const FSSparseTensor &t,
|@<|WorkerFoldMAASparse2::operator()()| code@>|. */
void
WorkerUnfoldMAASparse2::operator()()
WorkerUnfoldMAASparse2::operator()(std::mutex &mut)
{
GSSparseTensor slice(t, cont.getStackSizes(), coor,
TensorDimens(cont.getStackSizes(), coor));
......@@ -574,7 +574,7 @@ WorkerUnfoldMAASparse2::operator()()
UGSTensor dense_slice1(r1, r2-r1+1, dense_slice);
UGSTensor out1(r1, r2-r1+1, out);
cont.multAndAddStacks(coor, dense_slice1, out1, &out);
cont.multAndAddStacks(coor, dense_slice1, out1, mut);
}
}
......@@ -604,7 +604,7 @@ WorkerUnfoldMAASparse2::WorkerUnfoldMAASparse2(const UnfoldedStackContainer &con
void
UnfoldedStackContainer::multAndAddStacks(const IntSequence &fi,
const UGSTensor &g,
UGSTensor &out, const void *ad) const
UGSTensor &out, std::mutex &mut) const
{
const EquivalenceSet &eset = ebundle.get(out.dimen());
......@@ -629,7 +629,7 @@ UnfoldedStackContainer::multAndAddStacks(const IntSequence &fi,
kp.optimizeOrder();
UPSTensor ups(out.getDims(), it, sort_per, g, kp);
{
sthread::synchro syn(ad, "multAndAddStacks");
std::unique_lock<std::mutex> lk{mut};
ups.addTo(out);
}
}
......
......@@ -295,9 +295,9 @@ protected:
void multAndAddSparse3(const FSSparseTensor &t, FGSTensor &out) const;
void multAndAddSparse4(const FSSparseTensor &t, FGSTensor &out) const;
void multAndAddStacks(const IntSequence &fi, const FGSTensor &g,
FGSTensor &out, const void *ad) const;
FGSTensor &out, std::mutex &mut) const;
void multAndAddStacks(const IntSequence &fi, const GSSparseTensor &g,
FGSTensor &out, const void *ad) const;
FGSTensor &out, std::mutex &mut) const;
};
class WorkerUnfoldMAADense;
......@@ -323,7 +323,7 @@ protected:
void multAndAddSparse1(const FSSparseTensor &t, UGSTensor &out) const;
void multAndAddSparse2(const FSSparseTensor &t, UGSTensor &out) const;
void multAndAddStacks(const IntSequence &fi, const UGSTensor &g,
UGSTensor &out, const void *ad) const;
UGSTensor &out, std::mutex &mut) const;
};
/* Here is the specialization of the |StackContainer|. We implement
......@@ -656,7 +656,7 @@ public:
const Symmetry &s,
const FGSContainer &dcontainer,
FGSTensor &outten);
void operator()() override;
void operator()(std::mutex &mut) override;
};
class WorkerFoldMAASparse1 : public sthread::detach_thread
......@@ -670,7 +670,7 @@ public:
WorkerFoldMAASparse1(const FoldedStackContainer &container,
const FSSparseTensor &ten,
FGSTensor &outten, const IntSequence &c);
void operator()() override;
void operator()(std::mutex &mut) override;
};
class WorkerFoldMAASparse2 : public sthread::detach_thread
......@@ -683,7 +683,7 @@ public:
WorkerFoldMAASparse2(const FoldedStackContainer &container,
const FSSparseTensor &ten,
FGSTensor &outten, const IntSequence &c);
void operator()() override;
void operator()(std::mutex &mut) override;
};
class WorkerFoldMAASparse4 : public sthread::detach_thread
......@@ -696,7 +696,7 @@ public:
WorkerFoldMAASparse4(const FoldedStackContainer &container,
const FSSparseTensor &ten,
FGSTensor &outten, const IntSequence &c);
void operator()() override;
void operator()(std::mutex &mut) override;
};
class WorkerUnfoldMAADense : public sthread::detach_thread
......@@ -710,7 +710,7 @@ public:
const Symmetry &s,
const UGSContainer &dcontainer,
UGSTensor &outten);
void operator()() override;
void operator()(std::mutex &mut) override;
};
class WorkerUnfoldMAASparse1 : public sthread::detach_thread
......@@ -724,7 +724,7 @@ public:
WorkerUnfoldMAASparse1(const UnfoldedStackContainer &container,
const FSSparseTensor &ten,
UGSTensor &outten, const IntSequence &c);
void operator()() override;
void operator()(std::mutex &mut) override;
};
class WorkerUnfoldMAASparse2 : public sthread::detach_thread
......@@ -737,7 +737,7 @@ public:
WorkerUnfoldMAASparse2(const UnfoldedStackContainer &container,
const FSSparseTensor &ten,
UGSTensor &outten, const IntSequence &c);
void operator()() override;
void operator()(std::mutex &mut) override;
};
#endif
......@@ -9,37 +9,6 @@ namespace sthread
uniprocessor machine with hyper-threading */
int detach_thread_group::max_parallel_threads = 2;
/* The constructor acquires the mutex in the map. First it tries to
get an exclusive access to the map. Then it increases a number of
references of the mutex (if it does not exists, it inserts it). Then
unlocks the map, and finally tries to lock the mutex of the map. */
synchro::synchro(const void *c, std::string id)
: caller{c}, iden{std::move(id)}
{
mutmap.lock_map();
if (!mutmap.check(caller, iden))
mutmap.insert(caller, iden);
mutmap.get(caller, iden).second++;
mutmap.unlock_map();
mutmap.get(caller, iden).first.lock();
}
/* The destructor first locks the map. Then releases the lock,
and decreases a number of references. If it is zero, it removes the
mutex. */
synchro::~synchro()
{
mutmap.lock_map();
if (mutmap.check(caller, iden))
{
mutmap.get(caller, iden).first.unlock();
mutmap.get(caller, iden).second--;
if (mutmap.get(caller, iden).second == 0)
mutmap.remove(caller, iden);
}
mutmap.unlock_map();
}
/* We cycle through all threads in the group, and in each cycle we wait
for the change in the |counter|. If the counter indicates less than
maximum parallel threads running, then a new thread is run, and the
......@@ -49,15 +18,15 @@ namespace sthread
void
detach_thread_group::run()
{
std::unique_lock<std::mutex> lk{m};
std::unique_lock<std::mutex> lk{mut_cv};
auto it = tlist.begin();
while (it != tlist.end())
{
counter++;
std::thread th{[&, it] {
// The "it" variable is captured by value, because otherwise the iterator may move
(*it)->operator()();
std::unique_lock<std::mutex> lk2{m};
(*it)->operator()(mut_threads);
std::unique_lock<std::mutex> lk2{mut_cv};
counter--;
std::notify_all_at_thread_exit(cv, std::move(lk2));
}};
......
......@@ -15,15 +15,8 @@
are not joined, they are synchronized by means of a counter counting
running threads. A change of the counter is checked by waiting on an
associated condition. The number of maximum parallel
threads can be controlled. See below.
\li |synchro| object locks a piece of code to be executed only serially
for a given data and specified entry-point. It locks the code until it
is destructed. So, the typical use is to create the |synchro| object
on the stack of a function which is to be synchronized. The
synchronization can be subjected to specific data (then a pointer can
be passed to |synchro|'s constructor), and can be subjected to
specific entry-point (then |std::string| is passed to the
constructor).
threads can be controlled. See below. The group also provides a mutex to be
shared between the workers for their own synchronization purposes.
\endunorderedlist
The number of maximum parallel threads is controlled via a static
......@@ -42,104 +35,6 @@
namespace sthread
{
using mmkey = std::pair<const void *, std::string>;
/* Here we define a map of mutexes keyed by a pair of address, and a
string. A purpose of the map of mutexes is that, if synchronizing, we
need to publish mutexes locking some piece of codes (characterized by
the string) accessing the data (characterized by the pointer). So, if
any thread needs to pass a |synchro| object, it creates its own with
the same address and string, and must look to some public storage to
unlock the mutex. If the |synchro| object is created for the first
time, the mutex is created and inserted to the map. We count the
references to the mutex (number of waiting threads) to know, when it
is save to remove the mutex from the map. This is the only purpose of
counting the references. Recall, that the mutex is keyed by an address
of the data, and without removing, the number of mutexes would only
grow.
The map itself needs its own mutex to avoid concurrent insertions and
deletions. */
struct ltmmkey
{
bool
operator()(const mmkey &k1, const mmkey &k2) const
{
return k1.first < k2.first
|| (k1.first == k2.first && k1.second < k2.second);
}
};
using mutex_int_map = std::map<mmkey, std::pair<std::mutex, int>, ltmmkey>;
class mutex_map : public mutex_int_map
{
using mmval = std::pair<std::mutex, int>;
std::mutex m;
public:
mutex_map() = default;
void
insert(const void *c, std::string id)
{
// We cannot use emplace(), because std::mutex is neither copyable nor moveable
operator[](mmkey{c, std::move(id)}).second = 0;
}
bool
check(const void *c, std::string id) const
{
return find(mmkey{c, std::move(id)}) != end();
}
/* This returns the pair of mutex and count reference number. */
mmval &
get(const void *c, std::string id)
{
return operator[](mmkey{c, std::move(id)});
}
/* This removes unconditionally the mutex from the map regardless its
number of references. The only user of this class should be |synchro|
class, it implementation must not remove referenced mutex. */
void
remove(const void *c, std::string id)
{
auto it = find(mmkey{c, std::string{id}});
if (it != end())
erase(it);
}
void
lock_map()
{
m.lock();
}
void
unlock_map()
{
m.unlock();
}
};
// The global map used by the synchro class
static mutex_map mutmap;
/* This is the |synchro| class. The constructor of this class tries to
lock a mutex for a particular address (identification of data) and
string (identification of entry-point). If the mutex is already
locked, it waits until it is unlocked and then returns. The destructor
releases the lock. The typical use is to construct the object on the
stacked of the code being synchronized. */
class synchro
{
private:
const void *caller;
const std::string iden;
public:
synchro(const void *c, std::string id);
~synchro();
};
/* The detached thread is the same as joinable |thread|. We only
re-implement |run| method to call |thread_traits::detach_run|, and add
a method which installs a counter. The counter is increased and
......@@ -149,7 +44,7 @@ namespace sthread
{
public:
virtual ~detach_thread() = default;
virtual void operator()() = 0;
virtual void operator()(std::mutex &mut) = 0;
};
/* The detach thread group is (by interface) the same as
......@@ -159,9 +54,10 @@ namespace sthread
class detach_thread_group
{
std::vector<std::unique_ptr<detach_thread>> tlist;
std::mutex m; // For the condition variable and the counter
std::mutex mut_cv; // For the condition variable and the counter
std::condition_variable cv;
int counter{0};
std::mutex mut_threads; // Passed to the workers and shared between them
public:
static int max_parallel_threads;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment