前言

  • 什么是死锁
  • 死锁发生的四个必要条件
  • 破坏死锁发生的条件预防死锁
  • 小结
  •  

    前言

    我们使用加锁机制来保证线程安全,但是如果过度地使用加锁,则可能会导致死锁。下面将介绍关于死锁的相关知识以及我们在编写程序时如何预防死锁。

    什么是死锁

    学习操作系统时,给出死锁的定义为两个或两个以上的线程在执行过程中,由于竞争资源而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。简化一点说就是:一组相互竞争资源的线程因为互相等待,导致“永久”阻塞的现象

    下面我们通过一个转账例子来深入理解死锁。

    class Account {
        private int balance;
        // 转账
        void transfer(Account target, int amt){
            if (this.balance > amt) {
                this.balance -= amt;
                target.balance += amt;
            }
        } 
    }
    

    为了使以上转账方法transfer()不存在并发问题,很快地我们可以想使用Java的synchronized修饰transfer方法,于是代码如下:

    class Account {
        private int balance;
        // 转账
        synchronized void transfer(Account target, int amt){
            if (this.balance > amt) {
                this.balance -= amt;
                target.balance += amt;
            }
        } 
    }
    

    需要注意,这里我们使用的内置锁是this,这把锁虽然可以保护我们自己的balance,却不可以保护target的balance。使用我们上一篇介绍的锁模型来描绘这个代码就是下面这样:(图来自参考[1])

    更具体来说,假设有 A、B、C 三个账户,余额都是 200 元,我们用两个线程分别执行两个转账操作:账户 A 转给账户 B 100 元,账户 B 转给账户 C 100 元,最后我们期望的结果应该是账户 A 的余额是 100 元,账户 B 的余额是 200 元, 账户 C 的余额是 300 元。
    如果有两个线程1和线程2,线程1 执行账户 A 转账户 B 的操作,线程2执行账户 B 转账户 C 的操作。这两个线程分别运行在两颗的CPU上,由于this这个锁只能保护自己的balance而不能保护别人的,线程 1 锁定的是账户 A 的实例(A.this),而线程 2 锁定的是账户 B 的实例(B.this),所以这两个线程可以同时进入临界区 transfer(),因此两个线程没有实现互斥。
    出现可能的结果就为,两个线程同时读到账户B的余额为200元,导致最终账户 B 的余额可能是 300(线程 1 后于线程 2 写 B.balance,线程 2 写的 B.balance 值被线程 1 覆盖),可能是 100(线程 1 先于线程 2 写 B.balance,线程 1 写的 B.balance 值被线程 2 覆盖)。在这种情况下,账户B的余额就不会是 200。
    并发转账示意图(图来自参考[1])

    于是我们应该使用一个能够覆盖所有保护资源的锁,如果还记得我们上一篇讲synchronized修饰静态方法时默认的锁对象的话,那这里就很容易解决了。这个默认的锁就是类的class对象。于是,我们就可以使用Account.class作为一个可以保护这个转账过程的锁。

    class Account {
        private int balance;
        // 转账
        void transfer(Account target, int amt){
            synchronized(Account.class) {
                if (this.balance > amt) {
                    this.balance -= amt;
                    target.balance += amt;
                }
            }
        } 
    }
    

    这个方案虽然不存在并发问题,但是所有账户的转账操作都是串行的。现实世界中,账户 A 转账户 B、账户 C 转账户 D 这两个转账操作现实世界里是可以并行的。较于实际情况来说,这个方案就显得性能太差。

    于是,我们尽量模仿现实世界的转账操作:
    每个账户都有一个账本,这些账本都统一存放在文件架上。当转账A给账户B转账时,柜员会去拿A账本和B账本做登记,此时柜员在拿账本时会遇到三种情况:

    1. 文件架上恰好有A账本和B账本,那就同时拿走;
    2. 如果文件架上只有A账本和B账本之一,那这个柜员就先把文件架上有的账本拿到手,同时等着其他柜员把另外一个账本送回来;
    3. A账本和B账本都没有,那这个柜员就等着两个账本都被送回来

    在编程实现中,我们可以使用两把锁来实现这个过程。在 transfer() 方法内部,我们首先尝试锁定转出账户 this(先把A账本拿到手),然后尝试锁定转入账户 target(再把B账本拿到手),只有当两者都成功时,才执行转账操作。
    这个逻辑可以图形化为下图这个样子,(图来自参考[1]):

    代码如下:

    class Account {
        private int balance;
        // 转账
        void transfer(Account target, int amt){
            // 锁定转出账户A
            synchronized(this) {              
                // 锁定转入账户B
                synchronized(target) {           
                    if (this.balance > amt) {
                        this.balance -= amt;
                        target.balance += amt;
                    }
                }
            }
        } 
    }
    

    经过这样的优化后,账户 A 转账户 B 和账户 C 转账户 D 这两个转账操作就可以并行了。

    但是这样却会导致死锁。例如情况:柜员张三做账户A转账户B的转账操作,柜员李四做账户B转账户C的转账操作。他们两个同时操作,于是就会出现下面这种情形:(图来自参考[1])

    他俩会一直等待对方将账本放到文件架上,造成一个一直僵持的局势。

    关于这种现象,我们还可以借助资源分配图来可视化锁的占用情况(资源分配图是个有向图,它可以描述资源和线程的状态)。其中,资源用方形节点表示,线程用圆形节点表示;资源中的点指向线程的边表示线程已经获得该资源,线程指向资源的边则表示线程请求资源,但尚未得到。(图来自参考[1])

    Java并发程序一旦死锁,一般没有特别好的方法,恢复应用程序的唯一方式就是中止并重启。因此,我们要尽量避免死锁的发生,最好不要产生死锁。要知道如何才能做到不要产生死锁,我们首先要知道什么条件会发生死锁。

    死锁发生的四个必要条件

    虽然进程在运行过程中,可能发生死锁,但死锁的发生也必须具备一定的条件,死锁的发生必须具备以下四个必要条件:

    • 互斥,共享资源 X 和 Y 只能被一个线程占用;
    • 占有且等待,线程 T1 已经取得共享资源 X,在等待共享资源 Y 的时候,不释放共享资源 X;
    • 不可抢占,其他线程不能强行抢占线程 T1 占有的资源;
    • 循环等待,线程 T1 等待线程 T2 占有的资源,线程 T2 等待线程 T1 占有的资源,就是循环等待。

    破坏死锁发生的条件预防死锁

    只有这四个条件都发生时才会出现死锁,那么反过来,也就是说只要我们破坏其中一个,就可以成功预防死锁的发生

    四个条件中我们不能破坏互斥,因为我们使用锁目的就是保证资源被互斥访问,于是我们就对其他三个条件进行破坏:

    • 占用且等待:一次性申请所有的资源,这样就不存在等待了。
    • 不可抢占,占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源。
    • 循环等待,靠按序申请资源来预防。所谓按序申请,是指资源是有线性顺序的,申请的时候可以先申请资源序号小的,再申请资源序号大的,这样线性化申请后就不存在循环了。

    下面我们使用这些方法去解决如上的死锁问题。

    破坏占用且等待条件

    一次性申请完所有资源。我们设置一个管理员来管理账本,柜员同时申请需要的账本,而管理员同时出他们需要的账本。如果不能同时出借,则柜员就需要等待。

    “同时申请”:这个操作是一个临界区,含有两个操作,同时申请资源apply()和同时释放资源free()。

    class Allocator {
        private List<Object> als = new ArrayList<>();
        // 一次性申请所有资源
        synchronized boolean apply( Object from, Object to){
            if(als.contains(from) || als.contains(to)){    //from 或者 to账户被其他线程拥有
                return false;  
            } else {
                als.add(from);
                als.add(to);  
            }
            return true;
        }
        // 归还资源
        synchronized void free(Object from, Object to){
            als.remove(from);
            als.remove(to);
        }
    }
    
    class Account {
        // actr 应该为单例,只能由一个人来分配资源
        private Allocator actr;
        private int balance;
        // 转账
        void transfer(Account target, int amt){
            // 一次性申请转出账户和转入账户,直到成功
            while(!actr.apply(this, target))  //最好可以加个timeout避免一直循环try{
                    // 锁定转出账户
                    synchronized(this){ //存在客户对自己账户的操作
                        // 锁定转入账户
                        synchronized(target){           
                            if (this.balance > amt){
                                this.balance -= amt;
                                target.balance += amt;
                            }
                        }
                    }
                } finally {
                    actr.free(this, target)    //释放资源
                }
        }
    }
    

    破坏不可抢占条件

    破坏不抢占要能够主动释放它占有的资源,但synchronized是做不到的。原因为synchronized申请不到资源时,线程直接进入了阻塞状态,而线程进入了阻塞状态也就没有办法释放它占有的资源了。不过SDK中的java.util.concurrent提供了Lock解决这个问题。

    支持定时的锁

    显式使用Lock类中的定时tryLock功能来代替内置锁机制,可以检测死锁和从死锁中恢复过来。使用内置锁的线程获取不到锁会被阻塞,而显式锁可以指定一个超时时限(Timeout),在等待超过该时间后tryLock就会返回一个失败信息,也会释放其拥有的资源。

    破坏循环等待条件

    破坏这个条件,需要对资源进行排序,然后按序申请资源。我们假设每个账户都有不同的属性 id,这个 id 可以作为排序字段,申请的时候,我们可以按照从小到大的顺序来申请。
    比如下面代码中,①~⑤处的代码对转出账户(this)和转入账户(target)排序,然后按照序号从小到大的顺序锁定账户。这样就不存在“循环”等待了。

    class Account {
        private int id;
        private int balance;
        // 转账
        void transfer(Account target, int amt){
            Account left = this            // ①
                Account right = target;    // ②
            if (this.id > target.id) {     // ③
                left = target;             // ④
                right = this;              // ⑤
            }                          
            // 锁定序号小的账户
            synchronized(left){
                // 锁定序号大的账户
                synchronized(right){ 
                    if (this.balance > amt){
                        this.balance -= amt;
                        target.balance += amt;
                    }
                }
            }
        } 
    }
    

    小结

    记得学习操作系统时还有避免死锁,其和预防死锁的区别在于:预防死锁是设法至少破坏产生死锁的四个必要条件之一,严格地防止死锁的出现,但是这也会使系统性能降低;而避免死锁则不那么严格的限制产生死锁的必要条件的存在,因为即使死锁的必要条件存在,也不一定发生死锁,死锁避免是在系统运行过程中注意避免死锁的最终发生。避免死锁的经典算法就是银行家算法,这里就不扩开介绍了。

    还有一个避免出现死锁的结论:如果所有线程以固定顺序来获得锁,那么在程序中就不会出现锁顺序死锁问题。查看参考[4]理解。

    我们使用细粒度锁锁住多个资源时,要注意死锁的产生。只有先嗅到死锁的味道,才有我们的施展之地。

    参考:
    [1]极客时间专栏王宝令《Java并发编程实战》
    [2]Brian Goetz.Tim Peierls. et al.Java并发编程实战[M].北京:机械工业出版社,2016
    [3]iywwuyifan.避免死锁和预防思索的区别.https://blog.csdn.net/masterchiefcc/article/details/83303813
    [4]AddoilDan.死锁面试题(什么是死锁,产生死锁的原因及必要条件).https://blog.csdn.net/hd12370/article/details/82814348

    点赞(0)

    评论列表 共有 0 条评论

    暂无评论
    立即
    投稿
    发表
    评论
    返回
    顶部