ESP-ADF架构探索(II. 深入Element)

本文最后更新于 2025年9月1日 下午

ESP-ADF架构探索(II. 深入Element)

本文是第二篇,从源码探索Element的内部实现,包括其结构与机制、状态与事件等。

Audio Element 是构成 Audio Pipeline 的基础模块。它是一个独立的的处理单元,通常作为 FreeRTOS 的一个任务运行。其核心是将一个复杂的音频任务(如从 SD 卡读取 MP3 文件、解码、然后通过 I2S 输出)分解为一系列独立的、可链接的 Elements(例如:fatfs_stream -> mp3_decoder -> i2s_stream)。

每个 Element 负责一项单一功能,并通过 ringbuf从上一个 Element 接收数据(或从回调函数产生或读入数据),并将处理后的数据发送到下一个 Element。

(I. 初探ADF)

(II. 深入Element)

(III. 深入Pipeline)

本文阅读基础:

  • I 篇文章阅读基础,或已有ADF经验
  • C语言能力,IDF使用经验
  • 一定英文能力

本文 (II. 深入Element) 提供:

  • Element源码节选学习
  • Element结构与机制解析
  • Element状态与事件详细整理、典型流程分析
  • Element API介绍

本文所用源码地址:

esp-adf/components/audio_pipeline/audio_element.c

esp-adf/components/audio_pipeline/audio_event_iface.c

esp-adf/components/audio_stream/i2s_stream.c

