本文介紹PacketQueue,相對(duì)于FrameQueue來(lái)說(shuō)比較簡(jiǎn)單,可以類比Android中的MessageQueue。
PacketQueue總體介紹
- 單向鏈表結(jié)構(gòu)。first_pkt、last_pkt,是鏈表的起點(diǎn)和終點(diǎn)結(jié)點(diǎn);recycle_pkt鏈表用于節(jié)點(diǎn)復(fù)用;
- 是一個(gè)多線程安全隊(duì)列,靠等待喚醒機(jī)制保證線程安全;
- 當(dāng)遇到flush_pkt時(shí),serial加1自增,標(biāo)志著流序列變化,區(qū)分是否是連續(xù)的流;
typedef struct MyAVPacketList {
AVPacket pkt;
struct MyAVPacketList *next;
int serial;
} MyAVPacketList;
typedef struct PacketQueue {
MyAVPacketList *first_pkt, *last_pkt;
int nb_packets;
int size;
int64_t duration;
int abort_request;
int serial;
SDL_mutex *mutex;
SDL_cond *cond;
MyAVPacketList *recycle_pkt;
int recycle_count;
int alloc_count;
int is_buffer_indicator;
SDL_Profiler videoBufferProfiler;
SDL_Profiler audioBufferProfiler;
void *ffp;
} PacketQueue;
PacketQueue API介紹
packet_queue_init:初始化;
packet_queue_start:?jiǎn)?dòng)隊(duì)列,設(shè)置abort_request為0,先放一個(gè)flush_pkt;
packet_queue_put:存入一個(gè)節(jié)點(diǎn),;
packet_queue_put_nullpacket:存入一個(gè)空節(jié)點(diǎn);
packet_queue_put_private:存入一個(gè)節(jié)點(diǎn),后喚醒packet_queue_get等待鎖;
packet_queue_get:獲取一個(gè)節(jié)點(diǎn);
packet_queue_get_or_buffering:去緩沖等待水位后獲取一個(gè)節(jié)點(diǎn);
packet_queue_abort:中止,設(shè)置abort_request=1后喚醒packet_queue_get等待鎖;
packet_queue_flush:清除隊(duì)列內(nèi)所有的節(jié)點(diǎn);
packet_queue_destroy:銷毀;
初始化
static int packet_queue_init(PacketQueue *q) {
memset(q, 0, sizeof(PacketQueue));
q->mutex = SDL_CreateMutex();
q->cond = SDL_CreateCond();
q->abort_request = 1;
return 0;
}
static void packet_queue_start(PacketQueue *q) {
SDL_LockMutex(q->mutex);
q->abort_request = 0;
packet_queue_put_private(q, &flush_pkt);
SDL_UnlockMutex(q->mutex);
}
put操作
/*
* 存入null結(jié)點(diǎn),eof和error時(shí)候存入,表示流結(jié)束
*/
static int packet_queue_put_nullpacket(PacketQueue *q, int stream_index) {
AVPacket pkt1, *pkt = &pkt1;
av_init_packet(pkt);
pkt->data = NULL;
pkt->size = 0;
pkt->stream_index = stream_index;
return packet_queue_put(q, pkt);
}
static int packet_queue_put(PacketQueue *q, AVPacket *pkt) {
int ret;
SDL_LockMutex(q->mutex);
ret = packet_queue_put_private(q, pkt);
SDL_UnlockMutex(q->mutex);
if (pkt != &flush_pkt && ret < 0)
av_packet_unref(pkt);
return ret;
}
static int packet_queue_put_private(PacketQueue *q, AVPacket *pkt) {
if (q->abort_request) {
return -1;
}
// 如果有已經(jīng)回收的就復(fù)用該回收的結(jié)點(diǎn),沒(méi)有就申請(qǐng)一個(gè);
MyAVPacketList *pkt1 = q->recycle_pkt;
if (pkt1) {
q->recycle_pkt = pkt1->next; // 移動(dòng)到下一個(gè)
q->recycle_count++;
} else {
q->alloc_count++;
pkt1 = av_malloc(sizeof(MyAVPacketList));
}
if (!pkt1) {
return -1;
}
pkt1->pkt = *pkt;
pkt1->next = NULL;
// 遇到flush_pkt就升級(jí)serial序列號(hào),標(biāo)志剛開(kāi)始或進(jìn)行了seek
if (pkt == &flush_pkt) {
q->serial++;
}
pkt1->serial = q->serial;
// 賦值first_pkt和last_pkt,定義鏈表的起點(diǎn)和終點(diǎn);
if (!q->last_pkt) { // 條件判斷同 !q->first_pkt
q->first_pkt = pkt1;
} else {
q->last_pkt->next = pkt1;
}
q->last_pkt = pkt1;
q->nb_packets++;
q->size += pkt1->pkt.size + sizeof(*pkt1);
q->duration += FFMAX(pkt1->pkt.duration, MIN_PKT_DURATION);
/* XXX: should duplicate packet data in DV case */
SDL_CondSignal(q->cond);
return 0;
}
get操作
/*
* block: 是否阻塞
* 返回1表示獲取到了,返回值<=0表示沒(méi)獲取到
*/
static int packet_queue_get(PacketQueue *q, AVPacket *pkt, int block, int *serial) {
MyAVPacketList *pkt1;
int ret;
// 加鎖
SDL_LockMutex(q->mutex);
for (;;) {
if (q->abort_request) {
ret = -1;
break;
}
pkt1 = q->first_pkt;
if (pkt1) {
q->first_pkt = pkt1->next;
if (!q->first_pkt) {
// 說(shuō)明只有一個(gè)結(jié)點(diǎn)
q->last_pkt = NULL;
}
q->nb_packets--;
q->size -= pkt1->pkt.size + sizeof(*pkt1);
q->duration -= FFMAX(pkt1->pkt.duration, MIN_PKT_DURATION);
*pkt = pkt1->pkt;
if (serial) {
*serial = pkt1->serial;
}
// 把pkt1持有的pkt給出去后進(jìn)行回收,放到recycle_pkt鏈表頭部
pkt1->next = q->recycle_pkt;
q->recycle_pkt = pkt1;
ret = 1;
break;
} else if (!block) {
ret = 0;
break;
} else {
// wait阻塞,等待put喚醒
SDL_CondWait(q->cond, q->mutex);
}
}
SDL_UnlockMutex(q->mutex);
return ret;
}
/*
* 阻塞等待直到退出或者有AVPacket數(shù)據(jù)
* >= 0 即取到值;
*/
static int packet_queue_get_or_buffering(FFPlayer *ffp, PacketQueue *q, AVPacket *pkt, int *serial,
int *finished) {
if (!ffp->packet_buffering)
return packet_queue_get(q, pkt, 1, serial); // queue為空時(shí)會(huì)阻塞等待
while (1) {
int new_packet = packet_queue_get(q, pkt, 0, serial); // 非阻塞,直接返回
if (new_packet < 0) {
// abort_request了
return -1;
} else if (new_packet == 0) {
// 隊(duì)列為空,去緩沖
if (q->is_buffer_indicator && !*finished) {
ffp_toggle_buffering(ffp, 1);
}
// 再阻塞獲取,等待水位填充滿
new_packet = packet_queue_get(q, pkt, 1, serial);
if (new_packet < 0) {
// abort_request了
return -1;
}
}
if (*finished == *serial) {
av_packet_unref(pkt);
continue;
} else {
break;
}
}
return 1;
}
重置、銷毀操作
// stream_close時(shí)第一個(gè)調(diào)用它,主要是置abort_request為1,阻斷后續(xù)所有流程
static void packet_queue_abort(PacketQueue *q) {
SDL_LockMutex(q->mutex);
q->abort_request = 1;
SDL_CondSignal(q->cond);
SDL_UnlockMutex(q->mutex);
}
// seek或destory時(shí)調(diào)用
static void packet_queue_flush(PacketQueue *q) {
SDL_LockMutex(q->mutex);
// 釋放所有pkt
MyAVPacketList *pkt, *pkt1;
for (pkt = q->first_pkt; pkt; pkt = pkt1) {
pkt1 = pkt->next;
av_packet_unref(&pkt->pkt);
// 回收,放到鏈表頭部
pkt->next = q->recycle_pkt;
q->recycle_pkt = pkt;
}
q->last_pkt = NULL;
q->first_pkt = NULL;
q->nb_packets = 0;
q->size = 0;
q->duration = 0;
SDL_UnlockMutex(q->mutex);
}
// 清空所有pkt,包括recycle_pkt,stream_close處調(diào)用
static void packet_queue_destroy(PacketQueue *q) {
packet_queue_flush(q);
SDL_LockMutex(q->mutex);
while (q->recycle_pkt) {
MyAVPacketList *pkt = q->recycle_pkt;
if (pkt)
q->recycle_pkt = pkt->next;
av_freep(&pkt);
}
SDL_UnlockMutex(q->mutex);
SDL_DestroyMutex(q->mutex);
SDL_DestroyCond(q->cond);
}