mutex的简单介绍

http://www.itjxue.com  2023-01-16 03:10  来源:未知  点击次数: 

[RT-Thread]互斥锁(mutex)

和 semaphore 一样, mutex 在RTT中也归类为 ipc ( ipc 应该是进程间通信,感觉命名是不是有点不贴切)。

mutex 用于资源互斥的场景,比如多个线程可能同时访问(R/W)同一个全局变量,这个时候,就需要加锁控制。

仍然主要关注 mutex控制块 、 take 和 release 操作。

parent :

和其它 ipc 方式相同,主要是维护 suspend_thread 。

value :

value其实就两个值:0或者1,1表示已被持有,0表示空闲状态。

original_priority :

保存持有该mutex的thread的本来的线程优先级,便于优先级继承后,恢复原本线程优先级。

hold :

hold 记录被同一thread请求的次数,嵌套场景。

owner :

持有该mutex的thread。

Linux 之mutex 源码分析

?mutex相关的函数并不是linux kernel实现的,而是glibc实现的,源码位于nptl目录下。

首先说数据结构:

typedef union

{

? struct

? {

??? int __lock;

??? unsigned int __count;

??? int __owner;

??? unsigned int __nusers;

??? /* KIND must stay at this position in the structure to maintain

?????? binary compatibility.? */

??? int __kind;

??? int __spins;

? } __data;

? char __size[__SIZEOF_PTHREAD_MUTEX_T];

? long int __align;

} pthread_mutex_t;

?int __lock;? 资源竞争引用计数

?int __kind; 锁类型,init 函数中mutexattr 参数传递,该参数可以为NULL,一般为 PTHREAD_MUTEX_NORMAL

结构体其他元素暂时不了解,以后更新。

/*nptl/pthread_mutex_init.c*/

int

__pthread_mutex_init (mutex, mutexattr)

???? pthread_mutex_t *mutex;

???? const pthread_mutexattr_t *mutexattr;

{

? const struct pthread_mutexattr *imutexattr;

? assert (sizeof (pthread_mutex_t) = __SIZEOF_PTHREAD_MUTEX_T);

? imutexattr = (const struct pthread_mutexattr *) mutexattr ?: default_attr;

? /* Clear the whole variable.? */

? memset (mutex, '\0', __SIZEOF_PTHREAD_MUTEX_T);

? /* Copy the values from the attribute.? */

? mutex-__data.__kind = imutexattr-mutexkind ~0x80000000;

? /* Default values: mutex not used yet.? */

? // mutex-__count = 0;????????already done by memset

? // mutex-__owner = 0;????????already done by memset

? // mutex-__nusers = 0;????????already done by memset

? // mutex-__spins = 0;????????already done by memset

? return 0;

}

init函数就比较简单了,将mutex结构体清零,设置结构体中__kind属性。

/*nptl/pthread_mutex_lock.c*/

int

__pthread_mutex_lock (mutex)

???? pthread_mutex_t *mutex;

{

? assert (sizeof (mutex-__size) = sizeof (mutex-__data));

? pid_t id = THREAD_GETMEM (THREAD_SELF, tid);

? switch (__builtin_expect (mutex-__data.__kind, PTHREAD_MUTEX_TIMED_NP))

??? {

???? …

??? default:

????? /* Correct code cannot set any other type.? */

??? case PTHREAD_MUTEX_TIMED_NP:

??? simple:

????? /* Normal mutex.? */

????? LLL_MUTEX_LOCK (mutex-__data.__lock);

????? break;

??…

??}

? /* Record the ownership.? */

? assert (mutex-__data.__owner == 0);

? mutex-__data.__owner = id;

#ifndef NO_INCR

? ++mutex-__data.__nusers;

#endif

? return 0;

}

该函数主要是调用LLL_MUTEX_LOCK, 省略部分为根据mutex结构体__kind属性不同值做些处理。

宏定义函数LLL_MUTEX_LOCK最终调用,将结构体mutex的__lock属性作为参数传递进来

#define __lll_mutex_lock(futex)????????????????????????????????????????????????\

? ((void) ({????????????????????????????????????????????????????????????????\

??? int *__futex = (futex);????????????????????????????????????????????????\

??? if (atomic_compare_and_exchange_bool_acq (__futex, 1, 0) != 0)????????\

????? __lll_lock_wait (__futex);????????????????????????????????????????\

? }))

atomic_compare_and_exchange_bool_acq (__futex, 1, 0)宏定义为:

