libpeer分析
- 网络
- 2024-11-24
- 677热度
- 0评论
关键数据结构
PeerConfiguration
typedef struct PeerConfiguration {
IceServer ice_servers[5];
MediaCodec audio_codec;
MediaCodec video_codec;
DataChannelType datachannel;
void (*onaudiotrack)(uint8_t* data, size_t size, void* userdata);
void (*onvideotrack)(uint8_t* data, size_t size, void* userdata);
void (*on_request_keyframe)(void* userdata);
void* user_data;
} PeerConfiguration;
实例:
PeerConfiguration config = {
.ice_servers = {
{.urls = "stun:stun.l.google.com:19302"},
},
.datachannel = DATA_CHANNEL_STRING,
.video_codec = CODEC_H264,
.audio_codec = CODEC_PCMA};
Agent
Agent 结构体,主要用于实现 WebRTC 或类似协议中的 ICE (Interactive Connectivity Establishment) 机制。ICE 是一种 NAT 穿越协议,用于在点对点通信中建立连接并选择最佳的网络路径。Agent 结构体存储了与 ICE 协议操作相关的各种数据,用于管理连接的候选地址、协商过程以及状态。
struct Agent {
char remote_ufrag[ICE_UFRAG_LENGTH + 1];
char remote_upwd[ICE_UPWD_LENGTH + 1];
char local_ufrag[ICE_UFRAG_LENGTH + 1];
char local_upwd[ICE_UPWD_LENGTH + 1];
IceCandidate local_candidates[AGENT_MAX_CANDIDATES];
IceCandidate remote_candidates[AGENT_MAX_CANDIDATES];
int local_candidates_count;
int remote_candidates_count;
UdpSocket udp_sockets[2];
Address host_addr;
int b_host_addr;
uint64_t binding_request_time;
AgentState state;
AgentMode mode;
IceCandidatePair candidate_pairs[AGENT_MAX_CANDIDATE_PAIRS];
IceCandidatePair* selected_pair;
IceCandidatePair* nominated_pair;
int candidate_pairs_num;
int use_candidate;
uint32_t transaction_id[3];
};
ServiceConfiguration
typedef struct ServiceConfiguration {
const char* mqtt_url;
int mqtt_port;
const char* client_id;
const char* http_url;
int http_port;
const char* username;
const char* password;
PeerConnection* pc;
} ServiceConfiguration;
实例
#define SERVICE_CONFIG_DEFAULT() \
{ \
.mqtt_url = "broker.emqx.io", \ //访问的MQTT服务器
.mqtt_port = 8883, \ //访问的MQTT端口
.client_id = "peer", \ //用于订阅的主题 webrtc/peer/jsonrpc
//发布的主题 webrtc/peer/jsonrpc-reply
.http_url = "", \ //信令使用的是MQTT,没有使用http,
.http_port = 443, \ //因此HTTP的没有用。
.username = "", \
.password = "", \
.pc = NULL \
}
PeerConnection
struct PeerConnection {
PeerConfiguration config;
PeerConnectionState state;
Agent agent;
DtlsSrtp dtls_srtp;
Sctp sctp;
Sdp local_sdp;
Sdp remote_sdp;
void (*onicecandidate)(char* sdp, void* user_data);
void (*oniceconnectionstatechange)(PeerConnectionState state, void* user_data);
void (*on_connected)(void* userdata);
void (*on_receiver_packet_loss)(float fraction_loss, uint32_t total_loss, void* user_data);
uint8_t temp_buf[CONFIG_MTU];
uint8_t agent_buf[CONFIG_MTU];
int agent_ret;
int b_local_description_created;
Buffer* audio_rb;
Buffer* video_rb;
Buffer* data_rb;
RtpEncoder artp_encoder;
RtpEncoder vrtp_encoder;
RtpDecoder vrtp_decoder;
RtpDecoder artp_decoder;
uint32_t remote_assrc;
uint32_t remote_vssrc;
};
WebRTC简介
概览
上图中一共有4中角色,分别是signaling server、STUN/RURN server、Client A,Client B;
- signaling server:信令指的是管理两个通信设备A和B建立和管理点对点连接过程中的控制消息交换机制。让通信双方能够交换各种信息,从而建立、维护和终止一个点对点的实时通信连接。
- STUN/RURN server:用于设备A和设备B传过NAT。
- Client A:通信设备A
- Client B:通信设备B
通信流程
WebRTC中客户端与信令服务器、STUN/TURN服务器的交互流程如下:
- ClientA首先创建PeerConnection对象,然后打开本地音视频设备,将音视频数据封装成MediaStream添加到PeerConnection中。
-
ClientA调用PeerConnection的CreateOffer方法创建一个用于offer的SDP对象,SDP对象中保存当前音视频的相关参数。ClientA通过PeerConnection的SetLocalDescription方法将该SDP对象保存起来,并通过Signal服务器发送给ClientB。
-
ClientB接收到ClientA发送过的offer SDP对象,通过PeerConnection的SetRemoteDescription方法将其保存起来,并调用PeerConnection的CreateAnswer方法创建一个应答的SDP对象,通过PeerConnection的SetLocalDescription的方法保存该应答SDP对象并将它通过Signal服务器发送给ClientA。
-
ClientA接收到ClientB发送过来的应答SDP对象,将其通过PeerConnection的SetRemoteDescription方法保存起来。
-
在SDP信息的offer/answer流程中,ClientA和ClientB已经根据SDP信息创建好相应的音频Channel和视频Channel并开启Candidate数据的收集,Candidate数据可以简单地理解成Client端的IP地址信息(本地IP地址、公网IP地址、Relay服务端分配的地址)。
-
当ClientA收集到Candidate信息后,PeerConnection会通过OnIceCandidate接口给ClientA发送通知,ClientA将收到的Candidate信息通过Signal服务器发送给ClientB,ClientB通过PeerConnection的AddIceCandidate方法保存起来。同样的操作ClientB对ClientA再来一次。
-
这样ClientA和ClientB就已经建立了音视频传输的P2P通道,ClientB接收到ClientA传送过来的音视频流,会通过PeerConnection的OnAddStream回调接口返回一个标识ClientA端音视频流的MediaStream对象,在ClientB端渲染出来即可。同样操作也适应ClientB到ClientA的音视频流的传输。
上述序列中,WebRTC并不提供Stun服务器和Signal服务器,服务器端需要自己实现。Stun服务器可以用google提供的实现stun协议的测试服务器(stun:stun.l.google.com:19302),Signal服务器则完全需要自己实现了,可以使用MQTT的broker.emqx.io来作为信令服务器。它需要在ClientA和ClientB之间传送彼此的SDP信息和candidate信息,ClientA和ClientB通过这些信息建立P2P连接来传送音视频数据。
信令
待补充
SDP
SDP是一个比较老的协议,发布于2006年,以type=value的格式描述回话内容。WebRTC引入SDP来描述媒体信息,用于媒体协商时决定双方是否可以进行通信。当对端设备anser的时候会携带一个SDP格式的内容,如下。
v=0
o=- 8646007345366799659 2 IN IP4 127.0.0.1
s=-
t=0 0
a=group:BUNDLE video audio datachannel
a=msid-semantic: WMS 5a162afd-e195-4207-a699-e977bb510327
m=video 10773 UDP/TLS/RTP/SAVPF 96 102
c=IN IP4 14.29.67.186
a=rtcp:9 IN IP4 0.0.0.0
a=candidate:1969734079 1 udp 2122260223 192.168.31.123 65492 typ host generation 0 network-id 1 network-cost 10
a=candidate:2345473323 1 tcp 1518280447 192.168.31.123 9 typ host tcptype active generation 0 network-id 1 network-cost 10
a=candidate:3819939686 1 udp 1686052607 14.29.67.186 10773 typ srflx raddr 192.168.31.123 rport 65492 generation 0 network-id 1 network-cost 10
a=ice-ufrag:JgBs
a=ice-pwd:1USyOciE2u1Uzq97tWIhcx7v
a=ice-options:trickle
a=fingerprint:sha-256 C1:6F:4D:10:6E:99:AC:9C:5F:CD:24:C8:A5:83:75:AE:45:1A:D4:7D:E0:73:B6:0A:67:4E:ED:C3:88:C2:6C:42
a=setup:active
a=mid:video
a=recvonly
a=rtcp-mux
a=rtpmap:96 H264/90000
a=fmtp:96 level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42e01f
a=rtpmap:102 H264/90000
a=rtcp-fb:102 nack
a=rtcp-fb:102 nack pli
a=fmtp:102 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f
m=audio 9 UDP/TLS/RTP/SAVP 8
c=IN IP4 0.0.0.0
a=rtcp:9 IN IP4 0.0.0.0
a=ice-ufrag:JgBs //对端用户名
a=ice-pwd:1USyOciE2u1Uzq97tWIhcx7v //对端的密码
a=ice-options:trickle
a=fingerprint:sha-256 C1:6F:4D:10:6E:99:AC:9C:5F:CD:24:C8:A5:83:75:AE:45:1A:D4:7D:E0:73:B6:0A:67:4E:ED:C3:88:C2:6C:42 //指纹信息
a=setup:active
a=mid:audio
a=sendrecv
a=msid:5a162afd-e195-4207-a699-e977bb510327 693effe8-ba8d-469d-ba6a-f58e2b5f73df
a=rtcp-mux
a=rtpmap:8 PCMA/8000
a=ssrc:1668486723 cname:o+PtcUckwEIRvBPp
m=application 9 UDP/DTLS/SCTP webrtc-datachannel
c=IN IP4 0.0.0.0
a=ice-ufrag:JgBs
a=ice-pwd:1USyOciE2u1Uzq97tWIhcx7v
a=ice-options:trickle
a=fingerprint:sha-256 C1:6F:4D:10:6E:99:AC:9C:5F:CD:24:C8:A5:83:75:AE:45:1A:D4:7D:E0:73:B6:0A:67:4E:ED:C3:88:C2:6C:42
a=setup:active
a=mid:datachannel
a=sctp-port:5000
a=max-message-size:262144
ICE candidates
ICE (Interactive Connectivity Establishment 互动连接建立) candidates(候选人)简称ICE candidates,WebRTC中的ICE Candidate是用来描述可以连接的远端的基本信息,什么是candidate,两台设备的连接,需要知道设备的网络信息,如IP地址,端口号以及使用的协议,因此candidate至少包含{address,port,protocol}三元组信息集。通过SDP会话来交互candidate信息。
WebRTC将Candidate分成四种类型,且类型间存在优先级次序,从高到低分别为host、srflx、prflx和relay,本章节使用的是srflx类型,从STUN服务器获取的地址。
- host:从本机网卡上获取到的地址,一般来说,一个网卡对应一个地址。
- srflx(server reflexive):从STUN服务器获取到的地址。
- relay:从TRUN服务器获取到的地址。
- prflx(peer reflexive):在交互过程中从对端数据报文中获取到的地址。
其中,srflx和prflx地址可能是一样的,但获取的途径不一样,下面是描述ICE candidates的数据结构。
typedef enum IceCandidateType {
ICE_CANDIDATE_TYPE_HOST,
ICE_CANDIDATE_TYPE_SRFLX,
ICE_CANDIDATE_TYPE_PRFLX,
ICE_CANDIDATE_TYPE_RELAY,
} IceCandidateType;
typedef struct IceCandidate IceCandidate;
struct IceCandidate {
int foundation;
int component; //1:RTP,2:RTCP
uint32_t priority;//优先级
char transport[32 + 1]; //传输协议,基于UDP
IceCandidateType type; //类型,如上。
IceCandidateState state;
Address addr;
Address raddr;
};
main函数分析
/*1. 调用srtp_init,初始化srtp内核模块加密套件、debug等*/
peer_init();
/*2. 创建一个peer连接,其中创建了agent,打开了一个udp socket,初始化
了rtp 音频、视频编解码,主要是确定用什么编码格式。*/
g_pc = peer_connection_create(&config);
/*3. 注册一个peer连接的状态切换回调函数,回调函数 onconnectionstatechange,
用于处理对等连接的 ICE 连接状态变更。当连接的状态发生变化(例如从new到connected,
或者 failed),这个回调函数将被调用。*/
peer_connection_oniceconnectionstatechange(g_pc, onconnectionstatechange);
/*4. 设置数据通道(DataChannel)相关的回调函数。当数据通道打开时,会触发
onopen;接收到消息时,会触发 onmessage;当数据通道关闭时会触发onclose*/
peer_connection_ondatachannel(g_pc, onmessage, onopen, onclose);
/*5. 设置信令服务器的配置,通常用于 WebRTC 会话中的信令部分。通过传入
service_config 配置,WebRTC 客户端可以向信令服务器注册或传递配置信息
(如 client_id 和对等连接句柄 pc)
这里的信令服务器用的是公用的mqtt服务器? broker.emqx.io
使用mqtt来作为信令通信,没有使用http,在里面会注册一个peer_connect的候选回调
*/
/*什么是信令
在webrtc中,信令指的是建立和管理点对点连接过程中的控制消息交换机制。让通信双方
能够交换各种信息,从而建立、维护和终止一个点对点的实时通信连接。
WebRTC 的核心目标是实现浏览器之间的直接通信,支持音频、视频和数据传输,
但在建立这种连接之前,双方需要交换一些控制信息,包括:
- 会话描述(如 SDP,Session Description Protocol):描述连接的媒体设置(如视频编码格式、音频参数等)。
- 网络候选(ICE候选,ICE candidates):用于寻找最佳的点对点网络路径,以确保即使在复杂的网络环境下也能建立连接。Interactive Connectivity Establishment(互动连接建立)
- 连接状态:例如连接是否已建立、是否关闭等。
*/
/*信令的实现
虽然 WebRTC 协议本身并没有指定信令的实现方式,但是它提供了用于交换数据的 API。
开发者可以选择适合自己的信令协议和传输方式,通常的实现方式包括:
- WebSocket:一个常见的双向通信协议,可以实时地交换信令数据。
- HTTP 请求:通过轮询、长轮询等方式进行信令交换。
- SIP (Session Initiation Protocol):一些应用可能采用 SIP 作为信令协议。
- XMPP (Extensible Messaging and Presence Protocol):也是一种可以用来实现信令的协议,尤其适用于即时消息和通信应用。
- MQTT:通过订阅和发布的方式来进行传输,本文就是使用这种方式。
*/
/*信令的工作流程
假设有两个用户 A 和 B 通过 WebRTC 建立视频通话,信令的过程大致如下:
(1) 用户A生成offer:用 A创建一个 RTCPeerConnection 对象,并
生成一个offer。这个 offer 包含了 A 的媒体设置和网络候选信息(ICE 候选)。
A 将这个 offer 通过信令系统(例如 MQTT)发送给用户 B。
(2)用户B接收到 offer,并生成 answer:用户 B 收到 offer 后,使用它来创
建一个 RTCPeerConnection,并生成一个 answer(应答)。这个 answer 包含 B
的媒体配置和 ICE 候选信息。B 将 answer 通过信令系统发送给用户 A。
(3)交换 ICE 候选:在连接过程中,双方都会收集到 ICE 候选并通过信令进行交换,
直到双方都确定了最佳的网络路径。
(4)建立连接:当双方完成了 ICE 候选交换并且建立了连接时,通信就可以开始了。
(5)连接终止:当会话结束时,双方通过信令通知对方关闭连接。*/
service_config.client_id = argv[1];
service_config.pc = g_pc;
peer_signaling_set_config(&service_config);
/*6.初始化MQTT,连接broker.emqx.io, */
peer_signaling_join_channel();
/*7. 创建peer连接的task*/
pthread_create(&peer_connection_thread, NULL, peer_connection_task, NULL);
/*8. 创建signal的task*/
pthread_create(&peer_singaling_thread, NULL, peer_singaling_task, NULL);
/*9. 读取本地音视频初始化*/
reader_init();
/*10. 循环判断PEER的状态是否是连接态,如果是已经连接,则发送音视频数据*/
while (!g_interrupted) {
if (g_state == PEER_CONNECTION_COMPLETED) {
curr_time = get_timestamp();
// FPS 25
if (curr_time - video_time > 40) {
video_time = curr_time;
if (reader_get_video_frame(buf, &size) == 0) {
peer_connection_send_video(g_pc, buf, size);
}
}
if (curr_time - audio_time > 20) {
if (reader_get_audio_frame(buf, &size) == 0) {
peer_connection_send_audio(g_pc, buf, size);
}
audio_time = curr_time;
}
usleep(1000);
}
}
信令交互
int peer_signaling_join_channel() {
/*1. 与信令服务器建立连接,实际上就是连接MQTT的broker*/
if (peer_signaling_mqtt_connect(g_ps.mqtt_host, g_ps.mqtt_port) < 0) {
LOGW("Connect MQTT server failed");
return -1;
}
/*1. 发布信令的消息*/
peer_signaling_mqtt_subscribe(1);
return 0;
}
与信令服务器建立连接
因为与MQTT broker.emqx.io的交互是TLS加解密的,因此先使用ssl_transport_connect建立加解密通道,接着对MQTT进行初始化,将ssl的收发函数注册到MQTT中,这样MQTT后续的就可以用SSL的加解密通道进行通信,最后是发起MQTT连接。
static int peer_signaling_mqtt_connect(const char* hostname, int port) {
MQTTStatus_t status;
MQTTConnectInfo_t conn_info;
bool session_present;
/*1. 与mqtt的broker,其host="broker.emqx.io",port=8883建立TLS连接,MQTT的收发消息
是否通过加密的方式,因此需要先使用TLS进行连接*/
if (ssl_transport_connect(&g_ps.net_ctx, hostname, port, NULL) < 0) {
LOGE("ssl transport connect failed");
return -1;
}
/*2. ssl建立连接之后,将发送和接收的函数通过MQTT_Init进行注册,后续其收发就会调
调用ssl_transport_recv和ssl_transport_send进行解加密收发。
MQTT_Init初始化时还注册回调函数, 对CONNACK/PUBLISH/SUBACK事件进处理
需要注意的时,当设备收到broker发过来的PUBLISH事件时,会回调这个函数进行
处理,这样的场景是当对端设备访问连接是,就会调用peer_signaling_mqtt_event_cb。*/
g_ps.transport.recv = ssl_transport_recv;
g_ps.transport.send = ssl_transport_send;
g_ps.transport.pNetworkContext = &g_ps.net_ctx;
g_ps.mqtt_fixed_buf.pBuffer = g_ps.mqtt_buf;
g_ps.mqtt_fixed_buf.size = sizeof(g_ps.mqtt_buf);
status = MQTT_Init(&g_ps.mqtt_ctx, &g_ps.transport,
ports_get_epoch_time, peer_signaling_mqtt_event_cb, &g_ps.mqtt_fixed_buf);
memset(&conn_info, 0, sizeof(conn_info));
conn_info.cleanSession = false;
if (strlen(g_ps.username) > 0) {
conn_info.pUserName = g_ps.username;
conn_info.userNameLength = strlen(g_ps.username);
}
if (strlen(g_ps.password) > 0) {
conn_info.pPassword = g_ps.password;
conn_info.passwordLength = strlen(g_ps.password);
}
if (strlen(g_ps.client_id) > 0) {
conn_info.pClientIdentifier = g_ps.client_id;
conn_info.clientIdentifierLength = strlen(g_ps.client_id);
}
conn_info.keepAliveSeconds = KEEP_ALIVE_TIMEOUT_SECONDS;
/*3.建立MQTT的,MQTT协议与broker连接。前面步骤1是建立SSL连接,相当于是建立了加解密通道,
而这里是建立MQTT的协议通道。*/
status = MQTT_Connect(&g_ps.mqtt_ctx,
&conn_info, NULL, CONNACK_RECV_TIMEOUT_MS, &session_present);
if (status != MQTTSuccess) {
LOGE("MQTT_Connect failed: Status=
return -1;
}
LOGI("MQTT_Connect succeeded.");
return 0;
}
通知信令服务器已准备
通知服务器已经准备,主要是通过MQTT的订阅主题为\"webrtc/peer/jsonrpc\",等待接入设备接入。
static int peer_signaling_mqtt_subscribe(int subscribed) {
MQTTStatus_t status = MQTTSuccess;
MQTTSubscribeInfo_t sub_info;
/*1. 获取MQTT的packet id*/
uint16_t packet_id = MQTT_GetPacketId(&g_ps.mqtt_ctx);、
/*2. MQTT QOS为0即只发一次,订阅的主题是"webrtc/peer/jsonrpc" */
memset(&sub_info, 0, sizeof(sub_info));
sub_info.qos = MQTTQoS0;
sub_info.pTopicFilter = g_ps.subtopic;
sub_info.topicFilterLength = strlen(g_ps.subtopic);
/*3. 发送订阅消息*/
if (subscribed) {
status = MQTT_Subscribe(&g_ps.mqtt_ctx, &sub_info, 1, packet_id);
} else {
status = MQTT_Unsubscribe(&g_ps.mqtt_ctx, &sub_info, 1, packet_id);
}
if (status != MQTTSuccess) {
LOGE("MQTT_Subscribe failed: Status=
return -1;
}
/*4. 等待MQTT 订阅消息发送成功*/
status = MQTT_ProcessLoop(&g_ps.mqtt_ctx);
/* 调用该函数实时处理循环接收数据,并自定发送心跳包保持链接,当有数据到来时,会触发myEventCallback回调函数*/
if (status != MQTTSuccess) {
LOGE("MQTT_ProcessLoop failed: Status=
return -1;
}
LOGD("MQTT Subscribe/Unsubscribe succeeded.");
return 0;
}
接收信令服务器事件处理
在peer_signaling_mqtt_connect函数中,调用MQTT_Init初始化时,注册了一个peer_signaling_mqtt_event_cb回调函数,其对接收的MQTT消息进行处理,包括收到了Broker发过来的PUBLISH消息,因此当设备接入时,调用的是peer_signaling_mqtt_event_cb函数。
static void peer_signaling_mqtt_event_cb(MQTTContext_t* mqtt_ctx,
MQTTPacketInfo_t* packet_info,
MQTTDeserializedInfo_t* deserialized_info) {
switch (packet_info->type) {
case MQTT_PACKET_TYPE_CONNACK:
LOGI("MQTT_PACKET_TYPE_CONNACK");
break;
case MQTT_PACKET_TYPE_PUBLISH:
LOGI("MQTT_PACKET_TYPE_PUBLISH");
/*收到了broker发送过来的PUBLISH消息*/
peer_signaling_on_pub_event(deserialized_info->pPublishInfo->pPayload,
deserialized_info->pPublishInfo->payloadLength);
break;
case MQTT_PACKET_TYPE_SUBACK:
LOGD("MQTT_PACKET_TYPE_SUBACK");
break;
default:
break;
}
}
static void peer_signaling_on_pub_event(const char* msg, size_t size) {
cJSON *req, *res, *item, *result, *error;
int id = -1;
char* payload = NULL;
PeerConnectionState state;
req = res = item = result = error = NULL;
/*1. 先获取peer connect的状态*/
state = peer_connection_get_state(g_ps.pc);
printf("
do {
/*2. 解析json文件*/
req = cJSON_Parse(msg);
if (!req) {
error = cJSON_CreateRaw(RPC_ERROR_PARSE_ERROR);
LOGW("Parse json failed");
break;
}
/*3. 解析jsion中字段的id值*/
item = cJSON_GetObjectItem(req, "id");
if (!item && !cJSON_IsNumber(item)) {
error = cJSON_CreateRaw(RPC_ERROR_INVALID_REQUEST);
LOGW("Cannot find id");
break;
}
id = item->valueint;
/*4. 解析jison中的method字段是是什么?*/
item = cJSON_GetObjectItem(req, "method");
if (!item && cJSON_IsString(item)) {
error = cJSON_CreateRaw(RPC_ERROR_INVALID_REQUEST);
LOGW("Cannot find method");
break;
}
/* 5. 解析json文件,发现metho为offer,broker发过来要求offer,表示有设备加入了。*/
/* {"jsonrpc":"2.0","method":"offer","id":89} */
if (strcmp(item->valuestring, RPC_METHOD_OFFER) == 0) {
switch (state) {
case PEER_CONNECTION_NEW:
case PEER_CONNECTION_DISCONNECTED:
case PEER_CONNECTION_FAILED:
case PEER_CONNECTION_CLOSED: {
g_ps.id = id;
/*6. 将peer connect状态切换为PEER_CONNECTION_NEW
在peer_connection_loop中就会调用到peer_connection_state_new,
offer一个SDP*/
peer_connection_create_offer(g_ps.pc);
} break;
default: {
error = cJSON_CreateRaw(RPC_ERROR_INTERNAL_ERROR);
} break;
}
/*7. broker发送过来的是anwer,表示对端设备的应答*/
} else if (strcmp(item->valuestring, RPC_METHOD_ANSWER) == 0) {
item = cJSON_GetObjectItem(req, "params");
if (!item && !cJSON_IsString(item)) {
error = cJSON_CreateRaw(RPC_ERROR_INVALID_PARAMS);
LOGW("Cannot find params");
break;
}
/*8. 收到对端应答后,解析对端anser的SDP内容,设置远端的描述信息*/
if (state == PEER_CONNECTION_NEW) {
peer_connection_set_remote_description(g_ps.pc, item->valuestring);
result = cJSON_CreateString("");
}
} else if (strcmp(item->valuestring, RPC_METHOD_STATE) == 0) {
result = cJSON_CreateString(peer_connection_state_to_string(state));
} else if (strcmp(item->valuestring, RPC_METHOD_CLOSE) == 0) {
peer_connection_close(g_ps.pc);
result = cJSON_CreateString("");
} else {
error = cJSON_CreateRaw(RPC_ERROR_METHOD_NOT_FOUND);
LOGW("Unsupport method");
}
} while (0);
/*9. 发布消息,发布什么内容??*/
if (result || error) {
res = cJSON_CreateObject();
cJSON_AddStringToObject(res, "jsonrpc", RPC_VERSION);
cJSON_AddNumberToObject(res, "id", id);
if (result) {
cJSON_AddItemToObject(res, "result", result);
} else if (error) {
cJSON_AddItemToObject(res, "error", error);
}
payload = cJSON_PrintUnformatted(res);
if (payload) {
peer_signaling_mqtt_publish(&g_ps.mqtt_ctx, payload);
free(payload);
}
cJSON_Delete(res);
}
if (req) {
cJSON_Delete(req);
}
}
Offer SDP
当对端Client B设备通过信令服务器发起连接时,信令服务器的MQTT broker发送publish给当前Client A设备,接收到MQTT 的publish后,解析到method为offer,即将peer的状态切换为PEER_CONNECTION_NEW,继而peer_connection_loop中状态切换为peer_connection_state_new。
static void peer_connection_state_new(PeerConnection* pc, DtlsSrtpRole role, int isOfferer) {
char* description = (char*)pc->temp_buf;
memset(pc->temp_buf, 0, sizeof(pc->temp_buf));
dtls_srtp_reset_session(&pc->dtls_srtp);
/*dtls srtp 初始化,在后续的章节再进行描述。*/
dtls_srtp_init(&pc->dtls_srtp, role, pc);
pc->dtls_srtp.udp_recv = peer_connection_dtls_srtp_recv;
pc->dtls_srtp.udp_send = peer_connection_dtls_srtp_send;
pc->sctp.connected = 0;
if (isOfferer) {
agent_clear_candidates(&pc->agent);
pc->agent.mode = AGENT_MODE_CONTROLLING;
} else {
pc->agent.mode = AGENT_MODE_CONTROLLED;
}
/*1. 通过stun传输获取candidate信息*/
agent_gather_candidate(&pc->agent, NULL, NULL, NULL); // host address
for (int i = 0; i < sizeof(pc->config.ice_servers) / sizeof(pc->config.ice_servers[0]); ++i) {
if (pc->config.ice_servers[i].urls) {
LOGI("ice server:
agent_gather_candidate(&pc->agent, pc->config.ice_servers[i].urls, pc->config.ice_servers[i].username, pc->config.ice_servers[i].credential);
}
}
/*将candidate信息转化为SDP格式?*/
agent_get_local_description(&pc->agent, description, sizeof(pc->temp_buf));
memset(&pc->local_sdp, 0, sizeof(pc->local_sdp));
// TODO: check if we have video or audio codecs
/*创建一个SDP会话*/
sdp_create(&pc->local_sdp,
pc->config.video_codec != CODEC_NONE,
pc->config.audio_codec != CODEC_NONE,
pc->config.datachannel);
/*填充其他的sdp信息*/
if (pc->config.video_codec == CODEC_H264) {
sdp_append_h264(&pc->local_sdp);
sdp_append(&pc->local_sdp, "a=fingerprint:sha-256
sdp_append(&pc->local_sdp, peer_connection_dtls_role_setup_value(role));
strcat(pc->local_sdp.content, description);
}
switch (pc->config.audio_codec) {
case CODEC_PCMA:
sdp_append_pcma(&pc->local_sdp);
sdp_append(&pc->local_sdp, "a=fingerprint:sha-256
sdp_append(&pc->local_sdp, peer_connection_dtls_role_setup_value(role));
strcat(pc->local_sdp.content, description);
break;
case CODEC_PCMU:
sdp_append_pcmu(&pc->local_sdp);
sdp_append(&pc->local_sdp, "a=fingerprint:sha-256
sdp_append(&pc->local_sdp, peer_connection_dtls_role_setup_value(role));
strcat(pc->local_sdp.content, description);
break;
case CODEC_OPUS:
sdp_append_opus(&pc->local_sdp);
sdp_append(&pc->local_sdp, "a=fingerprint:sha-256
sdp_append(&pc->local_sdp, peer_connection_dtls_role_setup_value(role));
strcat(pc->local_sdp.content, description);
default:
break;
}
if (pc->config.datachannel) {
sdp_append_datachannel(&pc->local_sdp);
sdp_append(&pc->local_sdp, "a=fingerprint:sha-256
sdp_append(&pc->local_sdp, peer_connection_dtls_role_setup_value(role));
strcat(pc->local_sdp.content, description);
}
pc->b_local_description_created = 1;
/*将SDP信息发送出去。*/
if (pc->onicecandidate) {
pc->onicecandidate(pc->local_sdp.content, pc->config.user_data);
}
}
anser SDP
peer_signaling_on_pub_event
strcmp(item->valuestring, RPC_METHOD_ANSWER
peer_connection_set_remote_description(g_ps.pc, item->valuestring);
设备收到publish消息后,解析jsion判断出method是anser,调用peer_connection_set_remote_description进行处理。
void peer_connection_set_remote_description(PeerConnection* pc, const char* sdp_text) {
char* start = (char*)sdp_text;
char* line = NULL;
char buf[256];
char* val_start = NULL;
uint32_t* ssrc = NULL;
DtlsSrtpRole role = DTLS_SRTP_ROLE_SERVER;
int is_update = 0;
Agent* agent = &pc->agent;
/*1. 解析SDP会话内容*/
while ((line = strstr(start, "\n"))) {
line = strstr(start, "\n");
strncpy(buf, start, line - start);
buf[line - start] = '\0';
if (strstr(buf, "a=setup:passive")) {
role = DTLS_SRTP_ROLE_CLIENT;
}
if (strstr(buf, "a=fingerprint")) {
strncpy(pc->dtls_srtp.remote_fingerprint, buf + 22, DTLS_SRTP_FINGERPRINT_LENGTH);
}
if (strstr(buf, "a=ice-ufrag") &&
strlen(agent->remote_ufrag) != 0 &&
(strncmp(buf + strlen("a=ice-ufrag:"), agent->remote_ufrag, strlen(agent->remote_ufrag)) == 0)) {
is_update = 1;
}
if (strstr(buf, "m=video")) {
ssrc = &pc->remote_vssrc;
} else if (strstr(buf, "m=audio")) {
ssrc = &pc->remote_assrc;
}
if ((val_start = strstr(buf, "a=ssrc:")) && ssrc) {
*ssrc = strtoul(val_start + 7, NULL, 10);
LOGD("SSRC:
}
start = line + 2;
}
if (is_update) {
return;
}
if (!pc->b_local_description_created) {
peer_connection_state_new(pc, role, 0);
}
/*2. 设置远程的描述信息,主要是填充PeerConnection中agent的信息,
这个agent信息存储了本地和远端的icecandicate*/
agent_set_remote_description(&pc->agent, (char*)sdp_text);
/*3. 将peer connect状态切换为checking*/
STATE_CHANGED(pc, PEER_CONNECTION_CHECKING);
}
测试P2P通路
收到对端的anser SDP后,解析得到了ICE candidate,那么将peer connect切换为checking状态,先测试一下P2P的通路。
int agent_connectivity_check(Agent* agent) {
char addr_string[ADDRSTRLEN];
uint8_t buf[1400];
StunMessage msg;
if (agent->nominated_pair->state != ICE_CANDIDATE_STATE_INPROGRESS) {
LOGI("nominated pair is not in progress");
return -1;
}
memset(&msg, 0, sizeof(msg));
/*1. 使用对端的ip地址发送数据*/
if (agent->nominated_pair->conncheck
addr_to_string(&agent->nominated_pair->remote->addr, addr_string, sizeof(addr_string));
LOGD("send binding request to remote ip:
agent_create_binding_request(agent, &msg);
agent_socket_send(agent, &agent->nominated_pair->remote->addr, msg.buf, msg.size);
}
/*2. 接收对端的反馈数据,如果返回正确,表示链路连通正常*/
agent_recv(agent, buf, sizeof(buf));
if (agent->nominated_pair->state == ICE_CANDIDATE_STATE_SUCCEEDED) {
agent->selected_pair = agent->nominated_pair;
return 0;
}
return -1;
}
测试P2P链路正常后,就将Peer Connection状态切换为PEER_CONNECTION_CONNECTED,接下来就是正式的P2P链路创建连接了。
P2P通信
通过上面信令的建立,P2P链路已经建立了连接,接下来就是创建P2P的通信链路了,P2P的传输使用的是SRTP链路。
初始化
在offer SDP的时候,函数peer_connection_state_new中会调用dtls_srtp_init进行初始化。
int dtls_srtp_init(DtlsSrtp* dtls_srtp, DtlsSrtpRole role, void* user_data) {
static const mbedtls_ssl_srtp_profile default_profiles[] = {
MBEDTLS_TLS_SRTP_AES128_CM_HMAC_SHA1_80,
MBEDTLS_TLS_SRTP_AES128_CM_HMAC_SHA1_32,
MBEDTLS_TLS_SRTP_NULL_HMAC_SHA1_80,
MBEDTLS_TLS_SRTP_NULL_HMAC_SHA1_32,
MBEDTLS_TLS_SRTP_UNSET};
/*1. 设置srtp的角色,目前是做server,并更新为INIT状态,并设置UDP收发的回调函数。*/
dtls_srtp->role = role;
dtls_srtp->state = DTLS_SRTP_STATE_INIT;
dtls_srtp->user_data = user_data;
dtls_srtp->udp_send = dtls_srtp_udp_send;
dtls_srtp->udp_recv = dtls_srtp_udp_recv;
/*2.初始化 MBEDTLS 相关结构体*/
mbedtls_ssl_config_init(&dtls_srtp->conf);
//初始化 ssl_config 结构体,这是 SSL 配置的核心,包含加密协议、证书、密钥等信息
mbedtls_ssl_init(&dtls_srtp->ssl);
//初始化 ssl 结构体,表示一个 SSL 会话上下文。
mbedtls_x509_crt_init(&dtls_srtp->cert);
//初始化 X.509 证书结构,用于存储公钥证书。
mbedtls_pk_init(&dtls_srtp->pkey);
//初始化公钥对象结构,用于存储私钥。
mbedtls_entropy_init(&dtls_srtp->entropy);
//初始化熵源,熵源是生成随机数所需的基础。
mbedtls_ctr_drbg_init(&dtls_srtp->ctr_drbg);
//初始化 CTR_DRBG(Counter-based Deterministic Random Byte Generator),
//它是用于生成随机数的伪随机数生成器。
/*3. 设置打印等级*/
#if CONFIG_MBEDTLS_DEBUG
mbedtls_debug_set_threshold(3);
mbedtls_ssl_conf_dbg(&dtls_srtp->conf, dtls_srtp_debug, NULL);
#endif
/*4.生成一个自签名证书。这个证书将在 DTLS 握手期间用于身份验证*/
dtls_srtp_selfsign_cert(dtls_srtp);
/*5. 配置证书验证和认证模式*/
mbedtls_ssl_conf_verify(&dtls_srtp->conf, dtls_srtp_cert_verify, NULL);
//配置证书验证回调函数 dtls_srtp_cert_verify,用来验证远端证书的有效性。
mbedtls_ssl_conf_authmode(&dtls_srtp->conf, MBEDTLS_SSL_VERIFY_REQUIRED);
//设置身份验证模式为MBEDTLS_SSL_VERIFY_REQUIRED,即客户端必须提供有效证书进行验证(服务器验证客户端的证书)。
/*6. 配置证书链和私钥*/
mbedtls_ssl_conf_ca_chain(&dtls_srtp->conf, &dtls_srtp->cert, NULL);
//配置服务器的根证书链,这里使用dtls_srtp->cert 作为证书链的起始证书。
mbedtls_ssl_conf_own_cert(&dtls_srtp->conf, &dtls_srtp->cert, &dtls_srtp->pkey);
//配置本地证书和私钥。服务器和客户端都需要自己的证书和私钥来进行加密通信。
/*7. 配置随机数生成器和超时*/
mbedtls_ssl_conf_rng(&dtls_srtp->conf, mbedtls_ctr_drbg_random, &dtls_srtp->ctr_drbg);
//配置随机数生成器,使用 mbedtls_ctr_drbg_random 和 dtls_srtp->ctr_drbg 作为随机数源。
mbedtls_ssl_conf_read_timeout(&dtls_srtp->conf, 1000);
//设置读取超时时间为 1000 毫秒,意味着在等待数据时,如果超过 1000 毫秒没有接收到数据,则超时。
/*8. 如果是服务器 (DTLS_SRTP_ROLE_SERVER),则配置为 DTLS 服务器模式,并设置用
于防止 DoS 攻击的 DTLS cookies(cookie 是用于防止重放攻击的)。
如果角色是客户端,则直接配置为 DTLS 客户端模式。这里是做服务端*/
if (dtls_srtp->role == DTLS_SRTP_ROLE_SERVER) {
mbedtls_ssl_config_defaults(&dtls_srtp->conf,
MBEDTLS_SSL_IS_SERVER,
MBEDTLS_SSL_TRANSPORT_DATAGRAM,
MBEDTLS_SSL_PRESET_DEFAULT);
mbedtls_ssl_cookie_init(&dtls_srtp->cookie_ctx);
mbedtls_ssl_cookie_setup(&dtls_srtp->cookie_ctx, mbedtls_ctr_drbg_random, &dtls_srtp->ctr_drbg);
mbedtls_ssl_conf_dtls_cookies(&dtls_srtp->conf, mbedtls_ssl_cookie_write, mbedtls_ssl_cookie_check, &dtls_srtp->cookie_ctx);
} else {
mbedtls_ssl_config_defaults(&dtls_srtp->conf,
MBEDTLS_SSL_IS_CLIENT,
MBEDTLS_SSL_TRANSPORT_DATAGRAM,
MBEDTLS_SSL_PRESET_DEFAULT);
}
/*10. 生成并打印本地指纹, 用于生成本地证书的指纹(一个哈希值),用于标识证书。*/
dtls_srtp_x509_digest(&dtls_srtp->cert, dtls_srtp->local_fingerprint);
LOGD("local fingerprint:
/*11. 配置 DTLS-SRTP 的保护配置文件,default_profiles 是预定义的安全配置,
包含 SRTP 协议所支持的加密算法、密钥长度等信息。*/
mbedtls_ssl_conf_dtls_srtp_protection_profiles(&dtls_srtp->conf, default_profiles);
/*12. 配置 SRTP MKI(主密钥标识符)的支持情况。在这里,设置为不支持 MKI。*/
mbedtls_ssl_conf_srtp_mki_value_supported(&dtls_srtp->conf, MBEDTLS_SSL_DTLS_SRTP_MKI_UNSUPPORTED);
/*13. 禁用客户端证书请求中的 CA 列表。*/
mbedtls_ssl_conf_cert_req_ca_list(&dtls_srtp->conf, MBEDTLS_SSL_CERT_REQ_CA_LIST_DISABLED);
/*14. 完成所有配置后,调用 mbedtls_ssl_setup 初始化 ssl 上下文并将配置应用于
当前的 DTLS 会话。*/
mbedtls_ssl_setup(&dtls_srtp->ssl, &dtls_srtp->conf);
return 0;
}
创建
case PEER_CONNECTION_CONNECTED:
/*1. 建立DTLS SRTP链接*/
if (dtls_srtp_handshake(&pc->dtls_srtp, NULL) == 0) {
LOGD("DTLS-SRTP handshake done");
/*2. 建立sctp 链接*/
if (pc->config.datachannel) {
LOGI("SCTP create socket");
sctp_create_socket(&pc->sctp, &pc->dtls_srtp);
pc->sctp.userdata = pc->config.user_data;
}
STATE_CHANGED(pc, PEER_CONNECTION_COMPLETED);
}
主要是创建DTLS_SRTP和SCTP两条链路。
DTSL_SRTP
dtls_srtp_handshake_server()
{
/* 循环发起*/
while(1) {
mbedtls_ssl_session_reset(&dtls_srtp->ssl);
mbedtls_ssl_set_client_transport_id(&dtls_srtp->ssl, client_ip, sizeof(client_ip));
/* 发起握手*/
ret = dtls_srtp_do_handshake(dtls_srtp);
if (ret == MBEDTLS_ERR_SSL_HELLO_VERIFY_REQUIRED) {
LOGD("DTLS hello verification requested");
} else if (ret != 0) {
LOGE("failed! mbedtls_ssl_handshake returned -0x
break;
} else {
break;
}
}
}
判断返回的错误是MBEDTLS_ERR_SSL_HELLO_VERIFY_REQUIRED,会进行再次循环发起。
static int dtls_srtp_do_handshake(DtlsSrtp* dtls_srtp) {
int ret;
/* timer 是 mbedtls_timing_delay_context 类型的一个静态变量,用于管理
定时器(通常是控制超时的机制)。这个定时器在 DTLS 协议中用于处理超时等操作。*/
static mbedtls_timing_delay_context timer;
/*定时器是 DTLS 握手中用来处理超时的一个重要机制,因为 DTLS 是基于UDP协议的,
而 UDP 本身不保证数据的可靠传输,因此需要在握手过程中手动管理超时。*/
mbedtls_ssl_set_timer_cb(&dtls_srtp->ssl, &timer, mbedtls_timing_set_delay, mbedtls_timing_get_delay);
#if CONFIG_MBEDTLS_2_X
/*回调函数会在 DTLS 握手完成后用于派发加密密钥。*/
mbedtls_ssl_conf_export_keys_ext_cb(&dtls_srtp->conf, dtls_srtp_key_derivation_cb, dtls_srtp);
#else
mbedtls_ssl_set_export_keys_cb(&dtls_srtp->ssl, dtls_srtp_key_derivation_cb, dtls_srtp);
#endif
/*用于设置 DTLS 上下文的 BIO(Basic Input/Output)回调函数,这些回调函数
定义了数据的发送和接收方式。回调函数在dtls_srtp_init进行初始化的。*/
mbedtls_ssl_set_bio(&dtls_srtp->ssl, dtls_srtp, dtls_srtp->udp_send, dtls_srtp->udp_recv, NULL);
/*执行 DTLS 握手,直到握手成功或发生错误。*/
do {
ret = mbedtls_ssl_handshake(&dtls_srtp->ssl);
} while (ret == MBEDTLS_ERR_SSL_WANT_READ || ret == MBEDTLS_ERR_SSL_WANT_WRITE);
return ret;
}
SSL的握手
/* Main handshake loop */
while (ssl->state != MBEDTLS_SSL_HANDSHAKE_OVER) {
ret = mbedtls_ssl_handshake_step(ssl);
if (ret != 0) {
break;
}
}
主要就是调用mbedtls_ssl_handshake_step,下面只列出了服务端的代码示例。
int mbedtls_ssl_handshake_step(mbedtls_ssl_context *ssl)
{
int ret = MBEDTLS_ERR_ERROR_CORRUPTION_DETECTED;
ret = ssl_prepare_handshake_step(ssl);
if (ret != 0) {
return ret;
}
ret = mbedtls_ssl_handle_pending_alert(ssl);
if (ret != 0) {
goto cleanup;
}
#if defined(MBEDTLS_SSL_SRV_C)
if (ssl->conf->endpoint == MBEDTLS_SSL_IS_SERVER) {
/*调用mbedtls_ssl_handshake_server_step进行握手管理*/
if (mbedtls_ssl_conf_is_tls12_only(ssl->conf)) {
ret = mbedtls_ssl_handshake_server_step(ssl);
}
}
#endif
if (ret != 0) {
/* handshake_step return error. And it is same
* with alert_reason.
*/
if (ssl->send_alert) {
ret = mbedtls_ssl_handle_pending_alert(ssl);
goto cleanup;
}
}
cleanup:
return ret;
}
主要是接着调用mbedtls_ssl_handshake_server_step进行处理。
int mbedtls_ssl_handshake_server_step(mbedtls_ssl_context *ssl)
{
int ret = 0;
MBEDTLS_SSL_DEBUG_MSG(2, ("server state:
switch (ssl->state) {
case MBEDTLS_SSL_HELLO_REQUEST:
ssl->state = MBEDTLS_SSL_CLIENT_HELLO;
break;
/*
* <== ClientHello
*/
case MBEDTLS_SSL_CLIENT_HELLO:
ret = ssl_parse_client_hello(ssl);
break;
#if defined(MBEDTLS_SSL_PROTO_DTLS)
case MBEDTLS_SSL_SERVER_HELLO_VERIFY_REQUEST_SENT:
return MBEDTLS_ERR_SSL_HELLO_VERIFY_REQUIRED;
#endif
/*
* ==> ServerHello
* Certificate
* ( ServerKeyExchange )
* ( CertificateRequest )
* ServerHelloDone
*/
case MBEDTLS_SSL_SERVER_HELLO:
ret = ssl_write_server_hello(ssl);
break;
case MBEDTLS_SSL_SERVER_CERTIFICATE:
ret = mbedtls_ssl_write_certificate(ssl);
break;
case MBEDTLS_SSL_SERVER_KEY_EXCHANGE:
ret = ssl_write_server_key_exchange(ssl);
break;
case MBEDTLS_SSL_CERTIFICATE_REQUEST:
ret = ssl_write_certificate_request(ssl);
break;
case MBEDTLS_SSL_SERVER_HELLO_DONE:
ret = ssl_write_server_hello_done(ssl);
/*
* <== ( Certificate/Alert )
* ClientKeyExchange
* ( CertificateVerify )
* ChangeCipherSpec
* Finished
*/
case MBEDTLS_SSL_CLIENT_CERTIFICATE:
ret = mbedtls_ssl_parse_certificate(ssl);
break;
case MBEDTLS_SSL_CLIENT_KEY_EXCHANGE:
ret = ssl_parse_client_key_exchange(ssl);
break;
case MBEDTLS_SSL_CERTIFICATE_VERIFY:
ret = ssl_parse_certificate_verify(ssl);
break;
case MBEDTLS_SSL_CLIENT_CHANGE_CIPHER_SPEC:
ret = mbedtls_ssl_parse_change_cipher_spec(ssl);
break;
case MBEDTLS_SSL_CLIENT_FINISHED:
ret = mbedtls_ssl_parse_finished(ssl);
break;
/*
* ==> ( NewSessionTicket )
* ChangeCipherSpec
* Finished
*/
case MBEDTLS_SSL_SERVER_CHANGE_CIPHER_SPEC:
#if defined(MBEDTLS_SSL_SESSION_TICKETS)
if (ssl->handshake->new_session_ticket != 0) {
ret = ssl_write_new_session_ticket(ssl);
} else
#endif
ret = mbedtls_ssl_write_change_cipher_spec(ssl);
break;
case MBEDTLS_SSL_SERVER_FINISHED:
ret = mbedtls_ssl_write_finished(ssl);
break;
case MBEDTLS_SSL_FLUSH_BUFFERS:
MBEDTLS_SSL_DEBUG_MSG(2, ("handshake: done"));
ssl->state = MBEDTLS_SSL_HANDSHAKE_WRAPUP;
break;
case MBEDTLS_SSL_HANDSHAKE_WRAPUP:
mbedtls_ssl_handshake_wrapup(ssl);
break;
default:
MBEDTLS_SSL_DEBUG_MSG(1, ("invalid state
return MBEDTLS_ERR_SSL_BAD_INPUT_DATA;
}
return ret;
}
主要就是针对握手的状态进行握手交互,具体这里就不阐述了,可以参考《SSL/TLS协议分析》文章。
SCTP
SCTP主要使用的usrctp开源组件,
传输
while (!g_interrupted) {
if (g_state == PEER_CONNECTION_COMPLETED) {
curr_time = get_timestamp();
// FPS 25
if (curr_time - video_time > 40) {
video_time = curr_time;
if (reader_get_video_frame(buf, &size) == 0) {
peer_connection_send_video(g_pc, buf, size);
}
}
if (curr_time - audio_time > 20) {
if (reader_get_audio_frame(buf, &size) == 0) {
peer_connection_send_audio(g_pc, buf, size);
}
audio_time = curr_time;
}
usleep(1000);
}
}