Inside Out

自分自身の備忘録。アウトプット用のブログです。

食事する哲学者問題を C# で解いてみる

これは Sansan Advent Calendar 2020 の17日目の記事です。 adventar.org

アドベントカレンダーとか機会がないとブログとかめったに書かないので、一応例年書いてるし今年も何か書こうかなと思ってた頃に同僚から誘われて The Art of Multiprocessor Programming を読み始めたので、それに関連した記事です。

食事する哲学者問題

この本各章毎に Exercise がついており第1章の最初の Exercise に以下のような問題がついておりました。

以下の状況を想像してほしい。
5人の哲学者がいて彼らは思索を巡らすのと食事をする事だけで生きている。彼らは円卓に座っており、そこには5本の箸が置かれている。
彼らは空腹になると自分に最も近い箸を2本取る。2本の箸が取れればしばしの間食事を始める。食事を終えると箸を円卓に置き、再び思索を巡らせる。

1. これを哲学者をスレッドに、箸を共有オブジェクトに見たててプログラムで表現せよ。なお哲学者が同時に同じ箸を取る事は防がなければならない。
2. 全ての哲学者が箸を1本持って2本目の箸が行きわたらない状況を防ぐようにプログラムを修正せよ。
3. 哲学者が飢えないようにプログラムを修正せよ。

ド文系で職業ITエンジニアやってる自分はこの問題について初めて知りましたが、計算機科学の課程ではほぼ含まれてる「食事する哲学者問題(Dining Philosophers Problem)」として有名な話だそうです。元々は構造化プログラミングで有名なエドガー・ダイクストラが5台のコンピュータからテープ装置への競合アクセスする問題を考案しそれを、クイックソートの考案者のアントニー・ホーアが哲学者の話に変形させたものが広まったものらしいです。今回はまぁCS学習者にとっては常識っぽい問題ですがこれをといてみようかなと思います。

1 の回答

この問題が提示しているのはいわゆる排他制御で発生する問題です。本に書かれてる問題 1. を単にやると状況だと各哲学者が左の箸を持って右の箸を取ろうとするとそれは別の哲学者にとっては左の箸であり、すでに別の哲学者が持っているために取得できず延々と待ち続ける状態になります。いわゆる dead lock 状態となります。以下のコードは 1. を表現するコードですが、実行してみると食事にありつけずに延々と考えつづけます。

namespace DiningPhilosophers
{
    class Program
    {
        static void Main(string[] args)
        {
            var chopsticks = Enumerable.Range(0, 5).Select(x => new Chopstick(x)).ToList();
            var philosophers = new List<Philosopher>();

            for (var i = 0; i < chopsticks.Count; i++)
            {
                var philosopher = i == (chopsticks.Count - 1)
                    ? new Philosopher(i,chopsticks[i], chopsticks[0])
                    : new Philosopher(i, chopsticks[i], chopsticks[i + 1]);
                philosophers.Add(philosopher);
            }

            // Task は ThreadPool の状態に依存して必ず別 Thread での実行である事を保証しないので今回は 生 Thread を使う
            var threads = philosophers.Select(x => new Thread(() =>
            {
                while (true)
                {
                    try
                    {
                        x.Think();

                        x.Eat();
                    }
                    catch (ThreadInterruptedException)
                    {
                        Console.WriteLine("thread interrupted.");
                        break;
                    }
                }
            })).ToArray();

            foreach (var thread in threads)
            {
                thread.Start();
            }
        }
    }

    public class Philosopher
    {
        public Chopstick RightChopstick { get; }
        public Chopstick LeftChopstick { get; }

        public int PhilosopherId { get; }

        public Philosopher(int philosopherId, Chopstick rightChopstick, Chopstick leftChopstick)
        {
            PhilosopherId = philosopherId;
            RightChopstick = rightChopstick;
            LeftChopstick = leftChopstick;
        }

        public void Think()
        {
            Console.WriteLine($"Philosopher({PhilosopherId}) is thinking...");
            Thread.Sleep(TimeSpan.FromMilliseconds(1000));
        }

        public void Eat()
        {
            while (RightChopstick.InUse)
            {
                Think();
            }
            
            RightChopstick.Use(PhilosopherId);

            while (LeftChopstick.InUse)
            {
                Think();
            }

            LeftChopstick.Use(PhilosopherId);

            Console.WriteLine($"Philosopher({PhilosopherId}) is eating...");

            Thread.Sleep(TimeSpan.FromMilliseconds(1000));

            RightChopstick.ReturnToTable();
            LeftChopstick.ReturnToTable();
        }
    }

    public class Chopstick
    {
        public int ChopstickId { get; }

        public bool InUse => UserId.HasValue;

        private int? UserId { get; set; }

        public Chopstick(int chopstickId)
        {
            ChopstickId = chopstickId;
        }

        public void Use(int userId)
        {
            UserId = userId;
        }

        public void ReturnToTable()
        {
            UserId = null;
        }
    }
}

2. の回答

  1. を達成すべく解法はいくつかあるみたいですが、C# での Monitor とほぼ同等の lock 構文よって箸を持って食事が終わるまで他の哲学者は待つようにしてみました。このように dead lock が生じない特性の事を dead lock free と呼びます。