前排提醒:

  1. 本文出示的代码均为经删改的伪代码,仅用于逻辑演示。
  2. 本文中一些机械性的叙述由LLM辅助编写,但是经过N次人工校对修改。
  3. 本文(第 II 篇)主要研究ESP-ADF的内部实现和架构,不重点研究应用层。(起因是最近用Rust想写一个嵌入式音频框架 embedded-audio
  4. 文章发布时间为2025年6-7月,请注意是否过时。

ADF仓库结构

espressif/esp-adf: Espressif Advanced Development Framework

1
2
3
4
5
6
7
8
9
esp-adf/
├── components # 核心组件和第三方库
├── docs # 文档
├── examples # 示例代码
├── tools # 实用工具
├── CMakeLists.txt # 顶层构建脚本
├── esp-idf # idf 作为 submodoule
├── README.md # 项目根 README
└── ... # 其他配置文件

我们重点关注components,也就是ADF的核心组件。

核心框架组件

以下是一部分,详见:结构、组件和例程表

  • audio_pipeline: 提供了音频管道 (Audio Pipeline) 的核心实现,负责将音频元素(如输入、解码、输出)连接起来,管理数据流和状态。

  • audio_stream: 提供了音频管道中音频元素的一部分具体实现,包括:http_stream, fatfs_stream, i2s_stream 等音频元素。

  • audio_sal (Audio System Abstraction Layer): 音频系统抽象层,封装 FreeRTOS 的任务、队列、互斥锁、内存管理等功能,为上层应用提供统一接口。

  • audio_hal: 音频硬件抽象层,定义标准接口控制音频编解码芯片 (Codec),如初始化、配置音量、设置采样率,与硬件解耦。

  • esp_codec_dev: 新一代 Codec 设备抽象层,提供通用可扩展接口,支持控制接口(I2C, SPI)和数据接口(I2S)的灵活组合。

  • audio_board: 针对乐鑫官方开发板(如 ESP32-LyraT, ESP32-S3-Korvo-2)的板级支持,初始化引脚、外设和 audio_hal 接口。

  • esp_peripherals: 外设管理服务,统一管理按键、Wi-Fi、SD 卡、触摸板等外设,通过 audio_event_iface 传递事件。

  • wifi_service: 专用 Wi-Fi 服务,封装连接、断线重连、获取 IP 等功能,支持 SmartConfig、AirKiss 等配网方式。

  • bluetooth_service: 蓝牙服务,封装 A2DP(音频流传输)、AVRCP(远程控制)、HFP(免提通话)等协议,简化蓝牙音频开发。

  • audio_mixer: 提供音频混音功能,将多个音频流(如背景音乐和提示音)混合成单声道或双声道音频流。

1. 数据结构: struct audio_element

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
struct audio_element {
/* Functions/RingBuffers */
el_io_func open;
ctrl_func seek;
process_func process;
el_io_func close;
el_io_func destroy;
io_type_t read_type;
union {
ringbuf_handle_t input_rb;
io_callback_t read_cb;
} in;
io_type_t write_type;
union {
ringbuf_handle_t output_rb;
io_callback_t write_cb;
} out;

audio_multi_rb_t multi_in;
audio_multi_rb_t multi_out;

/* Properties */
volatile bool is_open;
audio_element_state_t state;

events_type_t events_type;
audio_event_iface_handle_t iface_event;
audio_callback_t callback_event;

int buf_size;
char *buf;

// ... (related to freertos)

/* PrivateData */
void *data;
EventGroupHandle_t state_event;
int input_wait_time;
int output_wait_time;
int out_buf_size_expect;
int out_rb_size;
volatile bool is_running;
volatile bool task_run;
volatile bool stopping;
};
  • 功能接口 (open, process, close, seek, destroy): 这函数指针由具体的 Element 实现(如 i2s_stream.chttp_stream.c)来定义其特定的行为。

  • I/O: in, out支持两种 I/O 模式:

    • IO_TYPE_RB: 通过RingBuffer ringbuf_handle_t 进行数据交换,这是 Pipeline 中 Element 间链接的标准方式。
    • IO_TYPE_CB: 通过回调函数 (io_callback_t) 进行数据交换,提供了与 Pipeline 外部模块交互的灵活性。
  • 状态和事件: 在[第四节](#4. 状态与事件)会详细讲解。

    • 事件接口 (iface_event): Element有个audio_event_iface_handle_t ,使得 Element 能够接收来自 Pipeline 的命令(如 PAUSE, RESUME, STOP)并向外即时报告状态。

    • state (audio_element_state_t): 是个枚举。Element 的当前状态(如 AEL_STATE_RUNNING, AEL_STATE_PAUSED)。。

    • is_runningstopping 是 volatile bool flags,用于控制任务主循环和处理停止请求。

    • state_event (EventGroupHandle_t): 一个 FreeRTOS EventGroup,用于同步任务状态。

  • 任务配置 (task_stack, task_prio, task_core): 定义了 Element 任务的堆栈大小、优先级和运行核心。如果 task_stack <= 0,则该 Element 不会创建自己的任务,其 process 函数将由外部调用。

  • 数据缓冲区 (buf, buf_size): 每个 Element 内部都有一个工作缓冲区,用于 process 函数处理数据。

  • 信息 (info): audio_element_info_t 存储媒体信息,如采样率、通道数、总字节数和当前播放位置等。

2. 生命周期

一个 Element 的生命周期由其状态机(audio_element_state_t,见[第四节](#4. 状态与事件))驱动,并通过 audio_event_iface 发送的命令切换状态。

2.1. 初始化

audio_element_init(audio_element_cfg_t *config) 函数根据 audio_element_cfg_t 配置来初始化一个 Element 实例,分配 audio_element 结构体内存,初始化其内部的事件接口 (iface_event),并注册 audio_element_on_cmd ,创建互斥锁等。

2.2. 运行

audio_element_run 会创建并启动 Element 的FreeRTOS主任务 (audio_element_task)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
esp_err_t audio_element_run(audio_element_handle_t el)
{
// ...
if (el->task_stack > 0) {
ret = audio_thread_create(&el->audio_thread, el->tag, audio_element_task, el, el->task_stack,
el->task_prio, el->stack_in_ext, el->task_core);
// 等待任务创建信号,确保 run 函数返回时,Element 任务已经成功启动
EventBits_t uxBits = xEventGroupWaitBits(el->state_event, TASK_CREATED_BIT, false, true, DEFAULT_MAX_WAIT_TIME);
// ...
} else { // 对于没有独立任务的 Element,直接设置运行状态
el->task_run = true;
el->is_running = true;
audio_element_force_set_state(el, AEL_STATE_RUNNING);
}
// ...
}

audio_element_task 驱动了整个数据处理流程。一般由pipeline的主任务进行的调用,也可以手动调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void audio_element_task(void *pv)
{
audio_element_handle_t el = (audio_element_handle_t)pv;
el->task_run = true;
xEventGroupSetBits(el->state_event, TASK_CREATED_BIT);// 发送任务创建成功信号
el->buf = audio_calloc(1, el->buf_size); // ...

xEventGroupClearBits(el->state_event, STOPPED_BIT);// 清除旧的停止标志,准备进入循环
while (el->task_run) { //
if ((ret = audio_event_iface_waiting_cmd_msg(el->iface_event)) != ESP_OK) { // 1. 等待并处理命令
xEventGroupSetBits(el->state_event, STOPPED_BIT);
if (ret == AEL_IO_ABORT) { // 收到 DESTROY 命令
break;
}
}
// 2. 运行数据处理逻辑,下面细说
if (audio_element_process_running(el) != ESP_OK) { /*处理错误*/ }
}


if (el->is_open && el->close) { // 任务退出清理
el->close(el);
audio_element_force_set_state(el, AEL_STATE_STOPPED);
} // ...

xEventGroupSetBits(el->state_event, STOPPED_BIT); // 发送停止和销毁完成信号
xEventGroupSetBits(el->state_event, RESUMED_BIT); // 确保任何等待 resume 的调用者都能被释放
xEventGroupSetBits(el->state_event, TASK_DESTROYED_BIT);
// ... 删除任务 ...
}

任务主循环的核心逻辑分为两部分:

  1. 命令处理: 通过 audio_event_iface_waiting_cmd_msg 等待来自 Pipeline 或其他模块的命令(如 STOP, PAUSE)。on_cmd 回调(即 audio_element_on_cmd)会处理这些命令并改变 Element 的状态。
  2. 数据处理: 调用 audio_element_process_running,它会进一步调用用户实现的 process 函数来处理音频数据。

2.3. 数据处理

audio_element_process_running此函数处理数据流,它调用特定 Element 在初始化时注册的process回调(比如mp3 decoder就注册一个解码mp3的process回调)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static esp_err_t audio_element_process_running(audio_element_handle_t el)
{
int process_len = -1; // ...
process_len = el->process(el, el->buf, el->buf_size); // 调用用户定义的 process 函数

if (process_len <= 0) {
switch (process_len) {
// 根据返回值处理不同情况
case AEL_IO_DONE:
case AEL_IO_OK:
audio_element_set_ringbuf_done(el);
audio_element_on_cmd_finish(el); // 发送 FINISH 命令
break;
case AEL_IO_FAIL:
audio_element_report_status(el, AEL_STATUS_ERROR_PROCESS);
audio_element_on_cmd_error(el); // 发送 ERROR 命令
break;
// ... 其他情况处理
}
}
return ESP_OK;
}

3. 数据流

3.1. 数据输入/输出 (I/O) 模式

Element 提供了两种数据 I/O 方式,以适应不同的应用场景。

(但是这种模式仍然不够灵活,并可能有一定的Copy消耗,GMF的进步(TODO)就是在这里动了刀子)

3.1.1. Ringbuffer 模式

这是 Pipeline 内部 Element 之间数据交换的标准方式。上一个 Element 的输出 ringbuf 就是下一个 Element 的输入 ringbuf

  • audio_element_input(el, buffer, size): 从输入 ringbuf (el->in.input_rb) 读取数据。

    1
    in_len = rb_read(el->in.input_rb, buffer, wanted_size, el->input_wait_time);
  • audio_element_output(el, buffer, size): 向输出 ringbuf (el->out.output_rb) 写入数据。

    1
    output_len = rb_write(el->out.output_rb, buffer, write_size, el->output_wait_time);

3.1.2.Callback 模式

当 Element 需要与 Pipeline 外部的模块(如网络、文件系统或其他非 ADF 组件)直接交互时,可以使用回调模式。

audio_element_set_read_cb(el, func, ctx) 注册一个读回调函数。当调用 audio_element_input 时,如果 read_typeIO_TYPE_CB,框架将执行这个注册的回调来获取数据。audio_element_set_write_cb(el, func, ctx)同理。

这种机制在 http_streami2s_stream 等 Element 中被广泛使用,它们的 process 函数内部通过调用 audio_element_inputaudio_element_output 来驱动数据流动,而具体的读写行为则由其各自的 _http_read_i2s_write 等回调函数实现。

3.1.3. 多路输入/输出 (Multi-Stream I/O)

除了标准的单路 I/O,audio_element 还支持混音 (Mixing) 和分流 (Splitting) ,主要是通过multi_output_ringbuf.

在上一篇文章讲解的 element_wav_amr_sdcard 例程中,我们使用audio_element_set_multi_output_ringbuf(i2s_stream_reader,ringbuf11,0)设置了两个输出ringbuf。

  • 相关 API:

    audio_element_set_multi_input/output_ringbuf() : 设置指定索引的输入/输出 Ringbuffer。

    audio_element_multi_input(): 从指定索引的输入 Ringbuffer 读取数据。

    audio_element_multi_output(): 将数据同时写入所有已配置的输出 Ringbuffer。

3.1. 核心处理函数: process 回调

所有数据处理的起点都源于 Element 的 process 回调函数。在 Element 的主任务循环 audio_element_task 中,一旦 Element 处于 AEL_STATE_RUNNING 状态,audio_element_process_running 函数便会持续调用用户在初始化时注册的 process 回调:

1
2
// in audio_element_process_running()
process_len = el->process(el, el->buf, el->buf_size); // 调用用户定义的 process 函数

process 函数的实现者需要在此函数内部,获取数据,处理数据,发送数据。(获取和发送不由element模块直接接管从而在具体的 process 中实现,提供了更高的灵活性)

i2s_stream 为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static int _i2s_process(audio_element_handle_t self, char *in_buffer, int in_len)
{
// 首先通过 audio_element_input 获取上游输入数据,element模块根据配置从RingBuffer或者CallBack读入数据。
int r_size = audio_element_input(self, in_buffer, in_len);
int w_size = 0;
i2s_stream_t *i2s = (i2s_stream_t *)audio_element_getdata(self);
audio_element_info_t i2s_info = {0};
// ... 这里省略了很多内容 ...

// 将数据使用 audio_element_output 填入下游的RingBuffer或者CallBack中
audio_element_multi_output(self, in_buffer, r_size, 0);
w_size = audio_element_output(self, in_buffer, r_size);
audio_element_update_byte_pos(self, w_size); // 并同步位置信息
return w_size;
}

3.3. 寻址 (Seeking)

框架提供了 seek 功能,具体我还没研究。

3.4. 数据流信息

在数据处理过程中,Element 需要动态地管理和报告流信息,如播放进度、码率等。框架提供了一组原子化的信息更新接口。

细粒度更新 API:

  • audio_element_update_byte_pos() / audio_element_set_byte_pos(): 更新或设置当前处理的字节位置 byte_pos
  • audio_element_set_total_bytes(): 设置总字节数 total_bytes
  • audio_element_set_music_info(): 一次性设置采样率、通道数和位深。
  • audio_element_set_bps(): 设置码率(bits per second)。
  • audio_element_set_duration(): 设置总时长(秒)。

一个解码器 Element 在 process 中解析出媒体元数据后,会调用自身 set_music_info 等函数来初始化信息。在处理每个数据块后,它会调用 update_byte_pos 来更新进度。这些更新后的信息可以通过 audio_element_report_posaudio_element_report_info 异步报告给上层应用,用于实现播放进度条等功能。

4. 状态与事件

不得不说,这个状态机有点繁琐和复杂,好在GMF抛下了历史包袱(TODO)。

一个典型的 audio_element 控制流程如下:

  1. audio_pipeline 或用户代码 通过调用 audio_element_pause() 等 API 发起一个操作。
  2. 该 API 将一个命令(如 AEL_MSG_CMD_PAUSE)通过 iface_event 的内部队列发送给 Element 自己的任务。
  3. 同时,API 函数使用 state_event 事件组来阻塞等待,直到 Element 任务确认该操作已完成。
  4. Element 的 audio_element_task 任务在其主循环中被命令唤醒,并执行相应的命令处理函数(如 audio_element_on_cmd_pause)。
  5. 命令处理函数会更新 state 状态枚举,并根据需要设置 is_runningstopping 标志。
  6. 操作完成后,命令处理函数会设置 state_event 中对应的事件位,以唤醒并通知外部调用者操作已完成。

接下来,我们将详细解析每个组件。

4.1. state - 逻辑状态

audio_element_state_t是一个枚举类型,定义了 Element 从初始化到销毁的宏观逻辑状态。 主要状态包括:

状态码 描述
AEL_STATUS_NONE 无状态
AEL_STATUS_ERROR_OPEN 打开 Element 失败
AEL_STATUS_ERROR_INPUT 从上游读取数据时出错
AEL_STATUS_ERROR_PROCESS process 回调中处理数据时出错
AEL_STATUS_ERROR_OUTPUT 向下游写入数据时出错
AEL_STATUS_ERROR_CLOSE 关闭 Element 时出错
AEL_STATUS_ERROR_TIMEOUT 操作超时
AEL_STATUS_ERROR_UNKNOWN 未知错误
AEL_STATUS_INPUT_DONE 上游数据已全部读取完毕
AEL_STATUS_INPUT_BUFFERING 正在缓冲输入数据
AEL_STATUS_OUTPUT_DONE 下游数据已全部写入完毕
AEL_STATUS_OUTPUT_BUFFERING 正在缓冲输出数据
AEL_STATUS_STATE_RUNNING RUNNING
AEL_STATUS_STATE_PAUSED PAUSED
AEL_STATUS_STATE_STOPPED STOPPED
AEL_STATUS_STATE_FINISHED FINISHED
AEL_STATUS_MOUNTED 设备已挂载。
AEL_STATUS_UNMOUNTED 设备已卸载

state 是对 Element 当前工作阶段的顶层抽象描述。它被外部调用者(如 audio_pipeline)和 Element 内部任务共同用来判断 Element 的行为。例如,Pipeline 根据此状态决定是否可以向 Element 发送数据,而 Element 内部则根据它来决定是否要执行 process 函数。

内部: 该状态只应在 Element 的任务线程内部被修改,通常是在响应特定命令后。例如,在 audio_element_on_cmd_pause 中(这是一个内部函数),当暂停逻辑执行完毕后,状态会被设置为 AEL_STATE_PAUSED

外部: 通过 audio_element_get_state() API 获取当前状态。

4.2. state_event - 线程状态同步

只应在Element内部使用。

它是一个 FreeRTOS 的 EventGroupHandle_t,利用其事件位 (Event Bits) 作为同步标志。

当一个外部线程(如应用主任务)调用一个阻塞 API(如 audio_element_pause)时,它需要等待 Element 的内部任务实际完成这个操作。state_event 正是

外部 API 在发送命令后,会调用 xEventGroupWaitBits() 来阻塞自身,等待特定的事件位。

1
2
3
4
5
6
// audio_element.c -> audio_element_pause()
xEventGroupClearBits(el->state_event, PAUSED_BIT);
if (audio_element_cmd_send(el, AEL_MSG_CMD_PAUSE) != ESP_OK) {
return ESP_FAIL;
}
EventBits_t uxBits = xEventGroupWaitBits(el->state_event, PAUSED_BIT, false, true, DEFAULT_MAX_WAIT_TIME);

Element 的内部任务在完成了相应的操作后,会调用 xEventGroupSetBits()来设置对应的事件位,从而唤醒(unblock)正在等待的外部线程。

1
2
// audio_element.c -> audio_element_on_cmd_pause (在任务线程中被调用)
xEventGroupSetBits(el->state_event, PAUSED_BIT);
说明
BIT0 STOPPED_BIT 标志元素已完成停止流程,进入 STOPPED, FINISHED, 或 ERROR 状态。通常由 audio_element_on_cmd_stop 等函数设置,由 audio_element_wait_for_stop 等待。
BIT1 STARTED_BIT 标志元素的 open 回调已成功执行。
BIT2 BUFFER_REACH_LEVEL_BIT 标志输出 ringbuf 的数据量已达到 audio_element_wait_for_buffer 函数所期望的水平。
BIT3 TASK_CREATED_BIT 标志元素的后台任务已成功创建并即将进入主循环。由任务自身设置,由 audio_element_run 等待。
BIT4 TASK_DESTROYED_BIT 标志元素的后台任务已完成所有清理工作,即将自我销毁。由任务自身设置,由 audio_element_terminate 等待。
BIT5 PAUSED_BIT 标志元素已成功进入 PAUSED 状态。由 audio_element_on_cmd_pause 设置,由 audio_element_pause 等待。
BIT6 RESUMED_BIT 标志元素已成功从暂停或停止状态中恢复到运行状态。由 audio_element_on_cmd_resume 设置,由 audio_element_resume 等待。

4.3. iface_event - 命令与事件通信

audio_event_iface_handle_t它是一个事件接口句柄,其内部封装了两个 FreeRTOS 队列:一个用于内部命令 (internal_queue),一个用于向外部报告事件(external_queue)。

audio_element_pause等外向API,调用 audio_event_iface_cmd(),将一个audio_event_iface_msg_t消息(例如AEL_MSG_CMD_PAUSE)发送到 Element 的internal_queue

1
2
3
4
5
6
// audio_element.c -> audio_element_change_cmd()
esp_err_t audio_element_change_cmd(audio_element_handle_t el, audio_element_msg_cmd_t cmd)
{
AUDIO_NULL_CHECK(TAG, el, return ESP_ERR_INVALID_ARG);
return audio_element_cmd_send(el, cmd); // audio_element_cmd_send 内部调用 audio_event_iface_cmd
}

Element 的主任务 audio_element_task 在其循环中调用 audio_event_iface_waiting_cmd_msg(),这将阻塞地从 internal_queue 中获取消息。获取到消息后,会调用注册的 on_cmd 回调(即 audio_element_on_cmd_pause)来执行具体的命令逻辑。

当 Element 需要向外报告状态时(如 process 返回 AEL_IO_DONE),它会调用 audio_element_report_status() 等辅助函数,这些函数最终会调用 audio_event_iface_sendout(),将一个报告消息发送到 external_queue,供 audio_pipeline 或其他监听者消费。

控制命令 (由外部发送给元素) enum值
AEL_MSG_CMD_FINISH 2 通知元素数据流已结束,应进入 FINISHED 状态。通常由元素自身在处理完数据后间接触发。
AEL_MSG_CMD_STOP 3 命令元素停止当前操作,清理资源(调用 close),并进入 STOPPED 状态。
AEL_MSG_CMD_PAUSE 4 命令元素暂停数据处理,并进入 PAUSED 状态。
AEL_MSG_CMD_RESUME 5 命令元素从 PAUSEDSTOPPED 状态恢复,重新初始化(调用 open)并进入 RUNNING 状态。
AEL_MSG_CMD_DESTROY 6 命令元素永久终止其后台任务并准备被销毁。
报告类型 (由元素发送给外部)
AEL_MSG_CMD_REPORT_STATUS 8 标志该消息是一个状态报告,其 data 字段携带一个 audio_element_status_t 状态码。
AEL_MSG_CMD_REPORT_MUSIC_INFO 9 标志该消息是一个媒体元数据报告,其 data 字段携带 audio_element_info_t 结构体。
AEL_MSG_CMD_REPORT_CODEC_FMT 10 标志该消息报告了已识别的音频编码格式。
AEL_MSG_CMD_REPORT_POSITION 11 标志该消息报告了当前的播放/处理进度(字节位置)。

4.4. is_running, task_run 和 stopping - 即时状态标志

只应在Element内部使用。

它们是volatile bool 类型的标志位。它们提供了比 state 更轻量、更即时的状态控制,主要用于在 Element 任务的单个循环迭代中快速判断是否应继续执行数据处理或中断 I/O。

  • **is_running **:

    • 设置: 在 audio_element_on_cmdRESUME 分支中被设为 true,在 PAUSESTOPFINISH 等分支中被设为 false
    • 检查: 在 audio_element_process_running 函数的入口处被检查。如果 is_runningfalse,该函数会立即返回,从而跳过 process 回调的执行。这确保了在接收到暂停或停止命令后,能迅速停止数据处理,即使 state 的完整转换还未完成。
  • **task_run **:

    • 设置: 在 audio_element_task 的开始设置为 true,在task结束或错误的时候设为 false。还在audio_element_runaudio_element_terminate等函数中被设置为对应的值。
    • 检查: 在 audio_element_runaudio_element_terminate 等函数中检查,比如,pause、terminate时检查是否已经停止。
  • **stopping **:

    • 设置: 在 audio_element_stop() API 中设为 true,在发送命令之前。
    • 检查: 主要由 ringbuf.c 中的 I/O 函数(如 rb_readrb_write)通过 audio_element_is_stopping() 检查。当 I/O 函数发现该标志为 true 时,会立即中止阻塞等待,并返回 AEL_IO_ABORT。这确保了当外部请求停止时,即使 Element 正阻塞在 ringbuf 的读写上,也能被快速唤醒并响应停止命令。用户在自定义的 process 实现中,也应在耗时操作前后检查此标志,以便快速退出。

4.5. 状态与事件API

1. 外部控制与同步 API

这类 API 是 audio_element 提供给外部调用者(如 audio_pipeline 或用户应用)的高级接口,用于控制元素的生命周期。它们的核心工作模式是:通过 iface_event 异步发送命令,然后通过 state_event 事件组同步等待操作完成的确认信号。

  • audio_element_run
    启动元素的后台任务,并等待任务成功创建的信号。
  • audio_element_stop
    请求元素停止。它会立即设置 stopping 标志以中断 I/O,然后发送异步的 STOP 命令,立即返回。
  • audio_element_wait_for_stop
    阻塞调用线程,同步等待元素进入停止状态(可为STOPPED, FINISHED, ERROR)。可与前者结合使用。
  • audio_element_pause
    请求元素暂停。它发送异步的 PAUSE 命令,并等待暂停完成的信号。
  • audio_element_resume
    请求元素恢复。它发送异步的 RESUME 命令,并等待恢复完成的信号。
  • audio_element_terminate
    请求元素永久终止。它发送异步的 DESTROY 命令,并等待任务完全销毁的信号。
  • audio_element_wait_for_buffer
    阻塞调用线程,同步等待元素的输出 ringbuf 填充到指定水平。
  • audio_element_change_cmd
    更底层的命令发送接口,用于直接向元素发送一个指定的 audio_element_msg_cmd_t 命令。
  • audio_element_reset_state
    强制将元素的内部逻辑状态 state 变量重置为 AEL_STATE_INIT

2. 内部命令处理 API(on_cmd)

这类函数不被外部直接调用,它们是 audio_element.c 内部的 static 函数。当 audio_element_taskiface_event 收到命令时,会调用这些函数来执行具体的状态转换逻辑。它们是实际操作 stateis_runningstate_event 的地方。

  • audio_element_on_cmd
    作为命令分发器,在元素的任务上下文中被调用,根据收到的消息 cmd 来调用下面具体的处理函数。

  • audio_element_on_cmd_stop
    处理 STOP 命令。它会调用 close() 回调,更新 stateAEL_STATE_STOPPED,设置 is_runningfalse,并设置 state_eventSTOPPED_BIT 来唤醒等待者。

  • audio_element_on_cmd_pause
    处理 PAUSE 命令。它更新 stateAEL_STATE_PAUSED,设置 is_runningfalse,并设置 state_eventPAUSED_BIT

  • audio_element_on_cmd_resume
    处理 RESUME 命令。它会调用 open() 回调,更新 stateAEL_STATE_RUNNING,设置 is_runningtrue,并设置 state_eventRESUMED_BIT

  • audio_element_on_cmd_finish
    当数据流正常结束时被内部调用。它更新 stateAEL_STATE_FINISHED,设置 is_runningfalse,并设置 state_eventSTOPPED_BIT

  • audio_element_on_cmd_error
    当发生错误时被内部调用。它更新 stateAEL_STATE_ERROR,设置 is_runningfalse,并设置 state_eventSTOPPED_BIT

3. 事件接口通信 API (iface_event)

这些是 audio_event_iface.h 中定义的函数,构成了元素异步通信的基石。它们操作 iface_event 句柄内部的两个 FreeRTOS 队列。

  • audio_event_iface_cmd
    向事件接口的内部命令队列发送一条消息。外部控制 API(如 audio_element_stop)通过此函数将命令传递给元素任务。

  • audio_event_iface_cmd_from_isr
    audio_event_iface_cmd 的中断安全版本。

  • audio_event_iface_waiting_cmd_msg
    使元素任务阻塞等待其内部命令队列,直到收到新命令或超时。这是元素任务事件驱动的核心。

  • audio_event_iface_sendout
    向事件接口的外部报告队列发送一条消息。内部上报 API(如 audio_element_report_status)通过此函数将事件和状态信息传递给外部监听者。

  • audio_event_iface_set_listener
    将一个事件接口(如 audio_pipelineiface)设置为另一个事件接口(如 audio_elementiface)的监听者。这样,element 通过 sendout 发送的事件就能被 pipeline 接收到。

4. 内部状态与信息上报 API

这类 API 由元素实现者在 processopen 等回调函数中调用。它们通过 iface_eventsendout 机制,将元素的内部状态变化和关键信息异步地报告给外部。

  • audio_element_report_status
    上报一个 audio_element_status_t 状态码,通知外部监听者元素内部发生的具体事件(如 AEL_STATUS_ERROR_INPUT, AEL_STATUS_STATE_FINISHED)。

  • audio_element_report_info
    上报完整的 audio_element_info_t 结构体,通常在获取到媒体元数据(如采样率、总时长)后调用。

  • audio_element_report_codec_fmt
    上报识别出的音频编码格式。

  • audio_element_report_pos
    上报当前的播放/处理字节位置。

  • audio_element_set_ringbuf_done
    在元素的输出 ringbuf 上设置一个完成标志(EOF),这是通知下游元素数据已经结束的特定信令。

5. 状态与信息查询 API

这类 API 用于从外部安全地查询元素的当前状态和信息。它们通常是线程安全的,内部会使用互斥锁来保护被访问的数据。

  • audio_element_get_state
    获取元素当前的高层逻辑状态 (audio_element_state_t)。
  • audio_element_getinfo
    获取元素的完整 audio_element_info_t 结构体信息。
  • audio_element_get_uri
    获取元素当前配置的 URI。
  • audio_element_is_stopping
    查询 stopping 紧急停止标志位,判断元素是否正在响应一个 stop 请求。

5. 一些控制流程

audio_element_resume 恢复/启动流程

resume 操作用于启动或从暂停状态恢复 Element。这是一个异步过程。

  1. 应用层调用 audio_element_resume(el, ...) (涉及 iface_event, state_event)
    此函数负责发起 RESUME 命令并等待,直到 Element 内部任务确认完成恢复操作。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    esp_err_t audio_element_resume(audio_element_handle_t el, float wait_for_rb_threshold, TickType_t timeout)
    {
    if (!el->task_run) { /*处理错误*/ }
    if (el->state == AEL_STATE_RUNNING) { /* 已经在跑 */ }
    if (el->task_stack <= 0) { /*先不关注*/ }

    // 1. 清除旧的 RESUMED_BIT 事件标志位,为本次同步做准备
    xEventGroupClearBits(el->state_event, RESUMED_BIT);

    // 2. 发送异步命令到 Element 内部队列
    if (audio_element_cmd_send(el, AEL_MSG_CMD_RESUME) == ESP_FAIL) { /*处理错误*/ }

    // 3. 阻塞当前线程(也就是应用层调用这个pause API的线程),等待 Element 任务处理完成并设置 RESUMED_BIT
    EventBits_t uxBits = xEventGroupWaitBits(el->state_event, RESUMED_BIT, false, true, timeout);
    }
  2. Element 任务主循环 audio_element_task(void *pv) (涉及 iface_event)
    Element 的核心任务在 audio_element_task 中运行。它是一个循环,大部分时间阻塞在 audio_event_iface_waiting_cmd_msg(),等待新的命令或事件。当 RESUME 命令到达时,audio_event_iface_waiting_cmd_msg 会调用注册的 audio_element_on_cmd 回调函数来处理该命令。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    void audio_element_task(void *pv)
    {
    // ... 初始化 ...
    while (el->task_run) {
    // 4. 任务在此等待命令,当 RESUME 命令到达时,函数返回,并由 on_cmd 回调处理
    if (audio_event_iface_waiting_cmd_msg(el->iface_event) != ESP_OK) { /* 异常处理 */ }
    // 5. 如果 Element 处于运行状态,则处理数据
    audio_element_process_running(el);
    }
    // ... 清理和退出 ...
    }
  3. 命令处理函数 audio_element_on_cmd_resume(el) (涉及 state, is_running, state_event)
    此函数是 RESUME 命令的最终执行者。

    • 调用 open 回调: 执行 audio_element_process_init(el),它会调用用户注册的 el->open(el) 函数来重新初始化资源。
    • 发出同步信号: 一旦 open 成功,设置 el->state = AEL_STATE_RUNNING,并通过 xEventGroupSetBits 设置 RESUMED_BIT,唤醒在第 1 步中等待的 audio_element_resume 函数。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    // audio_element_on_cmd() -> case AEL_MSG_CMD_RESUME:
    static esp_err_t audio_element_on_cmd_resume(audio_element_handle_t el)
    {
    // 6. 设置运行标志
    el->is_running = true;
    // 7. 发出同步信号,通知 API 调用者操作已开始
    xEventGroupSetBits(el->state_event, RESUMED_BIT);

    // 8. 调用 open 回调进行初始化,如果失败则回滚
    if (audio_element_process_init(el) != ESP_OK) {
    el->is_running = false;
    /* 处理错误 */
    }
    }

    // audio_element_process_init() 会调用用户定义的 open 函数
    esp_err_t audio_element_process_init(audio_element_handle_t el)
    {
    if (el->open == NULL) { /* 无 open 回调,直接成功 */ }

    el->is_open = true;
    audio_element_force_set_state(el, AEL_STATE_INITIALIZING);
    esp_err_t ret = el->open(el); // <-- 调用用户注册的 open 函数

    if (ret == ESP_OK) {
    audio_element_force_set_state(el, AEL_STATE_RUNNING);
    audio_element_report_status(el, AEL_STATUS_STATE_RUNNING);
    } else { /* 处理 open 失败 */ }
    }

audio_element_stop 停止流程

stop 操作与 resume 不同,此函数发送一个 STOP 命令,但他不等待。如果需要等待,则应调用 audio_element_wait_for_stop(el)

  1. 应用层调用 audio_element_stop(el) (涉及 iface_event, stopping)

    • 中断数据流: 调用 rb_abort 中断输入和输出 Ringbuffer,让 I/O 操作快速响应。
    • 发送异步命令: 发送 AEL_MSG_CMD_STOP 命令。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    esp_err_t audio_element_stop(audio_element_handle_t el)
    {
    // 1. 中断输入输出 Ringbuffer。(对于是RingBuffer还是CallBack的判断是在这个abort函数内部的)
    audio_element_abort_output_ringbuf(el);
    audio_element_abort_input_ringbuf(el);

    if (el->stopping) { /* 防止重复发送 */ }

    el->stopping = true;
    // 2. 发送停止命令
    if (audio_element_cmd_send(el, AEL_MSG_CMD_STOP) != ESP_OK) {
    el->stopping = false;
    /* 处理错误 */
    }
    }
  2. 命令处理函数 audio_element_on_cmd_stop(el) (涉及 state, is_running, state_event)
    当 Element 任务收到 STOP 命令后,audio_event_iface_waiting_cmd_msg 会调用此函数以执行停止逻辑。

    • 反初始化: 调用 audio_element_process_deinit(el),执行用户注册的 close 回调。
    • 更新状态: 设置 el->state = AEL_STATE_STOPPEDel->is_running = false
    • 发出同步信号: 设置 STOPPED_BIT,通知等待者(如audio_element_wait_for_stop) Element 已停止。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    // audio_element_on_cmd() -> case AEL_MSG_CMD_STOP:
    static esp_err_t audio_element_on_cmd_stop(audio_element_handle_t el)
    {
    if ((el->state != AEL_STATE_FINISHED) && (el->state != AEL_STATE_STOPPED)) {
    // 3. 调用 close 回调释放资源
    audio_element_process_deinit(el);
    // 4. 更新状态为 STOPPED
    el->state = AEL_STATE_STOPPED;
    audio_element_report_status(el, AEL_STATUS_STATE_STOPPED);
    // 5. 停止数据处理循环
    el->is_running = false;
    el->stopping = false;
    // 6. 设置 STOPPED_BIT,唤醒等待者
    xEventGroupSetBits(el->state_event, STOPPED_BIT);
    } else {
    /* 处理其他情况,例如从 ERROR 或 FINISHED 状态强制变为 STOPPED */
    }
    }

    // audio_element_process_deinit() 会调用用户定义的 close 函数
    esp_err_t audio_element_process_deinit(audio_element_handle_t el)
    {
    if (el->is_open && el->close) {
    el->close(el); // <-- 调用用户注册的 close 函数
    }
    el->is_open = false;
    }

    随即回到audio_element_task的主循环,因为已经stop,所以audio_element_process_running内部将不会执行,随即退出循环。

数据流正常结束 (Finish)

当一个 Element(例如解码器)处理完所有输入数据时,它会进入 FINISH 状态。

  1. Process 回调 el->process(el, ...) (涉及 state)
    流程的起点是 Element 的数据处理函数 el->process() 返回 AEL_IO_DONE

  2. Process 运行器 audio_element_process_running() (涉及 iface_event)
    该函数在任务循环中检查 process() 的返回值。

    • 捕获 AEL_IO_DONE: 一旦 process() 返回 AEL_IO_DONE ,启动结束流程。
    • 通知下游: 调用 audio_element_set_ringbuf_done(el),通知下游数据流已结束。
    • 执行 Finish 命令: 调用 audio_element_on_cmd_finish(el)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    // audio_element_process_running()
    static esp_err_t audio_element_process_running(audio_element_handle_t el)
    {
    process_len = el->process(el, el->buf, el->buf_size);
    if (process_len <= 0) {
    switch (process_len) {
    case AEL_IO_DONE:
    case AEL_IO_OK:
    // 1. 通知下游 Element 数据已结束
    audio_element_set_ringbuf_done(el);
    // 2. 触发 Finish 流程
    audio_element_on_cmd_finish(el);
    break;
    /* ... 其他 case ... */
    }
    }
    }
  3. Finish 命令处理 audio_element_on_cmd_finish(el) (涉及 state, is_running, state_event, iface_event)
    此函数负责将 Element 置于 FINISHED 状态。

    • 资源清理: 调用 audio_element_process_deinit(el) 来执行 close() 回调。
    • 状态变更与报告: 设置状态为 FINISHED,停止运行,并向上层报告。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // audio_element_on_cmd_finish()
    static esp_err_t audio_element_on_cmd_finish(audio_element_handle_t el)
    {
    /* 状态检查 */
    // 3. 调用 close 回调
    audio_element_process_deinit(el);
    // 4. 更新状态
    el->state = AEL_STATE_FINISHED;
    audio_element_report_status(el, AEL_STATUS_STATE_FINISHED);
    // 5. 停止数据处理
    el->is_running = false;
    // 6. 设置停止位,兼容 wait_for_stop
    xEventGroupSetBits(el->state_event, STOPPED_BIT);
    }

错误

当发生不可恢复的错误(比如IO错误或者数据处理错误)时,Element 会进入 ERROR 状态。,我们自下往上看一看:

  1. Process 或 I/O 函数 process() / rb_read() / rb_write() (涉及 state)
    无论是 process 函数返回 AEL_PROCESS_FAIL,还是 I/O 操作返回 AEL_IO_FAIL,都会触发错误处理。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // audio_element_input() 中对 rb_read 的错误处理
    audio_element_err_t audio_element_input(audio_element_handle_t el, char *buffer, int wanted_size)
    {
    in_len = rb_read(el->in.input_rb, buffer, wanted_size, el->input_wait_time);
    if (in_len <= 0) {
    switch (in_len) {
    case AEL_IO_FAIL:
    // 1. 立即向上层报告 I/O 错误状态
    audio_element_report_status(el, AEL_STATUS_ERROR_INPUT);
    break;
    // ...
    }
    }
    }
  2. audio_element_process_running() (涉及 iface_event)
    audio_element_process_running 捕获错误返回码。

* **立即上报错误**: 调用 `audio_element_report_status()` 快速将错误报告给上层。
* **进入Element错误处理流程**: 调用 `audio_element_on_cmd_error(el)`。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// audio_element_process_running()
static esp_err_t audio_element_process_running(audio_element_handle_t el)
{
process_len = el->process(el, el->buf, el->buf_size);
if (process_len <= 0) {
switch (process_len) {
case AEL_IO_FAIL:
case AEL_PROCESS_FAIL:
// 2. 报告处理错误
audio_element_report_status(el, AEL_STATUS_ERROR_PROCESS);
// 3. 触发 Error 流程
audio_element_on_cmd_error(el);
break;
/* ... 其他 case ... */
}
}
}
  1. Error 命令处理 audio_element_on_cmd_error(el) (涉及 state, is_running, state_event)
    此函数负责将 Element 置于错误状态。

    • 尝试清理: 调用 audio_element_process_deinit(el) 尝试安全地执行 close()
    • 设置错误状态: 设置 el->state = AEL_STATE_ERROR 并停止运行。
    • 设置停止位: 设置 STOPPED_BIT,将 Element 停在错误状态。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // audio_element_on_cmd_error()
    static esp_err_t audio_element_on_cmd_error(audio_element_handle_t el)
    {
    if (el->state != AEL_STATE_STOPPED) {
    // 4. 尝试调用 close 回调释放资源
    audio_element_process_deinit(el);
    // 5. 设置状态为 ERROR
    el->state = AEL_STATE_ERROR;
    // 6. 停止数据处理
    el->is_running = false;
    // 7. 设置停止位,让系统可以从错误中停止下来
    xEventGroupSetBits(el->state_event, STOPPED_BIT);
    }
    }

接下来就到了 Pipeline 层(若有),经过进一步上传,由用户层进行最终的错误处理,比如进行恢复操作。

最后

在下一篇中,我们将深入研究Pipeline,这个高级调度器的实现。

传送门:(III. 深入Pipeline)


ESP-ADF架构探索(II. 深入Element)
https://decaday.github.io/blog/exploring-esp-adf-ii/
作者
星期十
发布于
2025年6月10日
更新于
2025年9月1日
许可协议