在开发操作系统前,需要先弄清楚数据同步的问题。避免出现不可预知的错误。

首先我们需要先弄清楚并发操作中数据不同步的原因,然后再学习解决方法。

非预期结果的全局变量

代码如下:

int a = 0;
void interrupt_handle()
{
    a++;
}
void thread_func()
{
    a++;
}
 

通常编译器会把a++翻译成三条指令:

  1. 将a加载到某个寄存器中。
  2. 这个寄存器加1。
  3. 把这个寄存器写回内存。

如果出现这样一个情况:thread_func刚执行完第二条指令就发生了中断,CPU执行完中断返回后继续执行第三条指令,最终a的结果为1,这显然是错误的。看下表:

这种错误是因为:一个逻辑上是一个整体的操作实际上对应多条指令,造成执行一半的途中进行中断切换,造成了非预期的结果。

接下来就是解决方案了。

一:原子操作 拿下单体变量

解决的一种思路就是将a++变成原子操作,要么不执行,要么全部执行。 x86平台支持很多原子操作,我们只需要直接应用这些指令,用汇编代码写出对应的原子操作函数即可。 需要了解一下GCC嵌入式汇编

实现代码如下:

// 定义一个原子类型
typedef struct s_ATOMIC {
	volatile s32_t a_count; //在变量前加上volatile,是为了禁止编译器优化,使其每次都从内存中加载变量
}atomic_t;
 
// 原子读
static inline s32_t atomic_read(const atomic_t *v) {
	// x86平台取地址是原子操作
	return (*(volatile u32_t*)&(v)->a_count);
}
 
// 原子写
static inline void atomic_write(atomic_t *v, int i) {
	// x86平台把一个值写入地址也是原子的
	v->a_count = i;
}
 
// 原子加上一个整数
static inline void atomic_add(int i, atomic_t *v) {
	__asm__ __volatile__("lock;" "addl %1, %0"
				: "+m"(v->a_count)
				: "ir"(i));
}
 
//原子减去一个整数
static inline void atomic_sub(int i, atomic_t *v){
	__asm__ __volatile__("lock;" "subl %1,%0"
	         : "+m" (v->a_count)
	         : "ir" (i));
}
//原子加1
static inline void atomic_inc(atomic_t *v){        
	__asm__ __volatile__("lock;" "incl %0"                       
			: "+m" (v->a_count));
}
//原子减1
static inline void atomic_dec(atomic_t *v){
	__asm__ __volatile__("lock;" "decl %0"                     
			: "+m" (v->a_count));
}

以上代码中,加lock前缀的addl, subl, incl, decl指令都是原子操作,lock表示锁定总线

利用原子操作,我们可以将代码改写为:

atomic_t a = {0};
 
void interupt_handle() {
	atomic_inc(&a);
}
 
void thread_func() {
	atomic_inc(&a);
}

Note

原子操作适合单个变量全局数据。例如全局计数器、标志变量。对于有几百个字节的数据结构或者包含不同类型的数据类型,原子操作时无法解决的

二:中断控制 搞定复杂变量

中断是CPU响应外部事件的重要机制。 但是如果中断代码中操作了其它代码的数据,这就需要相应的控制机制了。

对于需要保证原子性的一系列操作,我们通过开关中断来保证在执行过程中不会被打断。

在x86 CPU上关闭、开启中断的指令为cli、sti。通过对CPU的EFLAGS寄存器的IF位进行清楚和置位,CPU通过此位决定是否响应中断信号

这两条指令需要Ring0权限才能执行。

//关闭中断
void hal_cli()
{
    __asm__ __volatile__("cli": : :"memory");
}
//开启中断
void hal_sti()
{
    __asm__ __volatile__("sti": : :"memory");
}
//使用场景
void foo()
{
    hal_cli();
    //操作数据……
    hal_sti();
}
void bar()
{
    hal_cli();
    //操作数据……
    hal_sti();
}

但是这样的话,hal_cli()和hal_sti()无法嵌套使用。为了解决这个问题,我们要对这两个函数进行修改。 在关闭中断函数中先保存EFLAGS寄存器,然后执行cli指令,开启中断函数中直接恢复之前保存的EFLAGS寄存器即可

