/*
	gish/misc/mproc.c

	Copyright (C) 1995 Warren R. Gish
	All rights reserved.
*/
#include <ncbi.h>
#include <gishlib.h>
#ifdef MPROC_AVAIL
#include <sys/sysinfo.h>
#include <sys/resource.h>
#endif

/* set the stack size per thread, for systems that provide control over it */
#define STACKSIZE_DEFAULT (1024*KBYTE)

/*
	Nonzero while mrun_wild() is running more than one worker; the mproc_*
	lock routines do nothing while it is zero.
	NOTE(review): volatile gives no inter-thread ordering guarantees;
	correctness presumably relies on it changing only while no workers run.
*/
static volatile int	i_am_multithreading;
/* nesting depth of mproc_lock() calls made by the current lock holder */
static volatile int	lock_count;
/* signals (SIGALRM, SIGCLD, added by mrun_wild) blocked while lock state is updated */
static sigset_t	critmask;

/*
	proc_max: upper limit on workers per mrun_wild() call;
	proc_reserve: processors left unused when sizing the worker pool;
	nchunks: passed to TaskInit() by each worker
*/
static int		proc_max = THREADS_MAX,
				proc_reserve = PROC_RESERVE,
				nchunks = NCHUNKS;

/* per-worker start-up parameters handed from mrun_wild() to wild_init() */
typedef struct {
			void		(LIBCALL *func)();	/* user work function */
			TaskBlkPtr	rp, tp;	/* root task block; this worker's task block */
			int			PNTR user_nprocs;	/* caller's shared worker counter */
			void		(*sigterm)();	/* SIGTERM handler to install */
			int			id;	/* worker index assigned by mrun_wild() */
		} WildBlk, PNTR WildBlkPtr;

#ifdef MPROC_AVAIL
#ifdef OS_UNIX_IRIX
/* m_get_myid() of the process currently holding m_lock() via mproc_lock(), or -1 if none */
static volatile int	lockID = -1;

/*
	mproc_id -- index of the calling process within the worker pool
	(always 0 when not running in parallel)
*/
int
mproc_id()
{
	if (!i_am_multithreading)
		return 0;
	return (int)m_get_myid();
}

/*
	mproc_lock -- acquire the global lock; the holder may re-enter it
*/
void
mproc_lock()
{
	sigset_t	saved_mask;
	int		self;

	if (!i_am_multithreading)
		return;

	/* hold off critical signals while the ownership record changes */
	sigprocmask(SIG_BLOCK, &critmask, &saved_mask);
	self = m_get_myid();
	if (self != lockID) {
		/* not ours yet: take the lock and claim it */
		m_lock();
		lockID = self;
		lock_count = 0; /* discard any stale depth */
	}
	lock_count++;
	sigprocmask(SIG_SETMASK, &saved_mask, NULL);
}

/*
	mproc_unlock -- undo one mproc_lock(); the lock is released when the
	nesting depth reaches zero.  Calls from a process that does not hold
	the lock are ignored (and are a caller error).
*/
void
mproc_unlock()
{
	sigset_t	saved_mask;

	if (!i_am_multithreading)
		return;

	sigprocmask(SIG_BLOCK, &critmask, &saved_mask);
	if (lockID == m_get_myid()) {
		if (--lock_count <= 0) {
			lockID = -1;
			m_unlock();
		}
	}
	sigprocmask(SIG_SETMASK, &saved_mask, NULL);
}

/*
	mproc_next -- atomically fetch-and-increment the shared chunk counter
	kept in the root task block; returns the value before the increment
*/
int
mproc_next(tp)
	TaskBlkPtr	tp;
{
	TaskBlkPtr	root;
	int		ticket;

	mproc_lock();
	root = tp->root;
	ticket = root->next_cnt++;
	mproc_unlock();

	return ticket;
}

/*
	mproc_sync -- barrier across the worker processes.  A lone process has
	nothing to wait for.  Otherwise the barrier is crossed twice, matching
	the pair of m_sync() calls in wild_init()'s completion loop; this
	avoids a race between the two.
*/
void LIBCALL
mproc_sync(TaskBlkPtr tp)
{
	if (i_am_multithreading) {
		m_sync();
		m_sync();
	}
}
#endif /* OS_UNIX_IRIX */