#define atomic_compare_and_exchange_bool_acq(mem, newval, oldval) \

? ({ __typeof (mem) __gmemp = (mem);????????????????????????????????????? \

???? __typeof (*mem) __gnewval = (newval);????????????????????????????? \

????? \

???? *__gmemp == (oldval) ? (*__gmemp = __gnewval, 0) : 1; })

这个宏实现的功能是:

如果mem的值等于oldval,则把newval赋值给mem,放回0,否则不做任何处理,返回1.

由此可以看出,当mutex锁限制的资源没有竞争时,__lock 属性被置为1,并返回0,不会调用__lll_lock_wait (__futex); 当存在竞争时,再次调用lock函数,该宏不做任何处理,返回1,调用__lll_lock_wait (__futex);

void

__lll_lock_wait (int *futex)

{

? do

??? {

????? int oldval = atomic_compare_and_exchange_val_acq (futex, 2, 1);

????? if (oldval != 0)

lll_futex_wait (futex, 2);

??? }

? while (atomic_compare_and_exchange_bool_acq (futex, 2, 0) != 0);

}

atomic_compare_and_exchange_val_acq (futex, 2, 1); 宏定义:

/* The only basic operation needed is compare and exchange.? */

#define atomic_compare_and_exchange_val_acq(mem, newval, oldval) \

? ({ __typeof (mem) __gmemp = (mem);????????????????????????????????????? \

???? __typeof (*mem) __gret = *__gmemp;????????????????????????????????????? \

???? __typeof (*mem) __gnewval = (newval);????????????????????????????? \

????? \

???? if (__gret == (oldval))????????????????????????????????????????????? \

?????? *__gmemp = __gnewval;????????????????????????????????????????????? \

???? __gret; })

这个宏实现的功能是,当mem等于oldval时,将mem置为newval,始终返回mem原始值。

此时,futex等于1,futex将被置为2,并且返回1. 进而调用

lll_futex_wait (futex, 2);

#define lll_futex_timed_wait(ftx, val, timespec)????????????????????????\

({????????????????????????????????????????????????????????????????????????\

?? DO_INLINE_SYSCALL(futex, 4, (long) (ftx), FUTEX_WAIT, (int) (val),????????\

???? (long) (timespec));????????????????????????????????\

?? _r10 == -1 ? -_retval : _retval;????????????????????????????????????????\

})

该宏对于不同的平台架构会用不同的实现,采用汇编语言实现系统调用。不过确定的是调用了Linux kernel的futex系统调用。

futex在linux kernel的实现位于:kernel/futex.c

SYSCALL_DEFINE6(futex, u32 __user *, uaddr, int, op, u32, val,

struct timespec __user *, utime, u32 __user *, uaddr2,

u32, val3)

{

struct timespec ts;

ktime_t t, *tp = NULL;

u32 val2 = 0;

int cmd = op FUTEX_CMD_MASK;

if (utime (cmd == FUTEX_WAIT || cmd == FUTEX_LOCK_PI ||

????? cmd == FUTEX_WAIT_BITSET ||

????? cmd == FUTEX_WAIT_REQUEUE_PI)) {

if (copy_from_user(ts, utime, sizeof(ts)) != 0)

return -EFAULT;

if (!timespec_valid(ts))

return -EINVAL;

t = timespec_to_ktime(ts);

if (cmd == FUTEX_WAIT)

t = ktime_add_safe(ktime_get(), t);

tp = t;

}

/*

?* requeue parameter in 'utime' if cmd == FUTEX_*_REQUEUE_*.

?* number of waiters to wake in 'utime' if cmd == FUTEX_WAKE_OP.

?*/

if (cmd == FUTEX_REQUEUE || cmd == FUTEX_CMP_REQUEUE ||

??? cmd == FUTEX_CMP_REQUEUE_PI || cmd == FUTEX_WAKE_OP)

val2 = (u32) (unsigned long) utime;

return do_futex(uaddr, op, val, tp, uaddr2, val2, val3);

}

futex具有六个形参,pthread_mutex_lock最终只关注了前四个。futex函数对参数进行判断和转化之后,直接调用do_futex。

long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout,

u32 __user *uaddr2, u32 val2, u32 val3)