typedef u32_t cpuflg_t;
static inline void hal_save_flags_cli(cpuflg_t* flags)
{
     __asm__ __volatile__(
            "pushfl \t\n" //把eflags寄存器压入当前栈顶
            "cli    \t\n" //关闭中断
            "popl %0 \t\n"//把当前栈顶弹出到flags为地址的内存中        
            : "=m"(*flags)
            :
            : "memory"
          );
}
static inline void hal_restore_flags_sti(cpuflg_t* flags)
{
    __asm__ __volatile__(
              "pushl %0 \t\n"//把flags为地址处的值寄存器压入当前栈顶
              "popfl \t\n"   //把当前栈顶弹出到eflags寄存器中
              :
              : "m"(*flags)
              : "memory"
              );
}

hal_restore_flags_sti()是否开启中断完全依赖于上一次eflags寄存器中的值

其实,直接通过维持堆栈平衡也可以实现,没有必要引入一个中间变量。

Note

中断控制。适合同步数据很多并且单核心的CPU。因为单核心的CPU只要控制全局中断的开关就能够保证代码流的原子性

三:自旋锁 协调多核心CPU

前面的中断控制对于单CPU来说是解决了问题,因为单CPU只有一条代码流并且只能通过中断来切换到另一条代码流,所以通过中断控制就能够安全操作全局数据。

但是对于多核心的CPU,同一时刻下有多条代码流,控制中断只能控制本地CPU的中断,无法控制其它CPU核心的中断,于是就有了自旋锁这种机制来维护多核心CPU下的全局数据。

自旋锁的原理如下:

  1. 读取锁变量。判断是否加锁,如果没有加锁则加锁,然后返回表示加锁成功,否则执行下一步。
  2. 已经加锁了,那么返回第一步。

也就是说,会如果没有加锁成功,那么会一直在这里循环,直到加锁成功,这也是自旋锁名称的由来。

Note

这个算法正确执行前提是,读取锁变量和判断锁变量是否加锁的操作是原子执行的。不然,在CPU0读取锁变量后,CPU1也读取锁变量并判断未加锁执行加锁,然后CPU0也判断未加锁执行加锁,就会导致出错。因此读取和判断是要具有原子性的

typedef struct {
	volatile u32_t lock; //volatile可以防止编译器优化,保证其它代码始终从内存加载lock变量的值
}spinlock_t;
 
// 初始化锁
static inline void x86_spin_lock_init(spinlock_t *lock) {
	lock->lock = 0;
}
 
//加锁函数
static inline void x86_spin_lock(spinlock_t * lock){
	__asm__ __volatile__ (
	"1: \n"    
	"lock; xchg  %0, %1 \n"//把值为1的寄存器和lock内存中的值进行交换    
	"cmpl   $0, %0 \n" //用0和交换回来的值进行比较    
	"jnz    2f \n"  //不等于0则跳转后面2标号处运行    
	"jmp 3f \n"     //若等于0则跳转后面3标号处返回    
	"2:         \n"     
	"cmpl   $0, %1  \n"//用0和lock内存中的值进行比较    
	"jne    2b      \n"//若不等于0则跳转到前面2标号处运行继续比较      
	"jmp    1b      \n"//若等于0则跳转到前面1标号处运行,交换并加锁    
	"3:  \n"     :    : "r"(1), "m"(*lock));
}
 
//解锁函数
static inline void x86_spin_unlock(spinlock_t * lock){    
	__asm__ __volatile__(    
	"movl   $0, %0\n"//解锁把lock内存中的值设为0就行    
	:    : "m"(*lock));}

这里介绍一下加锁的实现逻辑:

  1. 通过xchg将值为1的寄存器与内存中的锁变量的值进行交换,这样对于所有的CPU核心来说,内存中的锁变量都为1,表示加锁。
  2. 判断寄存器的值,如果寄存器的值为0,表示加锁成功,可以返回。否则执行下一步。
  3. 寄存器的值为1,表示已经有其它CPU核心获取到锁了,所以当前CPU获取锁失败,执行下一步。
  4. 不断判断内存中的锁变量是否为0,如果为0,那么表示自旋锁被让出,执行第一步,尝试重新获取锁。

