c# channel

C# 2022. 3. 15. 09:47

 

 

 

 

 

 

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;

namespace test
{
    /// <summary>
    /// 채널 스레드 관리 정보
    /// </summary>
    public class ChannelManagementInfo
    {
        // 큐 처리 계산
        private long _enqueueCount; // enqueue 된 수
        private long _dequeueCount; // dequeue 된 수

        public ulong Count { get { return (ulong)_enqueueCount - (ulong)_dequeueCount; } }
        public ulong EnqueueCount { get { return (ulong)_enqueueCount; } }
        public ulong DequeueCount { get { return (ulong)_dequeueCount; } }


        public void CountEnqueue()
        {
            _ = Interlocked.Increment(ref _enqueueCount);
        }

        public void CountDequeue()
        {
            _ = Interlocked.Increment(ref _dequeueCount);
        }
    }


    /// <summary>
    /// channel 에 대한 worker thread
    /// 상속하여 Run() 구현하여 사용
    /// </summary>
    /// <typeparam name="ItemT">ItemT</typeparam>
    public abstract class BaseChannelThread<ItemT>
    {
        public string ThreadName { get { return _thread.Name; } }

        public Channel<ItemT> Channel { get; set; }
        public ChannelManagementInfo Info { get; set; }

        private readonly Thread _thread;
        private static int _seq = 0;

        private readonly Stopwatch stopwatch = new Stopwatch();
        private ItemT consumeItem;

        public BaseChannelThread(string threadName = null)
        {
            _thread = new Thread(new ThreadStart(RunInThread))
            {
                Name = threadName ?? $"ct-{Interlocked.Increment(ref _seq)}"
            };
        }


        public void Start()
        {
            _thread.Start();
        }

        private void RunInThread()
        {
            while (Channel.Reader.WaitToReadAsync().AsTask().Result)
            {
                while (Channel.Reader.TryRead(out consumeItem))
                {
                    Info.CountDequeue();

                    stopwatch.Start();

                    // 로직 수행
                    Loop(consumeItem);

                    stopwatch.Reset();
                }
            }
        }


        public abstract void Loop(ItemT item);


        public bool StopAndWait(int msec = 3000)
        {

            int waitTimeMsec = 100;

            for (int i = 0; i < msec; i += waitTimeMsec)
            {
                if (!_thread.IsAlive)
                {
                    return true;
                }

                Thread.Sleep(waitTimeMsec);
            }

            return false;
        }


        // Run() 실행 후 경과 시간(msec)
        protected long GetElasedTimeMsec()
        {
            return stopwatch.ElapsedMilliseconds;
        }
    }

    /// <summary>
    /// BaseChannelThreadManager
    /// </summary>
    /// <typeparam name="ItemT">ItemT</typeparam>
    public class BaseChannelThreadManager<ItemT>
    {
        protected Channel<ItemT> CreateChannel(int channelCapacity)
        {
            if (channelCapacity == 0)
            {
                // channel capacity 무제한. OOM 가능성이 있다.
                return Channel.CreateUnbounded<ItemT>(
                    new UnboundedChannelOptions()
                );
            }
            else
            {
                // channel capacity 를 지정하고 모자란 경우 wait 하게 된다.
                return Channel.CreateBounded<ItemT>(
                    new BoundedChannelOptions(channelCapacity)
                );
            }
        }
    }


    /// <summary>
    /// 하나의 channel, 여러개의 worker thread 사용.
    /// </summary>
    /// <typeparam name="ThreadT">ThreadT</typeparam>
    /// <typeparam name="ItemT">ItemT</typeparam>
    public class ChannelThreadsManager<ThreadT, ItemT> : BaseChannelThreadManager<ItemT> where ThreadT : BaseChannelThread<ItemT>
    {
        private readonly Channel<ItemT> channel;
        private readonly List<ThreadT> threads = new List<ThreadT>();

        public ChannelManagementInfo Info { get; private set; } = new ChannelManagementInfo();


        public ChannelThreadsManager(int channelCapacity = 0)
        {
            channel = CreateChannel(channelCapacity);
        }

        public void AddThread(BaseChannelThread<ItemT> thread)
        {
            thread.Channel = channel;
            thread.Info = Info;
            threads.Add((ThreadT)thread);
        }

        public void StartThread()
        {
            foreach (var thread in threads)
            {
                thread.Start();
            }
        }

