00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #if defined(WIN32) || defined(_WIN32_WCE)
00022 #include "ortp-config-win32.h"
00023 #else
00024 #include "ortp-config.h"
00025 #endif
00026
00027 #include "ortp/ortp.h"
00028 #include "ortp/telephonyevents.h"
00029 #include "ortp/rtcp.h"
00030 #include "jitterctl.h"
00031 #include "scheduler.h"
00032 #include "utils.h"
00033 #include "rtpsession_priv.h"
00034
00035 extern mblk_t *rtcp_create_simple_bye_packet(uint32_t ssrc, const char *reason);
00036 extern int rtcp_sr_init(RtpSession *session, char *buf, int size);
00037 extern int rtcp_rr_init(RtpSession *session, char *buf, int size);
00038
00039
00040
00041
00042 static void payload_type_changed(RtpSession *session, PayloadType *pt){
00043 jitter_control_set_payload(&session->rtp.jittctl,pt);
00044 session->rtp.rtcp_report_snt_interval=RTCP_DEFAULT_REPORT_INTERVAL*pt->clock_rate;
00045 rtp_session_set_time_jump_limit(session,session->rtp.time_jump);
00046 if (pt->type==PAYLOAD_VIDEO){
00047 session->permissive=TRUE;
00048 ortp_message("Using permissive algorithm");
00049 }
00050 else session->permissive=FALSE;
00051 }
00052
00053 void wait_point_init(WaitPoint *wp){
00054 ortp_mutex_init(&wp->lock,NULL);
00055 ortp_cond_init(&wp->cond,NULL);
00056 wp->time=0;
00057 wp->wakeup=FALSE;
00058 }
00059 void wait_point_uninit(WaitPoint *wp){
00060 ortp_cond_destroy(&wp->cond);
00061 ortp_mutex_destroy(&wp->lock);
00062 }
00063
/* Acquire / release the mutex embedded in a WaitPoint (wp is a WaitPoint pointer). */
#define wait_point_lock(wp)	ortp_mutex_lock(&((wp)->lock))
#define wait_point_unlock(wp)	ortp_mutex_unlock(&((wp)->lock))
00066
00067 void wait_point_wakeup_at(WaitPoint *wp, uint32_t t, bool_t dosleep){
00068 wp->time=t;
00069 wp->wakeup=TRUE;
00070 if (dosleep) ortp_cond_wait(&wp->cond,&wp->lock);
00071 }
00072
00073
00074 bool_t wait_point_check(WaitPoint *wp, uint32_t t){
00075 bool_t ok=FALSE;
00076
00077 if (wp->wakeup){
00078 if (TIME_IS_NEWER_THAN(t,wp->time)){
00079 wp->wakeup=FALSE;
00080 ok=TRUE;
00081
00082 }
00083 }
00084 return ok;
00085 }
/*
 * Signals the condition of a WaitPoint.
 * The expansion deliberately has no trailing semicolon so that the macro
 * behaves like a function call and can be used in an unbraced if/else.
 */
#define wait_point_wakeup(wp) ortp_cond_signal(&(wp)->cond)
00087
00088 extern void rtp_parse(RtpSession *session, mblk_t *mp, uint32_t local_str_ts,
00089 struct sockaddr *addr, socklen_t addrlen);
00090
00091
/**
 * Returns the next value of random() converted to uint32_t.
 */
static uint32_t uint32_t_random(void){
	const uint32_t value=(uint32_t)random();
	return value;
}
00095
00096
/*
 * Wrap-around aware comparison of 16 bit sequence numbers: true when seq1 is
 * at most 2^15-1 steps ahead of seq2 (modulo 2^16), which includes equality.
 */
#define RTP_SEQ_IS_GREATER(seq1,seq2) \
	((uint16_t)((uint16_t)(seq1) - (uint16_t)(seq2)) < 0x8000u)