Note

自旋锁有中断嵌套的问题,如果获取自旋锁之后,发生中断,在中断处理程序中又获取自旋锁,那么就会导致发生死锁。

因此我们需要进行中断控制。完善的代码如下:

static inline void x86_spin_lock_disable_irq(spinlock_t * lock,cpuflg_t* flags)
{
    __asm__ __volatile__(
    "pushfq                 \n\t"
    "cli                    \n\t"
    "popq %0                \n\t"
    "1:         \n\t"
    "lock; xchg  %1, %2 \n\t"
    "cmpl   $0,%1       \n\t"
    "jnz    2f      \n\t"
    "jmp    3f      \n"  
    "2:         \n\t"
    "cmpl   $0,%2       \n\t" 
    "jne    2b      \n\t"
    "jmp    1b      \n\t"
    "3:     \n"     
     :"=m"(*flags)
    : "r"(1), "m"(*lock));
}
static inline void x86_spin_unlock_enabled_irq(spinlock_t* lock,cpuflg_t* flags)
{
    __asm__ __volatile__(
    "movl   $0, %0\n\t"
    "pushq %1 \n\t"
    "popfq \n\t"
    :
    : "m"(*lock), "m"(*flags));
}

Note

自旋锁。适合同步数据多,CPU为多核心,同步资源不需要长时间等待

四:信号量

原子操作和自旋锁都不适合长时间等待的情况,很多资源获取的时候通常都需要等待,如果使用自旋锁来同步这种资源,会造成CPU资源的浪费。

为了同步需要长时间等待的资源并避免CPU资源的浪费,出现了信号量的同步机制。

如果一个代码执行流要获取一个需要长时间等待的资源,我们完全没有必要循环这个程序直到资源获取成功,而是执行其它代码执行流,直到有可用资源的时候,再考虑执行这个代码流。

也就是三个问题:等待、互斥、唤醒

需要设计一种数据结构,至少需要一个变量来表示互斥,还有一个等待链,用于保持等待执行的代码流。

#define SEM_FLG_MUTEX 0
#define SEM_FLG_MULTI 1
#define SEM_MUTEX_ONE_LOCK 1
#define SEM_MULTI_LOCK 0
//等待链数据结构,用于挂载等待代码执行流(线程)的结构,里面有用于挂载代码执行流的链表和计数器变量,这里我们先不深入研究这个数据结构。
typedef struct s_KWLST
{   
    spinlock_t wl_lock;
    uint_t   wl_tdnr;
    list_h_t wl_list;
}kwlst_t;
//信号量数据结构
typedef struct s_SEM
{
    spinlock_t sem_lock;//维护sem_t自身数据的自旋锁
    uint_t sem_flg;//信号量相关的标志
    sint_t sem_count;//信号量计数值
    kwlst_t sem_waitlst;//用于挂载等待代码执行流(线程)结构
}sem_t;

因为使用信号量的数据的时候,要保证信号量的数据同步,为此我们借用自旋锁来进行同步,因此对信号量的数据的获取和修改并不需要长时间等待,所以正合适。

信号量的使用分为三个步骤:

  1. 获取信号量
  2. 执行代码流
  3. 释放信号量

获取信号量

  1. 首先对用户保护信号量自身的自旋锁sem_lock进行加锁。
  2. 对信号量sem_count执行减1操作,并检查其值是否为0.
  3. 如果上步检查的结果为小于0,那么就让进程进入等待状态并将其挂入sem_waitlst中,然后调度其它进程执行。否则信号量获取成功。
  4. 释放自旋锁。
//获取信号量
void krlsem_down(sem_t* sem)
{
    cpuflg_t cpufg;
start_step:    
    krlspinlock_cli(&sem->sem_lock,&cpufg);
    if(sem->sem_count<1)
    {//如果信号量值小于1,则让代码执行流(线程)睡眠
        krlwlst_wait(&sem->sem_waitlst);
        krlspinunlock_sti(&sem->sem_lock,&cpufg);
        krlschedul();//切换代码执行流,下次恢复执行时依然从下一行开始执行,所以要goto开始处重新获取信号量
        goto start_step; 
    }
    sem->sem_count--;//信号量值减1,表示成功获取信号量
    krlspinunlock_sti(&sem->sem_lock,&cpufg);
    return;
}

