在开发操作系统前,需要先弄清楚数据同步的问题。避免出现不可预知的错误。
首先我们需要先弄清楚并发操作中数据不同步的原因,然后再学习解决方法。
非预期结果的全局变量
代码如下:
int a = 0;
void interrupt_handle()
{
a++;
}
void thread_func()
{
a++;
}
通常编译器会把a++翻译成三条指令:
- 将a加载到某个寄存器中。
- 这个寄存器加1。
- 把这个寄存器写回内存。
如果出现这样一个情况:thread_func刚执行完第二条指令就发生了中断,CPU执行完中断返回后继续执行第三条指令,最终a的结果为1,这显然是错误的。看下表:

这种错误是因为:一个逻辑上是一个整体的操作实际上对应多条指令,造成执行一半的途中进行中断切换,造成了非预期的结果。
接下来就是解决方案了。
一:原子操作 拿下单体变量
解决的一种思路就是将a++变成原子操作,要么不执行,要么全部执行。 x86平台支持很多原子操作,我们只需要直接应用这些指令,用汇编代码写出对应的原子操作函数即可。 需要了解一下GCC嵌入式汇编。
实现代码如下:
// 定义一个原子类型
typedef struct s_ATOMIC {
volatile s32_t a_count; //在变量前加上volatile,是为了禁止编译器优化,使其每次都从内存中加载变量
}atomic_t;
// 原子读
static inline s32_t atomic_read(const atomic_t *v) {
// x86平台取地址是原子操作
return (*(volatile u32_t*)&(v)->a_count);
}
// 原子写
static inline void atomic_write(atomic_t *v, int i) {
// x86平台把一个值写入地址也是原子的
v->a_count = i;
}
// 原子加上一个整数
static inline void atomic_add(int i, atomic_t *v) {
__asm__ __volatile__("lock;" "addl %1, %0"
: "+m"(v->a_count)
: "ir"(i));
}
//原子减去一个整数
static inline void atomic_sub(int i, atomic_t *v){
__asm__ __volatile__("lock;" "subl %1,%0"
: "+m" (v->a_count)
: "ir" (i));
}
//原子加1
static inline void atomic_inc(atomic_t *v){
__asm__ __volatile__("lock;" "incl %0"
: "+m" (v->a_count));
}
//原子减1
static inline void atomic_dec(atomic_t *v){
__asm__ __volatile__("lock;" "decl %0"
: "+m" (v->a_count));
}以上代码中,加lock前缀的addl, subl, incl, decl指令都是原子操作,lock表示锁定总线。
利用原子操作,我们可以将代码改写为:
atomic_t a = {0};
void interupt_handle() {
atomic_inc(&a);
}
void thread_func() {
atomic_inc(&a);
}Note
原子操作适合单个变量全局数据。例如全局计数器、标志变量。对于有几百个字节的数据结构或者包含不同类型的数据类型,原子操作时无法解决的
二:中断控制 搞定复杂变量
中断是CPU响应外部事件的重要机制。 但是如果中断代码中操作了其它代码的数据,这就需要相应的控制机制了。
对于需要保证原子性的一系列操作,我们通过开关中断来保证在执行过程中不会被打断。
在x86 CPU上关闭、开启中断的指令为cli、sti。通过对CPU的EFLAGS寄存器的IF位进行清楚和置位,CPU通过此位决定是否响应中断信号。
这两条指令需要Ring0权限才能执行。
//关闭中断
void hal_cli()
{
__asm__ __volatile__("cli": : :"memory");
}
//开启中断
void hal_sti()
{
__asm__ __volatile__("sti": : :"memory");
}
//使用场景
void foo()
{
hal_cli();
//操作数据……
hal_sti();
}
void bar()
{
hal_cli();
//操作数据……
hal_sti();
}但是这样的话,hal_cli()和hal_sti()无法嵌套使用。为了解决这个问题,我们要对这两个函数进行修改。 在关闭中断函数中先保存EFLAGS寄存器,然后执行cli指令,开启中断函数中直接恢复之前保存的EFLAGS寄存器即可。
typedef u32_t cpuflg_t;
static inline void hal_save_flags_cli(cpuflg_t* flags)
{
__asm__ __volatile__(
"pushfl \t\n" //把eflags寄存器压入当前栈顶
"cli \t\n" //关闭中断
"popl %0 \t\n"//把当前栈顶弹出到flags为地址的内存中
: "=m"(*flags)
:
: "memory"
);
}
static inline void hal_restore_flags_sti(cpuflg_t* flags)
{
__asm__ __volatile__(
"pushl %0 \t\n"//把flags为地址处的值寄存器压入当前栈顶
"popfl \t\n" //把当前栈顶弹出到eflags寄存器中
:
: "m"(*flags)
: "memory"
);
}hal_restore_flags_sti()是否开启中断完全依赖于上一次eflags寄存器中的值。
其实,直接通过维持堆栈平衡也可以实现,没有必要引入一个中间变量。
Note
中断控制。适合同步数据很多并且单核心的CPU。因为单核心的CPU只要控制全局中断的开关就能够保证代码流的原子性
三:自旋锁 协调多核心CPU
前面的中断控制对于单CPU来说是解决了问题,因为单CPU只有一条代码流并且只能通过中断来切换到另一条代码流,所以通过中断控制就能够安全操作全局数据。
但是对于多核心的CPU,同一时刻下有多条代码流,控制中断只能控制本地CPU的中断,无法控制其它CPU核心的中断,于是就有了自旋锁这种机制来维护多核心CPU下的全局数据。
自旋锁的原理如下:
- 读取锁变量。判断是否加锁,如果没有加锁则加锁,然后返回表示加锁成功,否则执行下一步。
- 已经加锁了,那么返回第一步。
也就是说,会如果没有加锁成功,那么会一直在这里循环,直到加锁成功,这也是自旋锁名称的由来。