00099
00100
00101 void rtp_putq(queue_t *q, mblk_t *mp)
00102 {
00103 mblk_t *tmp;
00104 rtp_header_t *rtp=(rtp_header_t*)mp->b_rptr,*tmprtp;
00105
00106
00107 ortp_debug("rtp_putq(): Enqueuing packet with ts=%i and seq=%i",rtp->timestamp,rtp->seq_number);
00108
00109 if (qempty(q)) {
00110 putq(q,mp);
00111 return;
00112 }
00113 tmp=qlast(q);
00114
00115
00116 while (!qend(q,tmp))
00117 {
00118 tmprtp=(rtp_header_t*)tmp->b_rptr;
00119 ortp_debug("rtp_putq(): Seeing packet with seq=%i",tmprtp->seq_number);
00120
00121 if (rtp->seq_number == tmprtp->seq_number)
00122 {
00123
00124 ortp_debug("rtp_putq: duplicated message.");
00125 freemsg(mp);
00126 return;
00127 }else if (RTP_SEQ_IS_GREATER(rtp->seq_number,tmprtp->seq_number)){
00128
00129 insq(q,tmp->b_next,mp);
00130 return;
00131 }
00132 tmp=tmp->b_prev;
00133 }
00134
00135
00136 insq(q,qfirst(q),mp);
00137
00138 }
00139
00140
00141
00142 mblk_t *rtp_getq(queue_t *q,uint32_t timestamp, int *rejected)
00143 {
00144 mblk_t *tmp,*ret=NULL,*old=NULL;
00145 rtp_header_t *tmprtp;
00146 uint32_t ts_found=0;
00147
00148 *rejected=0;
00149 ortp_debug("rtp_getq(): Timestamp %i wanted.",timestamp);
00150
00151 if (qempty(q))
00152 {
00153
00154 return NULL;
00155 }
00156
00157
00158 while ((tmp=qfirst(q))!=NULL)
00159 {
00160 tmprtp=(rtp_header_t*)tmp->b_rptr;
00161 ortp_debug("rtp_getq: Seeing packet with ts=%i",tmprtp->timestamp);
00162 if ( RTP_TIMESTAMP_IS_NEWER_THAN(timestamp,tmprtp->timestamp) )
00163 {
00164 if (ret!=NULL && tmprtp->timestamp==ts_found) {
00165
00166 break;
00167 }
00168 if (old!=NULL) {
00169 ortp_debug("rtp_getq: discarding too old packet with ts=%i",ts_found);
00170 (*rejected)++;
00171 freemsg(old);
00172 }
00173 ret=getq(q);
00174 ts_found=tmprtp->timestamp;
00175 ortp_debug("rtp_getq: Found packet with ts=%i",tmprtp->timestamp);
00176 old=ret;
00177 }
00178 else
00179 {
00180 break;
00181 }
00182 }
00183 return ret;
00184 }
00185
00186 mblk_t *rtp_getq_permissive(queue_t *q,uint32_t timestamp, int *rejected)
00187 {
00188 mblk_t *tmp,*ret=NULL;
00189 rtp_header_t *tmprtp;
00190
00191 *rejected=0;
00192 ortp_debug("rtp_getq_permissive(): Timestamp %i wanted.",timestamp);
00193
00194 if (qempty(q))
00195 {
00196
00197 return NULL;
00198 }
00199
00200
00201 tmp=qfirst(q);
00202 tmprtp=(rtp_header_t*)tmp->b_rptr;
00203 ortp_debug("rtp_getq_permissive: Seeing packet with ts=%i",tmprtp->timestamp);
00204 if ( RTP_TIMESTAMP_IS_NEWER_THAN(timestamp,tmprtp->timestamp) )
00205 {
00206 ret=getq(q);
00207 ortp_debug("rtp_getq_permissive: Found packet with ts=%i",tmprtp->timestamp);
00208 }
00209 return ret;
00210 }
00211
00212
00213 void
00214 rtp_session_init (RtpSession * session, int mode)
00215 {
00216 memset (session, 0, sizeof (RtpSession));
00217 session->rtp.max_rq_size = 100;
00218 session->mode = (RtpSessionMode) mode;
00219 if ((mode == RTP_SESSION_RECVONLY) || (mode == RTP_SESSION_SENDRECV))
00220 {
00221 rtp_session_set_flag (session, RTP_SESSION_RECV_SYNC);
00222 rtp_session_set_flag (session, RTP_SESSION_RECV_NOT_STARTED);
00223
00224 }
00225 if ((mode == RTP_SESSION_SENDONLY) || (mode == RTP_SESSION_SENDRECV))
00226 {
00227 rtp_session_set_flag (session, RTP_SESSION_SEND_NOT_STARTED);
00228 session->snd.ssrc=uint32_t_random();
00229
00230 rtp_session_set_source_description(session,"unknown@unknown",NULL,NULL,
00231 NULL,NULL,"oRTP-" ORTP_VERSION,"This is free sofware (LGPL) !");
00232 }
00233 session->snd.telephone_events_pt=-1;
00234 session->rcv.telephone_events_pt=-1;
00235 rtp_session_set_profile (session, &av_profile);
00236 session->rtp.socket=-1;
00237 session->rtcp.socket=-1;
00238 session->dscp=RTP_DEFAULT_DSCP;
00239 session->multicast_ttl=RTP_DEFAULT_MULTICAST_TTL;
00240 session->multicast_loopback=RTP_DEFAULT_MULTICAST_LOOPBACK;
00241 qinit(&session->rtp.rq);
00242 qinit(&session->rtp.tev_rq);
00243 qinit(&session->contributing_sources);
00244 session->eventqs=NULL;
00245
00246 rtp_signal_table_init (&session->on_ssrc_changed, session,"ssrc_changed");
00247 rtp_signal_table_init (&session->on_payload_type_changed, session,"payload_type_changed");
00248 rtp_signal_table_init (&session->on_telephone_event, session,"telephone-event");
00249 rtp_signal_table_init (&session->on_telephone_event_packet, session,"telephone-event_packet");
00250 rtp_signal_table_init (&session->on_timestamp_jump,session,"timestamp_jump");
00251 rtp_signal_table_init (&session->on_network_error,session,"network_error");
00252 rtp_signal_table_init (&session->on_rtcp_bye,session,"rtcp_bye");
00253 wait_point_init(&session->snd.wp);
00254 wait_point_init(&session->rcv.wp);
00255
00256 rtp_session_set_send_payload_type(session,0);
00257
00258 rtp_session_set_recv_payload_type(session,-1);
00259 rtp_session_set_jitter_compensation(session,RTP_DEFAULT_JITTER_TIME);
00260 rtp_session_enable_adaptive_jitter_compensation(session,FALSE);
00261 rtp_session_set_time_jump_limit(session,5000);
00262 session->recv_buf_size = UDP_MAX_SIZE;
00263 session->symmetric_rtp = FALSE;
00264 session->permissive=FALSE;
00265 }
00266
00267
00277 RtpSession *
00278 rtp_session_new (int mode)
00279 {
00280 RtpSession *session;
00281 session = (RtpSession *) ortp_malloc (sizeof (RtpSession));
00282 rtp_session_init (session, mode);
00283 return session;
00284 }
00285
00299 void
00300 rtp_session_set_scheduling_mode (RtpSession * session, int yesno)
00301 {
00302 if (yesno)
00303 {
00304 RtpScheduler *sched;
00305 sched = ortp_get_scheduler ();
00306 if (sched != NULL)
00307 {
00308 rtp_session_set_flag (session, RTP_SESSION_SCHEDULED);
00309 session->sched = sched;
00310 rtp_scheduler_add_session (sched, session);
00311 }
00312 else
00313 ortp_warning
00314 ("rtp_session_set_scheduling_mode: Cannot use scheduled mode because the "
00315 "scheduler is not started. Use ortp_scheduler_init() before.");
00316 }
00317 else
00318 rtp_session_unset_flag (session, RTP_SESSION_SCHEDULED);
00319 }
00320
00321
00334 void
00335 rtp_session_set_blocking_mode (RtpSession * session, int yesno)
00336 {
00337 if (yesno){
00338 rtp_session_set_scheduling_mode(session,TRUE);
00339 rtp_session_set_flag (session, RTP_SESSION_BLOCKING_MODE);
00340 }else
00341 rtp_session_unset_flag (session, RTP_SESSION_BLOCKING_MODE);
00342 }
00343
00353 void
00354 rtp_session_set_profile (RtpSession * session, RtpProfile * profile)
00355 {
00356 session->snd.profile = profile;
00357 session->rcv.profile = profile;
00358 rtp_session_telephone_events_supported(session);
00359 }
00360
00361
00371 void
00372 rtp_session_set_send_profile (RtpSession * session, RtpProfile * profile)
00373 {
00374 session->snd.profile = profile;
00375 rtp_session_send_telephone_events_supported(session);
00376 }
00377
00378
00379
00389 void
00390 rtp_session_set_recv_profile (RtpSession * session, RtpProfile * profile)
00391 {
00392 session->rcv.profile = profile;
00393 rtp_session_recv_telephone_events_supported(session);
00394 }
00395
00403 RtpProfile *rtp_session_get_profile(RtpSession *session){
00404 return session->snd.profile;
00405 }
00406
00407
00414 RtpProfile *rtp_session_get_send_profile(RtpSession *session){
00415 return session->snd.profile;
00416 }
00417
00424 RtpProfile *rtp_session_get_recv_profile(RtpSession *session){
00425 return session->rcv.profile;
00426 }
00427
00436 void rtp_session_set_recv_buf_size(RtpSession *session, int bufsize){
00437 session->recv_buf_size=bufsize;
00438 }
00439
00473 int
00474 rtp_session_signal_connect (RtpSession * session, const char *signal_name,
00475 RtpCallback cb, unsigned long user_data)
00476 {
00477 OList *elem;
00478 for (elem=session->signal_tables;elem!=NULL;elem=o_list_next(elem)){
00479 RtpSignalTable *s=(RtpSignalTable*) elem->data;
00480 if (strcmp(signal_name,s->signal_name)==0){
00481 return rtp_signal_table_add(s,cb,user_data);
00482 }
00483 }
00484 ortp_warning ("rtp_session_signal_connect: inexistant signal %s",signal_name);
00485 return -1;
00486 }
00487
00488
00497 int
00498 rtp_session_signal_disconnect_by_callback (RtpSession * session, const char *signal_name,
00499 RtpCallback cb)
00500 {
00501 OList *elem;
00502 for (elem=session->signal_tables;elem!=NULL;elem=o_list_next(elem)){
00503 RtpSignalTable *s=(RtpSignalTable*) elem->data;
00504 if (strcmp(signal_name,s->signal_name)==0){
00505 return rtp_signal_table_remove_by_callback(s,cb);
00506 }
00507 }
00508 ortp_warning ("rtp_session_signal_connect: inexistant signal %s",signal_name);
00509 return -1;
00510 }
00511
00512
00519 void rtp_session_set_seq_number(RtpSession *session, uint16_t seq){
00520 session->rtp.snd_seq=seq;
00521 }
00522
00523
00524 uint16_t rtp_session_get_seq_number(RtpSession *session){
00525 return session->rtp.snd_seq;
00526 }
00527
00528
00536 void
00537 rtp_session_set_ssrc (RtpSession * session, uint32_t ssrc)
00538 {
00539 session->snd.ssrc = ssrc;
00540 }
00541
00542
00543 void rtp_session_update_payload_type(RtpSession *session, int paytype){
00544
00545 PayloadType *pt=rtp_profile_get_payload(session->rcv.profile,paytype);
00546 session->hw_recv_pt=paytype;
00547 if (pt!=0){
00548 ortp_message ("payload type changed to %i(%s) !",
00549 paytype,pt->mime_type);
00550 payload_type_changed(session,pt);
00551 }else{
00552 ortp_warning("Receiving packet with unknown payload type %i.",paytype);
00553 }
00554 }
00567 int
00568 rtp_session_set_send_payload_type (RtpSession * session, int paytype)
00569 {
00570 session->snd.pt=paytype;
00571 return 0;
00572 }
00573
00579 int rtp_session_get_send_payload_type(const RtpSession *session){
00580 return session->snd.pt;
00581 }
00582
00594 int
00595 rtp_session_set_recv_payload_type (RtpSession * session, int paytype)
00596 {
00597 PayloadType *pt;
00598 session->rcv.pt=paytype;
00599 session->hw_recv_pt=paytype;
00600 pt=rtp_profile_get_payload(session->rcv.profile,paytype);
00601 if (pt!=NULL){
00602 payload_type_changed(session,pt);
00603 }
00604 return 0;
00605 }
00606
00612 int rtp_session_get_recv_payload_type(const RtpSession *session){
00613 return session->rcv.pt;
00614 }
00615
00625 int rtp_session_set_payload_type(RtpSession *session, int pt){
00626 if (rtp_session_set_send_payload_type(session,pt)<0) return -1;
00627 if (rtp_session_set_recv_payload_type(session,pt)<0) return -1;
00628 return 0;
00629 }
00630
00631
00632 static void rtp_header_init_from_session(rtp_header_t *rtp, RtpSession *session){
00633 rtp->version = 2;
00634 rtp->padbit = 0;
00635 rtp->extbit = 0;
00636 rtp->markbit= 0;
00637 rtp->cc = 0;
00638 rtp->paytype = session->snd.pt;
00639 rtp->ssrc = session->snd.ssrc;
00640 rtp->timestamp = 0;
00641
00642 rtp->seq_number=session->rtp.snd_seq;
00643 }
00644
00657 mblk_t * rtp_session_create_packet(RtpSession *session,int header_size, const uint8_t *payload, int payload_size)
00658 {
00659 mblk_t *mp;
00660 int msglen=header_size+payload_size;
00661 rtp_header_t *rtp;
00662
00663 mp=allocb(msglen,BPRI_MED);
00664 rtp=(rtp_header_t*)mp->b_rptr;
00665 rtp_header_init_from_session(rtp,session);
00666
00667 mp->b_wptr+=header_size;
00668 if (payload_size){
00669 memcpy(mp->b_wptr,payload,payload_size);
00670 mp->b_wptr+=payload_size;
00671 }
00672 return mp;
00673 }
00674
00691 mblk_t * rtp_session_create_packet_with_data(RtpSession *session, uint8_t *payload, int payload_size, void (*freefn)(void*))
00692 {
00693 mblk_t *mp,*mpayload;
00694 int header_size=RTP_FIXED_HEADER_SIZE;
00695 rtp_header_t *rtp;
00696
00697 mp=allocb(header_size,BPRI_MED);
00698 rtp=(rtp_header_t*)mp->b_rptr;
00699 rtp_header_init_from_session(rtp,session);
00700 mp->b_wptr+=header_size;
00701
00702 mpayload=esballoc(payload,payload_size,BPRI_MED,freefn);
00703 mpayload->b_wptr+=payload_size;
00704
00705 mp->b_cont=mpayload;
00706 return mp;
00707 }
00708
00709
00723 mblk_t * rtp_session_create_packet_in_place(RtpSession *session,uint8_t *buffer, int size, void (*freefn)(void*) )
00724 {
00725 mblk_t *mp;
00726 rtp_header_t *rtp;
00727
00728 mp=esballoc(buffer,size,BPRI_MED,freefn);
00729
00730 rtp=(rtp_header_t*)mp->b_rptr;
00731 rtp_header_init_from_session(rtp,session);
00732 return mp;
00733 }
00734
00746 int
00747 rtp_session_sendm_with_ts (RtpSession * session, mblk_t *mp, uint32_t timestamp)
00748 {
00749 rtp_header_t *rtp;
00750 uint32_t packet_time;
00751 int error = 0;
00752 int packsize;
00753 RtpScheduler *sched=session->sched;
00754 RtpStream *stream=&session->rtp;
00755
00756 if (session->flags & RTP_SESSION_SEND_NOT_STARTED)
00757 {
00758 session->rtp.snd_ts_offset = timestamp;
00759
00760 if ((session->flags & RTP_SESSION_RECV_NOT_STARTED)
00761 || session->mode == RTP_SESSION_SENDONLY)
00762 {
00763 gettimeofday(&session->last_recv_time, NULL);
00764 }
00765 if (session->flags & RTP_SESSION_SCHEDULED)
00766 {
00767 session->rtp.snd_time_offset = sched->time_;
00768 }
00769 rtp_session_unset_flag (session,RTP_SESSION_SEND_NOT_STARTED);
00770 }
00771
00772
00773
00774
00775 if (session->flags & RTP_SESSION_SCHEDULED)
00776 {
00777 packet_time =
00778 rtp_session_ts_to_time (session,
00779 timestamp -
00780 session->rtp.snd_ts_offset) +
00781 session->rtp.snd_time_offset;
00782
00783 wait_point_lock(&session->snd.wp);
00784 if (TIME_IS_STRICTLY_NEWER_THAN (packet_time, sched->time_))
00785 {
00786 wait_point_wakeup_at(&session->snd.wp,packet_time,(session->flags & RTP_SESSION_BLOCKING_MODE)!=0);
00787 session_set_clr(&sched->w_sessions,session);
00788 }
00789 else session_set_set(&sched->w_sessions,session);
00790 wait_point_unlock(&session->snd.wp);
00791 }
00792
00793
00794 rtp=(rtp_header_t*)mp->b_rptr;
00795
00796 packsize = msgdsize(mp) ;
00797
00798 rtp->timestamp=timestamp;
00799 if (session->snd.telephone_events_pt==rtp->paytype)
00800 {
00801 session->rtp.snd_seq++;
00802 rtp->seq_number = session->rtp.snd_seq;
00803 }
00804 else
00805 session->rtp.snd_seq=rtp->seq_number+1;
00806 session->rtp.snd_last_ts = timestamp;
00807
00808
00809 ortp_global_stats.sent += packsize;
00810 stream->stats.sent += packsize;
00811 ortp_global_stats.packet_sent++;
00812 stream->stats.packet_sent++;
00813
00814 error = rtp_session_rtp_send (session, mp);
00815
00816 rtp_session_rtcp_process_send(session);
00817
00818
00819 if (session->mode==RTP_SESSION_SENDONLY) rtp_session_rtcp_recv(session);
00820 return error;
00821 }
00822
00823
00836 int
00837 rtp_session_send_with_ts (RtpSession * session, const uint8_t * buffer, int len,
00838 uint32_t userts)
00839 {
00840 mblk_t *m;
00841 int err;
00842 #ifdef USE_SENDMSG
00843 m=rtp_session_create_packet_with_data(session,(uint8_t*)buffer,len,NULL);
00844 #else
00845 m = rtp_session_create_packet(session,RTP_FIXED_HEADER_SIZE,(uint8_t*)buffer,len);
00846 #endif
00847 err=rtp_session_sendm_with_ts(session,m,userts);
00848 return err;
00849 }
00850
00851
00852
00853 extern void rtcp_parse(RtpSession *session, mblk_t *mp);
00854
00855
00856
00857 static void payload_type_changed_notify(RtpSession *session, int paytype){
00858 session->rcv.pt = paytype;
00859 rtp_signal_table_emit (&session->on_payload_type_changed);
00860 }
00861
00862
00877 mblk_t *
00878 rtp_session_recvm_with_ts (RtpSession * session, uint32_t user_ts)
00879 {
00880 mblk_t *mp = NULL;
00881 rtp_header_t *rtp;
00882 uint32_t ts;
00883 uint32_t packet_time;
00884 RtpScheduler *sched=session->sched;
00885 RtpStream *stream=&session->rtp;
00886 int rejected=0;
00887
00888
00889
00890
00891 if (session->flags & RTP_SESSION_RECV_NOT_STARTED)
00892 {
00893 session->rtp.rcv_query_ts_offset = user_ts;
00894
00895 if ((session->flags & RTP_SESSION_SEND_NOT_STARTED)
00896 || session->mode == RTP_SESSION_RECVONLY){
00897 gettimeofday(&session->last_recv_time, NULL);
00898 }
00899 if (session->flags & RTP_SESSION_SCHEDULED)
00900 {
00901 session->rtp.rcv_time_offset = sched->time_;
00902
00903 }
00904 rtp_session_unset_flag (session,RTP_SESSION_RECV_NOT_STARTED);
00905 }
00906 session->rtp.rcv_last_app_ts = user_ts;
00907 rtp_session_rtp_recv (session, user_ts);
00908 rtp_session_rtcp_recv(session);
00909
00910 mp=getq(&session->rtp.tev_rq);
00911 if (mp!=NULL){
00912 int msgsize=msgdsize(mp);
00913 ortp_global_stats.recv += msgsize;
00914 stream->stats.recv += msgsize;
00915 rtp_signal_table_emit2(&session->on_telephone_event_packet,(long)mp);
00916 rtp_session_check_telephone_events(session,mp);
00917 freemsg(mp);
00918 mp=NULL;
00919 }
00920
00921
00922
00923
00924
00925 if (session->flags & RTP_SESSION_RECV_SYNC)
00926 {
00927 queue_t *q = &session->rtp.rq;
00928 if (qempty(q))
00929 {
00930 ortp_debug ("Queue is empty.");
00931 goto end;
00932 }
00933 rtp = (rtp_header_t *) qfirst(q)->b_rptr;
00934 session->rtp.rcv_ts_offset = rtp->timestamp;
00935
00936
00937 session->rtp.hwrcv_diff_ts = rtp->timestamp - user_ts;
00938
00939 session->rtp.rcv_diff_ts=session->rtp.hwrcv_diff_ts - session->rtp.jittctl.jitt_comp_ts;
00940 session->rtp.rcv_last_ret_ts = user_ts;
00941 session->rcv.ssrc = rtp->ssrc;
00942
00943 rtp_session_unset_flag (session, RTP_SESSION_RECV_SYNC);
00944 }
00945
00946 ts = user_ts + session->rtp.rcv_diff_ts;
00947 if (session->permissive || session->rtp.jittctl.jitt_comp_ts==0)
00948 mp = rtp_getq_permissive(&session->rtp.rq, ts,&rejected);
00949 else
00950 mp = rtp_getq(&session->rtp.rq, ts,&rejected);
00951
00952 stream->stats.outoftime+=rejected;
00953 ortp_global_stats.outoftime+=rejected;
00954
00955 goto end;
00956
00957 end:
00958 if (mp != NULL)
00959 {
00960 int msgsize = msgdsize (mp);
00961 uint32_t packet_ts;
00962 ortp_global_stats.recv += msgsize;
00963 stream->stats.recv += msgsize;
00964 rtp = (rtp_header_t *) mp->b_rptr;
00965 packet_ts=rtp->timestamp;
00966 ortp_debug("Returning mp with ts=%i", packet_ts);
00967
00968 if (session->rcv.pt != rtp->paytype)
00969 {
00970 payload_type_changed_notify(session, rtp->paytype);
00971 }
00972
00973
00974 if (session->rtp.jittctl.adaptive){
00975 uint32_t changed_ts;
00976
00977
00978 if (packet_ts!=session->rtp.rcv_last_ts)
00979 jitter_control_update_corrective_slide(&session->rtp.jittctl);
00980 changed_ts=packet_ts-session->rtp.jittctl.corrective_slide;
00981 rtp->timestamp=changed_ts;
00982
00983 }
00984 session->rtp.rcv_last_ts = packet_ts;
00985 if (!(session->flags & RTP_SESSION_FIRST_PACKET_DELIVERED)){
00986 rtp_session_set_flag(session,RTP_SESSION_FIRST_PACKET_DELIVERED);
00987 }
00988 }
00989 else
00990 {
00991 ortp_debug ("No mp for timestamp queried");
00992 stream->stats.unavaillable++;
00993 ortp_global_stats.unavaillable++;
00994 }
00995 rtp_session_rtcp_process_recv(session);
00996
00997 if (session->flags & RTP_SESSION_SCHEDULED)
00998 {
00999
01000
01001
01002
01003 packet_time =
01004 rtp_session_ts_to_time (session,
01005 user_ts -
01006 session->rtp.rcv_query_ts_offset) +
01007 session->rtp.rcv_time_offset;
01008 ortp_debug ("rtp_session_recvm_with_ts: packet_time=%i, time=%i",packet_time, sched->time_);
01009 wait_point_lock(&session->rcv.wp);
01010 if (TIME_IS_STRICTLY_NEWER_THAN (packet_time, sched->time_))
01011 {
01012 wait_point_wakeup_at(&session->rcv.wp,packet_time, (session->flags & RTP_SESSION_BLOCKING_MODE)!=0);
01013 session_set_clr(&sched->r_sessions,session);
01014 }
01015 else session_set_set(&sched->r_sessions,session);
01016 wait_point_unlock(&session->rcv.wp);
01017 }
01018 return mp;
01019 }
01020
01021
01022 static int msg_to_buf (mblk_t * mp, uint8_t *buffer, int len)
01023 {
01024 int rlen = len;
01025 mblk_t *m, *mprev;
01026 int mlen;
01027 m = mp->b_cont;
01028 mprev = mp;
01029 while (m != NULL)
01030 {
01031 mlen = (int) (m->b_wptr - m->b_rptr);
01032 if (mlen <= rlen)
01033 {
01034 mblk_t *consumed = m;
01035 memcpy (buffer, m->b_rptr, mlen);
01036
01037 mprev->b_cont = m->b_cont;
01038 m = m->b_cont;
01039 consumed->b_cont = NULL;
01040 freeb (consumed);
01041 buffer += mlen;
01042 rlen -= mlen;
01043 }
01044 else
01045 {
01046 memcpy (buffer, m->b_rptr, rlen);
01047 m->b_rptr += rlen;
01048 return len;
01049 }
01050 }
01051 return len - rlen;
01052 }
01053
01093 int rtp_session_recv_with_ts (RtpSession * session, uint8_t * buffer,
01094 int len, uint32_t ts, int * have_more)
01095 {
01096 mblk_t *mp;
01097 int rlen = len;
01098 int wlen, mlen;
01099 uint32_t ts_int = 0;
01100 PayloadType *payload;
01101 RtpStream *stream=&session->rtp;
01102
01103 *have_more = 0;
01104
01105 mp = rtp_session_recvm_with_ts (session, ts);
01106 payload =rtp_profile_get_payload (session->rcv.profile,
01107 session->rcv.pt);
01108 if (payload==NULL){
01109 ortp_warning("rtp_session_recv_with_ts: unable to recv an unsupported payload (%i)",session->rcv.pt);
01110 if (mp!=NULL) freemsg(mp);
01111 return -1;
01112 }
01113 if (!(session->flags & RTP_SESSION_RECV_SYNC))
01114 {
01115
01116 if (RTP_TIMESTAMP_IS_STRICTLY_NEWER_THAN
01117 (ts, session->rtp.rcv_last_ret_ts))
01118 {
01119
01120
01121
01122 *have_more = 1;
01123 }
01124 if (payload->type == PAYLOAD_AUDIO_CONTINUOUS)
01125 {
01126 ts_int = (len * payload->bits_per_sample) >> 3;
01127 session->rtp.rcv_last_ret_ts += ts_int;
01128
01129 }
01130 else
01131 ts_int = 0;
01132 }
01133 else return 0;
01134
01135
01136 while (1)
01137 {
01138
01139 if (mp != NULL)
01140 {
01141 mlen = msgdsize (mp->b_cont);
01142 wlen = msg_to_buf (mp, buffer, rlen);
01143 buffer += wlen;
01144 rlen -= wlen;
01145 ortp_debug("mlen=%i wlen=%i rlen=%i", mlen, wlen,
01146 rlen);
01147
01148 if (rlen > 0)
01149 {
01150
01151 freemsg (mp);
01152
01153
01154
01155 if (ts_int > 0)
01156 {
01157 ts = session->rtp.rcv_last_ret_ts;
01158 ortp_debug("Need more: will ask for %i.", ts);
01159 }
01160 else
01161 return len - rlen;
01162 }
01163 else if (mlen > wlen)
01164 {
01165 int unread =
01166 mlen - wlen + (int) (mp->b_wptr -
01167 mp->b_rptr);
01168
01169
01170 ortp_debug ("Re-enqueuing packet.");
01171 rtp_putq (&session->rtp.rq, mp);
01172
01173 ortp_global_stats.recv -= unread;
01174 stream->stats.recv -= unread;
01175 return len;
01176 }
01177 else
01178 {
01179
01180 freemsg (mp);
01181 return len;
01182 }
01183 }
01184 else
01185 {
01186
01187 if (payload->pattern_length != 0)
01188 {
01189 int i = 0, j = 0;
01190 while (i < rlen)
01191 {
01192 buffer[i] = payload->zero_pattern[j];
01193 i++;
01194 j++;
01195 if (j <= payload->pattern_length)
01196 j = 0;
01197 }
01198 return len;
01199 }
01200 *have_more = 0;
01201 return 0;
01202 }
01203 mp = rtp_session_recvm_with_ts (session, ts);
01204 payload = rtp_profile_get_payload (session->rcv.profile,
01205 session->rcv.pt);
01206 if (payload==NULL){
01207 ortp_warning("rtp_session_recv_with_ts: unable to recv an unsupported payload.");
01208 if (mp!=NULL) freemsg(mp);
01209 return -1;
01210 }
01211 }
01212 return -1;
01213 }
01226 uint32_t rtp_session_get_current_send_ts(RtpSession *session)
01227 {
01228 uint32_t userts;
01229 uint32_t session_time;
01230 RtpScheduler *sched=session->sched;
01231 PayloadType *payload;
01232 payload=rtp_profile_get_payload(session->snd.profile,session->snd.pt);
01233 return_val_if_fail(payload!=NULL, 0);
01234 if ( (session->flags & RTP_SESSION_SCHEDULED)==0 ){
01235 ortp_warning("can't guess current timestamp because session is not scheduled.");
01236 return 0;
01237 }
01238 session_time=sched->time_-session->rtp.snd_time_offset;
01239 userts= (uint32_t)( ( (double)(session_time) * (double) payload->clock_rate )/ 1000.0)
01240 + session->rtp.snd_ts_offset;
01241 return userts;
01242 }
01243
01252 uint32_t rtp_session_get_current_recv_ts(RtpSession *session){
01253 uint32_t userts;
01254 uint32_t session_time;
01255 RtpScheduler *sched=ortp_get_scheduler();
01256 PayloadType *payload;
01257 payload=rtp_profile_get_payload(session->rcv.profile,session->rcv.pt);
01258 return_val_if_fail(payload!=NULL, 0);
01259 if ( (session->flags & RTP_SESSION_SCHEDULED)==0 ){
01260 ortp_warning("can't guess current timestamp because session is not scheduled.");
01261 return 0;
01262 }
01263 session_time=sched->time_-session->rtp.rcv_time_offset;
01264 userts= (uint32_t)( ( (double)(session_time) * (double) payload->clock_rate )/ 1000.0)
01265 + session->rtp.rcv_ts_offset;
01266 return userts;
01267 }
01268
01279 void rtp_session_set_time_jump_limit(RtpSession *session, int milisecs){
01280 uint32_t ts;
01281 session->rtp.time_jump=milisecs;
01282 ts=rtp_session_time_to_ts(session,milisecs);
01283 if (ts==0) session->rtp.ts_jump=1<<31;
01284 else session->rtp.ts_jump=ts;
01285 }
01286
01290 void rtp_session_release_sockets(RtpSession *session){
01291 if (session->rtp.socket>=0) close_socket (session->rtp.socket);
01292 if (session->rtcp.socket>=0) close_socket (session->rtcp.socket);
01293 session->rtp.socket=-1;
01294 session->rtcp.socket=-1;
01295 session->rtp.tr = 0;
01296 session->rtcp.tr = 0;
01297
01298
01299
01300
01301
01302 }
01303
01304 ortp_socket_t rtp_session_get_rtp_socket(const RtpSession *session){
01305 return rtp_session_using_transport(session, rtp) ? (session->rtp.tr->t_getsocket)(session->rtp.tr) : session->rtp.socket;
01306 }
01307
01308 ortp_socket_t rtp_session_get_rtcp_socket(const RtpSession *session){
01309 return rtp_session_using_transport(session, rtcp) ? (session->rtcp.tr->t_getsocket)(session->rtcp.tr) : session->rtcp.socket;
01310 }
01311
01316 void rtp_session_register_event_queue(RtpSession *session, OrtpEvQueue *q){
01317 session->eventqs=o_list_append(session->eventqs,q);
01318 }
01319
01320 void rtp_session_unregister_event_queue(RtpSession *session, OrtpEvQueue *q){
01321 session->eventqs=o_list_remove(session->eventqs,q);
01322 }
01323
01324 void rtp_session_dispatch_event(RtpSession *session, OrtpEvent *ev){
01325 OList *it;
01326 int i;
01327 for(i=0,it=session->eventqs;it!=NULL;it=it->next,++i){
01328 ortp_ev_queue_put((OrtpEvQueue*)it->data,ortp_event_dup(ev));
01329 }
01330 ortp_event_destroy(ev);
01331 }
01332
01333
01334 void rtp_session_uninit (RtpSession * session)
01335 {
01336
01337 if (session->flags & RTP_SESSION_SCHEDULED)
01338 {
01339 rtp_scheduler_remove_session (session->sched,session);
01340 }
01341
01342 flushq(&session->rtp.rq, FLUSHALL);
01343 flushq(&session->rtp.tev_rq, FLUSHALL);
01344
01345 if (session->eventqs!=NULL) o_list_free(session->eventqs);
01346
01347 rtp_session_release_sockets(session);
01348
01349 wait_point_uninit(&session->snd.wp);
01350 wait_point_uninit(&session->rcv.wp);
01351 if (session->current_tev!=NULL) freemsg(session->current_tev);
01352 if (session->rtp.cached_mp!=NULL) freemsg(session->rtp.cached_mp);
01353 if (session->rtcp.cached_mp!=NULL) freemsg(session->rtcp.cached_mp);
01354 if (session->sd!=NULL) freemsg(session->sd);
01355
01356 session->signal_tables = o_list_free(session->signal_tables);
01357 }
01358
01365 void rtp_session_resync(RtpSession *session){
01366 flushq (&session->rtp.rq, FLUSHALL);
01367 rtp_session_set_flag(session, RTP_SESSION_RECV_SYNC);
01368 rtp_session_unset_flag(session,RTP_SESSION_FIRST_PACKET_DELIVERED);
01369 jitter_control_init(&session->rtp.jittctl,-1,NULL);
01370 }
01371
01378 void rtp_session_reset (RtpSession * session)
01379 {
01380 rtp_session_set_flag (session, RTP_SESSION_RECV_NOT_STARTED);
01381 rtp_session_set_flag (session, RTP_SESSION_SEND_NOT_STARTED);
01382
01383 session->rtp.snd_time_offset = 0;
01384 session->rtp.snd_ts_offset = 0;
01385 session->rtp.snd_rand_offset = 0;
01386 session->rtp.snd_last_ts = 0;
01387 session->rtp.rcv_time_offset = 0;
01388 session->rtp.rcv_ts_offset = 0;
01389 session->rtp.rcv_query_ts_offset = 0;
01390 session->rtp.rcv_diff_ts = 0;
01391 session->rtp.rcv_last_ts = 0;
01392 session->rtp.rcv_last_app_ts = 0;
01393 session->rtp.hwrcv_extseq = 0;
01394 session->rtp.hwrcv_since_last_SR=0;
01395 session->rtp.snd_seq = 0;
01396 rtp_stats_reset(&session->rtp.stats);
01397 rtp_session_resync(session);
01398 }
01399
01403 const rtp_stats_t * rtp_session_get_stats(const RtpSession *session){
01404 return &session->rtp.stats;
01405 }
01406
01407 void rtp_session_reset_stats(RtpSession *session){
01408 memset(&session->rtp.stats,0,sizeof(rtp_stats_t));
01409 }
01410
01417 void rtp_session_set_data(RtpSession *session, void *data){
01418 session->user_data=data;
01419 }
01420
01425 void *rtp_session_get_data(const RtpSession *session){
01426 return session->user_data;
01427 }
01428
01438 void
01439 rtp_session_set_symmetric_rtp (RtpSession * session, bool_t yesno)
01440 {
01441 session->symmetric_rtp =yesno;
01442 }
01443
01457 void rtp_session_set_connected_mode(RtpSession *session, bool_t yesno){
01458 session->use_connect=yesno;
01459 }
01460
01461 static float compute_bw(struct timeval *orig, unsigned int bytes){
01462 struct timeval current;
01463 float bw;
01464 float time;
01465 if (bytes==0) return 0;
01466 gettimeofday(¤t,NULL);
01467 time=(current.tv_sec - orig->tv_sec) +
01468 ((current.tv_usec - orig->tv_usec)*1e-6);
01469 bw=((float)bytes)*8/time;
01470 return bw;
01471 }
01472
01473 float rtp_session_compute_recv_bandwidth(RtpSession *session){
01474 float bw;
01475 bw=compute_bw(&session->rtp.recv_bw_start,session->rtp.recv_bytes);
01476 session->rtp.recv_bytes=0;
01477 return bw;
01478 }
01479
01480 float rtp_session_compute_send_bandwidth(RtpSession *session){
01481 float bw;
01482 bw=compute_bw(&session->rtp.send_bw_start,session->rtp.sent_bytes);
01483 session->rtp.sent_bytes=0;
01484 return bw;
01485 }
01486
01487
01494 void rtp_session_destroy (RtpSession * session)
01495 {
01496 rtp_session_uninit (session);
01497 ortp_free (session);
01498 }
01499
01500 void rtp_session_make_time_distorsion(RtpSession *session, int milisec)
01501 {
01502 session->rtp.snd_time_offset+=milisec;
01503 }
01504
01505
01506
01507
01508 void rtp_add_csrc(mblk_t *mp, uint32_t csrc)
01509 {
01510 rtp_header_t *hdr=(rtp_header_t*)mp->b_rptr;
01511 hdr->csrc[hdr->cc]=csrc;
01512 hdr->cc++;
01513 }
01514
01515
01522 void
01523 rtp_session_get_last_recv_time(RtpSession *session, struct timeval *tv)
01524 {
01525 *tv = session->last_recv_time;
01526 }
01527
01528
01529
01530 uint32_t rtp_session_time_to_ts(RtpSession *session, int millisecs){
01531 PayloadType *payload;
01532 payload =
01533 rtp_profile_get_payload (session->snd.profile,
01534 session->snd.pt);
01535 if (payload == NULL)
01536 {
01537 ortp_warning
01538 ("rtp_session_ts_to_t: use of unsupported payload type %d.", session->snd.pt);
01539 return 0;
01540 }
01541
01542 return (uint32_t) (payload->clock_rate*(double) (millisecs/1000.0f));
01543 }
01544
01545
01546 uint32_t rtp_session_ts_to_time (RtpSession * session, uint32_t timestamp)
01547 {
01548 PayloadType *payload;
01549 payload =
01550 rtp_profile_get_payload (session->snd.profile,
01551 session->snd.pt);
01552 if (payload == NULL)
01553 {
01554 ortp_warning
01555 ("rtp_session_ts_to_t: use of unsupported payload type %d.", session->snd.pt);
01556 return 0;
01557 }
01558
01559 return (uint32_t) (1000.0 *
01560 ((double) timestamp /
01561 (double) payload->clock_rate));
01562 }
01563
01564
01565
01566 void rtp_session_process (RtpSession * session, uint32_t time, RtpScheduler *sched)
01567 {
01568 wait_point_lock(&session->snd.wp);
01569 if (wait_point_check(&session->snd.wp,time)){
01570 session_set_set(&sched->w_sessions,session);
01571 wait_point_wakeup(&session->snd.wp);
01572 }
01573 wait_point_unlock(&session->snd.wp);
01574
01575 wait_point_lock(&session->rcv.wp);
01576 if (wait_point_check(&session->rcv.wp,time)){
01577 session_set_set(&sched->r_sessions,session);
01578 wait_point_wakeup(&session->rcv.wp);
01579 }
01580 wait_point_unlock(&session->rcv.wp);
01581 }
01582