释放信号量

  1. 先对用户保护信号量本身的自旋锁sem_lock加锁。
  2. 对信号量sem_count执行“加1”操作,并检查是否大于0。
  3. 上步的检查结果大于0,就唤醒sem_waitlst中的进程,然后进行进程调度
  4. 释放自旋锁
//释放信号量
void krlsem_up(sem_t* sem)
{
    cpuflg_t cpufg;
    krlspinlock_cli(&sem->sem_lock,&cpufg);
    sem->sem_count++;//释放信号量
    if(sem->sem_count<1)
    {//如果小于1,则说数据结构出错了,挂起系统
        krlspinunlock_sti(&sem->sem_lock,&cpufg);
        hal_sysdie("sem up err");
    }
    //唤醒该信号量上所有等待的代码执行流(线程)
    krlwlst_allup(&sem->sem_waitlst);
    krlspinunlock_sti(&sem->sem_lock,&cpufg);
    krlsched_set_schedflgs();
    return;
}

Note

信号量。需要长时间等待才能够获取数据

Linux下同步原语的实现

同步原语的基本原理和实现上面已经介绍,接下来就学习一下具体的实现。

原子变量

<<<<<<< HEAD 再Linux下有许多共享资源都是简单的整数类型,例如文件描述符中的计数器,open打开的时候对应文件描述符的计数器加1,close关闭文件的时候对应文件描述符的计数器减1。

为了保证这类数据的同步,Linux提供了原子类型变量atomic_t。如果是64位机器,还有一个64位的对应类型。

typedef struct {
    int counter;
} atomic_t;//常用的32位的原子变量类型
#ifdef CONFIG_64BIT
typedef struct {
    s64 counter;
} atomic64_t;//64位的原子变量类型
#endif

并且需要Linux提供的接口函数去操作原子类型

 
//原子读取变量中的值
static __always_inline int arch_atomic_read(const atomic_t *v)
{
    return __READ_ONCE((v)->counter);
}
//原子写入一个具体的值
static __always_inline void arch_atomic_set(atomic_t *v, int i)
{
    __WRITE_ONCE(v->counter, i);
}
//原子加上一个具体的值
static __always_inline void arch_atomic_add(int i, atomic_t *v)
{
    asm volatile(LOCK_PREFIX "addl %1,%0"
             : "+m" (v->counter)
             : "ir" (i) : "memory");
}
//原子减去一个具体的值
static __always_inline void arch_atomic_sub(int i, atomic_t *v)
{
    asm volatile(LOCK_PREFIX "subl %1,%0"
             : "+m" (v->counter)
             : "ir" (i) : "memory");
}
//原子加1
static __always_inline void arch_atomic_inc(atomic_t *v)
{
    asm volatile(LOCK_PREFIX "incl %0"
             : "+m" (v->counter) :: "memory");
}
//原子减1
static __always_inline void arch_atomic_dec(atomic_t *v)
{
    asm volatile(LOCK_PREFIX "decl %0"
             : "+m" (v->counter) :: "memory");
}

在单核心情况下是不需要lock前缀的,LOCK_PREFIX宏展开为空串,多核心下展开为”lock;”

同样__READ_ONCE和__WRITE_ONCE宏展开如下:

#define __READ_ONCE(x)  \
(*(const volatile __unqual_scalar_typeof(x) *)&(x))
 
#define __WRITE_ONCE(x, val) \
do {*(volatile typeof(x) *)&(x) = (val);} while (0)
 
//__unqual_scalar_typeof表示声明一个非限定的标量类型,非标量类型保持不变。说人话就是返回x变量的类型,这是GCC的功能,typeof只是纯粹返回x的类型。
//如果 x 是int类型则返回“int” 
 
#define __READ_ONCE(x)  \
(*(const volatile int *)&(x))
 
#define __WRITE_ONCE(x, val) \
do {*(volatile int *)&(x) = (val);} while (0) 

其中的volatile int *是为了提醒编译器:这是对内存地址读写,不要有优化动作,每次都必须强制写入内存或从内存读取。