        public void StopAndWaitThread(int msec = 10000)
        {
            channel.Writer.TryComplete();

            // wait until msec
            foreach (var thread in threads)
            {
                thread.StopAndWait(msec);
            }
        }

        public bool AddItem(ItemT item)
        {
            if (item == null)
            {
                return false;
            }

            Info.CountEnqueue();
            return channel.Writer.TryWrite(item);
        }

    }


    /// <summary>
    /// channel 및 thread 관리자
    /// 하나의 channel 은 하나의 worker thread 에서 처리
    /// </summary>
    /// <typeparam name="ThreadT">ThreadT</typeparam>
    /// <typeparam name="ItemT">ItemT</typeparam>
    public class ChannelsThreadsManager<ThreadT, ItemT> : BaseChannelThreadManager<ItemT> where ThreadT : BaseChannelThread<ItemT>
    {
        public class ThreadInfo
        {
            public ThreadT Thread { get; set; }
            public Channel<ItemT> Channel { get; set; }
        }

        private readonly List<ThreadInfo> threads = new List<ThreadInfo>();
        private readonly int _channelCapacity = 0;

        public ChannelManagementInfo Info { get; private set; } = new ChannelManagementInfo();

        // 채널 생성
        public ChannelsThreadsManager(int channelCapacity = 0)
        {
            _channelCapacity = channelCapacity;
        }

        public void AddThread(BaseChannelThread<ItemT> thread)
        {
            var channel = CreateChannel(_channelCapacity);
            thread.Info = Info;
            thread.Channel = channel;
            threads.Add(
                new ThreadInfo()
                {
                    Channel = channel,
                    Thread = (ThreadT)thread
                }
            );
        }

        public void StartThread()
        {
            foreach (var info in threads)
            {
                info.Thread.Start();
            }
        }

        public void StopAndWaitThread(int msec = 3000)
        {
            // null item 인 경우 thread 가 종료되도록 한다.
            for (int i = 0; i < threads.Count; i++)
            {
                var info = threads[i];
                //info.Channel.Writer.TryWrite(default);
                info.Channel.Writer.TryComplete();
            }

            // wait until msec
            foreach (var info in threads)
            {
                info.Thread.StopAndWait(msec);
            }
        }


        /// <summary>
        /// 숫자(num) 별로 동일한 thread 를 사용하도록 한다.
        /// </summary>
        /// <param name="i">i</param>
        /// <param name="item">item</param>
        public void AddItem(int i, ItemT item)
        {
            if (item == null)
            {
                return;
            }

            Info.CountEnqueue();

            int idx = i % threads.Count;
            threads[idx].Channel.Writer.TryWrite(item);
        }

    }

    /// <summary>
    /// 테스트 샘플 클래스 (참고용)
    /// </summary>
    class TestChannelThread
    {
        public static void Test()
        {
            Console.WriteLine("start...");
            new TestChannelThread().Run();
            Console.WriteLine("done...");
        }

        public void Run()
        {
            // manager 생성
            var test = new ChannelThreadsManager<WorkerThread, string>();

            // thread 생성
            test.AddThread(new WorkerThread());
            test.AddThread(new WorkerThread());
            test.StartThread();

            // 데이터 입력
            for (int i = 0; i < 10; i++)
            {
                test.AddItem("test");
            }

            while (true)
            {
                var input = Console.ReadLine();
                if ("q".Equals(input))
                {
                    Console.WriteLine("quit");
                    break;
                }
                else
                {
                    test.AddItem(input);
                }
            }
            test.StopAndWaitThread();
        }



        class WorkerThread : BaseChannelThread<string>
        {
            public override void Loop(string job)
            {
                Thread.Sleep(100);
                Console.WriteLine($"job: {job}, ElasedTime: {GetElasedTimeMsec()}");
            }

        }
    }
}

 

 

 

 

 

 

 

 

 

 

'C#' 카테고리의 다른 글

c# DebuggerDisplay, DebuggerTypeProxy attribute  (0) 2022.03.16
c# method attribute 의 값을 해당 메소드에서 가져오기  (0) 2022.03.15
c# HttpClient  (0) 2022.03.15
object pool, object type pool  (0) 2022.03.11
GC TEST  (0) 2022.03.08
GC EventPipe 모니터링  (0) 2022.03.08
C# .net core 빌드 및 powershell 전송  (0) 2022.03.01
c# stack size 확인  (0) 2022.02.24