src/scheduler.c

00001 /*
00002   The oRTP library is an RTP (Realtime Transport Protocol - rfc1889) stack.
00003   Copyright (C) 2001  Simon MORLAT simon.morlat@linphone.org
00004 
00005   This library is free software; you can redistribute it and/or
00006   modify it under the terms of the GNU Lesser General Public
00007   License as published by the Free Software Foundation; either
00008   version 2.1 of the License, or (at your option) any later version.
00009 
00010   This library is distributed in the hope that it will be useful,
00011   but WITHOUT ANY WARRANTY; without even the implied warranty of
00012   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013   Lesser General Public License for more details.
00014 
00015   You should have received a copy of the GNU Lesser General Public
00016   License along with this library; if not, write to the Free Software
00017   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00018 */
00019 
00020 #include <ortp/ortp.h>
00021 #include "utils.h"
00022 #include "scheduler.h"
00023 #include "rtpsession_priv.h"
00024 
00025 // To avoid warning during compile
00026 extern void rtp_session_process (RtpSession * session, uint32_t time, RtpScheduler *sched);
00027 
00028 
00029 void rtp_scheduler_init(RtpScheduler *sched)
00030 {
00031         sched->list=0;
00032         sched->time_=0;
00033         /* default to the posix timer */
00034         rtp_scheduler_set_timer(sched,&posix_timer);
00035         ortp_mutex_init(&sched->lock,NULL);
00036         ortp_cond_init(&sched->unblock_select_cond,NULL);
00037         sched->max_sessions=sizeof(SessionSet)*8;
00038         session_set_init(&sched->all_sessions);
00039         sched->all_max=0;
00040         session_set_init(&sched->r_sessions);
00041         sched->r_max=0;
00042         session_set_init(&sched->w_sessions);
00043         sched->w_max=0;
00044         session_set_init(&sched->e_sessions);
00045         sched->e_max=0;
00046 }
00047 
00048 RtpScheduler * rtp_scheduler_new()
00049 {
00050         RtpScheduler *sched=(RtpScheduler *) ortp_malloc(sizeof(RtpScheduler));
00051         memset(sched,0,sizeof(RtpScheduler));
00052         rtp_scheduler_init(sched);
00053         return sched;
00054 }
00055 
00056 void rtp_scheduler_set_timer(RtpScheduler *sched,RtpTimer *timer)
00057 {
00058         if (sched->thread_running){
00059                 ortp_warning("Cannot change timer while the scheduler is running !!");
00060                 return;
00061         }
00062         sched->timer=timer;
00063         /* report the timer increment */
00064         sched->timer_inc=(timer->interval.tv_usec/1000) + (timer->interval.tv_sec*1000);
00065 }
00066 
00067 void rtp_scheduler_start(RtpScheduler *sched)
00068 {
00069         if (sched->thread_running==0){
00070                 sched->thread_running=1;
00071                 ortp_mutex_lock(&sched->lock);
00072                 ortp_thread_create(&sched->thread, NULL, rtp_scheduler_schedule,(void*)sched);
00073                 ortp_cond_wait(&sched->unblock_select_cond,&sched->lock);
00074                 ortp_mutex_unlock(&sched->lock);
00075         }
00076         else ortp_warning("Scheduler thread already running.");
00077 
00078 }
00079 void rtp_scheduler_stop(RtpScheduler *sched)
00080 {
00081         if (sched->thread_running==1)
00082         {
00083                 sched->thread_running=0;
00084                 ortp_thread_join(sched->thread, NULL);
00085         }
00086         else ortp_warning("Scheduler thread is not running.");
00087 }
00088 
00089 void rtp_scheduler_destroy(RtpScheduler *sched)
00090 {
00091         if (sched->thread_running) rtp_scheduler_stop(sched);
00092         ortp_mutex_destroy(&sched->lock);
00093         //g_mutex_free(sched->unblock_select_mutex);
00094         ortp_cond_destroy(&sched->unblock_select_cond);
00095         ortp_free(sched);
00096 }
00097 
00098 void * rtp_scheduler_schedule(void * psched)
00099 {
00100         RtpScheduler *sched=(RtpScheduler*) psched;
00101         RtpTimer *timer=sched->timer;
00102         RtpSession *current;
00103 
00104         /* take this lock to prevent the thread to start until g_thread_create() returns
00105                 because we need sched->thread to be initialized */
00106         ortp_mutex_lock(&sched->lock);
00107         ortp_cond_signal(&sched->unblock_select_cond);  /* unblock the starting thread */
00108         ortp_mutex_unlock(&sched->lock);
00109         timer->timer_init();
00110         while(sched->thread_running)
00111         {
00112                 /* do the processing here: */
00113                 ortp_mutex_lock(&sched->lock);
00114                 
00115                 current=sched->list;
00116                 /* processing all scheduled rtp sessions */
00117                 while (current!=NULL)
00118                 {
00119                         ortp_debug("scheduler: processing session=0x%x.\n",current);
00120                         rtp_session_process(current,sched->time_,sched);
00121                         current=current->next;
00122                 }
00123                 /* wake up all the threads that are sleeping in _select()  */
00124                 ortp_cond_broadcast(&sched->unblock_select_cond);
00125                 ortp_mutex_unlock(&sched->lock);
00126                 
00127                 /* now while the scheduler is going to sleep, the other threads can compute their
00128                 result mask and see if they have to leave, or to wait for next tick*/
00129                 //ortp_message("scheduler: sleeping.");
00130                 timer->timer_do();
00131                 sched->time_+=sched->timer_inc;
00132         }
00133         /* when leaving the thread, stop the timer */
00134         timer->timer_uninit();
00135         return NULL;
00136 }
00137 
00138 void rtp_scheduler_add_session(RtpScheduler *sched, RtpSession *session)
00139 {
00140         RtpSession *oldfirst;
00141         int i;
00142         if (session->flags & RTP_SESSION_IN_SCHEDULER){
00143                 /* the rtp session is already scheduled, so return silently */
00144                 return;
00145         }
00146         rtp_scheduler_lock(sched);
00147         /* enqueue the session to the list of scheduled sessions */
00148         oldfirst=sched->list;
00149         sched->list=session;
00150         session->next=oldfirst;
00151         if (sched->max_sessions==0){
00152                 ortp_error("rtp_scheduler_add_session: max_session=0 !");
00153         }
00154         /* find a free pos in the session mask*/
00155         for (i=0;i<sched->max_sessions;i++){
00156                 if (!ORTP_FD_ISSET(i,&sched->all_sessions.rtpset)){
00157                         session->mask_pos=i;
00158                         session_set_set(&sched->all_sessions,session);
00159                         /* make a new session scheduled not blockable if it has not started*/
00160                         if (session->flags & RTP_SESSION_RECV_NOT_STARTED) 
00161                                 session_set_set(&sched->r_sessions,session);
00162                         if (session->flags & RTP_SESSION_SEND_NOT_STARTED) 
00163                                 session_set_set(&sched->w_sessions,session);
00164                         if (i>sched->all_max){
00165                                 sched->all_max=i;
00166                         }
00167                         break;
00168                 }
00169         }
00170         
00171         rtp_session_set_flag(session,RTP_SESSION_IN_SCHEDULER);
00172         rtp_scheduler_unlock(sched);
00173 }
00174 
00175 void rtp_scheduler_remove_session(RtpScheduler *sched, RtpSession *session)
00176 {
00177         RtpSession *tmp;
00178         int cond=1;
00179         return_if_fail(session!=NULL); 
00180         if (!(session->flags & RTP_SESSION_IN_SCHEDULER)){
00181                 /* the rtp session is not scheduled, so return silently */
00182                 return;
00183         }
00184 
00185         rtp_scheduler_lock(sched);
00186         tmp=sched->list;
00187         if (tmp==session){
00188                 sched->list=tmp->next;
00189                 rtp_session_unset_flag(session,RTP_SESSION_IN_SCHEDULER);
00190                 session_set_clr(&sched->all_sessions,session);
00191                 rtp_scheduler_unlock(sched);
00192                 return;
00193         }
00194         /* go the position of session in the list */
00195         while(cond){
00196                 if (tmp!=NULL){
00197                         if (tmp->next==session){
00198                                 tmp->next=tmp->next->next;
00199                                 cond=0;
00200                         }
00201                         else tmp=tmp->next;
00202                 }else {
00203                         /* the session was not found ! */
00204                         ortp_warning("rtp_scheduler_remove_session: the session was not found in the scheduler list!");
00205                         cond=0;
00206                 }
00207         }
00208         rtp_session_unset_flag(session,RTP_SESSION_IN_SCHEDULER);
00209         /* delete the bit in the mask */
00210         session_set_clr(&sched->all_sessions,session);
00211         rtp_scheduler_unlock(sched);
00212 }

Generated on Fri Jun 22 17:30:30 2007 for oRTP by  doxygen 1.5.2