中断控制

Linux有很多场景需要关中断才能安全执行,例如多个中断程序访问共享数据,就需要保证自身和其它中断互斥。

 
//实际保存eflags寄存器
extern __always_inline unsigned long native_save_fl(void){
    unsigned long flags;
    asm volatile("# __raw_save_flags\n\t"
                 "pushf ; pop %0":"=rm"(flags)::"memory");
    return flags;
}
//实际恢复eflags寄存器
extern inline void native_restore_fl(unsigned long flags){
    asm volatile("push %0 ; popf"::"g"(flags):"memory","cc");
}
//实际关中断
static __always_inline void native_irq_disable(void){
    asm volatile("cli":::"memory");
}
//实际开启中断
static __always_inline void native_irq_enable(void){
    asm volatile("sti":::"memory");
}
//arch层关中断
static __always_inline void arch_local_irq_disable(void){
    native_irq_disable();
}
//arch层开启中断
static __always_inline void arch_local_irq_enable(void){ 
    native_irq_enable();
}
//arch层保存eflags寄存器
static __always_inline unsigned long           arch_local_save_flags(void){
    return native_save_fl();
}
//arch层恢复eflags寄存器
static  __always_inline void arch_local_irq_restore(unsigned long flags){
    native_restore_fl(flags);
}
//实际保存eflags寄存器并关中断
static __always_inline unsigned long arch_local_irq_save(void){
    unsigned long flags = arch_local_save_flags();
    arch_local_irq_disable();
    return flags;
}
//raw层关闭开启中断宏
#define raw_local_irq_disable()     arch_local_irq_disable()
#define raw_local_irq_enable()      arch_local_irq_enable()
//raw层保存恢复eflags寄存器宏
#define raw_local_irq_save(flags)           \
    do {                        \
        typecheck(unsigned long, flags);    \
        flags = arch_local_irq_save();      \
    } while (0)
    
#define raw_local_irq_restore(flags)            \
    do {                        \
        typecheck(unsigned long, flags);    \
        arch_local_irq_restore(flags);      \
    } while (0)
    
#define raw_local_save_flags(flags)         \
    do {                        \
        typecheck(unsigned long, flags);    \
        flags = arch_local_save_flags();    \
    } while (0)
//通用层接口宏 
#define local_irq_enable()              \
    do { \
        raw_local_irq_enable();         \
    } while (0)
 
#define local_irq_disable()             \
    do {                        \
        raw_local_irq_disable();        \
    } while (0)
 
#define local_irq_save(flags)               \
    do {                        \
        raw_local_irq_save(flags);      \
    } while (0)
 
#define local_irq_restore(flags)            \
    do {                        \
        raw_local_irq_restore(flags);       \
    } while (0)

带 native_ 前缀之类的函数则跟我们之前实现的 hal_ 前缀对应,而 Linux 为了支持不同的硬件平台,做了多层封装。

自旋锁

Linux 有多种自旋锁,我们这里只介绍两种,原始自旋锁和排队自旋锁

原始自旋锁

原始自旋锁本质上用一个整数来表示,值为 1 代表锁未被占用,为 0 或者负数则表示被占用。与前面相反,也没有使用xchg,而是直接使用赋值和原子减1操作。

对应的数据结构和实现与前面介绍的也没有太大差别。

//最底层的自旋锁数据结构
typedef struct{
volatile unsigned long lock;//真正的锁值变量,用volatile标识
}spinlock_t;

接口的实现如下:

 
#define spin_unlock_string \  
    "movb $1,%0" \ //写入1表示解锁
    :"=m" (lock->lock) : : "memory"
 
#define spin_lock_string \
  "\n1:\t" \  
    "lock ; decb %0\n\t" \ //原子减1
  "js 2f\n" \    //当结果小于0则跳转到标号2处,表示加锁失败
    ".section .text.lock,\"ax\"\n" \ //重新定义一个代码段,这是优化技术,避免后面的代码填充cache,因为大部分情况会加锁成功,链接器会处理好这个代码段的
  "2:\t" \  
    "cmpb $0,%0\n\t" \  //和0比较
    "rep;nop\n\t" \  //空指令
    "jle 2b\n\t" \   //小于或等于0跳转到标号2
    "jmp 1b\n" \   //跳转到标号1  
    ".previous"
