OP-TEE GP存储并发访问控制机制分析
并发访问控制架构
OP-TEE存储系统设计了多层次的并发访问控制机制,确保在多TA、多线程环境下的数据一致性和线程安全。Copy
┌─────────────────────────────────────────────────────────────┐
│ TA Session Layer │ ← TA会话级锁定
├─────────────────────────────────────────────────────────────┤
│ Object Handle Layer │ ← 对象句柄级锁定
├─────────────────────────────────────────────────────────────┤
│ Persistent Object Layer │ ← 持久化对象级锁定
├─────────────────────────────────────────────────────────────┤
│ Backend Lock Layer │ ← 后端存储锁定
├─────────────────────────────────────────────────────────────┤
│ Hash Tree Layer │ ← 哈希树并发控制
├─────────────────────────────────────────────────────────────┤
│ RPC Layer │ ← RPC序列化访问
└─────────────────────────────────────────────────────────────┘
核心并发控制机制
1. 对象级访问控制
持久化对象引用计数
位置:optee_os/core/tee/tee_pobj.c
Copy
struct tee_pobj {
TAILQ_ENTRY(tee_pobj) link;
uint32_t refcount; // 引用计数
struct mutex lock; // 对象级互斥锁
uint32_t flags; // 访问标志
bool temporary; // 临时对象标志
TEE_UUID uuid; // TA UUID
void *objectID; // 对象ID
uint32_t objectID_len; // 对象ID长度
const struct tee_file_operations *fops; // 文件操作
void *fh; // 文件句柄
};
// 对象引用管理
static struct tee_pobj *tee_pobj_get(const TEE_UUID *uuid,
void *obj_id, uint32_t obj_id_len)
{
struct tee_pobj *po;
mutex_lock(&pobj_list_mutex);
// 查找现有对象
TAILQ_FOREACH(po, &tee_pobj_head, link) {
if (uuid_equal(&po->uuid, uuid) &&
obj_id_len == po->objectID_len &&
!memcmp(obj_id, po->objectID, obj_id_len)) {
// 增加引用计数
po->refcount++;
mutex_unlock(&pobj_list_mutex);
return po;
}
}
mutex_unlock(&pobj_list_mutex);
return NULL;
}
static void tee_pobj_put(struct tee_pobj *po)
{
mutex_lock(&pobj_list_mutex);
assert(po->refcount > 0);
po->refcount--;
// 如果引用计数为0,清理对象
if (po->refcount == 0) {
TAILQ_REMOVE(&tee_pobj_head, po, link);
tee_pobj_release(po);
}
mutex_unlock(&pobj_list_mutex);
}
对象访问标志控制
Copy
// 对象访问标志检查
static TEE_Result check_access_conflict(struct tee_pobj *po, uint32_t flags)
{
mutex_lock(&po->lock);
// 检查写访问冲突
if ((flags & TEE_DATA_FLAG_ACCESS_WRITE) &&
(po->flags & TEE_DATA_FLAG_ACCESS_WRITE)) {
mutex_unlock(&po->lock);
return TEE_ERROR_ACCESS_CONFLICT;
}
// 检查独占访问
if ((flags & TEE_DATA_FLAG_EXCLUSIVE) ||
(po->flags & TEE_DATA_FLAG_EXCLUSIVE)) {
if (po->refcount > 1) {
mutex_unlock(&po->lock);
return TEE_ERROR_ACCESS_CONFLICT;
}
}
// 更新访问标志
po->flags |= flags;
mutex_unlock(&po->lock);
return TEE_SUCCESS;
}
2. 存储后端并发控制
REE文件系统并发控制
位置:optee_os/core/tee/tee_ree_fs.c
Copy
// REE FS文件结构
struct ree_fs_file {
struct mutex lock; // 文件级锁
struct ree_fs_file_meta meta; // 文件元数据
bool meta_dirty; // 元数据脏标志
struct block_cache *block_cache; // 块缓存
uint8_t *backup_version; // 备份版本(COW)
bool cow_active; // COW活动标志
};
// 文件级锁定机制
static TEE_Result ree_fs_file_lock(struct ree_fs_file *f, bool exclusive)
{
if (exclusive) {
mutex_lock(&f->lock);
// 等待所有读操作完成
while (f->read_count > 0) {
condvar_wait(&f->read_complete, &f->lock);
}
} else {
mutex_lock(&f->lock);
f->read_count++;
mutex_unlock(&f->lock);
}
return TEE_SUCCESS;
}
static void ree_fs_file_unlock(struct ree_fs_file *f, bool exclusive)
{
if (exclusive) {
mutex_unlock(&f->lock);
} else {
mutex_lock(&f->lock);
f->read_count--;
if (f->read_count == 0) {
condvar_broadcast(&f->read_complete);
}
mutex_unlock(&f->lock);
}
}
写时复制并发安全
Copy
// COW操作的原子性保证
static TEE_Result ree_fs_cow_operation(struct ree_fs_file *f)
{
mutex_lock(&f->lock);
// 检查COW状态
if (f->cow_active) {
// 另一个线程正在进行COW操作
mutex_unlock(&f->lock);
return TEE_ERROR_BUSY;
}
// 标记COW开始
f->cow_active = true;
// 创建备份版本
f->backup_version = create_backup_copy(f);
if (!f->backup_version) {
f->cow_active = false;
mutex_unlock(&f->lock);
return TEE_ERROR_OUT_OF_MEMORY;
}
mutex_unlock(&f->lock);
// 执行写操作...
// 提交或回滚
mutex_lock(&f->lock);
if (operation_success) {
commit_cow_changes(f);
} else {
rollback_cow_changes(f);
}
f->cow_active = false;
mutex_unlock(&f->lock);
return TEE_SUCCESS;
}
RPMB文件系统并发控制
位置:optee_os/core/tee/tee_rpmb_fs.c
Copy
// RPMB全局锁机制
static struct mutex rpmb_mutex = MUTEX_INITIALIZER;
// RPMB操作序列化
static TEE_Result rpmb_fs_operation(enum rpmb_op_type op, void *data)
{
TEE_Result res;
// RPMB操作必须序列化
mutex_lock(&rpmb_mutex);
switch (op) {
case RPMB_READ:
res = rpmb_read_operation(data);
break;
case RPMB_WRITE:
res = rpmb_write_operation(data);
break;
case RPMB_GET_COUNTER:
res = rpmb_get_counter_operation(data);
break;
default:
res = TEE_ERROR_BAD_PARAMETERS;
}
mutex_unlock(&rpmb_mutex);
return res;
}
// RPMB缓存并发控制
struct rpmb_cache_entry {
struct mutex lock; // 缓存条目锁
bool valid; // 有效标志
bool dirty; // 脏标志
uint16_t block_id; // 块ID
uint8_t data[RPMB_BLOCK_SIZE]; // 数据
struct condvar write_complete; // 写完成条件变量
};
static TEE_Result rpmb_cache_get_block(uint16_t block_id,
struct rpmb_cache_entry **entry)
{
struct rpmb_cache_entry *e = find_cache_entry(block_id);
if (e) {
mutex_lock(&e->lock);
// 等待写操作完成
while (e->dirty) {
condvar_wait(&e->write_complete, &e->lock);
}
*entry = e;
return TEE_SUCCESS;
}
return TEE_ERROR_ITEM_NOT_FOUND;
}
3. 哈希树并发控制
位置:optee_os/core/tee/fs_htree.c
Copy
// 哈希树结构
struct tee_fs_htree {
struct mutex tree_lock; // 树级锁
struct tee_fs_htree_meta meta; // 元数据
bool meta_dirty; // 元数据脏标志
struct htree_node_cache *cache; // 节点缓存
uint32_t update_generation; // 更新代数
};
// 哈希树读锁定
static TEE_Result htree_read_lock(struct tee_fs_htree *ht)
{
mutex_lock(&ht->tree_lock);
ht->reader_count++;
mutex_unlock(&ht->tree_lock);
return TEE_SUCCESS;
}
// 哈希树写锁定
static TEE_Result htree_write_lock(struct tee_fs_htree *ht)
{
mutex_lock(&ht->tree_lock);
// 等待所有读者完成
while (ht->reader_count > 0) {
condvar_wait(&ht->readers_done, &ht->tree_lock);
}
// 独占访问
ht->writer_active = true;
return TEE_SUCCESS;
}
// 原子更新机制
static TEE_Result htree_atomic_update(struct tee_fs_htree *ht,
struct update_operation *ops,
size_t num_ops)
{
TEE_Result res;
uint32_t old_generation;
htree_write_lock(ht);
// 保存当前代数
old_generation = ht->update_generation;
// 开始原子更新
for (size_t i = 0; i < num_ops; i++) {
res = apply_update_operation(ht, &ops[i]);
if (res != TEE_SUCCESS) {
// 回滚到原始状态
rollback_to_generation(ht, old_generation);
htree_write_unlock(ht);
return res;
}
}
// 提交更新
ht->update_generation++;
commit_updates(ht);
htree_write_unlock(ht);
return TEE_SUCCESS;
}
4. RPC并发控制
位置:optee_os/core/tee/tee_fs_rpc.c
Copy
// RPC调用序列化
static struct mutex rpc_fs_mutex = MUTEX_INITIALIZER;
// RPC文件系统调用
static TEE_Result tee_fs_rpc_call(uint32_t cmd, void *params)
{
TEE_Result res;
// RPC调用必须序列化
mutex_lock(&rpc_fs_mutex);
res = thread_rpc_cmd(cmd, params);
mutex_unlock(&rpc_fs_mutex);
return res;
}
// RPC超时和重试机制
static TEE_Result rpc_with_retry(uint32_t cmd, void *params)
{
TEE_Result res;
int retry_count = 0;
do {
res = tee_fs_rpc_call(cmd, params);
if (res == TEE_ERROR_COMMUNICATION &&
retry_count < MAX_RPC_RETRIES) {
// 等待后重试
msleep(RPC_RETRY_DELAY_MS);
retry_count++;
continue;
}
break;
} while (retry_count < MAX_RPC_RETRIES);
return res;
}
死锁预防机制
锁排序策略
Copy
// 锁获取顺序:避免死锁
// 1. 全局锁 (pobj_list_mutex, rpmb_mutex)
// 2. 对象锁 (tee_pobj.lock)
// 3. 文件锁 (ree_fs_file.lock)
// 4. 哈希树锁 (htree.tree_lock)
// 5. 缓存锁 (cache_entry.lock)
static TEE_Result acquire_multiple_locks(struct tee_pobj *po,
struct ree_fs_file *f)
{
// 按固定顺序获取锁
mutex_lock(&po->lock);
if (f) {
if (mutex_trylock(&f->lock) != 0) {
// 避免死锁:释放已获取的锁
mutex_unlock(&po->lock);
// 重新按顺序获取
msleep(1); // 短暂延迟
mutex_lock(&po->lock);
mutex_lock(&f->lock);
}
}
return TEE_SUCCESS;
}
超时机制
Copy
// 带超时的锁获取
static TEE_Result mutex_lock_timeout(struct mutex *m, uint32_t timeout_ms)
{
uint64_t deadline = get_current_time_ms() + timeout_ms;
while (get_current_time_ms() < deadline) {
if (mutex_trylock(m) == 0) {
return TEE_SUCCESS;
}
msleep(1); // 短暂等待
}
return TEE_ERROR_BUSY;
}
性能优化
读写分离优化
Copy
// 读写锁实现
struct rwlock {
struct mutex lock;
struct condvar readers_done;
struct condvar writer_done;
int reader_count;
bool writer_active;
};
static void rwlock_read_lock(struct rwlock *rw)
{
mutex_lock(&rw->lock);
// 等待写者完成
while (rw->writer_active) {
condvar_wait(&rw->writer_done, &rw->lock);
}
rw->reader_count++;
mutex_unlock(&rw->lock);
}
static void rwlock_write_lock(struct rwlock *rw)
{
mutex_lock(&rw->lock);
// 等待所有读者和写者完成
while (rw->reader_count > 0 || rw->writer_active) {
if (rw->reader_count > 0) {
condvar_wait(&rw->readers_done, &rw->lock);
} else {
condvar_wait(&rw->writer_done, &rw->lock);
}
}
rw->writer_active = true;
mutex_unlock(&rw->lock);
}
无锁数据结构
Copy
// 原子操作的引用计数
struct atomic_refcount {
volatile uint32_t count;
};
static uint32_t atomic_inc_ref(struct atomic_refcount *ref)
{
return __atomic_add_fetch(&ref->count, 1, __ATOMIC_SEQ_CST);
}
static uint32_t atomic_dec_ref(struct atomic_refcount *ref)
{
return __atomic_sub_fetch(&ref->count, 1, __ATOMIC_SEQ_CST);
}
// 无锁队列实现(用于日志记录)
struct lockfree_queue {
volatile uint32_t head;
volatile uint32_t tail;
void *items[QUEUE_SIZE];
};
static bool queue_enqueue(struct lockfree_queue *q, void *item)
{
uint32_t current_tail = q->tail;
uint32_t next_tail = (current_tail + 1) % QUEUE_SIZE;
if (next_tail == q->head) {
return false; // 队列满
}
q->items[current_tail] = item;
__atomic_store_n(&q->tail, next_tail, __ATOMIC_RELEASE);
return true;
}
并发测试和验证
并发压力测试
位置:optee_test/ta/storage/
Copy
// 多线程存储测试
static void test_concurrent_storage_access(void)
{
const int NUM_THREADS = 8;
const int OPERATIONS_PER_THREAD = 1000;
struct thread_params {
int thread_id;
int operation_count;
TEE_Result result;
} params[NUM_THREADS];
// 启动多个线程
for (int i = 0; i < NUM_THREADS; i++) {
params[i].thread_id = i;
params[i].operation_count = OPERATIONS_PER_THREAD;
create_thread(storage_worker_thread, ¶ms[i]);
}
// 等待所有线程完成
wait_all_threads();
// 验证结果
for (int i = 0; i < NUM_THREADS; i++) {
assert(params[i].result == TEE_SUCCESS);
}
}
static void storage_worker_thread(void *arg)
{
struct thread_params *p = (struct thread_params *)arg;
for (int i = 0; i < p->operation_count; i++) {
// 随机存储操作
switch (rand() % 4) {
case 0:
p->result = test_create_object(p->thread_id, i);
break;
case 1:
p->result = test_read_object(p->thread_id, i);
break;
case 2:
p->result = test_write_object(p->thread_id, i);
break;
case 3:
p->result = test_delete_object(p->thread_id, i);
break;
}
if (p->result != TEE_SUCCESS &&
p->result != TEE_ERROR_ITEM_NOT_FOUND) {
break;
}
}
}
死锁检测测试
Copy
// 死锁检测测试
static void test_deadlock_detection(void)
{
// 创建可能导致死锁的场景
struct test_context ctx1, ctx2;
// 线程1:先锁A再锁B
create_thread(deadlock_thread1, &ctx1);
// 线程2:先锁B再锁A
create_thread(deadlock_thread2, &ctx2);
// 设置超时
msleep(DEADLOCK_TIMEOUT_MS);
// 检查是否发生死锁
if (ctx1.completed && ctx2.completed) {
// 正常完成
assert(true);
} else {
// 可能发生死锁,终止测试
kill_thread(ctx1.thread);
kill_thread(ctx2.thread);
assert(false); // 死锁检测失败
}
}
最佳实践建议
1. 锁使用原则
- 最小锁范围: 尽可能缩小临界区
- 锁排序: 按固定顺序获取多个锁
- 避免嵌套: 减少锁的嵌套层次
- 及时释放: 尽快释放不需要的锁
2. 性能优化建议
- 读写分离: 对读多写少的场景使用读写锁
- 细粒度锁: 使用更细粒度的锁减少争用
- 无锁编程: 在可能的情况下使用原子操作
- 缓存友好: 考虑CPU缓存一致性
3. 错误处理
- 超时机制: 为锁操作设置合理超时
- 错误传播: 正确传播并发相关错误
- 资源清理: 确保异常情况下的资源释放
- 状态恢复: 提供并发错误后的状态恢复
设计思想与原理分析
1. 多层次并发控制架构的设计理念
为什么选择分层架构?
Copy
设计动机分析:
┌─────────────────────────────────────────────────────────────┐
│ TA Session Layer │ 业务逻辑隔离,防止TA间互相干扰 │
├─────────────────────────────────────────────────────────────┤
│ Object Handle Layer │ 用户接口保护,维护API语义一致性 │
├─────────────────────────────────────────────────────────────┤
│ Persistent Object │ 数据一致性保护,防止对象状态冲突 │
├─────────────────────────────────────────────────────────────┤
│ Backend Lock Layer │ 存储介质保护,处理硬件并发限制 │
├─────────────────────────────────────────────────────────────┤
│ Hash Tree Layer │ 完整性验证,确保元数据原子更新 │
├─────────────────────────────────────────────────────────────┤
│ RPC Layer │ 通信序列化,避免REE/TEE竞争条件 │
└─────────────────────────────────────────────────────────────┘
- 关注点分离: 每层解决特定的并发问题,避免单点复杂性
- 渐进式保护: 从粗粒度到细粒度的递进式锁定策略
- 故障隔离: 某层的问题不会传播到其他层
- 可扩展性: 便于添加新的并发控制机制
2. 引用计数机制的设计思想
为什么选择引用计数而非传统锁?
Copy
// 传统锁方式的问题
struct traditional_approach {
struct mutex global_lock; // 全局锁导致性能瓶颈
bool locked; // 简单状态无法处理复杂共享
};
// 引用计数的优势
struct reference_counting {
uint32_t refcount; // 精确跟踪使用者数量
struct mutex fine_lock; // 细粒度锁减少争用
uint32_t access_flags; // 灵活的访问控制
};
- 性能优化: 允许多个读者同时访问同一对象
- 资源管理: 自动清理无引用的对象,防止内存泄漏
- 访问语义: 支持复杂的共享访问模式(只读/读写/独占)
- 生命周期管理: 清晰的对象生命周期控制
引用计数的设计挑战与解决方案
Copy
// 挑战1: ABA问题
// 问题:对象可能在检查和使用之间被释放并重新分配
// 解决:generation计数 + 双重检查
struct safe_pobj_ref {
struct tee_pobj *po;
uint32_t generation; // 代数标记防止ABA
};
static struct tee_pobj *safe_pobj_get(const TEE_UUID *uuid, void *obj_id)
{
struct tee_pobj *po;
uint32_t gen1, gen2;
do {
gen1 = read_generation();
po = tee_pobj_get(uuid, obj_id);
gen2 = read_generation();
} while (gen1 != gen2 || !po); // 重试直到稳定状态
return po;
}
// 挑战2: 引用计数溢出
// 解决:饱和计数 + 降级保护
static TEE_Result safe_ref_inc(struct tee_pobj *po)
{
mutex_lock(&po->lock);
if (po->refcount >= REFCOUNT_MAX) {
// 降级为独占访问模式
mutex_unlock(&po->lock);
return TEE_ERROR_ACCESS_DENIED;
}
po->refcount++;
mutex_unlock(&po->lock);
return TEE_SUCCESS;
}
3. 写时复制(COW)机制的设计原理
为什么选择COW而非in-place更新?
传统问题分析:Copy
In-place Update 的问题:
┌─────────────────────────────────────────────────────────────┐
│ 写操作期间读者看到不一致状态 -> 数据竞争 │
│ 写失败无法回滚 -> 数据损坏 │
│ 大粒度锁导致性能下降 -> 可扩展性差 │
└─────────────────────────────────────────────────────────────┘
COW 的优势:
┌─────────────────────────────────────────────────────────────┐
│ 读者始终看到一致的快照 -> 无读写竞争 │
│ 原子提交/回滚 -> 数据安全 │
│ 读操作无锁 -> 高并发性能 │
└─────────────────────────────────────────────────────────────┘
Copy
// COW实现的关键设计
struct cow_context {
void *original_data; // 原始数据指针
void *working_copy; // 工作副本
bool copy_created; // 延迟复制标志
struct mutex cow_lock; // COW操作锁
uint32_t reader_epoch; // 读者时期标记
};
// 延迟复制策略:只在真正需要写时才复制
static TEE_Result cow_prepare_write(struct cow_context *ctx)
{
mutex_lock(&ctx->cow_lock);
if (!ctx->copy_created) {
// 延迟复制:节省内存和时间
ctx->working_copy = create_copy(ctx->original_data);
if (!ctx->working_copy) {
mutex_unlock(&ctx->cow_lock);
return TEE_ERROR_OUT_OF_MEMORY;
}
ctx->copy_created = true;
}
mutex_unlock(&ctx->cow_lock);
return TEE_SUCCESS;
}
- 延迟分配: 只在实际需要时创建副本
- 增量复制: 只复制被修改的页面/块
- 垃圾回收: 及时清理未使用的副本
4. RPMB序列化访问的设计考量
为什么RPMB必须序列化?
硬件限制分析:Copy
RPMB Hardware Constraints:
┌─────────────────────────────────────────────────────────────┐
│ 单一认证序列 -> 并发认证导致状态混乱 │
│ 写计数器机制 -> 并发写导致计数器同步失败 │
│ 有限的命令队列 -> 硬件无法处理并发请求 │
│ 认证MAC依赖 -> 并发操作破坏MAC验证链 │
└─────────────────────────────────────────────────────────────┘
Copy
// 方案1: 全局序列化(当前实现)
struct rpmb_serialization {
struct mutex global_rpmb_lock; // 简单但性能受限
// 优点:实现简单,绝对安全
// 缺点:性能瓶颈,无法充分利用RPMB带宽
};
// 方案2: 操作类型序列化(未来优化)
struct rpmb_operation_queue {
struct mutex read_lock; // 读操作可以并行
struct mutex write_lock; // 写操作必须串行
struct condvar write_complete; // 写完成通知
// 优点:读并行提高性能
// 缺点:复杂度增加,需要处理读写依赖
};
RPMB缓存设计的安全考量
Copy
// 缓存一致性的设计挑战
struct rpmb_cache_design {
// 挑战1: 缓存失效时机
uint32_t write_counter; // 跟踪RPMB写计数器
bool cache_valid; // 缓存有效性标记
// 挑战2: 并发缓存更新
struct mutex cache_update_lock; // 缓存更新锁
volatile bool update_in_progress; // 更新进行标志
// 挑战3: 读写一致性
struct rwlock cache_rwlock; // 缓存读写锁
};
// 安全的缓存失效策略
static void invalidate_rpmb_cache_safe(uint16_t start_block, uint16_t count)
{
rwlock_write_lock(&cache_rwlock);
// 原子地失效整个范围
for (uint16_t i = start_block; i < start_block + count; i++) {
struct cache_entry *entry = find_cache_entry(i);
if (entry) {
entry->valid = false;
entry->write_pending = false;
}
}
// 强制缓存同步
memory_barrier();
rwlock_write_unlock(&cache_rwlock);
}
5. 读写锁vs互斥锁的选择策略
性能权衡分析
Copy
// 选择决策矩阵
struct lock_selection_criteria {
float read_write_ratio; // 读写比例
int contention_level; // 争用程度
int critical_section_size; // 临界区大小
bool priority_inversion_risk; // 优先级反转风险
};
// 读多写少场景:优选读写锁
static void read_heavy_scenario_analysis()
{
/*
读写比例 > 10:1 时,读写锁优势明显:
- 读操作并行度大幅提升
- 写操作延迟略有增加(可接受)
- 内存开销适中(额外的条件变量)
*/
}
// 写多读少场景:优选互斥锁
static void write_heavy_scenario_analysis()
{
/*
读写比例 < 3:1 时,互斥锁更优:
- 避免读写锁的复杂状态机开销
- 减少上下文切换开销
- 更好的缓存局部性
*/
}
读写锁的实现优化
Copy
// 读写锁的公平性设计
struct fair_rwlock {
struct mutex lock;
struct condvar readers_done;
struct condvar writers_done;
int reader_count;
int writer_count; // 等待写者数量
bool writer_active;
bool prefer_writers; // 写者优先标志
};
// 防止写者饥饿的策略
static void rwlock_write_lock_fair(struct fair_rwlock *rw)
{
mutex_lock(&rw->lock);
rw->writer_count++; // 标记有写者等待
// 等待当前读者完成,并阻止新读者
while (rw->reader_count > 0 || rw->writer_active) {
rw->prefer_writers = true; // 设置写者优先
condvar_wait(&rw->writers_done, &rw->lock);
}
rw->writer_count--;
rw->writer_active = true;
mutex_unlock(&rw->lock);
}
6. 无锁数据结构的使用策略
为什么在某些场景选择无锁?
无锁编程的适用场景:Copy
// 场景1: 高频率的简单操作
struct atomic_counters {
volatile uint32_t ref_count; // 引用计数
volatile uint64_t access_time; // 访问时间戳
volatile uint32_t error_count; // 错误计数
// 优势:避免锁开销,提高响应速度
// 劣势:只适用于简单的原子操作
};
// 场景2: 日志记录和统计
struct lockfree_statistics {
volatile uint64_t read_ops; // 读操作计数
volatile uint64_t write_ops; // 写操作计数
volatile uint64_t total_bytes; // 总字节数
// 优势:不影响主要业务逻辑性能
// 劣势:可能丢失少量统计信息(可接受)
};
- 操作原子性: 确保单个操作的原子性
- 内存排序: 正确使用内存屏障
- ABA预防: 使用版本号或指针标记
- 降级策略: 提供锁定版本作为后备
Copy
// 内存排序的正确使用
struct lockfree_queue_safe {
volatile uint32_t head;
volatile uint32_t tail;
void* volatile items[QUEUE_SIZE];
volatile uint32_t generation[QUEUE_SIZE]; // ABA防护
};
static bool safe_enqueue(struct lockfree_queue_safe *q, void *item)
{
uint32_t current_tail, next_tail;
uint32_t current_gen;
do {
current_tail = __atomic_load_n(&q->tail, __ATOMIC_ACQUIRE);
next_tail = (current_tail + 1) % QUEUE_SIZE;
if (next_tail == __atomic_load_n(&q->head, __ATOMIC_ACQUIRE)) {
return false; // 队列满
}
current_gen = __atomic_load_n(&q->generation[current_tail],
__ATOMIC_ACQUIRE);
} while (!__atomic_compare_exchange_n(&q->tail, ¤t_tail, next_tail,
false, __ATOMIC_RELEASE,
__ATOMIC_ACQUIRE));
// 安全地存储数据
__atomic_store_n(&q->items[current_tail], item, __ATOMIC_RELEASE);
__atomic_store_n(&q->generation[current_tail], current_gen + 1,
__ATOMIC_RELEASE);
return true;
}
7. 死锁预防的设计哲学
银行家算法在TEE中的应用
Copy
// 资源分配的安全状态检查
struct resource_allocation_state {
int max_demand[MAX_TAS][MAX_RESOURCES]; // 最大需求矩阵
int allocated[MAX_TAS][MAX_RESOURCES]; // 已分配矩阵
int available[MAX_RESOURCES]; // 可用资源向量
};
// 安全状态检查:防止死锁
static bool is_safe_state(struct resource_allocation_state *state)
{
bool finished[MAX_TAS] = {false};
int work[MAX_RESOURCES];
// 初始化工作向量
memcpy(work, state->available, sizeof(work));
// 寻找安全序列
for (int count = 0; count < MAX_TAS; count++) {
bool found = false;
for (int ta = 0; ta < MAX_TAS; ta++) {
if (!finished[ta] && can_finish(state, ta, work)) {
// 模拟TA完成,释放资源
for (int res = 0; res < MAX_RESOURCES; res++) {
work[res] += state->allocated[ta][res];
}
finished[ta] = true;
found = true;
break;
}
}
if (!found) {
return false; // 无法找到安全序列
}
}
return true; // 存在安全序列
}
锁排序的设计原理
Copy
// 层次化锁排序:避免循环等待
enum lock_hierarchy {
LOCK_LEVEL_GLOBAL = 1000, // 全局资源锁
LOCK_LEVEL_SESSION = 900, // 会话级锁
LOCK_LEVEL_OBJECT = 800, // 对象级锁
LOCK_LEVEL_FILE = 700, // 文件级锁
LOCK_LEVEL_CACHE = 600, // 缓存级锁
LOCK_LEVEL_LOGGING = 100 // 日志级锁(最低优先级)
};
struct hierarchical_lock {
struct mutex lock;
enum lock_hierarchy level;
const char *name; // 调试用锁名
uint32_t owner_thread; // 锁持有者
};
// 安全的锁获取:检查层次顺序
static TEE_Result acquire_lock_safe(struct hierarchical_lock *hlock)
{
uint32_t current_thread = get_current_thread_id();
enum lock_hierarchy current_level = get_thread_max_lock_level(current_thread);
// 检查锁层次顺序
if (hlock->level >= current_level) {
EMSG("Lock ordering violation: trying to acquire %s (level %d) "
"while holding level %d", hlock->name, hlock->level, current_level);
return TEE_ERROR_BAD_STATE;
}
mutex_lock(&hlock->lock);
hlock->owner_thread = current_thread;
update_thread_lock_level(current_thread, hlock->level);
return TEE_SUCCESS;
}
8. 性能vs安全性的权衡哲学
设计权衡矩阵
Copy
// 不同场景的权衡策略
struct security_performance_tradeoff {
enum scenario_type {
CRITICAL_SECURITY, // 安全关键:RPMB存储
BALANCED, // 平衡考虑:REE存储
PERFORMANCE_FIRST // 性能优先:临时数据
} scenario;
struct tradeoff_params {
int lock_granularity; // 锁粒度(细粒度vs粗粒度)
bool enable_optimizations; // 是否启用性能优化
int timeout_ms; // 锁超时时间
bool allow_speculation; // 是否允许推测执行
} params;
};
// 安全关键场景:最大化安全性
static void configure_critical_security(struct tradeoff_params *p)
{
p->lock_granularity = COARSE_GRAIN; // 粗粒度锁,减少竞争
p->enable_optimizations = false; // 禁用可能不安全的优化
p->timeout_ms = INFINITE; // 不设置超时,确保完成
p->allow_speculation = false; // 禁用推测执行
}
// 性能优先场景:最大化性能
static void configure_performance_first(struct tradeoff_params *p)
{
p->lock_granularity = FINE_GRAIN; // 细粒度锁,提高并行度
p->enable_optimizations = true; // 启用所有优化
p->timeout_ms = 100; // 短超时,避免长时间等待
p->allow_speculation = true; // 允许推测执行
}
自适应并发控制
Copy
// 动态调整并发策略
struct adaptive_concurrency_controller {
struct performance_metrics {
uint64_t avg_wait_time; // 平均等待时间
uint32_t contention_rate; // 争用率
uint32_t throughput; // 吞吐量
uint32_t error_rate; // 错误率
} metrics;
struct adaptation_state {
enum lock_strategy current_strategy; // 当前策略
uint32_t adaptation_threshold; // 自适应阈值
uint64_t last_adaptation_time; // 上次调整时间
} state;
};
// 基于运行时指标的策略调整
static void adapt_concurrency_strategy(struct adaptive_concurrency_controller *acc)
{
struct performance_metrics *m = &acc->metrics;
struct adaptation_state *s = &acc->state;
// 检测性能恶化
if (m->avg_wait_time > WAIT_TIME_THRESHOLD ||
m->contention_rate > CONTENTION_THRESHOLD) {
// 切换到更保守的策略
if (s->current_strategy == AGGRESSIVE_LOCKING) {
s->current_strategy = CONSERVATIVE_LOCKING;
DMSG("Switching to conservative locking due to high contention");
}
} else if (m->throughput < THROUGHPUT_THRESHOLD) {
// 切换到更激进的策略
if (s->current_strategy == CONSERVATIVE_LOCKING) {
s->current_strategy = AGGRESSIVE_LOCKING;
DMSG("Switching to aggressive locking for better performance");
}
}
s->last_adaptation_time = get_current_time();
}
总结
OP-TEE的GP存储并发控制机制具有以下特点:- 多层次保护: 从对象到后端的全方位并发控制
- 死锁预防: 通过锁排序和超时机制避免死锁
- 性能优化: 读写分离和细粒度锁提高并发性能
- 原子操作: 确保关键操作的原子性和一致性
- 全面测试: 完整的并发测试套件验证正确性
设计哲学总结
OP-TEE存储系统的并发控制设计体现了以下核心哲学:- 安全第一: 在性能和安全性冲突时,优先保证安全性
- 分层防护: 通过多层次的控制机制提供深度防御
- 渐进优化: 从简单可靠的设计开始,逐步引入性能优化
- 适应性设计: 支持不同场景的差异化并发策略
- 可验证性: 确保并发行为的可测试和可验证