/* ARISA - POSIXs threads linkage * Copyright (C) 2004 Carl Ritson * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA */ #define _GNU_SOURCE #define _REENTRANT #define _THREAD_SAFE #include #include #include #include #include #include #include #include #include #include #include #include #include "../config.h" #ifdef HAVE_SCHED_H #include #endif #include "threading.h" static pthread_key_t thread_key; static pthread_once_t thread_key_once = PTHREAD_ONCE_INIT; void thread_init(thread_t *t) { t->end = 0; pthread_cond_init(&(t->signal),NULL); pthread_mutex_init(&(t->rlock),NULL); pthread_mutex_init(&(t->alock),NULL); } void thread_free(thread_t *t) { int ret; thread_join(t,NULL); ret = pthread_cond_destroy(&(t->signal)); assert(ret == 0); ret = pthread_mutex_destroy(&(t->rlock)); assert(ret == 0); ret = pthread_mutex_destroy(&(t->alock)); assert(ret == 0); } int thread_create(thread_t *t, void *(*code)(void *), void *arg) { int ret; if(t->end != 0) return -1; pthread_mutex_lock(&(t->rlock)); ret = pthread_create(&(t->thread),NULL,code,arg); pthread_mutex_unlock(&(t->rlock)); return ret; } int thread_join(thread_t *t, void **ret_val) { int ret; if(t->end != 0) { ret = pthread_join(t->thread,ret_val); if(ret == 0) t->end = 0; } else { ret = -1; } return ret; } int thread_wake_up(thread_t *t) { if(thread_is_running(t)) { if(pthread_mutex_trylock(&(t->alock)) == 0) { pthread_cond_broadcast(&(t->signal)); pthread_mutex_unlock(&(t->alock)); return 0; } else { // ? } } return -1; } int thread_end(thread_t *t) { if(thread_is_running(t)) { t->end = 1; if(pthread_mutex_trylock(&(t->alock)) == 0) { pthread_cond_broadcast(&(t->signal)); pthread_mutex_unlock(&(t->alock)); } return 0; } else return -1; } int thread_is_running(thread_t *t) { if(pthread_mutex_trylock(&(t->rlock)) == EBUSY) { return 1; } else { pthread_mutex_unlock(&(t->rlock)); } return 0; } static void thread_key_alloc(void) { pthread_key_create(&thread_key,NULL); } void thread_signal_started(thread_t *t) { pthread_once(&thread_key_once,thread_key_alloc); pthread_setspecific(thread_key,(void *)t); pthread_mutex_lock(&(t->rlock)); pthread_mutex_lock(&(t->alock)); } void thread_signal_finished(void) { thread_t *t = (thread_t *)pthread_getspecific(thread_key); if(t != NULL) { pthread_setspecific(thread_key,NULL); pthread_mutex_unlock(&(t->rlock)); pthread_mutex_unlock(&(t->alock)); } } int thread_should_end(void) { thread_t *t = (thread_t *)pthread_getspecific(thread_key); if(t != NULL) { if(t->end != 0) return 1; else return 0; } else return 0; } int thread_sleep(unsigned long sec) { thread_t *t = (thread_t *) pthread_getspecific(thread_key); if(t != NULL) { struct timeval now; struct timespec timeout; int ret; gettimeofday(&now,NULL); timeout.tv_sec = now.tv_sec + sec; timeout.tv_nsec = now.tv_usec * 1000; if(sec > 0) ret = pthread_cond_timedwait(&(t->signal), &(t->alock),&timeout); else ret = pthread_cond_wait(&(t->signal),&(t->alock)); if(ret == 0) { return 1; } else { if(errno == EINTR) return -1; else return 0; } } else { if(sec > 0) sleep(sec); else sched_yield(); return 0; } } int interruptable_connect(int sockfd, const struct sockaddr *serv_addr, socklen_t addrlen) { thread_t *t = (thread_t *) pthread_getspecific(thread_key); int err = ENOTSUP, ret = -1; if(t != NULL) { struct pollfd pfd; int fl = fcntl(sockfd,F_GETFL); // alter flow control if required (need non-blocking io) if(!(fl & O_NONBLOCK)) fcntl(sockfd,F_SETFL,fl | O_NONBLOCK); ret = connect(sockfd,serv_addr,addrlen); if(ret == -1 && errno != EINPROGRESS) return -1; pfd.fd = sockfd; pfd.events = POLLIN | POLLOUT; pfd.revents = 0; while((ret = poll(&pfd,1,1000)) == 0) { if(t->end != 0) { errno = EINTR; ret = -1; break; } } if(ret == 1 && (pfd.revents & (POLLERR|POLLHUP|POLLNVAL))) { socklen_t len = sizeof(err); getsockopt(sockfd,SOL_SOCKET,SO_ERROR,&err,&len); ret = -1; } else if(ret == 1) { err = 0; ret = 0; } else err = errno; // restore flow original flow control if(!(fl & O_NONBLOCK)) fcntl(sockfd,F_SETFL,fl); } errno = err; return ret; }