{

int clockrt, ret = -ENOSYS;

int cmd = op FUTEX_CMD_MASK;

int fshared = 0;

if (!(op FUTEX_PRIVATE_FLAG))

fshared = 1;

clockrt = op FUTEX_CLOCK_REALTIME;

if (clockrt cmd != FUTEX_WAIT_BITSET cmd != FUTEX_WAIT_REQUEUE_PI)

return -ENOSYS;

switch (cmd) {

case FUTEX_WAIT:

val3 = FUTEX_BITSET_MATCH_ANY;

case FUTEX_WAIT_BITSET:

ret = futex_wait(uaddr, fshared, val, timeout, val3, clockrt);

break;

???????? …

default:

ret = -ENOSYS;

}

return ret;

}

省略部分为对其他cmd的处理,pthread_mutex_lock函数最终传入的cmd参数为FUTEX_WAIT,所以在此只关注此分之,分析futex_wait函数的实现。

static int futex_wait(u32 __user *uaddr, int fshared,

????? u32 val, ktime_t *abs_time, u32 bitset, int clockrt)

{

struct hrtimer_sleeper timeout, *to = NULL;

struct restart_block *restart;

struct futex_hash_bucket *hb;

struct futex_q q;

int ret;

?????????? … … //delete parameters check and convertion

retry:

/* Prepare to wait on uaddr. */

ret = futex_wait_setup(uaddr, val, fshared, q, hb);

if (ret)

goto out;

/* queue_me and wait for wakeup, timeout, or a signal. */

futex_wait_queue_me(hb, q, to);

… … //other handlers

return ret;

}

futex_wait_setup 将线程放进休眠队列中,

futex_wait_queue_me(hb, q, to);将本线程休眠,等待唤醒。

唤醒后,__lll_lock_wait函数中的while (atomic_compare_and_exchange_bool_acq (futex, 2, 0) != 0); 语句将被执行,由于此时futex在pthread_mutex_unlock中置为0,所以atomic_compare_and_exchange_bool_acq (futex, 2, 0)语句将futex置为2,返回0. 退出循环,访问用户控件的临界资源。

/*nptl/pthread_mutex_unlock.c*/

int

internal_function attribute_hidden

__pthread_mutex_unlock_usercnt (mutex, decr)

???? pthread_mutex_t *mutex;

???? int decr;

{

? switch (__builtin_expect (mutex-__data.__kind, PTHREAD_MUTEX_TIMED_NP))

??? {

???… …

??? default:

????? /* Correct code cannot set any other type.? */

??? case PTHREAD_MUTEX_TIMED_NP:

??? case PTHREAD_MUTEX_ADAPTIVE_NP:

????? /* Normal mutex.? Nothing special to do.? */

????? break;

??? }

? /* Always reset the owner field.? */

? mutex-__data.__owner = 0;

? if (decr)

??? /* One less user.? */

??? --mutex-__data.__nusers;

? /* Unlock.? */

? lll_mutex_unlock (mutex-__data.__lock);

? return 0;

}

省略部分是针对不同的__kind属性值做的一些处理,最终调用 lll_mutex_unlock。

该宏函数最终的定义为:

#define __lll_mutex_unlock(futex)????????????????????????\

? ((void) ({????????????????????????????????????????????????\

??? int *__futex = (futex);????????????????????????????????\

??? int __val = atomic_exchange_rel (__futex, 0);????????\

\

??? if (__builtin_expect (__val 1, 0))????????????????\

????? lll_futex_wake (__futex, 1);????????????????????????\

? }))

atomic_exchange_rel (__futex, 0);宏为:

#define atomic_exchange_rel(mem, value) \

? (__sync_synchronize (), __sync_lock_test_and_set (mem, value))

实现功能为:将mem设置为value,返回原始mem值。

__builtin_expect (__val 1, 0) 是编译器优化语句,告诉编译器期望值,也就是大多数情况下__val 1 ?是0,其逻辑判断依然为if(__val 1)为真的话执行 lll_futex_wake。

现在分析,在资源没有被竞争的情况下,__futex 为1,那么返回值__val则为1,那么 lll_futex_wake (__futex, 1);????????不会被执行,不产生系统调用。 当资源产生竞争的情况时,根据对pthread_mutex_lock 函数的分析,__futex为2, __val则为2,执行 lll_futex_wake (__futex, 1); 从而唤醒等在临界资源的线程。

lll_futex_wake (__futex, 1); 最终会调动同一个系统调用,即futex, 只是传递的cmd参数为FUTEX_WAKE。

在linux kernel的futex实现中,调用

static int futex_wake(u32 __user *uaddr, int fshared, int nr_wake, u32 bitset)