//获取自旋锁
static inline void spin_lock(spinlock_t*lock){
    __asm__ __volatile__(
    spin_lock_string
    :"=m"(lock->lock)::"memory"
    );
}
//释放自旋锁
static inline void spin_unlock(spinlock_t*lock){
__asm__ __volatile__(
    spin_unlock_string
    );
}

排队自旋锁

可能会出现多个进程获取同一个自旋锁的情况,而哪个进程能够访问内存是由总线仲裁协议决定的。为了能够让进程先来先服务,保证获取自旋锁的公平性,使用排序自旋锁解决这个问题。

 
//RAW层的自旋锁数据结构
typedef struct raw_spinlock{
    unsigned int slock;//真正的锁值变量
}raw_spinlock_t;
//最上层的自旋锁数据结构
typedef struct spinlock{
    struct raw_spinlock rlock;
}spinlock_t;
//Linux没有这样的结构,这只是为了描述方便
typedef struct raw_spinlock{
    union {
        unsigned int slock;//真正的锁值变量
        struct {
        u16 owner;
        u16 next;
        }
    }
}raw_spinlock_t;

slock域分为两部分:owner和next,即锁持有者和未来锁申请者。 owner和next相等,表示自旋锁未被获取。

申请锁的时候:将next原子加1并将原值作为自己的序号。该进程循环检查 owner 域是否等于自己持有的序号,一旦相等,则表明锁轮到自己获取。 释放锁的时候:将owner原子加1。

这样就保证了按照先后申请顺序获得锁。

 
static inline void __raw_spin_lock(raw_spinlock_t*lock){
	int inc = 0x00010000;
	int tmp;
	__asm__ __volatile__(
	"lock ; xaddl %0, %1\n" //将inc和slock交换,然后 inc=inc+slock
	                        //相当于原子读取next和owner并对next+1
	"movzwl %w0, %2\n\t"//将inc的低16位做0扩展后送tmp tmp=(u16)inc
	"shrl $16, %0\n\t" //将inc右移16位 inc=inc>>16
	"1:\t"
	"cmpl %0, %2\n\t" //比较inc和tmp,即比较next和owner 
	"je 2f\n\t" //相等则跳转到标号2处返回
	"rep ; nop\n\t" //空指令
	"movzwl %1, %2\n\t" //将slock的低16位做0扩展后送tmp 即tmp=owner
	"jmp 1b\n" //跳转到标号1处继续比较
	"2:"
	:"+Q"(inc),"+m"(lock->slock),"=r"(tmp)
	::"memory","cc"
	);
}
#define UNLOCK_LOCK_PREFIX LOCK_PREFIX
static inline void __raw_spin_unlock(raw_spinlock_t*lock){
	__asm__ __volatile__(
	UNLOCK_LOCK_PREFIX"incw %0"//将slock的低16位加1 即owner+1
	:"+m"(lock->slock)
	::"memory","cc");
}

还有一种场景是,我们尝试获取自旋锁,如果获取成功则执行,获取失败则执行别的任务,不在这里等待。

 
static inline int __raw_spin_trylock(raw_spinlock_t*lock){
    int tmp;
    int new;
    asm volatile(
    "movl %2,%0\n\t"//tmp=slock
    "movl %0,%1\n\t"//new=tmp
    "roll $16, %0\n\t"//tmp循环左移16位,即next和owner交换了
    "cmpl %0,%1\n\t"//比较tmp和new即(owner、next)?=(next、owner)
    "jne 1f\n\t" //不等则跳转到标号1处 
    "addl $0x00010000, %1\n\t"//相当于next+1
    "lock ; cmpxchgl %1,%2\n\t"//new和slock交换比较    
    "1:"
    "sete %b1\n\t" //new = eflags.ZF位,ZF取决于前面的判断是否相等
    "movzbl %b1,%0\n\t" //tmp = new
    :"=&a"(tmp),"=Q"(new),"+m"(lock->slock)
    ::"memory","cc");
    return tmp;
}
int __lockfunc _spin_trylock(spinlock_t*lock){ 
    preempt_disable();
    if(_raw_spin_trylock(lock)){
        spin_acquire(&lock->dep_map,0,1,_RET_IP_);
        return 1;
    }
    preempt_enable();
    return 0;
}
#define spin_trylock(lock) __cond_lock(lock, _spin_trylock(lock))