#ifdef SOLARIS_THREADS_AVAIL
/* global lock behind mproc_lock()/mproc_unlock() */
static mutex_t	lock_mutex;
/*
	lockID: thread currently recorded as holding the lock via mproc_lock() (0 if none).
	reaperID: the parent thread while it waits in mrun_wild() for workers to
	finish; it locks lock_mutex itself around that wait, so mproc_lock()
	does not touch the mutex for it.
*/
static thread_t	lockID, reaperID;

/*
	mproc_id -- Solaris thread id of the caller, or 0 when not running in parallel
*/
int
mproc_id()
{
	return i_am_multithreading ? (int)thr_self() : 0;
}

/*
	mproc_lock -- acquire the global lock; the holder may re-enter it.
	The reaper thread records ownership but does not lock the mutex,
	because it already locks lock_mutex itself in mrun_wild().
*/
void
mproc_lock()
{
	sigset_t	saved_mask;
	thread_t	self;

	if (!i_am_multithreading)
		return;

	thr_sigsetmask(SIG_BLOCK, &critmask, &saved_mask);
	self = thr_self();
	if (self != lockID) {
		if (self != reaperID)
			mutex_lock(&lock_mutex);
		lockID = self;
		lock_count = 0; /* discard any stale depth */
	}
	lock_count++;
	thr_sigsetmask(SIG_SETMASK, &saved_mask, NULL);
}

/*
	mproc_unlock -- undo one mproc_lock(); the mutex is released when the
	nesting depth drops to zero (never by the reaper, which does not take it here)
*/
void
mproc_unlock()
{
	sigset_t	saved_mask;
	thread_t	self;

	if (!i_am_multithreading)
		return;

	thr_sigsetmask(SIG_BLOCK, &critmask, &saved_mask);
	self = thr_self();
	if (self == lockID) {
		if (--lock_count <= 0) {
			lockID = 0;
			if (self != reaperID)
				mutex_unlock(&lock_mutex);
		}
	}
	thr_sigsetmask(SIG_SETMASK, &saved_mask, NULL);
}

/*
	mproc_next -- fetch-and-increment the shared chunk counter in the root
	task block, guarded by its own mutex when running in parallel
*/
int
mproc_next(tp)
	TaskBlkPtr	tp;
{
	TaskBlkPtr	root = tp->root;
	int		ticket;

	if (!i_am_multithreading) {
		ticket = root->next_cnt++;
		return ticket;
	}

	mutex_lock(&root->next_mutex);
	ticket = root->next_cnt++;
	mutex_unlock(&root->next_mutex);
	return ticket;
}

/*
	mproc_sync -- synchronize threads

	Barrier across all threads currently linked to the root task block
	(rp->nprocs).  rp->sync_cnt counts threads inside the barrier and
	rp->sync_flag marks the release phase, during which newly arriving
	threads wait so that a fast thread cannot lap the barrier.  Threads
	leaving the pool broadcast sync_cond (wild_init), so waiters re-test
	rp->nprocs.
*/
void LIBCALL
mproc_sync(tp)
	TaskBlkPtr	tp;
{
	TaskBlkPtr	rp;

	if (!i_am_multithreading)
		return;

	rp = tp->root;

	mutex_lock(&rp->sync_mutex);

	/* wait for recovery from a previous sync to complete */
	while (rp->sync_flag) {
		cond_wait(&rp->sync_cond, &rp->sync_mutex);
	}

	/* enter the new sync */
	++rp->sync_cnt;
	while (rp->nprocs > rp->sync_cnt && !rp->sync_flag) {
		cond_wait(&rp->sync_cond, &rp->sync_mutex);
	}

	/*
	Start recovery:
	The first thread that discovers all threads are synched will execute the
	following, but only if there is at least one other thread to be awakened.
	*/
	if (--rp->sync_cnt > 0 && !rp->sync_flag) {
		rp->sync_flag = TRUE;
		cond_broadcast(&rp->sync_cond);
	}

	/*
	The last thread out ends recovery and wakes any threads waiting above.
	Test sync_flag rather than rp->nprocs > 1: if other threads left the
	pool during recovery, nprocs may already be 1 here, and the flag would
	otherwise stay TRUE and block the next mproc_sync() forever.
	*/
	if (rp->sync_cnt == 0 && rp->sync_flag) {
		rp->sync_flag = FALSE;
		cond_broadcast(&rp->sync_cond);
	}
	mutex_unlock(&rp->sync_mutex);
	return;
}
#endif /* SOLARIS_THREADS_AVAIL */