{

struct futex_hash_bucket *hb;

struct futex_q *this, *next;

struct plist_head *head;

union futex_key key = FUTEX_KEY_INIT;

int ret;

if (!bitset)

return -EINVAL;

ret = get_futex_key(uaddr, fshared, key);

if (unlikely(ret != 0))

goto out;

hb = hash_futex(key);

spin_lock(hb-lock);

head = hb-chain;

plist_for_each_entry_safe(this, next, head, list) {

if (match_futex (this-key, key)) {

if (this-pi_state || this-rt_waiter) {

ret = -EINVAL;

break;

}

/* Check if one of the bits is set in both bitsets */

if (!(this-bitset bitset))

continue;

wake_futex(this);

if (++ret = nr_wake)

break;

}

}

spin_unlock(hb-lock);

put_futex_key(fshared, key);

out:

return ret;

}

该函数遍历在该mutex上休眠的所有线程,调用wake_futex进行唤醒,

static void wake_futex(struct futex_q *q)

{

struct task_struct *p = q-task;

/*

?* We set q-lock_ptr = NULL _before_ we wake up the task. If

?* a non futex wake up happens on another CPU then the task

?* might exit and p would dereference a non existing task

?* struct. Prevent this by holding a reference on p across the

?* wake up.

?*/

get_task_struct(p);

plist_del(q-list, q-list.plist);

/*

?* The waiting task can free the futex_q as soon as

?* q-lock_ptr = NULL is written, without taking any locks. A

?* memory barrier is required here to prevent the following

?* store to lock_ptr from getting ahead of the plist_del.

?*/

smp_wmb();

q-lock_ptr = NULL;

wake_up_state(p, TASK_NORMAL);

put_task_struct(p);

}

wake_up_state(p, TASK_NORMAL);? 的实现位于kernel/sched.c中,属于linux进程调度的技术。

Mutex和信号量的区别

mutex的设计目的是"持有后快速释放",也就是说如果一个竞争者在获取失败后,会spin几个循环后再尝试,如果仍然失败,则进入睡眠,这相当于semaphore获取失败后直接睡眠,多了一个spin过程,所以如果每个mutex在持有后又很快释放,那么就不存在CPU的唤醒过程。这显然比semaphore快很多。具体参考

深入理解mutex工作机制

可以认为是一种更加轻量级的latch, 两者的区别在于:mutex是内存结构本身所包含,随着内存的分配而分配、销毁而销毁,latch是一种独立的内存结构。基于这个特性,mutex保护的内存结构更细粒度,并发性、支持性要好于latch.

与shared latch一样可使mutex用cas模式申请latch. 因此也可以把mutex当成一种shared latch, 只是其粒度更细,更加轻量级。mutex采用spin+FIRO策略,而shared latch采用spin+FIFO策略。更多详细关于mutex的原理可以参见:

从10.2.0.2版本开始,oracle的mutex机制逐步替代了传统的library cache的一些机制。

#### 10.2.0.5 ####

#### 11.2.0.4 ####

不同版本中,对于library cache的latch变化较大,特别是从Oracle 10gR2(10.2.0.2)开始,引入了mutex来替代cursor的latch. 在该版本上通过隐含参数_kks_use_mutex_pin的调整可以限制是否使用Mutex机制来实现Cursor Pin.

mutex与latch的对应关系如下:

以下参数在11.2.0.2以上才有

由于mutex采用了传统的spin+sleep机制而不是latch的spin+post机制,所以sleep time的设置对于mutex性能具有重要地位,一般来说由于mutex的冲突远比latch要小,所以合适的yield和更小的sleep选择是恰当的。一般情况不需要改动,默认都是1ms.

不同模式的比较可以参考:

不包含sleep, 只在简单yield之后进行不断的mutex spin. 显然,在冲突不发生的情况下会有最好的性能,但是在有冲突的时候可能会消耗很高的CPU资源。

对10g模式的一种修正,增加了sleep部分以降低CPU消耗,sleep时间收到参数_first_spare_parameter的控制

在Solaris SPARC 10.2.0.4这个参数是_second_spare_parameter.

mutex的spin次数受mutex_spin_count控制,默认为255. 该参数可以动态修改。

某些时候,该值可能需要适当增大来获取更好的性能。设置方法可参考latch的spin count设置方法。v$mutex_sleep_history视图包含相关的gets和sleep信息。

关于mutex_spin_conut设置对mutex响应性能的影响可参考:

(责任编辑:IT教学网)

更多

推荐金山WPS文章