// Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
// all in one place.
// Path: blob/master/external/source/vncdll/winvnc/omnithread/nt.cpp
// Views: 11784
// Package : omnithread1// omnithread/nt.cc Created : 6/95 tjr2//3// Copyright (C) 1999 AT&T Laboratories Cambridge. All Rights Reserved.4//5// This file is part of the omnithread library6//7// The omnithread library is free software; you can redistribute it and/or8// modify it under the terms of the GNU Library General Public9// License as published by the Free Software Foundation; either10// version 2 of the License, or (at your option) any later version.11//12// This library is distributed in the hope that it will be useful,13// but WITHOUT ANY WARRANTY; without even the implied warranty of14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU15// Library General Public License for more details.16//17// You should have received a copy of the GNU Library General Public18// License along with this library; if not, write to the Free19// Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA20// 02111-1307, USA21//2223//24// Implementation of OMNI thread abstraction for NT threads25//2627#include <stdlib.h>28#include <errno.h>29#include "omnithread.h"30#include <process.h>3132#define DB(x) // x33//#include <iostream.h> or #include <iostream> if DB is on.3435static void get_time_now(unsigned long* abs_sec, unsigned long* abs_nsec);3637///////////////////////////////////////////////////////////////////////////38//39// Mutex40//41///////////////////////////////////////////////////////////////////////////424344omni_mutex::omni_mutex(void)45{46InitializeCriticalSection(&crit);47}4849omni_mutex::~omni_mutex(void)50{51DeleteCriticalSection(&crit);52}5354void55omni_mutex::lock(void)56{57EnterCriticalSection(&crit);58}5960void61omni_mutex::unlock(void)62{63LeaveCriticalSection(&crit);64}65666768///////////////////////////////////////////////////////////////////////////69//70// Condition variable71//72///////////////////////////////////////////////////////////////////////////737475//76// Condition variables are tricky to implement using NT 
synchronisation77// primitives, since none of them have the atomic "release mutex and wait to be78// signalled" which is central to the idea of a condition variable. To get79// around this the solution is to record which threads are waiting and80// explicitly wake up those threads.81//82// Here we implement a condition variable using a list of waiting threads83// (protected by a critical section), and a per-thread semaphore (which84// actually only needs to be a binary semaphore).85//86// To wait on the cv, a thread puts itself on the list of waiting threads for87// that cv, then releases the mutex and waits on its own personal semaphore. A88// signalling thread simply takes a thread from the head of the list and kicks89// that thread's semaphore. Broadcast is simply implemented by kicking the90// semaphore of each waiting thread.91//92// The only other tricky part comes when a thread gets a timeout from a timed93// wait on its semaphore. Between returning with a timeout from the wait and94// entering the critical section, a signalling thread could get in, kick the95// waiting thread's semaphore and remove it from the list. If this happens,96// the waiting thread's semaphore is now out of step so it needs resetting, and97// the thread should indicate that it was signalled rather than that it timed98// out.99//100// It is possible that the thread calling wait or timedwait is not a101// omni_thread. In this case we have to provide a temporary data structure,102// i.e. for the duration of the call, for the thread to link itself on the103// list of waiting threads. _internal_omni_thread_dummy provides such104// a data structure and _internal_omni_thread_helper is a helper class to105// deal with this special case for wait() and timedwait(). Once created,106// the _internal_omni_thread_dummy is cached for use by the next wait() or107// timedwait() call from a non-omni_thread. 
This is probably worth doing108// because creating a Semaphore is quite heavy weight.109110class _internal_omni_thread_helper;111112class _internal_omni_thread_dummy : public omni_thread {113public:114inline _internal_omni_thread_dummy() : next(0) { }115inline ~_internal_omni_thread_dummy() { }116friend class _internal_omni_thread_helper;117private:118_internal_omni_thread_dummy* next;119};120121class _internal_omni_thread_helper {122public:123inline _internal_omni_thread_helper() {124d = 0;125t = omni_thread::self();126if (!t) {127omni_mutex_lock sync(cachelock);128if (cache) {129d = cache;130cache = cache->next;131}132else {133d = new _internal_omni_thread_dummy;134}135t = d;136}137}138inline ~_internal_omni_thread_helper() {139if (d) {140omni_mutex_lock sync(cachelock);141d->next = cache;142cache = d;143}144}145inline operator omni_thread* () { return t; }146inline omni_thread* operator->() { return t; }147148static _internal_omni_thread_dummy* cache;149static omni_mutex cachelock;150151private:152_internal_omni_thread_dummy* d;153omni_thread* t;154};155156_internal_omni_thread_dummy* _internal_omni_thread_helper::cache = 0;157omni_mutex _internal_omni_thread_helper::cachelock;158159160omni_condition::omni_condition(omni_mutex* m) : mutex(m)161{162InitializeCriticalSection(&crit);163waiting_head = waiting_tail = NULL;164}165166167omni_condition::~omni_condition(void)168{169DeleteCriticalSection(&crit);170DB( if (waiting_head != NULL) {171cerr << "omni_condition::~omni_condition: list of waiting threads "172<< "is not empty\n";173} )174}175176177void178omni_condition::wait(void)179{180_internal_omni_thread_helper me;181182EnterCriticalSection(&crit);183184me->cond_next = NULL;185me->cond_prev = waiting_tail;186if (waiting_head == NULL)187waiting_head = me;188else189waiting_tail->cond_next = me;190waiting_tail = me;191me->cond_waiting = TRUE;192193LeaveCriticalSection(&crit);194195mutex->unlock();196197DWORD result = WaitForSingleObject(me->cond_semaphore, 
INFINITE);198199mutex->lock();200201if (result != WAIT_OBJECT_0)202throw omni_thread_fatal(GetLastError());203}204205206int207omni_condition::timedwait(unsigned long abs_sec, unsigned long abs_nsec)208{209_internal_omni_thread_helper me;210211EnterCriticalSection(&crit);212213me->cond_next = NULL;214me->cond_prev = waiting_tail;215if (waiting_head == NULL)216waiting_head = me;217else218waiting_tail->cond_next = me;219waiting_tail = me;220me->cond_waiting = TRUE;221222LeaveCriticalSection(&crit);223224mutex->unlock();225226unsigned long now_sec, now_nsec;227228get_time_now(&now_sec, &now_nsec);229230DWORD timeout = (abs_sec-now_sec) * 1000 + (abs_nsec-now_nsec) / 1000000;231232if ((abs_sec <= now_sec) && ((abs_sec < now_sec) || (abs_nsec < abs_nsec)))233timeout = 0;234235DWORD result = WaitForSingleObject(me->cond_semaphore, timeout);236237if (result == WAIT_TIMEOUT) {238EnterCriticalSection(&crit);239240if (me->cond_waiting) {241if (me->cond_prev != NULL)242me->cond_prev->cond_next = me->cond_next;243else244waiting_head = me->cond_next;245if (me->cond_next != NULL)246me->cond_next->cond_prev = me->cond_prev;247else248waiting_tail = me->cond_prev;249me->cond_waiting = FALSE;250251LeaveCriticalSection(&crit);252253mutex->lock();254return 0;255}256257//258// We timed out but another thread still signalled us. Wait for259// the semaphore (it _must_ have been signalled) to decrement it260// again. 
Return that we were signalled, not that we timed out.261//262263LeaveCriticalSection(&crit);264265result = WaitForSingleObject(me->cond_semaphore, INFINITE);266}267268if (result != WAIT_OBJECT_0)269throw omni_thread_fatal(GetLastError());270271mutex->lock();272return 1;273}274275276void277omni_condition::signal(void)278{279EnterCriticalSection(&crit);280281if (waiting_head != NULL) {282omni_thread* t = waiting_head;283waiting_head = t->cond_next;284if (waiting_head == NULL)285waiting_tail = NULL;286else287waiting_head->cond_prev = NULL;288t->cond_waiting = FALSE;289290if (!ReleaseSemaphore(t->cond_semaphore, 1, NULL)) {291int rc = GetLastError();292LeaveCriticalSection(&crit);293throw omni_thread_fatal(rc);294}295}296297LeaveCriticalSection(&crit);298}299300301void302omni_condition::broadcast(void)303{304EnterCriticalSection(&crit);305306while (waiting_head != NULL) {307omni_thread* t = waiting_head;308waiting_head = t->cond_next;309if (waiting_head == NULL)310waiting_tail = NULL;311else312waiting_head->cond_prev = NULL;313t->cond_waiting = FALSE;314315if (!ReleaseSemaphore(t->cond_semaphore, 1, NULL)) {316int rc = GetLastError();317LeaveCriticalSection(&crit);318throw omni_thread_fatal(rc);319}320}321322LeaveCriticalSection(&crit);323}324325326327///////////////////////////////////////////////////////////////////////////328//329// Counting semaphore330//331///////////////////////////////////////////////////////////////////////////332333334#define SEMAPHORE_MAX 0x7fffffff335336337omni_semaphore::omni_semaphore(unsigned int initial)338{339nt_sem = CreateSemaphore(NULL, initial, SEMAPHORE_MAX, NULL);340341if (nt_sem == NULL) {342DB( cerr << "omni_semaphore::omni_semaphore: CreateSemaphore error "343<< GetLastError() << endl );344throw omni_thread_fatal(GetLastError());345}346}347348349omni_semaphore::~omni_semaphore(void)350{351if (!CloseHandle(nt_sem)) {352DB( cerr << "omni_semaphore::~omni_semaphore: CloseHandle error "353<< GetLastError() << endl );354throw 
omni_thread_fatal(GetLastError());355}356}357358359void360omni_semaphore::wait(void)361{362if (WaitForSingleObject(nt_sem, INFINITE) != WAIT_OBJECT_0)363throw omni_thread_fatal(GetLastError());364}365366367int368omni_semaphore::trywait(void)369{370switch (WaitForSingleObject(nt_sem, 0)) {371372case WAIT_OBJECT_0:373return 1;374case WAIT_TIMEOUT:375return 0;376}377378throw omni_thread_fatal(GetLastError());379return 0; /* keep msvc++ happy */380}381382383void384omni_semaphore::post(void)385{386if (!ReleaseSemaphore(nt_sem, 1, NULL))387throw omni_thread_fatal(GetLastError());388}389390391392///////////////////////////////////////////////////////////////////////////393//394// Thread395//396///////////////////////////////////////////////////////////////////////////397398399//400// Static variables401//402403int omni_thread::init_t::count = 0;404405omni_mutex* omni_thread::next_id_mutex;406int omni_thread::next_id = 0;407static DWORD self_tls_index;408409//410// Initialisation function (gets called before any user code).411//412413omni_thread::init_t::init_t(void)414{415if (count++ != 0) // only do it once however many objects get created.416return;417418DB(cerr << "omni_thread::init: NT implementation initialising\n");419420self_tls_index = TlsAlloc();421422if (self_tls_index == 0xffffffff)423throw omni_thread_fatal(GetLastError());424425next_id_mutex = new omni_mutex;426427//428// Create object for this (i.e. 
initial) thread.429//430431omni_thread* t = new omni_thread;432433t->_state = STATE_RUNNING;434435if (!DuplicateHandle(GetCurrentProcess(), GetCurrentThread(),436GetCurrentProcess(), &t->handle,4370, FALSE, DUPLICATE_SAME_ACCESS))438throw omni_thread_fatal(GetLastError());439440t->nt_id = GetCurrentThreadId();441442DB(cerr << "initial thread " << t->id() << " NT thread id " << t->nt_id443<< endl);444445if (!TlsSetValue(self_tls_index, (LPVOID)t))446throw omni_thread_fatal(GetLastError());447448if (!SetThreadPriority(t->handle, nt_priority(PRIORITY_NORMAL)))449throw omni_thread_fatal(GetLastError());450}451452//453// Wrapper for thread creation.454//455456extern "C"457unsigned __stdcall458omni_thread_wrapper(void* ptr)459{460omni_thread* me = (omni_thread*)ptr;461462DB(cerr << "omni_thread_wrapper: thread " << me->id()463<< " started\n");464465TlsSetValue(self_tls_index, (LPVOID)me);466//if (!TlsSetValue(self_tls_index, (LPVOID)me))467// throw omni_thread_fatal(GetLastError());468469//470// Now invoke the thread function with the given argument.471//472473if (me->fn_void != NULL) {474(*me->fn_void)(me->thread_arg);475omni_thread::exit();476}477478if (me->fn_ret != NULL) {479void* return_value = (*me->fn_ret)(me->thread_arg);480omni_thread::exit(return_value);481}482483if (me->detached) {484me->run(me->thread_arg);485omni_thread::exit();486} else {487void* return_value = me->run_undetached(me->thread_arg);488omni_thread::exit(return_value);489}490491// should never get here.492return 0;493}494495496//497// Constructors for omni_thread - set up the thread object but don't498// start it running.499//500501// construct a detached thread running a given function.502503omni_thread::omni_thread(void (*fn)(void*), void* arg, priority_t pri)504{505common_constructor(arg, pri, 1);506fn_void = fn;507fn_ret = NULL;508}509510// construct an undetached thread running a given function.511512omni_thread::omni_thread(void* (*fn)(void*), void* arg, priority_t 
pri)513{514common_constructor(arg, pri, 0);515fn_void = NULL;516fn_ret = fn;517}518519// construct a thread which will run either run() or run_undetached().520521omni_thread::omni_thread(void* arg, priority_t pri)522{523common_constructor(arg, pri, 1);524fn_void = NULL;525fn_ret = NULL;526}527528// common part of all constructors.529530void531omni_thread::common_constructor(void* arg, priority_t pri, int det)532{533_state = STATE_NEW;534_priority = pri;535536next_id_mutex->lock();537_id = next_id++;538next_id_mutex->unlock();539540thread_arg = arg;541detached = det; // may be altered in start_undetached()542543cond_semaphore = CreateSemaphore(NULL, 0, SEMAPHORE_MAX, NULL);544545if (cond_semaphore == NULL)546throw omni_thread_fatal(GetLastError());547548cond_next = cond_prev = NULL;549cond_waiting = FALSE;550551handle = NULL;552}553554555//556// Destructor for omni_thread.557//558559omni_thread::~omni_thread(void)560{561DB(cerr << "destructor called for thread " << id() << endl);562if ((handle != NULL) && !CloseHandle(handle))563throw omni_thread_fatal(GetLastError());564if (!CloseHandle(cond_semaphore))565throw omni_thread_fatal(GetLastError());566}567568569//570// Start the thread571//572573void574omni_thread::start(void)575{576omni_mutex_lock l(mutex);577578if (_state != STATE_NEW)579throw omni_thread_invalid();580581unsigned int t;582handle = (HANDLE)_beginthreadex(583NULL,5840,585omni_thread_wrapper,586(LPVOID)this,587CREATE_SUSPENDED,588&t);589nt_id = t;590if (handle == NULL)591throw omni_thread_fatal(GetLastError());592593if (!SetThreadPriority(handle, _priority))594throw omni_thread_fatal(GetLastError());595596if (ResumeThread(handle) == 0xffffffff)597throw omni_thread_fatal(GetLastError());598599_state = STATE_RUNNING;600}601602603//604// Start a thread which will run the member function run_undetached().605//606607void608omni_thread::start_undetached(void)609{610if ((fn_void != NULL) || (fn_ret != NULL))611throw omni_thread_invalid();612613detached = 
0;614start();615}616617618//619// join - simply check error conditions & call WaitForSingleObject.620//621622void623omni_thread::join(void** status)624{625mutex.lock();626627if ((_state != STATE_RUNNING) && (_state != STATE_TERMINATED)) {628mutex.unlock();629throw omni_thread_invalid();630}631632mutex.unlock();633634if (this == self())635throw omni_thread_invalid();636637if (detached)638throw omni_thread_invalid();639640DB(cerr << "omni_thread::join: doing WaitForSingleObject\n");641642if (WaitForSingleObject(handle, INFINITE) != WAIT_OBJECT_0)643throw omni_thread_fatal(GetLastError());644645DB(cerr << "omni_thread::join: WaitForSingleObject succeeded\n");646647if (status)648*status = return_val;649650delete this;651}652653654//655// Change this thread's priority.656//657658void659omni_thread::set_priority(priority_t pri)660{661omni_mutex_lock l(mutex);662663if (_state != STATE_RUNNING)664throw omni_thread_invalid();665666_priority = pri;667668if (!SetThreadPriority(handle, nt_priority(pri)))669throw omni_thread_fatal(GetLastError());670}671672673//674// create - construct a new thread object and start it running. Returns thread675// object if successful, null pointer if not.676//677678// detached version679680omni_thread*681omni_thread::create(void (*fn)(void*), void* arg, priority_t pri)682{683omni_thread* t = new omni_thread(fn, arg, pri);684t->start();685return t;686}687688// undetached version689690omni_thread*691omni_thread::create(void* (*fn)(void*), void* arg, priority_t pri)692{693omni_thread* t = new omni_thread(fn, arg, pri);694t->start();695return t;696}697698699//700// exit() _must_ lock the mutex even in the case of a detached thread. This is701// because a thread may run to completion before the thread that created it has702// had a chance to get out of start(). By locking the mutex we ensure that the703// creating thread must have reached the end of start() before we delete the704// thread object. 
Of course, once the call to start() returns, the user can705// still incorrectly refer to the thread object, but that's their problem.706//707708void709omni_thread::exit(void* return_value)710{711omni_thread* me = self();712713if (me)714{715me->mutex.lock();716717me->_state = STATE_TERMINATED;718719me->mutex.unlock();720721DB(cerr << "omni_thread::exit: thread " << me->id() << " detached "722<< me->detached << " return value " << return_value << endl);723724if (me->detached) {725delete me;726} else {727me->return_val = return_value;728}729}730else731{732DB(cerr << "omni_thread::exit: called with a non-omnithread. Exit quietly." << endl);733}734// _endthreadex() does not automatically closes the thread handle.735// The omni_thread dtor closes the thread handle.736_endthreadex(0);737}738739740omni_thread*741omni_thread::self(void)742{743LPVOID me;744745me = TlsGetValue(self_tls_index);746747if (me == NULL) {748DB(cerr << "omni_thread::self: called with a non-ominthread. NULL is returned." << endl);749}750return (omni_thread*)me;751}752753754void755omni_thread::yield(void)756{757Sleep(0);758}759760761#define MAX_SLEEP_SECONDS (DWORD)4294966 // (2**32-2)/1000762763void764omni_thread::sleep(unsigned long secs, unsigned long nanosecs)765{766if (secs <= MAX_SLEEP_SECONDS) {767Sleep(secs * 1000 + nanosecs / 1000000);768return;769}770771DWORD no_of_max_sleeps = secs / MAX_SLEEP_SECONDS;772773for (DWORD i = 0; i < no_of_max_sleeps; i++)774Sleep(MAX_SLEEP_SECONDS * 1000);775776Sleep((secs % MAX_SLEEP_SECONDS) * 1000 + nanosecs / 1000000);777}778779780void781omni_thread::get_time(unsigned long* abs_sec, unsigned long* abs_nsec,782unsigned long rel_sec, unsigned long rel_nsec)783{784get_time_now(abs_sec, abs_nsec);785*abs_nsec += rel_nsec;786*abs_sec += rel_sec + *abs_nsec / 1000000000;787*abs_nsec = *abs_nsec % 1000000000;788}789790791int792omni_thread::nt_priority(priority_t pri)793{794switch (pri) {795796case PRIORITY_LOW:797return THREAD_PRIORITY_LOWEST;798799case 
PRIORITY_NORMAL:800return THREAD_PRIORITY_NORMAL;801802case PRIORITY_HIGH:803return THREAD_PRIORITY_HIGHEST;804}805806throw omni_thread_invalid();807return 0; /* keep msvc++ happy */808}809810811static void812get_time_now(unsigned long* abs_sec, unsigned long* abs_nsec)813{814static int days_in_preceding_months[12]815= { 0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334 };816static int days_in_preceding_months_leap[12]817= { 0, 31, 60, 91, 121, 152, 182, 213, 244, 274, 305, 335 };818819SYSTEMTIME st;820821GetSystemTime(&st);822*abs_nsec = st.wMilliseconds * 1000000;823824// this formula should work until 1st March 2100825826DWORD days = ((st.wYear - 1970) * 365 + (st.wYear - 1969) / 4827+ ((st.wYear % 4)828? days_in_preceding_months[st.wMonth - 1]829: days_in_preceding_months_leap[st.wMonth - 1])830+ st.wDay - 1);831832*abs_sec = st.wSecond + 60 * (st.wMinute + 60 * (st.wHour + 24 * days));833}834835836