以testRTSPClient.cpp测试程序来对Live555 RTSP播放进行一个简单的分析。同时对Live555几大模块的功能及使用进行简单描述。
因为我对Live555使用的比较多的是在客户端播放场景下,所以可能有些不足或者错误,请指正。
RTSPClient处于Live555 liveMedia模块,这部分是Live555流媒体的核心部分,主要是实现了各种流媒体交互流程。我们先了解一些重要的类,以帮助后面分析代码。(RTSP播放流程分析)
GroupSockGroupSock是Live555对网络接口的封装,支持UDP/TCP,同时也支持单播/组播。
udpGroupsock = new Groupsock(*env, udpIP, udpPort, ttl);
像这样通过IP及端口,就可以创建一个GroupSock,liveMedia中的模块就可以通过该socket进行网络数据的读写。
Source、Filter 和Sink Source 发送端,流的起点, 可直观理解为生产者,负责读取文件或网络流的信息。 Filter 本质上也是Source,因为可以由多级Source,Filter可以对上级Source进行处理。 Sink 接收端, 流的终点,可理解为是消费者。数据流向简单来说如下:
FramedSource
,必须实现virtual void doGetNextFrame() = 0;
函数;
Filter基于FramedFilter
,FramedFilter
实际上也是基于FramedSource
,Filter一般是会以上一级的Source作为传入参数,其实就是从上一级Source中获取数据在进行处理。第三级、第四级也一样;
Sink基于MediaSink
,必须实现virtual Boolean continuePlaying() = 0;
函数。
具体如下:
Source的创建需要有GroupSock作为参数,指明其读取的socket。
例如:
udpSource = BasicUDPSource::createNew(*env, udpGroupsock);
rtpSource = SimpleRTPSource::createNew(*env, rtpGroupsock, 33, 90000, "video/MP2T", 0, False /*no 'M' bit*/);
如果还有Filter,就以Source为参数,创建Filter,当然Filter也需要实现virtual void doGetNextFrame() = 0;
函数;在其中进行Filter的实现处理。以流媒体中最常见的MPEG2TransportStreamFramer为例,在其自身的afterGettingFrame函数中做了一些TS对齐、解析等操作后,调用的上级Source的doGetNextFrame。
readSource= MPEG2TransportStreamFramer::createNew(*env, rtpSource);
Sink的创建需要Source 作为参数,并调用startPlaying函数启动,当启动后,会调用continuePlaying。如下:
Boolean MediaSink::startPlaying(MediaSource& source,
afterPlayingFunc* afterFunc,
void* afterClientData) {
// Make sure we're not already being played:
if (fSource != NULL) {
envir().setResultMsg("This sink is already being played");
return False;
}
// Make sure our source is compatible:
if (!sourceIsCompatibleWithUs(source)) {
envir().setResultMsg("MediaSink::startPlaying(): source is not compatible!");
return False;
}
fSource = (FramedSource*)&source;
fAfterFunc = afterFunc;
fAfterClientData = afterClientData;
return continuePlaying();
}
要注意这里面定义了fAfterFunc 就是最后播放结束调用的函数。
例子如下:
sink->startPlaying((FramedSource&)(*readSource), afterPlaying, sink);
continuePlaying中,最基本需要实现的操作就是从Source中读取下一帧数据getNextFrame
,进而会调用到Source的doGetNextFrame
函数。
getNextFrame
中传入的几个参数,fBuffer是存放读取数据的内存,fBufferSize是读取的数据大小,afterGettingFrame是读完一帧后的处理函数。
Boolean FileSink::continuePlaying() {
if (fSource == NULL) return False;
fSource->getNextFrame(fBuffer, fBufferSize,
afterGettingFrame, this,
onSourceClosure, this);
return True;
}
每种类型Source的doGetNextFrame
函数读取每一帧数据都会有不同的处理,但最后,都会调用父类FramedSource的afterGetting(this);函数,最后调用到在continuePlaying实现中传进来的获取每一帧后的处理函数,例如上面的例子就是afterGettingFrame函数。
void FramedSource::afterGetting(FramedSource* source) {
source->fIsCurrentlyAwaitingData = False;
// indicates that we can be read again
// Note that this needs to be done here, in case the "fAfterFunc"
// called below tries to read another frame (which it usually will)
if (source->fAfterGettingFunc != NULL) {
(*(source->fAfterGettingFunc))(source->fAfterGettingClientData,
source->fFrameSize, source->fNumTruncatedBytes,
source->fPresentationTime,
source->fDurationInMicroseconds);
}
}
一般来说,afterGettingFrame函数会完成各种Sink定义的操作后(如播放帧数据),然后消费完这一帧后,又会调用continuePlaying函数,从而完成一个闭环。
例如:
void FileSink::afterGettingFrame(unsigned frameSize,
unsigned numTruncatedBytes,
struct timeval presentationTime) {
if (numTruncatedBytes > 0) {
envir() << "FileSink::afterGettingFrame(): The input frame data was too large for our buffer size ("
<< fBufferSize << "). "
<< numTruncatedBytes << " bytes of trailing data was dropped! Correct this by increasing the \"bufferSize\" parameter in the \"createNew()\" call to at least "
<< fBufferSize + numTruncatedBytes <stopGettingFrames();
onSourceClosure(this);
return;
}
if (fPerFrameFileNameBuffer != NULL) {
if (fOutFid != NULL) { fclose(fOutFid); fOutFid = NULL; }
}
// Then try getting the next frame:
continuePlaying();
}
最后,这样Souce和Sink就形成一个闭环,不断生产->消费->生产->消费…
TaskScheduler和BasicTaskScheduler这两个是Live555中任务调度的模块,TaskScheduler定义了接口,BasicTaskScheduler继承BasicTaskScheduler0,BasicTaskScheduler0继承TaskScheduler。
在sink调用startPlaying后,最后必须调用TaskScheduler的doEventLoop,来让整个程序跑起来。
介绍重要的函数及概念:
BasicTaskScheduler0中doEventLoop
为程序循环函数,实际看SingleStep
中的实现。
void BasicTaskScheduler0::doEventLoop(char volatile* watchVariable) {
// Repeatedly loop, handling readble sockets and timed events:
while (1) {
if (watchVariable != NULL && *watchVariable != 0) break;
SingleStep();
}
}
SingleStep
中根据任务的类别,分成三类的来处理,每一次循环都会按照下面顺序来完成调用。
1、首先处理的是Socket-Event。
负责I/O复用,使用select函数等待指定的描述字准备好读、写或有异常条件处理。若select返回值大于-1,则转到相应的处理函数;否则表明发生异常,程序将转到错误处理代码中去。该类型适合于有I/O操作的任务。
像UDP/RTP Source从Socket的读取便是这种类型。
相关函数:
turnOnBackgroundReadHandling //加入到后台IO
setBackgroundHandling //加入到后台IO
disableBackgroundHandling //禁止后台IO,即清空
moveSocketHandling //移除指定后台IO
2、接着是处理触发器事件(Trigger-Event)。
Live555定义了一个32位的位图来实现触发事件,当某一位设置为1则表明要触发该位对应的事件。若同时有多个(3个或以上)触发事件,它们触发的先后还会跟事件创建的先后有关,因此这一类型仅适合于没有顺序依赖关系的任务。
使用方法:
Trigger-Event的使用需先创建一个EventTrigger,指定其处理函数,需要触发时,调用triggerEvent。如:
testEventID = scheduler->createEventTrigger(testEventHandler);
void testEventHandler(void* user_data)
{
ALOGD("%s\n", __FUNCTION__);
}
scheduler->triggerEvent(testEventID , this);
相关函数:
createEventTrigger //创建触发器
deleteEventTrigger //删除触发器
triggerEvent //触发
3、最后一个是定时任务(Delayed Task)。
它是一个带有时间的任务。当剩余时间不为0,则任务不执行。通过调整任务的剩余时间,可以灵活地安排任务。
使用方法:
创建一个定时任务,给定时间及处理函数即可,如:
if (scs.duration > 0) {
unsigned const delaySlop = 2; // number of seconds extra to delay, after the stream's expected duration. (This is optional.)
scs.duration += delaySlop;
unsigned uSecsToDelay = (unsigned)(scs.duration*1000000);
scs.streamTimerTask = env.taskScheduler().scheduleDelayedTask(uSecsToDelay, (TaskFunc*)streamTimerHandler, rtspClient);
}
void streamTimerHandler(void* clientData) {
ourRTSPClient* rtspClient = (ourRTSPClient*)clientData;
StreamClientState& scs = rtspClient->scs; // alias
scs.streamTimerTask = NULL;
// Shut down the stream:
shutdownStream(rtspClient);
}
相关函数:
scheduleDelayedTask //添加定时任务
unscheduleDelayedTask //移除定时任务
rescheduleDelayedTask //重置定时任务
BasicTaskScheduler0实现以及触发事件和延迟任务。
BasicTaskScheduler类实现剩下的I/O操作任务接口和三类任务的实际调度(singleStep函数)。
至此,我们了解了Live555的基本模块使用,那么接下来进入主题,RTSPClient的分析