processGraph(): Call getEdges in parallel
This commit is contained in:
parent
63d6e0ad3f
commit
90da34e421
1 changed files with 46 additions and 27 deletions
|
@ -70,50 +70,69 @@ void processGraph(
|
||||||
struct Graph {
|
struct Graph {
|
||||||
std::set<T> left;
|
std::set<T> left;
|
||||||
std::map<T, std::set<T>> refs, rrefs;
|
std::map<T, std::set<T>> refs, rrefs;
|
||||||
std::function<void(T)> wrap;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
ref<Sync<Graph>> graph_ = make_ref<Sync<Graph>>();
|
Sync<Graph> graph_(Graph{nodes, {}, {}});
|
||||||
|
|
||||||
auto wrapWork = [&pool, graph_, processNode](const T & node) {
|
std::function<void(const T &)> worker;
|
||||||
|
|
||||||
|
worker = [&](const T & node) {
|
||||||
|
|
||||||
|
{
|
||||||
|
auto graph(graph_.lock());
|
||||||
|
auto i = graph->refs.find(node);
|
||||||
|
if (i == graph->refs.end())
|
||||||
|
goto getRefs;
|
||||||
|
goto doWork;
|
||||||
|
}
|
||||||
|
|
||||||
|
getRefs:
|
||||||
|
{
|
||||||
|
auto refs = getEdges(node);
|
||||||
|
refs.erase(node);
|
||||||
|
|
||||||
|
{
|
||||||
|
auto graph(graph_.lock());
|
||||||
|
for (auto & ref : refs)
|
||||||
|
if (graph->left.count(ref)) {
|
||||||
|
graph->refs[node].insert(ref);
|
||||||
|
graph->rrefs[ref].insert(node);
|
||||||
|
}
|
||||||
|
if (graph->refs[node].empty())
|
||||||
|
goto doWork;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
|
|
||||||
|
doWork:
|
||||||
processNode(node);
|
processNode(node);
|
||||||
|
|
||||||
/* Enqueue work for all nodes that were waiting on this one. */
|
/* Enqueue work for all nodes that were waiting on this one
|
||||||
|
and have no unprocessed dependencies. */
|
||||||
{
|
{
|
||||||
auto graph(graph_->lock());
|
auto graph(graph_.lock());
|
||||||
graph->left.erase(node);
|
|
||||||
for (auto & rref : graph->rrefs[node]) {
|
for (auto & rref : graph->rrefs[node]) {
|
||||||
auto & refs(graph->refs[rref]);
|
auto & refs(graph->refs[rref]);
|
||||||
auto i = refs.find(node);
|
auto i = refs.find(node);
|
||||||
assert(i != refs.end());
|
assert(i != refs.end());
|
||||||
refs.erase(i);
|
refs.erase(i);
|
||||||
if (refs.empty())
|
if (refs.empty())
|
||||||
pool.enqueue(std::bind(graph->wrap, rref));
|
pool.enqueue(std::bind(worker, rref));
|
||||||
}
|
}
|
||||||
|
graph->left.erase(node);
|
||||||
|
graph->refs.erase(node);
|
||||||
|
graph->rrefs.erase(node);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
{
|
for (auto & node : nodes)
|
||||||
auto graph(graph_->lock());
|
pool.enqueue(std::bind(worker, std::ref(node)));
|
||||||
graph->left = nodes;
|
|
||||||
graph->wrap = wrapWork;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Build the dependency graph; enqueue all nodes with no
|
pool.process();
|
||||||
dependencies. */
|
|
||||||
for (auto & node : nodes) {
|
if (!graph_.lock()->left.empty())
|
||||||
auto refs = getEdges(node);
|
throw Error("graph processing incomplete (cyclic reference?)");
|
||||||
{
|
|
||||||
auto graph(graph_->lock());
|
|
||||||
for (auto & ref : refs)
|
|
||||||
if (ref != node && graph->left.count(ref)) {
|
|
||||||
graph->refs[node].insert(ref);
|
|
||||||
graph->rrefs[ref].insert(node);
|
|
||||||
}
|
|
||||||
if (graph->refs[node].empty())
|
|
||||||
pool.enqueue(std::bind(graph->wrap, node));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue