ESP-ADF架构探索(II. 深入Element)
本文最后更新于 2025年9月1日 下午
ESP-ADF架构探索(II. 深入Element)
本文是第二篇,从源码探索Element的内部实现,包括其结构与机制、状态与事件等。
Audio Element 是构成 Audio Pipeline 的基础模块。它是一个独立的的处理单元,通常作为 FreeRTOS 的一个任务运行。其核心是将一个复杂的音频任务(如从 SD 卡读取 MP3 文件、解码、然后通过 I2S 输出)分解为一系列独立的、可链接的 Elements(例如:fatfs_stream -> mp3_decoder -> i2s_stream)。
每个 Element 负责一项单一功能,并通过 ringbuf从上一个 Element 接收数据(或从回调函数产生或读入数据),并将处理后的数据发送到下一个 Element。
本文阅读基础:
- 第 I 篇文章阅读基础,或已有ADF经验
- C语言能力,IDF使用经验
- 一定英文能力
本文 (II. 深入Element) 提供:
- Element源码节选学习
- Element结构与机制解析
- Element状态与事件详细整理、典型流程分析
- Element API介绍
本文所用源码地址:
esp-adf/components/audio_pipeline/audio_element.c
esp-adf/components/audio_pipeline/audio_event_iface.c
esp-adf/components/audio_stream/i2s_stream.c
前排提醒:
- 本文出示的代码均为经删改的伪代码,仅用于逻辑演示。
- 本文中一些机械性的叙述由LLM辅助编写,但是经过N次人工校对修改。
- 本文(第 II 篇)主要研究ESP-ADF的内部实现和架构,不重点研究应用层。(起因是最近用Rust想写一个嵌入式音频框架 embedded-audio)
- 文章发布时间为2025年6-7月,请注意是否过时。
ADF仓库结构
espressif/esp-adf: Espressif Advanced Development Framework
1 | |
我们重点关注components,也就是ADF的核心组件。
核心框架组件
以下是一部分,详见:结构、组件和例程表
audio_pipeline: 提供了音频管道 (Audio Pipeline) 的核心实现,负责将音频元素(如输入、解码、输出)连接起来,管理数据流和状态。
audio_stream: 提供了音频管道中音频元素的一部分具体实现,包括:http_stream, fatfs_stream, i2s_stream 等音频元素。
audio_sal (Audio System Abstraction Layer): 音频系统抽象层,封装 FreeRTOS 的任务、队列、互斥锁、内存管理等功能,为上层应用提供统一接口。
audio_hal: 音频硬件抽象层,定义标准接口控制音频编解码芯片 (Codec),如初始化、配置音量、设置采样率,与硬件解耦。
esp_codec_dev: 新一代 Codec 设备抽象层,提供通用可扩展接口,支持控制接口(I2C, SPI)和数据接口(I2S)的灵活组合。
audio_board: 针对乐鑫官方开发板(如 ESP32-LyraT, ESP32-S3-Korvo-2)的板级支持,初始化引脚、外设和 audio_hal 接口。
esp_peripherals: 外设管理服务,统一管理按键、Wi-Fi、SD 卡、触摸板等外设,通过 audio_event_iface 传递事件。
wifi_service: 专用 Wi-Fi 服务,封装连接、断线重连、获取 IP 等功能,支持 SmartConfig、AirKiss 等配网方式。
bluetooth_service: 蓝牙服务,封装 A2DP(音频流传输)、AVRCP(远程控制)、HFP(免提通话)等协议,简化蓝牙音频开发。
audio_mixer: 提供音频混音功能,将多个音频流(如背景音乐和提示音)混合成单声道或双声道音频流。
1. 数据结构: struct audio_element
1 | |
功能接口 (
open,process,close,seek,destroy): 这函数指针由具体的 Element 实现(如i2s_stream.c或http_stream.c)来定义其特定的行为。I/O:
in,out支持两种 I/O 模式:IO_TYPE_RB: 通过RingBufferringbuf_handle_t进行数据交换,这是 Pipeline 中 Element 间链接的标准方式。IO_TYPE_CB: 通过回调函数 (io_callback_t) 进行数据交换,提供了与 Pipeline 外部模块交互的灵活性。
状态和事件: 在[第四节](#4. 状态与事件)会详细讲解。
事件接口 (
iface_event): Element有个audio_event_iface_handle_t,使得 Element 能够接收来自 Pipeline 的命令(如 PAUSE, RESUME, STOP)并向外即时报告状态。state(audio_element_state_t): 是个枚举。Element 的当前状态(如AEL_STATE_RUNNING,AEL_STATE_PAUSED)。。is_running和stopping是 volatile bool flags,用于控制任务主循环和处理停止请求。state_event(EventGroupHandle_t): 一个 FreeRTOS EventGroup,用于同步任务状态。
任务配置 (
task_stack,task_prio,task_core): 定义了 Element 任务的堆栈大小、优先级和运行核心。如果task_stack<= 0,则该 Element 不会创建自己的任务,其process函数将由外部调用。数据缓冲区 (
buf,buf_size): 每个 Element 内部都有一个工作缓冲区,用于process函数处理数据。信息 (
info):audio_element_info_t存储媒体信息,如采样率、通道数、总字节数和当前播放位置等。
2. 生命周期
一个 Element 的生命周期由其状态机(audio_element_state_t,见[第四节](#4. 状态与事件))驱动,并通过 audio_event_iface 发送的命令切换状态。
2.1. 初始化
audio_element_init(audio_element_cfg_t *config) 函数根据 audio_element_cfg_t 配置来初始化一个 Element 实例,分配 audio_element 结构体内存,初始化其内部的事件接口 (iface_event),并注册 audio_element_on_cmd ,创建互斥锁等。
2.2. 运行
audio_element_run 会创建并启动 Element 的FreeRTOS主任务 (audio_element_task)。
1 | |
audio_element_task 驱动了整个数据处理流程。一般由pipeline的主任务进行的调用,也可以手动调用。
1 | |
任务主循环的核心逻辑分为两部分:
- 命令处理: 通过
audio_event_iface_waiting_cmd_msg等待来自 Pipeline 或其他模块的命令(如STOP,PAUSE)。on_cmd回调(即audio_element_on_cmd)会处理这些命令并改变 Element 的状态。 - 数据处理: 调用
audio_element_process_running,它会进一步调用用户实现的process函数来处理音频数据。
2.3. 数据处理
audio_element_process_running此函数处理数据流,它调用特定 Element 在初始化时注册的process回调(比如mp3 decoder就注册一个解码mp3的process回调)。
1 | |
3. 数据流
3.1. 数据输入/输出 (I/O) 模式
Element 提供了两种数据 I/O 方式,以适应不同的应用场景。
(但是这种模式仍然不够灵活,并可能有一定的Copy消耗,GMF的进步(TODO)就是在这里动了刀子)
3.1.1. Ringbuffer 模式
这是 Pipeline 内部 Element 之间数据交换的标准方式。上一个 Element 的输出 ringbuf 就是下一个 Element 的输入 ringbuf。
audio_element_input(el, buffer, size): 从输入ringbuf(el->in.input_rb) 读取数据。1
in_len = rb_read(el->in.input_rb, buffer, wanted_size, el->input_wait_time);audio_element_output(el, buffer, size): 向输出ringbuf(el->out.output_rb) 写入数据。1
output_len = rb_write(el->out.output_rb, buffer, write_size, el->output_wait_time);
3.1.2.Callback 模式
当 Element 需要与 Pipeline 外部的模块(如网络、文件系统或其他非 ADF 组件)直接交互时,可以使用回调模式。
如 audio_element_set_read_cb(el, func, ctx) 注册一个读回调函数。当调用 audio_element_input 时,如果 read_type 是 IO_TYPE_CB,框架将执行这个注册的回调来获取数据。audio_element_set_write_cb(el, func, ctx)同理。
这种机制在 http_stream、i2s_stream 等 Element 中被广泛使用,它们的 process 函数内部通过调用 audio_element_input 和 audio_element_output 来驱动数据流动,而具体的读写行为则由其各自的 _http_read 或 _i2s_write 等回调函数实现。
3.1.3. 多路输入/输出 (Multi-Stream I/O)
除了标准的单路 I/O,audio_element 还支持混音 (Mixing) 和分流 (Splitting) ,主要是通过multi_output_ringbuf.
在上一篇文章讲解的 element_wav_amr_sdcard 例程中,我们使用audio_element_set_multi_output_ringbuf(i2s_stream_reader,ringbuf11,0)设置了两个输出ringbuf。
相关 API:
audio_element_set_multi_input/output_ringbuf(): 设置指定索引的输入/输出 Ringbuffer。audio_element_multi_input(): 从指定索引的输入 Ringbuffer 读取数据。audio_element_multi_output(): 将数据同时写入所有已配置的输出 Ringbuffer。
3.1. 核心处理函数: process 回调
所有数据处理的起点都源于 Element 的 process 回调函数。在 Element 的主任务循环 audio_element_task 中,一旦 Element 处于 AEL_STATE_RUNNING 状态,audio_element_process_running 函数便会持续调用用户在初始化时注册的 process 回调:
1 | |
process 函数的实现者需要在此函数内部,获取数据,处理数据,发送数据。(获取和发送不由element模块直接接管从而在具体的 process 中实现,提供了更高的灵活性)
以 i2s_stream 为例:
1 | |
3.3. 寻址 (Seeking)
框架提供了 seek 功能,具体我还没研究。
3.4. 数据流信息
在数据处理过程中,Element 需要动态地管理和报告流信息,如播放进度、码率等。框架提供了一组原子化的信息更新接口。
细粒度更新 API:
audio_element_update_byte_pos()/audio_element_set_byte_pos(): 更新或设置当前处理的字节位置byte_pos。audio_element_set_total_bytes(): 设置总字节数total_bytes。audio_element_set_music_info(): 一次性设置采样率、通道数和位深。audio_element_set_bps(): 设置码率(bits per second)。audio_element_set_duration(): 设置总时长(秒)。
一个解码器 Element 在 process 中解析出媒体元数据后,会调用自身 set_music_info 等函数来初始化信息。在处理每个数据块后,它会调用 update_byte_pos 来更新进度。这些更新后的信息可以通过 audio_element_report_pos 或 audio_element_report_info 异步报告给上层应用,用于实现播放进度条等功能。
4. 状态与事件
不得不说,这个状态机有点繁琐和复杂,好在GMF抛下了历史包袱(TODO)。
一个典型的 audio_element 控制流程如下:
audio_pipeline或用户代码 通过调用audio_element_pause()等 API 发起一个操作。- 该 API 将一个命令(如
AEL_MSG_CMD_PAUSE)通过iface_event的内部队列发送给 Element 自己的任务。 - 同时,API 函数使用
state_event事件组来阻塞等待,直到 Element 任务确认该操作已完成。 - Element 的
audio_element_task任务在其主循环中被命令唤醒,并执行相应的命令处理函数(如audio_element_on_cmd_pause)。 - 命令处理函数会更新
state状态枚举,并根据需要设置is_running或stopping标志。 - 操作完成后,命令处理函数会设置
state_event中对应的事件位,以唤醒并通知外部调用者操作已完成。
接下来,我们将详细解析每个组件。
4.1. state - 逻辑状态
audio_element_state_t是一个枚举类型,定义了 Element 从初始化到销毁的宏观逻辑状态。 主要状态包括:
| 状态码 | 描述 |
|---|---|
AEL_STATUS_NONE |
无状态 |
AEL_STATUS_ERROR_OPEN |
打开 Element 失败 |
AEL_STATUS_ERROR_INPUT |
从上游读取数据时出错 |
AEL_STATUS_ERROR_PROCESS |
在 process 回调中处理数据时出错 |
AEL_STATUS_ERROR_OUTPUT |
向下游写入数据时出错 |
AEL_STATUS_ERROR_CLOSE |
关闭 Element 时出错 |
AEL_STATUS_ERROR_TIMEOUT |
操作超时 |
AEL_STATUS_ERROR_UNKNOWN |
未知错误 |
AEL_STATUS_INPUT_DONE |
上游数据已全部读取完毕 |
AEL_STATUS_INPUT_BUFFERING |
正在缓冲输入数据 |
AEL_STATUS_OUTPUT_DONE |
下游数据已全部写入完毕 |
AEL_STATUS_OUTPUT_BUFFERING |
正在缓冲输出数据 |
AEL_STATUS_STATE_RUNNING |
RUNNING |
AEL_STATUS_STATE_PAUSED |
PAUSED |
AEL_STATUS_STATE_STOPPED |
STOPPED |
AEL_STATUS_STATE_FINISHED |
FINISHED |
AEL_STATUS_MOUNTED |
设备已挂载。 |
AEL_STATUS_UNMOUNTED |
设备已卸载 |
state 是对 Element 当前工作阶段的顶层抽象描述。它被外部调用者(如 audio_pipeline)和 Element 内部任务共同用来判断 Element 的行为。例如,Pipeline 根据此状态决定是否可以向 Element 发送数据,而 Element 内部则根据它来决定是否要执行 process 函数。
内部: 该状态只应在 Element 的任务线程内部被修改,通常是在响应特定命令后。例如,在 audio_element_on_cmd_pause 中(这是一个内部函数),当暂停逻辑执行完毕后,状态会被设置为 AEL_STATE_PAUSED。
外部: 通过 audio_element_get_state() API 获取当前状态。
4.2. state_event - 线程状态同步
只应在Element内部使用。
它是一个 FreeRTOS 的 EventGroupHandle_t,利用其事件位 (Event Bits) 作为同步标志。
当一个外部线程(如应用主任务)调用一个阻塞 API(如 audio_element_pause)时,它需要等待 Element 的内部任务实际完成这个操作。state_event 正是
外部 API 在发送命令后,会调用 xEventGroupWaitBits() 来阻塞自身,等待特定的事件位。
1 | |
Element 的内部任务在完成了相应的操作后,会调用 xEventGroupSetBits()来设置对应的事件位,从而唤醒(unblock)正在等待的外部线程。
1 | |
| 位 | 说明 | |
|---|---|---|
BIT0 |
STOPPED_BIT |
标志元素已完成停止流程,进入 STOPPED, FINISHED, 或 ERROR 状态。通常由 audio_element_on_cmd_stop 等函数设置,由 audio_element_wait_for_stop 等待。 |
BIT1 |
STARTED_BIT |
标志元素的 open 回调已成功执行。 |
BIT2 |
BUFFER_REACH_LEVEL_BIT |
标志输出 ringbuf 的数据量已达到 audio_element_wait_for_buffer 函数所期望的水平。 |
BIT3 |
TASK_CREATED_BIT |
标志元素的后台任务已成功创建并即将进入主循环。由任务自身设置,由 audio_element_run 等待。 |
BIT4 |
TASK_DESTROYED_BIT |
标志元素的后台任务已完成所有清理工作,即将自我销毁。由任务自身设置,由 audio_element_terminate 等待。 |
BIT5 |
PAUSED_BIT |
标志元素已成功进入 PAUSED 状态。由 audio_element_on_cmd_pause 设置,由 audio_element_pause 等待。 |
BIT6 |
RESUMED_BIT |
标志元素已成功从暂停或停止状态中恢复到运行状态。由 audio_element_on_cmd_resume 设置,由 audio_element_resume 等待。 |
4.3. iface_event - 命令与事件通信
audio_event_iface_handle_t它是一个事件接口句柄,其内部封装了两个 FreeRTOS 队列:一个用于内部命令 (internal_queue),一个用于向外部报告事件(external_queue)。
audio_element_pause等外向API,调用 audio_event_iface_cmd(),将一个audio_event_iface_msg_t消息(例如AEL_MSG_CMD_PAUSE)发送到 Element 的internal_queue。
1 | |
Element 的主任务 audio_element_task 在其循环中调用 audio_event_iface_waiting_cmd_msg(),这将阻塞地从 internal_queue 中获取消息。获取到消息后,会调用注册的 on_cmd 回调(即 audio_element_on_cmd_pause)来执行具体的命令逻辑。
当 Element 需要向外报告状态时(如 process 返回 AEL_IO_DONE),它会调用 audio_element_report_status() 等辅助函数,这些函数最终会调用 audio_event_iface_sendout(),将一个报告消息发送到 external_queue,供 audio_pipeline 或其他监听者消费。
| 控制命令 (由外部发送给元素) | enum值 | |
|---|---|---|
AEL_MSG_CMD_FINISH |
2 | 通知元素数据流已结束,应进入 FINISHED 状态。通常由元素自身在处理完数据后间接触发。 |
AEL_MSG_CMD_STOP |
3 | 命令元素停止当前操作,清理资源(调用 close),并进入 STOPPED 状态。 |
AEL_MSG_CMD_PAUSE |
4 | 命令元素暂停数据处理,并进入 PAUSED 状态。 |
AEL_MSG_CMD_RESUME |
5 | 命令元素从 PAUSED 或 STOPPED 状态恢复,重新初始化(调用 open)并进入 RUNNING 状态。 |
AEL_MSG_CMD_DESTROY |
6 | 命令元素永久终止其后台任务并准备被销毁。 |
| 报告类型 (由元素发送给外部) | ||
AEL_MSG_CMD_REPORT_STATUS |
8 | 标志该消息是一个状态报告,其 data 字段携带一个 audio_element_status_t 状态码。 |
AEL_MSG_CMD_REPORT_MUSIC_INFO |
9 | 标志该消息是一个媒体元数据报告,其 data 字段携带 audio_element_info_t 结构体。 |
AEL_MSG_CMD_REPORT_CODEC_FMT |
10 | 标志该消息报告了已识别的音频编码格式。 |
AEL_MSG_CMD_REPORT_POSITION |
11 | 标志该消息报告了当前的播放/处理进度(字节位置)。 |
4.4. is_running, task_run 和 stopping - 即时状态标志
只应在Element内部使用。
它们是volatile bool 类型的标志位。它们提供了比 state 更轻量、更即时的状态控制,主要用于在 Element 任务的单个循环迭代中快速判断是否应继续执行数据处理或中断 I/O。
**
is_running**:- 设置: 在
audio_element_on_cmd的RESUME分支中被设为true,在PAUSE、STOP、FINISH等分支中被设为false。 - 检查: 在
audio_element_process_running函数的入口处被检查。如果is_running为false,该函数会立即返回,从而跳过process回调的执行。这确保了在接收到暂停或停止命令后,能迅速停止数据处理,即使state的完整转换还未完成。
- 设置: 在
**
task_run**:- 设置: 在
audio_element_task的开始设置为true,在task结束或错误的时候设为false。还在audio_element_run、audio_element_terminate等函数中被设置为对应的值。 - 检查: 在
audio_element_run、audio_element_terminate等函数中检查,比如,pause、terminate时检查是否已经停止。
- 设置: 在
**
stopping**:- 设置: 在
audio_element_stop()API 中设为true,在发送命令之前。 - 检查: 主要由
ringbuf.c中的 I/O 函数(如rb_read和rb_write)通过audio_element_is_stopping()检查。当 I/O 函数发现该标志为true时,会立即中止阻塞等待,并返回AEL_IO_ABORT。这确保了当外部请求停止时,即使 Element 正阻塞在ringbuf的读写上,也能被快速唤醒并响应停止命令。用户在自定义的process实现中,也应在耗时操作前后检查此标志,以便快速退出。
- 设置: 在
4.5. 状态与事件API
1. 外部控制与同步 API
这类 API 是 audio_element 提供给外部调用者(如 audio_pipeline 或用户应用)的高级接口,用于控制元素的生命周期。它们的核心工作模式是:通过 iface_event 异步发送命令,然后通过 state_event 事件组同步等待操作完成的确认信号。
audio_element_run
启动元素的后台任务,并等待任务成功创建的信号。audio_element_stop
请求元素停止。它会立即设置stopping标志以中断 I/O,然后发送异步的STOP命令,立即返回。audio_element_wait_for_stop
阻塞调用线程,同步等待元素进入停止状态(可为STOPPED,FINISHED,ERROR)。可与前者结合使用。audio_element_pause
请求元素暂停。它发送异步的PAUSE命令,并等待暂停完成的信号。audio_element_resume
请求元素恢复。它发送异步的RESUME命令,并等待恢复完成的信号。audio_element_terminate
请求元素永久终止。它发送异步的DESTROY命令,并等待任务完全销毁的信号。audio_element_wait_for_buffer
阻塞调用线程,同步等待元素的输出ringbuf填充到指定水平。audio_element_change_cmd
更底层的命令发送接口,用于直接向元素发送一个指定的audio_element_msg_cmd_t命令。audio_element_reset_state
强制将元素的内部逻辑状态state变量重置为AEL_STATE_INIT。
2. 内部命令处理 API(on_cmd)
这类函数不被外部直接调用,它们是 audio_element.c 内部的 static 函数。当 audio_element_task 从 iface_event 收到命令时,会调用这些函数来执行具体的状态转换逻辑。它们是实际操作 state、is_running 和 state_event 的地方。
audio_element_on_cmd
作为命令分发器,在元素的任务上下文中被调用,根据收到的消息cmd来调用下面具体的处理函数。audio_element_on_cmd_stop
处理STOP命令。它会调用close()回调,更新state为AEL_STATE_STOPPED,设置is_running为false,并设置state_event的STOPPED_BIT来唤醒等待者。audio_element_on_cmd_pause
处理PAUSE命令。它更新state为AEL_STATE_PAUSED,设置is_running为false,并设置state_event的PAUSED_BIT。audio_element_on_cmd_resume
处理RESUME命令。它会调用open()回调,更新state为AEL_STATE_RUNNING,设置is_running为true,并设置state_event的RESUMED_BIT。audio_element_on_cmd_finish
当数据流正常结束时被内部调用。它更新state为AEL_STATE_FINISHED,设置is_running为false,并设置state_event的STOPPED_BIT。audio_element_on_cmd_error
当发生错误时被内部调用。它更新state为AEL_STATE_ERROR,设置is_running为false,并设置state_event的STOPPED_BIT。
3. 事件接口通信 API (iface_event)
这些是 audio_event_iface.h 中定义的函数,构成了元素异步通信的基石。它们操作 iface_event 句柄内部的两个 FreeRTOS 队列。
audio_event_iface_cmd
向事件接口的内部命令队列发送一条消息。外部控制 API(如audio_element_stop)通过此函数将命令传递给元素任务。audio_event_iface_cmd_from_isr
audio_event_iface_cmd的中断安全版本。audio_event_iface_waiting_cmd_msg
使元素任务阻塞等待其内部命令队列,直到收到新命令或超时。这是元素任务事件驱动的核心。audio_event_iface_sendout
向事件接口的外部报告队列发送一条消息。内部上报 API(如audio_element_report_status)通过此函数将事件和状态信息传递给外部监听者。audio_event_iface_set_listener
将一个事件接口(如audio_pipeline的iface)设置为另一个事件接口(如audio_element的iface)的监听者。这样,element通过sendout发送的事件就能被pipeline接收到。
4. 内部状态与信息上报 API
这类 API 由元素实现者在 process、open 等回调函数中调用。它们通过 iface_event 的 sendout 机制,将元素的内部状态变化和关键信息异步地报告给外部。
audio_element_report_status
上报一个audio_element_status_t状态码,通知外部监听者元素内部发生的具体事件(如AEL_STATUS_ERROR_INPUT,AEL_STATUS_STATE_FINISHED)。audio_element_report_info
上报完整的audio_element_info_t结构体,通常在获取到媒体元数据(如采样率、总时长)后调用。audio_element_report_codec_fmt
上报识别出的音频编码格式。audio_element_report_pos
上报当前的播放/处理字节位置。audio_element_set_ringbuf_done
在元素的输出ringbuf上设置一个完成标志(EOF),这是通知下游元素数据已经结束的特定信令。
5. 状态与信息查询 API
这类 API 用于从外部安全地查询元素的当前状态和信息。它们通常是线程安全的,内部会使用互斥锁来保护被访问的数据。
audio_element_get_state
获取元素当前的高层逻辑状态 (audio_element_state_t)。audio_element_getinfo
获取元素的完整audio_element_info_t结构体信息。audio_element_get_uri
获取元素当前配置的 URI。audio_element_is_stopping
查询stopping紧急停止标志位,判断元素是否正在响应一个stop请求。
5. 一些控制流程
audio_element_resume 恢复/启动流程
resume 操作用于启动或从暂停状态恢复 Element。这是一个异步过程。
应用层调用
audio_element_resume(el, ...)(涉及iface_event,state_event)
此函数负责发起RESUME命令并等待,直到 Element 内部任务确认完成恢复操作。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15esp_err_t audio_element_resume(audio_element_handle_t el, float wait_for_rb_threshold, TickType_t timeout)
{
if (!el->task_run) { /*处理错误*/ }
if (el->state == AEL_STATE_RUNNING) { /* 已经在跑 */ }
if (el->task_stack <= 0) { /*先不关注*/ }
// 1. 清除旧的 RESUMED_BIT 事件标志位,为本次同步做准备
xEventGroupClearBits(el->state_event, RESUMED_BIT);
// 2. 发送异步命令到 Element 内部队列
if (audio_element_cmd_send(el, AEL_MSG_CMD_RESUME) == ESP_FAIL) { /*处理错误*/ }
// 3. 阻塞当前线程(也就是应用层调用这个pause API的线程),等待 Element 任务处理完成并设置 RESUMED_BIT
EventBits_t uxBits = xEventGroupWaitBits(el->state_event, RESUMED_BIT, false, true, timeout);
}Element 任务主循环
audio_element_task(void *pv)(涉及iface_event)
Element 的核心任务在audio_element_task中运行。它是一个循环,大部分时间阻塞在audio_event_iface_waiting_cmd_msg(),等待新的命令或事件。当RESUME命令到达时,audio_event_iface_waiting_cmd_msg会调用注册的audio_element_on_cmd回调函数来处理该命令。1
2
3
4
5
6
7
8
9
10
11void audio_element_task(void *pv)
{
// ... 初始化 ...
while (el->task_run) {
// 4. 任务在此等待命令,当 RESUME 命令到达时,函数返回,并由 on_cmd 回调处理
if (audio_event_iface_waiting_cmd_msg(el->iface_event) != ESP_OK) { /* 异常处理 */ }
// 5. 如果 Element 处于运行状态,则处理数据
audio_element_process_running(el);
}
// ... 清理和退出 ...
}命令处理函数
audio_element_on_cmd_resume(el)(涉及state,is_running,state_event)
此函数是RESUME命令的最终执行者。- 调用
open回调: 执行audio_element_process_init(el),它会调用用户注册的el->open(el)函数来重新初始化资源。 - 发出同步信号: 一旦
open成功,设置el->state = AEL_STATE_RUNNING,并通过xEventGroupSetBits设置RESUMED_BIT,唤醒在第 1 步中等待的audio_element_resume函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29// audio_element_on_cmd() -> case AEL_MSG_CMD_RESUME:
static esp_err_t audio_element_on_cmd_resume(audio_element_handle_t el)
{
// 6. 设置运行标志
el->is_running = true;
// 7. 发出同步信号,通知 API 调用者操作已开始
xEventGroupSetBits(el->state_event, RESUMED_BIT);
// 8. 调用 open 回调进行初始化,如果失败则回滚
if (audio_element_process_init(el) != ESP_OK) {
el->is_running = false;
/* 处理错误 */
}
}
// audio_element_process_init() 会调用用户定义的 open 函数
esp_err_t audio_element_process_init(audio_element_handle_t el)
{
if (el->open == NULL) { /* 无 open 回调,直接成功 */ }
el->is_open = true;
audio_element_force_set_state(el, AEL_STATE_INITIALIZING);
esp_err_t ret = el->open(el); // <-- 调用用户注册的 open 函数
if (ret == ESP_OK) {
audio_element_force_set_state(el, AEL_STATE_RUNNING);
audio_element_report_status(el, AEL_STATUS_STATE_RUNNING);
} else { /* 处理 open 失败 */ }
}- 调用
audio_element_stop 停止流程
stop 操作与 resume 不同,此函数发送一个 STOP 命令,但他不等待。如果需要等待,则应调用 audio_element_wait_for_stop(el)。
应用层调用
audio_element_stop(el)(涉及iface_event,stopping)- 中断数据流: 调用
rb_abort中断输入和输出 Ringbuffer,让 I/O 操作快速响应。 - 发送异步命令: 发送
AEL_MSG_CMD_STOP命令。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15esp_err_t audio_element_stop(audio_element_handle_t el)
{
// 1. 中断输入输出 Ringbuffer。(对于是RingBuffer还是CallBack的判断是在这个abort函数内部的)
audio_element_abort_output_ringbuf(el);
audio_element_abort_input_ringbuf(el);
if (el->stopping) { /* 防止重复发送 */ }
el->stopping = true;
// 2. 发送停止命令
if (audio_element_cmd_send(el, AEL_MSG_CMD_STOP) != ESP_OK) {
el->stopping = false;
/* 处理错误 */
}
}- 中断数据流: 调用
命令处理函数
audio_element_on_cmd_stop(el)(涉及state,is_running,state_event)
当 Element 任务收到STOP命令后,audio_event_iface_waiting_cmd_msg会调用此函数以执行停止逻辑。- 反初始化: 调用
audio_element_process_deinit(el),执行用户注册的close回调。 - 更新状态: 设置
el->state = AEL_STATE_STOPPED和el->is_running = false。 - 发出同步信号: 设置
STOPPED_BIT,通知等待者(如audio_element_wait_for_stop) Element 已停止。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27// audio_element_on_cmd() -> case AEL_MSG_CMD_STOP:
static esp_err_t audio_element_on_cmd_stop(audio_element_handle_t el)
{
if ((el->state != AEL_STATE_FINISHED) && (el->state != AEL_STATE_STOPPED)) {
// 3. 调用 close 回调释放资源
audio_element_process_deinit(el);
// 4. 更新状态为 STOPPED
el->state = AEL_STATE_STOPPED;
audio_element_report_status(el, AEL_STATUS_STATE_STOPPED);
// 5. 停止数据处理循环
el->is_running = false;
el->stopping = false;
// 6. 设置 STOPPED_BIT,唤醒等待者
xEventGroupSetBits(el->state_event, STOPPED_BIT);
} else {
/* 处理其他情况,例如从 ERROR 或 FINISHED 状态强制变为 STOPPED */
}
}
// audio_element_process_deinit() 会调用用户定义的 close 函数
esp_err_t audio_element_process_deinit(audio_element_handle_t el)
{
if (el->is_open && el->close) {
el->close(el); // <-- 调用用户注册的 close 函数
}
el->is_open = false;
}随即回到audio_element_task的主循环,因为已经stop,所以audio_element_process_running内部将不会执行,随即退出循环。
- 反初始化: 调用
数据流正常结束 (Finish)
当一个 Element(例如解码器)处理完所有输入数据时,它会进入 FINISH 状态。
Process 回调
el->process(el, ...)(涉及state)
流程的起点是 Element 的数据处理函数el->process()返回AEL_IO_DONE。Process 运行器
audio_element_process_running()(涉及iface_event)
该函数在任务循环中检查process()的返回值。- 捕获
AEL_IO_DONE: 一旦process()返回AEL_IO_DONE,启动结束流程。 - 通知下游: 调用
audio_element_set_ringbuf_done(el),通知下游数据流已结束。 - 执行
Finish命令: 调用audio_element_on_cmd_finish(el)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17// audio_element_process_running()
static esp_err_t audio_element_process_running(audio_element_handle_t el)
{
process_len = el->process(el, el->buf, el->buf_size);
if (process_len <= 0) {
switch (process_len) {
case AEL_IO_DONE:
case AEL_IO_OK:
// 1. 通知下游 Element 数据已结束
audio_element_set_ringbuf_done(el);
// 2. 触发 Finish 流程
audio_element_on_cmd_finish(el);
break;
/* ... 其他 case ... */
}
}
}- 捕获
Finish 命令处理
audio_element_on_cmd_finish(el)(涉及state,is_running,state_event,iface_event)
此函数负责将 Element 置于FINISHED状态。- 资源清理: 调用
audio_element_process_deinit(el)来执行close()回调。 - 状态变更与报告: 设置状态为
FINISHED,停止运行,并向上层报告。
1
2
3
4
5
6
7
8
9
10
11
12
13
14// audio_element_on_cmd_finish()
static esp_err_t audio_element_on_cmd_finish(audio_element_handle_t el)
{
/* 状态检查 */
// 3. 调用 close 回调
audio_element_process_deinit(el);
// 4. 更新状态
el->state = AEL_STATE_FINISHED;
audio_element_report_status(el, AEL_STATUS_STATE_FINISHED);
// 5. 停止数据处理
el->is_running = false;
// 6. 设置停止位,兼容 wait_for_stop
xEventGroupSetBits(el->state_event, STOPPED_BIT);
}- 资源清理: 调用
错误
当发生不可恢复的错误(比如IO错误或者数据处理错误)时,Element 会进入 ERROR 状态。,我们自下往上看一看:
Process 或 I/O 函数
process()/rb_read()/rb_write()(涉及state)
无论是process函数返回AEL_PROCESS_FAIL,还是 I/O 操作返回AEL_IO_FAIL,都会触发错误处理。1
2
3
4
5
6
7
8
9
10
11
12
13
14// audio_element_input() 中对 rb_read 的错误处理
audio_element_err_t audio_element_input(audio_element_handle_t el, char *buffer, int wanted_size)
{
in_len = rb_read(el->in.input_rb, buffer, wanted_size, el->input_wait_time);
if (in_len <= 0) {
switch (in_len) {
case AEL_IO_FAIL:
// 1. 立即向上层报告 I/O 错误状态
audio_element_report_status(el, AEL_STATUS_ERROR_INPUT);
break;
// ...
}
}
}audio_element_process_running()(涉及iface_event)audio_element_process_running捕获错误返回码。
* **立即上报错误**: 调用 `audio_element_report_status()` 快速将错误报告给上层。
* **进入Element错误处理流程**: 调用 `audio_element_on_cmd_error(el)`。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// audio_element_process_running()
static esp_err_t audio_element_process_running(audio_element_handle_t el)
{
process_len = el->process(el, el->buf, el->buf_size);
if (process_len <= 0) {
switch (process_len) {
case AEL_IO_FAIL:
case AEL_PROCESS_FAIL:
// 2. 报告处理错误
audio_element_report_status(el, AEL_STATUS_ERROR_PROCESS);
// 3. 触发 Error 流程
audio_element_on_cmd_error(el);
break;
/* ... 其他 case ... */
}
}
}
Error 命令处理
audio_element_on_cmd_error(el)(涉及state,is_running,state_event)
此函数负责将 Element 置于错误状态。- 尝试清理: 调用
audio_element_process_deinit(el)尝试安全地执行close()。 - 设置错误状态: 设置
el->state = AEL_STATE_ERROR并停止运行。 - 设置停止位: 设置
STOPPED_BIT,将 Element 停在错误状态。
1
2
3
4
5
6
7
8
9
10
11
12
13
14// audio_element_on_cmd_error()
static esp_err_t audio_element_on_cmd_error(audio_element_handle_t el)
{
if (el->state != AEL_STATE_STOPPED) {
// 4. 尝试调用 close 回调释放资源
audio_element_process_deinit(el);
// 5. 设置状态为 ERROR
el->state = AEL_STATE_ERROR;
// 6. 停止数据处理
el->is_running = false;
// 7. 设置停止位,让系统可以从错误中停止下来
xEventGroupSetBits(el->state_event, STOPPED_BIT);
}
}- 尝试清理: 调用
接下来就到了 Pipeline 层(若有),经过进一步上传,由用户层进行最终的错误处理,比如进行恢复操作。
最后
在下一篇中,我们将深入研究Pipeline,这个高级调度器的实现。