Initial Commit

misterg 2017-09-19 16:54:40 -04:00
commit c2e7548296
238 changed files with 65475 additions and 0 deletions

@@ -0,0 +1,178 @@
#
# Copyright 2017 The Abseil Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
load(
"//absl:copts.bzl",
"ABSL_DEFAULT_COPTS",
"ABSL_TEST_COPTS",
)
package(default_visibility = ["//visibility:public"])
licenses(["notice"]) # Apache 2.0
# Internal data structure for efficiently detecting mutex dependency cycles
cc_library(
name = "graphcycles_internal",
srcs = [
"internal/graphcycles.cc",
],
hdrs = [
"internal/graphcycles.h",
],
copts = ABSL_DEFAULT_COPTS,
deps = [
"//absl/base",
"//absl/base:core_headers",
"//absl/base:malloc_internal",
],
)
cc_library(
name = "synchronization",
srcs = [
"barrier.cc",
"blocking_counter.cc",
"internal/create_thread_identity.cc",
"internal/per_thread_sem.cc",
"internal/waiter.cc",
"notification.cc",
] + select({
"//conditions:default": ["mutex.cc"],
}),
hdrs = [
"barrier.h",
"blocking_counter.h",
"internal/create_thread_identity.h",
"internal/kernel_timeout.h",
"internal/mutex_nonprod.inc",
"internal/per_thread_sem.h",
"internal/waiter.h",
"mutex.h",
"notification.h",
],
copts = ABSL_DEFAULT_COPTS,
deps = [
":graphcycles_internal",
"//absl/base",
"//absl/base:base_internal",
"//absl/base:config",
"//absl/base:core_headers",
"//absl/base:dynamic_annotations",
"//absl/base:malloc_extension",
"//absl/base:malloc_internal",
"//absl/debugging:stacktrace",
"//absl/time",
],
)
cc_test(
name = "blocking_counter_test",
size = "small",
srcs = ["blocking_counter_test.cc"],
copts = ABSL_TEST_COPTS,
deps = [
":synchronization",
"//absl/time",
"@com_google_googletest//:gtest_main",
],
)
cc_test(
name = "graphcycles_test",
size = "medium",
srcs = ["internal/graphcycles_test.cc"],
copts = ABSL_TEST_COPTS,
deps = [
":graphcycles_internal",
"//absl/base",
"//absl/base:core_headers",
"@com_google_googletest//:gtest_main",
],
)
cc_library(
name = "thread_pool",
testonly = 1,
hdrs = ["internal/thread_pool.h"],
deps = [
":synchronization",
"//absl/base:core_headers",
],
)
cc_test(
name = "mutex_test",
size = "large",
timeout = "moderate",
srcs = ["mutex_test.cc"],
copts = ABSL_TEST_COPTS,
tags = [
"no_test_loonix", # Too slow.
],
deps = [
":synchronization",
":thread_pool",
"//absl/base",
"//absl/base:core_headers",
"//absl/memory",
"//absl/time",
"@com_google_googletest//:gtest_main",
],
)
cc_test(
name = "notification_test",
size = "small",
srcs = ["notification_test.cc"],
copts = ABSL_TEST_COPTS,
deps = [
":synchronization",
"//absl/time",
"@com_google_googletest//:gtest_main",
],
)
cc_library(
name = "per_thread_sem_test_common",
testonly = 1,
srcs = ["internal/per_thread_sem_test.cc"],
copts = ABSL_TEST_COPTS,
deps = [
":synchronization",
"//absl/base",
"//absl/base:malloc_extension",
"//absl/strings",
"//absl/time",
"@com_google_googletest//:gtest",
],
alwayslink = 1,
)
cc_test(
name = "per_thread_sem_test",
size = "medium",
copts = ABSL_TEST_COPTS,
deps = [
":per_thread_sem_test_common",
":synchronization",
"//absl/base",
"//absl/base:malloc_extension",
"//absl/strings",
"//absl/time",
"@com_google_googletest//:gtest_main",
],
)

@@ -0,0 +1,50 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "absl/synchronization/barrier.h"
#include "absl/base/internal/raw_logging.h"
#include "absl/synchronization/mutex.h"
namespace absl {
// Return whether int *arg is zero.
static bool IsZero(void *arg) {
return 0 == *reinterpret_cast<int *>(arg);
}
bool Barrier::Block() {
MutexLock l(&this->lock_);
this->num_to_block_--;
if (this->num_to_block_ < 0) {
ABSL_RAW_LOG(
FATAL,
"Block() called too many times. num_to_block_=%d out of total=%d",
this->num_to_block_, this->num_to_exit_);
}
this->lock_.Await(Condition(IsZero, &this->num_to_block_));
// Determine which thread can safely delete this Barrier object
this->num_to_exit_--;
ABSL_RAW_CHECK(this->num_to_exit_ >= 0, "barrier underflow");
// If num_to_exit_ == 0 then all other threads in the barrier have
// exited the Wait() and have released the Mutex so this thread is
// free to delete the barrier.
return this->num_to_exit_ == 0;
}
} // namespace absl

@@ -0,0 +1,77 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// -----------------------------------------------------------------------------
// barrier.h
// -----------------------------------------------------------------------------
#ifndef ABSL_SYNCHRONIZATION_BARRIER_H_
#define ABSL_SYNCHRONIZATION_BARRIER_H_
#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"
namespace absl {
// Barrier
//
// This class creates a barrier which blocks threads until a prespecified
// threshold of threads (`num_threads`) utilizes the barrier. A thread utilizes
// the `Barrier` by calling `Block()` on the barrier, which will block that
// thread; no call to `Block()` will return until `num_threads` threads have
// called it.
//
// Exactly one call to `Block()` will return `true`; the thread for which it
// returns `true` is then responsible for destroying the barrier. Because a
// stack-allocated barrier would instead be deleted when it goes out of scope,
// barriers should not be stack allocated.
//
// Example:
//
// // Main thread creates a `Barrier`:
// barrier = new Barrier(num_threads);
//
// // Each participating thread could then call:
// if (barrier->Block()) delete barrier; // Exactly one call to `Block()`
// // returns `true`; that call
// // deletes the barrier.
class Barrier {
public:
// `num_threads` is the number of threads that will participate in the barrier.
explicit Barrier(int num_threads)
: num_to_block_(num_threads), num_to_exit_(num_threads) {}
Barrier(const Barrier&) = delete;
Barrier& operator=(const Barrier&) = delete;
// Barrier::Block()
//
// Blocks the current thread, and returns only when the `num_threads`
// threshold of threads utilizing this barrier has been reached. `Block()`
// returns `true` for precisely one caller, which may then destroy the
// barrier.
//
// Memory ordering: For any threads X and Y, any action taken by X
// before X calls `Block()` will be visible to Y after Y returns from
// `Block()`.
bool Block();
private:
Mutex lock_;
int num_to_block_ GUARDED_BY(lock_);
int num_to_exit_ GUARDED_BY(lock_);
};
} // namespace absl
#endif // ABSL_SYNCHRONIZATION_BARRIER_H_
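A minimal usage sketch, not part of this commit; RunWithBarrier and its worker lambda are illustrative only, following the heap-allocation pattern described in the header comment above:

#include <thread>
#include <vector>

#include "absl/synchronization/barrier.h"

// Spawn num_threads workers that rendezvous on one heap-allocated Barrier.
// Exactly one Block() call returns true; that thread deletes the barrier.
void RunWithBarrier(int num_threads) {
  auto* barrier = new absl::Barrier(num_threads);
  std::vector<std::thread> workers;
  for (int i = 0; i < num_threads; ++i) {
    workers.emplace_back([barrier] {
      // ... per-thread setup work ...
      if (barrier->Block()) delete barrier;
      // ... work that requires every thread to have finished setup ...
    });
  }
  for (std::thread& t : workers) t.join();
}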

@@ -0,0 +1,53 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "absl/synchronization/blocking_counter.h"
namespace absl {
// Return whether int *arg is zero.
static bool IsZero(void *arg) {
return 0 == *reinterpret_cast<int *>(arg);
}
bool BlockingCounter::DecrementCount() {
MutexLock l(&lock_);
count_--;
if (count_ < 0) {
ABSL_RAW_LOG(
FATAL,
"BlockingCounter::DecrementCount() called too many times. count=%d",
count_);
}
return count_ == 0;
}
void BlockingCounter::Wait() {
MutexLock l(&this->lock_);
ABSL_RAW_CHECK(count_ >= 0, "BlockingCounter underflow");
// Only one thread may call Wait(). To support more than one thread,
// implement a counter num_to_exit, like in the Barrier class.
ABSL_RAW_CHECK(num_waiting_ == 0, "multiple threads called Wait()");
num_waiting_++;
this->lock_.Await(Condition(IsZero, &this->count_));
// At this point, we know that all threads executing DecrementCount() have
// released the lock, and so will not touch this object again.
// Therefore, the thread calling this method is free to delete the object
// after we return from this method.
}
} // namespace absl

@@ -0,0 +1,96 @@
//
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// -----------------------------------------------------------------------------
// blocking_counter.h
// -----------------------------------------------------------------------------
#ifndef ABSL_SYNCHRONIZATION_BLOCKING_COUNTER_H_
#define ABSL_SYNCHRONIZATION_BLOCKING_COUNTER_H_
#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"
namespace absl {
// BlockingCounter
//
// This class allows a thread to block for a pre-specified number of actions.
// `BlockingCounter` maintains a single non-negative abstract integer "count"
// with an initial value `initial_count`. A thread can then call `Wait()` on
// this blocking counter to block until the specified number of events occur;
// worker threads then call 'DecrementCount()` on the counter upon completion of
// their work. Once the counter's internal "count" reaches zero, the blocked
// thread unblocks.
//
// A `BlockingCounter` requires the following:
// - its `initial_count` is non-negative.
// - the number of calls to `DecrementCount()` on it is at most
// `initial_count`.
// - `Wait()` is called at most once on it.
//
// Given the above requirements, a `BlockingCounter` provides the following
// guarantees:
// - Once its internal "count" reaches zero, no legal action on the object
// can further change the value of "count".
// - When `Wait()` returns, it is legal to destroy the `BlockingCounter`.
// - When `Wait()` returns, the number of calls to `DecrementCount()` on
// this blocking counter exactly equals `initial_count`.
//
// Example:
// BlockingCounter bcount(N); // there are N items of work
// ... Allow worker threads to start.
// ... On completing each work item, workers do:
// ... bcount.DecrementCount(); // an item of work has been completed
//
// bcount.Wait(); // wait for all work to be complete
//
class BlockingCounter {
public:
explicit BlockingCounter(int initial_count)
: count_(initial_count), num_waiting_(0) {}
BlockingCounter(const BlockingCounter&) = delete;
BlockingCounter& operator=(const BlockingCounter&) = delete;
// BlockingCounter::DecrementCount()
//
// Decrements the counter's "count" by one, and returns "count == 0". This
// function requires that "count != 0" when it is called.
//
// Memory ordering: For any threads X and Y, any action taken by X
// before it calls `DecrementCount()` is visible to thread Y after
// Y's call to `DecrementCount()`, provided Y's call returns `true`.
bool DecrementCount();
// BlockingCounter::Wait()
//
// Blocks until the counter reaches zero. This function may be called at most
// once. On return, `DecrementCount()` will have been called "initial_count"
// times and the blocking counter may be destroyed.
//
// Memory ordering: For any threads X and Y, any action taken by X
// before X calls `DecrementCount()` is visible to Y after Y returns
// from `Wait()`.
void Wait();
private:
Mutex lock_;
int count_ GUARDED_BY(lock_);
int num_waiting_ GUARDED_BY(lock_);
};
} // namespace absl
#endif // ABSL_SYNCHRONIZATION_BLOCKING_COUNTER_H_

@@ -0,0 +1,67 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "absl/synchronization/blocking_counter.h"
#include <functional>
#include <memory>
#include <thread> // NOLINT(build/c++11)
#include <vector>
#include "gtest/gtest.h"
#include "absl/time/clock.h"
namespace absl {
namespace {
void PauseAndDecreaseCounter(BlockingCounter* counter, int* done) {
absl::SleepFor(absl::Seconds(1));
*done = 1;
counter->DecrementCount();
}
TEST(BlockingCounterTest, BasicFunctionality) {
// This test verifies that BlockingCounter functions correctly. Starts a
// number of threads that just sleep for a second and decrement a counter.
// Initialize the counter.
const int num_workers = 10;
BlockingCounter counter(num_workers);
std::vector<std::thread> workers;
std::vector<int> done(num_workers, 0);
// Start a number of parallel tasks that will just wait for a second and
// then decrement the count.
workers.reserve(num_workers);
for (int k = 0; k < num_workers; k++) {
workers.emplace_back(
[&counter, &done, k] { PauseAndDecreaseCounter(&counter, &done[k]); });
}
// Wait for the threads to have all finished.
counter.Wait();
// Check that all the workers have completed.
for (int k = 0; k < num_workers; k++) {
EXPECT_EQ(1, done[k]);
}
for (std::thread& w : workers) {
w.join();
}
}
} // namespace
} // namespace absl

@@ -0,0 +1,110 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is a no-op if the required LowLevelAlloc support is missing.
#include "absl/base/internal/low_level_alloc.h"
#ifndef ABSL_LOW_LEVEL_ALLOC_MISSING
#include <string.h>
#include <atomic>
#include <memory>
#include "absl/base/internal/spinlock.h"
#include "absl/base/internal/thread_identity.h"
#include "absl/synchronization/internal/per_thread_sem.h"
namespace absl {
namespace synchronization_internal {
// ThreadIdentity storage is persistent; we maintain a free-list of previously
// released ThreadIdentity objects.
static base_internal::SpinLock freelist_lock(base_internal::kLinkerInitialized);
static base_internal::ThreadIdentity* thread_identity_freelist;
// A per-thread destructor for reclaiming associated ThreadIdentity objects.
// Since we must preserve their storage, we cache them for re-use.
static void ReclaimThreadIdentity(void* v) {
base_internal::ThreadIdentity* identity =
static_cast<base_internal::ThreadIdentity*>(v);
// all_locks might have been allocated by the Mutex implementation.
// We free it here when we are notified that our thread is dying.
if (identity->per_thread_synch.all_locks != nullptr) {
base_internal::LowLevelAlloc::Free(identity->per_thread_synch.all_locks);
}
// We must explicitly clear the current thread's identity:
// (a) Subsequent (unrelated) per-thread destructors may require an identity.
// We must guarantee a new identity is used in this case (this destructor
// will be reinvoked up to PTHREAD_DESTRUCTOR_ITERATIONS times in this case).
// (b) ThreadIdentity implementations may depend on memory that is not
// reinitialized before reuse. We must allow explicit clearing of the
// association state in this case.
base_internal::ClearCurrentThreadIdentity();
{
base_internal::SpinLockHolder l(&freelist_lock);
identity->next = thread_identity_freelist;
thread_identity_freelist = identity;
}
}
// Return value rounded up to next multiple of align.
// Align must be a power of two.
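// For example, with align == 64: RoundUp(1024, 64) == 1024 and
// RoundUp(1025, 64) == 1088.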
static intptr_t RoundUp(intptr_t addr, intptr_t align) {
return (addr + align - 1) & ~(align - 1);
}
static base_internal::ThreadIdentity* NewThreadIdentity() {
base_internal::ThreadIdentity* identity = nullptr;
{
// Re-use a previously released object if possible.
base_internal::SpinLockHolder l(&freelist_lock);
if (thread_identity_freelist) {
identity = thread_identity_freelist; // Take list-head.
thread_identity_freelist = thread_identity_freelist->next;
}
}
if (identity == nullptr) {
// Allocate enough space to align ThreadIdentity to a multiple of
// PerThreadSynch::kAlignment. This space is never released (it is
// added to a freelist by ReclaimThreadIdentity instead).
void* allocation = base_internal::LowLevelAlloc::Alloc(
sizeof(*identity) + base_internal::PerThreadSynch::kAlignment - 1);
// Round up the address to the required alignment.
identity = reinterpret_cast<base_internal::ThreadIdentity*>(
RoundUp(reinterpret_cast<intptr_t>(allocation),
base_internal::PerThreadSynch::kAlignment));
}
memset(identity, 0, sizeof(*identity));
return identity;
}
// Allocates and attaches a ThreadIdentity object for the calling thread. Returns
// the new identity.
// REQUIRES: CurrentThreadIdentity(false) == nullptr
base_internal::ThreadIdentity* CreateThreadIdentity() {
base_internal::ThreadIdentity* identity = NewThreadIdentity();
PerThreadSem::Init(identity);
// Associate the value with the current thread, and attach our destructor.
base_internal::SetCurrentThreadIdentity(identity, ReclaimThreadIdentity);
return identity;
}
} // namespace synchronization_internal
} // namespace absl
#endif // ABSL_LOW_LEVEL_ALLOC_MISSING

@@ -0,0 +1,53 @@
/*
* Copyright 2017 The Abseil Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Interface for getting the current ThreadIdentity, creating one if necessary.
// See thread_identity.h.
//
// This file is separate from thread_identity.h because creating a new
// ThreadIdentity requires slightly higher level libraries (per_thread_sem
// and low_level_alloc) than accessing an existing one. This separation allows
// us to have a smaller //absl/base:base.
#ifndef ABSL_SYNCHRONIZATION_INTERNAL_CREATE_THREAD_IDENTITY_H_
#define ABSL_SYNCHRONIZATION_INTERNAL_CREATE_THREAD_IDENTITY_H_
#include "absl/base/internal/thread_identity.h"
#include "absl/base/port.h"
namespace absl {
namespace synchronization_internal {
// Allocates and attaches a ThreadIdentity object for the calling thread.
// For private use only.
base_internal::ThreadIdentity* CreateThreadIdentity();
// Returns the ThreadIdentity object representing the calling thread; guaranteed
// to be unique for its lifetime. The returned object will remain valid for the
// program's lifetime, although it may be re-assigned to a subsequent thread.
// If one does not exist for the calling thread, allocate it now.
inline base_internal::ThreadIdentity* GetOrCreateCurrentThreadIdentity() {
base_internal::ThreadIdentity* identity =
base_internal::CurrentThreadIdentityIfPresent();
if (ABSL_PREDICT_FALSE(identity == nullptr)) {
return CreateThreadIdentity();
}
return identity;
}
} // namespace synchronization_internal
} // namespace absl
#endif // ABSL_SYNCHRONIZATION_INTERNAL_CREATE_THREAD_IDENTITY_H_

@@ -0,0 +1,709 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// GraphCycles provides incremental cycle detection on a dynamic
// graph using the following algorithm:
//
// A dynamic topological sort algorithm for directed acyclic graphs
// David J. Pearce, Paul H. J. Kelly
// Journal of Experimental Algorithmics (JEA) JEA Homepage archive
// Volume 11, 2006, Article No. 1.7
//
// Brief summary of the algorithm:
//
// (1) Maintain a rank for each node that is consistent
// with the topological sort of the graph. I.e., path from x to y
// implies rank[x] < rank[y].
// (2) When a new edge (x->y) is inserted, do nothing if rank[x] < rank[y].
// (3) Otherwise: adjust ranks in the neighborhood of x and y.
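//
// For example, suppose nodes w, x, y, z hold ranks 0, 1, 2, 3 and have no
// edges. Inserting the edge z->w violates (1) because rank[z]=3 > rank[w]=0:
// the forward search from w (bounded by rank[z]) collects {w}, the backward
// search from z (bounded by rank[w]) collects {z}, and the affected nodes are
// reassigned the sorted set of their old ranks {0, 3}, giving rank[z]=0 and
// rank[w]=3, which is again consistent with every edge.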
// This file is a no-op if the required LowLevelAlloc support is missing.
#include "absl/base/internal/low_level_alloc.h"
#ifndef ABSL_LOW_LEVEL_ALLOC_MISSING
#include "absl/synchronization/internal/graphcycles.h"
#include <algorithm>
#include <array>
#include <limits>
#include "absl/base/internal/raw_logging.h"
#include "absl/base/internal/spinlock.h"
// Do not use STL. This module does not use standard memory allocation.
namespace absl {
namespace synchronization_internal {
namespace {
// Avoid LowLevelAlloc's default arena since it calls malloc hooks, in
// which people may do things like acquire Mutexes.
static absl::base_internal::SpinLock arena_mu(
absl::base_internal::kLinkerInitialized);
static base_internal::LowLevelAlloc::Arena* arena;
static void InitArenaIfNecessary() {
arena_mu.Lock();
if (arena == nullptr) {
arena = base_internal::LowLevelAlloc::NewArena(
0, base_internal::LowLevelAlloc::DefaultArena());
}
arena_mu.Unlock();
}
// Number of inlined elements in Vec. Hash table implementation
// relies on this being a power of two.
static const uint32_t kInline = 8;
// A simple LowLevelAlloc based resizable vector with inlined storage
// for a few elements. T must be a plain type since constructor
// and destructor are not run on elements of type T managed by Vec.
template <typename T>
class Vec {
public:
Vec() { Init(); }
~Vec() { Discard(); }
void clear() {
Discard();
Init();
}
bool empty() const { return size_ == 0; }
uint32_t size() const { return size_; }
T* begin() { return ptr_; }
T* end() { return ptr_ + size_; }
const T& operator[](uint32_t i) const { return ptr_[i]; }
T& operator[](uint32_t i) { return ptr_[i]; }
const T& back() const { return ptr_[size_-1]; }
void pop_back() { size_--; }
void push_back(const T& v) {
if (size_ == capacity_) Grow(size_ + 1);
ptr_[size_] = v;
size_++;
}
void resize(uint32_t n) {
if (n > capacity_) Grow(n);
size_ = n;
}
void fill(const T& val) {
for (uint32_t i = 0; i < size(); i++) {
ptr_[i] = val;
}
}
// Guarantees src is empty at end.
// Provided for the hash table resizing code below.
void MoveFrom(Vec<T>* src) {
if (src->ptr_ == src->space_) {
// Need to actually copy
resize(src->size_);
std::copy(src->ptr_, src->ptr_ + src->size_, ptr_);
src->size_ = 0;
} else {
Discard();
ptr_ = src->ptr_;
size_ = src->size_;
capacity_ = src->capacity_;
src->Init();
}
}
private:
T* ptr_;
T space_[kInline];
uint32_t size_;
uint32_t capacity_;
void Init() {
ptr_ = space_;
size_ = 0;
capacity_ = kInline;
}
void Discard() {
if (ptr_ != space_) base_internal::LowLevelAlloc::Free(ptr_);
}
void Grow(uint32_t n) {
while (capacity_ < n) {
capacity_ *= 2;
}
size_t request = static_cast<size_t>(capacity_) * sizeof(T);
T* copy = static_cast<T*>(
base_internal::LowLevelAlloc::AllocWithArena(request, arena));
std::copy(ptr_, ptr_ + size_, copy);
Discard();
ptr_ = copy;
}
Vec(const Vec&) = delete;
Vec& operator=(const Vec&) = delete;
};
// A hash set of non-negative int32_t that uses Vec for its underlying storage.
class NodeSet {
public:
NodeSet() { Init(); }
void clear() { Init(); }
bool contains(int32_t v) const { return table_[FindIndex(v)] == v; }
bool insert(int32_t v) {
uint32_t i = FindIndex(v);
if (table_[i] == v) {
return false;
}
if (table_[i] == kEmpty) {
// Only inserting over an empty cell increases the number of occupied
// slots.
occupied_++;
}
table_[i] = v;
// Double when 75% full.
if (occupied_ >= table_.size() - table_.size()/4) Grow();
return true;
}
void erase(uint32_t v) {
uint32_t i = FindIndex(v);
if (static_cast<uint32_t>(table_[i]) == v) {
table_[i] = kDel;
}
}
// Iteration is done via HASH_FOR_EACH.
// Example:
// HASH_FOR_EACH(elem, node->out) { ... }
#define HASH_FOR_EACH(elem, eset) \
for (int32_t elem, _cursor = 0; (eset).Next(&_cursor, &elem); )
bool Next(int32_t* cursor, int32_t* elem) {
while (static_cast<uint32_t>(*cursor) < table_.size()) {
int32_t v = table_[*cursor];
(*cursor)++;
if (v >= 0) {
*elem = v;
return true;
}
}
return false;
}
private:
static const int32_t kEmpty;
static const int32_t kDel;
Vec<int32_t> table_;
uint32_t occupied_; // Count of non-empty slots (includes deleted slots)
static uint32_t Hash(uint32_t a) { return a * 41; }
// Return index for storing v. May return an empty index or deleted index
int FindIndex(int32_t v) const {
// Search starting at hash index.
const uint32_t mask = table_.size() - 1;
uint32_t i = Hash(v) & mask;
int deleted_index = -1; // If >= 0, index of first deleted element we see
while (true) {
int32_t e = table_[i];
if (v == e) {
return i;
} else if (e == kEmpty) {
// Return any previously encountered deleted slot.
return (deleted_index >= 0) ? deleted_index : i;
} else if (e == kDel && deleted_index < 0) {
// Keep searching since v might be present later.
deleted_index = i;
}
i = (i + 1) & mask; // Linear probing; quadratic is slightly slower.
}
}
void Init() {
table_.clear();
table_.resize(kInline);
table_.fill(kEmpty);
occupied_ = 0;
}
void Grow() {
Vec<int32_t> copy;
copy.MoveFrom(&table_);
occupied_ = 0;
table_.resize(copy.size() * 2);
table_.fill(kEmpty);
for (const auto& e : copy) {
if (e >= 0) insert(e);
}
}
NodeSet(const NodeSet&) = delete;
NodeSet& operator=(const NodeSet&) = delete;
};
const int32_t NodeSet::kEmpty = -1;
const int32_t NodeSet::kDel = -2;
// We encode a node index and a node version in GraphId. The version
// number is incremented when the GraphId is freed, which automatically
// invalidates all copies of the GraphId.
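// For example, index 5 with version 3 is encoded as handle 0x0000000300000005.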
inline GraphId MakeId(int32_t index, uint32_t version) {
GraphId g;
g.handle =
(static_cast<uint64_t>(version) << 32) | static_cast<uint32_t>(index);
return g;
}
inline int32_t NodeIndex(GraphId id) {
return static_cast<uint32_t>(id.handle & 0xfffffffful);
}
inline uint32_t NodeVersion(GraphId id) {
return static_cast<uint32_t>(id.handle >> 32);
}
// We need to hide Mutexes (or other deadlock detection's pointers)
// from the leak detector. Xor with an arbitrary number with high bits set.
static const uintptr_t kHideMask = static_cast<uintptr_t>(0xF03A5F7BF03A5F7Bll);
static inline uintptr_t MaskPtr(void *ptr) {
return reinterpret_cast<uintptr_t>(ptr) ^ kHideMask;
}
static inline void* UnmaskPtr(uintptr_t word) {
return reinterpret_cast<void*>(word ^ kHideMask);
}
struct Node {
int32_t rank; // rank number assigned by Pearce-Kelly algorithm
uint32_t version; // Current version number
int32_t next_hash; // Next entry in hash table
bool visited; // Temporary marker used by depth-first-search
uintptr_t masked_ptr; // User-supplied pointer
NodeSet in; // List of immediate predecessor nodes in graph
NodeSet out; // List of immediate successor nodes in graph
int priority; // Priority of recorded stack trace.
int nstack; // Depth of recorded stack trace.
void* stack[40]; // stack[0,nstack-1] holds stack trace for node.
};
// Hash table for pointer to node index lookups.
class PointerMap {
public:
explicit PointerMap(const Vec<Node*>* nodes) : nodes_(nodes) {
table_.fill(-1);
}
int32_t Find(void* ptr) {
auto masked = MaskPtr(ptr);
for (int32_t i = table_[Hash(ptr)]; i != -1;) {
Node* n = (*nodes_)[i];
if (n->masked_ptr == masked) return i;
i = n->next_hash;
}
return -1;
}
void Add(void* ptr, int32_t i) {
int32_t* head = &table_[Hash(ptr)];
(*nodes_)[i]->next_hash = *head;
*head = i;
}
int32_t Remove(void* ptr) {
// Advance through linked list while keeping track of the
// predecessor slot that points to the current entry.
auto masked = MaskPtr(ptr);
for (int32_t* slot = &table_[Hash(ptr)]; *slot != -1; ) {
int32_t index = *slot;
Node* n = (*nodes_)[index];
if (n->masked_ptr == masked) {
*slot = n->next_hash; // Remove n from linked list
n->next_hash = -1;
return index;
}
slot = &n->next_hash;
}
return -1;
}
private:
// Number of buckets in hash table for pointer lookups.
static constexpr uint32_t kHashTableSize = 8171; // should be prime
const Vec<Node*>* nodes_;
std::array<int32_t, kHashTableSize> table_;
static uint32_t Hash(void* ptr) {
return reinterpret_cast<uintptr_t>(ptr) % kHashTableSize;
}
};
} // namespace
struct GraphCycles::Rep {
Vec<Node*> nodes_;
Vec<int32_t> free_nodes_; // Indices for unused entries in nodes_
PointerMap ptrmap_;
// Temporary state.
Vec<int32_t> deltaf_; // Results of forward DFS
Vec<int32_t> deltab_; // Results of backward DFS
Vec<int32_t> list_; // All nodes to reprocess
Vec<int32_t> merged_; // Rank values to assign to list_ entries
Vec<int32_t> stack_; // Emulates recursion stack for depth-first searches
Rep() : ptrmap_(&nodes_) {}
};
static Node* FindNode(GraphCycles::Rep* rep, GraphId id) {
Node* n = rep->nodes_[NodeIndex(id)];
return (n->version == NodeVersion(id)) ? n : nullptr;
}
GraphCycles::GraphCycles() {
InitArenaIfNecessary();
rep_ = new (base_internal::LowLevelAlloc::AllocWithArena(sizeof(Rep), arena))
Rep;
}
GraphCycles::~GraphCycles() {
for (auto* node : rep_->nodes_) {
node->Node::~Node();
base_internal::LowLevelAlloc::Free(node);
}
rep_->Rep::~Rep();
base_internal::LowLevelAlloc::Free(rep_);
}
bool GraphCycles::CheckInvariants() const {
Rep* r = rep_;
NodeSet ranks; // Set of ranks seen so far.
for (uint32_t x = 0; x < r->nodes_.size(); x++) {
Node* nx = r->nodes_[x];
void* ptr = UnmaskPtr(nx->masked_ptr);
if (ptr != nullptr && static_cast<uint32_t>(r->ptrmap_.Find(ptr)) != x) {
ABSL_RAW_LOG(FATAL, "Did not find live node in hash table %u %p", x, ptr);
}
if (nx->visited) {
ABSL_RAW_LOG(FATAL, "Did not clear visited marker on node %u", x);
}
if (!ranks.insert(nx->rank)) {
ABSL_RAW_LOG(FATAL, "Duplicate occurrence of rank %d", nx->rank);
}
HASH_FOR_EACH(y, nx->out) {
Node* ny = r->nodes_[y];
if (nx->rank >= ny->rank) {
ABSL_RAW_LOG(FATAL, "Edge %u->%d has bad rank assignment %d->%d", x, y,
nx->rank, ny->rank);
}
}
}
return true;
}
GraphId GraphCycles::GetId(void* ptr) {
int32_t i = rep_->ptrmap_.Find(ptr);
if (i != -1) {
return MakeId(i, rep_->nodes_[i]->version);
} else if (rep_->free_nodes_.empty()) {
Node* n =
new (base_internal::LowLevelAlloc::AllocWithArena(sizeof(Node), arena))
Node;
n->version = 1; // Avoid 0 since it is used by InvalidGraphId()
n->visited = false;
n->rank = rep_->nodes_.size();
n->masked_ptr = MaskPtr(ptr);
n->nstack = 0;
n->priority = 0;
rep_->nodes_.push_back(n);
rep_->ptrmap_.Add(ptr, n->rank);
return MakeId(n->rank, n->version);
} else {
// Preserve preceding rank since the set of ranks in use must be
// a permutation of [0,rep_->nodes_.size()-1].
int32_t r = rep_->free_nodes_.back();
rep_->free_nodes_.pop_back();
Node* n = rep_->nodes_[r];
n->masked_ptr = MaskPtr(ptr);
n->nstack = 0;
n->priority = 0;
rep_->ptrmap_.Add(ptr, r);
return MakeId(r, n->version);
}
}
void GraphCycles::RemoveNode(void* ptr) {
int32_t i = rep_->ptrmap_.Remove(ptr);
if (i == -1) {
return;
}
Node* x = rep_->nodes_[i];
HASH_FOR_EACH(y, x->out) {
rep_->nodes_[y]->in.erase(i);
}
HASH_FOR_EACH(y, x->in) {
rep_->nodes_[y]->out.erase(i);
}
x->in.clear();
x->out.clear();
x->masked_ptr = MaskPtr(nullptr);
if (x->version == std::numeric_limits<uint32_t>::max()) {
// Cannot use x any more
} else {
x->version++; // Invalidates all copies of node.
rep_->free_nodes_.push_back(i);
}
}
void* GraphCycles::Ptr(GraphId id) {
Node* n = FindNode(rep_, id);
return n == nullptr ? nullptr : UnmaskPtr(n->masked_ptr);
}
bool GraphCycles::HasNode(GraphId node) {
return FindNode(rep_, node) != nullptr;
}
bool GraphCycles::HasEdge(GraphId x, GraphId y) const {
Node* xn = FindNode(rep_, x);
return xn && FindNode(rep_, y) && xn->out.contains(NodeIndex(y));
}
void GraphCycles::RemoveEdge(GraphId x, GraphId y) {
Node* xn = FindNode(rep_, x);
Node* yn = FindNode(rep_, y);
if (xn && yn) {
xn->out.erase(NodeIndex(y));
yn->in.erase(NodeIndex(x));
// No need to update the rank assignment since a previous valid
// rank assignment remains valid after an edge deletion.
}
}
static bool ForwardDFS(GraphCycles::Rep* r, int32_t n, int32_t upper_bound);
static void BackwardDFS(GraphCycles::Rep* r, int32_t n, int32_t lower_bound);
static void Reorder(GraphCycles::Rep* r);
static void Sort(const Vec<Node*>&, Vec<int32_t>* delta);
static void MoveToList(
GraphCycles::Rep* r, Vec<int32_t>* src, Vec<int32_t>* dst);
bool GraphCycles::InsertEdge(GraphId idx, GraphId idy) {
Rep* r = rep_;
const int32_t x = NodeIndex(idx);
const int32_t y = NodeIndex(idy);
Node* nx = FindNode(r, idx);
Node* ny = FindNode(r, idy);
if (nx == nullptr || ny == nullptr) return true; // Expired ids
if (nx == ny) return false; // Self edge
if (!nx->out.insert(y)) {
// Edge already exists.
return true;
}
ny->in.insert(x);
if (nx->rank <= ny->rank) {
// New edge is consistent with existing rank assignment.
return true;
}
// Current rank assignments are incompatible with the new edge. Recompute.
// We only need to consider nodes that fall in the range [ny->rank,nx->rank].
if (!ForwardDFS(r, y, nx->rank)) {
// Found a cycle. Undo the insertion and tell caller.
nx->out.erase(y);
ny->in.erase(x);
// Since we do not call Reorder() on this path, clear any visited
// markers left by ForwardDFS.
for (const auto& d : r->deltaf_) {
r->nodes_[d]->visited = false;
}
return false;
}
BackwardDFS(r, x, ny->rank);
Reorder(r);
return true;
}
static bool ForwardDFS(GraphCycles::Rep* r, int32_t n, int32_t upper_bound) {
// Avoid recursion since stack space might be limited.
// We instead keep a stack of nodes to visit.
r->deltaf_.clear();
r->stack_.clear();
r->stack_.push_back(n);
while (!r->stack_.empty()) {
n = r->stack_.back();
r->stack_.pop_back();
Node* nn = r->nodes_[n];
if (nn->visited) continue;
nn->visited = true;
r->deltaf_.push_back(n);
HASH_FOR_EACH(w, nn->out) {
Node* nw = r->nodes_[w];
if (nw->rank == upper_bound) {
return false; // Cycle
}
if (!nw->visited && nw->rank < upper_bound) {
r->stack_.push_back(w);
}
}
}
return true;
}
static void BackwardDFS(GraphCycles::Rep* r, int32_t n, int32_t lower_bound) {
r->deltab_.clear();
r->stack_.clear();
r->stack_.push_back(n);
while (!r->stack_.empty()) {
n = r->stack_.back();
r->stack_.pop_back();
Node* nn = r->nodes_[n];
if (nn->visited) continue;
nn->visited = true;
r->deltab_.push_back(n);
HASH_FOR_EACH(w, nn->in) {
Node* nw = r->nodes_[w];
if (!nw->visited && lower_bound < nw->rank) {
r->stack_.push_back(w);
}
}
}
}
static void Reorder(GraphCycles::Rep* r) {
Sort(r->nodes_, &r->deltab_);
Sort(r->nodes_, &r->deltaf_);
// Adds contents of delta lists to list_ (backwards deltas first).
r->list_.clear();
MoveToList(r, &r->deltab_, &r->list_);
MoveToList(r, &r->deltaf_, &r->list_);
// Produce sorted list of all ranks that will be reassigned.
r->merged_.resize(r->deltab_.size() + r->deltaf_.size());
std::merge(r->deltab_.begin(), r->deltab_.end(),
r->deltaf_.begin(), r->deltaf_.end(),
r->merged_.begin());
// Assign the ranks in order to the collected list.
for (uint32_t i = 0; i < r->list_.size(); i++) {
r->nodes_[r->list_[i]]->rank = r->merged_[i];
}
}
static void Sort(const Vec<Node*>& nodes, Vec<int32_t>* delta) {
struct ByRank {
const Vec<Node*>* nodes;
bool operator()(int32_t a, int32_t b) const {
return (*nodes)[a]->rank < (*nodes)[b]->rank;
}
};
ByRank cmp;
cmp.nodes = &nodes;
std::sort(delta->begin(), delta->end(), cmp);
}
static void MoveToList(
GraphCycles::Rep* r, Vec<int32_t>* src, Vec<int32_t>* dst) {
for (auto& v : *src) {
int32_t w = v;
v = r->nodes_[w]->rank; // Replace v entry with its rank
r->nodes_[w]->visited = false; // Prepare for future DFS calls
dst->push_back(w);
}
}
int GraphCycles::FindPath(GraphId idx, GraphId idy, int max_path_len,
GraphId path[]) const {
Rep* r = rep_;
if (FindNode(r, idx) == nullptr || FindNode(r, idy) == nullptr) return 0;
const int32_t x = NodeIndex(idx);
const int32_t y = NodeIndex(idy);
// Forward depth first search starting at x until we hit y.
// As we descend into a node, we push it onto the path.
// As we leave a node, we remove it from the path.
int path_len = 0;
NodeSet seen;
r->stack_.clear();
r->stack_.push_back(x);
while (!r->stack_.empty()) {
int32_t n = r->stack_.back();
r->stack_.pop_back();
if (n < 0) {
// Marker to indicate that we are leaving a node
path_len--;
continue;
}
if (path_len < max_path_len) {
path[path_len] = MakeId(n, rep_->nodes_[n]->version);
}
path_len++;
r->stack_.push_back(-1); // Will remove tentative path entry
if (n == y) {
return path_len;
}
HASH_FOR_EACH(w, r->nodes_[n]->out) {
if (seen.insert(w)) {
r->stack_.push_back(w);
}
}
}
return 0;
}
bool GraphCycles::IsReachable(GraphId x, GraphId y) const {
return FindPath(x, y, 0, nullptr) > 0;
}
void GraphCycles::UpdateStackTrace(GraphId id, int priority,
int (*get_stack_trace)(void** stack, int)) {
Node* n = FindNode(rep_, id);
if (n == nullptr || n->priority >= priority) {
return;
}
n->nstack = (*get_stack_trace)(n->stack, ABSL_ARRAYSIZE(n->stack));
n->priority = priority;
}
int GraphCycles::GetStackTrace(GraphId id, void*** ptr) {
Node* n = FindNode(rep_, id);
if (n == nullptr) {
*ptr = nullptr;
return 0;
} else {
*ptr = n->stack;
return n->nstack;
}
}
} // namespace synchronization_internal
} // namespace absl
#endif // ABSL_LOW_LEVEL_ALLOC_MISSING

@@ -0,0 +1,136 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef ABSL_SYNCHRONIZATION_INTERNAL_GRAPHCYCLES_H_
#define ABSL_SYNCHRONIZATION_INTERNAL_GRAPHCYCLES_H_
// GraphCycles detects the introduction of a cycle into a directed
// graph that is being built up incrementally.
//
// Nodes are identified by small integers. It is not possible to
// record multiple edges with the same (source, destination) pair;
// requests to add an edge where one already exists are silently
// ignored.
//
// It is also not possible to introduce a cycle; an attempt to insert
// an edge that would introduce a cycle fails and returns false.
//
// GraphCycles uses no internal locking; calls into it should be
// serialized externally.
// Performance considerations:
// Works well on sparse graphs, poorly on dense graphs.
// Extra information is maintained incrementally to detect cycles quickly.
// InsertEdge() is very fast when the edge already exists, and reasonably fast
// otherwise.
// FindPath() is linear in the size of the graph.
// The current implementation uses O(|V|+|E|) space.
#include <cstdint>
namespace absl {
namespace synchronization_internal {
// Opaque identifier for a graph node.
struct GraphId {
uint64_t handle;
bool operator==(const GraphId& x) const { return handle == x.handle; }
bool operator!=(const GraphId& x) const { return handle != x.handle; }
};
// Return an invalid graph id that will never be assigned by GraphCycles.
inline GraphId InvalidGraphId() {
return GraphId{0};
}
class GraphCycles {
public:
GraphCycles();
~GraphCycles();
// Return the id to use for ptr, assigning one if necessary.
// Subsequent calls with the same ptr value will return the same id
// until Remove().
GraphId GetId(void* ptr);
// Remove "ptr" from the graph. Its corresponding node and all
// edges to and from it are removed.
void RemoveNode(void* ptr);
// Return the pointer associated with id, or nullptr if id is not
// currently in the graph.
void* Ptr(GraphId id);
// Attempt to insert an edge from source_node to dest_node. If the
// edge would introduce a cycle, return false without making any
// changes. Otherwise add the edge and return true.
bool InsertEdge(GraphId source_node, GraphId dest_node);
// Remove any edge that exists from source_node to dest_node.
void RemoveEdge(GraphId source_node, GraphId dest_node);
// Return whether node exists in the graph.
bool HasNode(GraphId node);
// Return whether there is an edge directly from source_node to dest_node.
bool HasEdge(GraphId source_node, GraphId dest_node) const;
// Return whether dest_node is reachable from source_node
// by following edges.
bool IsReachable(GraphId source_node, GraphId dest_node) const;
// Find a path from "source" to "dest". If such a path exists,
// place the nodes on the path in the array path[], and return
// the number of nodes on the path. If the path is longer than
// max_path_len nodes, only the first max_path_len nodes are placed
// in path[]. The client should compare the return value with
// max_path_len" to see when this occurs. If no path exists, return
// 0. Any valid path stored in path[] will start with "source" and
// end with "dest". There is no guarantee that the path is the
// shortest, but no node will appear twice in the path, except the
// source and destination node if they are identical; therefore, the
// return value is at most one greater than the number of nodes in
// the graph.
int FindPath(GraphId source, GraphId dest, int max_path_len,
GraphId path[]) const;
// Update the stack trace recorded for id with the current stack
// trace if the last time it was updated had a smaller priority
// than the priority passed on this call.
//
// *get_stack_trace is called to get the stack trace.
void UpdateStackTrace(GraphId id, int priority,
int (*get_stack_trace)(void**, int));
// Set *ptr to the beginning of the array that holds the recorded
// stack trace for id and return the depth of the stack trace.
int GetStackTrace(GraphId id, void*** ptr);
// Check internal invariants. Crashes on failure, returns true on success.
// Expensive: should only be called from graphcycles_test.cc.
bool CheckInvariants() const;
// ----------------------------------------------------
struct Rep;
private:
Rep *rep_; // opaque representation
GraphCycles(const GraphCycles&) = delete;
GraphCycles& operator=(const GraphCycles&) = delete;
};
} // namespace synchronization_internal
} // namespace absl
#endif // ABSL_SYNCHRONIZATION_INTERNAL_GRAPHCYCLES_H_
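A minimal usage sketch, not part of this commit; the main() driver and the two stand-in lock objects are illustrative only, showing how a client such as a deadlock detector might exercise this interface:

#include <cstdio>

#include "absl/synchronization/internal/graphcycles.h"

int main() {
  using absl::synchronization_internal::GraphCycles;
  using absl::synchronization_internal::GraphId;
  GraphCycles graph;
  int a = 0, b = 0;                   // stand-ins for two lock objects
  GraphId ida = graph.GetId(&a);
  GraphId idb = graph.GetId(&b);
  graph.InsertEdge(ida, idb);         // "a is held while acquiring b": ok
  if (!graph.InsertEdge(idb, ida)) {  // the reverse order would close a cycle
    std::printf("potential deadlock: b -> a completes a cycle\n");
  }
  graph.RemoveNode(&a);
  graph.RemoveNode(&b);
  return 0;
}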

@@ -0,0 +1,471 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Copyright 2007 Google, Inc.
// All rights reserved.
// Author: Mike Burrows
// A test for the GraphCycles interface.
// This test is testing a component of //third_party/absl. As written it
// heavily uses logging, including VLOG, so this test can't ship with Abseil.
// We're leaving it here until Abseil gets base/logging.h in a future release.
#include "absl/synchronization/internal/graphcycles.h"
#include <map>
#include <random>
#include <vector>
#include <unordered_set>
#include "gtest/gtest.h"
#include "absl/base/internal/raw_logging.h"
#include "absl/base/macros.h"
namespace absl {
namespace synchronization_internal {
// We emulate a GraphCycles object with a node vector and an edge vector.
// We then compare the two implementations.
using Nodes = std::vector<int>;
struct Edge {
int from;
int to;
};
using Edges = std::vector<Edge>;
using RandomEngine = std::mt19937_64;
// Mapping from integer index to GraphId.
typedef std::map<int, GraphId> IdMap;
static GraphId Get(const IdMap& id, int num) {
auto iter = id.find(num);
return (iter == id.end()) ? InvalidGraphId() : iter->second;
}
// Return whether "to" is reachable from "from".
static bool IsReachable(Edges *edges, int from, int to,
std::unordered_set<int> *seen) {
seen->insert(from); // we are investigating "from"; don't do it again
if (from == to) return true;
for (const auto &edge : *edges) {
if (edge.from == from) {
if (edge.to == to) { // success via edge directly
return true;
} else if (seen->find(edge.to) == seen->end() && // success via edge
IsReachable(edges, edge.to, to, seen)) {
return true;
}
}
}
return false;
}
static void PrintEdges(Edges *edges) {
ABSL_RAW_LOG(INFO, "EDGES (%zu)", edges->size());
for (const auto &edge : *edges) {
int a = edge.from;
int b = edge.to;
ABSL_RAW_LOG(INFO, "%d %d", a, b);
}
ABSL_RAW_LOG(INFO, "---");
}
static void PrintGCEdges(Nodes *nodes, const IdMap &id, GraphCycles *gc) {
ABSL_RAW_LOG(INFO, "GC EDGES");
for (int a : *nodes) {
for (int b : *nodes) {
if (gc->HasEdge(Get(id, a), Get(id, b))) {
ABSL_RAW_LOG(INFO, "%d %d", a, b);
}
}
}
ABSL_RAW_LOG(INFO, "---");
}
static void PrintTransitiveClosure(Nodes *nodes, Edges *edges) {
ABSL_RAW_LOG(INFO, "Transitive closure");
for (int a : *nodes) {
for (int b : *nodes) {
std::unordered_set<int> seen;
if (IsReachable(edges, a, b, &seen)) {
ABSL_RAW_LOG(INFO, "%d %d", a, b);
}
}
}
ABSL_RAW_LOG(INFO, "---");
}
static void PrintGCTransitiveClosure(Nodes *nodes, const IdMap &id,
GraphCycles *gc) {
ABSL_RAW_LOG(INFO, "GC Transitive closure");
for (int a : *nodes) {
for (int b : *nodes) {
if (gc->IsReachable(Get(id, a), Get(id, b))) {
ABSL_RAW_LOG(INFO, "%d %d", a, b);
}
}
}
ABSL_RAW_LOG(INFO, "---");
}
static void CheckTransitiveClosure(Nodes *nodes, Edges *edges, const IdMap &id,
GraphCycles *gc) {
std::unordered_set<int> seen;
for (const auto &a : *nodes) {
for (const auto &b : *nodes) {
seen.clear();
bool gc_reachable = gc->IsReachable(Get(id, a), Get(id, b));
bool reachable = IsReachable(edges, a, b, &seen);
if (gc_reachable != reachable) {
PrintEdges(edges);
PrintGCEdges(nodes, id, gc);
PrintTransitiveClosure(nodes, edges);
PrintGCTransitiveClosure(nodes, id, gc);
ABSL_RAW_LOG(FATAL, "gc_reachable %s reachable %s a %d b %d",
gc_reachable ? "true" : "false",
reachable ? "true" : "false", a, b);
}
}
}
}
static void CheckEdges(Nodes *nodes, Edges *edges, const IdMap &id,
GraphCycles *gc) {
int count = 0;
for (const auto &edge : *edges) {
int a = edge.from;
int b = edge.to;
if (!gc->HasEdge(Get(id, a), Get(id, b))) {
PrintEdges(edges);
PrintGCEdges(nodes, id, gc);
ABSL_RAW_LOG(FATAL, "!gc->HasEdge(%d, %d)", a, b);
}
}
for (const auto &a : *nodes) {
for (const auto &b : *nodes) {
if (gc->HasEdge(Get(id, a), Get(id, b))) {
count++;
}
}
}
if (count != edges->size()) {
PrintEdges(edges);
PrintGCEdges(nodes, id, gc);
ABSL_RAW_LOG(FATAL, "edges->size() %zu count %d", edges->size(), count);
}
}
static void CheckInvariants(const GraphCycles &gc) {
if (ABSL_PREDICT_FALSE(!gc.CheckInvariants()))
ABSL_RAW_LOG(FATAL, "CheckInvariants");
}
// Returns the index of a randomly chosen node in *nodes.
// Requires *nodes be non-empty.
static int RandomNode(RandomEngine* rng, Nodes *nodes) {
std::uniform_int_distribution<int> uniform(0, nodes->size()-1);
return uniform(*rng);
}
// Returns the index of a randomly chosen edge in *edges.
// Requires *edges be non-empty.
static int RandomEdge(RandomEngine* rng, Edges *edges) {
std::uniform_int_distribution<int> uniform(0, edges->size()-1);
return uniform(*rng);
}
// Returns the index of edge (from, to) in *edges or -1 if it is not in *edges.
static int EdgeIndex(Edges *edges, int from, int to) {
int i = 0;
while (i != edges->size() &&
((*edges)[i].from != from || (*edges)[i].to != to)) {
i++;
}
return i == edges->size()? -1 : i;
}
TEST(GraphCycles, RandomizedTest) {
int next_node = 0;
Nodes nodes;
Edges edges; // from, to
IdMap id;
GraphCycles graph_cycles;
static const int kMaxNodes = 7; // use <= 7 nodes to keep test short
static const int kDataOffset = 17; // an offset to the node-specific data
int n = 100000;
int op = 0;
RandomEngine rng(testing::UnitTest::GetInstance()->random_seed());
std::uniform_int_distribution<int> uniform(0, 5);
auto ptr = [](intptr_t i) {
return reinterpret_cast<void*>(i + kDataOffset);
};
for (int iter = 0; iter != n; iter++) {
for (const auto &node : nodes) {
ASSERT_EQ(graph_cycles.Ptr(Get(id, node)), ptr(node)) << " node " << node;
}
CheckEdges(&nodes, &edges, id, &graph_cycles);
CheckTransitiveClosure(&nodes, &edges, id, &graph_cycles);
op = uniform(rng);
switch (op) {
case 0: // Add a node
if (nodes.size() < kMaxNodes) {
int new_node = next_node++;
GraphId new_gnode = graph_cycles.GetId(ptr(new_node));
ASSERT_NE(new_gnode, InvalidGraphId());
id[new_node] = new_gnode;
ASSERT_EQ(ptr(new_node), graph_cycles.Ptr(new_gnode));
nodes.push_back(new_node);
}
break;
case 1: // Remove a node
if (nodes.size() > 0) {
int node_index = RandomNode(&rng, &nodes);
int node = nodes[node_index];
nodes[node_index] = nodes.back();
nodes.pop_back();
graph_cycles.RemoveNode(ptr(node));
ASSERT_EQ(graph_cycles.Ptr(Get(id, node)), nullptr);
id.erase(node);
int i = 0;
while (i != edges.size()) {
if (edges[i].from == node || edges[i].to == node) {
edges[i] = edges.back();
edges.pop_back();
} else {
i++;
}
}
}
break;
case 2: // Add an edge
if (nodes.size() > 0) {
int from = RandomNode(&rng, &nodes);
int to = RandomNode(&rng, &nodes);
if (EdgeIndex(&edges, nodes[from], nodes[to]) == -1) {
if (graph_cycles.InsertEdge(id[nodes[from]], id[nodes[to]])) {
Edge new_edge;
new_edge.from = nodes[from];
new_edge.to = nodes[to];
edges.push_back(new_edge);
} else {
std::unordered_set<int> seen;
ASSERT_TRUE(IsReachable(&edges, nodes[to], nodes[from], &seen))
<< "Edge " << nodes[to] << "->" << nodes[from];
}
}
}
break;
case 3: // Remove an edge
if (edges.size() > 0) {
int i = RandomEdge(&rng, &edges);
int from = edges[i].from;
int to = edges[i].to;
ASSERT_EQ(i, EdgeIndex(&edges, from, to));
edges[i] = edges.back();
edges.pop_back();
ASSERT_EQ(-1, EdgeIndex(&edges, from, to));
graph_cycles.RemoveEdge(id[from], id[to]);
}
break;
case 4: // Check a path
if (nodes.size() > 0) {
int from = RandomNode(&rng, &nodes);
int to = RandomNode(&rng, &nodes);
GraphId path[2*kMaxNodes];
int path_len = graph_cycles.FindPath(id[nodes[from]], id[nodes[to]],
ABSL_ARRAYSIZE(path), path);
std::unordered_set<int> seen;
bool reachable = IsReachable(&edges, nodes[from], nodes[to], &seen);
bool gc_reachable =
graph_cycles.IsReachable(Get(id, nodes[from]), Get(id, nodes[to]));
ASSERT_EQ(path_len != 0, reachable);
ASSERT_EQ(path_len != 0, gc_reachable);
// In the following line, we add one because a node can appear
// twice, if the path is from that node to itself, perhaps via
// every other node.
ASSERT_LE(path_len, kMaxNodes + 1);
if (path_len != 0) {
ASSERT_EQ(id[nodes[from]], path[0]);
ASSERT_EQ(id[nodes[to]], path[path_len-1]);
for (int i = 1; i < path_len; i++) {
ASSERT_TRUE(graph_cycles.HasEdge(path[i-1], path[i]));
}
}
}
break;
case 5: // Check invariants
CheckInvariants(graph_cycles);
break;
default:
ABSL_RAW_LOG(FATAL, "op %d", op);
}
// Very rarely, test graph expansion by adding then removing many nodes.
std::bernoulli_distribution one_in_1024(1.0 / 1024);
if (one_in_1024(rng)) {
CheckEdges(&nodes, &edges, id, &graph_cycles);
CheckTransitiveClosure(&nodes, &edges, id, &graph_cycles);
for (int i = 0; i != 256; i++) {
int new_node = next_node++;
GraphId new_gnode = graph_cycles.GetId(ptr(new_node));
ASSERT_NE(InvalidGraphId(), new_gnode);
id[new_node] = new_gnode;
ASSERT_EQ(ptr(new_node), graph_cycles.Ptr(new_gnode));
for (const auto &node : nodes) {
ASSERT_NE(node, new_node);
}
nodes.push_back(new_node);
}
for (int i = 0; i != 256; i++) {
ASSERT_GT(nodes.size(), 0);
int node_index = RandomNode(&rng, &nodes);
int node = nodes[node_index];
nodes[node_index] = nodes.back();
nodes.pop_back();
graph_cycles.RemoveNode(ptr(node));
id.erase(node);
int j = 0;
while (j != edges.size()) {
if (edges[j].from == node || edges[j].to == node) {
edges[j] = edges.back();
edges.pop_back();
} else {
j++;
}
}
}
CheckInvariants(graph_cycles);
}
}
}
class GraphCyclesTest : public ::testing::Test {
public:
IdMap id_;
GraphCycles g_;
static void* Ptr(int i) {
return reinterpret_cast<void*>(static_cast<uintptr_t>(i));
}
static int Num(void* ptr) {
return static_cast<int>(reinterpret_cast<uintptr_t>(ptr));
}
// Test relies on ith NewNode() call returning Node numbered i
GraphCyclesTest() {
for (int i = 0; i < 100; i++) {
id_[i] = g_.GetId(Ptr(i));
}
CheckInvariants(g_);
}
bool AddEdge(int x, int y) {
return g_.InsertEdge(Get(id_, x), Get(id_, y));
}
void AddMultiples() {
// For every node x > 0: add edge to 2*x, 3*x
for (int x = 1; x < 25; x++) {
EXPECT_TRUE(AddEdge(x, 2*x)) << x;
EXPECT_TRUE(AddEdge(x, 3*x)) << x;
}
CheckInvariants(g_);
}
std::string Path(int x, int y) {
GraphId path[5];
int np = g_.FindPath(Get(id_, x), Get(id_, y), ABSL_ARRAYSIZE(path), path);
std::string result;
for (int i = 0; i < np; i++) {
if (i >= ABSL_ARRAYSIZE(path)) {
result += " ...";
break;
}
if (!result.empty()) result.push_back(' ');
char buf[20];
snprintf(buf, sizeof(buf), "%d", Num(g_.Ptr(path[i])));
result += buf;
}
return result;
}
};
TEST_F(GraphCyclesTest, NoCycle) {
AddMultiples();
CheckInvariants(g_);
}
TEST_F(GraphCyclesTest, SimpleCycle) {
AddMultiples();
EXPECT_FALSE(AddEdge(8, 4));
EXPECT_EQ("4 8", Path(4, 8));
CheckInvariants(g_);
}
TEST_F(GraphCyclesTest, IndirectCycle) {
AddMultiples();
EXPECT_TRUE(AddEdge(16, 9));
CheckInvariants(g_);
EXPECT_FALSE(AddEdge(9, 2));
EXPECT_EQ("2 4 8 16 9", Path(2, 9));
CheckInvariants(g_);
}
TEST_F(GraphCyclesTest, LongPath) {
ASSERT_TRUE(AddEdge(2, 4));
ASSERT_TRUE(AddEdge(4, 6));
ASSERT_TRUE(AddEdge(6, 8));
ASSERT_TRUE(AddEdge(8, 10));
ASSERT_TRUE(AddEdge(10, 12));
ASSERT_FALSE(AddEdge(12, 2));
EXPECT_EQ("2 4 6 8 10 ...", Path(2, 12));
CheckInvariants(g_);
}
TEST_F(GraphCyclesTest, RemoveNode) {
ASSERT_TRUE(AddEdge(1, 2));
ASSERT_TRUE(AddEdge(2, 3));
ASSERT_TRUE(AddEdge(3, 4));
ASSERT_TRUE(AddEdge(4, 5));
g_.RemoveNode(g_.Ptr(id_[3]));
id_.erase(3);
ASSERT_TRUE(AddEdge(5, 1));
}
TEST_F(GraphCyclesTest, ManyEdges) {
const int N = 50;
for (int i = 0; i < N; i++) {
for (int j = 1; j < N; j++) {
ASSERT_TRUE(AddEdge(i, i+j));
}
}
CheckInvariants(g_);
ASSERT_TRUE(AddEdge(2*N-1, 0));
CheckInvariants(g_);
ASSERT_FALSE(AddEdge(10, 9));
CheckInvariants(g_);
}
} // namespace synchronization_internal
} // namespace absl

@@ -0,0 +1,147 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// An optional absolute timeout, with nanosecond granularity,
// compatible with absl::Time. Suitable for in-register
// parameter-passing (e.g. syscalls).
// Constructible from an absl::Time (for a timeout to be respected) or {}
// (for "no timeout").
// This is a private low-level API for use by a handful of low-level
// components that are friends of this class. Higher-level components
// should build APIs based on absl::Time and absl::Duration.
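// Illustrative usage sketch (not part of this header); assumes the caller is
// one of the low-level friends of this class, e.g. a Waiter:
//
//   using absl::synchronization_internal::KernelTimeout;
//   KernelTimeout never = KernelTimeout::Never();        // no timeout
//   KernelTimeout soon(absl::Now() + absl::Seconds(1));  // absolute deadline
//   if (soon.has_timeout()) {
//     struct timespec ts = soon.MakeAbsTimespec();  // pass to sem_timedwait, etc.
//   }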
#ifndef ABSL_SYNCHRONIZATION_INTERNAL_KERNEL_TIMEOUT_H_
#define ABSL_SYNCHRONIZATION_INTERNAL_KERNEL_TIMEOUT_H_
#ifdef _WIN32
#include <intsafe.h>
#endif
#include <time.h>
#include <algorithm>
#include <limits>
#include "absl/base/internal/raw_logging.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
namespace absl {
namespace synchronization_internal {
class Waiter;
class KernelTimeout {
public:
// A timeout that should expire at <t>. Any value, in the full
// InfinitePast() to InfiniteFuture() range, is valid here and will be
// respected.
explicit KernelTimeout(absl::Time t) : ns_(MakeNs(t)) {}
// No timeout.
KernelTimeout() : ns_(0) {}
// A more explicit factory for those who prefer it. Equivalent to {}.
static KernelTimeout Never() { return {}; }
// We explicitly do not support other custom formats: timespec, int64_t nanos.
// Unify on this and absl::Time, please.
bool has_timeout() const { return ns_ != 0; }
private:
// internal rep, not user visible: ns after unix epoch.
// zero = no timeout.
  // Negative values are treated as an unlikely (and certainly expired!) but
  // valid timeout.
int64_t ns_;
static int64_t MakeNs(absl::Time t) {
    // Optimization: InfiniteFuture is a common "no timeout" value
    // and is cheaper to compare than to convert.
if (t == absl::InfiniteFuture()) return 0;
int64_t x = ToUnixNanos(t);
// A timeout that lands exactly on the epoch (x=0) needs to be respected,
    // so we alter it unnoticeably to 1.  Negative timeouts are in
// theory supported, but handled poorly by the kernel (long
// delays) so push them forward too; since all such times have
// already passed, it's indistinguishable.
if (x <= 0) x = 1;
// A time larger than what can be represented to the kernel is treated
// as no timeout.
if (x == std::numeric_limits<int64_t>::max()) x = 0;
return x;
}
// Convert to parameter for sem_timedwait/futex/similar. Only for approved
// users. Do not call if !has_timeout.
struct timespec MakeAbsTimespec() {
int64_t n = ns_;
static const int64_t kNanosPerSecond = 1000 * 1000 * 1000;
if (n == 0) {
ABSL_RAW_LOG(
ERROR,
"Tried to create a timespec from a non-timeout; never do this.");
// But we'll try to continue sanely. no-timeout ~= saturated timeout.
n = std::numeric_limits<int64_t>::max();
}
// Kernel APIs validate timespecs as being at or after the epoch,
// despite the kernel time type being signed. However, no one can
// tell the difference between a timeout at or before the epoch (since
// all such timeouts have expired!)
if (n < 0) n = 0;
struct timespec abstime;
int64_t seconds = std::min(n / kNanosPerSecond,
int64_t{std::numeric_limits<time_t>::max()});
abstime.tv_sec = static_cast<time_t>(seconds);
abstime.tv_nsec =
static_cast<decltype(abstime.tv_nsec)>(n % kNanosPerSecond);
return abstime;
}
#ifdef _WIN32
// Converts to milliseconds from now, or INFINITE when
// !has_timeout(). For use by SleepConditionVariableSRW on
// Windows. Callers should recognize that the return value is a
// relative duration (it should be recomputed by calling this method
// in the case of a spurious wakeup).
DWORD InMillisecondsFromNow() const {
if (!has_timeout()) {
return INFINITE;
}
// The use of absl::Now() to convert from absolute time to
// relative time means that absl::Now() cannot use anything that
// depends on KernelTimeout (for example, Mutex) on Windows.
int64_t now = ToUnixNanos(absl::Now());
if (ns_ >= now) {
// Round up so that Now() + ms_from_now >= ns_.
constexpr uint64_t max_nanos =
std::numeric_limits<int64_t>::max() - 999999u;
uint64_t ms_from_now =
(std::min<uint64_t>(max_nanos, ns_ - now) + 999999u) / 1000000u;
if (ms_from_now > std::numeric_limits<DWORD>::max()) {
return INFINITE;
}
return static_cast<DWORD>(ms_from_now);
}
return 0;
}
#endif
friend class Waiter;
};
} // namespace synchronization_internal
} // namespace absl
#endif // ABSL_SYNCHRONIZATION_INTERNAL_KERNEL_TIMEOUT_H_

View file

@ -0,0 +1,311 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Implementation of a small subset of Mutex and CondVar functionality
// for platforms where the production implementation hasn't been fully
// ported yet.
#include "absl/synchronization/mutex.h"
#if defined(_WIN32)
#include <chrono> // NOLINT(build/c++11)
#else
#include <sys/time.h>
#include <time.h>
#endif
#include <algorithm>
#include "absl/base/internal/raw_logging.h"
#include "absl/time/time.h"
namespace absl {
namespace synchronization_internal {
namespace {
// Return the current time plus the timeout.
absl::Time DeadlineFromTimeout(absl::Duration timeout) {
return absl::Now() + timeout;
}
// Limit the deadline to a positive, 32-bit time_t value to accommodate
// implementation restrictions. This also deals with InfinitePast and
// InfiniteFuture.
absl::Time LimitedDeadline(absl::Time deadline) {
deadline = std::max(absl::FromTimeT(0), deadline);
deadline = std::min(deadline, absl::FromTimeT(0x7fffffff));
return deadline;
}
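// For example (illustrative), the clamping above means:
//
//   LimitedDeadline(absl::InfinitePast());    // == absl::FromTimeT(0)
//   LimitedDeadline(absl::InfiniteFuture());  // == absl::FromTimeT(0x7fffffff)
//
// so downstream code only ever sees deadlines representable in a 32-bit time_t.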
} // namespace
#if defined(_WIN32)
MutexImpl::MutexImpl() {}
MutexImpl::~MutexImpl() {
if (locked_) {
std_mutex_.unlock();
}
}
void MutexImpl::Lock() {
std_mutex_.lock();
locked_ = true;
}
bool MutexImpl::TryLock() {
bool locked = std_mutex_.try_lock();
if (locked) locked_ = true;
return locked;
}
void MutexImpl::Unlock() {
locked_ = false;
released_.SignalAll();
std_mutex_.unlock();
}
CondVarImpl::CondVarImpl() {}
CondVarImpl::~CondVarImpl() {}
void CondVarImpl::Signal() { std_cv_.notify_one(); }
void CondVarImpl::SignalAll() { std_cv_.notify_all(); }
void CondVarImpl::Wait(MutexImpl* mu) {
mu->released_.SignalAll();
std_cv_.wait(mu->std_mutex_);
}
bool CondVarImpl::WaitWithDeadline(MutexImpl* mu, absl::Time deadline) {
mu->released_.SignalAll();
time_t when = ToTimeT(deadline);
int64_t nanos = ToInt64Nanoseconds(deadline - absl::FromTimeT(when));
std::chrono::system_clock::time_point deadline_tp =
std::chrono::system_clock::from_time_t(when) +
std::chrono::duration_cast<std::chrono::system_clock::duration>(
std::chrono::nanoseconds(nanos));
return std_cv_.wait_until(mu->std_mutex_, deadline_tp) ==
std::cv_status::timeout;
}
#else // ! _WIN32
MutexImpl::MutexImpl() {
ABSL_RAW_CHECK(pthread_mutex_init(&pthread_mutex_, nullptr) == 0,
"pthread error");
}
MutexImpl::~MutexImpl() {
if (locked_) {
ABSL_RAW_CHECK(pthread_mutex_unlock(&pthread_mutex_) == 0, "pthread error");
}
ABSL_RAW_CHECK(pthread_mutex_destroy(&pthread_mutex_) == 0, "pthread error");
}
void MutexImpl::Lock() {
ABSL_RAW_CHECK(pthread_mutex_lock(&pthread_mutex_) == 0, "pthread error");
locked_ = true;
}
bool MutexImpl::TryLock() {
bool locked = (0 == pthread_mutex_trylock(&pthread_mutex_));
if (locked) locked_ = true;
return locked;
}
void MutexImpl::Unlock() {
locked_ = false;
released_.SignalAll();
ABSL_RAW_CHECK(pthread_mutex_unlock(&pthread_mutex_) == 0, "pthread error");
}
CondVarImpl::CondVarImpl() {
ABSL_RAW_CHECK(pthread_cond_init(&pthread_cv_, nullptr) == 0,
"pthread error");
}
CondVarImpl::~CondVarImpl() {
ABSL_RAW_CHECK(pthread_cond_destroy(&pthread_cv_) == 0, "pthread error");
}
void CondVarImpl::Signal() {
ABSL_RAW_CHECK(pthread_cond_signal(&pthread_cv_) == 0, "pthread error");
}
void CondVarImpl::SignalAll() {
ABSL_RAW_CHECK(pthread_cond_broadcast(&pthread_cv_) == 0, "pthread error");
}
void CondVarImpl::Wait(MutexImpl* mu) {
mu->released_.SignalAll();
ABSL_RAW_CHECK(pthread_cond_wait(&pthread_cv_, &mu->pthread_mutex_) == 0,
"pthread error");
}
bool CondVarImpl::WaitWithDeadline(MutexImpl* mu, absl::Time deadline) {
mu->released_.SignalAll();
struct timespec ts = ToTimespec(deadline);
int rc = pthread_cond_timedwait(&pthread_cv_, &mu->pthread_mutex_, &ts);
if (rc == ETIMEDOUT) return true;
ABSL_RAW_CHECK(rc == 0, "pthread error");
return false;
}
#endif // ! _WIN32
void MutexImpl::Await(const Condition& cond) {
if (cond.Eval()) return;
released_.SignalAll();
do {
released_.Wait(this);
} while (!cond.Eval());
}
bool MutexImpl::AwaitWithDeadline(const Condition& cond, absl::Time deadline) {
if (cond.Eval()) return true;
released_.SignalAll();
while (true) {
if (released_.WaitWithDeadline(this, deadline)) return false;
if (cond.Eval()) return true;
}
}
} // namespace synchronization_internal
Mutex::Mutex() {}
Mutex::~Mutex() {}
void Mutex::Lock() { impl()->Lock(); }
void Mutex::Unlock() { impl()->Unlock(); }
bool Mutex::TryLock() { return impl()->TryLock(); }
void Mutex::ReaderLock() { Lock(); }
void Mutex::ReaderUnlock() { Unlock(); }
void Mutex::Await(const Condition& cond) { impl()->Await(cond); }
void Mutex::LockWhen(const Condition& cond) {
Lock();
Await(cond);
}
bool Mutex::AwaitWithDeadline(const Condition& cond, absl::Time deadline) {
return impl()->AwaitWithDeadline(
cond, synchronization_internal::LimitedDeadline(deadline));
}
bool Mutex::AwaitWithTimeout(const Condition& cond, absl::Duration timeout) {
return AwaitWithDeadline(
cond, synchronization_internal::DeadlineFromTimeout(timeout));
}
bool Mutex::LockWhenWithDeadline(const Condition& cond, absl::Time deadline) {
Lock();
return AwaitWithDeadline(cond, deadline);
}
bool Mutex::LockWhenWithTimeout(const Condition& cond, absl::Duration timeout) {
return LockWhenWithDeadline(
cond, synchronization_internal::DeadlineFromTimeout(timeout));
}
bool Mutex::ReaderLockWhenWithTimeout(const Condition& cond,
absl::Duration timeout) {
return LockWhenWithTimeout(cond, timeout);
}
bool Mutex::ReaderLockWhenWithDeadline(const Condition& cond,
absl::Time deadline) {
return LockWhenWithDeadline(cond, deadline);
}
void Mutex::EnableDebugLog(const char*) {}
void Mutex::EnableInvariantDebugging(void (*)(void*), void*) {}
void Mutex::ForgetDeadlockInfo() {}
void Mutex::AssertHeld() const {}
void Mutex::AssertReaderHeld() const {}
void Mutex::AssertNotHeld() const {}
CondVar::CondVar() {}
CondVar::~CondVar() {}
void CondVar::Signal() { impl()->Signal(); }
void CondVar::SignalAll() { impl()->SignalAll(); }
void CondVar::Wait(Mutex* mu) { return impl()->Wait(mu->impl()); }
bool CondVar::WaitWithDeadline(Mutex* mu, absl::Time deadline) {
return impl()->WaitWithDeadline(
mu->impl(), synchronization_internal::LimitedDeadline(deadline));
}
bool CondVar::WaitWithTimeout(Mutex* mu, absl::Duration timeout) {
return WaitWithDeadline(mu, absl::Now() + timeout);
}
void CondVar::EnableDebugLog(const char*) {}
#ifdef THREAD_SANITIZER
extern "C" void __tsan_read1(void *addr);
#else
#define __tsan_read1(addr) // do nothing if TSan not enabled
#endif
// A function that just returns its argument, dereferenced
static bool Dereference(void *arg) {
// ThreadSanitizer does not instrument this file for memory accesses.
// This function dereferences a user variable that can participate
// in a data race, so we need to manually tell TSan about this memory access.
__tsan_read1(arg);
return *(static_cast<bool *>(arg));
}
Condition::Condition() {} // null constructor, used for kTrue only
const Condition Condition::kTrue;
Condition::Condition(bool (*func)(void *), void *arg)
: eval_(&CallVoidPtrFunction),
function_(func),
method_(nullptr),
arg_(arg) {}
bool Condition::CallVoidPtrFunction(const Condition *c) {
return (*c->function_)(c->arg_);
}
Condition::Condition(const bool *cond)
: eval_(CallVoidPtrFunction),
function_(Dereference),
method_(nullptr),
// const_cast is safe since Dereference does not modify arg
arg_(const_cast<bool *>(cond)) {}
bool Condition::Eval() const {
// eval_ == null for kTrue
return (this->eval_ == nullptr) || (*this->eval_)(this);
}
} // namespace absl

View file

@ -0,0 +1,256 @@
// Do not include. This is an implementation detail of absl/synchronization/mutex.h.
//
// Declares three classes:
//
//   absl::synchronization_internal::MutexImpl - implementation helper for Mutex
//   absl::synchronization_internal::CondVarImpl - implementation helper for CondVar
//   absl::synchronization_internal::SynchronizationStorage<T> - implementation
//       helper for Mutex, CondVar
#include <type_traits>
#if defined(_WIN32)
#include <condition_variable>
#include <mutex>
#else
#include <pthread.h>
#endif
#include "absl/base/call_once.h"
#include "absl/time/time.h"
// Declare that Mutex::ReaderLock is actually Lock(). Intended primarily
// for tests, and even then as a last resort.
#ifdef ABSL_MUTEX_READER_LOCK_IS_EXCLUSIVE
#error ABSL_MUTEX_READER_LOCK_IS_EXCLUSIVE cannot be directly set
#else
#define ABSL_MUTEX_READER_LOCK_IS_EXCLUSIVE 1
#endif
// Declare that Mutex::EnableInvariantDebugging is not implemented.
// Intended primarily for tests, and even then as a last resort.
#ifdef ABSL_MUTEX_ENABLE_INVARIANT_DEBUGGING_NOT_IMPLEMENTED
#error ABSL_MUTEX_ENABLE_INVARIANT_DEBUGGING_NOT_IMPLEMENTED cannot be directly set
#else
#define ABSL_MUTEX_ENABLE_INVARIANT_DEBUGGING_NOT_IMPLEMENTED 1
#endif
namespace absl {
class Condition;
namespace synchronization_internal {
class MutexImpl;
// Do not use this implementation detail of CondVar. Provides most of the
// implementation, but should not be placed directly in static storage
// because it will not linker initialize properly. See
// SynchronizationStorage<T> below for what we mean by linker
// initialization.
class CondVarImpl {
public:
CondVarImpl();
CondVarImpl(const CondVarImpl&) = delete;
CondVarImpl& operator=(const CondVarImpl&) = delete;
~CondVarImpl();
void Signal();
void SignalAll();
void Wait(MutexImpl* mutex);
bool WaitWithDeadline(MutexImpl* mutex, absl::Time deadline);
private:
#if defined(_WIN32)
std::condition_variable_any std_cv_;
#else
pthread_cond_t pthread_cv_;
#endif
};
// Do not use this implementation detail of Mutex. Provides most of the
// implementation, but should not be placed directly in static storage
// because it will not linker initialize properly. See
// SynchronizationStorage<T> below for what we mean by linker
// initialization.
class MutexImpl {
public:
MutexImpl();
MutexImpl(const MutexImpl&) = delete;
MutexImpl& operator=(const MutexImpl&) = delete;
~MutexImpl();
void Lock();
bool TryLock();
void Unlock();
void Await(const Condition& cond);
bool AwaitWithDeadline(const Condition& cond, absl::Time deadline);
private:
friend class CondVarImpl;
#if defined(_WIN32)
std::mutex std_mutex_;
#else
pthread_mutex_t pthread_mutex_;
#endif
// True if the underlying mutex is locked. If the destructor is entered
// while locked_, the underlying mutex is unlocked. Mutex supports
// destruction while locked, but the same is undefined behavior for both
// pthread_mutex_t and std::mutex.
bool locked_ = false;
// Signaled before releasing the lock, in support of Await.
CondVarImpl released_;
};
// Do not use this implementation detail of CondVar and Mutex. A storage
// space for T that supports a base::LinkerInitialized constructor. T must
// have a default constructor, which is called by the first call to
// get(). T's destructor is never called if the base::LinkerInitialized
// constructor is called.
//
// Objects constructed with the default constructor are constructed and
// destructed like any other object, and should never be allocated in
// static storage.
//
// Objects constructed with the base::LinkerInitialized constructor should
// always be in static storage. For such objects, calls to get() are always
// valid, except from signal handlers.
//
// Note that this implementation relies on undefined language behaviors that
// are known to hold for the set of supported compilers. An analysis
// follows.
//
// From the C++11 standard:
//
// [basic.life] says an object has non-trivial initialization if it is of
// class type and it is initialized by a constructor other than a trivial
// default constructor. (the base::LinkerInitialized constructor is
// non-trivial)
//
// [basic.life] says the lifetime of an object with a non-trivial
// constructor begins when the call to the constructor is complete.
//
// [basic.life] says the lifetime of an object with non-trivial destructor
// ends when the call to the destructor begins.
//
// [basic.life] p5 specifies undefined behavior when accessing non-static
// members of an instance outside its
// lifetime. (SynchronizationStorage::get() accesses non-static members.)
//
// So, base::LinkerInitialized object of SynchronizationStorage uses a
// non-trivial constructor, which is called at some point during dynamic
// initialization, and is therefore subject to order of dynamic
// initialization bugs, where get() is called before the object's
// constructor is, resulting in undefined behavior.
//
// Similarly, a base::LinkerInitialized SynchronizationStorage object has a
// non-trivial destructor, and so its lifetime ends at some point during
// destruction of objects with static storage duration [basic.start.term]
// p4. There is a window where other exit code could call get() after this
// occurs, resulting in undefined behavior.
//
// Combined, these statements imply that base::LinkerInitialized instances
// of SynchronizationStorage<T> rely on undefined behavior.
//
// However, in practice, the implementation works on all supported
// compilers. Specifically, we rely on:
//
// a) zero-initialization being sufficient to initialize
// base::LinkerInitialized instances for the purposes of calling
// get(), regardless of when the constructor is called. This is
// because the is_dynamic_ boolean is correctly zero-initialized to
// false.
//
// b) the base::LinkerInitialized constructor is a NOP, and immaterial
//    even to concurrent calls to get().
//
// c) the destructor being a NOP for base::LinkerInitialized objects
// (guaranteed by a check for !is_dynamic_), and so any concurrent and
// subsequent calls to get() functioning as if the destructor were not
// called, by virtue of the instances' storage remaining valid after the
// destructor runs.
//
// d) That a-c apply transitively when SynchronizationStorage<T> is the
// only member of a class allocated in static storage.
//
// Nothing in the language standard guarantees that a-d hold. In practice,
// these hold in all supported compilers.
//
// Future direction:
//
// Ideally, we would simply use std::mutex or a similar class, which when
// allocated statically would support use immediately after static
// initialization up until static storage is reclaimed (i.e. the properties
// we require of all "linker initialized" instances).
//
// Regarding construction in static storage, std::mutex is required to
// provide a constexpr default constructor [thread.mutex.class], which
// ensures the instance's lifetime begins with static initialization
// [basic.start.init], and so is immune to any problems caused by the order
// of dynamic initialization. However, as of this writing Microsoft's
// Visual Studio does not provide a constexpr constructor for std::mutex.
// See
// https://blogs.msdn.microsoft.com/vcblog/2015/06/02/constexpr-complete-for-vs-2015-rtm-c11-compiler-c17-stl/
//
// Regarding destruction of instances in static storage, [basic.life] does
// say that an object's lifetime ends when the storage it occupies is
// released, in the case of a non-trivial destructor. However, std::mutex is
// not specified to have a trivial destructor.
//
// So, we would need a class with a constexpr default constructor and a
// trivial destructor. Today, we can achieve neither desired property using
// std::mutex directly.
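//
// Illustrative usage sketch (the wrapper class below is hypothetical and not
// part of this file):
//
//   class MyMutex {
//    public:
//     MyMutex() {}  // heap/stack instances
//     // Static-storage instances; forwards the linker-initialized tag.
//     explicit MyMutex(base::LinkerInitialized x) : impl_(x) {}
//     void Lock() { impl_.get()->Lock(); }
//     void Unlock() { impl_.get()->Unlock(); }
//    private:
//     SynchronizationStorage<MutexImpl> impl_;
//   };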
template <typename T>
class SynchronizationStorage {
public:
// Instances allocated on the heap or on the stack should use the default
// constructor.
SynchronizationStorage()
: is_dynamic_(true), once_() {}
// Instances allocated in static storage (not on the heap, not on the
// stack) should use this constructor.
explicit SynchronizationStorage(base::LinkerInitialized) {}
SynchronizationStorage(SynchronizationStorage&) = delete;
SynchronizationStorage& operator=(SynchronizationStorage&) = delete;
~SynchronizationStorage() {
if (is_dynamic_) {
get()->~T();
}
}
// Retrieve the object in storage. This is fast and thread safe, but does
// incur the cost of absl::call_once().
//
// For instances in static storage constructed with the
// base::LinkerInitialized constructor, may be called at any time without
// regard for order of dynamic initialization or destruction of objects
// in static storage. See the class comment for caveats.
T* get() {
absl::call_once(once_, SynchronizationStorage::Construct, this);
return reinterpret_cast<T*>(&space_);
}
private:
static void Construct(SynchronizationStorage<T>* self) {
new (&self->space_) T();
}
// When true, T's destructor is run when this is destructed.
//
// The base::LinkerInitialized constructor assumes this value will be set
// false by static initialization.
bool is_dynamic_;
absl::once_flag once_;
// An aligned space for T.
typename std::aligned_storage<sizeof(T), alignof(T)>::type space_;
};
} // namespace synchronization_internal
} // namespace absl

View file

@ -0,0 +1,106 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is a no-op if the required LowLevelAlloc support is missing.
#include "absl/base/internal/low_level_alloc.h"
#ifndef ABSL_LOW_LEVEL_ALLOC_MISSING
#include "absl/synchronization/internal/per_thread_sem.h"
#include <atomic>
#include "absl/base/attributes.h"
#include "absl/base/internal/malloc_extension.h"
#include "absl/base/internal/thread_identity.h"
#include "absl/synchronization/internal/waiter.h"
namespace absl {
namespace synchronization_internal {
void PerThreadSem::SetThreadBlockedCounter(std::atomic<int> *counter) {
base_internal::ThreadIdentity *identity;
identity = GetOrCreateCurrentThreadIdentity();
identity->blocked_count_ptr = counter;
}
std::atomic<int> *PerThreadSem::GetThreadBlockedCounter() {
base_internal::ThreadIdentity *identity;
identity = GetOrCreateCurrentThreadIdentity();
return identity->blocked_count_ptr;
}
void PerThreadSem::Init(base_internal::ThreadIdentity *identity) {
Waiter::GetWaiter(identity)->Init();
identity->ticker.store(0, std::memory_order_relaxed);
identity->wait_start.store(0, std::memory_order_relaxed);
identity->is_idle.store(false, std::memory_order_relaxed);
}
void PerThreadSem::Tick(base_internal::ThreadIdentity *identity) {
const int ticker =
identity->ticker.fetch_add(1, std::memory_order_relaxed) + 1;
const int wait_start = identity->wait_start.load(std::memory_order_relaxed);
const bool is_idle = identity->is_idle.load(std::memory_order_relaxed);
if (wait_start && (ticker - wait_start > Waiter::kIdlePeriods) && !is_idle) {
    // Wake up the waiting thread since it is time for it to become idle.
Waiter::GetWaiter(identity)->Poke();
}
}
} // namespace synchronization_internal
} // namespace absl
extern "C" {
ABSL_ATTRIBUTE_WEAK void AbslInternalPerThreadSemPost(
absl::base_internal::ThreadIdentity *identity) {
absl::synchronization_internal::Waiter::GetWaiter(identity)->Post();
}
ABSL_ATTRIBUTE_WEAK bool AbslInternalPerThreadSemWait(
absl::synchronization_internal::KernelTimeout t) {
bool timeout = false;
absl::base_internal::ThreadIdentity *identity;
identity = absl::synchronization_internal::GetOrCreateCurrentThreadIdentity();
// Ensure wait_start != 0.
int ticker = identity->ticker.load(std::memory_order_relaxed);
identity->wait_start.store(ticker ? ticker : 1, std::memory_order_relaxed);
identity->is_idle.store(false, std::memory_order_relaxed);
if (identity->blocked_count_ptr != nullptr) {
// Increment count of threads blocked in a given thread pool.
identity->blocked_count_ptr->fetch_add(1, std::memory_order_relaxed);
}
timeout =
!absl::synchronization_internal::Waiter::GetWaiter(identity)->Wait(t);
if (identity->blocked_count_ptr != nullptr) {
identity->blocked_count_ptr->fetch_sub(1, std::memory_order_relaxed);
}
if (identity->is_idle.load(std::memory_order_relaxed)) {
// We became idle during the wait; become non-idle again so that
// performance of deallocations done from now on does not suffer.
absl::base_internal::MallocExtension::instance()->MarkThreadBusy();
}
identity->is_idle.store(false, std::memory_order_relaxed);
identity->wait_start.store(0, std::memory_order_relaxed);
return !timeout;
}
} // extern "C"
#endif // ABSL_LOW_LEVEL_ALLOC_MISSING

View file

@ -0,0 +1,107 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// PerThreadSem is a low-level synchronization primitive controlling the
// runnability of a single thread, used internally by Mutex and CondVar.
//
// This is NOT a general-purpose synchronization mechanism, and should not be
// used directly by applications. Applications should use Mutex and CondVar.
//
// The semantics of PerThreadSem are the same as those of a counting semaphore.
// Each thread maintains an abstract "count" value associated with its identity.
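//
// Illustrative sketch of the semantics (the calls below are restricted to the
// friends listed in the class; shown here only to make the counting behavior
// concrete):
//
//   base_internal::ThreadIdentity* me = GetOrCreateCurrentThreadIdentity();
//   PerThreadSem::Post(me);                      // count: 0 -> 1
//   PerThreadSem::Wait(KernelTimeout::Never());  // count: 1 -> 0, returns true
//   PerThreadSem::Wait(                          // count still 0: blocks until
//       KernelTimeout(absl::Now() + absl::Seconds(1)));  // Post() or timeout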
#ifndef ABSL_SYNCHRONIZATION_INTERNAL_PER_THREAD_SEM_H_
#define ABSL_SYNCHRONIZATION_INTERNAL_PER_THREAD_SEM_H_
#include <atomic>
#include "absl/base/internal/thread_identity.h"
#include "absl/synchronization/internal/create_thread_identity.h"
#include "absl/synchronization/internal/kernel_timeout.h"
namespace absl {
class Mutex;
namespace synchronization_internal {
class PerThreadSem {
public:
PerThreadSem() = delete;
PerThreadSem(const PerThreadSem&) = delete;
PerThreadSem& operator=(const PerThreadSem&) = delete;
// Routine invoked periodically (once a second) by a background thread.
// Has no effect on user-visible state.
static void Tick(base_internal::ThreadIdentity* identity);
// ---------------------------------------------------------------------------
// Routines used by autosizing threadpools to detect when threads are
// blocked. Each thread has a counter pointer, initially zero. If non-zero,
// the implementation atomically increments the counter when it blocks on a
  // semaphore, and decrements it again when it wakes. This allows a threadpool
// to keep track of how many of its threads are blocked.
// SetThreadBlockedCounter() should be used only by threadpool
// implementations. GetThreadBlockedCounter() should be used by modules that
// block threads; if the pointer returned is non-zero, the location should be
// incremented before the thread blocks, and decremented after it wakes.
static void SetThreadBlockedCounter(std::atomic<int> *counter);
static std::atomic<int> *GetThreadBlockedCounter();
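  // Illustrative sketch (the pool described below is hypothetical):
  //
  //   std::atomic<int> blocked(0);
  //   // Each pool worker registers the shared counter once at startup:
  //   PerThreadSem::SetThreadBlockedCounter(&blocked);
  //   // The pool can then consult blocked.load() when deciding whether to
  //   // spawn additional worker threads.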
private:
// Create the PerThreadSem associated with "identity". Initializes count=0.
// REQUIRES: May only be called by ThreadIdentity.
static void Init(base_internal::ThreadIdentity* identity);
// Increments "identity"'s count.
static inline void Post(base_internal::ThreadIdentity* identity);
// Waits until either our count > 0 or t has expired.
// If count > 0, decrements count and returns true. Otherwise returns false.
// !t.has_timeout() => Wait(t) will return true.
static inline bool Wait(KernelTimeout t);
// White-listed callers.
friend class PerThreadSemTest;
friend class absl::Mutex;
friend absl::base_internal::ThreadIdentity* CreateThreadIdentity();
};
} // namespace synchronization_internal
} // namespace absl
// In some build configurations we pass --detect-odr-violations to the
// gold linker. This causes it to flag weak symbol overrides as ODR
// violations. Because ODR only applies to C++ and not C,
// --detect-odr-violations ignores symbols not mangled with C++ names.
// By changing our extension points to be extern "C", we dodge this
// check.
extern "C" {
void AbslInternalPerThreadSemPost(
absl::base_internal::ThreadIdentity* identity);
bool AbslInternalPerThreadSemWait(
absl::synchronization_internal::KernelTimeout t);
} // extern "C"
void absl::synchronization_internal::PerThreadSem::Post(
absl::base_internal::ThreadIdentity* identity) {
AbslInternalPerThreadSemPost(identity);
}
bool absl::synchronization_internal::PerThreadSem::Wait(
absl::synchronization_internal::KernelTimeout t) {
return AbslInternalPerThreadSemWait(t);
}
#endif // ABSL_SYNCHRONIZATION_INTERNAL_PER_THREAD_SEM_H_

View file

@ -0,0 +1,246 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "absl/synchronization/internal/per_thread_sem.h"
#include <atomic>
#include <condition_variable> // NOLINT(build/c++11)
#include <functional>
#include <limits>
#include <mutex> // NOLINT(build/c++11)
#include <string>
#include <thread> // NOLINT(build/c++11)
#include "absl/base/internal/cycleclock.h"
#include "absl/base/internal/malloc_extension.h"
#include "absl/base/internal/thread_identity.h"
#include "absl/strings/str_cat.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "gtest/gtest.h"
// In this test we explicitly avoid the use of synchronization
// primitives which might use PerThreadSem, most notably absl::Mutex.
namespace absl {
namespace synchronization_internal {
class SimpleSemaphore {
public:
SimpleSemaphore() : count_(0) {}
// Decrements (locks) the semaphore. If the semaphore's value is
  // greater than zero, then the decrement proceeds and the function
  // returns immediately. If the semaphore currently has the value
// zero, then the call blocks until it becomes possible to perform
// the decrement.
void Wait() {
std::unique_lock<std::mutex> lock(mu_);
cv_.wait(lock, [this]() { return count_ > 0; });
--count_;
cv_.notify_one();
}
// Increments (unlocks) the semaphore. If the semaphore's value
  // consequently becomes greater than zero, then another thread blocked
  // in a Wait() call will be woken up and will proceed to lock the
  // semaphore.
void Post() {
std::lock_guard<std::mutex> lock(mu_);
++count_;
cv_.notify_one();
}
private:
std::mutex mu_;
std::condition_variable cv_;
int count_;
};
struct ThreadData {
int num_iterations; // Number of replies to send.
SimpleSemaphore identity2_written; // Posted by thread writing identity2.
base_internal::ThreadIdentity *identity1; // First Post()-er.
base_internal::ThreadIdentity *identity2; // First Wait()-er.
KernelTimeout timeout;
};
// Need friendship with PerThreadSem.
class PerThreadSemTest : public testing::Test {
public:
static void TimingThread(ThreadData* t) {
t->identity2 = GetOrCreateCurrentThreadIdentity();
t->identity2_written.Post();
while (t->num_iterations--) {
Wait(t->timeout);
Post(t->identity1);
}
}
void TestTiming(const char *msg, bool timeout) {
static const int kNumIterations = 100;
ThreadData t;
t.num_iterations = kNumIterations;
t.timeout = timeout ?
KernelTimeout(absl::Now() + absl::Seconds(10000)) // far in the future
: KernelTimeout::Never();
t.identity1 = GetOrCreateCurrentThreadIdentity();
// We can't use the Thread class here because it uses the Mutex
// class which will invoke PerThreadSem, so we use std::thread instead.
std::thread partner_thread(std::bind(TimingThread, &t));
// Wait for our partner thread to register their identity.
t.identity2_written.Wait();
int64_t min_cycles = std::numeric_limits<int64_t>::max();
int64_t total_cycles = 0;
for (int i = 0; i < kNumIterations; ++i) {
absl::SleepFor(absl::Milliseconds(20));
int64_t cycles = base_internal::CycleClock::Now();
Post(t.identity2);
Wait(t.timeout);
cycles = base_internal::CycleClock::Now() - cycles;
min_cycles = std::min(min_cycles, cycles);
total_cycles += cycles;
}
std::string out =
StrCat(msg, "min cycle count=", min_cycles, " avg cycle count=",
absl::SixDigits(static_cast<double>(total_cycles) /
kNumIterations));
printf("%s\n", out.c_str());
partner_thread.join();
}
protected:
static void Post(base_internal::ThreadIdentity *id) {
PerThreadSem::Post(id);
}
static bool Wait(KernelTimeout t) {
return PerThreadSem::Wait(t);
}
// convenience overload
static bool Wait(absl::Time t) {
return Wait(KernelTimeout(t));
}
static void Tick(base_internal::ThreadIdentity *identity) {
PerThreadSem::Tick(identity);
}
};
namespace {
TEST_F(PerThreadSemTest, WithoutTimeout) {
PerThreadSemTest::TestTiming("Without timeout: ", false);
}
TEST_F(PerThreadSemTest, WithTimeout) {
PerThreadSemTest::TestTiming("With timeout: ", true);
}
TEST_F(PerThreadSemTest, Timeouts) {
absl::Time timeout = absl::Now() + absl::Milliseconds(50);
EXPECT_FALSE(Wait(timeout));
EXPECT_LE(timeout, absl::Now());
absl::Time negative_timeout = absl::UnixEpoch() - absl::Milliseconds(100);
EXPECT_FALSE(Wait(negative_timeout));
EXPECT_LE(negative_timeout, absl::Now()); // trivially true :)
Post(GetOrCreateCurrentThreadIdentity());
// The wait here has an expired timeout, but we have a wake to consume,
// so this should succeed
EXPECT_TRUE(Wait(negative_timeout));
}
// Test that idle threads properly register themselves as such with malloc.
TEST_F(PerThreadSemTest, Idle) {
// We can't use gmock because it might use synch calls. So we do it
// by hand, messily. I don't bother hitting every one of the
// MallocExtension calls because most of them won't get made
// anyway--if they do we can add them.
class MockMallocExtension : public base_internal::MallocExtension {
public:
MockMallocExtension(base_internal::MallocExtension *real,
base_internal::ThreadIdentity *id,
std::atomic<int> *idles, std::atomic<int> *busies)
: real_(real), id_(id), idles_(idles), busies_(busies) {}
void MarkThreadIdle() override {
if (base_internal::CurrentThreadIdentityIfPresent() != id_) {
return;
}
idles_->fetch_add(1, std::memory_order_relaxed);
}
void MarkThreadBusy() override {
if (base_internal::CurrentThreadIdentityIfPresent() != id_) {
return;
}
busies_->fetch_add(1, std::memory_order_relaxed);
}
size_t GetAllocatedSize(const void* p) override {
return real_->GetAllocatedSize(p);
}
private:
MallocExtension *real_;
base_internal::ThreadIdentity *id_;
std::atomic<int>* idles_;
std::atomic<int>* busies_;
};
base_internal::ThreadIdentity *id = GetOrCreateCurrentThreadIdentity();
std::atomic<int> idles(0);
std::atomic<int> busies(0);
base_internal::MallocExtension *old =
base_internal::MallocExtension::instance();
MockMallocExtension mock(old, id, &idles, &busies);
base_internal::MallocExtension::Register(&mock);
std::atomic<int> sync(0);
std::thread t([id, &idles, &sync]() {
// Wait for the main thread to begin the wait process
while (0 == sync.load(std::memory_order_relaxed)) {
SleepFor(absl::Milliseconds(1));
}
    // Wait for the main thread to become idle, then wake it.
    // Pretend time is passing--enough of these should cause an idling.
for (int i = 0; i < 100; ++i) {
Tick(id);
}
while (0 == idles.load(std::memory_order_relaxed)) {
// Keep ticking, just in case.
Tick(id);
SleepFor(absl::Milliseconds(1));
}
Post(id);
});
idles.store(0, std::memory_order_relaxed); // In case we slept earlier.
sync.store(1, std::memory_order_relaxed);
Wait(KernelTimeout::Never());
// t will wake us once we become idle.
EXPECT_LT(0, busies.load(std::memory_order_relaxed));
t.join();
base_internal::MallocExtension::Register(old);
}
} // namespace
} // namespace synchronization_internal
} // namespace absl

View file

@ -0,0 +1,90 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_
#define ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_
#include <cassert>
#include <functional>
#include <queue>
#include <thread> // NOLINT(build/c++11)
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"
namespace absl {
namespace synchronization_internal {
// A simple ThreadPool implementation for tests.
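// Example use (illustrative; assumes absl/synchronization/notification.h):
//
//   ThreadPool pool(4);
//   absl::Notification done;
//   pool.Schedule([&done]() { done.Notify(); });
//   done.WaitForNotification();  // completes before ~ThreadPool joins workers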
class ThreadPool {
public:
explicit ThreadPool(int num_threads) {
for (int i = 0; i < num_threads; ++i) {
threads_.push_back(std::thread(&ThreadPool::WorkLoop, this));
}
}
ThreadPool(const ThreadPool &) = delete;
ThreadPool &operator=(const ThreadPool &) = delete;
~ThreadPool() {
{
absl::MutexLock l(&mu_);
for (int i = 0; i < threads_.size(); ++i) {
queue_.push(nullptr); // Shutdown signal.
}
}
for (auto &t : threads_) {
t.join();
}
}
// Schedule a function to be run on a ThreadPool thread immediately.
void Schedule(std::function<void()> func) {
assert(func != nullptr);
absl::MutexLock l(&mu_);
queue_.push(std::move(func));
}
private:
bool WorkAvailable() const EXCLUSIVE_LOCKS_REQUIRED(mu_) {
return !queue_.empty();
}
void WorkLoop() {
while (true) {
std::function<void()> func;
{
absl::MutexLock l(&mu_);
mu_.Await(absl::Condition(this, &ThreadPool::WorkAvailable));
func = std::move(queue_.front());
queue_.pop();
}
if (func == nullptr) { // Shutdown signal.
break;
}
func();
}
}
absl::Mutex mu_;
std::queue<std::function<void()>> queue_ GUARDED_BY(mu_);
std::vector<std::thread> threads_;
};
} // namespace synchronization_internal
} // namespace absl
#endif // ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_

View file

@ -0,0 +1,394 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "absl/synchronization/internal/waiter.h"
#include "absl/base/config.h"
#ifdef _WIN32
#include <windows.h>
#else
#include <pthread.h>
#include <sys/time.h>
#include <unistd.h>
#endif
#ifdef __linux__
#include <linux/futex.h>
#include <sys/syscall.h>
#endif
#ifdef ABSL_HAVE_SEMAPHORE_H
#include <semaphore.h>
#endif
#include <errno.h>
#include <stdio.h>
#include <time.h>
#include <atomic>
#include <cassert>
#include "absl/base/internal/malloc_extension.h"
#include "absl/base/internal/raw_logging.h"
#include "absl/base/internal/thread_identity.h"
#include "absl/synchronization/internal/kernel_timeout.h"
namespace absl {
namespace synchronization_internal {
static void MaybeBecomeIdle() {
base_internal::ThreadIdentity *identity =
base_internal::CurrentThreadIdentityIfPresent();
assert(identity != nullptr);
const bool is_idle = identity->is_idle.load(std::memory_order_relaxed);
const int ticker = identity->ticker.load(std::memory_order_relaxed);
const int wait_start = identity->wait_start.load(std::memory_order_relaxed);
if (!is_idle && ticker - wait_start > Waiter::kIdlePeriods) {
identity->is_idle.store(true, std::memory_order_relaxed);
base_internal::MallocExtension::instance()->MarkThreadIdle();
}
}
#if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX
// Some Android headers are missing these definitions even though they
// support these futex operations.
#ifdef __BIONIC__
#ifndef SYS_futex
#define SYS_futex __NR_futex
#endif
#ifndef FUTEX_WAIT_BITSET
#define FUTEX_WAIT_BITSET 9
#endif
#ifndef FUTEX_PRIVATE_FLAG
#define FUTEX_PRIVATE_FLAG 128
#endif
#ifndef FUTEX_CLOCK_REALTIME
#define FUTEX_CLOCK_REALTIME 256
#endif
#ifndef FUTEX_BITSET_MATCH_ANY
#define FUTEX_BITSET_MATCH_ANY 0xFFFFFFFF
#endif
#endif
void Waiter::Init() {
futex_.store(0, std::memory_order_relaxed);
}
bool Waiter::Wait(KernelTimeout t) {
// Loop until we can atomically decrement futex from a positive
// value, waiting on a futex while we believe it is zero.
while (true) {
int x = futex_.load(std::memory_order_relaxed);
if (x != 0) {
if (!futex_.compare_exchange_weak(x, x - 1,
std::memory_order_acquire,
std::memory_order_relaxed)) {
continue; // Raced with someone, retry.
}
return true; // Consumed a wakeup, we are done.
}
int err = 0;
if (t.has_timeout()) {
// https://locklessinc.com/articles/futex_cheat_sheet/
// Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time.
struct timespec abs_timeout = t.MakeAbsTimespec();
// Atomically check that the futex value is still 0, and if it
// is, sleep until abs_timeout or until woken by FUTEX_WAKE.
err = syscall(
SYS_futex, reinterpret_cast<int *>(&futex_),
FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME, 0,
&abs_timeout, nullptr, FUTEX_BITSET_MATCH_ANY);
} else {
// Atomically check that the futex value is still 0, and if it
// is, sleep until woken by FUTEX_WAKE.
err = syscall(SYS_futex, reinterpret_cast<int *>(&futex_),
FUTEX_WAIT | FUTEX_PRIVATE_FLAG, 0, nullptr);
}
if (err != 0) {
if (errno == EINTR || errno == EWOULDBLOCK) {
// Do nothing, the loop will retry.
} else if (errno == ETIMEDOUT) {
return false; // Timeout.
} else {
ABSL_RAW_LOG(FATAL, "Futex operation failed with errno %d\n", errno);
}
}
MaybeBecomeIdle();
}
}
void Waiter::Post() {
if (futex_.fetch_add(1, std::memory_order_release) == 0) {
// We incremented from 0, need to wake a potential waker.
Poke();
}
}
void Waiter::Poke() {
// Wake one thread waiting on the futex.
int err = syscall(SYS_futex, reinterpret_cast<int *>(&futex_),
FUTEX_WAKE | FUTEX_PRIVATE_FLAG, 1);
if (err < 0) {
ABSL_RAW_LOG(FATAL, "FUTEX_WAKE failed with errno %d\n", errno);
}
}
#elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_CONDVAR
class PthreadMutexHolder {
public:
explicit PthreadMutexHolder(pthread_mutex_t *mu) : mu_(mu) {
const int err = pthread_mutex_lock(mu_);
if (err != 0) {
ABSL_RAW_LOG(FATAL, "pthread_mutex_lock failed: %d", err);
}
}
PthreadMutexHolder(const PthreadMutexHolder &rhs) = delete;
PthreadMutexHolder &operator=(const PthreadMutexHolder &rhs) = delete;
~PthreadMutexHolder() {
const int err = pthread_mutex_unlock(mu_);
if (err != 0) {
ABSL_RAW_LOG(FATAL, "pthread_mutex_unlock failed: %d", err);
}
}
private:
pthread_mutex_t *mu_;
};
void Waiter::Init() {
const int err = pthread_mutex_init(&mu_, 0);
if (err != 0) {
ABSL_RAW_LOG(FATAL, "pthread_mutex_init failed: %d", err);
}
const int err2 = pthread_cond_init(&cv_, 0);
if (err2 != 0) {
ABSL_RAW_LOG(FATAL, "pthread_cond_init failed: %d", err2);
}
waiter_count_.store(0, std::memory_order_relaxed);
wakeup_count_.store(0, std::memory_order_relaxed);
}
bool Waiter::Wait(KernelTimeout t) {
struct timespec abs_timeout;
if (t.has_timeout()) {
abs_timeout = t.MakeAbsTimespec();
}
PthreadMutexHolder h(&mu_);
waiter_count_.fetch_add(1, std::memory_order_relaxed);
// Loop until we find a wakeup to consume or timeout.
while (true) {
int x = wakeup_count_.load(std::memory_order_relaxed);
if (x != 0) {
if (!wakeup_count_.compare_exchange_weak(x, x - 1,
std::memory_order_acquire,
std::memory_order_relaxed)) {
continue; // Raced with someone, retry.
}
// Successfully consumed a wakeup, we're done.
waiter_count_.fetch_sub(1, std::memory_order_relaxed);
return true;
}
// No wakeups available, time to wait.
if (!t.has_timeout()) {
const int err = pthread_cond_wait(&cv_, &mu_);
if (err != 0) {
ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err);
}
} else {
const int err = pthread_cond_timedwait(&cv_, &mu_, &abs_timeout);
if (err == ETIMEDOUT) {
waiter_count_.fetch_sub(1, std::memory_order_relaxed);
return false;
}
if (err != 0) {
ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err);
}
}
MaybeBecomeIdle();
}
}
void Waiter::Post() {
wakeup_count_.fetch_add(1, std::memory_order_release);
Poke();
}
void Waiter::Poke() {
if (waiter_count_.load(std::memory_order_relaxed) == 0) {
return;
}
// Potentially a waker. Take the lock and check again.
PthreadMutexHolder h(&mu_);
if (waiter_count_.load(std::memory_order_relaxed) == 0) {
return;
}
const int err = pthread_cond_signal(&cv_);
if (err != 0) {
ABSL_RAW_LOG(FATAL, "pthread_cond_signal failed: %d", err);
}
}
#elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM
void Waiter::Init() {
if (sem_init(&sem_, 0, 0) != 0) {
ABSL_RAW_LOG(FATAL, "sem_init failed with errno %d\n", errno);
}
wakeups_.store(0, std::memory_order_relaxed);
}
bool Waiter::Wait(KernelTimeout t) {
struct timespec abs_timeout;
if (t.has_timeout()) {
abs_timeout = t.MakeAbsTimespec();
}
// Loop until we timeout or consume a wakeup.
while (true) {
int x = wakeups_.load(std::memory_order_relaxed);
if (x != 0) {
if (!wakeups_.compare_exchange_weak(x, x - 1,
std::memory_order_acquire,
std::memory_order_relaxed)) {
continue; // Raced with someone, retry.
}
// Successfully consumed a wakeup, we're done.
return true;
}
// Nothing to consume, wait (looping on EINTR).
while (true) {
if (!t.has_timeout()) {
if (sem_wait(&sem_) == 0) break;
if (errno == EINTR) continue;
ABSL_RAW_LOG(FATAL, "sem_wait failed: %d", errno);
} else {
if (sem_timedwait(&sem_, &abs_timeout) == 0) break;
if (errno == EINTR) continue;
if (errno == ETIMEDOUT) return false;
ABSL_RAW_LOG(FATAL, "sem_timedwait failed: %d", errno);
}
}
MaybeBecomeIdle();
}
}
void Waiter::Post() {
wakeups_.fetch_add(1, std::memory_order_release); // Post a wakeup.
Poke();
}
void Waiter::Poke() {
if (sem_post(&sem_) != 0) { // Wake any semaphore waiter.
ABSL_RAW_LOG(FATAL, "sem_post failed with errno %d\n", errno);
}
}
#elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_WIN32
class LockHolder {
public:
explicit LockHolder(SRWLOCK* mu) : mu_(mu) {
AcquireSRWLockExclusive(mu_);
}
LockHolder(const LockHolder&) = delete;
LockHolder& operator=(const LockHolder&) = delete;
~LockHolder() {
ReleaseSRWLockExclusive(mu_);
}
private:
SRWLOCK* mu_;
};
void Waiter::Init() {
InitializeSRWLock(&mu_);
InitializeConditionVariable(&cv_);
waiter_count_.store(0, std::memory_order_relaxed);
wakeup_count_.store(0, std::memory_order_relaxed);
}
bool Waiter::Wait(KernelTimeout t) {
LockHolder h(&mu_);
waiter_count_.fetch_add(1, std::memory_order_relaxed);
// Loop until we find a wakeup to consume or timeout.
while (true) {
int x = wakeup_count_.load(std::memory_order_relaxed);
if (x != 0) {
if (!wakeup_count_.compare_exchange_weak(x, x - 1,
std::memory_order_acquire,
std::memory_order_relaxed)) {
continue; // Raced with someone, retry.
}
// Successfully consumed a wakeup, we're done.
waiter_count_.fetch_sub(1, std::memory_order_relaxed);
return true;
}
// No wakeups available, time to wait.
if (!SleepConditionVariableSRW(
&cv_, &mu_, t.InMillisecondsFromNow(), 0)) {
// GetLastError() returns a Win32 DWORD, but we assign to
// unsigned long to simplify the ABSL_RAW_LOG case below. The uniform
// initialization guarantees this is not a narrowing conversion.
const unsigned long err{GetLastError()}; // NOLINT(runtime/int)
if (err == ERROR_TIMEOUT) {
waiter_count_.fetch_sub(1, std::memory_order_relaxed);
return false;
} else {
ABSL_RAW_LOG(FATAL, "SleepConditionVariableSRW failed: %lu", err);
}
}
MaybeBecomeIdle();
}
}
void Waiter::Post() {
wakeup_count_.fetch_add(1, std::memory_order_release);
Poke();
}
void Waiter::Poke() {
if (waiter_count_.load(std::memory_order_relaxed) == 0) {
return;
}
// Potentially a waker. Take the lock and check again.
LockHolder h(&mu_);
if (waiter_count_.load(std::memory_order_relaxed) == 0) {
return;
}
WakeConditionVariable(&cv_);
}
#else
#error Unknown ABSL_WAITER_MODE
#endif
} // namespace synchronization_internal
} // namespace absl

View file

@ -0,0 +1,138 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef ABSL_SYNCHRONIZATION_INTERNAL_WAITER_H_
#define ABSL_SYNCHRONIZATION_INTERNAL_WAITER_H_
#include "absl/base/config.h"
#ifdef _WIN32
#include <windows.h>
#else
#include <pthread.h>
#endif
#ifdef ABSL_HAVE_SEMAPHORE_H
#include <semaphore.h>
#endif
#include <atomic>
#include "absl/base/internal/thread_identity.h"
#include "absl/synchronization/internal/kernel_timeout.h"
// May be chosen at compile time via -DABSL_FORCE_WAITER_MODE=<index>
#define ABSL_WAITER_MODE_FUTEX 0
#define ABSL_WAITER_MODE_SEM 1
#define ABSL_WAITER_MODE_CONDVAR 2
#define ABSL_WAITER_MODE_WIN32 3
#if defined(ABSL_FORCE_WAITER_MODE)
#define ABSL_WAITER_MODE ABSL_FORCE_WAITER_MODE
#elif defined(_WIN32)
#define ABSL_WAITER_MODE ABSL_WAITER_MODE_WIN32
#elif defined(__linux__)
#define ABSL_WAITER_MODE ABSL_WAITER_MODE_FUTEX
#elif defined(ABSL_HAVE_SEMAPHORE_H)
#define ABSL_WAITER_MODE ABSL_WAITER_MODE_SEM
#else
#define ABSL_WAITER_MODE ABSL_WAITER_MODE_CONDVAR
#endif
namespace absl {
namespace synchronization_internal {
// Waiter is an OS-specific semaphore.
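// Illustrative sketch of how a low-level client (e.g. PerThreadSem) drives a
// Waiter; the instance lives in the ThreadIdentity's reserved waiter storage:
//
//   base_internal::ThreadIdentity* identity = ...;  // created elsewhere
//   Waiter* w = Waiter::GetWaiter(identity);
//   w->Init();                             // once, when the identity is created
//   w->Post();                             // allow one Wait() to return true
//   bool woken = w->Wait(KernelTimeout::Never());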
class Waiter {
public:
// No constructor, instances use the reserved space in ThreadIdentity.
// All initialization logic belongs in `Init()`.
Waiter() = delete;
Waiter(const Waiter&) = delete;
Waiter& operator=(const Waiter&) = delete;
// Prepare any data to track waits.
void Init();
// Blocks the calling thread until a matching call to `Post()` or
// `t` has passed. Returns `true` if woken (`Post()` called),
// `false` on timeout.
bool Wait(KernelTimeout t);
// Restart the caller of `Wait()` as with a normal semaphore.
void Post();
// If anyone is waiting, wake them up temporarily and cause them to
// call `MaybeBecomeIdle()`. They will then return to waiting for a
// `Post()` or timeout.
void Poke();
// Returns the Waiter associated with the identity.
static Waiter* GetWaiter(base_internal::ThreadIdentity* identity) {
static_assert(
sizeof(Waiter) <= sizeof(base_internal::ThreadIdentity::WaiterState),
"Insufficient space for Waiter");
return reinterpret_cast<Waiter*>(identity->waiter_state.data);
}
// How many periods to remain idle before releasing resources
#ifndef THREAD_SANITIZER
static const int kIdlePeriods = 60;
#else
// Memory consumption under ThreadSanitizer is a serious concern,
// so we release resources sooner. The value of 1 leads to 1 to 2 second
// delay before marking a thread as idle.
static const int kIdlePeriods = 1;
#endif
private:
#if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX
// Futexes are defined by specification to be ints.
// Thus std::atomic<int> must be just an int with lockfree methods.
std::atomic<int> futex_;
static_assert(sizeof(int) == sizeof(futex_), "Wrong size for futex");
#elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_CONDVAR
pthread_mutex_t mu_;
pthread_cond_t cv_;
std::atomic<int> waiter_count_;
std::atomic<int> wakeup_count_; // Unclaimed wakeups, written under lock.
#elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM
sem_t sem_;
// This seems superfluous, but for Poke() we need to cause spurious
// wakeups on the semaphore. Hence we can't actually use the
// semaphore's count.
std::atomic<int> wakeups_;
#elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_WIN32
// The Windows API has lots of choices for synchronization
  // primitives. We are using SRWLOCK and CONDITION_VARIABLE
// because they don't require a destructor to release system
// resources.
SRWLOCK mu_;
CONDITION_VARIABLE cv_;
std::atomic<int> waiter_count_;
std::atomic<int> wakeup_count_;
#else
#error Unknown ABSL_WAITER_MODE
#endif
};
} // namespace synchronization_internal
} // namespace absl
#endif // ABSL_SYNCHRONIZATION_INTERNAL_WAITER_H_

File diff suppressed because it is too large

1013 absl/synchronization/mutex.h Normal file

File diff suppressed because it is too large

File diff suppressed because it is too large

View file

@ -0,0 +1,84 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "absl/synchronization/notification.h"
#include <atomic>
#include "absl/base/attributes.h"
#include "absl/base/internal/raw_logging.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
namespace absl {
void Notification::Notify() {
MutexLock l(&this->mutex_);
#ifndef NDEBUG
if (ABSL_PREDICT_FALSE(notified_yet_.load(std::memory_order_relaxed))) {
ABSL_RAW_LOG(
FATAL,
"Notify() method called more than once for Notification object %p",
static_cast<void *>(this));
}
#endif
notified_yet_.store(true, std::memory_order_release);
}
Notification::~Notification() {
// Make sure that the thread running Notify() exits before the object is
// destructed.
MutexLock l(&this->mutex_);
}
static inline bool HasBeenNotifiedInternal(
const std::atomic<bool> *notified_yet) {
return notified_yet->load(std::memory_order_acquire);
}
bool Notification::HasBeenNotified() const {
return HasBeenNotifiedInternal(&this->notified_yet_);
}
void Notification::WaitForNotification() const {
if (!HasBeenNotifiedInternal(&this->notified_yet_)) {
this->mutex_.LockWhen(Condition(&HasBeenNotifiedInternal,
&this->notified_yet_));
this->mutex_.Unlock();
}
}
bool Notification::WaitForNotificationWithTimeout(
absl::Duration timeout) const {
bool notified = HasBeenNotifiedInternal(&this->notified_yet_);
if (!notified) {
notified = this->mutex_.LockWhenWithTimeout(
Condition(&HasBeenNotifiedInternal, &this->notified_yet_), timeout);
this->mutex_.Unlock();
}
return notified;
}
bool Notification::WaitForNotificationWithDeadline(absl::Time deadline) const {
bool notified = HasBeenNotifiedInternal(&this->notified_yet_);
if (!notified) {
notified = this->mutex_.LockWhenWithDeadline(
Condition(&HasBeenNotifiedInternal, &this->notified_yet_), deadline);
this->mutex_.Unlock();
}
return notified;
}
} // namespace absl

View file

@ -0,0 +1,112 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// -----------------------------------------------------------------------------
// notification.h
// -----------------------------------------------------------------------------
//
// This header file defines a `Notification` abstraction, which allows threads
// to receive notification of a single occurrence of a single event.
//
// The `Notification` object maintains a private boolean "notified" state that
// transitions to `true` at most once. The `Notification` class provides the
// following primary member functions:
// * `HasBeenNotified()` to query its state.
// * `WaitForNotification*()` to have threads wait until the "notified" state
// is `true`.
// * `Notify()` to set the notification's "notified" state to `true` and
// notify all waiting threads that the event has occurred.
// This method may only be called once.
//
// Note that while `Notify()` may only be called once, it is perfectly valid to
// call any of the `WaitForNotification*()` methods multiple times, from
// multiple threads -- even after the notification's "notified" state has been
// set -- in which case those methods will immediately return.
//
// Note that the lifetime of a `Notification` requires careful consideration;
// it might not be safe to destroy a notification after calling `Notify()` since
// it is still legal for other threads to call `WaitForNotification*()` methods
// on the notification. However, observers responding to a "notified" state of
// `true` can safely delete the notification without interfering with the call
// to `Notify()` in the other thread.
//
// Memory ordering: For any threads X and Y, if X calls `Notify()`, then any
// action taken by X before it calls `Notify()` is visible to thread Y after:
// * Y returns from `WaitForNotification()`, or
// * Y receives a `true` return value from either `HasBeenNotified()` or
// `WaitForNotificationWithTimeout()`.
#ifndef ABSL_SYNCHRONIZATION_NOTIFICATION_H_
#define ABSL_SYNCHRONIZATION_NOTIFICATION_H_
#include <atomic>
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
namespace absl {
// -----------------------------------------------------------------------------
// Notification
// -----------------------------------------------------------------------------
class Notification {
public:
// Initializes the "notified" state to unnotified.
Notification() : notified_yet_(false) {}
explicit Notification(bool prenotify) : notified_yet_(prenotify) {}
Notification(const Notification&) = delete;
Notification& operator=(const Notification&) = delete;
~Notification();
// Notification::HasBeenNotified()
//
// Returns the value of the notification's internal "notified" state.
bool HasBeenNotified() const;
// Notification::WaitForNotification()
//
// Blocks the calling thread until the notification's "notified" state is
// `true`. Note that if `Notify()` has been previously called on this
// notification, this function will immediately return.
void WaitForNotification() const;
// Notification::WaitForNotificationWithTimeout()
//
// Blocks until either the notification's "notified" state is `true` (which
// may occur immediately) or the timeout has elapsed, returning the value of
// its "notified" state in either case.
bool WaitForNotificationWithTimeout(absl::Duration timeout) const;
// Notification::WaitForNotificationWithDeadline()
//
// Blocks until either the notification's "notified" state is `true` (which
// may occur immediately) or the deadline has expired, returning the value of
// its "notified" state in either case.
bool WaitForNotificationWithDeadline(absl::Time deadline) const;
// Notification::Notify()
//
// Sets the "notified" state of this notification to `true` and wakes waiting
// threads. Note: do not call `Notify()` multiple times on the same
// `Notification`; calling `Notify()` more than once on the same notification
// results in undefined behavior.
void Notify();
private:
mutable Mutex mutex_;
std::atomic<bool> notified_yet_; // written under mutex_
};
} // namespace absl
#endif // ABSL_SYNCHRONIZATION_NOTIFICATION_H_
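
A short usage sketch of the interface documented above; the worker lambda and the 500 ms timeout are arbitrary choices for illustration, not anything the library prescribes.

// Illustrative usage only; not part of the library or its tests.
#include <thread>  // NOLINT(build/c++11)
#include "absl/synchronization/notification.h"
#include "absl/time/time.h"

void ExampleUsage() {
  absl::Notification done;
  std::thread worker([&done] {
    // ... produce a result ...
    done.Notify();  // must be called at most once
  });
  // Wait up to 500 ms, then fall back to an unbounded wait.
  if (!done.WaitForNotificationWithTimeout(absl::Milliseconds(500))) {
    done.WaitForNotification();
  }
  worker.join();  // the Notification outlives both threads here
}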

View file

@ -0,0 +1,124 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "absl/synchronization/notification.h"
#include <thread> // NOLINT(build/c++11)
#include <vector>
#include "gtest/gtest.h"
#include "absl/synchronization/mutex.h"
namespace absl {
// A thread-safe class that holds a counter.
class ThreadSafeCounter {
public:
ThreadSafeCounter() : count_(0) {}
void Increment() {
MutexLock lock(&mutex_);
++count_;
}
int Get() const {
MutexLock lock(&mutex_);
return count_;
}
void WaitUntilGreaterOrEqual(int n) {
MutexLock lock(&mutex_);
auto cond = [this, n]() { return count_ >= n; };
mutex_.Await(Condition(&cond));
}
private:
mutable Mutex mutex_;
int count_;
};
// Runs the |i|'th worker thread for the tests in BasicTests(). Increments the
// |ready_counter|, waits on the |notification|, and then increments the
// |done_counter|.
static void RunWorker(int i, ThreadSafeCounter* ready_counter,
Notification* notification,
ThreadSafeCounter* done_counter) {
ready_counter->Increment();
notification->WaitForNotification();
done_counter->Increment();
}
// Tests that the |notification| properly blocks and awakens threads. Assumes
// that the |notification| is not yet triggered. If |notify_before_waiting| is
// true, the |notification| is triggered before any threads are created, so the
// threads never block in WaitForNotification(). Otherwise, the |notification|
// is triggered at a later point when most threads are likely to be blocking in
// WaitForNotification().
static void BasicTests(bool notify_before_waiting, Notification* notification) {
EXPECT_FALSE(notification->HasBeenNotified());
EXPECT_FALSE(
notification->WaitForNotificationWithTimeout(absl::Milliseconds(0)));
EXPECT_FALSE(notification->WaitForNotificationWithDeadline(absl::Now()));
absl::Time start = absl::Now();
EXPECT_FALSE(
notification->WaitForNotificationWithTimeout(absl::Milliseconds(50)));
EXPECT_LE(start + absl::Milliseconds(50), absl::Now());
ThreadSafeCounter ready_counter;
ThreadSafeCounter done_counter;
if (notify_before_waiting) {
notification->Notify();
}
// Create a bunch of threads that increment the |done_counter| after being
// notified.
const int kNumThreads = 10;
std::vector<std::thread> workers;
for (int i = 0; i < kNumThreads; ++i) {
workers.push_back(std::thread(&RunWorker, i, &ready_counter, notification,
&done_counter));
}
if (!notify_before_waiting) {
ready_counter.WaitUntilGreaterOrEqual(kNumThreads);
// Workers have not been notified yet, so the |done_counter| should be
// unmodified.
EXPECT_EQ(0, done_counter.Get());
notification->Notify();
}
// After notifying and then joining the workers, both counters should be
// fully incremented.
notification->WaitForNotification(); // should exit immediately
EXPECT_TRUE(notification->HasBeenNotified());
EXPECT_TRUE(notification->WaitForNotificationWithTimeout(absl::Seconds(0)));
EXPECT_TRUE(notification->WaitForNotificationWithDeadline(absl::Now()));
for (std::thread& worker : workers) {
worker.join();
}
EXPECT_EQ(kNumThreads, ready_counter.Get());
EXPECT_EQ(kNumThreads, done_counter.Get());
}
TEST(NotificationTest, SanityTest) {
Notification local_notification1, local_notification2;
BasicTests(false, &local_notification1);
BasicTests(true, &local_notification2);
}
} // namespace absl
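
The test's ThreadSafeCounter also shows a second waiting idiom: with the mutex already held, Mutex::Await() takes a Condition built from a pointer to a callable, releases the lock while blocked, and re-holds it before returning. A small sketch of that idiom in isolation (the Batch class and its members are invented for illustration):

// Illustrative only: waiting in place with Mutex::Await() and a lambda.
#include <vector>
#include "absl/synchronization/mutex.h"

class Batch {
 public:
  void Add(int v) {
    absl::MutexLock l(&mu_);
    items_.push_back(v);
  }

  // Blocks until at least `n` items have been added, then returns a copy.
  std::vector<int> TakeAtLeast(int n) {
    absl::MutexLock l(&mu_);
    const auto have_enough = [this, n] {
      return static_cast<int>(items_.size()) >= n;
    };
    // Await() releases mu_ while blocked and re-acquires it on return.
    mu_.Await(absl::Condition(&have_enough));
    return items_;
  }

 private:
  absl::Mutex mu_;
  std::vector<int> items_;
};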