Transporting RTMP over P2P


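# Introduction to the KKP2P SDK
The kkp2p SDK is P2P communication middleware developed by 库快科技 (a trial version can be downloaded from the official website). It is a general-purpose P2P SDK library that is independent of any particular business logic, and the trial version is free to download and use.
Its defining feature, in one sentence: it supports network programming addressed by login account.
Given only the peer's account id, the application layer obtains a socket handle (fd), and it communicates with the peer by reading from and writing to that fd.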
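Because the application ends up holding an ordinary fd, the usual stream-socket discipline applies: a single read or write may transfer fewer bytes than requested, so callers normally loop. The sketch below illustrates that pattern only. It uses a POSIX `socketpair` as a stand-in for an fd obtained from the SDK, and the helper names `writeAll`, `readExact`, and `roundTripDemo` are hypothetical, not part of kkp2p.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <cstring>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

// Write exactly len bytes to fd, retrying on short writes and EINTR.
bool writeAll(int fd, const char* buf, size_t len) {
    assert(buf != nullptr);
    size_t done = 0;
    while (done < len) {
        ssize_t n = write(fd, buf + done, len - done);
        if (n < 0) {
            if (errno == EINTR) continue;
            return false;
        }
        done += static_cast<size_t>(n);
    }
    return true;
}

// Read exactly len bytes from fd; fails if the peer closes early.
bool readExact(int fd, char* buf, size_t len) {
    assert(buf != nullptr);
    size_t done = 0;
    while (done < len) {
        ssize_t n = read(fd, buf + done, len - done);
        if (n == 0) return false;  // peer closed the connection
        if (n < 0) {
            if (errno == EINTR) continue;
            return false;
        }
        done += static_cast<size_t>(n);
    }
    return true;
}

// Stand-in demo: a local socketpair plays the role of the two peers.
bool roundTripDemo() {
    int fds[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) return false;
    const char msg[] = "hello peer";
    char out[sizeof(msg)] = {0};
    bool ok = writeAll(fds[0], msg, sizeof(msg)) &&
              readExact(fds[1], out, sizeof(out)) &&
              std::memcmp(msg, out, sizeof(msg)) == 0;
    close(fds[0]);
    close(fds[1]);
    return ok;
}
```

With a real connection, the fd passed to these helpers would be the one handed back by the SDK instead of one end of the socketpair.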

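| Feature | Description |
| --- | --- |
| Cross-platform | The kkp2p SDK library is written in C. Static libraries have been built for Linux, Windows, Android, iOS, and a number of embedded platforms, and can be downloaded and used directly. The cloud service is written in Go; it can likewise be compiled into a directly runnable program on each platform and is simple to configure. After downloading it, you can deploy it yourself by following the documentation on the official website. |
| Small footprint | kkp2p does not depend on any third-party library; the compiled library is only about 500 KB. |
| High performance | In tests on servers, P2P transfers can exceed 10 MB per second. Relay throughput depends on the bandwidth of your cloud server. |
| Easy to use | The SDK exposes a handful of socket-style core functions: `kkp2p_connect`, `kkp2p_listen`, `kkp2p_accept`, `kkp2p_read`, and `kkp2p_write`. You pass the peer's login account to `kkp2p_connect` to create a virtual transport pipe to that peer, then exchange data with `kkp2p_read` and `kkp2p_write`. A parameter selects P2P or relay communication. You never deal with creating or managing the underlying transport channel; the SDK handles all of it. |
| Secure | Encrypted channels are supported: specify in the `kkp2p_connect` parameters that data must be encrypted, and the SDK creates an encrypted virtual pipe. You write plaintext and the SDK encrypts it for transmission; the SDK decrypts incoming ciphertext and returns plaintext to you. The shared key is negotiated automatically by the two SDKs with the DH algorithm and is not exposed to outsiders, and a new dynamic key is negotiated every time the SDK starts. If you would rather skip encryption for performance, specify in the `kkp2p_connect` parameters that data need not be encrypted. Encryption is available only in the commercial edition, not in the personal trial edition. |
| General-purpose | kkp2p is communication middleware for any scenario. It passes user data through untouched, so you are free to define your own protocol between the two ends; kkp2p never parses your business data. P2P communication is built on UDP: kkp2p handles packet loss, reordering, and retransmission for you and adapts flow control to your actual bandwidth, so it behaves like TCP (in effect, TCP emulated over UDP). Relay communication runs over native TCP. |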
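The security feature above says the two SDKs negotiate a shared key with the DH (Diffie-Hellman) algorithm. As a toy illustration of the arithmetic behind that idea, and not of how kkp2p implements it, the sketch below runs the exchange with textbook-sized numbers. The names `powMod`, `DhParty`, `makeParty`, and `sharedSecret` are hypothetical, and real deployments use far larger parameters.

```cpp
#include <cassert>
#include <cstdint>

// (base^exp) mod m by square-and-multiply.
// m is kept below 2^32 so intermediate products fit in 64 bits.
uint64_t powMod(uint64_t base, uint64_t exp, uint64_t m) {
    assert(m > 0 && m < (1ULL << 32));
    uint64_t result = 1 % m;
    base %= m;
    while (exp > 0) {
        if (exp & 1) result = result * base % m;
        base = base * base % m;
        exp >>= 1;
    }
    return result;
}

struct DhParty {
    uint64_t priv;  // secret exponent, never sent
    uint64_t pub;   // g^priv mod p, sent to the peer
};

// Derive the public value from a private exponent.
DhParty makeParty(uint64_t p, uint64_t g, uint64_t priv) {
    return DhParty{priv, powMod(g, priv, p)};
}

// Combine our private exponent with the peer's public value.
uint64_t sharedSecret(uint64_t p, const DhParty& self, uint64_t peerPub) {
    return powMod(peerPub, self.priv, p);
}
```

With p = 23 and g = 5, private exponents 4 and 3 give public values 4 and 10, and both sides derive the same secret, 18, while only the public values cross the network.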

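# Source code walkthrough

This example shows how to run the RTMP protocol over P2P. RTMP is widely used in live-streaming and video-on-demand systems, mainly to carry audio and video data.

The demonstration uses two cooperating programs: one acts as the access proxy in front of the RTMP server (the peer client), and the other acts as the server side (the RTMP server). In the demonstration, ffmpeg pushes the stream and the VLC player plays it back. The test procedure is walked through after the client code.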



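First, the peer client code, which was compiled and tested on Windows with Visual Studio. The peer client calls `kkp2p_start_proxy` to start a local proxy that communicates with the RTMP server. The commented source follows: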

```cpp
#include <windows.h>
#include <process.h>
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/timeb.h>
#include <signal.h>

// Get the latest header file and libraries from kkuai.com
#include "kkp2p_sdk.h"

#pragma comment(lib, "Ws2_32.lib")
#pragma comment(lib, "iphlpapi.lib")

// Cleared by Ctrl+C so that the main loop exits
volatile sig_atomic_t run_flag = 1;

void SignalHandler(int signal)
{
    printf("exit...\n");
    run_flag = 0;
}

// argv holds 5 entries: program name, proxy ip, proxy port,
// the peer id to talk to, and the mode used to connect to that peer
int main(int argc, char* argv[])
{
    if (argc < 5) {
        printf("usage:%s proxy_ip proxy_port peer_id connect_mode\n", argv[0]);
        return -1;
    }

    typedef void (*SignalHandlerPointer)(int);
    SignalHandlerPointer previousHandler;
    previousHandler = signal(SIGINT, SignalHandler);

    // Initialize Winsock 2.2
    WSADATA wsadata;
    WSAStartup(MAKEWORD(2, 2), &wsadata);

    // Login address and port of the P2P cloud service
    kkp2p_engine_conf_t kkp2p_conf;
    memset(&kkp2p_conf, 0, sizeof(kkp2p_conf));
    kkp2p_conf.login_domain = (char*)"124.71.217.198";
    kkp2p_conf.login_port = 3080;
    kkp2p_conf.lan_search_port = 3549;
    kkp2p_conf.max_log_size = 1024 * 1024 * 10;
    kkp2p_conf.log_path = NULL;

    kkp2p_engine_t* g_engine = kkp2p_engine_init(&kkp2p_conf, 5000);
    kkp2p_switch_log_level(g_engine, 4);

    kkp2p_connect_ctx_t ctx;
    memset(&ctx, 0, sizeof(kkp2p_connect_ctx_t));
    strncpy(ctx.peer_id, argv[3], 32);
    ctx.timeout = 5000;

    // Connection mode to the peer: 0 = automatic, 1 = P2P only, 2 = relay only
    ctx.connect_mode = atoi(argv[4]);

    uint32_t proxyId = 0;
    int ret = kkp2p_start_proxy(g_engine, argv[1], atoi(argv[2]), &ctx, &proxyId);
    if (ret < 0) {
        printf("create proxy(%s:%d) to peer %s error.\n", argv[1], atoi(argv[2]), argv[3]);
        return -1;
    } else {
        printf("create proxy(%s:%d) to peer %s success.\n", argv[1], atoi(argv[2]), argv[3]);
    }

    // Keep the proxy running until Ctrl+C
    while (run_flag) {
        Sleep(1000);
    }

    kkp2p_stop_proxy(g_engine, proxyId);
    kkp2p_engine_destroy(g_engine);
    return 0;
}
```
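The client above converts its port and connection-mode arguments with `atoi`, which silently turns junk input into 0 (and 0 happens to be the automatic mode). A stricter variant might validate both values before starting the proxy. The following is a minimal sketch under that assumption; `ConnectMode`, `parseIntInRange`, `parseConnectMode`, and `parsePort` are hypothetical helpers, and the enum values simply mirror the 0/1/2 modes described in the source comment.

```cpp
#include <cassert>
#include <cerrno>
#include <cstdlib>

// Mirrors the modes in the client comment: 0 auto, 1 P2P only, 2 relay only.
enum class ConnectMode { Auto = 0, P2pOnly = 1, RelayOnly = 2 };

// Parse a decimal integer and require lo <= value <= hi.
// Returns false on empty input, trailing junk, overflow, or out-of-range values.
bool parseIntInRange(const char* text, long lo, long hi, long* out) {
    assert(out != nullptr);
    if (text == nullptr || *text == '\0') return false;
    errno = 0;
    char* end = nullptr;
    long value = std::strtol(text, &end, 10);
    if (errno != 0 || *end != '\0' || value < lo || value > hi) return false;
    *out = value;
    return true;
}

bool parseConnectMode(const char* text, ConnectMode* mode) {
    assert(mode != nullptr);
    long value = 0;
    if (!parseIntInRange(text, 0, 2, &value)) return false;
    *mode = static_cast<ConnectMode>(value);
    return true;
}

bool parsePort(const char* text, int* port) {
    assert(port != nullptr);
    long value = 0;
    if (!parseIntInRange(text, 1, 65535, &value)) return false;
    *port = static_cast<int>(value);
    return true;
}
```

In `main`, a failed parse would print the usage line and exit instead of passing a meaningless value to `kkp2p_start_proxy`.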


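The peer server (RTMP server) works as follows: the main thread repeatedly calls `kkp2p_accept` to take in new connections. Each new connection yields an fd, and the RTMP protocol is then handled on that fd. The server code comes from the internet and was adapted to use P2P; it is covered at the end of this article and was tested on Linux. Because it serves only as a demonstration, some details are not handled thoroughly, so treat it as a reference only.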

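Now for the test procedure. First, start the peer client (the RTMP proxy) on Windows; it listens on 127.0.0.1:32915.

Then start the peer server (the RTMP server module) on Linux.

Next, start the VLC player on Windows.

Then start ffmpeg on Windows to push the stream.

Finally, the picture appears in VLC.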
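In this setup, both ffmpeg and VLC are pointed at the local proxy address rather than at the RTMP server itself. An RTMP URL conventionally has the shape `rtmp://host[:port]/app/playpath`, and the server shown later tracks streams by exactly those two names, app and playpath. The sketch below splits such a URL into its parts. `RtmpUrl` and `parseRtmpUrl` are hypothetical names, `live/test` is an invented stream name, and 1935 is the standard RTMP default port.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

struct RtmpUrl {
    std::string host;
    int port = 1935;       // standard RTMP port when none is given
    std::string app;       // first path segment
    std::string playPath;  // everything after the app segment
};

// Split "rtmp://host[:port]/app/playpath"; returns false if malformed.
bool parseRtmpUrl(const std::string& url, RtmpUrl* out) {
    assert(out != nullptr);
    const std::string scheme = "rtmp://";
    if (url.compare(0, scheme.size(), scheme) != 0) return false;

    const size_t hostEnd = url.find('/', scheme.size());
    if (hostEnd == std::string::npos) return false;
    const size_t appEnd = url.find('/', hostEnd + 1);
    if (appEnd == std::string::npos) return false;

    RtmpUrl result;
    const std::string hostPort = url.substr(scheme.size(), hostEnd - scheme.size());
    const size_t colon = hostPort.find(':');
    result.host = hostPort.substr(0, colon);
    if (colon != std::string::npos) {
        const std::string portText = hostPort.substr(colon + 1);
        if (portText.empty() || portText.size() > 5) return false;
        int port = 0;
        for (char c : portText) {
            if (c < '0' || c > '9') return false;
            port = port * 10 + (c - '0');
        }
        if (port < 1 || port > 65535) return false;
        result.port = port;
    }
    result.app = url.substr(hostEnd + 1, appEnd - hostEnd - 1);
    result.playPath = url.substr(appEnd + 1);
    if (result.host.empty() || result.app.empty() || result.playPath.empty()) return false;

    *out = result;
    return true;
}
```

For the proxy address used in the test, a URL such as `rtmp://127.0.0.1:32915/live/test` would yield host `127.0.0.1`, port 32915, app `live`, and playpath `test`.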

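As the example shows, 库快科技's P2P library lets you carry RTMP data over a P2P transport (built on UDP). In one-to-one audio/video scenarios, where media can flow directly between the two peers instead of through a relay server, this can save a substantial amount of bandwidth cost.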

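Finally, the source of the peer server (the RTMP server). Most of it is RTMP protocol handling taken from the internet, with only the P2P adaptation added, which shows how little code 库快科技's P2P library requires.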
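The server's dispatch routine switches on raw RTMP message type IDs (`0x01`, `0x04`, `0x05`, `0x06`, `0x08`, `0x09`, `0x12`, `0x14`). As a reading aid, the sketch below maps those IDs to the names used in the RTMP specification. `rtmpMessageTypeName` and `isMediaMessage` are hypothetical helpers that do not appear in the server source.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>

// Names of the RTMP message types the server's dispatcher handles.
const char* rtmpMessageTypeName(uint8_t type) {
    switch (type) {
        case 0x01: return "Set Chunk Size";
        case 0x04: return "User Control";
        case 0x05: return "Window Acknowledgement Size";
        case 0x06: return "Set Peer Bandwidth";
        case 0x08: return "Audio";
        case 0x09: return "Video";
        case 0x12: return "Data (AMF0)";
        case 0x14: return "Command (AMF0)";
        default:   return "Unknown";
    }
}

// Audio and video messages are the ones forwarded to playing connections.
constexpr bool isMediaMessage(uint8_t type) {
    return type == 0x08 || type == 0x09;
}

static_assert(isMediaMessage(0x09), "video is a media message");
static_assert(!isMediaMessage(0x14), "commands are not media messages");
```

Commands such as `connect`, `createStream`, `publish`, and `play` all arrive as type `0x14`, which is why the server hands that type to its invoke handler.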

Besides 库快科技's P2P library, building this source also requires linking the RTMP library (librtmp).

```cpp
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <sys/time.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <string>
#include <vector>
#include <list>
#include <set>
#include <map>
#include <librtmp/rtmp.h>
#include <librtmp/log.h>
#include "kkp2p_sdk.h"

kkp2p_engine_t* g_engine = NULL;

class CMutex
{
    pthread_mutex_t m_mutex;
public:
    CMutex() { pthread_mutex_init(&m_mutex, NULL); }
    ~CMutex() { pthread_mutex_destroy(&m_mutex); }
    void lock() { pthread_mutex_lock(&m_mutex); }
    void unlock() { pthread_mutex_unlock(&m_mutex); }
};

template<typename T>
class CAutoLock
{
    T* m_pLock;
public:
    CAutoLock(T* pLock) : m_pLock(pLock) { m_pLock->lock(); }
    ~CAutoLock() { m_pLock->unlock(); }
};

class CConnection
{
    uint32_t m_uConnID;
    uint32_t m_uNextStreamID;
    RTMP* m_pRtmp;
    std::string m_strApp;
    int m_nStreamType;
    std::set<uint32_t> m_setUsingStreamID;
    std::map<uint32_t, std::string> m_mapPublishStreamIDPlayPath;
    std::map<uint32_t, std::string> m_mapPlayStreamIDPlayPath;
    std::list<RTMPPacket*> m_listpPacket;
    CMutex m_mutex;
    sem_t m_sem;

public:
    // Stream type
    enum EStreamType { Unkown = 0, Publish, Play };

    CConnection(uint32_t uConnID, int nSocket)
        : m_uConnID(uConnID)
        , m_uNextStreamID(1)
        , m_pRtmp(NULL)
        , m_strApp("")
        , m_nStreamType(Unkown)
    {
        m_pRtmp = RTMP_Alloc();
        RTMP_Init(m_pRtmp);
        m_pRtmp->m_sb.sb_socket = nSocket;
        sem_init(&m_sem, 0, 0);
    }

    virtual ~CConnection()
    {
        RTMP_Close(m_pRtmp);
        RTMP_Free(m_pRtmp);
        sem_destroy(&m_sem);
    }

    uint32_t ConnID() { return m_uConnID; }
    RTMP* Rtmp() { return m_pRtmp; }
    int Socket() { return m_pRtmp->m_sb.sb_socket; }

    void setAppName(const std::string& strApp) { m_strApp = strApp; }
    const std::string& getAppName() const { return m_strApp; }

    void setStreamType(EStreamType emType) { m_nStreamType = emType; }
    EStreamType getStreamType() { return (EStreamType)m_nStreamType; }

    uint32_t genStreamID()
    {
        uint32_t uStreamID = m_uNextStreamID++;
        m_setUsingStreamID.insert(uStreamID);
        return uStreamID;
    }

    // Check whether a stream ID is valid
    bool isValidStreamID(uint32_t uStreamID)
    {
        return (m_setUsingStreamID.find(uStreamID) != m_setUsingStreamID.end());
    }

    // Register the mapping from a publish stream ID to its playpath
    void bindPublishPlayPath(uint32_t uStreamID, const std::string& strPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        m_setUsingStreamID.erase(uStreamID);
        m_mapPublishStreamIDPlayPath[uStreamID] = strPlayPath;
    }

    // Unregister the mapping from a publish stream ID to its playpath
    void unbindPublishPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        m_mapPublishStreamIDPlayPath.erase(uStreamID);
    }

    // Look up the playpath bound to a publish stream ID
    std::string getPublishPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        auto iter = m_mapPublishStreamIDPlayPath.find(uStreamID);
        if (iter == m_mapPublishStreamIDPlayPath.end())
            return "";
        return iter->second;
    }

    // On disconnect, collect the playpaths being published
    void getPublishPlayPaths(std::vector<std::string>& vecPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        for (auto iter = m_mapPublishStreamIDPlayPath.begin(); iter != m_mapPublishStreamIDPlayPath.end(); ++iter)
        {
            vecPlayPath.push_back(iter->second);
        }
    }

    // On disconnect, clear all publish stream ID to playpath mappings
    void cleanPublishPlayPath()
    {
        CAutoLock<CMutex> lock(&m_mutex);
        m_mapPublishStreamIDPlayPath.clear();
    }

    // Register the mapping from a play stream ID to its playpath
    void bindPlayPlayPath(uint32_t uStreamID, const std::string& strPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        m_setUsingStreamID.erase(uStreamID);
        m_mapPlayStreamIDPlayPath[uStreamID] = strPlayPath;
    }

    // Unregister the mapping from a play stream ID to its playpath
    void unbindPlayPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        m_mapPlayStreamIDPlayPath.erase(uStreamID);
    }

    // Look up the playpath bound to a play stream ID
    std::string getPlayPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        auto iter = m_mapPlayStreamIDPlayPath.find(uStreamID);
        if (iter == m_mapPlayStreamIDPlayPath.end())
            return "";
        return iter->second;
    }

    // On disconnect, collect the playpaths being played
    void getPlayPlayPaths(std::vector<std::string>& vecPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        for (auto iter = m_mapPlayStreamIDPlayPath.begin(); iter != m_mapPlayStreamIDPlayPath.end(); ++iter)
        {
            vecPlayPath.push_back(iter->second);
        }
    }

    // On disconnect, clear all play stream ID to playpath mappings
    void cleanPlayPlayPath()
    {
        CAutoLock<CMutex> lock(&m_mutex);
        m_mapPlayStreamIDPlayPath.clear();
    }

    // Notify this connection that the given playpath is about to be reset
    void tellResetPlayPath(const std::string& strPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        uint32_t uStreamID = 0;
        for (auto iter = m_mapPublishStreamIDPlayPath.begin(); iter != m_mapPublishStreamIDPlayPath.end(); ++iter)
        {
            if (iter->second == strPlayPath)
            {
                uStreamID = iter->first;
                m_mapPublishStreamIDPlayPath.erase(iter);
                break;
            }
        }
        if (uStreamID == 0)
        {
            for (auto iter = m_mapPlayStreamIDPlayPath.begin(); iter != m_mapPlayStreamIDPlayPath.end(); ++iter)
            {
                if (iter->second == strPlayPath)
                {
                    uStreamID = iter->first;
                    m_mapPlayStreamIDPlayPath.erase(iter);
                    break;
                }
            }
        }
    }

    // Pop the next packet waiting to be sent (waits briefly if the queue is empty)
    RTMPPacket* popPacket()
    {
        struct timeval tv;
        gettimeofday(&tv, NULL);
        double ftime = tv.tv_sec + (tv.tv_usec + /* wait in microseconds; literal lost in the source */) / (double)1000000;
        struct timespec ts = { (long)ftime, (long)((ftime - (int)ftime) * 1000000000) };
        sem_timedwait(&m_sem, &ts);

        CAutoLock<CMutex> lock(&m_mutex);
        if (m_listpPacket.empty())
            return NULL;
        RTMPPacket* pPacket = m_listpPacket.front();
        m_listpPacket.pop_front();
        return pPacket;
    }

    // Queue several packets on this connection
    void copyPackets(const std::string& strPlayPath, const std::vector<RTMPPacket*>& vecpPacket)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        for (auto iter = vecpPacket.begin(); iter != vecpPacket.end(); ++iter)
        {
            m_listpPacket.push_back(*iter);
            sem_post(&m_sem);
        }
    }
};

// Client connection management =>
class CConnections
{
    uint32_t m_uNextConnID;
    std::map<uint32_t, CConnection*> m_mapConnection;
    CMutex m_mutex;

public:
    CConnections() : m_uNextConnID(1) { }
    virtual ~CConnections() {}

    CConnection* createConnection(int nSocket)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        CConnection* pConnection = new CConnection(m_uNextConnID++, nSocket);
        m_mapConnection[pConnection->ConnID()] = pConnection;
        return pConnection;
    }

    void releaseConnection(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        CConnection* pConn = __getConnection(uConnID);
        if (pConn)
        {
            m_mapConnection.erase(uConnID);
            delete pConn;
        }
    }

    CConnection* getConnection(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        return __getConnection(uConnID);
    }

private:
    CConnection* __getConnection(uint32_t uConnID)
    {
        auto iter = m_mapConnection.find(uConnID);
        if (iter == m_mapConnection.end())
            return NULL;
        return iter->second;
    }
};

CConnections g_Conns;

// Program (playpath) container =>
class CPlayPath
{
    std::string m_strPlayPath;
    bool m_bEOF;
    uint32_t m_uPublishConnID;
    std::set<uint32_t> m_setPlayConnID;
    CMutex m_mutex;

public:
    CPlayPath(const std::string& strPlayPath)
        : m_strPlayPath(strPlayPath)
        , m_bEOF(true)
        , m_uPublishConnID(0)
    {
    }
    virtual ~CPlayPath() {}

    const std::string& getName() const { return m_strPlayPath; }

    // Set / query the end-of-stream flag
    void setEOF() { m_bEOF = true; }
    bool isEOF() { return m_bEOF; }

    // Reset the program object
    void reset(bool bCleanPlayer = false)
    {
        // Clear the end-of-stream flag
        m_bEOF = false;

        uint32_t uPublishConnID = 0;
        std::set<uint32_t> setPlayConnID;

        // Detach the publishing and playing connections
        {
            CAutoLock<CMutex> lock(&m_mutex);
            uPublishConnID = m_uPublishConnID;
            m_uPublishConnID = 0;
            if (bCleanPlayer)
            {
                m_setPlayConnID.swap(setPlayConnID);
            }
        }

        // Tell the publishing connection to clean up
        if (uPublishConnID > 0)
        {
            CConnection* pConn = g_Conns.getConnection(uPublishConnID);
            if (pConn)
            {
                pConn->tellResetPlayPath(m_strPlayPath);
            }
        }

        // Tell the playing connections to clean up
        for (auto iter = setPlayConnID.begin(); iter != setPlayConnID.end(); ++iter)
        {
            CConnection* pConn = g_Conns.getConnection((*iter));
            if (pConn)
            {
                pConn->tellResetPlayPath(m_strPlayPath);
            }
        }
    }

    // Register the publishing connection
    void setPublishConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        m_uPublishConnID = uConnID;
    }

    // Unregister the publishing connection
    bool unsetPublishConn()
    {
        CAutoLock<CMutex> lock(&m_mutex);
        m_uPublishConnID = 0;
        return true;
    }

    // Register a playing connection
    bool addPlayConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        auto iter = m_setPlayConnID.find(uConnID);
        if (iter != m_setPlayConnID.end())
            return false;
        m_setPlayConnID.insert(uConnID);
        return true;
    }

    // Unregister a playing connection
    bool delPlayConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        auto iter = m_setPlayConnID.find(uConnID);
        if (iter == m_setPlayConnID.end())
            return false;
        m_setPlayConnID.erase(uConnID);
        return true;
    }

    // Buffer a media packet
    void cacheMediaPacket(RTMPPacket* pPacket)
    {
        std::set<uint32_t> setPlayConnID;
        {
            CAutoLock<CMutex> lock(&m_mutex);
            setPlayConnID = m_setPlayConnID;
        }

        // For simplicity, copy the packet straight to every playing connection
        for (auto iter = setPlayConnID.begin(); iter != setPlayConnID.end(); ++iter)
        {
            CConnection* pConn = g_Conns.getConnection((*iter));
            if (pConn == NULL)
                continue;

            std::vector<RTMPPacket*> vecpPacket;
            RTMPPacket* pPacketCP = new RTMPPacket;
            RTMPPacket_Reset(pPacketCP);
            memcpy(pPacketCP, pPacket, sizeof(RTMPPacket));
            RTMPPacket_Alloc(pPacketCP, pPacket->m_nBodySize);
            memcpy(pPacketCP->m_body, pPacket->m_body, pPacket->m_nBodySize);
            pPacketCP->m_headerType = RTMP_PACKET_SIZE_MEDIUM;
            vecpPacket.push_back(pPacketCP);
            pConn->copyPackets(m_strPlayPath, vecpPacket);
        }
    }
};

// Application container =>
class CApp
{
    std::string m_strApp;
    std::map<std::string, CPlayPath*> m_mappPlayPath;
    CMutex m_mutex;

public:
    CApp(const std::string& strApp) : m_strApp(strApp) {}
    virtual ~CApp() {}

    const std::string& getName() const { return m_strApp; }

    CPlayPath* getPlayPath(const std::string& strPlayPath, bool bCreate = true)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        auto iter = m_mappPlayPath.find(strPlayPath);
        if (iter != m_mappPlayPath.end())
            return iter->second;
        if (bCreate)
        {
            CPlayPath* pPlayPath = new CPlayPath(strPlayPath);
            m_mappPlayPath[strPlayPath] = pPlayPath;
            return pPlayPath;
        }
        return NULL;
    }
};

// Application set management =>
class CApps
{
    std::map<std::string, CApp*> m_mapApp;
    CMutex m_mutex;

public:
    CApps() {}
    virtual ~CApps() {}

    CApp* getApp(const std::string& strApp, bool bCreate = true)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        auto iter = m_mapApp.find(strApp);
        if (iter != m_mapApp.end())
            return iter->second;
        if (bCreate)
        {
            CApp* pApp = new CApp(strApp);
            m_mapApp[strApp] = pApp;
            return pApp;
        }
        return NULL;
    }
};

CApps g_Apps;

// Program logic =>
void* ClientThread(void* _lp);
bool MyHandshake(int nSocket);
bool Dispatch(CConnection* pConn, RTMPPacket* pPacket);
int HandleInvoke(CConnection* pConn, RTMPPacket* pPacket);
int HandleMediaPacket(CConnection* pConn, RTMPPacket* pPacket);
bool sendWindowAckSize(CConnection* pConn);
bool sendPeerOutputBandWide(CConnection* pConn);
bool sendOutputChunkSize(CConnection* pConn);
bool sendConnectResult(CConnection* pConn, int nOperateID);
bool sendCreateStreamResult(CConnection* pConn, int nOperateID, uint32_t nStreamID);
bool sendPublishStatus(CConnection* pConn, int nInputStreamID);
bool sendPublishError(CConnection* pConn, int nInputStreamID);
bool sendPlayStreamBegin(CConnection* pConn, int nInputStreamID);
bool sendPlayStatus(CConnection* pConn, int nInputStreamID);

int main(int argc, char* argv[])
{
    if (argc < 3) {
        printf("usage:%s peer_id peer_key\n", argv[0]);
        return -1;
    }

    RTMP_LogSetLevel(RTMP_LOGDEBUG);

    // Login address and port of the P2P cloud service
    kkp2p_engine_conf_t kkp2p_conf;
    memset(&kkp2p_conf, 0, sizeof(kkp2p_conf));
    kkp2p_conf.login_domain = (char*)"124.71.217.198";
    kkp2p_conf.login_port = 3080;
    kkp2p_conf.lan_search_port = 3549;
    kkp2p_conf.max_log_size = 1024 * 1024 * 10;
    kkp2p_conf.log_path = NULL;

    g_engine = kkp2p_engine_init(&kkp2p_conf, 5000);
    kkp2p_switch_log_level(g_engine, 4);

    // Register this peer on the LAN and with the cloud service
    kkp2p_join_lan(g_engine, argv[1]);
    kkp2p_join_net(g_engine, argv[1], argv[2]);

    kkp2p_channel_t* channel = (kkp2p_channel_t*)calloc(1, sizeof(kkp2p_channel_t));
    while (1) {
        int ret = kkp2p_accept(g_engine, 1000, channel);
        if (ret < 0) {
            // error
            printf("kkp2p_accept error,exit\n");
            free(channel);
            break;
        } else if (ret == 0) {
            // timeout
            continue;
        } else {
            // success: hand the channel to a new thread and allocate the next one
            pthread_t ThreadId;
            printf("accept new connection,fd:%d, mode is %d,channel id:%u.\n",
                   channel->fd, channel->transmit_mode, channel->channel_id);
            pthread_create(&ThreadId, NULL, ClientThread, (void*)channel);
            channel = (kkp2p_channel_t*)calloc(1, sizeof(kkp2p_channel_t));
        }
    }
    return 0;
}

void* ClientThread(void* param)
{
    pthread_detach(pthread_self());
    kkp2p_channel_t* channel = (kkp2p_channel_t*)param;

    // Use the default blocking mode
    int val = fcntl(channel->fd, F_GETFL, 0);
    fcntl(channel->fd, F_SETFL, val & (~O_NONBLOCK));

    // Create the connection object once (the source created it twice and leaked the first)
    CConnection* pConn = g_Conns.createConnection(channel->fd);
    printf("connection:[%d] coming... \n", pConn->ConnID());

    // Handshake
    bool b = MyHandshake(pConn->Socket());
    if (!b)
    {
        printf("connection:[%d] handshake failed! \n", pConn->ConnID());
        g_Conns.releaseConnection(pConn->ConnID());
        kkp2p_close_fd(channel->fd);
        kkp2p_close_channel(g_engine, channel->channel_id);
        free(channel);
        return NULL;
    }

    while (true)
    {
        RTMPPacket packet;
        packet.m_body = NULL;
        packet.m_chunk = NULL;
        RTMPPacket_Reset(&packet);

        // Read a packet
        if (!RTMP_ReadPacket(pConn->Rtmp(), &packet))
        {
            printf("connection:[%d] read error! \n", pConn->ConnID());
            break;
        }
        if (!RTMPPacket_IsReady(&packet))
            continue;

        //printf("connection:[%d] read headerType:[%d] packetType:[%d] CSID:[%d] StreamID:[%d] hasAbsTimestamp:[%d] nTimeStamp:[%d] m_nBodySize:[%d] \n",
        //    pConn->ConnID(), packet.m_headerType, packet.m_packetType, packet.m_nChannel, packet.m_nInfoField2, packet.m_hasAbsTimestamp, packet.m_nTimeStamp, packet.m_nBodySize);

        // Dispatch the packet
        bool b = Dispatch(pConn, &packet);
        RTMPPacket_Free(&packet);
        if (!b)
        {
            printf("connection:[%d] Dispatch failed! \n", pConn->ConnID());
            break;
        }

        if (pConn->getStreamType() == CConnection::Play)
        {
            printf("connection:[%d] now play... \n", pConn->ConnID());
            break;
        }
    }

    // Enter the playing state
    struct timeval tv;
    gettimeofday(&tv, NULL);
    double fLastReadTime = tv.tv_sec + tv.tv_usec / (double)1000000;
    while (pConn->getStreamType() == CConnection::Play)
    {
        RTMPPacket* pPacket = pConn->popPacket();

        struct timeval tvNow;
        gettimeofday(&tvNow, NULL);
        double fNowReadTime = tvNow.tv_sec + tvNow.tv_usec / (double)1000000;

        // Timeout check: give up after 30 seconds without a packet
        if (pPacket == NULL)
        {
            if (fNowReadTime - fLastReadTime < 30)
                continue;
            printf("connection:[%d] too time no packet \n", pConn->ConnID());
            break;
        }
        fLastReadTime = fNowReadTime;

        // Send the media packet down to the player
        bool b = RTMP_SendPacket(pConn->Rtmp(), pPacket, FALSE);
        RTMPPacket_Free(pPacket);
        delete pPacket;
        if (!b)
        {
            printf("connection:[%d] send failed! \n", pConn->ConnID());
            break;
        }
    }

    // Detach the connection from its programs before it exits
    switch (pConn->getStreamType())
    {
    case CConnection::Publish:
        {
            std::vector<std::string> vecPlayPath;
            pConn->getPublishPlayPaths(vecPlayPath);
            for (auto iter = vecPlayPath.begin(); iter != vecPlayPath.end(); ++iter)
            {
                CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath((*iter), false);
                if (pPlayPath)
                {
                    pPlayPath->setEOF();
                    pPlayPath->unsetPublishConn();
                }
            }
            pConn->cleanPublishPlayPath();
        }
        break;
    case CConnection::Play:
        {
            std::vector<std::string> vecPlayPath;
            pConn->getPlayPlayPaths(vecPlayPath);
            for (auto iter = vecPlayPath.begin(); iter != vecPlayPath.end(); ++iter)
            {
                CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath((*iter), false);
                if (pPlayPath)
                {
                    pPlayPath->delPlayConn(pConn->ConnID());
                }
            }
            pConn->cleanPlayPlayPath();
        }
        break;
    }

    printf("connection:[%d] exit! \n", pConn->ConnID());
    g_Conns.releaseConnection(pConn->ConnID());
    kkp2p_close_fd(channel->fd);
    kkp2p_close_channel(g_engine, channel->channel_id);
    free(channel);
    return NULL;
}

// Send exactly len bytes
int send_data(int fd, char* buff, int len)
{
    int sended = 0;
    while (sended < len) {
        int wl = send(fd, buff + sended, len - sended, 0);
        if (wl < 0) {
            printf("SendData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n", fd, wl, len, errno, strerror(errno));
            return -1;
        }
        sended += wl;
    }
    return len;
}

// Receive exactly len bytes; a return of 0 from recv means the peer closed
int recv_data(int fd, char* buff, int len)
{
    int recved = 0;
    while (recved < len) {
        int wl = recv(fd, buff + recved, len - recved, 0);
        if (wl <= 0) {
            printf("RecvData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n", fd, wl, len, errno, strerror(errno));
            return -1;
        }
        recved += wl;
    }
    return len;
}

// Handshake
#define RTMP_SIG_SIZE 1536
bool MyHandshake(int nSocket)
{
    char type = 0;
    if (recv_data(nSocket, (char*)&type, 1) != 1)
    {
        return false;
    }
    if (type != 3)
    {
        return false;
    }

    char sClientSIG[RTMP_SIG_SIZE] = {0};
    if (recv_data(nSocket, sClientSIG, RTMP_SIG_SIZE) != RTMP_SIG_SIZE)
    {
        return false;
    }
    if (send_data(nSocket, sClientSIG, RTMP_SIG_SIZE) != RTMP_SIG_SIZE)
    {
        return false;
    }

    char sServerSIG[1 + RTMP_SIG_SIZE] = {0};
    sServerSIG[0] = 3;
    if (send_data(nSocket, sServerSIG, 1 + RTMP_SIG_SIZE) != 1 + RTMP_SIG_SIZE)
    {
        return false;
    }
    if (recv_data(nSocket, sServerSIG + 1, RTMP_SIG_SIZE) != RTMP_SIG_SIZE)
    {
        return false;
    }
    return true;
}

// Dispatch a packet by message type
bool Dispatch(CConnection* pConn, RTMPPacket* pPacket)
{
    switch (pPacket->m_packetType)
    {
    case 0x01:
        {
            if (pPacket->m_nBodySize >= 4)
            {
                pConn->Rtmp()->m_inChunkSize = AMF_DecodeInt32(pPacket->m_body);
                printf("connection:[%d] received: chunk size change to %d \n", pConn->ConnID(), pConn->Rtmp()->m_inChunkSize);
            }
        }
        break;
    case 0x04:
        {
        }
        break;
    case 0x05:
        {
            if (pPacket->m_nBodySize >= 4)
            {
                int nWindowAckSize = AMF_DecodeInt32(pPacket->m_body);
                printf("connection:[%d] received: window ack size change to %d \n", pConn->ConnID(), nWindowAckSize);
            }
        }
        break;
    case 0x06:
        {
            if (pPacket->m_nBodySize >= 4)
            {
                int nOutputBW = AMF_DecodeInt32(pPacket->m_body);
                printf("connection:[%d] received: output bw change to %d \n", pConn->ConnID(), nOutputBW);
            }
            if (pPacket->m_nBodySize >= 5)
            {
                int nOutputBW2 = pPacket->m_body[4];
                printf("connection:[%d] received: output bw2 change to %d \n", pConn->ConnID(), nOutputBW2);
            }
        }
        break;
    case 0x08:
        {
            HandleMediaPacket(pConn, pPacket);
        }
        break;
    case 0x09:
        {
            HandleMediaPacket(pConn, pPacket);
        }
        break;
    case 0x12:
        {
        }
        break;
    case 0x14:
        {
            if (HandleInvoke(pConn, pPacket) < 0)
                return false;
        }
        break;
    }
    return true;
}

#define SAVC(x) static const AVal av_##x = AVC(#x)
SAVC(connect);
SAVC(_result);
SAVC(releaseStream);
SAVC(FCPublish);
SAVC(createStream);
SAVC(publish);
SAVC(onStatus);
SAVC(FCUnpublish);
SAVC(deleteStream);
SAVC(play);

AVal makeAVal(const char* pStr)
{
    return {(char*)pStr, (int)strlen(pStr)};
}

// Handle a remote invocation (AMF0 command)
int HandleInvoke(CConnection* pConn, RTMPPacket* pPacket)
{
    if (pPacket->m_body[0] != 0x02)
    {
        printf("connection:[%d] invalid invoke! \n", pConn->ConnID());
        return -1;
    }

    uint32_t nInputStreamID = pPacket->m_nInfoField2;

    AMFObject obj;
    int nSize = AMF_Decode(&obj, pPacket->m_body, pPacket->m_nBodySize, FALSE);
    if (nSize < 0)
    {
        printf("connection:[%d] invalid packet! \n", pConn->ConnID());
        return -1;
    }

    AVal method;
    AMFProp_GetString(AMF_GetProp(&obj, NULL, 0), &method);
    int nOperateID = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 1));
    printf("connection:[%d] server invoking <%s> %d \n", pConn->ConnID(), method.av_val, nOperateID);

    if (AVMATCH(&method, &av_connect))
    {
        AMFObject obj1;
        AMFProp_GetObject(AMF_GetProp(&obj, NULL, 2), &obj1);
        AVal appName = makeAVal("app");
        AVal app;
        AMFProp_GetString(AMF_GetProp(&obj1, &appName, -1), &app);
        std::string strApp(app.av_val, app.av_len);
        printf("connection:[%d] connect, app:[%s] \n", pConn->ConnID(), strApp.c_str());
        pConn->setAppName(strApp);

        if (!sendWindowAckSize(pConn))
            return -1;
        if (!sendPeerOutputBandWide(pConn))
            return -1;
        if (!sendOutputChunkSize(pConn))
            return -1;
        if (!sendConnectResult(pConn, nOperateID))
            return -1;
    }
    else if (AVMATCH(&method, &av_releaseStream))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);
        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] releaseStream, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());

        // Check whether publishing of this program has finished
        CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true);
        if (!pPlayPath->isEOF())
        {
            if (!sendPublishError(pConn, nInputStreamID))
                return -1;
            AMF_Reset(&obj);
            return 0;
        }

        // Reset the program
        pPlayPath->reset(false);
    }
    else if (AVMATCH(&method, &av_FCPublish))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);
        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] FCPublish, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());

        // To be safe, make sure the program exists
        g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true);
    }
    else if (AVMATCH(&method, &av_createStream))
    {
        // Generate a stream ID
        uint32_t uStreamID = pConn->genStreamID();
        printf("connection:[%d] createStream, streamID:[%d] \n", pConn->ConnID(), uStreamID);
        if (!sendCreateStreamResult(pConn, nOperateID, uStreamID))
            return -1;
    }
    else if (AVMATCH(&method, &av_publish))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);
        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] publish, streamID:[%d] playpath:[%s] \n", pConn->ConnID(), nInputStreamID, strPlayPath.c_str());

        // Check that the stream ID is valid
        if (!pConn->isValidStreamID(nInputStreamID))
        {
            printf("connection:[%d] publish, streamID:[%d] invalid! \n", pConn->ConnID(), nInputStreamID);
            return -1;
        }

        // Link the connection and the program in both directions
        pConn->setStreamType(CConnection::Publish);
        pConn->bindPublishPlayPath(nInputStreamID, strPlayPath);
        g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->setPublishConn(pConn->ConnID());

        if (!sendPublishStatus(pConn, nInputStreamID))
            return -1;
    }
    else if (AVMATCH(&method, &av_play))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);
        int time = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 4));
        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] play, streamID:[%d] playpath:[%s] time:[%d] \n", pConn->ConnID(), nInputStreamID, strPlayPath.c_str(), time);

        // Check that the stream ID is valid
        if (!pConn->isValidStreamID(nInputStreamID))
        {
            printf("connection:[%d] play, streamID:[%d] invalid! \n", pConn->ConnID(), nInputStreamID);
            return -1;
        }

        // Link the connection and the program in both directions
        pConn->setStreamType(CConnection::Play);
        pConn->bindPlayPlayPath(nInputStreamID, strPlayPath);
        g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->addPlayConn(pConn->ConnID());

        if (!sendPlayStreamBegin(pConn, nInputStreamID))
            return -1;
        if (!sendPlayStatus(pConn, nInputStreamID))
            return -1;
    }
    else if (AVMATCH(&method, &av_FCUnpublish))
    {
        AVal playpath;
        AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);
        std::string strPlayPath(playpath.av_val, playpath.av_len);
        printf("connection:[%d] FCUnpublish, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());
        g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->setEOF();
    }
    else if (AVMATCH(&method, &av_deleteStream))
    {
        int nStreamID = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 3));
        printf("connection:[%d] deleteStream, streamID:[%d] \n", pConn->ConnID(), nStreamID);

        // Unlink the connection and the program in both directions
        std::string strPlayPath = pConn->getPublishPlayPath(nStreamID);
        if (strPlayPath != "")
        {
            pConn->unbindPublishPlayPath(nStreamID);
            g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->unsetPublishConn();
        }
        strPlayPath = pConn->getPlayPlayPath(nStreamID);
        if (strPlayPath != "")
        {
            pConn->unbindPlayPlayPath(nStreamID);
            g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->delPlayConn(pConn->ConnID());
        }
    }

    AMF_Reset(&obj);
    return 0;
}

// Handle an audio/video packet
int HandleMediaPacket(CConnection* pConn, RTMPPacket* pPacket)
{
    uint32_t nInputStreamID = pPacket->m_nInfoField2;
    const std::string& strPlayPath = pConn->getPublishPlayPath(nInputStreamID);
    g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->cacheMediaPacket(pPacket);
    return 0;
}

// Send the Window Acknowledgement Size message
bool sendWindowAckSize(CConnection* pConn)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x02;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x05;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    AMF_EncodeInt32(packet.m_body, pEnd, /* window size; literal lost in the source */);
    packet.m_nBodySize = 4;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for set window ack size failed! \n", pConn->ConnID());
        return false;
    }
    return true;
}

// Send the Set Peer Bandwidth message
bool sendPeerOutputBandWide(CConnection* pConn)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x02;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x06;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    AMF_EncodeInt32(packet.m_body, pEnd, /* bandwidth; literal lost in the source */);
    packet.m_body[4] = 2;
    packet.m_nBodySize = 5;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for set peer output bandwide size failed! \n", pConn->ConnID());
        return false;
    }
    return true;
}

// Send the Set Chunk Size message
bool sendOutputChunkSize(CConnection* pConn)
{
    pConn->Rtmp()->m_outChunkSize = 4096;

    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x02;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x01;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    AMF_EncodeInt32(packet.m_body, pEnd, 4096);
    packet.m_nBodySize = 4;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for set chunk size failed! \n", pConn->ConnID());
        return false;
    }
    return true;
}

// Send the response to connect
bool sendConnectResult(CConnection* pConn, int nOperateID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x03;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av__result);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, nOperateID);

    AMFObject obj1 = {0, NULL};
    AMFObjectProperty fmsVer;
    fmsVer.p_name = makeAVal("fmsVer");
    fmsVer.p_type = AMF_STRING;
    fmsVer.p_vu.p_aval = makeAVal("FMS/3,0,1,123");
    AMF_AddProp(&obj1, &fmsVer);
    AMFObjectProperty capabilities;
    capabilities.p_name = makeAVal("capabilities");
    capabilities.p_type = AMF_NUMBER;
    capabilities.p_vu.p_number = 31;
    AMF_AddProp(&obj1, &capabilities);
    pEnc = AMF_Encode(&obj1, pEnc, pEnd);

    AMFObject obj2 = {0, NULL};
    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);
    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetConnection.Connect.Success");
    AMF_AddProp(&obj2, &code);
    pEnc = AMF_Encode(&obj2, pEnc, pEnd);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for connect _result failed! \n", pConn->ConnID());
        return false;
    }
    return true;
}

// Send the response to createStream
bool sendCreateStreamResult(CConnection* pConn, int nOperateID, uint32_t nStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x03;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av__result);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, nOperateID);
    *pEnc++ = AMF_NULL;
    pEnc = AMF_EncodeNumber(pEnc, pEnd, nStreamID);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for createStream _result failed! \n", pConn->ConnID());
        return false;
    }
    return true;
}

// Send the publish status response
bool sendPublishStatus(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;

    AMFObject obj2 = {0, NULL};
    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);
    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Publish.Start");
    AMF_AddProp(&obj2, &code);
    pEnc = AMF_Encode(&obj2, pEnc, pEnd);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for publish onStatus failed! \n", pConn->ConnID());
        return false;
    }
    return true;
}

// Send the publish error response
bool sendPublishError(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;

    AMFObject obj2 = {0, NULL};
    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("error");
    AMF_AddProp(&obj2, &level);
    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Publish.BadName");
    AMF_AddProp(&obj2, &code);
    AMFObjectProperty description;
    description.p_name = makeAVal("description");
    description.p_type = AMF_STRING;
    description.p_vu.p_aval = makeAVal("Already publishing");
    AMF_AddProp(&obj2, &description);
    pEnc = AMF_Encode(&obj2, pEnc, pEnd);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for publish onStatus failed! \n", pConn->ConnID());
        return false;
    }
    return true;
}

// Send the Stream Begin user-control event for play
bool sendPlayStreamBegin(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x02;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x04;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeInt16(pEnc, pEnd, 0);
    pEnc = AMF_EncodeInt32(pEnc, pEnd, nInputStreamID);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for play event failed! \n", pConn->ConnID());
        return false;
    }
    return true;
}

// Send the play status response
bool sendPlayStatus(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;

    AMFObject obj2 = {0, NULL};
    AMFObjectProperty level;
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);
    AMFObjectProperty code;
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Play.Start");
    AMF_AddProp(&obj2, &code);
    pEnc = AMF_Encode(&obj2, pEnc, pEnd);

    packet.m_nBodySize = pEnc - packet.m_body;

    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for play onStatus failed! \n", pConn->ConnID());
        return false;
    }
    return true;
}
```
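The server's `popPacket` waits on a semaphore with `sem_timedwait`, which takes an absolute `CLOCK_REALTIME` deadline rather than a relative timeout. The source builds that deadline through floating-point seconds; an integer-only way to compute it might look like the sketch below. `deadlineAfter` is a hypothetical helper, and the 100 ms used in the usage note is an assumed value, since the wait actually used by the source is not recoverable.

```cpp
#include <cassert>
#include <ctime>

// Return `now` advanced by waitMs milliseconds,
// with tv_nsec normalized into [0, 1e9).
timespec deadlineAfter(timespec now, long waitMs) {
    assert(waitMs >= 0);
    const long kNsPerSec = 1000000000L;
    timespec t = now;
    t.tv_sec += waitMs / 1000;
    t.tv_nsec += (waitMs % 1000) * 1000000L;
    if (t.tv_nsec >= kNsPerSec) {
        t.tv_sec += 1;
        t.tv_nsec -= kNsPerSec;
    }
    return t;
}
```

A caller would read the current time with `clock_gettime(CLOCK_REALTIME, &now)`, compute `timespec ts = deadlineAfter(now, 100)`, and pass `&ts` to `sem_timedwait`.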