OpenMAXBellagio 0.9.3
omx_base_sink.c
Go to the documentation of this file.
00001 
00028 #include <omxcore.h>
00029 #include <omx_base_sink.h>
00030 
00031 OMX_ERRORTYPE omx_base_sink_Constructor(OMX_COMPONENTTYPE *openmaxStandComp,OMX_STRING cComponentName) {
00032   OMX_ERRORTYPE err = OMX_ErrorNone;
00033   omx_base_sink_PrivateType* omx_base_sink_Private;
00034 
00035   if (openmaxStandComp->pComponentPrivate) {
00036     omx_base_sink_Private = (omx_base_sink_PrivateType*)openmaxStandComp->pComponentPrivate;
00037   } else {
00038     omx_base_sink_Private = calloc(1,sizeof(omx_base_sink_PrivateType));
00039     if (!omx_base_sink_Private) {
00040       return OMX_ErrorInsufficientResources;
00041     }
00042   }
00043 
00044   // we could create our own port structures here
00045   // fixme maybe the base class could use a "port factory" function pointer?
00046   err = omx_base_component_Constructor(openmaxStandComp,cComponentName);
00047 
00048   /* here we can override whatever defaults the base_component constructor set
00049   * e.g. we can override the function pointers in the private struct  */
00050   omx_base_sink_Private = openmaxStandComp->pComponentPrivate;
00051 
00052   omx_base_sink_Private->BufferMgmtFunction = omx_base_sink_BufferMgmtFunction;
00053 
00054   return err;
00055 }
00056 
00057 OMX_ERRORTYPE omx_base_sink_Destructor(OMX_COMPONENTTYPE *openmaxStandComp)
00058 {
00059   return omx_base_component_Destructor(openmaxStandComp);
00060 }
00061 
00067 void* omx_base_sink_BufferMgmtFunction (void* param) {
00068   OMX_COMPONENTTYPE* openmaxStandComp = (OMX_COMPONENTTYPE*)param;
00069   omx_base_component_PrivateType* omx_base_component_Private  = (omx_base_component_PrivateType*)openmaxStandComp->pComponentPrivate;
00070   omx_base_sink_PrivateType*      omx_base_sink_Private       = (omx_base_sink_PrivateType*)omx_base_component_Private;
00071   omx_base_PortType               *pInPort                    = (omx_base_PortType *)omx_base_sink_Private->ports[OMX_BASE_SINK_INPUTPORT_INDEX];
00072   tsem_t*                         pInputSem                   = pInPort->pBufferSem;
00073   queue_t*                        pInputQueue                 = pInPort->pBufferQueue;
00074   OMX_BUFFERHEADERTYPE*           pInputBuffer                = NULL;
00075   OMX_COMPONENTTYPE*              target_component;
00076   OMX_BOOL                        isInputBufferNeeded         = OMX_TRUE;
00077   int                             inBufExchanged              = 0;
00078 
00079   omx_base_sink_Private->bellagioThreads->nThreadBufferMngtID = (long int)syscall(__NR_gettid);
00080   DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s the thread ID is %i\n", __func__, (int)omx_base_sink_Private->bellagioThreads->nThreadBufferMngtID);
00081 
00082   DEBUG(DEB_LEV_FUNCTION_NAME, "In %s \n", __func__);
00083   while(omx_base_component_Private->state == OMX_StateIdle || omx_base_component_Private->state == OMX_StateExecuting ||  omx_base_component_Private->state == OMX_StatePause ||
00084     omx_base_component_Private->transientState == OMX_TransStateLoadedToIdle){
00085 
00086     /*Wait till the ports are being flushed*/
00087     pthread_mutex_lock(&omx_base_sink_Private->flush_mutex);
00088     while( PORT_IS_BEING_FLUSHED(pInPort)) {
00089       pthread_mutex_unlock(&omx_base_sink_Private->flush_mutex);
00090 
00091       if(isInputBufferNeeded==OMX_FALSE) {
00092         pInPort->ReturnBufferFunction(pInPort,pInputBuffer);
00093         inBufExchanged--;
00094         pInputBuffer=NULL;
00095         isInputBufferNeeded=OMX_TRUE;
00096         DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning input buffer\n");
00097       }
00098       DEBUG(DEB_LEV_FULL_SEQ, "In %s signalling flush all condition \n", __func__);
00099 
00100       tsem_up(omx_base_sink_Private->flush_all_condition);
00101       tsem_down(omx_base_sink_Private->flush_condition);
00102       pthread_mutex_lock(&omx_base_sink_Private->flush_mutex);
00103     }
00104     pthread_mutex_unlock(&omx_base_sink_Private->flush_mutex);
00105 
00106     /*No buffer to process. So wait here*/
00107     if((pInputSem->semval==0 && isInputBufferNeeded==OMX_TRUE ) &&
00108       (omx_base_sink_Private->state != OMX_StateLoaded && omx_base_sink_Private->state != OMX_StateInvalid)) {
00109       DEBUG(DEB_LEV_SIMPLE_SEQ, "Waiting for input buffer \n");
00110       tsem_down(omx_base_sink_Private->bMgmtSem);
00111     }
00112 
00113     if(omx_base_sink_Private->state == OMX_StateLoaded || omx_base_sink_Private->state == OMX_StateInvalid) {
00114       DEBUG(DEB_LEV_FULL_SEQ, "In %s Buffer Management Thread is exiting\n",__func__);
00115       break;
00116     }
00117 
00118     DEBUG(DEB_LEV_FULL_SEQ, "Waiting for input buffer semval=%d in %s\n",pInputSem->semval, __func__);
00119     if(pInputSem->semval>0 && isInputBufferNeeded==OMX_TRUE ) {
00120       tsem_down(pInputSem);
00121       if(pInputQueue->nelem>0){
00122         inBufExchanged++;
00123         isInputBufferNeeded=OMX_FALSE;
00124         pInputBuffer = dequeue(pInputQueue);
00125         if(pInputBuffer == NULL){
00126           DEBUG(DEB_LEV_ERR, "Had NULL input buffer!!\n");
00127           break;
00128         }
00129       }
00130     }
00131 
00132     if(isInputBufferNeeded==OMX_FALSE) {
00133         if((pInputBuffer->nFlags & OMX_BUFFERFLAG_EOS) ==OMX_BUFFERFLAG_EOS) {
00134         DEBUG(DEB_LEV_SIMPLE_SEQ, "Detected EOS flags in input buffer\n");
00135 
00136         (*(omx_base_component_Private->callbacks->EventHandler))
00137           (openmaxStandComp,
00138           omx_base_component_Private->callbackData,
00139           OMX_EventBufferFlag, /* The command was completed */
00140           0, /* The commands was a OMX_CommandStateSet */
00141           pInputBuffer->nFlags, /* The state has been changed in message->messageParam2 */
00142           NULL);
00143         pInputBuffer->nFlags=0;
00144       }
00145 
00146       target_component=(OMX_COMPONENTTYPE*)pInputBuffer->hMarkTargetComponent;
00147       if(target_component==(OMX_COMPONENTTYPE *)openmaxStandComp) {
00148         /*Clear the mark and generate an event*/
00149         (*(omx_base_component_Private->callbacks->EventHandler))
00150           (openmaxStandComp,
00151           omx_base_component_Private->callbackData,
00152           OMX_EventMark, /* The command was completed */
00153           1, /* The commands was a OMX_CommandStateSet */
00154           0, /* The state has been changed in message->messageParam2 */
00155           pInputBuffer->pMarkData);
00156       } else if(pInputBuffer->hMarkTargetComponent!=NULL){
00157         /*If this is not the target component then pass the mark*/
00158         DEBUG(DEB_LEV_FULL_SEQ, "Can't Pass Mark. This is a Sink!!\n");
00159       }
00160 
00161       if((omx_base_sink_Private->state == OMX_StateExecuting) || (omx_base_sink_Private->state == OMX_StateIdle)) {
00162         if ((omx_base_sink_Private->BufferMgmtCallback && pInputBuffer->nFilledLen > 0)
00163                 || (pInputBuffer->nFlags)){
00164           (*(omx_base_sink_Private->BufferMgmtCallback))(openmaxStandComp, pInputBuffer);
00165         }
00166         else {
00167           /*If no buffer management call back the explicitly consume input buffer*/
00168           pInputBuffer->nFilledLen = 0;
00169         }
00170       } else {
00171         DEBUG(DEB_LEV_ERR, "In %s Received Buffer in non-Executing State(%s) TrState (%s)\n",
00172           __func__, stateName(omx_base_sink_Private->state),
00173           transientStateName(omx_base_component_Private->transientState));
00174         if(OMX_TransStateExecutingToIdle == omx_base_component_Private->transientState ||
00175            OMX_TransStatePauseToIdle == omx_base_component_Private->transientState) {
00176           pInputBuffer->nFilledLen = 0;
00177         }
00178       }
00179       /*Input Buffer has been completely consumed. So, get new input buffer*/
00180 
00181       if(omx_base_sink_Private->state==OMX_StatePause && !PORT_IS_BEING_FLUSHED(pInPort)) {
00182         /*Waiting at paused state*/
00183         tsem_wait(omx_base_sink_Private->bStateSem);
00184       }
00185 
00186       /*Input Buffer has been completely consumed. So, return input buffer*/
00187       if(pInputBuffer->nFilledLen==0) {
00188         pInPort->ReturnBufferFunction(pInPort,pInputBuffer);
00189         inBufExchanged--;
00190         pInputBuffer=NULL;
00191         isInputBufferNeeded = OMX_TRUE;
00192       }
00193 
00194     }
00195   }
00196   DEBUG(DEB_LEV_SIMPLE_SEQ,"Exiting Buffer Management Thread\n");
00197   return NULL;
00198 }
00199 
00206 void* omx_base_sink_twoport_BufferMgmtFunction (void* param) {
00207   OMX_COMPONENTTYPE* openmaxStandComp = (OMX_COMPONENTTYPE*)param;
00208   omx_base_component_PrivateType* omx_base_component_Private=(omx_base_component_PrivateType*)openmaxStandComp->pComponentPrivate;
00209   omx_base_sink_PrivateType* omx_base_sink_Private = (omx_base_sink_PrivateType*)omx_base_component_Private;
00210   omx_base_PortType *pInPort[2];
00211   tsem_t* pInputSem[2];
00212   queue_t* pInputQueue[2];
00213   OMX_BUFFERHEADERTYPE* pInputBuffer[2];
00214   OMX_COMPONENTTYPE* target_component;
00215   OMX_BOOL isInputBufferNeeded[2];
00216   int i,outBufExchanged[2];
00217 
00218   pInPort[0]=(omx_base_PortType *)omx_base_sink_Private->ports[OMX_BASE_SINK_INPUTPORT_INDEX];
00219   pInPort[1]=(omx_base_PortType *)omx_base_sink_Private->ports[OMX_BASE_SINK_INPUTPORT_INDEX_1];
00220   pInputSem[0] = pInPort[0]->pBufferSem;
00221   pInputSem[1] = pInPort[1]->pBufferSem;
00222   pInputQueue[0] = pInPort[0]->pBufferQueue;
00223   pInputQueue[1] = pInPort[1]->pBufferQueue;
00224   pInputBuffer[1]= pInputBuffer[0]=NULL;
00225   isInputBufferNeeded[0]=isInputBufferNeeded[1]=OMX_TRUE;
00226   outBufExchanged[0]=outBufExchanged[1]=0;
00227 
00228   DEBUG(DEB_LEV_FUNCTION_NAME, "In %s\n", __func__);
00229   while(omx_base_sink_Private->state == OMX_StateIdle || omx_base_sink_Private->state == OMX_StateExecuting ||  omx_base_sink_Private->state == OMX_StatePause ||
00230     omx_base_sink_Private->transientState == OMX_TransStateLoadedToIdle){
00231 
00232     /*Wait till the ports are being flushed*/
00233     pthread_mutex_lock(&omx_base_sink_Private->flush_mutex);
00234     while( PORT_IS_BEING_FLUSHED(pInPort[0]) ||
00235            PORT_IS_BEING_FLUSHED(pInPort[1])) {
00236       pthread_mutex_unlock(&omx_base_sink_Private->flush_mutex);
00237 
00238       DEBUG(DEB_LEV_FULL_SEQ, "In %s 1 signalling flush all cond iE=%d,iF=%d,oE=%d,oF=%d iSemVal=%d,oSemval=%d\n",
00239         __func__,outBufExchanged[0],isInputBufferNeeded[0],outBufExchanged[1],isInputBufferNeeded[1],pInputSem[0]->semval,pInputSem[1]->semval);
00240 
00241       if(isInputBufferNeeded[1]==OMX_FALSE && PORT_IS_BEING_FLUSHED(pInPort[1])) {
00242         pInPort[1]->ReturnBufferFunction(pInPort[1],pInputBuffer[1]);
00243         outBufExchanged[1]--;
00244         pInputBuffer[1]=NULL;
00245         isInputBufferNeeded[1]=OMX_TRUE;
00246         DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning Input 1 buffer\n");
00247       }
00248 
00249       if(isInputBufferNeeded[0]==OMX_FALSE && PORT_IS_BEING_FLUSHED(pInPort[0])) {
00250         pInPort[0]->ReturnBufferFunction(pInPort[0],pInputBuffer[0]);
00251         outBufExchanged[0]--;
00252         pInputBuffer[0]=NULL;
00253         isInputBufferNeeded[0]=OMX_TRUE;
00254         DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning Input 0 buffer\n");
00255       }
00256 
00257       DEBUG(DEB_LEV_FULL_SEQ, "In %s 2 signalling flush all cond iE=%d,iF=%d,oE=%d,oF=%d iSemVal=%d,oSemval=%d\n",
00258         __func__,outBufExchanged[0],isInputBufferNeeded[0],outBufExchanged[1],isInputBufferNeeded[1],pInputSem[0]->semval,pInputSem[1]->semval);
00259 
00260       tsem_up(omx_base_sink_Private->flush_all_condition);
00261       tsem_down(omx_base_sink_Private->flush_condition);
00262       pthread_mutex_lock(&omx_base_sink_Private->flush_mutex);
00263     }
00264     pthread_mutex_unlock(&omx_base_sink_Private->flush_mutex);
00265 
00266     /*No buffer to process. So wait here*/
00267     if((isInputBufferNeeded[0]==OMX_TRUE && pInputSem[0]->semval==0) &&
00268       (omx_base_sink_Private->state != OMX_StateLoaded && omx_base_sink_Private->state != OMX_StateInvalid)) {
00269       //Signalled from EmptyThisBuffer or FillThisBuffer or some thing else
00270       DEBUG(DEB_LEV_FULL_SEQ, "Waiting for next Input buffer 0\n");
00271       tsem_down(omx_base_sink_Private->bMgmtSem);
00272 
00273     }
00274     if(omx_base_sink_Private->state == OMX_StateLoaded || omx_base_sink_Private->state == OMX_StateInvalid) {
00275       DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s Buffer Management Thread is exiting\n",__func__);
00276       break;
00277     }
00278     if((isInputBufferNeeded[1]==OMX_TRUE && pInputSem[1]->semval==0) &&
00279       (omx_base_sink_Private->state != OMX_StateLoaded && omx_base_sink_Private->state != OMX_StateInvalid) &&
00280        !(PORT_IS_BEING_FLUSHED(pInPort[0]) || PORT_IS_BEING_FLUSHED(pInPort[1]))) {
00281       //Signalled from EmptyThisBuffer or FillThisBuffer or some thing else
00282       DEBUG(DEB_LEV_FULL_SEQ, "Waiting for next Input buffer 1\n");
00283       tsem_down(omx_base_sink_Private->bMgmtSem);
00284 
00285     }
00286     if(omx_base_sink_Private->state == OMX_StateLoaded || omx_base_sink_Private->state == OMX_StateInvalid) {
00287       DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s Buffer Management Thread is exiting\n",__func__);
00288       break;
00289     }
00290 
00291     DEBUG(DEB_LEV_SIMPLE_SEQ, "Waiting for Input buffer 0 semval=%d \n",pInputSem[0]->semval);
00292     if(pInputSem[0]->semval>0 && isInputBufferNeeded[0]==OMX_TRUE ) {
00293       tsem_down(pInputSem[0]);
00294       if(pInputQueue[0]->nelem>0){
00295         outBufExchanged[0]++;
00296         isInputBufferNeeded[0]=OMX_FALSE;
00297         pInputBuffer[0] = dequeue(pInputQueue[0]);
00298         if(pInputBuffer[0] == NULL){
00299           DEBUG(DEB_LEV_ERR, "Had NULL Input buffer!!\n");
00300           break;
00301         }
00302       }
00303     }
00304     /*When we have input buffer to process then get one Input buffer*/
00305     if(pInputSem[1]->semval>0 && isInputBufferNeeded[1]==OMX_TRUE) {
00306       tsem_down(pInputSem[1]);
00307       DEBUG(DEB_LEV_FULL_SEQ, "Wait over for Input buffer 1 semval=%d \n",pInputSem[1]->semval);
00308       if(pInputQueue[1]->nelem>0){
00309         outBufExchanged[1]++;
00310         isInputBufferNeeded[1]=OMX_FALSE;
00311         pInputBuffer[1] = dequeue(pInputQueue[1]);
00312         if(pInputBuffer[1] == NULL){
00313           DEBUG(DEB_LEV_ERR, "Had NULL Input buffer!! op is=%d,iq=%d\n",pInputSem[1]->semval,pInputQueue[1]->nelem);
00314           break;
00315         }
00316       }
00317     }
00318 
00319     for(i=0;i < (omx_base_component_Private->sPortTypesParam[OMX_PortDomainAudio].nPorts  +
00320                  omx_base_component_Private->sPortTypesParam[OMX_PortDomainVideo].nPorts +
00321                  omx_base_component_Private->sPortTypesParam[OMX_PortDomainImage].nPorts +
00322                  omx_base_component_Private->sPortTypesParam[OMX_PortDomainOther].nPorts);i++) {
00323 
00324       if(omx_base_sink_Private->ports[i]->sPortParam.eDomain != OMX_PortDomainOther){ /* clock ports are not to be processed */
00325         /*Process Input buffer of Port i */
00326         if(isInputBufferNeeded[i]==OMX_FALSE) {
00327 
00328           /*Pass the Mark to all outgoing buffers*/
00329           if(omx_base_sink_Private->pMark.hMarkTargetComponent != NULL){
00330             pInputBuffer[i]->hMarkTargetComponent = omx_base_sink_Private->pMark.hMarkTargetComponent;
00331             pInputBuffer[i]->pMarkData            = omx_base_sink_Private->pMark.pMarkData;
00332           }
00333 
00334           target_component=(OMX_COMPONENTTYPE*)pInputBuffer[i]->hMarkTargetComponent;
00335           if(target_component==(OMX_COMPONENTTYPE *)openmaxStandComp) {
00336             /*Clear the mark and generate an event*/
00337             (*(omx_base_sink_Private->callbacks->EventHandler))
00338               (openmaxStandComp,
00339               omx_base_sink_Private->callbackData,
00340               OMX_EventMark, /* The command was completed */
00341               1, /* The commands was a OMX_CommandStateSet */
00342               i, /* The state has been changed in message->messageParam2 */
00343               pInputBuffer[i]->pMarkData);
00344           } else if(pInputBuffer[i]->hMarkTargetComponent!=NULL){
00345             /*If this is not the target component then pass the mark*/
00346             //pInputBuffer[i]->pMarkData=NULL;
00347             DEBUG(DEB_LEV_FULL_SEQ, "Pass Mark. This is a Source!!\n");
00348           }
00349 
00350           if(omx_base_sink_Private->state == OMX_StateExecuting)  {
00351             if (omx_base_sink_Private->BufferMgmtCallback && pInputBuffer[i]->nFilledLen > 0) {
00352               (*(omx_base_sink_Private->BufferMgmtCallback))(openmaxStandComp, pInputBuffer[i]);
00353             } else {
00354               /*If no buffer management call back then don't produce any Input buffer*/
00355               pInputBuffer[i]->nFilledLen = 0;
00356             }
00357           } else {
00358             DEBUG(DEB_LEV_ERR, "In %s Received Buffer in non-Executing State(%x)\n", __func__, (int)omx_base_sink_Private->state);
00359 
00360             if(OMX_TransStateExecutingToIdle == omx_base_component_Private->transientState ||
00361                OMX_TransStatePauseToIdle == omx_base_component_Private->transientState) {
00362               pInputBuffer[i]->nFilledLen = 0;
00363             }
00364           }
00365 
00366           if((pInputBuffer[i]->nFlags & OMX_BUFFERFLAG_EOS) == OMX_BUFFERFLAG_EOS && pInputBuffer[i]->nFilledLen==0) {
00367             DEBUG(DEB_LEV_FULL_SEQ, "Detected EOS flags in input buffer filled len=%d\n", (int)pInputBuffer[i]->nFilledLen);
00368             (*(omx_base_sink_Private->callbacks->EventHandler))
00369               (openmaxStandComp,
00370               omx_base_sink_Private->callbackData,
00371               OMX_EventBufferFlag, /* The command was completed */
00372               i, /* The commands was a OMX_CommandStateSet */
00373               pInputBuffer[i]->nFlags, /* The state has been changed in message->messageParam2 */
00374               NULL);
00375           }
00376           if(omx_base_sink_Private->state==OMX_StatePause && !(PORT_IS_BEING_FLUSHED(pInPort[0]) || PORT_IS_BEING_FLUSHED(pInPort[1]))) {
00377             /*Waiting at paused state*/
00378             tsem_wait(omx_base_component_Private->bStateSem);
00379           }
00380 
00381            /*Input Buffer has been produced or EOS. So, return Input buffer and get new buffer*/
00382           if(pInputBuffer[i]->nFilledLen ==0 || ((pInputBuffer[i]->nFlags & OMX_BUFFERFLAG_EOS) == OMX_BUFFERFLAG_EOS)){
00383             pInPort[i]->ReturnBufferFunction(pInPort[i],pInputBuffer[i]);
00384             outBufExchanged[i]--;
00385             pInputBuffer[i]=NULL;
00386             isInputBufferNeeded[i]=OMX_TRUE;
00387           }
00388         }
00389       }
00390     }
00391 
00392     /*Clear the Mark*/
00393     if(omx_base_sink_Private->pMark.hMarkTargetComponent != NULL){
00394       omx_base_sink_Private->pMark.hMarkTargetComponent = NULL;
00395       omx_base_sink_Private->pMark.pMarkData            = NULL;
00396     }
00397   }
00398   DEBUG(DEB_LEV_SIMPLE_SEQ,"Exiting Buffer Management Thread\n");
00399   return NULL;
00400 }