#ifdef DCE_THREADS_AVAIL
/* global lock behind mproc_lock()/mproc_unlock() */
static pthread_mutex_t	lock_mutex;
/*
	lockID: thread recorded as holding the lock via mproc_lock();
	reaperID: parent thread while it waits in mrun_wild() for workers;
	nullID: never assigned in this file, used as the "no thread" value
	(presumably relies on static zero-initialization -- TODO confirm)
*/
static pthread_t	lockID, reaperID, nullID;
/* threadlist[i] is the i-th worker started by mrun_wild(); mproc_id() returns i+1 */
static pthread_t	threadlist[THREADS_MAX];

/*
	mproc_id -- 1-based slot of the calling thread in threadlist; 0 when not
	running in parallel or when the caller was not started by mrun_wild()
*/
int
mproc_id()
{
	pthread_t	self;
	int		slot;

	if (!i_am_multithreading)
		return 0;

	self = pthread_self();
	for (slot = 0; slot < DIM(threadlist); slot++) {
		if (pthread_equal(self, threadlist[slot]))
			return slot + 1;
	}
	return 0;
}

/*
	mproc_lock -- acquire the global lock; the holder may re-enter it.

	Ownership (lockID) and nesting depth (lock_count) are always recorded,
	including for the reaper thread, so that the matching mproc_unlock()
	recognizes the caller and unwinds lock_count.  Previously the reaper
	bumped lock_count without becoming lockID, leaving a count that no
	unlock could ever remove.  This now matches the Solaris implementation.
*/
void
mproc_lock()
{
	pthread_t	myid;
	sigset_t	omask;

	if (i_am_multithreading) {
		sigprocmask(SIG_BLOCK, &critmask, &omask);
		myid = pthread_self();
		if (!pthread_equal(myid, lockID)) {
			/*
			The reaper locks lock_mutex itself around its wait loop in
			mrun_wild(), so it must not lock it again here.
			*/
			if (!pthread_equal(myid, reaperID))
				pthread_mutex_lock(&lock_mutex);
			lockID = myid;
			lock_count = 0; /* insurance */
		}
		++lock_count;
		sigprocmask(SIG_SETMASK, &omask, NULL);
	}
}

/*
	mproc_unlock -- undo one mproc_lock(); the mutex is released when the
	nesting depth drops to zero (never by the reaper, which does not take it here)
*/
void
mproc_unlock()
{
	sigset_t	saved_mask;
	pthread_t	self;

	if (!i_am_multithreading)
		return;

	sigprocmask(SIG_BLOCK, &critmask, &saved_mask);
	self = pthread_self();
	if (pthread_equal(self, lockID)) {
		if (--lock_count <= 0) {
			lockID = nullID;
			if (!pthread_equal(self, reaperID))
				pthread_mutex_unlock(&lock_mutex);
		}
	}
	sigprocmask(SIG_SETMASK, &saved_mask, NULL);
}


/*
	mproc_next -- fetch-and-increment the shared chunk counter in the root
	task block, guarded by its own mutex when running in parallel
*/
int
mproc_next(tp)
	TaskBlkPtr	tp;
{
	TaskBlkPtr	root = tp->root;
	int		ticket;

	if (!i_am_multithreading) {
		ticket = root->next_cnt++;
		return ticket;
	}

	pthread_mutex_lock(&root->next_mutex);
	ticket = root->next_cnt++;
	pthread_mutex_unlock(&root->next_mutex);
	return ticket;
}