Note
这个算法正确执行前提是,读取锁变量和判断锁变量是否加锁的操作是原子执行的。不然,在CPU0读取锁变量后,CPU1也读取锁变量并判断未加锁执行加锁,然后CPU0也判断未加锁执行加锁,就会导致出错。因此读取和判断是要具有原子性的。
typedef struct {
volatile u32_t lock; //volatile可以防止编译器优化,保证其它代码始终从内存加载lock变量的值
}spinlock_t;
// 初始化锁
static inline void x86_spin_lock_init(spinlock_t *lock) {
lock->lock = 0;
}
//加锁函数
static inline void x86_spin_lock(spinlock_t * lock){
__asm__ __volatile__ (
"1: \n"
"lock; xchg %0, %1 \n"//把值为1的寄存器和lock内存中的值进行交换
"cmpl $0, %0 \n" //用0和交换回来的值进行比较
"jnz 2f \n" //不等于0则跳转后面2标号处运行
"jmp 3f \n" //若等于0则跳转后面3标号处返回
"2: \n"
"cmpl $0, %1 \n"//用0和lock内存中的值进行比较
"jne 2b \n"//若不等于0则跳转到前面2标号处运行继续比较
"jmp 1b \n"//若等于0则跳转到前面1标号处运行,交换并加锁
"3: \n" : : "r"(1), "m"(*lock));
}
//解锁函数
static inline void x86_spin_unlock(spinlock_t * lock){
__asm__ __volatile__(
"movl $0, %0\n"//解锁把lock内存中的值设为0就行
: : "m"(*lock));}这里介绍一下加锁的实现逻辑:
- 通过xchg将值为1的寄存器与内存中的锁变量的值进行交换,这样对于所有的CPU核心来说,内存中的锁变量都为1,表示加锁。
- 判断寄存器的值,如果寄存器的值为0,表示加锁成功,可以返回。否则执行下一步。
- 寄存器的值为1,表示已经有其它CPU核心获取到锁了,所以当前CPU获取锁失败,执行下一步。
- 不断判断内存中的锁变量是否为0,如果为0,那么表示自旋锁被让出,执行第一步,尝试重新获取锁。
Note
自旋锁有中断嵌套的问题,如果获取自旋锁之后,发生中断,在中断处理程序中又获取自旋锁,那么就会导致发生死锁。
因此我们需要进行中断控制。完善的代码如下:
static inline void x86_spin_lock_disable_irq(spinlock_t * lock,cpuflg_t* flags)
{
__asm__ __volatile__(
"pushfq \n\t"
"cli \n\t"
"popq %0 \n\t"
"1: \n\t"
"lock; xchg %1, %2 \n\t"
"cmpl $0,%1 \n\t"
"jnz 2f \n\t"
"jmp 3f \n"
"2: \n\t"
"cmpl $0,%2 \n\t"
"jne 2b \n\t"
"jmp 1b \n\t"
"3: \n"
:"=m"(*flags)
: "r"(1), "m"(*lock));
}
static inline void x86_spin_unlock_enabled_irq(spinlock_t* lock,cpuflg_t* flags)
{
__asm__ __volatile__(
"movl $0, %0\n\t"
"pushq %1 \n\t"
"popfq \n\t"
:
: "m"(*lock), "m"(*flags));
}Note
自旋锁。适合同步数据多,CPU为多核心,同步资源不需要长时间等待。
四:信号量
原子操作和自旋锁都不适合长时间等待的情况,很多资源获取的时候通常都需要等待,如果使用自旋锁来同步这种资源,会造成CPU资源的浪费。
为了同步需要长时间等待的资源并避免CPU资源的浪费,出现了信号量的同步机制。
如果一个代码执行流要获取一个需要长时间等待的资源,我们完全没有必要循环这个程序直到资源获取成功,而是执行其它代码执行流,直到有可用资源的时候,再考虑执行这个代码流。
也就是三个问题:等待、互斥、唤醒。
需要设计一种数据结构,至少需要一个变量来表示互斥,还有一个等待链,用于保持等待执行的代码流。
#define SEM_FLG_MUTEX 0
#define SEM_FLG_MULTI 1
#define SEM_MUTEX_ONE_LOCK 1
#define SEM_MULTI_LOCK 0
//等待链数据结构,用于挂载等待代码执行流(线程)的结构,里面有用于挂载代码执行流的链表和计数器变量,这里我们先不深入研究这个数据结构。
typedef struct s_KWLST
{
spinlock_t wl_lock;
uint_t wl_tdnr;
list_h_t wl_list;
}kwlst_t;
//信号量数据结构
typedef struct s_SEM
{
spinlock_t sem_lock;//维护sem_t自身数据的自旋锁
uint_t sem_flg;//信号量相关的标志
sint_t sem_count;//信号量计数值
kwlst_t sem_waitlst;//用于挂载等待代码执行流(线程)结构
}sem_t;因为使用信号量的数据的时候,要保证信号量的数据同步,为此我们借用自旋锁来进行同步,因此对信号量的数据的获取和修改并不需要长时间等待,所以正合适。
信号量的使用分为三个步骤:
- 获取信号量
- 执行代码流
- 释放信号量
获取信号量
- 首先对用户保护信号量自身的自旋锁sem_lock进行加锁。
- 对信号量sem_count执行减1操作,并检查其值是否为0.
- 如果上步检查的结果为小于0,那么就让进程进入等待状态并将其挂入sem_waitlst中,然后调度其它进程执行。否则信号量获取成功。
- 释放自旋锁。
//获取信号量
void krlsem_down(sem_t* sem)
{
cpuflg_t cpufg;
start_step:
krlspinlock_cli(&sem->sem_lock,&cpufg);
if(sem->sem_count<1)
{//如果信号量值小于1,则让代码执行流(线程)睡眠
krlwlst_wait(&sem->sem_waitlst);
krlspinunlock_sti(&sem->sem_lock,&cpufg);
krlschedul();//切换代码执行流,下次恢复执行时依然从下一行开始执行,所以要goto开始处重新获取信号量
goto start_step;
}
sem->sem_count--;//信号量值减1,表示成功获取信号量
krlspinunlock_sti(&sem->sem_lock,&cpufg);
return;
}释放信号量
- 先对用户保护信号量本身的自旋锁sem_lock加锁。
- 对信号量sem_count执行“加1”操作,并检查是否大于0。
- 上步的检查结果大于0,就唤醒sem_waitlst中的进程,然后进行进程调度
- 释放自旋锁
//释放信号量
void krlsem_up(sem_t* sem)
{
cpuflg_t cpufg;
krlspinlock_cli(&sem->sem_lock,&cpufg);
sem->sem_count++;//释放信号量
if(sem->sem_count<1)
{//如果小于1,则说数据结构出错了,挂起系统
krlspinunlock_sti(&sem->sem_lock,&cpufg);
hal_sysdie("sem up err");
}
//唤醒该信号量上所有等待的代码执行流(线程)
krlwlst_allup(&sem->sem_waitlst);
krlspinunlock_sti(&sem->sem_lock,&cpufg);
krlsched_set_schedflgs();
return;
}Note
信号量。需要长时间等待才能够获取数据。
Linux下同步原语的实现
同步原语的基本原理和实现上面已经介绍,接下来就学习一下具体的实现。
原子变量
<<<<<<< HEAD 再Linux下有许多共享资源都是简单的整数类型,例如文件描述符中的计数器,open打开的时候对应文件描述符的计数器加1,close关闭文件的时候对应文件描述符的计数器减1。
为了保证这类数据的同步,Linux提供了原子类型变量atomic_t。如果是64位机器,还有一个64位的对应类型。
typedef struct {
int counter;
} atomic_t;//常用的32位的原子变量类型
#ifdef CONFIG_64BIT
typedef struct {
s64 counter;
} atomic64_t;//64位的原子变量类型
#endif并且需要Linux提供的接口函数去操作原子类型
//原子读取变量中的值
static __always_inline int arch_atomic_read(const atomic_t *v)
{
return __READ_ONCE((v)->counter);
}
//原子写入一个具体的值
static __always_inline void arch_atomic_set(atomic_t *v, int i)
{
__WRITE_ONCE(v->counter, i);
}
//原子加上一个具体的值
static __always_inline void arch_atomic_add(int i, atomic_t *v)
{
asm volatile(LOCK_PREFIX "addl %1,%0"
: "+m" (v->counter)
: "ir" (i) : "memory");
}
//原子减去一个具体的值
static __always_inline void arch_atomic_sub(int i, atomic_t *v)
{
asm volatile(LOCK_PREFIX "subl %1,%0"
: "+m" (v->counter)
: "ir" (i) : "memory");
}
//原子加1
static __always_inline void arch_atomic_inc(atomic_t *v)
{
asm volatile(LOCK_PREFIX "incl %0"
: "+m" (v->counter) :: "memory");
}
//原子减1
static __always_inline void arch_atomic_dec(atomic_t *v)
{
asm volatile(LOCK_PREFIX "decl %0"
: "+m" (v->counter) :: "memory");
}在单核心情况下是不需要lock前缀的,LOCK_PREFIX宏展开为空串,多核心下展开为”lock;”。
同样__READ_ONCE和__WRITE_ONCE宏展开如下:
#define __READ_ONCE(x) \
(*(const volatile __unqual_scalar_typeof(x) *)&(x))
#define __WRITE_ONCE(x, val) \
do {*(volatile typeof(x) *)&(x) = (val);} while (0)
//__unqual_scalar_typeof表示声明一个非限定的标量类型,非标量类型保持不变。说人话就是返回x变量的类型,这是GCC的功能,typeof只是纯粹返回x的类型。
//如果 x 是int类型则返回“int”
#define __READ_ONCE(x) \
(*(const volatile int *)&(x))
#define __WRITE_ONCE(x, val) \
do {*(volatile int *)&(x) = (val);} while (0) 其中的volatile int *是为了提醒编译器:这是对内存地址读写,不要有优化动作,每次都必须强制写入内存或从内存读取。
中断控制
Linux有很多场景需要关中断才能安全执行,例如多个中断程序访问共享数据,就需要保证自身和其它中断互斥。
//实际保存eflags寄存器
extern __always_inline unsigned long native_save_fl(void){
unsigned long flags;
asm volatile("# __raw_save_flags\n\t"
"pushf ; pop %0":"=rm"(flags)::"memory");
return flags;
}
//实际恢复eflags寄存器
extern inline void native_restore_fl(unsigned long flags){
asm volatile("push %0 ; popf"::"g"(flags):"memory","cc");
}
//实际关中断
static __always_inline void native_irq_disable(void){
asm volatile("cli":::"memory");
}
//实际开启中断
static __always_inline void native_irq_enable(void){
asm volatile("sti":::"memory");
}
//arch层关中断
static __always_inline void arch_local_irq_disable(void){
native_irq_disable();
}
//arch层开启中断
static __always_inline void arch_local_irq_enable(void){
native_irq_enable();
}
//arch层保存eflags寄存器
static __always_inline unsigned long arch_local_save_flags(void){
return native_save_fl();
}
//arch层恢复eflags寄存器
static __always_inline void arch_local_irq_restore(unsigned long flags){
native_restore_fl(flags);
}
//实际保存eflags寄存器并关中断
static __always_inline unsigned long arch_local_irq_save(void){
unsigned long flags = arch_local_save_flags();
arch_local_irq_disable();
return flags;
}
//raw层关闭开启中断宏
#define raw_local_irq_disable() arch_local_irq_disable()
#define raw_local_irq_enable() arch_local_irq_enable()
//raw层保存恢复eflags寄存器宏
#define raw_local_irq_save(flags) \
do { \
typecheck(unsigned long, flags); \
flags = arch_local_irq_save(); \
} while (0)
#define raw_local_irq_restore(flags) \
do { \
typecheck(unsigned long, flags); \
arch_local_irq_restore(flags); \
} while (0)
#define raw_local_save_flags(flags) \
do { \
typecheck(unsigned long, flags); \
flags = arch_local_save_flags(); \
} while (0)
//通用层接口宏
#define local_irq_enable() \
do { \
raw_local_irq_enable(); \
} while (0)
#define local_irq_disable() \
do { \
raw_local_irq_disable(); \
} while (0)
#define local_irq_save(flags) \
do { \
raw_local_irq_save(flags); \
} while (0)
#define local_irq_restore(flags) \
do { \
raw_local_irq_restore(flags); \
} while (0)带 native_ 前缀之类的函数则跟我们之前实现的 hal_ 前缀对应,而 Linux 为了支持不同的硬件平台,做了多层封装。
自旋锁
Linux 有多种自旋锁,我们这里只介绍两种,原始自旋锁和排队自旋锁。
原始自旋锁
原始自旋锁本质上用一个整数来表示,值为 1 代表锁未被占用,为 0 或者负数则表示被占用。与前面相反,也没有使用xchg,而是直接使用赋值和原子减1操作。
对应的数据结构和实现与前面介绍的也没有太大差别。
//最底层的自旋锁数据结构
typedef struct{
volatile unsigned long lock;//真正的锁值变量,用volatile标识
}spinlock_t;接口的实现如下:
#define spin_unlock_string \
"movb $1,%0" \ //写入1表示解锁
:"=m" (lock->lock) : : "memory"
#define spin_lock_string \
"\n1:\t" \
"lock ; decb %0\n\t" \ //原子减1
"js 2f\n" \ //当结果小于0则跳转到标号2处,表示加锁失败
".section .text.lock,\"ax\"\n" \ //重新定义一个代码段,这是优化技术,避免后面的代码填充cache,因为大部分情况会加锁成功,链接器会处理好这个代码段的
"2:\t" \
"cmpb $0,%0\n\t" \ //和0比较
"rep;nop\n\t" \ //空指令
"jle 2b\n\t" \ //小于或等于0跳转到标号2
"jmp 1b\n" \ //跳转到标号1
".previous"
//获取自旋锁
static inline void spin_lock(spinlock_t*lock){
__asm__ __volatile__(
spin_lock_string
:"=m"(lock->lock)::"memory"
);
}
//释放自旋锁
static inline void spin_unlock(spinlock_t*lock){
__asm__ __volatile__(
spin_unlock_string
);
}排队自旋锁
可能会出现多个进程获取同一个自旋锁的情况,而哪个进程能够访问内存是由总线仲裁协议决定的。为了能够让进程先来先服务,保证获取自旋锁的公平性,使用排序自旋锁解决这个问题。
//RAW层的自旋锁数据结构
typedef struct raw_spinlock{
unsigned int slock;//真正的锁值变量
}raw_spinlock_t;
//最上层的自旋锁数据结构
typedef struct spinlock{
struct raw_spinlock rlock;
}spinlock_t;
//Linux没有这样的结构,这只是为了描述方便
typedef struct raw_spinlock{
union {
unsigned int slock;//真正的锁值变量
struct {
u16 owner;
u16 next;
}
}
}raw_spinlock_t;slock域分为两部分:owner和next,即锁持有者和未来锁申请者。 owner和next相等,表示自旋锁未被获取。
申请锁的时候:将next原子加1并将原值作为自己的序号。该进程循环检查 owner 域是否等于自己持有的序号,一旦相等,则表明锁轮到自己获取。 释放锁的时候:将owner原子加1。
这样就保证了按照先后申请顺序获得锁。
static inline void __raw_spin_lock(raw_spinlock_t*lock){
int inc = 0x00010000;
int tmp;
__asm__ __volatile__(
"lock ; xaddl %0, %1\n" //将inc和slock交换,然后 inc=inc+slock
//相当于原子读取next和owner并对next+1
"movzwl %w0, %2\n\t"//将inc的低16位做0扩展后送tmp tmp=(u16)inc
"shrl $16, %0\n\t" //将inc右移16位 inc=inc>>16
"1:\t"
"cmpl %0, %2\n\t" //比较inc和tmp,即比较next和owner
"je 2f\n\t" //相等则跳转到标号2处返回
"rep ; nop\n\t" //空指令
"movzwl %1, %2\n\t" //将slock的低16位做0扩展后送tmp 即tmp=owner
"jmp 1b\n" //跳转到标号1处继续比较
"2:"
:"+Q"(inc),"+m"(lock->slock),"=r"(tmp)
::"memory","cc"
);
}
#define UNLOCK_LOCK_PREFIX LOCK_PREFIX
static inline void __raw_spin_unlock(raw_spinlock_t*lock){
__asm__ __volatile__(
UNLOCK_LOCK_PREFIX"incw %0"//将slock的低16位加1 即owner+1
:"+m"(lock->slock)
::"memory","cc");
}还有一种场景是,我们尝试获取自旋锁,如果获取成功则执行,获取失败则执行别的任务,不在这里等待。
static inline int __raw_spin_trylock(raw_spinlock_t*lock){
int tmp;
int new;
asm volatile(
"movl %2,%0\n\t"//tmp=slock
"movl %0,%1\n\t"//new=tmp
"roll $16, %0\n\t"//tmp循环左移16位,即next和owner交换了
"cmpl %0,%1\n\t"//比较tmp和new即(owner、next)?=(next、owner)
"jne 1f\n\t" //不等则跳转到标号1处
"addl $0x00010000, %1\n\t"//相当于next+1
"lock ; cmpxchgl %1,%2\n\t"//new和slock交换比较
"1:"
"sete %b1\n\t" //new = eflags.ZF位,ZF取决于前面的判断是否相等
"movzbl %b1,%0\n\t" //tmp = new
:"=&a"(tmp),"=Q"(new),"+m"(lock->slock)
::"memory","cc");
return tmp;
}
int __lockfunc _spin_trylock(spinlock_t*lock){
preempt_disable();
if(_raw_spin_trylock(lock)){
spin_acquire(&lock->dep_map,0,1,_RET_IP_);
return 1;
}
preempt_enable();
return 0;
}
#define spin_trylock(lock) __cond_lock(lock, _spin_trylock(lock))信号量
保证资源在一个时刻只有一个进程使用,这是单值信号量。也可以作为资源计数器,比如一种资源有五份,同时最多可以有五个进程,这是多值信号量。
信号量最大的优势就是可用让申请失败的进程睡眠,还可以作为资源计数器使用。
struct semaphore{
raw_spinlock_t lock;//保护信号量自身的自旋锁
unsigned int count;//信号量值
struct list_head wait_list;//挂载睡眠等待进程的链表
};使用代码案例:
#define down_console_sem() do { \
down(&console_sem);\
} while (0)
static void __up_console_sem(unsigned long ip) {
up(&console_sem);
}
#define up_console_sem() __up_console_sem(_RET_IP_)
//加锁console
void console_lock(void)
{
might_sleep();
down_console_sem();//获取信号量console_sem
if (console_suspended)
return;
console_locked = 1;
console_may_schedule = 1;
}
//解锁console
void console_unlock(void)
{
static char ext_text[CONSOLE_EXT_LOG_MAX];
static char text[LOG_LINE_MAX + PREFIX_MAX];
//……删除了很多代码
up_console_sem();//释放信号量console_sem
raw_spin_lock(&logbuf_lock);
//……删除了很多代码
}Linux中使用宏 DEFINE_SEMAPHORE 声明了一个单值信号量 console_sem,也可以说是互斥锁,用于保护 console 驱动列表 console_drivers 以及同步对整个 console 驱动的访问。
宏 down_console_sem() 来获得信号量 console_sem,定义了宏 up_console_sem() 来释放信号量 console_sem
down_console_sem() 和 up_console_sem() 宏的核心主要是调用了信号量的接口函数 down、up 函数,完成获取、释放信号量的核心操作,代码如下。
static inline int __sched __down_common(struct semaphore *sem, long state,long timeout)
{
struct semaphore_waiter waiter;
//把waiter加入sem->wait_list的头部
list_add_tail(&waiter.list, &sem->wait_list);
waiter.task = current;//current表示当前进程,即调用该函数的进程
waiter.up = false;
for (;;) {
if (signal_pending_state(state, current))
goto interrupted;
if (unlikely(timeout <= 0))
goto timed_out;
__set_current_state(state);//设置当前进程的状态,进程睡眠,即先前__down函数中传入的TASK_UNINTERRUPTIBLE:该状态是等待资源有效时唤醒(比如等待键盘输入、socket连接、信号(signal)等等),但不可以被中断唤醒
raw_spin_unlock_irq(&sem->lock);//释放在down函数中加的锁
timeout = schedule_timeout(timeout);//真正进入睡眠
raw_spin_lock_irq(&sem->lock);//进程下次运行会回到这里,所以要加锁
if (waiter.up)
return 0;
}
timed_out:
list_del(&waiter.list);
return -ETIME;
interrupted:
list_del(&waiter.list);
return -EINTR;
//为了简单起见处理进程信号(signal)和超时的逻辑代码我已经删除
}
//进入睡眠等待
static noinline void __sched __down(struct semaphore *sem)
{
__down_common(sem, TASK_UNINTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
}
//获取信号量
void down(struct semaphore *sem)
{
unsigned long flags;
//对信号量本身加锁并关中断,也许另一段代码也在操作该信号量
raw_spin_lock_irqsave(&sem->lock, flags);
if (likely(sem->count > 0))
sem->count--;//如果信号量值大于0,则对其减1
else
__down(sem);//否则让当前进程进入睡眠
raw_spin_unlock_irqrestore(&sem->lock, flags);
}
//实际唤醒进程
static noinline void __sched __up(struct semaphore *sem)
{
struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list, struct semaphore_waiter, list);
//获取信号量等待链表中的第一个数据结构semaphore_waiter,它里面保存着睡眠进程的指针
list_del(&waiter->list);
waiter->up = true;
wake_up_process(waiter->task);//唤醒进程重新加入调度队列
}
//释放信号量
void up(struct semaphore *sem)
{
unsigned long flags;
//对信号量本身加锁并关中断,必须另一段代码也在操作该信号量
raw_spin_lock_irqsave(&sem->lock, flags);
if (likely(list_empty(&sem->wait_list)))
sem->count++;//如果信号量等待链表中为空,则对信号量值加1
else
__up(sem);//否则执行唤醒进程相关的操作
raw_spin_unlock_irqrestore(&sem->lock, flags);
}读写锁
合适共享数据读多写少的情况。
读写锁分为读锁和写锁,读锁之间是共享的,其它之间是互斥的。读写锁也是自旋锁的一个变种。
//读写锁初始化锁值
#define RW_LOCK_BIAS 0x01000000
//读写锁的底层数据结构
typedef struct{
unsigned int lock;
}arch_rwlock_t;
//释放读锁
static inline void arch_read_unlock(arch_rwlock_t*rw){
asm volatile(
LOCK_PREFIX"incl %0" //原子对lock加1
:"+m"(rw->lock)::"memory");
}
//释放写锁
static inline void arch_write_unlock(arch_rwlock_t*rw){
asm volatile(
LOCK_PREFIX"addl %1, %0"//原子对lock加上RW_LOCK_BIAS
:"+m"(rw->lock):"i"(RW_LOCK_BIAS):"memory");
}
//获取写锁失败时调用
ENTRY(__write_lock_failed)
//(%eax)表示由eax指向的内存空间是调用者传进来的
2:LOCK_PREFIX addl $ RW_LOCK_BIAS,(%eax)
1:rep;nop//空指令
cmpl $RW_LOCK_BIAS,(%eax)
//不等于初始值则循环比较,相等则表示有进程释放了写锁
jne 1b
//执行加写锁
LOCK_PREFIX subl $ RW_LOCK_BIAS,(%eax)
jnz 2b //不为0则继续测试,为0则表示加写锁成功
ret //返回
ENDPROC(__write_lock_failed)
//获取读锁失败时调用
ENTRY(__read_lock_failed)
//(%eax)表示由eax指向的内存空间是调用者传进来的
2:LOCK_PREFIX incl(%eax)//原子加1
1: rep; nop//空指令
cmpl $1,(%eax) //和1比较 小于0则
js 1b //为负则继续循环比较
LOCK_PREFIX decl(%eax) //加读锁
js 2b //为负则继续加1并比较,否则返回
ret //返回
ENDPROC(__read_lock_failed)
//获取读锁
static inline void arch_read_lock(arch_rwlock_t*rw){
asm volatile(
LOCK_PREFIX" subl $1,(%0)\n\t"//原子对lock减1
"jns 1f\n"//不为小于0则跳转标号1处,表示获取读锁成功
"call __read_lock_failed\n\t"//调用__read_lock_failed
"1:\n"
::LOCK_PTR_REG(rw):"memory");
}
//获取写锁
static inline void arch_write_lock(arch_rwlock_t*rw){
asm volatile(
LOCK_PREFIX"subl %1,(%0)\n\t"//原子对lock减去RW_LOCK_BIAS
"jz 1f\n"//为0则跳转标号1处
"call __write_lock_failed\n\t"//调用__write_lock_failed
"1:\n"
::LOCK_PTR_REG(rw),"i"(RW_LOCK_BIAS):"memory");
}将读写锁的初值设置为0x1000000。获取读锁的时候lock原子减1,不小于0表示读锁获取成功,获取写锁的时候直接减去0x01000000。
这样只要获取锁后的值不小于0就表示成功获取到锁,获取失败则还原。
=======
中断控制
自旋锁
信号量
ddc41c16d92a059f6fa702c4299a59441ec83400