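When building a live-streaming system we need a push (publishing) client. It pushes the live audio and video data to a streaming media server or CDN, which then relays and distributes the video to viewers. Because the push client has to run in many different network environments (wired, wireless, 3G, 4G, satellite links and so on), keeping the stream flexible and low-latency under all of them calls for two mechanisms: a send buffer and a frame-dropping policy. Together they keep the push real-time and the delivered data usable.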
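A ring buffer (also called a circular buffer) is a data structure that represents a fixed-size buffer whose end is joined to its beginning, which makes it well suited to buffering data streams.
A useful property of a circular buffer is that when an element is consumed, the remaining elements do not have to move. In a non-circular buffer (for example, an ordinary queue), by contrast, consuming an element means shifting the remaining elements forward. In other words, a circular buffer is a good fit for a first-in, first-out (FIFO) buffer, while a non-circular buffer is a better fit for a last-in, first-out (LIFO) one.
A circular buffer suits cases where the maximum capacity is known in advance. Growing a circular buffer requires moving the data it holds, so a buffer whose capacity changes often is better implemented as a linked list.
In some situations it is acceptable for a write to overwrite data in the buffer that has not been processed yet, particularly in multimedia processing. For example, an audio producer may overwrite audio data that the sound card has not yet had time to process.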
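To make the "no element has to move" property concrete, here is a minimal sketch of a fixed-capacity FIFO ring buffer. The class name `RingBuffer` and its methods are illustrative assumptions, not part of EasyPusher; this version refuses writes when full instead of overwriting.

```cpp
#include <array>
#include <cassert>
#include <cstddef>

// Hypothetical fixed-capacity FIFO; names are illustrative, not from EasyPusher.
template <typename T, std::size_t N>
class RingBuffer {
    static_assert(N > 0, "capacity must be positive");

public:
    // Appends a value; returns false (and stores nothing) when the buffer is full.
    bool push(const T& value) {
        if (count_ == N) return false;
        data_[(head_ + count_) % N] = value;  // the write slot wraps past the end
        ++count_;
        return true;
    }

    // Removes the oldest element; no other element changes position.
    bool pop(T& out) {
        if (count_ == 0) return false;
        out = data_[head_];
        head_ = (head_ + 1) % N;  // only the index advances
        --count_;
        return true;
    }

    std::size_t size() const { return count_; }

private:
    std::array<T, N> data_{};
    std::size_t head_ = 0;   // index of the oldest element
    std::size_t count_ = 0;  // number of stored elements
};
```

Consuming an element only advances `head_`, so the cost of a pop does not depend on how many elements are stored.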
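A circular buffer starts out empty and has a predetermined length. As an example, take a circular buffer with room for seven elements, whose last slot is joined to the first so that the slots form a circular address space.
Suppose 1 is written somewhere in the middle of the buffer (for a circular buffer, where the first write lands does not matter).
Two more elements, 2 and 3, are then written and appended after 1.
If two elements are now consumed, the two oldest elements in the buffer are the ones removed. In this example 1 and 2 are removed, leaving only 3 in the buffer.
Once the buffer holds seven elements, it is full.
If the buffer is full and new data has to be written, one policy is to overwrite the oldest data. Here two new elements, A and B, are written and overwrite 3 and 4.
Another policy is to forbid overwriting the buffered data and instead return an error code or throw an exception.
Finally, if two elements are now removed from the buffer, they are 5 and 6 rather than 3 and 4, because A and B have already overwritten 3 and 4.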
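The walkthrough above can be replayed in code. The sketch below assumes a seven-slot buffer with the overwrite-oldest policy; the class `OverwritingRing`, the function `replay_walkthrough`, and the values 4 to 9 used to fill the buffer are assumptions made for illustration (the text only says the buffer ends up holding seven elements).

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <initializer_list>
#include <string>

// Hypothetical 7-slot buffer that overwrites the oldest element when full.
class OverwritingRing {
public:
    static constexpr std::size_t kCapacity = 7;

    void write(char c) {
        // When full, (head_ + count_) % kCapacity is the slot of the oldest element.
        data_[(head_ + count_) % kCapacity] = c;
        if (count_ == kCapacity) {
            head_ = (head_ + 1) % kCapacity;  // the oldest element was overwritten
        } else {
            ++count_;
        }
    }

    // Removes and returns the oldest element; the buffer must not be empty.
    char read() {
        assert(count_ > 0);
        const char c = data_[head_];
        head_ = (head_ + 1) % kCapacity;
        --count_;
        return c;
    }

    std::size_t size() const { return count_; }

private:
    std::array<char, kCapacity> data_{};
    std::size_t head_ = 0;   // index of the oldest element
    std::size_t count_ = 0;  // number of stored elements
};

// Replays the walkthrough and returns the two elements read at the end.
std::string replay_walkthrough() {
    OverwritingRing ring;
    for (char c : {'1', '2', '3'}) ring.write(c);
    ring.read();  // removes 1
    ring.read();  // removes 2, leaving only 3
    for (char c : {'4', '5', '6', '7', '8', '9'}) ring.write(c);  // full: 3..9
    ring.write('A');  // overwrites 3
    ring.write('B');  // overwrites 4
    std::string out;
    out += ring.read();
    out += ring.read();
    return out;  // the oldest surviving elements
}
```

Under the alternative policy, `write` would check `count_ == kCapacity` first and report an error instead of storing the element.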
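Because computer memory is a linear address space, a circular buffer has to be specially designed to behave as a ring at the logical level.
Read and write pointers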
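In general, a circular buffer needs four pointers: the start of the buffer in memory, the end of the buffer in memory (or, equivalently, its length), the start of the valid data (the read pointer), and the end of the valid data (the write pointer).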
The read and write pointers can be represented as integer values.
[Figure: read and write pointers of a buffer that is not full]
[Figure: read and write pointers of a full buffer]
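One detail that integer read and write positions raise: if both wrap at the buffer length, a full buffer and an empty buffer both end up with the read position equal to the write position. A common way to tell them apart is to keep a separate count of used space; the code posted later in this article keeps a `totalsize` field alongside `readpos` and `writepos`. The sketch below shows only that bookkeeping, and the class `RingIndices` and its members are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical index bookkeeping for a ring of `capacity` slots.
class RingIndices {
public:
    explicit RingIndices(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    bool empty() const { return used_ == 0; }
    bool full() const { return used_ == capacity_; }
    std::size_t read_pos() const { return read_pos_; }
    std::size_t write_pos() const { return write_pos_; }

    // Claims the current write slot; returns false when no slot is free.
    bool advance_write() {
        if (full()) return false;
        write_pos_ = (write_pos_ + 1) % capacity_;
        ++used_;
        return true;
    }

    // Releases the current read slot; returns false when nothing is stored.
    bool advance_read() {
        if (empty()) return false;
        read_pos_ = (read_pos_ + 1) % capacity_;
        --used_;
        return true;
    }

private:
    std::size_t capacity_;
    std::size_t read_pos_ = 0;   // index of the oldest stored slot
    std::size_t write_pos_ = 0;  // index of the next slot to write
    std::size_t used_ = 0;       // distinguishes full from empty when indices match
};
```

Another common design leaves one slot permanently unused so that equal indices always mean "empty"; the counter approach uses every slot at the cost of one extra field.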
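EasyPusher uses the same ring-buffer design internally. Audio and video data are both written into the buffer, and a sender takes data out of the buffer and transmits it, which gives an asynchronous producer/consumer pipeline. The caller only has to push each captured and encoded audio/video frame into the SDK's buffer and can then return at once to its own logic, while a sending thread inside the SDK keeps taking audio/video data from the buffer and pushing it to the streaming media server.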
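The producer/consumer arrangement described above can be sketched with standard C++ threading primitives. This is not EasyPusher's implementation: `Frame`, `FrameQueue` and `run_pipeline` are hypothetical names, a `std::deque` stands in for the ring buffer to keep the example short, and appending to a vector stands in for the network send.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical encoded frame; a real one would carry payload bytes and timestamps.
struct Frame {
    int seq;
};

class FrameQueue {
public:
    explicit FrameQueue(std::size_t capacity) : capacity_(capacity) {}

    // Producer side: never blocks; returns false if the queue is full or closed.
    bool push(const Frame& f) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (closed_ || frames_.size() == capacity_) return false;
        frames_.push_back(f);
        ready_.notify_one();
        return true;
    }

    // Consumer side: blocks until a frame is available.
    // Returns false only after close() once every queued frame has been taken.
    bool pop(Frame& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return closed_ || !frames_.empty(); });
        if (frames_.empty()) return false;
        out = frames_.front();
        frames_.pop_front();
        return true;
    }

    void close() {
        std::lock_guard<std::mutex> lock(mutex_);
        closed_ = true;
        ready_.notify_all();
    }

private:
    std::size_t capacity_;
    std::deque<Frame> frames_;
    std::mutex mutex_;
    std::condition_variable ready_;
    bool closed_ = false;
};

// Pushes `frame_count` frames from the calling thread while a sender thread
// drains the queue; returns the sequence numbers in the order they were "sent".
std::vector<int> run_pipeline(int frame_count) {
    assert(frame_count >= 0);
    FrameQueue queue(static_cast<std::size_t>(frame_count));  // large enough that no push is rejected
    std::vector<int> sent;
    std::thread sender([&queue, &sent] {
        Frame f{0};
        while (queue.pop(f)) sent.push_back(f.seq);  // stand-in for the network send
    });
    for (int i = 0; i < frame_count; ++i) queue.push(Frame{i});
    queue.close();
    sender.join();
    return sent;
}
```

The producer only ever holds the lock for the duration of one `push`, which is what lets the capture and encode side return to its own work immediately.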
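Wherever there is a buffer, it will sooner or later fill up, and at that point a sensible frame-dropping policy is needed. The non-key-frame dropping policy used here follows the same principle as the blog post "EasyDarwin open-source streaming media server: the relay-cache catch-up algorithm for low-latency live streaming". When a new frame arrives and the buffer is already full, non-key P-frames are discarded starting from the read pointer until an I-frame is reached; dropping stops there, and the new frame is then added to the buffer. This keeps the push buffer at a reasonable size, so push latency stays under control, and it also avoids corrupted (garbled) pictures.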
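A minimal sketch of this dropping policy follows. It rests on one stated assumption about a detail the text leaves open: the oldest frame is always discarded first, whatever its type, and dropping then continues until an I-frame sits at the read position, so the consumer resumes on a key frame. `Frame`, `DroppingFrameBuffer` and `is_key` are hypothetical names, and a `std::deque` stands in for the ring buffer.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical frame descriptor: is_key marks an I-frame.
struct Frame {
    int seq;
    bool is_key;
};

class DroppingFrameBuffer {
public:
    explicit DroppingFrameBuffer(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    // Appends a frame. When the buffer is full, frames are first discarded from
    // the read side. Returns how many frames were dropped to make room.
    std::size_t push(const Frame& f) {
        std::size_t dropped = 0;
        if (frames_.size() == capacity_) {
            // Assumption: drop the oldest frame, then keep dropping non-key
            // frames until the next I-frame is at the read position.
            do {
                frames_.pop_front();
                ++dropped;
            } while (!frames_.empty() && !frames_.front().is_key);
        }
        frames_.push_back(f);
        return dropped;
    }

    const std::deque<Frame>& frames() const { return frames_; }

private:
    std::size_t capacity_;
    std::deque<Frame> frames_;  // front = read position, back = write position
};
```

If the buffer holds no further I-frame, this sketch empties it completely; a fuller version would probably also skip incoming P-frames until the next I-frame arrives, so that the receiver never starts decoding in the middle of a group of pictures.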
/*
    Copyright (c) 2012-2016 EasyDarwin.ORG.  All rights reserved.
    Github: https://github.com/EasyDarwin
    WEChat: EasyDarwin
    Website: http://www.easydarwin.org
    Author: [email protected]
*/
#include "ssqueue.h"
//The names of the two angle-bracket headers included here did not survive in this copy;
//the standard headers below are the ones the code in this file needs.
#include <stdarg.h>
#include <stdio.h>
#include <string.h>
#include <wchar.h>
#include "trace.h"

int SSQ_Init(SS_QUEUE_OBJ_T *pObj, unsigned int sharememory, unsigned int channelid, wchar_t *sharename, unsigned int bufsize, unsigned int prerecordsecs, unsigned int createsharememory)
{
    wchar_t wszHeaderName[36] = {0,};
    wchar_t wszFramelistName[36] = {0,};
    wchar_t wszDataName[36] = {0,};

    if (NULL==pObj) return -1;
    if (createsharememory==0x01 && bufsize<1) return -1;
    if ( (sharememory==0x01) && (NULL==sharename || (0==wcscmp(sharename, TEXT("\0")))) ) return -1;

    memset(pObj, 0x00, sizeof(SS_QUEUE_OBJ_T));
    pObj->channelid = channelid;
    pObj->shareinfo.id = channelid;
    wcscpy(pObj->shareinfo.name, sharename);

    wchar_t wszMutexName[36] = {0,};
    wsprintf(wszMutexName, TEXT("%s%d_mutex"), sharename, channelid);
    pObj->hMutex = OpenMutex(NULL, FALSE, wszMutexName);
    if (NULL == pObj->hMutex) {
        pObj->hMutex = CreateMutex(NULL, FALSE, wszMutexName);
        if (NULL == pObj->hMutex) return -1;
    }

    //Create Header map
#ifdef _WIN32
    if (sharememory == 0x01) {
        wsprintf(wszHeaderName, TEXT("%s%d_h"), sharename, channelid);
        pObj->hSSHeader = OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, wszHeaderName);
        if (NULL==pObj->hSSHeader && createsharememory==0x01) {
            pObj->hSSHeader = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE|SEC_COMMIT, 0, sizeof(SS_HEADER_T), wszHeaderName);
            if (NULL==pObj->hSSHeader || pObj->hSSHeader==INVALID_HANDLE_VALUE) {
                return -1;
            }
        }
        pObj->pQueHeader = (SS_HEADER_T*)MapViewOfFile(pObj->hSSHeader, FILE_MAP_READ|FILE_MAP_WRITE, 0, 0, 0);
        if (createsharememory==0x01) {
            if (pObj->pQueHeader->bufsize < 1) {
                memset(pObj->pQueHeader, 0x00, sizeof(SS_HEADER_T));
                pObj->pQueHeader->bufsize = bufsize;
            }
        } else if (NULL==pObj->pQueHeader) {
            return -1;
        } else {
            bufsize = pObj->pQueHeader->bufsize;
        }
    } else {
        pObj->pQueHeader = new SS_HEADER_T;
        memset(pObj->pQueHeader, 0x00, sizeof(SS_HEADER_T));
    }

    //==========================================
    //Create frame list map
    if (prerecordsecs > 0) {
        wsprintf(wszFramelistName, TEXT("%s%d_f"), sharename, channelid);
        unsigned int nFramelistNum = prerecordsecs * 30;    //30 frames per second
        unsigned int nFrameQueSize = nFramelistNum*sizeof(FRAMEINFO_LIST_T);
        if (sharememory == 0x01) {
            pObj->hSSFrameList = OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, wszFramelistName);
            if (NULL==pObj->hSSFrameList && createsharememory==0x01) {
                pObj->hSSFrameList = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE|SEC_COMMIT, 0, nFrameQueSize, wszFramelistName);
                if (NULL==pObj->hSSFrameList || pObj->hSSFrameList==INVALID_HANDLE_VALUE) {
                    return -1;
                }
            }
            pObj->pFrameinfoList = (FRAMEINFO_LIST_T*)MapViewOfFile(pObj->hSSFrameList, FILE_MAP_READ|FILE_MAP_WRITE, 0, 0, 0);
            if (createsharememory==0x01) {
                memset(pObj->pFrameinfoList, 0x00, nFrameQueSize);
                pObj->pQueHeader->framelistNum = nFramelistNum;
            } else if (NULL==pObj->hSSFrameList) {
                return -1;
            }
        } else {
            pObj->pFrameinfoList = new FRAMEINFO_LIST_T[nFramelistNum];
            memset(&pObj->pFrameinfoList[0], 0x00, sizeof(FRAMEINFO_LIST_T)*nFramelistNum);
            pObj->pQueHeader->framelistNum = nFramelistNum;
        }
    }

    //Create data map
    if (sharememory == 0x01) {
        wsprintf(wszDataName, TEXT("%s%d_b"), sharename, channelid);
        pObj->hSSData = OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, wszDataName);
        if (NULL==pObj->hSSData && createsharememory==0x01) {
            pObj->hSSData = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE|SEC_COMMIT, 0, bufsize, wszDataName);
        }
        if (NULL == pObj->hSSData || pObj->hSSData==INVALID_HANDLE_VALUE) {
            return -1;
        }
        pObj->pQueData = (char*)MapViewOfFile(pObj->hSSData, FILE_MAP_READ|FILE_MAP_WRITE, 0, 0, 0);
    } else {
        pObj->pQueData = new char [bufsize];
        pObj->pQueHeader->bufsize = bufsize;
    }
    if (createsharememory==0x01) {
        //memset(pQueHeader, 0x00, sizeof(SS_HEADER_T));
        memset(pObj->pQueData, 0x00, bufsize);
    }
#else
    int ret = shm_create((SYNC_VID_SHM_KEY<<8)|channelid, &pObj->shmHdrid, sizeof(SS_HEADER_T), (char**)&pObj->pQueHeader);
    if (ret < 0) {
        return -1;
    }
    SSQ_TRACE("[%d]pQueHeader: %d\n", (SYNC_VID_SHM_KEY<<8)|channelid, pObj->shmHdrid);
    ret = shm_create((SYNC_VID_SHM_KEY<<16)|channelid, &pObj->shmDatid, bufsize, (char**)&pObj->pQueData);
    if (ret < 0) {
        shm_delete(&pObj->shmHdrid, (char*)pObj->pQueHeader);
        return -1;
    }
    pObj->pQueHeader->bufsize = bufsize;
    SSQ_TRACE("[%d]pQueData: %d\n", (SYNC_VID_SHM_KEY<<16)|channelid, pObj->shmDatid);
#endif
    return 0;
}

int SSQ_Deinit(SS_QUEUE_OBJ_T *pObj)
{
    if (NULL == pObj) return -1;
#ifdef _WIN32
    if (NULL != pObj->hMutex) {
        CloseHandle(pObj->hMutex);
        pObj->hMutex = NULL;
    }
    if (NULL != pObj->hSSHeader) {
        if (! UnmapViewOfFile(pObj->pQueHeader)) {
        }
        pObj->pQueHeader = NULL;
        CloseHandle(pObj->hSSHeader);
        pObj->hSSHeader = NULL;
    }
    if (NULL != pObj->pQueHeader) {
        //allocated with non-array new in SSQ_Init, so released with non-array delete
        delete pObj->pQueHeader;
        pObj->pQueHeader = NULL;
    }
    if (NULL != pObj->hSSData) {
        if (! UnmapViewOfFile(pObj->pQueData)) {
        }
        pObj->pQueData = NULL;
        CloseHandle(pObj->hSSData);
        pObj->hSSData = NULL;
    }
    if (NULL != pObj->pQueData) {
        delete []pObj->pQueData;
        pObj->pQueData = NULL;
    }
    if (NULL != pObj->hSSFrameList) {
        if (! UnmapViewOfFile(pObj->pFrameinfoList)) {
        }
        pObj->pFrameinfoList = NULL;
        CloseHandle(pObj->hSSFrameList);
        pObj->hSSFrameList = NULL;
    }
    if (NULL != pObj->pFrameinfoList) {
        delete []pObj->pFrameinfoList;
        pObj->pFrameinfoList = NULL;
    }
#else
    if (pObj->shmHdrid>0 && pObj->pQueHeader!=NULL) {
        shm_delete(&pObj->shmHdrid, (char*)pObj->pQueHeader);
    }
    if (pObj->shmDatid>0 && pObj->pQueData!=NULL) {
        shm_delete(&pObj->shmDatid, (char*)pObj->pQueData);
        pObj->pQueData = NULL;
    }
#endif
    return 0;
}

int SSQ_SetClearFlag(SS_QUEUE_OBJ_T *pObj, unsigned int _flag)
{
    if (NULL == pObj) return -1;
    if (NULL==pObj->pQueData) return -1;
    pObj->pQueHeader->clear_flag = _flag;
    return 0;
}

int SSQ_Clear(SS_QUEUE_OBJ_T *pObj)
{
    if (NULL == pObj) return -1;
    if (NULL==pObj->pQueData) return -1;
    //WaitForSingleObject(pObj->hMutex, INFINITE);
    //Lock();
    pObj->pQueHeader->writepos = 0;
    pObj->pQueHeader->readpos = 0;
    pObj->pQueHeader->totalsize= 0;
    pObj->pQueHeader->videoframes = 0;    //video frame count
    pObj->pQueHeader->maxframeno = 0;
    pObj->pQueHeader->frameno = 0;
    memset(pObj->pQueData, 0x00, pObj->pQueHeader->bufsize);
    //ReleaseMutex(pObj->hMutex);
    //Unlock();
    return 0;
}

int SSQ_AddFrameInfo(SS_QUEUE_OBJ_T *pObj, unsigned int _pos, MEDIA_FRAME_INFO *frameinfo)
{
    if (NULL == pObj) return -1;
    if (NULL == pObj->pQueHeader) return -1;
    if (NULL == pObj->pFrameinfoList) return -1;
    if ( pObj->pQueHeader->frameno+1 > pObj->pQueHeader->framelistNum) {
        //list is full: shift the remaining framelistNum-1 entries down by one to drop the oldest
        memmove((char*)pObj->pFrameinfoList, (char*)pObj->pFrameinfoList+sizeof(FRAMEINFO_LIST_T), sizeof(FRAMEINFO_LIST_T)*(pObj->pQueHeader->framelistNum-1));
        pObj->pQueHeader->frameno --;
        pObj->pQueHeader->maxframeno = pObj->pQueHeader->frameno+1;
    }
    pObj->pFrameinfoList[pObj->pQueHeader->frameno].pos = pObj->pQueHeader->writepos;
#ifdef _DEBUG1
    static int nTimestamp = 0;
    pObj->pQueHeader->pFrameinfoList[pObj->pQueHeader->frameno].timestamp = ++nTimestamp;
#else
    pObj->pFrameinfoList[pObj->pQueHeader->frameno].timestamp_sec = frameinfo->timestamp_sec;
    pObj->pFrameinfoList[pObj->pQueHeader->frameno].rtp_timestamp = frameinfo->timestamp_sec*1000+frameinfo->timestamp_usec/1000;
#endif
    //SSQ_TRACE("frame no: %d\n", pObj->pQueHeader->frameno);
    pObj->pQueHeader->frameno ++;
    pObj->pQueHeader->maxframeno = pObj->pQueHeader->frameno;
    /*
    if ( pObj->pQueHeader->frameno >= pObj->pQueHeader->framelistNum) {
        SSQ_TRACE("max frame: %d\n", pObj->pQueHeader->maxframeno);
        pObj->pQueHeader->frameno = 0;
    }
    if (pObj->pQueHeader->frameno > pObj->pQueHeader->maxframeno) {
        pObj->pQueHeader->maxframeno = pObj->pQueHeader->frameno;
    }
    */
    return 0;
}

int SSQ_AddData(SS_QUEUE_OBJ_T *pObj, unsigned int channelid, unsigned int mediatype, MEDIA_FRAME_INFO *frameinfo, char *pbuf)
{
    int ret = 0;
    SS_BUF_T bufNode;
    if (NULL==pObj || NULL==frameinfo || NULL==pbuf) return -1;
    if (NULL == pObj->pQueData) return -1;
    if (NULL == pObj->pQueHeader) return -1;
    if (frameinfo->length < 1) {
#ifdef _DEBUG
        SSQ_TRACE("frame length less than 1: %d\n", frameinfo->length);
#endif
        return -1;
    }
#ifdef _DEBUG1
    SSQ_TRACE("frame: %02x%02x%02x%02x%02x\n", (unsigned char)pbuf[0], (unsigned char)pbuf[1], (unsigned char)pbuf[2], (unsigned char)pbuf[3], (unsigned char)pbuf[4]);
#endif
    if (frameinfo->length > pObj->pQueHeader->bufsize) {
        SSQ_TRACE("Buffer too low.. Current Frame Size: %d\tBuffer Size: %d\n", frameinfo->length, pObj->pQueHeader->bufsize);
        return -1;
    }
    WaitForSingleObject(pObj->hMutex, INFINITE);    //Lock
    if (pObj->pQueHeader->clear_flag == 0x01) {
        //SSQ_TRACE("Received clear signal... Clear Buffer..\n");
        SSQ_TRACE("Received clear signal. Clearing buffer.. WritePos: %d\n", pObj->pQueHeader->writepos);
        SSQ_Clear(pObj);
        pObj->pQueHeader->clear_flag = 0x00;
    }
    if (pObj->pQueHeader->writepos == pObj->pQueHeader->readpos) {
        if (pObj->pQueHeader->totalsize < 0 || pObj->pQueHeader->videoframes < 0) {
            SSQ_TRACE("read speed == write speed... abnormal data size: %d video frames:%d\n", pObj->pQueHeader->totalsize, pObj->pQueHeader->videoframes);
        }
        //Clear();
    }
    if (sizeof(SS_BUF_T) + frameinfo->length + pObj->pQueHeader->totalsize > pObj->pQueHeader->bufsize) {
        SSQ_TRACE("Buffer size exceeded.. frame length:%d\ttotalsize:%d\tbufsize:%d buffered frames:%d\n", frameinfo->length, pObj->pQueHeader->totalsize, pObj->pQueHeader->bufsize, pObj->pQueHeader->videoframes);
        //set the flag while the mutex is still held
        pObj->pQueHeader->isfull = 0x01;
        ReleaseMutex(pObj->hMutex);
        return -1;
    }
    pObj->pQueHeader->isfull = 0x00;
    memset(&bufNode, 0x00, sizeof(SS_BUF_T));
    memcpy(&bufNode.frameinfo, frameinfo, sizeof(MEDIA_FRAME_INFO));
    bufNode.channelid = channelid;//++m_FrameTally;
    bufNode.mediatype = mediatype;
    bufNode.flag = BUF_QUE_FLAG;
    //_TRACE("WritePos: %d totalSize: %d\n", pQueHeader->writepos, pQueHeader->totalsize);
    //Lock();
    //write from head toward tail
    if ((pObj->pQueHeader->writepos + sizeof(SS_BUF_T) + frameinfo->length) <= pObj->pQueHeader->bufsize) {
        //copy to queue
        //record the frame position
        if (mediatype==MEDIA_TYPE_VIDEO) SSQ_AddFrameInfo(pObj, pObj->pQueHeader->writepos, frameinfo);
        unsigned int nAdd = pObj->pQueHeader->writepos;
        memcpy(pObj->pQueData+nAdd, &bufNode, sizeof(SS_BUF_T));
        nAdd += sizeof(SS_BUF_T);
        memcpy(pObj->pQueData+nAdd, pbuf, frameinfo->length);
        nAdd += frameinfo->length;
        pObj->pQueHeader->writepos = nAdd;
        pObj->pQueHeader->totalsize+= sizeof(SS_BUF_T);
        pObj->pQueHeader->totalsize+= frameinfo->length;
        if (mediatype==MEDIA_TYPE_VIDEO) pObj->pQueHeader->videoframes ++;
        //_TRACE("appended in order.. writepos: %d / %d\n", pQueHeader->writepos, pQueHeader->size);
    }
    else if (pObj->pQueHeader->writepos == pObj->pQueHeader->bufsize)    //start again from the head
    {
        //record the frame position
        if (mediatype==MEDIA_TYPE_VIDEO) SSQ_AddFrameInfo(pObj, 0, frameinfo);
        memcpy(pObj->pQueData, &bufNode, sizeof(SS_BUF_T));
        pObj->pQueHeader->writepos = sizeof(SS_BUF_T);
        memcpy(pObj->pQueData+pObj->pQueHeader->writepos, pbuf, frameinfo->length);
        pObj->pQueHeader->writepos += frameinfo->length;
        pObj->pQueHeader->totalsize= sizeof(SS_BUF_T);
        pObj->pQueHeader->totalsize+= frameinfo->length;
        if (mediatype==MEDIA_TYPE_VIDEO) pObj->pQueHeader->videoframes ++;
        SSQ_TRACE("writing from the head.. writepos: %d\n", pObj->pQueHeader->writepos);
    }
    //else if (pQueHeader->size - pQueHeader->writepos+pQueHeader->readpos >= (int)(frameinfo->length+sizeof(SS_BUF_T)))    //write from tail, wrapping to head
    //else if (pObj->pQueHeader->bufsize - pObj->pQueHeader->writepos+pObj->pQueHeader->readpos >= (int)(frameinfo->length+sizeof(SS_BUF_T)))    //write from tail, wrapping to head
    else// if (pObj->pQueHeader->bufsize - pObj->pQueHeader->writepos >= (int)(frameinfo->length+sizeof(SS_BUF_T)))    //write from tail, wrapping to head
    {
        unsigned int remain = pObj->pQueHeader->bufsize - pObj->pQueHeader->writepos;    //remaining space
        if (remain>=sizeof(SS_BUF_T)) {
            unsigned int nAdd = pObj->pQueHeader->writepos;
            //record the frame position
            if (mediatype==MEDIA_TYPE_VIDEO) SSQ_AddFrameInfo(pObj, nAdd, frameinfo);
            //_TRACE("WritePos111: %d\n", pQueHeader->writepos);
            memcpy(pObj->pQueData+nAdd, &bufNode, sizeof(SS_BUF_T));
            nAdd += sizeof(SS_BUF_T);
            //pQueHeader->totalsize+= sizeof(SS_BUF_T);
            //_TRACE("WritePos222: %d\n", pQueHeader->writepos+sizeof(SS_BUF_T));
            remain = pObj->pQueHeader->bufsize - nAdd;//pQueHeader->writepos;
            //_TRACE("WritePos: %d\n", pQueHeader->writepos+sizeof(SS_BUF_T)+remain);
            if (remain>0) {
                memcpy(pObj->pQueData+nAdd, pbuf, remain);
                memcpy(pObj->pQueData, pbuf+remain, frameinfo->length-remain);
                nAdd = frameinfo->length-remain;
                pObj->pQueHeader->writepos = nAdd;
                pObj->pQueHeader->totalsize+= sizeof(SS_BUF_T);
                pObj->pQueHeader->totalsize+= frameinfo->length;
                if (pObj->pQueHeader->totalsize>pObj->pQueHeader->bufsize) {
                    //cause: the rtsp server has stopped reading this queue (no client is currently connected)
                    SSQ_TRACE("[RTSP Server has stopped reading this queue] error %d > %d frameinfo->length:%d...\n", pObj->pQueHeader->totalsize, pObj->pQueHeader->bufsize, frameinfo->length);
                    //SSQ_Clear(pObj);
                } else {
                    if (mediatype==MEDIA_TYPE_VIDEO) pObj->pQueHeader->videoframes ++;
                    //SSQ_TRACE("111 header and part of the frame data at the buffer tail, rest of the frame data at the buffer head.. remain: %d writepos: %d totalsize: %d\n", remain, pObj->pQueHeader->writepos, pObj->pQueHeader->totalsize);
                }
            } else if (remain==0) {
                memcpy(pObj->pQueData, pbuf, frameinfo->length);
                nAdd = frameinfo->length;
                pObj->pQueHeader->writepos = nAdd;
                pObj->pQueHeader->totalsize+= sizeof(SS_BUF_T);
                pObj->pQueHeader->totalsize+= frameinfo->length;
                if (mediatype==MEDIA_TYPE_VIDEO) pObj->pQueHeader->videoframes ++;
                if (pObj->pQueHeader->totalsize>pObj->pQueHeader->bufsize) {
                    SSQ_TRACE("error 222 %d > %d frameinfo->length:%d...\n", pObj->pQueHeader->totalsize, pObj->pQueHeader->bufsize, frameinfo->length);
                    //SSQ_Clear(pObj);
                } else {
                    SSQ_TRACE("22222 header at the buffer tail, frame data at the buffer head.. writepos: %d\n", pObj->pQueHeader->writepos);
                }
            } else {
                SSQ_TRACE("error... remaining space less than 0: %d\n", remain);
                ret = -1;
            }
        } else if (remain>0) {
            char *tmpbuf = (char *)&bufNode;
            unsigned int nAdd = pObj->pQueHeader->writepos;
            //record the frame position
            if (mediatype==MEDIA_TYPE_VIDEO) SSQ_AddFrameInfo(pObj, nAdd, frameinfo);
            //SSQ_TRACE("ADD DATA...%d\n", nAdd);
            memcpy(pObj->pQueData+nAdd, tmpbuf, remain);
            //SSQ_TRACE("ADD DATA222...%d\n", sizeof(SS_BUF_T)-remain);
            memcpy(pObj->pQueData, tmpbuf+remain, sizeof(SS_BUF_T)-remain);
            nAdd = sizeof(SS_BUF_T)-remain;
            memcpy(pObj->pQueData+nAdd, pbuf, frameinfo->length);
            nAdd += frameinfo->length;
            pObj->pQueHeader->writepos = nAdd;
            pObj->pQueHeader->totalsize+= sizeof(SS_BUF_T);
            pObj->pQueHeader->totalsize+= frameinfo->length;
            if (pObj->pQueHeader->totalsize>pObj->pQueHeader->bufsize) {
                SSQ_TRACE("error 333 %d > %d frameinfo->length:%d...\n", pObj->pQueHeader->totalsize, pObj->pQueHeader->bufsize, frameinfo->length);
                //SSQ_Clear(pObj);
            }
            if (mediatype==MEDIA_TYPE_VIDEO) pObj->pQueHeader->videoframes ++;
        } else {
            ret = -1;
            SSQ_TRACE("ERROR...\n");
        }
    }
    //else
    {
        //else if (pObj->pQueHeader->bufsize - pObj->pQueHeader->writepos+pObj->pQueHeader->readpos >= (int)(frameinfo->length+sizeof(SS_BUF_T)))    //write from tail, wrapping to head
        //SSQ_TRACE("tail write error.. unhandled. write pos:%d / pObj->pQueHeader->bufsize read pos:%d\n", pObj->pQueHeader->writepos, pObj->pQueHeader->bufsize, pObj->pQueHeader->readpos);
    }
    //Unlock();
    ReleaseMutex(pObj->hMutex);
    //SSQ_TRACE("writepos: %d\ttotalsize: %d\n", pObj->pQueHeader->writepos, pObj->pQueHeader->totalsize);
#ifdef _DEBUG1
    if (mediatype==MEDIA_TYPE_VIDEO) {
        SSQ_TRACE("==========================\n");
        for (int i=0; i<pObj->pQueHeader->maxframeno; i++) {
            SSQ_TRACE("[%d] times: %d pos: %d %02X%02X%02X%02X%02X\n", i, pObj->pQueHeader->pFrameinfoList[i].timestamp, pObj->pQueHeader->pFrameinfoList[i].pos,
                (unsigned char)pObj->pQueData[pObj->pQueHeader->pFrameinfoList[i].pos+sizeof(SS_BUF_T)+0],
                (unsigned char)pObj->pQueData[pObj->pQueHeader->pFrameinfoList[i].pos+sizeof(SS_BUF_T)+1],
                (unsigned char)pObj->pQueData[pObj->pQueHeader->pFrameinfoList[i].pos+sizeof(SS_BUF_T)+2],
                (unsigned char)pObj->pQueData[pObj->pQueHeader->pFrameinfoList[i].pos+sizeof(SS_BUF_T)+3],
                (unsigned char)pObj->pQueData[pObj->pQueHeader->pFrameinfoList[i].pos+sizeof(SS_BUF_T)+4]);
        }
    }
#endif
    return ret;
}

int SSQ_GetData(SS_QUEUE_OBJ_T *pObj, unsigned int *channelid, unsigned int *mediatype, MEDIA_FRAME_INFO *frameinfo, char *pbuf)
{
    int ret = 0;
    unsigned int remain = 0;
    if (NULL == pObj) return -1;
    if (NULL == pObj->pQueHeader) return -1;
    if (NULL == pObj->pFrameinfoList) return -1;
    WaitForSingleObject(pObj->hMutex, INFINITE);    //Lock
    if (pObj->pQueHeader->totalsize < 0) {
        SSQ_TRACE("pObj->pQueHeader->totalsize<0: %d\n", pObj->pQueHeader->totalsize);
        ReleaseMutex(pObj->hMutex);
        return -1;
    }
    if (pObj->pQueHeader->totalsize <= sizeof(SS_BUF_T)) {
        ReleaseMutex(pObj->hMutex);
        return -1;
    }
    //_TRACE("read pos: %d\n", pQueHeader->readpos);
    //if (NULL != chid) *chid = m_chid;
    //Lock();
#if 0
    ret = -1;
    for (unsigned int i=0; i<pObj->pQueHeader->maxframeno; i++) {
        if (pObj->pFrameinfoList[i].rtp_timestamp > frameinfo->rtptimestamp) {
            pObj->pQueHeader->readpos = pObj->pFrameinfoList[i].pos;
            ret = SSQ_GetDataByPosition(pObj, pObj->pFrameinfoList[i].pos, channelid, mediatype, frameinfo, pbuf);
            break;
        }
    }
#else
    if (pObj->pQueHeader->readpos == pObj->pQueHeader->bufsize) {
        SSQ_TRACE("reset read pos[%d / %d]..\n", pObj->pQueHeader->readpos, pObj->pQueHeader->bufsize);
        pObj->pQueHeader->readpos = 0;
    }
    if ((pObj->pQueHeader->readpos + sizeof(SS_BUF_T)) <= pObj->pQueHeader->bufsize) {
        SS_BUF_T *pNode = (SS_BUF_T *)(pObj->pQueData + pObj->pQueHeader->readpos);
        //if (pNode->id<1)
        if (pNode->flag != BUF_QUE_FLAG) {
            SSQ_TRACE("flag error... buffered video frames:%d bytes:%d clearing queue\n", pObj->pQueHeader->videoframes, pObj->pQueHeader->totalsize);
            if (pObj->hSSHeader == NULL)    //used within a single process
            {
                SSQ_Clear(pObj);
            } else {
                pObj->pQueHeader->clear_flag = 0x01;
#ifdef _WIN32
                while (pObj->pQueHeader->clear_flag!=0x00) {Sleep(1);}
#else
                while (pObj->pQueHeader->clear_flag!=0x00) {usleep(100);}
#endif
            }
            //Unlock();
            SSQ_TRACE("111 flag error... buffered video frames:%d bytes:%d\n", pObj->pQueHeader->videoframes, pObj->pQueHeader->totalsize);
            SSQ_TRACE("flag error.. clearing queue.. readpos: %d\n", pObj->pQueHeader->readpos);
            //pObj->pQueHeader->clear_flag = 0x01;
            //_TRACE("flag error.. queue cleared..\n");
            ReleaseMutex(pObj->hMutex);
            return -1;
        }
        if (NULL!=mediatype) *mediatype = pNode->mediatype;
        if (NULL != channelid) *channelid = pNode->channelid;
        memcpy(frameinfo, &pNode->frameinfo, sizeof(MEDIA_FRAME_INFO));
        if ( (pObj->pQueHeader->readpos + pNode->frameinfo.length+sizeof(SS_BUF_T)) <= pObj->pQueHeader->bufsize) {
            //read from head toward tail
            if (pObj->pQueHeader->totalsize < frameinfo->length+sizeof(SS_BUF_T)) {
                //not enough data
                SSQ_TRACE("not enough data... total bytes[%d]<frame length[%d]. read pos:%d\n", pObj->pQueHeader->totalsize, frameinfo->length+sizeof(SS_BUF_T), pObj->pQueHeader->readpos);
                ReleaseMutex(pObj->hMutex);
                return -1;
            }
            if (frameinfo->length+sizeof(SS_BUF_T) > pObj->pQueHeader->totalsize) {
                SSQ_TRACE("total bytes[%d]<frame length[%d]. read pos:%d\n", pObj->pQueHeader->totalsize, frameinfo->length+sizeof(SS_BUF_T), pObj->pQueHeader->readpos);
            }
            //_TRACE("read pos 00000000...: %d / %d\n", pQueHeader->readpos, pQueHeader->size);
            pObj->pQueHeader->readpos += sizeof(SS_BUF_T);
            unsigned int total1 = pObj->pQueHeader->totalsize;
            pObj->pQueHeader->totalsize -= sizeof(SS_BUF_T);
            if (NULL!=pbuf) memcpy(pbuf, pObj->pQueData+pObj->pQueHeader->readpos, frameinfo->length);
            //memset(pObj->pQueData+pObj->pQueHeader->readpos, 0x00, frameinfo->length);    //clear
            pObj->pQueHeader->readpos += frameinfo->length;
            unsigned int total2 = pObj->pQueHeader->totalsize;
            pObj->pQueHeader->totalsize -= (frameinfo->length);
            if (pObj->pQueHeader->readpos == pObj->pQueHeader->bufsize) {
                pObj->pQueHeader->readpos = 0;
            }
            if (pObj->pQueHeader->readpos > pObj->pQueHeader->bufsize) {
                SSQ_TRACE("read pos error 11111...: %d / %d\n", pObj->pQueHeader->readpos, pObj->pQueHeader->bufsize);
            }
            if (pObj->pQueHeader->totalsize < 0) {
                SSQ_TRACE("read pos: %d\twrite pos: %d current frame size:%d total1:%d\ttotal2:%d\ttotal3:%d\n", pObj->pQueHeader->readpos, pObj->pQueHeader->writepos, frameinfo->length, total1, total2, pObj->pQueHeader->totalsize);
                SSQ_TRACE("total bytes error: %d\n", pObj->pQueHeader->totalsize);
            }
        } else {
            if (pObj->pQueHeader->totalsize < (pNode->frameinfo.length+sizeof(SS_BUF_T))) {
                SSQ_TRACE("total bytes<frame length+sizeof(SS_BUF_T)..\n");
                //Unlock();
                ReleaseMutex(pObj->hMutex);
                return -1;
            }
            remain = pObj->pQueHeader->bufsize-pObj->pQueHeader->readpos;
            if (remain>=sizeof(SS_BUF_T)) {
                pObj->pQueHeader->readpos += sizeof(SS_BUF_T);
                remain = pObj->pQueHeader->bufsize - pObj->pQueHeader->readpos;
                if (remain>0) {
                    //SSQ_TRACE("111 part of the data at the tail... part at the head... remain>0: %d\n", remain);
                    if (NULL!=pbuf) memcpy(pbuf, pObj->pQueData+pObj->pQueHeader->readpos, remain);
                    //memset(pObj->pQueData+pObj->pQueHeader->readpos, 0x00, remain);    //clear
                    if (NULL!=pbuf) memcpy(pbuf+remain, pObj->pQueData, frameinfo->length-remain);
                    //memset(pObj->pQueData, 0x00, frameinfo->length-remain);    //clear
                    pObj->pQueHeader->readpos = frameinfo->length-remain;
                    pObj->pQueHeader->totalsize -= frameinfo->length;
                    pObj->pQueHeader->totalsize -= sizeof(SS_BUF_T);
                    if (pObj->pQueHeader->totalsize < 0) {
                        SSQ_TRACE("3333pObj->pQueHeader->totalsize<0: %d\n", pObj->pQueHeader->totalsize);
                    }
                } else {
                    if (remain < 0) {
                        SSQ_TRACE("offset error: remaining bytes<0:%d\n", remain);
                    } else {
                        SSQ_TRACE("111 part of the data at the tail... part at the head... remain<=0: %d\n", remain);
                        if (NULL!=pbuf) memcpy(pbuf, pObj->pQueData, frameinfo->length);
                        //memset(pObj->pQueData, 0x00, frameinfo->length-remain);    //clear
                        pObj->pQueHeader->readpos = frameinfo->length;
                        pObj->pQueHeader->totalsize -= frameinfo->length;
                        pObj->pQueHeader->totalsize -= sizeof(SS_BUF_T);
                        if (pObj->pQueHeader->totalsize < 0) {
                            SSQ_TRACE("4444pObj->pQueHeader->totalsize<0: %d\n", pObj->pQueHeader->totalsize);
                        }
                    }
                }
                //SSQ_TRACE("remain > sizeof(SS_BUF_T): %d\t\treadpos: %d\n", remain, pObj->pQueHeader->readpos);
            } else {
                //SSQ_TRACE("remain < sizeof(SS_BUF_T): %d\n", remain);
                //reaching this point indicates an abnormal state
                //remain = pObj->pQueHeader->bufsize - pObj->pQueHeader->readpos;
                if (remain>0) {
                    SSQ_TRACE("222 part of the data at the tail... part at the head... remain>0: %d\n", remain);
                    if (NULL!=pbuf) memcpy(pbuf, pObj->pQueData+pObj->pQueHeader->readpos, remain);
                    //memset(pObj->pQueData+pObj->pQueHeader->readpos, 0x00, remain);    //clear
                    if (NULL!=pbuf) memcpy(pbuf+remain, pObj->pQueData, frameinfo->length-remain);
                    //memset(pObj->pQueData, 0x00, frameinfo->length-remain);    //clear
                    pObj->pQueHeader->readpos = frameinfo->length-remain;
                    pObj->pQueHeader->totalsize -= frameinfo->length;
                    if (pObj->pQueHeader->totalsize < 0) {
                        SSQ_TRACE("555pObj->pQueHeader->totalsize<0: %d\n", pObj->pQueHeader->totalsize);
                    }
                } else {
                    SSQ_TRACE("222 part of the data at the tail... part at the head... remain<=0: %d\n", remain);
                    if (NULL!=pbuf) memcpy(pbuf, pObj->pQueData, frameinfo->length);
                    //memset(pObj->pQueData, 0x00, frameinfo->length-remain);    //clear
                    pObj->pQueHeader->readpos = frameinfo->length;
                    pObj->pQueHeader->totalsize -= frameinfo->length;
                    if (pObj->pQueHeader->totalsize < 0) {
                        SSQ_TRACE("666pObj->pQueHeader->totalsize<0: %d\n", pObj->pQueHeader->totalsize);
                    }
                }
                pObj->pQueHeader->totalsize -= sizeof(SS_BUF_T);
                if (pObj->pQueHeader->totalsize < 0) {
                    SSQ_TRACE("777pObj->pQueHeader->totalsize<0: %d\n", pObj->pQueHeader->totalsize);
                }
            }
            //pObj->pQueHeader->readpos += sizeof(SS_BUF_T);
            //pObj->pQueHeader->totalsize -= sizeof(SS_BUF_T);
            if (pObj->pQueHeader->readpos>pObj->pQueHeader->bufsize) {
                SSQ_TRACE("read pos error...: %d / %d\n", pObj->pQueHeader->readpos, pObj->pQueHeader->bufsize);
            }
            /*
            if (NULL!=pbuf) memcpy(pbuf, pShareMemoryBuff+pQueHeader->readpos, remain);
            memset(pShareMemoryBuff+pQueHeader->readpos, 0x00, remain);    //clear
            if (NULL!=pbuf) memcpy(pbuf+remain, pShareMemoryBuff, frameinfo->length-remain);
            memset(pShareMemoryBuff, 0x00, frameinfo->length-remain);    //clear
            pQueHeader->readpos = frameinfo->length-remain;
            pQueHeader->totalsize -= (frameinfo->length-remain);
            */
        }
        if (MEDIA_TYPE_VIDEO==pNode->mediatype) pObj->pQueHeader->videoframes --;
        //memset(pNode, 0x00, sizeof(SS_BUF_T));
    } else {
        unsigned int remain = pObj->pQueHeader->bufsize-pObj->pQueHeader->readpos;
        SS_BUF_T bufnode;
        char *pp = (char *)&bufnode;
        memset(&bufnode, 0x00, sizeof(SS_BUF_T));
        //SSQ_TRACE("GET DATA...\n");
        //SSQ_TRACE("1 REMAIN: %d\n", remain);
        if (remain>0) {
            memcpy(pp, pObj->pQueData+pObj->pQueHeader->readpos, remain);
            //memset(pObj->pQueData+pObj->pQueHeader->readpos, 0x00, remain);    //clear
            //SSQ_TRACE("2 read: %d\n", sizeof(SS_BUF_T)-remain);
            memcpy(pp+remain, pObj->pQueData, sizeof(SS_BUF_T)-remain);
            //memset(pObj->pQueData, 0x00, sizeof(SS_BUF_T)-remain);    //clear
            memcpy(frameinfo, &bufnode.frameinfo, sizeof(MEDIA_FRAME_INFO));
            if (NULL != channelid) *channelid = bufnode.channelid;
            //if (bufnode.id<1)
            if (bufnode.flag != BUF_QUE_FLAG) {
                //Unlock();
                SSQ_Clear(pObj);
                ReleaseMutex(pObj->hMutex);
                SSQ_TRACE("SSQ_ flag error...\n");
                return -1;
            }
            pObj->pQueHeader->readpos = sizeof(SS_BUF_T)-remain;
            if (NULL!=mediatype) *mediatype = bufnode.mediatype;
            //SSQ_TRACE("3 frame length: %d\n", bufnode.frameinfo.length);
            if (NULL!=pbuf) memcpy(pbuf, pObj->pQueData+pObj->pQueHeader->readpos, bufnode.frameinfo.length);
            //memset(pObj->pQueData+pObj->pQueHeader->readpos, 0x00, bufnode.frameinfo.length);    //clear
            pObj->pQueHeader->readpos += bufnode.frameinfo.length;
            pObj->pQueHeader->totalsize -= sizeof(SS_BUF_T);
            //pObj->pQueHeader->totalsize -= (frameinfo->length-remain);
            pObj->pQueHeader->totalsize -= (frameinfo->length);//20140521
            if (pObj->pQueHeader->totalsize < 0) {
                SSQ_TRACE("888pObj->pQueHeader->totalsize<0: %d\n", pObj->pQueHeader->totalsize);
            }
            //SSQ_TRACE("GET DATA OK..\n");
            if (MEDIA_TYPE_VIDEO==bufnode.mediatype) pObj->pQueHeader->videoframes --;
        } else {
            SSQ_TRACE("abnormal... REMAIN <= 0....\n");
        }
        ret = 1000;
    }
#endif
    //Unlock();
    ReleaseMutex(pObj->hMutex);
    return ret;
}

//===========================================
//Fetch the frame data stored at a given position
int SSQ_GetDataByPosition(SS_QUEUE_OBJ_T *pObj, unsigned int position, unsigned int clearflag, unsigned int *channelid, unsigned int *mediatype, MEDIA_FRAME_INFO *frameinfo, char *pbuf)
{
    int ret = 0;
    unsigned int remain = 0;
    if (NULL == pObj) return -1;
    if (NULL == pObj->pQueHeader) return -1;
    if (NULL == pObj->pFrameinfoList) return -1;
    unsigned int *pOffset = (unsigned int *)&position;
    unsigned int totalsize = pObj->pQueHeader->totalsize;
    unsigned int *pTotalSize = (unsigned int*)&totalsize;
    if (clearflag == 0x01) {
        pOffset = (unsigned int*)&pObj->pQueHeader->readpos;
        pTotalSize = (unsigned int*)&pObj->pQueHeader->totalsize;
    }
    WaitForSingleObject(pObj->hMutex, INFINITE);    //Lock
    if (*pOffset == pObj->pQueHeader->bufsize) {
        SSQ_TRACE("reset read pos[%d / %d]..\n", *pOffset, pObj->pQueHeader->bufsize);
        *pOffset = 0;
    }
    if (clearflag==0x01) {
        if (pObj->pQueHeader->totalsize <= sizeof(SS_BUF_T)) {
            ReleaseMutex(pObj->hMutex);
            return -1;
        }
        if (pObj->pQueHeader->readpos == pObj->pQueHeader->bufsize) {
            pObj->pQueHeader->readpos = 0;
        }
    }
    if ((*pOffset + sizeof(SS_BUF_T)) <= pObj->pQueHeader->bufsize) {
        SS_BUF_T *pNode = (SS_BUF_T *)(pObj->pQueData + *pOffset);
        if (pNode->flag != BUF_QUE_FLAG) {
            SSQ_TRACE("[SSQ_GetDataByPosition]flag error...\n");
            if (clearflag == 0x01) {
                if (pObj->hSSHeader==NULL)    //same process
                {
                    SSQ_Clear(pObj);
                } else {
                    pObj->pQueHeader->clear_flag = 0x01;
                    while (pObj->pQueHeader->clear_flag!=0x00) {Sleep(1);}
                }
            }
            ReleaseMutex(pObj->hMutex);
            return -1;
        }
        if (NULL!=mediatype) *mediatype = pNode->mediatype;
        if (NULL != channelid) *channelid = pNode->channelid;
        memcpy(frameinfo, &pNode->frameinfo, sizeof(MEDIA_FRAME_INFO));
        if ( (*pOffset + pNode->frameinfo.length+sizeof(SS_BUF_T)) <= pObj->pQueHeader->bufsize) {
            //read from head toward tail
            *pOffset += sizeof(SS_BUF_T);
            if (NULL!=pbuf) memcpy(pbuf, pObj->pQueData+*pOffset, frameinfo->length);
            *pOffset += frameinfo->length;
            *pTotalSize -= (frameinfo->length+sizeof(SS_BUF_T));
            if (*pOffset == pObj->pQueHeader->bufsize) {
                *pOffset = 0;
            }
            if (*pOffset > pObj->pQueHeader->bufsize) {
                SSQ_TRACE("[SSQ_GetDataByPosition]read pos error 11111...: %d / %d\n", *pOffset, pObj->pQueHeader->bufsize);
            }
        } else {
            remain = pObj->pQueHeader->bufsize - *pOffset;
            if (remain>=sizeof(SS_BUF_T)) {
                *pOffset += sizeof(SS_BUF_T);
                remain = pObj->pQueHeader->bufsize - *pOffset;
                if (remain>0) {
                    if (NULL!=pbuf) memcpy(pbuf, pObj->pQueData + *pOffset, remain);
                    if (NULL!=pbuf) memcpy(pbuf+remain, pObj->pQueData, frameinfo->length-remain);
                    *pOffset = frameinfo->length-remain;
                    *pTotalSize -= (frameinfo->length+sizeof(SS_BUF_T));
                } else {
                    if (remain < 0) {
                        SSQ_TRACE("[SSQ_GetDataByPosition]offset error: remaining bytes<0:%d\n", remain);
                    } else {
                        SSQ_TRACE("[SSQ_GetDataByPosition]part of the data at the tail... part at the head... remain<=0: %d\n", remain);
                        if (NULL!=pbuf) memcpy(pbuf, pObj->pQueData, frameinfo->length);
                        //memset(pObj->pQueData, 0x00, frameinfo->length-remain);    //clear
                        *pOffset = frameinfo->length;
                        *pTotalSize -= (frameinfo->length+sizeof(SS_BUF_T));
                    }
                }
            } else {
                //SSQ_TRACE("remain < sizeof(SS_BUF_T): %d\n", remain);
                //reaching this point indicates an abnormal state
                SSQ_TRACE("[SSQ_GetDataByPosition] abnormal #########... remain>0: %d\n", remain);
                if (remain>0) {
                    SSQ_TRACE("[SSQ_GetDataByPosition]222 part of the data at the tail... part at the head... remain>0: %d\n", remain);
                    if (NULL!=pbuf) memcpy(pbuf, pObj->pQueData + *pOffset, remain);
                    if (NULL!=pbuf) memcpy(pbuf+remain, pObj->pQueData, frameinfo->length-remain);
                    *pOffset = frameinfo->length-remain;
                    *pTotalSize -= (frameinfo->length);
                } else {
                    SSQ_TRACE("[SSQ_GetDataByPosition]222 part of the data at the tail... part at the head... remain<=0: %d\n", remain);
                    if (NULL!=pbuf) memcpy(pbuf, pObj->pQueData, frameinfo->length);
                    //memset(pObj->pQueData, 0x00, frameinfo->length-remain);    //clear
                    *pOffset = frameinfo->length;
                }
            }
            if (*pOffset > pObj->pQueHeader->bufsize) {
                SSQ_TRACE("[SSQ_GetDataByPosition]read pos error...: %d / %d\n", *pOffset, pObj->pQueHeader->bufsize);
            }
        }
        if (MEDIA_TYPE_VIDEO==pNode->mediatype && clearflag==0x01) pObj->pQueHeader->videoframes --;
    } else {
        unsigned int remain = pObj->pQueHeader->bufsize - *pOffset;
        SS_BUF_T bufnode;
        char *pp = (char *)&bufnode;
        memset(&bufnode, 0x00, sizeof(SS_BUF_T));
        //SSQ_TRACE("GET DATA...\n");
        SSQ_TRACE("[SSQ_GetDataByPosition]1 REMAIN: %d\n", remain);
        if (remain>0) {
            memcpy(pp, pObj->pQueData + *pOffset, remain);
            SSQ_TRACE("[SSQ_GetDataByPosition]2 read: %d\n", sizeof(SS_BUF_T)-remain);
            memcpy(pp+remain, pObj->pQueData, sizeof(SS_BUF_T)-remain);
            memcpy(frameinfo, &bufnode.frameinfo, sizeof(MEDIA_FRAME_INFO));
            if (NULL != channelid) *channelid = bufnode.channelid;
            if (bufnode.flag != BUF_QUE_FLAG) {
                //Unlock();
                SSQ_Clear(pObj);
                ReleaseMutex(pObj->hMutex);
                SSQ_TRACE("[SSQ_GetDataByPosition]SSQ_ flag error...\n");
                return -1;
            }
            *pOffset = sizeof(SS_BUF_T)-remain;
            if (NULL!=mediatype) *mediatype = bufnode.mediatype;
            SSQ_TRACE("[SSQ_GetDataByPosition]3 frame length: %d\n", bufnode.frameinfo.length);
            if (NULL!=pbuf) memcpy(pbuf, pObj->pQueData + *pOffset, bufnode.frameinfo.length);
            *pOffset += bufnode.frameinfo.length;
            *pTotalSize -= (frameinfo->length+sizeof(SS_BUF_T));
            SSQ_TRACE("[SSQ_GetDataByPosition]GET DATA OK..\n");
            if (MEDIA_TYPE_VIDEO==bufnode.mediatype && clearflag==0x01) pObj->pQueHeader->videoframes --;
        } else {
            SSQ_TRACE("[SSQ_GetDataByPosition]abnormal... REMAIN <= 0....\n");
        }
        ret = 1000;
    }
    //Unlock();
    ReleaseMutex(pObj->hMutex);
    return ret;
}

int SSQ_TRACE(char* szFormat, ...)
{
#ifdef _DEBUG
    char buff[1024] = {0,};
    wchar_t wszbuff[1024] = {0,};
    va_list args;
    va_start(args,szFormat);
    _vsnprintf(buff, 1023, szFormat,args);
    va_end(args);
    MByteToWChar(buff, wszbuff, sizeof(wszbuff)/sizeof(wszbuff[0]));
#ifdef _WIN32
    OutputDebugString(wszbuff);
#endif
    printf("TRACE: %s", buff);
#endif
    return 0;
}
Full implementation: https://github.com/EasyDarwin/EasyPlayer/blob/master/win32/libEasyPlayer/ssqueue.cpp
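Much of the length of SSQ_AddData comes from handling, branch by branch, the cases where the SS_BUF_T header or the frame payload straddles the end of the buffer. The same split copy can be sketched as one helper. `write_wrapped` is a hypothetical function, not part of ssqueue.cpp, and it assumes the caller has already checked that enough free space exists.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Copies `len` bytes from `src` into `ring` starting at `write_pos`, continuing
// at index 0 when the end of the buffer is reached. Returns the new write position.
// Hypothetical helper: the caller is assumed to have verified the free space.
std::size_t write_wrapped(std::vector<char>& ring, std::size_t write_pos,
                          const char* src, std::size_t len) {
    assert(!ring.empty() && write_pos < ring.size() && len <= ring.size());
    const std::size_t tail = std::min(len, ring.size() - write_pos);
    std::memcpy(ring.data() + write_pos, src, tail);   // part that fits before the end
    std::memcpy(ring.data(), src + tail, len - tail);  // remainder wraps to the front
    return (write_pos + len) % ring.size();
}
```

Calling such a helper once for the header and once for the payload would cover every wrap case with the same two copies. Note one difference from the posted code: this sketch wraps the position with a modulo, whereas ssqueue.cpp lets `writepos` equal `bufsize` and resets it on the next write.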