/*
	mproc_sync -- synchronize threads

	Barrier across all threads currently linked to the root task block
	(rp->nprocs).  rp->sync_cnt counts threads inside the barrier and
	rp->sync_flag marks the release phase, during which newly arriving
	threads wait so that a fast thread cannot lap the barrier.  Threads
	leaving the pool broadcast sync_cond (wild_init), so waiters re-test
	rp->nprocs.
*/
void LIBCALL
mproc_sync(tp)
	TaskBlkPtr	tp;
{
	TaskBlkPtr	rp;

	if (!i_am_multithreading)
		return;

	rp = tp->root;

	pthread_mutex_lock(&rp->sync_mutex);

	/* wait for recovery from a previous sync to complete */
	while (rp->sync_flag) {
		pthread_cond_wait(&rp->sync_cond, &rp->sync_mutex);
	}

	/* enter the new sync */
	++rp->sync_cnt;
	while (rp->nprocs > rp->sync_cnt && !rp->sync_flag) {
		pthread_cond_wait(&rp->sync_cond, &rp->sync_mutex);
	}

	/*
	Start recovery:
	The first thread that discovers all threads are synched will execute the
	following, but only if there is at least one other thread to be awakened.
	*/
	if (--rp->sync_cnt > 0 && !rp->sync_flag) {
		rp->sync_flag = TRUE;
		pthread_cond_broadcast(&rp->sync_cond);
	}

	/*
	The last thread out ends recovery and wakes any threads waiting above.
	Test sync_flag rather than rp->nprocs > 1: if other threads left the
	pool during recovery, nprocs may already be 1 here, and the flag would
	otherwise stay TRUE and block the next mproc_sync() forever.
	*/
	if (rp->sync_cnt == 0 && rp->sync_flag) {
		rp->sync_flag = FALSE;
		pthread_cond_broadcast(&rp->sync_cond);
	}
	pthread_mutex_unlock(&rp->sync_mutex);
	return;
}
#endif /* DCE_THREADS_AVAIL */

#else /* !MPROC_AVAIL */

/*
	The stub definitions below need the real function names.  If a header
	(presumably gishlib.h -- TODO confirm) maps any of them to a macro for
	builds without multiprocessing, remove that macro first.
*/
#ifdef mproc_lock
#undef mproc_lock
#endif
#ifdef mproc_unlock
#undef mproc_unlock
#endif
#ifdef mproc_sync
#undef mproc_sync
#endif

/* mproc_id -- without multiprocessing support every caller is worker 0 */
int
mproc_id()
{
	int	only_worker = 0;

	return only_worker;
}

/* mproc_lock -- single-threaded build: there is nothing to lock */
void
mproc_lock()
{
	return;
}

/* mproc_unlock -- single-threaded build: there is nothing to release */
void
mproc_unlock()
{
	return;
}

/*
	mproc_next -- single-threaded build: plain post-increment of the root
	task block's chunk counter (no locking required)
*/
int LIBCALL
mproc_next(tp)
	TaskBlkPtr	tp;
{
	TaskBlkPtr	root = tp->root;
	int		ticket = root->next_cnt++;

	return ticket;
}

/* mproc_sync -- single-threaded build: a barrier of one is a no-op */
void LIBCALL
mproc_sync(tp)
	TaskBlkPtr	tp;
{
	(void) tp;
}

#endif /* !MPROC_AVAIL */


/*
	_mticknext -- advance the shared progress ticks up to task number n.

	An unlocked pre-check avoids taking the lock when no tick can be due.
	The condition is re-tested under the lock because another worker may
	already have issued the ticks.  Each issued tick bumps the tick
	threshold, counts the tick, and calls tp->tickproc if one is set.
*/
static void
_mticknext(tp, n)
	TaskBlkPtr	tp;
	unsigned long	n;
{
	if (n < *tp->gtick_cnt || *tp->gtick_out >= tp->nticks)
		return;

	mproc_lock();
	while (n > *tp->gtick_cnt && *tp->gtick_out < tp->nticks) {
		*(tp->gtick_cnt) += tp->tick_incr;
		++*(tp->gtick_out);
		if (tp->tickproc != NULL)
			(*tp->tickproc)(tp);
	}
	mproc_unlock();
}

/*
	_mtasknext -- return the next task number for this worker, or LONG_MIN
	when no work remains.

	Tasks are handed out in chunks of tp->chunksize.  When the current
	chunk is exhausted, the next chunk number is claimed from the shared
	counter via mproc_next().  A negative cursor marks the block as done.
*/
static long
_mtasknext(tp)
	TaskBlkPtr	tp;
{
	if (tp->task_cur < 0)
		return LONG_MIN;

	if (++tp->task_cur >= tp->task_max) {
		tp->task_cur = mproc_next(tp) * tp->chunksize;
		if (tp->task_cur >= tp->ntasks) {
			tp->task_cur = LONG_MIN;
			return tp->task_cur;
		}
		/* end of this chunk, clipped to the total number of tasks */
		tp->task_max = tp->task_cur + tp->chunksize;
		if (tp->task_max > tp->ntasks)
			tp->task_max = tp->ntasks;
	}

	if (tp->tickproc != NULL)
		_mticknext(tp, tp->task_cur);
	return tp->task_cur;
}


