想了解更多关于开源的内容,请访问:
51CTO 开源基础软件社区
https://ost.51cto.com
前言
本篇基于上一篇OpenHarmony智能开发套件[内核编程·上]继续介绍OpenHarmony在智能开发套件Hi3861上的内核编程学习。
内核编程
还不了解OpenHarmony内核的伙伴们可以参考上篇文章,上篇已经简单通俗地介绍了OpenHarmony的内核。
OpenHarmony智能开发套件内核编程·上
互斥锁
线程的状态
在介绍互斥锁之前,我们有必要去了解一下线程的状态,或者说线程的生命周期。避免伙伴们因为不够熟悉线程而对这个互斥锁的概念感到困难。
首先介绍一下线程的几个状态,他们分别有:
- 创建
创建线程,在OpenHarmony的源码中,线程的属性被封装成了一个名为”osThreadAttr_t“的结构体
typedef struct {
const char *name;
uint32_t attr_bits;
void *cb_mem;
uint32_t cb_size;
void *stack_mem;
uint32_t stack_size;
osPriority_t priority;
TZ_ModuleId_t tz_module;
uint32_t reserved;
} osThreadAttr_t;
- name:线程的名称。
- attr_bits:线程属性位。
- cb_mem:线程控制块的内存地址。
- cb_size:线程控制块的内存大小。
- stack_mem:线程栈的内存地址。
- stack_size:线程栈的大小。
- priority:线程的优先级。
- tz_module:线程所属的TrustZone模块。
- reserved:保留字段。
当我们创建一个线程的时候,系统就会为该线程分配所需要的资源,将线程加入到系统的线程调度队列中,此时线程已经处在就绪状态了。
- 就绪
线程一旦被创建,就会进入就绪状态,他表示我们完成的线程的创建(线程相关属性的初始化),但是并未运行,线程正在等待操作系统调度程序,将其调度运行起来。
- 运行
在上篇我们介绍了一个关于线程的API,他可以将就绪状态的线程加入到活跃线程组
osThreadId_t osThreadNew (osThreadFunc_t func, void *argument, const osThreadAttr_t *attr);
此时的线程将会占用部分cpu资源执行他的程序,直到程序结束,或者线程被阻塞,cpu被抢占等。
- 阻塞
线程阻塞,可以理解为线程无法继续往下执行,但时线程执行的程序不会直接退出,他会进入到等待的状态,直到相关资源被释放,线程可以继续执行。
- 终止
上篇我们介绍了线程终止的API
sStatus_t status = osThreadTerminate(osThreadId_t tid);
此时线程完成了自己的代码逻辑,我们方可主动调用该API彻底删除这个线程。
当然如果具体细分,其实不止这五个状态,但是了解了这五个状态就足够了。下面我们来聊聊什么是互斥锁。
互斥锁简介
互斥锁的应用场景时处理多线程,资源访问的问题。这里还是给大家举一个例子:动物园卖票。
在我们的程序设计中,往往会有共有资源,每个线程都可以进来访问这些资源。这里的共有资源就是我们的门票,一个售票口就是一个线程,当有人来窗口买票,我们的门票就会减少一张。当然一个动物园的流量时巨大的,我们不能只设立一个售票口,这样的效率是很低的。京东淘宝抢购秒杀也一样,我们必须设立多个窗口,在同一时刻为多个人处理业务。多线程解决了效率问题但也带来了安全隐患。
我们假设一个这样的场景,动物园仅剩一张门票,但是有两个在不同的窗口同时付了钱,当售票员为他们拿票的时候就会发现,少了一张票,两个人都付了钱都想要那张票,就陷入了一个死局,无奈动物园只能让他们都进去。但是在程序中体现的可能就是一个bug,动物园的门票剩余数为:-1。
if(count > 0){
count--;
}
售票的逻辑很简单,只要票数大于零,还有票能卖,那就在此基础上减掉一。问题就在于,两个线程同时在count = 1的时候通过了if判断,都执行的count–;那么就会出现了count = -1,也就是票数为负 的bug结果。
互斥锁的出现就很好地解决了一个问题,他能够阻止上面两个线程同时访问资源的同步行为,也就是说当一个线程进入这个if语句后,别的线程都不能进入。形象起来说,就像对这段代码加了锁,只有有钥匙的线程才能够访问它。
互斥锁API
通过对几个API的介绍,让大家知道怎么为一段代码加上互斥锁
- 创建互斥锁。
与线程的定义一样,互斥锁也封装成了一个结构体
typedef struct {
/** Mutex name */
const char *name;
/** Reserved attribute bits */
uint32_t attr_bits;
/** Memory for the mutex control block */
void *cb_mem;
/** Size of the memory for the mutex control block */
uint32_t cb_size;
} osMutexAttr_t;
- name:互斥锁的名称
- attr_bits:保留的属性位
- cb_mem:互斥锁控制块的内存
- cb_size:互斥锁控制块内存的大小
- 获取互斥锁的id。
osMutexId_t osMutexNew(const osMutexAttr_t *attr);
将我们上面定义的互斥锁属性传入函数,返回互斥锁的id
- 线程获取互斥锁
osStatus_t osMutexAcquire(osMutexId_t mutex_id, uint32_t timeout);
传入互斥锁id,设置我们的延迟时间,当线程获取到我们的互斥锁时,返回的状态是osOK。
下面是全部的状态码。
typedef enum {
/** Operation completed successfully */
osOK = 0,
/** Unspecified error */
osError = -1,
/** Timeout */
osErrorTimeout = -2,
/** Resource error */
osErrorResource = -3,
/** Incorrect parameter */
osErrorParameter = -4,
/** Insufficient memory */
osErrorNoMemory = -5,
/** Service interruption */
osErrorISR = -6,
/** Reserved. It is used to prevent the compiler from optimizing enumerations. */
osStatusReserved = 0x7FFFFFFF
} osStatus_t;
- 释放锁。
osStatus_t osMutexRelease(osMutexId_t mutex_id);
当我们的线程执行完业务后需要把锁释放出来,让别的线程获取锁,执行业务。当然这个过程是线程之间的竞争,一个线程可能一直得不到锁,一个线程也可能刚释放锁又获得锁,我们可以添加休眠操作,提高锁在各个线程间的分配。
其他API请参考:
Mutex API
操作
- 新建样例目录。
applications/sample/wifi-iot/app/mutex_demo - 新建源文件和gn文件。
applications/sample/wifi-iot/app/mutex_demo/mutex.c
applications/sample/wifi-iot/app/mutex_demo/BUILD.gn
- 源码编写。
因为已经介绍了主要的API,这里就直接给伙伴们上源码了。
#include
#include
#include "ohos_init.h"
#include "cmsis_os2.h"
// 模拟动物园门票数
static int count = 100;
// 售票业务线程
void outThread(void *args){
// 获取互斥锁
osMutexId_t *mid = (osMutexId_t *)args;
// 每个线程都在不停地买票
while(1){
// 获取锁,进入业务流程
if(osMutexAcquire(*mid, 100) == osOK){
if(count > 0){
count--;
// 设置提示信息
printf("[Mutex Test] Thread %s get a value, the less is %d.\r\n", osThreadGetName(osThreadGetId()), count);
} else {
// 告知这些线程已经没有门票卖了,线程结束
printf("[Mutex Test] The value is out!\r\n");
osThreadTerminate(osThreadGetId());
}
}
// 释放锁
osMutexRelease(*mid);
osDelay(5);
}
}
// 创建线程封装
osThreadId_t createThreads(char *name, osThreadFunc_t func, void *args){
osThreadAttr_t attr = {
name, 0, NULL, 0, NULL, 1024, osPriorityNormal, 0, 0
};
osThreadId_t tid = osThreadNew(func, args, &attr);
return tid;
}
// 主函数实现多线程的创建,执行买票业务
void mutexMain(void){
// 创建互斥锁
osMutexAttr_t attr = {0};
// 获取互斥锁的id
osMutexId_t mid = osMutexNew(&attr);
if(mid == NULL){
printf("[Mutex Test] Failed to create a mutex!\r\n");
}
// 创建多线程
osThreadId_t tid1 = createThreads("Thread_1", (osThreadFunc_t)outThread, &mid);
osThreadId_t tid2 = createThreads("Thread_2", (osThreadFunc_t)outThread, &mid);
osThreadId_t tid3 = createThreads("Thread_3", (osThreadFunc_t)outThread, &mid);
osDelay(1000);
}
// 测试线程
void MainTest(void){
osThreadId_t tid = createThreads("MainTest", (osThreadFunc_t)mutexMain, NULL);
}
APP_FEATURE_INIT(MainTest);
- 编写gn文件。
static_library("mutex_demo"){
sources = [
"mutex.c"
]
include_dirs = [
"//commonlibrary/utils_lite/include",
"//device/soc/hisilicon/hi3861v100/hi3861_adapter/kal/cmsis"
]
}
- 编写app目录下的gn文件。
结果展示
可能有的伙伴们看到这里不太清晰,会觉得这段代码真的上锁了吗?
if(osMutexAcquire(*mid, 100) == osOK){
if(count > 0){
count--;
printf("[Mutex Test] Thread %s get a value, the less is %d.\r\n", osThreadGetName(osThreadGetId()), count);
} else {
printf("[Mutex Test] The value is out!\r\n");
osThreadTerminate(osThreadGetId());
}
}
那么我们可以不使用互斥锁再次执行这段代码。
结果展示如下:
注:这里笔者还另外多加了3个线程,一共六个线程,可以看出来控制台的输出很混乱,当一个线程在执行输出指令时,另一个线程也插了进来执行输出指令所造成的,再看票数,也是出现了明显的问题。因此互斥锁在处理多线程问题时,起到了非常重要的作用。
可能有伙伴好奇,怎么没有负数票的出现,笔者作为学习者,代码能力也有限,可能写出来的案例并不是非常精确,仅供参考。
信号量
对大部分初学者而言,这又是一个新名词,什么是信号量?其实他跟我们上篇介绍的互斥锁很像。互斥锁是在多线程中允许一个线程访问资源,信号量是在多线程中允许多个线程访问资源。
初学者一定会感到困惑,为了解决多线程访问资源的风险我们限制只能有一个线程在某一时刻访问资源,现在这个信号量怎么有允许多个线程访问资源呢。笔者刚开始也比较困惑,结合一些案例理解后,也是明白了这样的设计初衷。实际上,信号量,互斥锁本就是两种不同的多形成同步运行机制,在特定的应用场景下,有特定的需求,而信号量,互斥锁可以满足不同的需求,具体是什么需求呢,举个例子给大家。
卖票,我们的确需要互斥锁解决多线程可能带来的错误,那么如果是验票呢,为了提高效率,我们开设多个入口同时验票且不会发生冲突,信号量就做到了限制线程数量访问资源的作用。如果我们不限制并发的数量,我们的程序占用资源可能会非常大,甚至崩溃,就像检票的入口没有被明确入口数量一样,门口的人们会乱成一片。
信号量API
- 创建信号量。
osSemaphoreId_t osSemaphoreNew(uint32_t max_count, uint32_t initial_count, const osSemaphoreAttr_t *attr);
参数解释:最大容量量,初始容纳量,信号量属性。
最大容纳量说明了,我们的资源最大能被多少线程访问。
初始容纳量说明了,我们当前实际能有多少线程访问资源,因为一个信号对应一个线程的许可。
返回值:信号量的id。
- 获取信号量。
osStatus_t osSemaphoreAcquire(osSemaphoreId_t semaphore_id, uint32_t timeout);
参数解释:信号量的id,等待时长。
返回值:状态码 (介绍很多遍了,就不说明了)。
我们往往会在timoeout处设置为 oswaitForever。
#define osWaitForever 0xFFFFFFFFU
这样我们的线程就会一直等,直到有信号量空出来被他获取,才执行后续的代码。
- 释放信号量。
osStatus_t osSemaphoreRelease(osSemaphoreId_t semaphore_id);
很简单,传入信号量的id,就可以释放一个信号量出来。
其他的API请参考:
Semaphore API
任务
有4个售票窗,2个检票口,每次会有4个人来买票,然后去检票,用互斥锁控制购票,信号量控制检票。
操作
- 新建样例目录。
applications/sample/wifi-iot/app/semaphore_demo。 - 新建源文件和gn文件。
applications/sample/wifi-iot/app/semaphore_demo/semaphore.c。
applications/sample/wifi-iot/app/semaphore_demo/BUILD.gn。
- 源码编写。
直接上源码了。
#include
#include
#include "ohos_init.h"
#include "cmsis_os2.h"
// 售票口 4
#define OUT_NUM 4
// 检票口 2
#define IN_NUM 2
// 信号量
osSemaphoreId_t sid;
// 待检票人数
static int people = 0;
// 售票业务
void outThread(void *args){
// 获取互斥锁
osMutexId_t *mid = (osMutexId_t *)args;
while(1){
if(osMutexAcquire(*mid, 100) == osOK){
// 卖一张票,带检票的人数就会加一位
people++;
printf("[SEMAPHORE TEST] out, people: %d.\r\n", people);
}
osMutexRelease(*mid);
osDelay(50);
}
}
// 检票业务
void inThread(void *args){
// 获取信号量
osSemaphoreAcquire(sid, osWaitForever);
while(1){
if(people > 0){
people--;
printf("[SEMAPHORE TEST] in, people: %d.\r\n", people);
}
osSemaphoreRelease(sid);
osDelay(100);
}
}
// 创建线程封装
osThreadId_t createThreads(char *name, osThreadFunc_t func, void *args){
osThreadAttr_t attr = {
name, 0, NULL, 0, NULL, 1024 * 2, osPriorityNormal, 0, 0
};
osThreadId_t tid = osThreadNew(func, args, &attr);
return tid;
}
// 主线程
void SemaphoreMain(void){
// 创建信号量
sid = osSemaphoreNew(IN_NUM, IN_NUM, NULL);
// 创建互斥锁
osMutexAttr_t attr = {0};
// 获取互斥锁的id
osMutexId_t mid = osMutexNew(&attr);
// 创建售票线程
for(int i = 0; i < OUT_NUM; i++){
createThreads("", (osThreadFunc_t)outThread, &mid);
}
// 创建检票线程
for(int i = 0; i < IN_NUM; i++){
createThreads("", (osThreadFunc_t)inThread, NULL);
}
}
// 测试函数
void MainTest(){
createThreads("MainTest", (osThreadFunc_t)SemaphoreMain, NULL);
}
APP_FEATURE_INIT(MainTest);
- 编写gn文件。
static_library("semaphore_demo"){
sources = [
"semaphore.c"
]
include_dirs = [
"//utils/native/lite/include",
]
}
- 编写app目录下的gn文件。
结果展示
大家可以加长检票业务的休眠时间,我们的检票口是两个,in的业务一定是两个一起执行的。
总之信号量和互斥锁是多线程管理中的重点,大家一定要好好体会他们的作用和区别。
消息队列
本篇的最后我们来介绍消息队列。队列相信大部分朋友都不陌生,是一种基本且常用的数据结构,这里笔者就不介绍队列的相关信息了。那么什么是消息队列呢?有什么应用场景呢。
消息队列也是多线程,高并发中的处理方式,大家可以理解为“同步入队,异步出队”。老样子从一个案例解释,网购秒杀。
在网购秒杀时,会有上万甚至上百万的流量涌入服务器,下单即可看作一个请求,向服务器请求获取某个商品。服务器处理,生成买家的订单号。当然强大服务器的也无法在同一时刻支持如此多的请求,并且商品的数量也不足被所有人购买,这个时候,我们的消息队列就会同步接受大家的请求,所有的请求都会被压进一个队列中,服务器从队列中依次获取消息,确保不会因为资源被占用而导致系统崩溃。
#### 消息队列API
- 创建消息队列。
osMessageQueueId_t osMessageQueueNew (uint32_t msg_count, uint32_t msg_size, const osMessageQueueAttr_t *attr);
参数说明:
- msg_count:消息队列中的消息数量。
- msg_size:消息队列中每个消息的大小,通常我们的消息会用一个结构体来自定义消息的内容
- attr:指向消息队列属性。
该函数的返回值是osMessageQueueId_t类型,表示消息队列的ID。如果创建消息队列失败,函数将返回NULL。
- 向消息队列加入消息。
osStatus_t osMessageQueuePut (osMessageQueueId_t mq_id, const void *msg_ptr, uint8_t msg_prio, uint32_t timeout);
参数说明:
- mq_id:消息队列的ID,通过调用osMessageQueueNew函数获得。
- msg_ptr:指向要放入消息队列的消息缓冲区的指针,也就是我们将结构体的指针转递给函数
- msg_prio:消息的优先级。
- timeout:延时,使用osWaitForever,线程就会一直等待直到队列中有空余的位置。
该函数的返回值是osStatus_t类型,表示函数执行的结果。
- 从消息队列中接受消息。
osStatus_t osMessageQueueGet (osMessageQueueId_t mq_id, void *msg_ptr, uint8_t *msg_prio, uint32_t timeout);
参数说明:
- mq_id:消息队列的ID,通过调用osMessageQueueNew函数获得。
- msg_ptr:指向存储从消息队列中获取的消息的缓冲区的指针。
- msg_prio:指向存储从消息队列中获取的消息的优先级的缓冲区的指针。
- timeout:延时,使用osWaitForever,线程就会一直等待直到队列中有消息了。
该函数的返回值是osStatus_t类型,表示函数执行的结果。
- 删除消息队列。
osStatus_t osMessageQueueDelete (osMessageQueueId_t mq_id);
参数说明:
- mq_id:消息队列的ID,通过调用osMessageQueueNew函数获得。
该函数的返回值是osStatus_t类型,表示函数执行的结果。
其他的API请参考:
MessageQueue API
任务
模拟抢购秒杀,假设我们有10个线程,15个大小的消息队列,5件商品。
操作
- 新建样例目录。
applications/sample/wifi-iot/app/queue_demo。 - 新建源文件和gn文件。
applications/sample/wifi-iot/app/queue_demo/queue.c。
applications/sample/wifi-iot/app/queue_demo/BUILD.gn。
- 编写源码。
直接上源码。
#include
#include
#include "ohos_init.h"
#include "cmsis_os2.h"
// 定义消息队列的大小
#define QUEUE_SIZE 15
// 定义请求数量
#define REQ_SIZE 10
// 定义消息的结构
typedef struct{
osThreadId_t tid;
} message_queue;
// 创建消息队列id
osMessageQueueId_t qid;
// 模拟发送业务
void sendThread(void){
// 定义一个消息结构
message_queue sentry;
sentry.tid = osThreadGetId();
osDelay(100);
// 消息入队
osMessageQueuePut(qid, (const void*)&sentry, 0, osWaitForever);
// 设置提示信息
printf("[MESSAGEQUEUE TEST] %d send a message.\r\n", sentry.tid);
}
// 模拟处理业务
void receiverThread(void){
// 定义一个消息结构
message_queue rentry;
int less = 5;
while(less > 0){
osMessageQueueGet(qid, (void *)&rentry, NULL, osWaitForever);
less--;
printf("[MESSAGEQUEUE TEST] %d get a product, less = %d.\r\n", rentry.tid, less);
osDelay(5);
}
printf("[MESSAGEQUEUE TEST] over!\r\n");
}
// 创建线程封装
osThreadId_t createThreads(char *name, osThreadFunc_t func, void *args){
osThreadAttr_t attr = {
name, 0, NULL, 0, NULL, 1024 * 2, osPriorityNormal, 0, 0
};
osThreadId_t tid = osThreadNew(func, args, &attr);
return tid;
}
// 主线程
void MessageQueueMain(void){
// 创建一个消息队列
qid = osMessageQueueNew(QUEUE_SIZE, sizeof(message_queue), NULL);
// 创建发送线程
for(int i = 0; i < REQ_SIZE; i++){
createThreads("", (osThreadFunc_t)sendThread, NULL);
}
osDelay(5);
// 创建接收线程
createThreads("", (osThreadFunc_t)receiverThread, NULL);
osDelay(500);
// 删除消息队列
osMessageQueueDelete(qid);
}
// 测试函数
void MainTest(){
createThreads("MainTest", (osThreadFunc_t)MessageQueueMain, NULL);
}
APP_FEATURE_INIT(MainTest);
- 编写gn。
static_library("queue_demo"){
sources = [
"queue.c",
]
include_dirs = [
"//utils/native/lite/include",
]
}
- 编写app目录下的gn。
结果展示
因为线程创建是循环创建的,先创建的线程就优先发送了请求,可以看的出来,前五个线程抢到了商品。如果线程可以同时发送请求,争抢入队的时机,模拟将会更加准确一些,这里只是简单的模拟。
结束语
至此内核的基础内容就给伙伴们介绍完了,内核作为一个系统的底层起到了相当重要的作用,大家要好好体会,希望能帮助到学习OpenHarmony的伙伴们。
想了解更多关于开源的内容,请访问:
51CTO 开源基础软件社区
https://ost.51cto.com