小智Ai语音交互简要分析
- Ai
- 28天前
- 106热度
- 0评论
app start
主要是初始化板级、显示、WiFi连接、音频codec、编解码、协议、音效、唤醒几个环节。
auto& board = Board::GetInstance(); //获取板级实例
SetDeviceState(kDeviceStateStarting);//设置初始状态为kDeviceStateStarting
/* Setup the display */
auto display = board.GetDisplay(); //获取显示实例
/* Setup the audio codec */
auto codec = board.GetAudioCodec();//获取codec实例
opus_decode_sample_rate_ = codec->output_sample_rate();//获取当前codec的采样率
opus_decoder_ = std::make_unique<OpusDecoderWrapper>(opus_decode_sample_rate_, 1);//初始化opus解码,设置解码采样率
opus_encoder_ = std::make_unique<OpusEncoderWrapper>(16000, 1, OPUS_FRAME_DURATION_MS);//初始化opus编码,设置采样率16Khz
// For ML307 boards, we use complexity 5 to save bandwidth
// For other boards, we use complexity 3 to save CPU
//根据板级来设置opus编码的复杂度
if (board.GetBoardType() == "ml307") {
ESP_LOGI(TAG, "ML307 board detected, setting opus encoder complexity to 5");
opus_encoder_->SetComplexity(5);
} else {
ESP_LOGI(TAG, "WiFi board detected, setting opus encoder complexity to 3");
opus_encoder_->SetComplexity(3);
}
//如果codec的采样率不是16Khz,需要进行重采样,下面是重采样初始化。
if (codec->input_sample_rate() != 16000) {
input_resampler_.Configure(codec->input_sample_rate(), 16000);
reference_resampler_.Configure(codec->input_sample_rate(), 16000);
}
//注册codec输入音频的回调,表示有录音的pcm,触发mainloop处理。
codec->OnInputReady([this, codec]() {
BaseType_t higher_priority_task_woken = pdFALSE;
xEventGroupSetBitsFromISR(event_group_, AUDIO_INPUT_READY_EVENT, &higher_priority_task_woken);
return higher_priority_task_woken == pdTRUE;
});
//注册codec输出音频的回调,表示输出端已就绪、可以送入播放数据,触发mainloop处理。
codec->OnOutputReady([this]() {
BaseType_t higher_priority_task_woken = pdFALSE;
xEventGroupSetBitsFromISR(event_group_, AUDIO_OUTPUT_READY_EVENT, &higher_priority_task_woken);
return higher_priority_task_woken == pdTRUE;
});
//启动硬件codec,使能录音和播放。
codec->Start();
//开启一个mainloop线程,处理主要逻辑
/* Start the main loop */
xTaskCreate([](void* arg) {
Application* app = (Application*)arg;
app->MainLoop();
vTaskDelete(NULL);
}, "main_loop", 4096 * 2, this, 4, nullptr);
//等待WiFi连接好
/* Wait for the network to be ready */
board.StartNetwork();
// Initialize the protocol
display->SetStatus(Lang::Strings::LOADING_PROTOCOL);//显示正在加载协议
//根据配置使用MQTT还是WebSocket来选择通信协议
#ifdef CONFIG_CONNECTION_TYPE_WEBSOCKET
protocol_ = std::make_unique<WebsocketProtocol>();
#else
protocol_ = std::make_unique<MqttProtocol>();
#endif
//注册网络接收异常回调函数
protocol_->OnNetworkError([this](const std::string& message) {
SetDeviceState(kDeviceStateIdle);
Alert(Lang::Strings::ERROR, message.c_str(), "sad", Lang::Sounds::P3_EXCLAMATION);
});
//注册接收音频的回调函数,接收到音频后,将其加入解码队列
protocol_->OnIncomingAudio([this](std::vector<uint8_t>&& data) {
std::lock_guard<std::mutex> lock(mutex_);
if (device_state_ == kDeviceStateSpeaking) {
audio_decode_queue_.emplace_back(std::move(data));
}
});
// Register the "audio channel opened" callback. It leaves power-save mode,
// adopts the server's sample rate for decoding, and sends the IoT descriptors
// and current IoT states to the server.
protocol_->OnAudioChannelOpened([this, codec, &board]() {
    board.SetPowerSaveMode(false);
    // Warn when the server's sample rate differs from the codec's output rate.
    // NOTE(review): the format string was truncated in this listing and has been
    // restored from the two arguments that follow it; %d assumes both rates are
    // plain ints - confirm.
    if (protocol_->server_sample_rate() != codec->output_sample_rate()) {
        ESP_LOGW(TAG, "Server sample rate %d does not match device output sample rate %d, resampling may cause distortion",
            protocol_->server_sample_rate(), codec->output_sample_rate());
    }
    SetDecodeSampleRate(protocol_->server_sample_rate());
    auto& thing_manager = iot::ThingManager::GetInstance();
    protocol_->SendIotDescriptors(thing_manager.GetDescriptorsJson());
    std::string states;
    if (thing_manager.GetStatesJson(states, false)) {
        protocol_->SendIotStates(states);
    }
});
//注册音频的关闭回调
protocol_->OnAudioChannelClosed([this, &board]() {
board.SetPowerSaveMode(true);
Schedule([this]() {
auto display = Board::GetInstance().GetDisplay();
display->SetChatMessage("system", "");
SetDeviceState(kDeviceStateIdle);
});
});
// Register the incoming-JSON callback. Messages are dispatched on "type":
//   "tts" - text-to-speech state: "start", "stop", "sentence_start"
//   "stt" - speech-to-text result, shown as the user's chat message
//   "llm" - emotion to show on the display
//   "iot" - commands forwarded to the IoT thing manager
protocol_->OnIncomingJson([this, display](const cJSON* root) {
    // The payload comes from the network: never dereference a field that is
    // missing or that carries no string value.
    auto type = cJSON_GetObjectItem(root, "type");
    if (type == nullptr || type->valuestring == nullptr) {
        ESP_LOGW(TAG, "Incoming JSON has no string \"type\" field");
        return;
    }
    const char* type_name = type->valuestring;

    if (strcmp(type_name, "tts") == 0) {
        auto state = cJSON_GetObjectItem(root, "state");
        if (state == nullptr || state->valuestring == nullptr) {
            ESP_LOGW(TAG, "TTS message has no string \"state\" field");
            return;
        }
        const char* state_name = state->valuestring;

        if (strcmp(state_name, "start") == 0) {
            // Playback is starting: clear the abort flag and enter the speaking state.
            Schedule([this]() {
                aborted_ = false;
                if (device_state_ == kDeviceStateIdle || device_state_ == kDeviceStateListening) {
                    SetDeviceState(kDeviceStateSpeaking);
                }
            });
        } else if (strcmp(state_name, "stop") == 0) {
            // Playback finished: let queued background work drain, then either
            // resume listening or go back to idle.
            Schedule([this]() {
                if (device_state_ == kDeviceStateSpeaking) {
                    background_task_->WaitForCompletion();
                    if (keep_listening_) {
                        protocol_->SendStartListening(kListeningModeAutoStop);
                        SetDeviceState(kDeviceStateListening);
                    } else {
                        SetDeviceState(kDeviceStateIdle);
                    }
                }
            });
        } else if (strcmp(state_name, "sentence_start") == 0) {
            // A sentence begins: show its text as the assistant's message.
            auto text = cJSON_GetObjectItem(root, "text");
            if (text != nullptr && text->valuestring != nullptr) {
                ESP_LOGI(TAG, "<< %s", text->valuestring);
                Schedule([this, display, message = std::string(text->valuestring)]() {
                    display->SetChatMessage("assistant", message.c_str());
                });
            }
        }
    } else if (strcmp(type_name, "stt") == 0) {
        // Speech-to-text result: show it as the user's message.
        auto text = cJSON_GetObjectItem(root, "text");
        if (text != nullptr && text->valuestring != nullptr) {
            ESP_LOGI(TAG, ">> %s", text->valuestring);
            Schedule([this, display, message = std::string(text->valuestring)]() {
                display->SetChatMessage("user", message.c_str());
            });
        }
    } else if (strcmp(type_name, "llm") == 0) {
        auto emotion = cJSON_GetObjectItem(root, "emotion");
        if (emotion != nullptr && emotion->valuestring != nullptr) {
            Schedule([this, display, emotion_str = std::string(emotion->valuestring)]() {
                display->SetEmotion(emotion_str.c_str());
            });
        }
    } else if (strcmp(type_name, "iot") == 0) {
        auto commands = cJSON_GetObjectItem(root, "commands");
        if (commands != nullptr) {
            auto& thing_manager = iot::ThingManager::GetInstance();
            // Query the array size once instead of on every iteration.
            const int command_count = cJSON_GetArraySize(commands);
            for (int i = 0; i < command_count; ++i) {
                auto command = cJSON_GetArrayItem(commands, i);
                thing_manager.Invoke(command);
            }
        }
    }
});
//启动协议
protocol_->Start();
//检测OTA的版本,如果版本比较低则进行升级
// Check for new firmware version or get the MQTT broker address
ota_.SetCheckVersionUrl(CONFIG_OTA_VERSION_URL);
ota_.SetHeader("Device-Id", SystemInfo::GetMacAddress().c_str());
ota_.SetHeader("Client-Id", board.GetUuid());
ota_.SetHeader("Accept-Language", Lang::CODE);
auto app_desc = esp_app_get_description();
ota_.SetHeader("User-Agent", std::string(BOARD_NAME "/") + app_desc->version);
xTaskCreate([](void* arg) {
Application* app = (Application*)arg;
app->CheckNewVersion();
vTaskDelete(NULL);
}, "check_new_version", 4096 * 2, this, 2, nullptr);
#if CONFIG_USE_AUDIO_PROCESSOR
//初始化音频处理,主要是降噪,回声消除,VAD检测等。
audio_processor_.Initialize(codec->input_channels(), codec->input_reference());
audio_processor_.OnOutput([this](std::vector<int16_t>&& data) {
background_task_->Schedule([this, data = std::move(data)]() mutable {
opus_encoder_->Encode(std::move(data), [this](std::vector<uint8_t>&& opus) {
//如果启用了音效处理,此处是编码完成后的回调:把编码后的opus数据调度到主循环发送。
Schedule([this, opus = std::move(opus)]() {
protocol_->SendAudio(opus);
});
});
});
});
//注册VAD状态变化
audio_processor_.OnVadStateChange([this](bool speaking) {
if (device_state_ == kDeviceStateListening) {
Schedule([this, speaking]() {
if (speaking) {
voice_detected_ = true;
} else {
voice_detected_ = false;
}
auto led = Board::GetInstance().GetLed();
led->OnStateChanged();//只点个灯??
});
}
});
#endif
#if CONFIG_USE_WAKE_WORD_DETECT
//启动唤醒检测,初始化唤醒
wake_word_detect_.Initialize(codec->input_channels(), codec->input_reference());
// Wake-word callback: receives the detected wake word as a string. When idle,
// the handler also encodes the buffered wake-word audio and uploads it.
// NOTE(review): it is unclear from here whether that audio holds only the wake
// word or additional speech as well - confirm.
wake_word_detect_.OnWakeWordDetected([this](const std::string& wake_word) {
    // Capture the wake word BY VALUE: the task runs later on the main loop, after
    // this callback has returned, so a captured reference could dangle.
    Schedule([this, wake_word]() {
        if (device_state_ == kDeviceStateIdle) {
            // Idle: connect, encode the wake-word audio, then push both the audio
            // and the wake-word string to the server.
            SetDeviceState(kDeviceStateConnecting);
            wake_word_detect_.EncodeWakeWordData();
            if (!protocol_->OpenAudioChannel()) {
                // Opening the channel failed: re-arm wake-word detection and give up.
                wake_word_detect_.StartDetection();
                return;
            }
            // Author's note (not verifiable from this excerpt): detection is stopped
            // 1) after a wake word is detected, 2) while listening, 3) during OTA.
            std::vector<uint8_t> opus;
            // Encode and send the wake word data to the server
            while (wake_word_detect_.GetWakeWordOpus(opus)) {
                protocol_->SendAudio(opus);
            }
            // Set the chat state to wake word detected
            protocol_->SendWakeWordDetected(wake_word);
            ESP_LOGI(TAG, "Wake word detected: %s", wake_word.c_str());
            keep_listening_ = true;
            SetDeviceState(kDeviceStateIdle);
        } else if (device_state_ == kDeviceStateSpeaking) {
            // Speaking: interrupt the current playback.
            AbortSpeaking(kAbortReasonWakeWordDetected);
        } else if (device_state_ == kDeviceStateActivating) {
            SetDeviceState(kDeviceStateIdle);
        }
    });
});
//启动唤醒检测
wake_word_detect_.StartDetection();
#endif
//设置状态为IDLE状态
SetDeviceState(kDeviceStateIdle);
esp_timer_start_periodic(clock_timer_handle_, 1000000);
mainloop
// Main event loop. Blocks on the event group and, for every bit that was set,
// runs audio input handling, audio output handling and the scheduled tasks.
// Never returns.
void Application::MainLoop() {
    const auto wait_mask = SCHEDULE_EVENT | AUDIO_INPUT_READY_EVENT | AUDIO_OUTPUT_READY_EVENT;

    for (;;) {
        // pdTRUE: clear the bits on exit; pdFALSE: wake on ANY of the bits.
        const auto fired = xEventGroupWaitBits(event_group_, wait_mask,
                                               pdTRUE, pdFALSE, portMAX_DELAY);

        // Recorded audio is available: process it and queue it.
        if (fired & AUDIO_INPUT_READY_EVENT) {
            InputAudio();
        }

        // The output side is ready: decode queued audio and play it.
        if (fired & AUDIO_OUTPUT_READY_EVENT) {
            OutputAudio();
        }

        // Run the tasks posted by other contexts.
        if (fired & SCHEDULE_EVENT) {
            // Take the whole list while holding the lock, then run the tasks
            // without it so they can schedule further work.
            std::unique_lock<std::mutex> guard(mutex_);
            std::list<std::function<void()>> pending(std::move(main_tasks_));
            guard.unlock();

            for (auto it = pending.begin(); it != pending.end(); ++it) {
                (*it)();
            }
        }
    }
}
录音通路
录音处理
// I2S receive callback. It only signals that data is available by invoking the
// handler registered through OnInputReady; the samples themselves are read
// later.
// Returns the handler's result, or false when input is disabled or no handler
// is registered.
IRAM_ATTR bool AudioCodec::on_recv(i2s_chan_handle_t handle, i2s_event_data_t *event, void *user_ctx) {
    auto* self = static_cast<AudioCodec*>(user_ctx);
    if (!self->input_enabled_ || !self->on_input_ready_) {
        return false;
    }
    return self->on_input_ready_();
}
//通过eventsetbit触发通知mainloop线程处理音频
codec->OnInputReady([this, codec]() {
BaseType_t higher_priority_task_woken = pdFALSE;
xEventGroupSetBitsFromISR(event_group_, AUDIO_INPUT_READY_EVENT, &higher_priority_task_woken);
return higher_priority_task_woken == pdTRUE;
});
// Invoked from MainLoop when AUDIO_INPUT_READY_EVENT is set.
// Reads the captured PCM, resamples it to 16 kHz when needed, and passes it on
// to wake-word detection and to the audio processor / Opus encoder.
void Application::InputAudio() {
    auto codec = Board::GetInstance().GetAudioCodec();

    // Fetch the captured PCM samples; bail out when the codec has none.
    std::vector<int16_t> data;
    if (!codec->InputData(data)) {
        return;
    }

    // Resample when the codec does not capture at 16 kHz.
    if (codec->input_sample_rate() != 16000) {
        if (codec->input_channels() != 2) {
            std::vector<int16_t> converted(input_resampler_.GetOutputSamples(data.size()));
            input_resampler_.Process(data.data(), data.size(), converted.data());
            data = std::move(converted);
        } else {
            // Two interleaved channels: even samples are the microphone, odd
            // samples the reference. Split, resample each, then re-interleave.
            const size_t frames = data.size() / 2;
            std::vector<int16_t> mic(frames);
            std::vector<int16_t> ref(frames);
            for (size_t i = 0; i < frames; ++i) {
                mic[i] = data[2 * i];
                ref[i] = data[2 * i + 1];
            }

            std::vector<int16_t> mic_out(input_resampler_.GetOutputSamples(mic.size()));
            std::vector<int16_t> ref_out(reference_resampler_.GetOutputSamples(ref.size()));
            input_resampler_.Process(mic.data(), mic.size(), mic_out.data());
            reference_resampler_.Process(ref.data(), ref.size(), ref_out.data());

            data.resize(mic_out.size() + ref_out.size());
            for (size_t i = 0; i < mic_out.size(); ++i) {
                data[2 * i] = mic_out[i];
                data[2 * i + 1] = ref_out[i];
            }
        }
    }

#if CONFIG_USE_WAKE_WORD_DETECT
    // While wake-word detection is running, feed it the samples.
    // NOTE(review): author's open question - is the wake-word audio also sent
    // to the cloud from here? Confirm.
    if (wake_word_detect_.IsDetectionRunning()) {
        wake_word_detect_.Feed(data);
    }
#endif

#if CONFIG_USE_AUDIO_PROCESSOR
    // With the audio processor enabled, the samples go to it while it is running.
    if (audio_processor_.IsRunning()) {
        audio_processor_.Input(data);
    }
#else
    // Without the audio processor: while listening, encode the samples on the
    // background task and send each Opus packet from the main loop.
    if (device_state_ == kDeviceStateListening) {
        background_task_->Schedule([this, data = std::move(data)]() mutable {
            opus_encoder_->Encode(std::move(data), [this](std::vector<uint8_t>&& opus) {
                Schedule([this, opus = std::move(opus)]() {
                    protocol_->SendAudio(opus);
                });
            });
        });
    }
#endif
}
音效处理
以下是音效处理过程
// Buffers incoming PCM samples and feeds them to the AFE in whole chunks.
// Samples that do not yet fill a chunk stay buffered for the next call.
// NOTE(review): the author's comment asks whether the AFE triggers a callback
// once processing finishes - not verifiable from this function alone.
//
// @param data Samples to append to the pending input buffer.
void AudioProcessor::Input(const std::vector<int16_t>& data) {
    input_buffer_.insert(input_buffer_.end(), data.begin(), data.end());

    // One feed consumes get_feed_chunksize() samples per channel.
    const auto chunk_samples = afe_iface_->get_feed_chunksize(afe_data_) * channels_;
    if (chunk_samples <= 0) {
        // A zero chunk size would make the loop below spin forever.
        return;
    }
    const auto feed_size = static_cast<size_t>(chunk_samples);

    // Feed every complete chunk first, then drop the consumed samples with a
    // single erase instead of shifting the buffer once per chunk.
    size_t consumed = 0;
    while (input_buffer_.size() - consumed >= feed_size) {
        afe_iface_->feed(afe_data_, input_buffer_.data() + consumed);
        consumed += feed_size;
    }
    if (consumed > 0) {
        input_buffer_.erase(input_buffer_.begin(), input_buffer_.begin() + consumed);
    }
}
// Worker loop of the audio processor. While PROCESSOR_RUNNING is set it fetches
// processed audio from the AFE, reports VAD transitions through
// vad_state_change_callback_ and hands the samples to output_callback_.
// Never returns.
void AudioProcessor::AudioProcessorTask() {
    auto fetch_size = afe_iface_->get_fetch_chunksize(afe_data_);
    auto feed_size = afe_iface_->get_feed_chunksize(afe_data_);
    // NOTE(review): this format string was truncated in the listing and has been
    // restored from the two arguments that follow it - confirm.
    ESP_LOGI(TAG, "Audio communication task started, feed size: %d fetch size: %d",
        feed_size, fetch_size);

    while (true) {
        // The bit is NOT cleared on exit (third argument pdFALSE), so the loop
        // keeps running until PROCESSOR_RUNNING is cleared elsewhere.
        xEventGroupWaitBits(event_group_, PROCESSOR_RUNNING, pdFALSE, pdTRUE, portMAX_DELAY);

        // Block until the AFE has processed data available.
        auto res = afe_iface_->fetch_with_delay(afe_data_, portMAX_DELAY);

        // The processor may have been stopped while we were blocked in fetch.
        if ((xEventGroupGetBits(event_group_) & PROCESSOR_RUNNING) == 0) {
            continue;
        }
        if (res == nullptr || res->ret_value == ESP_FAIL) {
            if (res != nullptr) {
                // NOTE(review): the argument was truncated in the listing;
                // ret_value is the field tested just above - confirm.
                ESP_LOGI(TAG, "Error code: %d", res->ret_value);
            }
            continue;
        }

        // VAD state change: notify only on transitions, not on every frame.
        if (vad_state_change_callback_) {
            if (res->vad_state == VAD_SPEECH && !is_speaking_) {
                is_speaking_ = true;
                vad_state_change_callback_(true);
            } else if (res->vad_state == VAD_SILENCE && is_speaking_) {
                is_speaking_ = false;
                vad_state_change_callback_(false);
            }
        }

        // Hand a copy of the processed samples to the registered output callback.
        // data_size is divided by sizeof(int16_t), i.e. it is treated as bytes.
        if (output_callback_) {
            output_callback_(std::vector<int16_t>(res->data, res->data + res->data_size / sizeof(int16_t)));
        }
    }
}
//处理的音效数据的回调,将数据进行编码,然后推送到云端服务器。
audio_processor_.OnOutput([this](std::vector<int16_t>&& data) {
background_task_->Schedule([this, data = std::move(data)]() mutable {
opus_encoder_->Encode(std::move(data), [this](std::vector<uint8_t>&& opus) {
Schedule([this, opus = std::move(opus)]() {
protocol_->SendAudio(opus);
});
});
});
});
播放通路
//1. 通过解析输入的json来启动状态的切换。
protocol_->OnIncomingJson([this, display](const cJSON* root) {
// Parse JSON data
auto type = cJSON_GetObjectItem(root, "type");
if (strcmp(type->valuestring, "tts") == 0) {
auto state = cJSON_GetObjectItem(root, "state");
//收到云端音频,云端会发送start,需要切换到speaking状态。
if (strcmp(state->valuestring, "start") == 0) {
Schedule([this]() {
aborted_ = false;
if (device_state_ == kDeviceStateIdle || device_state_ == kDeviceStateListening) {
SetDeviceState(kDeviceStateSpeaking);
}
});
//本次话题结束后,云端会发送stop,可切换到idle。
} else if (strcmp(state->valuestring, "stop") == 0) {
Schedule([this]() {
if (device_state_ == kDeviceStateSpeaking) {
background_task_->WaitForCompletion();
if (keep_listening_) {
protocol_->SendStartListening(kListeningModeAutoStop);
SetDeviceState(kDeviceStateListening);
} else {
SetDeviceState(kDeviceStateIdle);
}
}
});
} else if (strcmp(state->valuestring, "sentence_start") == 0) {
auto text = cJSON_GetObjectItem(root, "text");
if (text != NULL) {
ESP_LOGI(TAG, "<<
Schedule([this, display, message = std::string(text->valuestring)]() {
display->SetChatMessage("assistant", message.c_str());
});
}
}
//2.解析到云端的json后,会发生状态的迁移
// Switches the device to `state` and applies the side effects of the new state:
// display status/emotion, audio processor, wake-word detection, decoder and
// encoder resets. Does nothing when the state is unchanged.
//
// @param state The state to enter.
void Application::SetDeviceState(DeviceState state) {
    if (device_state_ == state) {
        return;
    }

    clock_ticks_ = 0;
    auto previous_state = device_state_;
    device_state_ = state;
    // NOTE(review): the original format arguments were truncated in this listing;
    // the numeric state value is logged here - confirm against upstream.
    ESP_LOGI(TAG, "STATE: %d", static_cast<int>(device_state_));

    // The state is changed, wait for all background tasks to finish
    background_task_->WaitForCompletion();

    auto& board = Board::GetInstance();
    auto codec = board.GetAudioCodec();
    auto display = board.GetDisplay();
    auto led = board.GetLed();
    led->OnStateChanged();

    switch (state) {
        case kDeviceStateUnknown:
        case kDeviceStateIdle:
            // Idle: show the standby status.
            display->SetStatus(Lang::Strings::STANDBY);
            display->SetEmotion("neutral");
#if CONFIG_USE_AUDIO_PROCESSOR
            // Stop the audio processor.
            audio_processor_.Stop();
#endif
#if CONFIG_USE_WAKE_WORD_DETECT
            // Start wake-word detection.
            wake_word_detect_.StartDetection();
#endif
            break;
        case kDeviceStateConnecting:
            // Connecting: show the status and clear the chat message.
            display->SetStatus(Lang::Strings::CONNECTING);
            display->SetEmotion("neutral");
            display->SetChatMessage("system", "");
            break;
        case kDeviceStateListening:
            // Listening: show the listening status.
            display->SetStatus(Lang::Strings::LISTENING);
            display->SetEmotion("neutral");
            // Reset the decoder and the encoder state before capturing.
            ResetDecoder();
            opus_encoder_->ResetState();
#if CONFIG_USE_AUDIO_PROCESSOR
            // Start the audio processor.
            audio_processor_.Start();
#endif
#if CONFIG_USE_WAKE_WORD_DETECT
            // Stop wake-word detection while listening.
            wake_word_detect_.StopDetection();
#endif
            // Refresh the IoT states.
            UpdateIotStates();
            if (previous_state == kDeviceStateSpeaking) {
                // FIXME: Wait for the speaker to empty the buffer
                vTaskDelay(pdMS_TO_TICKS(120));
            }
            break;
        case kDeviceStateSpeaking:
            display->SetStatus(Lang::Strings::SPEAKING);
            // Reset the decoder and enable the codec output for playback.
            ResetDecoder();
            codec->EnableOutput(true);
#if CONFIG_USE_AUDIO_PROCESSOR
            // Stop the audio processor while speaking.
            audio_processor_.Stop();
#endif
#if CONFIG_USE_WAKE_WORD_DETECT
            // Keep wake-word detection running while speaking.
            wake_word_detect_.StartDetection();
#endif
            break;
        default:
            // Do nothing
            break;
    }
}
//3. 接收云端音频数据的回调,如果是speak状态,将数据入队到队列
protocol_->OnIncomingAudio([this](std::vector<uint8_t>&& data) {
std::lock_guard<std::mutex> lock(mutex_);
if (device_state_ == kDeviceStateSpeaking) {
audio_decode_queue_.emplace_back(std::move(data));
}
});
//4. 当音频输出准备好后会触发这个回调(是否会被不断调用?待确认),进而触发mainloop调用OutputAudio
codec->OnOutputReady([this]() {
BaseType_t higher_priority_task_woken = pdFALSE;
xEventGroupSetBitsFromISR(event_group_, AUDIO_OUTPUT_READY_EVENT, &higher_priority_task_woken);
return higher_priority_task_woken == pdTRUE;
});
//5. output处理
void Application::OutputAudio() {
auto now = std::chrono::steady_clock::now();
auto codec = Board::GetInstance().GetAudioCodec();
const int max_silence_seconds = 10;
std::unique_lock<std::mutex> lock(mutex_);
//判断解码队列是否为空,如果为空,把codec输出关了,也就是不要再触发回调
if (audio_decode_queue_.empty()) {
// Disable the output if there is no audio data for a long time
if (device_state_ == kDeviceStateIdle) {
auto duration = std::chrono::duration_cast<std::chrono::seconds>(now - last_output_time_).count();
if (duration > max_silence_seconds) {
codec->EnableOutput(false);
}
}
return;
}
//如果是在监听状态,清除掉解码队列,直接返回
if (device_state_ == kDeviceStateListening) {
audio_decode_queue_.clear();
return;
}
//获取编码的数据
last_output_time_ = now;
auto opus = std::move(audio_decode_queue_.front());
audio_decode_queue_.pop_front();
lock.unlock();
//将解码数据添加到调度中进行解码播放
background_task_->Schedule([this, codec, opus = std::move(opus)]() mutable {
//如果禁止标志位置起,直接退出。在打断唤醒的时候回置起
if (aborted_) {
return;
}
std::vector<int16_t> pcm;
//解码为pcm
if (!opus_decoder_->Decode(std::move(opus), pcm)) {
return;
}
//如果云端的采样率和codec采样率不一样,进行重采样。
// Resample if the sample rate is different
if (opus_decode_sample_rate_ != codec->output_sample_rate()) {
int target_size = output_resampler_.GetOutputSamples(pcm.size());
std::vector<int16_t> resampled(target_size);
output_resampler_.Process(pcm.data(), pcm.size(), resampled.data());
pcm = std::move(resampled);
}
//播放音频
codec->OutputData(pcm);
});
}