/*
	wild_init -- entry point of each worker thread (SGI: worker process)

	Unpacks the WildBlk prepared by mrun_wild() and installs the SIGTERM
	handler.  It then initializes this worker's task block, counts the
	worker in *user_nprocs while it runs the user's function, and finally
	unlinks the task block.  On the way out it wakes the parent and any
	threads waiting in mproc_sync().
*/
static void
wild_init(wbp)
	WildBlkPtr	wbp;
{
	void	(LIBCALL *func)();
	TaskBlkPtr	tp, rp;
	int		PNTR user_nprocs;
#ifdef MPROC_AVAIL
#ifdef SGI_MPROC_AVAIL
	Boolean	completely_done = 0;
	struct rlimit	rl;
	unsigned long	cputime = RLIM_INFINITY;
	int		id;

	id = m_get_myid();
	if (id == 0)
		sys_signal(SIGTERM, wbp->sigterm);
	else
		sys_signal(SIGTERM, SIG_DFL);
#endif
#endif

#ifdef DCE_THREADS_AVAIL
	/* Synchronize with master thread: mrun_wild() holds lock_mutex while creating workers */
	pthread_mutex_lock(&lock_mutex);
	pthread_mutex_unlock(&lock_mutex);
#endif

	func = wbp->func;
	tp = wbp->tp;
	rp = tp->root;
	user_nprocs = wbp->user_nprocs;

#ifdef MPROC_AVAIL
	tp->tasknext = _mtasknext;
#ifdef SGI_MPROC_AVAIL
	tp = TaskLink(tp, id);
	rp->tid[id] = getpid();
	m_sync();
	/* give each process an equal share of the CPU-time limit, if one is set */
	if (getrlimit(RLIMIT_CPU, &rl) != -1 && rl.rlim_cur != RLIM_INFINITY) {
		cputime = rl.rlim_cur;
		rl.rlim_cur = cputime / tp->proc_max + 1;
		setrlimit(RLIMIT_CPU, &rl);
	}
#endif /* SGI_MPROC_AVAIL */
#if defined(SOLARIS_THREADS_AVAIL) || defined(DCE_THREADS_AVAIL)
	sys_signal(SIGTERM, wbp->sigterm);
#endif
#endif /* MPROC_AVAIL */

	mproc_lock();
	TaskInit(tp, nchunks);
	/*
	Count this worker as running func().  The dereference must apply to
	the increment itself: "*user_nprocs++" would advance the local pointer
	and leave the shared counter untouched.
	*/
	++*user_nprocs;
	mproc_unlock();

	(*func)(tp); /* the work desired by the user is performed here */

	mproc_lock();
	--*user_nprocs;
	TaskUnlink(tp);
#ifdef SOLARIS_THREADS_AVAIL
	cond_signal(&rp->parent_cond);
#endif
#ifdef DCE_THREADS_AVAIL
	pthread_cond_signal(&rp->parent_cond);
#endif
	mproc_unlock();

#ifdef SOLARIS_THREADS_AVAIL
	/* make sure any synchronizers know we've exited the thread pool */
	mutex_lock(&rp->sync_mutex);
	cond_broadcast(&rp->sync_cond);
	mutex_unlock(&rp->sync_mutex);
#endif
#ifdef DCE_THREADS_AVAIL
	/* make sure any synchronizers know we've exited the thread pool */
	pthread_mutex_lock(&rp->sync_mutex);
	pthread_cond_broadcast(&rp->sync_cond);
	pthread_mutex_unlock(&rp->sync_mutex);
#endif

#ifdef MPROC_AVAIL
#ifdef SGI_MPROC_AVAIL
	/* restore the original CPU-time limit */
	if (cputime != RLIM_INFINITY) {
		rl.rlim_cur = cputime;
		setrlimit(RLIMIT_CPU, &rl);
	}
#endif

#ifdef SGI_MPROC_AVAIL
	/* keep meeting the other processes at m_sync() until all have unlinked */
	while (!completely_done) {
		m_sync();
		completely_done = (rp->nprocs == 0);
		m_sync();
	}
#endif /* SGI_MPROC_AVAIL */

#endif /* MPROC_AVAIL */
}