信号量

保证资源在一个时刻只有一个进程使用,这是单值信号量。也可以作为资源计数器,比如一种资源有五份,同时最多可以有五个进程,这是多值信号量。

信号量最大的优势就是可用让申请失败的进程睡眠,还可以作为资源计数器使用。

 
struct semaphore{
    raw_spinlock_t lock;//保护信号量自身的自旋锁
    unsigned int count;//信号量值
    struct list_head wait_list;//挂载睡眠等待进程的链表
};

使用代码案例:

 
#define down_console_sem() do { \
    down(&console_sem);\
} while (0)
static void __up_console_sem(unsigned long ip) {
    up(&console_sem);
}
#define up_console_sem() __up_console_sem(_RET_IP_)
//加锁console
void console_lock(void)
{
    might_sleep();
    down_console_sem();//获取信号量console_sem
    if (console_suspended)
        return;
    console_locked = 1;
    console_may_schedule = 1;
}
//解锁console
void console_unlock(void)
{
    static char ext_text[CONSOLE_EXT_LOG_MAX];
    static char text[LOG_LINE_MAX + PREFIX_MAX];
    //……删除了很多代码
    up_console_sem();//释放信号量console_sem
    raw_spin_lock(&logbuf_lock);
    //……删除了很多代码   
}

Linux中使用宏 DEFINE_SEMAPHORE 声明了一个单值信号量 console_sem,也可以说是互斥锁,用于保护 console 驱动列表 console_drivers 以及同步对整个 console 驱动的访问。

宏 down_console_sem() 来获得信号量 console_sem,定义了宏 up_console_sem() 来释放信号量 console_sem

down_console_sem() 和 up_console_sem() 宏的核心主要是调用了信号量的接口函数 down、up 函数,完成获取、释放信号量的核心操作,代码如下。

 
static inline int __sched __down_common(struct semaphore *sem, long state,long timeout)
{
    struct semaphore_waiter waiter;
    //把waiter加入sem->wait_list的头部
    list_add_tail(&waiter.list, &sem->wait_list);
    waiter.task = current;//current表示当前进程,即调用该函数的进程
    waiter.up = false;
    for (;;) {
        if (signal_pending_state(state, current))
            goto interrupted;
        if (unlikely(timeout <= 0))
            goto timed_out;
        __set_current_state(state);//设置当前进程的状态,进程睡眠,即先前__down函数中传入的TASK_UNINTERRUPTIBLE:该状态是等待资源有效时唤醒(比如等待键盘输入、socket连接、信号(signal)等等),但不可以被中断唤醒
        raw_spin_unlock_irq(&sem->lock);//释放在down函数中加的锁
        timeout = schedule_timeout(timeout);//真正进入睡眠
        raw_spin_lock_irq(&sem->lock);//进程下次运行会回到这里,所以要加锁
        if (waiter.up)
            return 0;
    }
 timed_out:
    list_del(&waiter.list);
    return -ETIME;
 interrupted:
    list_del(&waiter.list);
    return -EINTR;
 
    //为了简单起见处理进程信号(signal)和超时的逻辑代码我已经删除
}
//进入睡眠等待
static noinline void __sched __down(struct semaphore *sem)
{
    __down_common(sem, TASK_UNINTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
}
//获取信号量
void down(struct semaphore *sem)
{
    unsigned long flags;
    //对信号量本身加锁并关中断,也许另一段代码也在操作该信号量
    raw_spin_lock_irqsave(&sem->lock, flags);
    if (likely(sem->count > 0))
        sem->count--;//如果信号量值大于0,则对其减1
    else
        __down(sem);//否则让当前进程进入睡眠
    raw_spin_unlock_irqrestore(&sem->lock, flags);
}
//实际唤醒进程 
static noinline void __sched __up(struct semaphore *sem)
{
    struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list, struct semaphore_waiter, list);
    //获取信号量等待链表中的第一个数据结构semaphore_waiter,它里面保存着睡眠进程的指针
    list_del(&waiter->list);
    waiter->up = true;
    wake_up_process(waiter->task);//唤醒进程重新加入调度队列
}
//释放信号量
void up(struct semaphore *sem)
{
    unsigned long flags;
    //对信号量本身加锁并关中断,必须另一段代码也在操作该信号量
    raw_spin_lock_irqsave(&sem->lock, flags);
    if (likely(list_empty(&sem->wait_list)))
        sem->count++;//如果信号量等待链表中为空,则对信号量值加1
    else
        __up(sem);//否则执行唤醒进程相关的操作
    raw_spin_unlock_irqrestore(&sem->lock, flags);
}

