merge(3p/absl): subtree merge of Abseil up to e19260f

... notably, this includes Abseil's own StatusOr type, which
conflicted with our implementation (that was taken from TensorFlow).

Change-Id: Ie7d6764b64055caaeb8dc7b6b9d066291e6b538f
This commit is contained in:
Vincent Ambo 2020-11-21 14:43:54 +01:00
parent cc27324d02
commit 082c006c04
854 changed files with 11260 additions and 5296 deletions

View file

@ -39,6 +39,7 @@
#include <thread> // NOLINT(build/c++11)
#include "absl/base/attributes.h"
#include "absl/base/call_once.h"
#include "absl/base/config.h"
#include "absl/base/dynamic_annotations.h"
#include "absl/base/internal/atomic_hook.h"
@ -49,6 +50,7 @@
#include "absl/base/internal/spinlock.h"
#include "absl/base/internal/sysinfo.h"
#include "absl/base/internal/thread_identity.h"
#include "absl/base/internal/tsan_mutex_interface.h"
#include "absl/base/port.h"
#include "absl/debugging/stacktrace.h"
#include "absl/debugging/symbolize.h"
@ -58,6 +60,7 @@
using absl::base_internal::CurrentThreadIdentityIfPresent;
using absl::base_internal::PerThreadSynch;
using absl::base_internal::SchedulingGuard;
using absl::base_internal::ThreadIdentity;
using absl::synchronization_internal::GetOrCreateCurrentThreadIdentity;
using absl::synchronization_internal::GraphCycles;
@ -75,7 +78,7 @@ ABSL_NAMESPACE_BEGIN
namespace {
#if defined(THREAD_SANITIZER)
#if defined(ABSL_HAVE_THREAD_SANITIZER)
constexpr OnDeadlockCycle kDeadlockDetectionDefault = OnDeadlockCycle::kIgnore;
#else
constexpr OnDeadlockCycle kDeadlockDetectionDefault = OnDeadlockCycle::kAbort;
@ -85,31 +88,9 @@ ABSL_CONST_INIT std::atomic<OnDeadlockCycle> synch_deadlock_detection(
kDeadlockDetectionDefault);
ABSL_CONST_INIT std::atomic<bool> synch_check_invariants(false);
// ------------------------------------------ spinlock support
// Make sure read-only globals used in the Mutex code are contained on the
// same cacheline and cacheline aligned to eliminate any false sharing with
// other globals from this and other modules.
static struct MutexGlobals {
MutexGlobals() {
// Find machine-specific data needed for Delay() and
// TryAcquireWithSpinning(). This runs in the global constructor
// sequence, and before that zeros are safe values.
num_cpus = absl::base_internal::NumCPUs();
spinloop_iterations = num_cpus > 1 ? 1500 : 0;
}
int num_cpus;
int spinloop_iterations;
// Pad this struct to a full cacheline to prevent false sharing.
char padding[ABSL_CACHELINE_SIZE - 2 * sizeof(int)];
} ABSL_CACHELINE_ALIGNED mutex_globals;
static_assert(
sizeof(MutexGlobals) == ABSL_CACHELINE_SIZE,
"MutexGlobals must occupy an entire cacheline to prevent false sharing");
ABSL_INTERNAL_ATOMIC_HOOK_ATTRIBUTES
absl::base_internal::AtomicHook<void (*)(int64_t wait_cycles)>
submit_profile_data;
absl::base_internal::AtomicHook<void (*)(int64_t wait_cycles)>
submit_profile_data;
ABSL_INTERNAL_ATOMIC_HOOK_ATTRIBUTES absl::base_internal::AtomicHook<void (*)(
const char *msg, const void *obj, int64_t wait_cycles)>
mutex_tracer;
@ -143,33 +124,55 @@ void RegisterSymbolizer(bool (*fn)(const void *pc, char *out, int out_size)) {
symbolizer.Store(fn);
}
// spinlock delay on iteration c. Returns new c.
struct ABSL_CACHELINE_ALIGNED MutexGlobals {
absl::once_flag once;
int num_cpus = 0;
int spinloop_iterations = 0;
};
static const MutexGlobals& GetMutexGlobals() {
ABSL_CONST_INIT static MutexGlobals data;
absl::base_internal::LowLevelCallOnce(&data.once, [&]() {
data.num_cpus = absl::base_internal::NumCPUs();
data.spinloop_iterations = data.num_cpus > 1 ? 1500 : 0;
});
return data;
}
// Spinlock delay on iteration c. Returns new c.
namespace {
enum DelayMode { AGGRESSIVE, GENTLE };
};
static int Delay(int32_t c, DelayMode mode) {
namespace synchronization_internal {
int MutexDelay(int32_t c, int mode) {
// If this a uniprocessor, only yield/sleep. Otherwise, if the mode is
// aggressive then spin many times before yielding. If the mode is
// gentle then spin only a few times before yielding. Aggressive spinning is
// used to ensure that an Unlock() call, which must get the spin lock for
// any thread to make progress gets it without undue delay.
int32_t limit = (mutex_globals.num_cpus > 1) ?
((mode == AGGRESSIVE) ? 5000 : 250) : 0;
const int32_t limit =
GetMutexGlobals().num_cpus > 1 ? (mode == AGGRESSIVE ? 5000 : 250) : 0;
if (c < limit) {
c++; // spin
// Spin.
c++;
} else {
SchedulingGuard::ScopedEnable enable_rescheduling;
ABSL_TSAN_MUTEX_PRE_DIVERT(nullptr, 0);
if (c == limit) { // yield once
if (c == limit) {
// Yield once.
AbslInternalMutexYield();
c++;
} else { // then wait
} else {
// Then wait.
absl::SleepFor(absl::Microseconds(10));
c = 0;
}
ABSL_TSAN_MUTEX_POST_DIVERT(nullptr, 0);
}
return (c);
return c;
}
} // namespace synchronization_internal
// --------------------------Generic atomic ops
// Ensure that "(*pv & bits) == bits" by doing an atomic update of "*pv" to
@ -489,7 +492,7 @@ struct SynchWaitParams {
std::atomic<intptr_t> *cv_word;
int64_t contention_start_cycles; // Time (in cycles) when this thread started
// to contend for the mutex.
// to contend for the mutex.
};
struct SynchLocksHeld {
@ -703,7 +706,7 @@ static constexpr bool kDebugMode = false;
static constexpr bool kDebugMode = true;
#endif
#ifdef THREAD_SANITIZER
#ifdef ABSL_INTERNAL_HAVE_TSAN_INTERFACE
static unsigned TsanFlags(Mutex::MuHow how) {
return how == kShared ? __tsan_mutex_read_lock : 0;
}
@ -1054,6 +1057,7 @@ static PerThreadSynch *DequeueAllWakeable(PerThreadSynch *head,
// Try to remove thread s from the list of waiters on this mutex.
// Does nothing if s is not on the waiter list.
void Mutex::TryRemove(PerThreadSynch *s) {
SchedulingGuard::ScopedDisable disable_rescheduling;
intptr_t v = mu_.load(std::memory_order_relaxed);
// acquire spinlock & lock
if ((v & (kMuWait | kMuSpin | kMuWriter | kMuReader)) == kMuWait &&
@ -1118,7 +1122,7 @@ ABSL_XRAY_LOG_ARGS(1) void Mutex::Block(PerThreadSynch *s) {
this->TryRemove(s);
int c = 0;
while (s->next != nullptr) {
c = Delay(c, GENTLE);
c = synchronization_internal::MutexDelay(c, GENTLE);
this->TryRemove(s);
}
if (kDebugMode) {
@ -1437,7 +1441,7 @@ void Mutex::AssertNotHeld() const {
// Attempt to acquire *mu, and return whether successful. The implementation
// may spin for a short while if the lock cannot be acquired immediately.
static bool TryAcquireWithSpinning(std::atomic<intptr_t>* mu) {
int c = mutex_globals.spinloop_iterations;
int c = GetMutexGlobals().spinloop_iterations;
do { // do/while somewhat faster on AMD
intptr_t v = mu->load(std::memory_order_relaxed);
if ((v & (kMuReader|kMuEvent)) != 0) {
@ -1764,7 +1768,7 @@ static inline bool EvalConditionAnnotated(const Condition *cond, Mutex *mu,
// All memory accesses are ignored inside of mutex operations + for unlock
// operation tsan considers that we've already released the mutex.
bool res = false;
#ifdef THREAD_SANITIZER
#ifdef ABSL_INTERNAL_HAVE_TSAN_INTERFACE
const int flags = read_lock ? __tsan_mutex_read_lock : 0;
const int tryflags = flags | (trylock ? __tsan_mutex_try_lock : 0);
#endif
@ -1814,9 +1818,9 @@ static inline bool EvalConditionIgnored(Mutex *mu, const Condition *cond) {
// So we "divert" (which un-ignores both memory accesses and synchronization)
// and then separately turn on ignores of memory accesses.
ABSL_TSAN_MUTEX_PRE_DIVERT(mu, 0);
ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN();
ABSL_ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN();
bool res = cond->Eval();
ANNOTATE_IGNORE_READS_AND_WRITES_END();
ABSL_ANNOTATE_IGNORE_READS_AND_WRITES_END();
ABSL_TSAN_MUTEX_POST_DIVERT(mu, 0);
static_cast<void>(mu); // Prevent unused param warning in non-TSAN builds.
return res;
@ -1897,6 +1901,7 @@ static void CheckForMutexCorruption(intptr_t v, const char* label) {
}
void Mutex::LockSlowLoop(SynchWaitParams *waitp, int flags) {
SchedulingGuard::ScopedDisable disable_rescheduling;
int c = 0;
intptr_t v = mu_.load(std::memory_order_relaxed);
if ((v & kMuEvent) != 0) {
@ -1998,7 +2003,8 @@ void Mutex::LockSlowLoop(SynchWaitParams *waitp, int flags) {
ABSL_RAW_CHECK(
waitp->thread->waitp == nullptr || waitp->thread->suppress_fatal_errors,
"detected illegal recursion into Mutex code");
c = Delay(c, GENTLE); // delay, then try again
// delay, then try again
c = synchronization_internal::MutexDelay(c, GENTLE);
}
ABSL_RAW_CHECK(
waitp->thread->waitp == nullptr || waitp->thread->suppress_fatal_errors,
@ -2016,6 +2022,7 @@ void Mutex::LockSlowLoop(SynchWaitParams *waitp, int flags) {
// or it is in the process of blocking on a condition variable; it must requeue
// itself on the mutex/condvar to wait for its condition to become true.
ABSL_ATTRIBUTE_NOINLINE void Mutex::UnlockSlow(SynchWaitParams *waitp) {
SchedulingGuard::ScopedDisable disable_rescheduling;
intptr_t v = mu_.load(std::memory_order_relaxed);
this->AssertReaderHeld();
CheckForMutexCorruption(v, "Unlock");
@ -2292,7 +2299,8 @@ ABSL_ATTRIBUTE_NOINLINE void Mutex::UnlockSlow(SynchWaitParams *waitp) {
mu_.store(nv, std::memory_order_release);
break; // out of for(;;)-loop
}
c = Delay(c, AGGRESSIVE); // aggressive here; no one can proceed till we do
// aggressive here; no one can proceed till we do
c = synchronization_internal::MutexDelay(c, AGGRESSIVE);
} // end of for(;;)-loop
if (wake_list != kPerThreadSynchNull) {
@ -2304,7 +2312,8 @@ ABSL_ATTRIBUTE_NOINLINE void Mutex::UnlockSlow(SynchWaitParams *waitp) {
if (!cond_waiter) {
// Sample lock contention events only if the (first) waiter was trying to
// acquire the lock, not waiting on a condition variable or Condition.
int64_t wait_cycles = base_internal::CycleClock::Now() - enqueue_timestamp;
int64_t wait_cycles =
base_internal::CycleClock::Now() - enqueue_timestamp;
mutex_tracer("slow release", this, wait_cycles);
ABSL_TSAN_MUTEX_PRE_DIVERT(this, 0);
submit_profile_data(enqueue_timestamp);
@ -2331,6 +2340,7 @@ void Mutex::Trans(MuHow how) {
// It will later acquire the mutex with high probability. Otherwise, we
// enqueue thread w on this mutex.
void Mutex::Fer(PerThreadSynch *w) {
SchedulingGuard::ScopedDisable disable_rescheduling;
int c = 0;
ABSL_RAW_CHECK(w->waitp->cond == nullptr,
"Mutex::Fer while waiting on Condition");
@ -2380,7 +2390,7 @@ void Mutex::Fer(PerThreadSynch *w) {
return;
}
}
c = Delay(c, GENTLE);
c = synchronization_internal::MutexDelay(c, GENTLE);
}
}
@ -2429,6 +2439,7 @@ CondVar::~CondVar() {
// Remove thread s from the list of waiters on this condition variable.
void CondVar::Remove(PerThreadSynch *s) {
SchedulingGuard::ScopedDisable disable_rescheduling;
intptr_t v;
int c = 0;
for (v = cv_.load(std::memory_order_relaxed);;
@ -2457,7 +2468,8 @@ void CondVar::Remove(PerThreadSynch *s) {
std::memory_order_release);
return;
} else {
c = Delay(c, GENTLE); // try again after a delay
// try again after a delay
c = synchronization_internal::MutexDelay(c, GENTLE);
}
}
}
@ -2490,7 +2502,7 @@ static void CondVarEnqueue(SynchWaitParams *waitp) {
!cv_word->compare_exchange_weak(v, v | kCvSpin,
std::memory_order_acquire,
std::memory_order_relaxed)) {
c = Delay(c, GENTLE);
c = synchronization_internal::MutexDelay(c, GENTLE);
v = cv_word->load(std::memory_order_relaxed);
}
ABSL_RAW_CHECK(waitp->thread->waitp == nullptr, "waiting when shouldn't be");
@ -2589,6 +2601,7 @@ void CondVar::Wakeup(PerThreadSynch *w) {
}
void CondVar::Signal() {
SchedulingGuard::ScopedDisable disable_rescheduling;
ABSL_TSAN_MUTEX_PRE_SIGNAL(nullptr, 0);
intptr_t v;
int c = 0;
@ -2621,7 +2634,7 @@ void CondVar::Signal() {
ABSL_TSAN_MUTEX_POST_SIGNAL(nullptr, 0);
return;
} else {
c = Delay(c, GENTLE);
c = synchronization_internal::MutexDelay(c, GENTLE);
}
}
ABSL_TSAN_MUTEX_POST_SIGNAL(nullptr, 0);
@ -2658,7 +2671,8 @@ void CondVar::SignalAll () {
ABSL_TSAN_MUTEX_POST_SIGNAL(nullptr, 0);
return;
} else {
c = Delay(c, GENTLE); // try again after a delay
// try again after a delay
c = synchronization_internal::MutexDelay(c, GENTLE);
}
}
ABSL_TSAN_MUTEX_POST_SIGNAL(nullptr, 0);
@ -2671,7 +2685,7 @@ void ReleasableMutexLock::Release() {
this->mu_ = nullptr;
}
#ifdef THREAD_SANITIZER
#ifdef ABSL_HAVE_THREAD_SANITIZER
extern "C" void __tsan_read1(void *addr);
#else
#define __tsan_read1(addr) // do nothing if TSan not enabled