/*
	mrun_wild -- run func() in parallel over n discrete subtasks

	Creates the root task block with TaskNew(n, userp, tickproc, nticks)
	and stores it in *globtp (when globtp is non-NULL).  It then runs func()
	on up to numprocs workers (threads, or processes on SGI), the calling
	thread included, and returns once all workers have finished.

	numprocs	requested number of workers; clamped to [1, n] and to the
				configured (proc_max) and available processor limits
	user_nprocs	counter shared with the workers; zeroed here before they
				start.  Must not be NULL.
	sigterm		SIGTERM handler installed before func() runs

	Returns the root task block.  Returns NULL when n == 0, when TaskNew()
	fails, or (SGI) when the worker processes cannot be started.
*/
TaskBlkPtr LIBCALL
mrun_wild(n, func, userp, globtp, tickproc, nticks, numprocs, user_nprocs, sigterm)
	unsigned long	n; /* Number of discrete subtasks */
	void	(LIBCALL *func)(TaskBlkPtr);
	Nlm_VoidPtr	userp;
	TaskBlkPtr	PNTR globtp;
	void	(LIBCALL *tickproc)();
	unsigned long	nticks;
	int		numprocs;
	int		PNTR user_nprocs;
	void	(LIBCALL *sigterm)();
{
	TaskBlkPtr	tp;

#ifdef MPROC_AVAIL
#ifdef SGI_MPROC_AVAIL
	WildBlk	wb;
	int		maxprocs, maxpprocs;

	if (n == 0)
		return NULL;

	sigaddset(&critmask, SIGALRM);
	sigaddset(&critmask, SIGCLD);

	maxprocs = prctl(PR_MAXPROCS); /* max. user processes */
	maxpprocs = prctl(PR_MAXPPROCS); /* max. parallel processes */
	numprocs = MAX(numprocs, 1);
	numprocs = MIN(numprocs, proc_max);
	numprocs = MIN(numprocs, maxprocs);
	numprocs = MIN(numprocs, maxpprocs - proc_reserve);
	numprocs = MAX(1, numprocs);
	numprocs = MIN(n, numprocs);
	m_set_procs(numprocs);

	(void) prctl(PR_SETEXITSIG, SIGTERM); /* signal SIGTERM if a child exits */

	tp = TaskNew(n, userp, tickproc, nticks);
	if (globtp != NULL)
		*globtp = tp;
	if (tp == NULL)
		return NULL; /* the workers would dereference tp */
	tp->ticknext = _mticknext;
	wb.id = 0;
	wb.func = func;
	wb.rp = wb.tp = tp;
	wb.user_nprocs = user_nprocs;
	wb.sigterm = sigterm;

	*user_nprocs = 0;
	if (numprocs > 1)
		++i_am_multithreading;
	if (m_fork(wild_init, &wb) != 0) {
		/* no workers ran: undo the increment so locking reverts to single-process mode */
		if (numprocs > 1)
			if (--i_am_multithreading == 0)
				lock_count = 0;
		return NULL;
	}
	m_kill_procs();
	if (numprocs > 1)
		if (--i_am_multithreading == 0)
			lock_count = 0;
	sys_signal(SIGTERM, SIG_DFL);
	return tp;
#endif /* SGI_MPROC_AVAIL */
#ifdef SOLARIS_THREADS_AVAIL
	WildBlk	wb[THREADS_MAX];
	sigset_t	omask;
	int		i;
	int		maxprocs;
	size_t	stack_size;
	static int	once;

	if (n == 0)
		return NULL;

	tp = TaskNew(n, userp, tickproc, nticks);
	if (globtp != NULL)
		*globtp = tp;
	if (tp == NULL)
		return NULL; /* the mutex/cond initialization below would dereference tp */
	tp->ticknext = _mticknext;

	numprocs = MAX(numprocs, 1);
	numprocs = MIN(numprocs, proc_max);
	numprocs = MIN(n, numprocs);
	maxprocs = sysconf(_SC_NPROCESSORS_ONLN);
	maxprocs = MIN(numprocs, maxprocs - proc_reserve);
	/*
	Always keep at least the calling thread.  A non-positive value (few
	CPUs, or sysconf() == -1) would leave wb[0] uninitialized.  A negative
	value would also be turned into DIM(wb) by the unsigned comparison below.
	*/
	maxprocs = MAX(maxprocs, 1);
	maxprocs = MIN(maxprocs, DIM(wb));
	Nlm_MemSet((VoidPtr)wb, 0, maxprocs * sizeof(wb[0]));
	if (!once) {
		once = 1;
		mutex_init(&lock_mutex, USYNC_THREAD, NULL);
	}
	mutex_init(&tp->next_mutex, USYNC_THREAD, NULL);
	mutex_init(&tp->sync_mutex, USYNC_THREAD, NULL);
	cond_init(&tp->parent_cond, USYNC_THREAD, 0);
	cond_init(&tp->sync_cond, USYNC_THREAD, 0);

	stack_size = thr_min_stack();
	stack_size = MAX(stack_size, STACKSIZE_DEFAULT);

	sigaddset(&critmask, SIGALRM);
	sigaddset(&critmask, SIGCLD);
	thr_sigsetmask(SIG_BLOCK, &critmask, &omask);
	*user_nprocs = 0;
	for (numprocs = 0; numprocs < maxprocs; ++numprocs) {
		if (numprocs == 0) { /* the first thread is the current thread */
			tp->tid[numprocs] = thr_self();
		}
		else {
			/* created suspended; released below once every WildBlk is filled in */
			if (thr_create(NULL, stack_size, (void *(*)(void *))wild_init,
					&wb[numprocs], THR_SUSPENDED|THR_NEW_LWP, &tp->tid[numprocs]) != 0) {
				tp->tid[numprocs] = 0;
				break;
			}
		}
		wb[numprocs].rp = tp;
		wb[numprocs].tp = TaskLink(tp, numprocs);
		wb[numprocs].user_nprocs = user_nprocs;
		wb[numprocs].id = numprocs;
		wb[numprocs].func = func;
		wb[numprocs].sigterm = sigterm;
	}

	if (numprocs > 1)
		++i_am_multithreading;
	/* start the created threads executing... */
	for (i = 1; i < numprocs; ++i) {
		thr_continue(tp->tid[i]);
	}
	thr_sigsetmask(SIG_SETMASK, &omask, NULL);

	/* ...and proceed to do the same thing in this thread */
	wild_init(&wb[0]);

	thr_sigsetmask(SIG_BLOCK, &critmask, &omask);
	mutex_lock(&lock_mutex);
	reaperID = thr_self();
	/* allow interrupts while waiting for other LWPs to finish */
	thr_sigsetmask(SIG_SETMASK, &omask, NULL);
	while (tp->nprocs > 0) {
		cond_wait(&tp->parent_cond, &lock_mutex);
	}
	thr_sigsetmask(SIG_BLOCK, &critmask, &omask);
	reaperID = 0;
	mutex_unlock(&lock_mutex);

	/* thr_join() doesn't like to be interrupted */
	for (i = 1; i < numprocs; ++i) {
		thr_join(tp->tid[i], NULL, NULL);
	}

	if (numprocs > 1)
		if (--i_am_multithreading == 0)
			lock_count = 0;

	thr_sigsetmask(SIG_SETMASK, &omask, NULL);

	return tp;
#endif /* SOLARIS_THREADS_AVAIL */
#ifdef DCE_THREADS_AVAIL
	WildBlk	wb[THREADS_MAX];
	pthread_attr_t	thratt;
	pthread_addr_t	status;
	int		i;
	int		maxprocs;
	sigset_t	omask;
	static int	once;

	if (n == 0)
		return NULL;

	tp = TaskNew(n, userp, tickproc, nticks);
	if (globtp != NULL)
		*globtp = tp;
	if (tp == NULL)
		return NULL; /* the mutex/cond initialization below would dereference tp */
	tp->ticknext = _mticknext;

	numprocs = MAX(numprocs, 1);
	numprocs = MIN(numprocs, proc_max);
	numprocs = MIN(n, numprocs);
#ifdef _SC_NPROCESSORS_ONLN
	maxprocs = sysconf(_SC_NPROCESSORS_ONLN);
#else
#ifdef GSI_CPUS_IN_BOX
	{
		int	start = 0;

		getsysinfo(GSI_CPUS_IN_BOX, &maxprocs, sizeof(maxprocs), &start, NULL);
	}
	maxprocs = MAX(maxprocs, 1);
#else
	maxprocs = 1;
#endif
#endif
	maxprocs = MIN(numprocs, maxprocs - proc_reserve);
	/*
	Always keep at least the calling thread.  A non-positive value (few
	CPUs, or sysconf() == -1) would leave wb[0] uninitialized.  A negative
	value would also be turned into DIM(wb) by the unsigned comparison below.
	*/
	maxprocs = MAX(maxprocs, 1);
	maxprocs = MIN(maxprocs, DIM(wb));
	Nlm_MemSet((VoidPtr)wb, 0, maxprocs * sizeof(wb[0]));
	if (!once) {
		once = 1;
		pthread_mutex_init(&lock_mutex, pthread_mutexattr_default);
	}
	pthread_mutex_init(&tp->next_mutex, pthread_mutexattr_default);
	pthread_mutex_init(&tp->sync_mutex, pthread_mutexattr_default);
	pthread_cond_init(&tp->parent_cond, pthread_condattr_default);
	pthread_cond_init(&tp->sync_cond, pthread_condattr_default);

	pthread_attr_create(&thratt);
	pthread_attr_setstacksize(&thratt, STACKSIZE_DEFAULT);
	pthread_attr_setsched(&thratt,  SCHED_RR);
	sigaddset(&critmask, SIGALRM);
	sigaddset(&critmask, SIGCLD);
	sigprocmask(SIG_BLOCK, &critmask, &omask);
	pthread_mutex_lock(&lock_mutex); /* suspend spawned threads until ready */
	*user_nprocs = 0;
	for (numprocs = 0; numprocs < maxprocs; ++numprocs) {
		if (numprocs == 0) {
			tp->tid[numprocs] = pthread_self();
		}
		else
			if (pthread_create(&tp->tid[numprocs], thratt,
					(void *(*)(void *))wild_init, &wb[numprocs]) == -1) {
				break;
			}
		pthread_setscheduler(tp->tid[numprocs], SCHED_RR, PRI_RR_MAX);
		threadlist[numprocs] = tp->tid[numprocs];
		wb[numprocs].tp = TaskLink(tp, numprocs);
		wb[numprocs].rp = tp;
		wb[numprocs].user_nprocs = user_nprocs;
		wb[numprocs].id = numprocs;
		wb[numprocs].func = func;
		wb[numprocs].sigterm = sigterm;
	}
	if (numprocs > 1)
		++i_am_multithreading;
	/* resume/release the spawned threads */
	pthread_mutex_unlock(&lock_mutex);
	sigprocmask(SIG_SETMASK, &omask, NULL);

	/* ...and proceed to do the same thing in this thread */
	wild_init(&wb[0]);

	sigprocmask(SIG_BLOCK, &critmask, &omask);
	pthread_mutex_lock(&lock_mutex);
	reaperID = pthread_self();
	/* allow interrupts while waiting for other threads to finish */
	sigprocmask(SIG_SETMASK, &omask, NULL);
	while (tp->nprocs > 0) {
		pthread_cond_wait(&tp->parent_cond, &lock_mutex);
	}
	sigprocmask(SIG_BLOCK, &critmask, &omask);
	reaperID = nullID;
	pthread_mutex_unlock(&lock_mutex);

	for (i = 1; i < numprocs; ++i) {
		pthread_join(tp->tid[i], &status);
		threadlist[i] = nullID;
	}
	if (numprocs > 1)
		if (--i_am_multithreading == 0)
			lock_count = 0;

	sigprocmask(SIG_SETMASK, &omask, NULL);

	return tp;
#endif

#else /* !MPROC_AVAIL */

	if (n == 0)
		return NULL;

	tp = TaskNew(n, userp, tickproc, nticks);
	if (globtp != NULL)
		*globtp = tp;
	if (tp == NULL)
		return NULL; /* nothing for func() to work on */
	sys_signal(SIGTERM, sigterm);
	*user_nprocs = 1;
	(*func)(tp);
	*user_nprocs = 0;
	return tp;

#endif /* !MPROC_AVAIL */
}