namespace DiningPhilosophers
{
    class Program
    {
        private static object LockObject { get; } = new object();

        static void Main(string[] args)
        {
            // 1. で記載ある部分は省略

            var threads = philosophers.Select(x => new Thread(() =>
            {
                while (true)
                {
                    try
                    {
                        x.Think();
                        // Eat 中は他の哲学者をブロックする
                        lock (LockObject)
                        {
                            x.Eat();
                        }
                    }
                    catch (ThreadInterruptedException)
                    {
                        Console.WriteLine("thread interrupted.");
                        break;
                    }
                }
            })).ToArray();

            // 1. で記載ある部分は省略
        }
    }

    // Philosopher と Chopstick は変更がないので省略
}

こちらを実行すると哲学者は1本目を持つ前に他の哲学者が食事をしていたら待つ状態になり dead lock は発生しなくなります。一方で、タイミング次第ですが食事ができる哲学者が偏る問題が発生します。例えばある哲学者Aが食事をしている間に哲学者Bが待っていて、Aが食事が終わった後にすぐにCが食事をはじめ、それも完了後にDやAが食事を始めてしまうみたいな状況が繰り返されるとBはなかなか食事にありつけない状態になります。このように処理に必要なリソース状態は食事できない事に引っ掛けていわゆる starvation と呼ばれます。

3. の回答

  1. への解法ですが、食事回数を見て均等に優先的に食事が可能なように食事回数が少ない場合に優先順位を振り分けてやります。
namespace DiningPhilosophers
{
    class Program
    {
        private static object LockObject { get; } = new object();

        static void Main(string[] args)
        {
            var chopsticks = Enumerable.Range(0, 5).Select(x => new Chopstick(x)).ToList();
            var philosophers = new List<Philosopher>();

            for (var i = 0; i < chopsticks.Count; i++)
            {
                var philosopher = i == (chopsticks.Count - 1)
                    ? new Philosopher(i, chopsticks[i], chopsticks[0])
                    : new Philosopher(i, chopsticks[i], chopsticks[i + 1]);
                philosophers.Add(philosopher);
            }

            var observer = new Observer(philosophers);

            // Task は ThreadPool の状態に依存して必ず別 Thread での実行である事を保証しないので今回は 生 Thread を使う
            var threads = philosophers.Select(x => new Thread(() =>
            {
                while (true)
                {
                    try
                    {
                        x.Think();

                        // observer が食事回数を監視して食ってない哲学者を優先する
                        if (!observer.CanPhilosopherEat(x.PhilosopherId)) continue;
                        
                        lock (LockObject)
                        {
                            x.Eat();
                        }

                    }
                    catch (ThreadInterruptedException)
                    {
                        Console.WriteLine("thread interrupted.");
                        break;
                    }
                }
            })).ToArray();

            foreach (var thread in threads)
            {
                thread.Start();
            }

            Thread.Sleep(10000);

            foreach (var thread in threads)
            {
                thread.Interrupt();
            }

            foreach (var philosopher in philosophers)
            {
                Console.WriteLine($"Philosopher({philosopher.PhilosopherId}) {philosopher.EatCount} times eating.");
            }
        }
    }

    public class Observer
    {
        private IDictionary<int, Philosopher> Philosophers { get; }

        public Observer(IEnumerable<Philosopher> philosophers)
        {
            Philosophers = philosophers.ToDictionary(x => x.PhilosopherId, y => y);
        }

        public bool CanPhilosopherEat(int philosopherId)
        {
            var philosopherEatCount = Philosophers[philosopherId].EatCount;

            var minimumPhilosopherEatCount = Philosophers.Values.Min(x => x.EatCount);

            return philosopherEatCount <= minimumPhilosopherEatCount;
        }
    }

    // 変更分だけ記載
    public class Philosopher
    {
        // 食事回数を追加
        public int EatCount { get; private set; } = 0;

        public void Eat()
        {
            while (RightChopstick.InUse)
            {
                Think();
            }

            RightChopstick.Use(PhilosopherId);

            while (LeftChopstick.InUse)
            {
                Think();
            }

            LeftChopstick.Use(PhilosopherId);

            Console.WriteLine($"Philosopher({PhilosopherId}) is eating...");
            Thread.Sleep(TimeSpan.FromMilliseconds(100));

            EatCount +=1;

            RightChopstick.ReturnToTable();
            LeftChopstick.ReturnToTable();
        }
    }

これで食事の回数が大体均等になるような結果がでるようになります。このように starvation が発生しない状況を starvation free といいます。starvation は deadlock によっても引き起こされる現象です。そのため starvation free は deadlock free も内包する性質であるとも言えます。排他制御はこのような deadlock や starvation を気にしながら実装しないと問題を引き起こすはめになります。

所感

実際このような排他制御は複雑でデバッグも面倒なので、業務コードでは可能な限りしないように設計するのがベターなのであまり本格的に取り組んだ事がなかったのですが、去年ちょっとだけ扱う必要があり、少しだけ勉強したのを思い出したのと本を同僚から教えてもらってせっかく買ったので、年末はこの辺の内容を覚えていこうかなと思ってます。

参考

ja.wikipedia.org