读写锁

合适共享数据读多写少的情况。

读写锁分为读锁和写锁,读锁之间是共享的,其它之间是互斥的。读写锁也是自旋锁的一个变种。

 
//读写锁初始化锁值
#define RW_LOCK_BIAS     0x01000000
//读写锁的底层数据结构
typedef struct{
    unsigned int lock;
}arch_rwlock_t;
//释放读锁 
static inline void arch_read_unlock(arch_rwlock_t*rw){ 
    asm volatile(
        LOCK_PREFIX"incl %0" //原子对lock加1
        :"+m"(rw->lock)::"memory");
}
//释放写锁
static inline void arch_write_unlock(arch_rwlock_t*rw){
    asm volatile(
        LOCK_PREFIX"addl %1, %0"//原子对lock加上RW_LOCK_BIAS
        :"+m"(rw->lock):"i"(RW_LOCK_BIAS):"memory");
}
//获取写锁失败时调用
ENTRY(__write_lock_failed)
    //(%eax)表示由eax指向的内存空间是调用者传进来的 
    2:LOCK_PREFIX addl  $ RW_LOCK_BIAS,(%eax)
    1:rep;nop//空指令
    cmpl $RW_LOCK_BIAS,(%eax)
    //不等于初始值则循环比较,相等则表示有进程释放了写锁
    jne   1b
    //执行加写锁
    LOCK_PREFIX subl  $ RW_LOCK_BIAS,(%eax)
    jnz 2b //不为0则继续测试,为0则表示加写锁成功
    ret //返回
ENDPROC(__write_lock_failed)
//获取读锁失败时调用
ENTRY(__read_lock_failed)
    //(%eax)表示由eax指向的内存空间是调用者传进来的 
    2:LOCK_PREFIX incl(%eax)//原子加1
    1:  rep; nop//空指令
    cmpl  $1,(%eax) //和1比较 小于0则
    js 1b //为负则继续循环比较
    LOCK_PREFIX decl(%eax) //加读锁
    js  2b  //为负则继续加1并比较,否则返回
    ret //返回
ENDPROC(__read_lock_failed)
//获取读锁
static inline void arch_read_lock(arch_rwlock_t*rw){
    asm volatile(
        LOCK_PREFIX" subl $1,(%0)\n\t"//原子对lock减1
        "jns 1f\n"//不为小于0则跳转标号1处,表示获取读锁成功
        "call __read_lock_failed\n\t"//调用__read_lock_failed
        "1:\n"
        ::LOCK_PTR_REG(rw):"memory");
}
//获取写锁
static inline void arch_write_lock(arch_rwlock_t*rw){
    asm volatile(
        LOCK_PREFIX"subl %1,(%0)\n\t"//原子对lock减去RW_LOCK_BIAS
        "jz 1f\n"//为0则跳转标号1处
        "call __write_lock_failed\n\t"//调用__write_lock_failed
        "1:\n"
        ::LOCK_PTR_REG(rw),"i"(RW_LOCK_BIAS):"memory");
}

将读写锁的初值设置为0x1000000。获取读锁的时候lock原子减1,不小于0表示读锁获取成功,获取写锁的时候直接减去0x01000000。

这样只要获取锁后的值不小于0就表示成功获取到锁,获取失败则还原。

=======

中断控制

自旋锁

信号量

ddc41c16d92a059f6fa702c4299a59441ec83400