using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
namespace test
{
/// <summary>
/// Thread-safe enqueue/dequeue counters shared between a channel's producers and its worker threads.
/// </summary>
public class ChannelManagementInfo
{
    // Queue throughput accounting.
    private long _enqueueCount; // number of items enqueued so far
    private long _dequeueCount; // number of items dequeued so far

    /// <summary>
    /// Number of items that have been counted as enqueued but not yet as dequeued.
    /// Never underflows: if the dequeue counter is momentarily ahead of the enqueue
    /// counter the result is 0 rather than a wrapped-around huge value.
    /// </summary>
    public ulong Count
    {
        get
        {
            // Read the dequeue counter first. Both counters only grow, so when producers
            // count before consumers the later enqueue snapshot cannot be smaller than the
            // earlier dequeue snapshot.
            long dequeued = Interlocked.Read(ref _dequeueCount);
            long enqueued = Interlocked.Read(ref _enqueueCount);

            // Clamp instead of wrapping when callers count in a different order.
            return enqueued > dequeued ? (ulong)(enqueued - dequeued) : 0UL;
        }
    }

    /// <summary>Total number of <see cref="CountEnqueue"/> calls.</summary>
    public ulong EnqueueCount
    {
        // Interlocked.Read keeps the 64-bit read atomic on 32-bit platforms.
        get { return (ulong)Interlocked.Read(ref _enqueueCount); }
    }

    /// <summary>Total number of <see cref="CountDequeue"/> calls.</summary>
    public ulong DequeueCount
    {
        get { return (ulong)Interlocked.Read(ref _dequeueCount); }
    }

    /// <summary>Atomically records that one item was enqueued.</summary>
    public void CountEnqueue()
    {
        _ = Interlocked.Increment(ref _enqueueCount);
    }

    /// <summary>Atomically records that one item was dequeued.</summary>
    public void CountDequeue()
    {
        _ = Interlocked.Increment(ref _dequeueCount);
    }
}
/// <summary>
/// Worker thread that consumes items from a channel.
/// Derive from this class and implement <see cref="Loop"/> to process each item.
/// </summary>
/// <typeparam name="ItemT">Type of the items read from the channel.</typeparam>
public abstract class BaseChannelThread<ItemT>
{
    /// <summary>Name of the underlying thread.</summary>
    public string ThreadName { get { return _thread.Name; } }

    /// <summary>Channel this worker reads from. Must be assigned before <see cref="Start"/>.</summary>
    public Channel<ItemT> Channel { get; set; }

    /// <summary>Optional counters; every item read from the channel is recorded as a dequeue.</summary>
    public ChannelManagementInfo Info { get; set; }

    private readonly Thread _thread;

    // Sequence used to build default thread names. Being a static field of a generic
    // class, it is kept separately for each closed ItemT.
    private static int _seq = 0;

    // Measures how long the current Loop() call has been running.
    private readonly Stopwatch stopwatch = new Stopwatch();

    /// <summary>
    /// Creates the (not yet started) worker thread.
    /// </summary>
    /// <param name="threadName">Thread name; when null a name of the form "ct-N" is generated.</param>
    public BaseChannelThread(string threadName = null)
    {
        _thread = new Thread(RunInThread)
        {
            Name = threadName ?? $"ct-{Interlocked.Increment(ref _seq)}"
        };
    }

    /// <summary>
    /// Starts the worker thread.
    /// </summary>
    /// <exception cref="InvalidOperationException"><see cref="Channel"/> has not been assigned.</exception>
    public void Start()
    {
        // Fail on the caller's thread; a null channel would otherwise surface as an
        // unhandled NullReferenceException on the worker thread.
        if (Channel == null)
        {
            throw new InvalidOperationException("Channel must be assigned before the worker thread is started.");
        }

        _thread.Start();
    }

    // Thread body: blocks until data is available, drains the channel, and repeats
    // until the channel has been completed and emptied.
    private void RunInThread()
    {
        // GetAwaiter().GetResult() blocks like .Result but rethrows the original
        // exception instead of wrapping it in an AggregateException.
        while (Channel.Reader.WaitToReadAsync().AsTask().GetAwaiter().GetResult())
        {
            while (Channel.Reader.TryRead(out ItemT item))
            {
                Info?.CountDequeue();
                stopwatch.Restart();
                try
                {
                    // Run the derived class's processing logic.
                    Loop(item);
                }
                finally
                {
                    // Keep the elapsed time at zero between items even when Loop throws.
                    stopwatch.Reset();
                }
            }
        }
    }

    /// <summary>Processes one item taken from the channel; called on the worker thread.</summary>
    /// <param name="item">Item read from the channel.</param>
    public abstract void Loop(ItemT item);

    /// <summary>
    /// Waits for the worker thread to finish. This method does not itself signal the
    /// thread to stop; the read loop ends once the channel's writer has been completed
    /// and the remaining items have been consumed.
    /// </summary>
    /// <param name="msec">Maximum time to wait, in milliseconds.</param>
    /// <returns>true when the thread is not running (never started or finished); false on timeout.</returns>
    public bool StopAndWait(int msec = 3000)
    {
        if (!_thread.IsAlive)
        {
            return true;
        }

        if (msec <= 0)
        {
            return false;
        }

        // Join returns as soon as the thread ends, so there is no polling delay and no
        // missed final check.
        return _thread.Join(msec);
    }

    /// <summary>
    /// Milliseconds elapsed since the current <see cref="Loop"/> call started.
    /// (The method name is misspelled but kept for compatibility with derived classes.)
    /// </summary>
    /// <returns>Elapsed time in milliseconds.</returns>
    protected long GetElasedTimeMsec()
    {
        return stopwatch.ElapsedMilliseconds;
    }
}
/// <summary>
/// Common base for the channel/thread managers; provides the channel factory.
/// </summary>
/// <typeparam name="ItemT">Type of the items carried by the channel.</typeparam>
public class BaseChannelThreadManager<ItemT>
{
    /// <summary>
    /// Creates the channel used to hand items to worker threads.
    /// </summary>
    /// <param name="channelCapacity">0 for an unbounded channel; any other value is passed on as the bounded capacity.</param>
    /// <returns>The newly created channel.</returns>
    protected Channel<ItemT> CreateChannel(int channelCapacity)
    {
        if (channelCapacity != 0)
        {
            // Fixed capacity, created with the default bounded-channel options.
            var boundedOptions = new BoundedChannelOptions(channelCapacity);
            return Channel.CreateBounded<ItemT>(boundedOptions);
        }

        // No capacity limit: the queue can grow without bound, so running out of memory is possible.
        var unboundedOptions = new UnboundedChannelOptions();
        return Channel.CreateUnbounded<ItemT>(unboundedOptions);
    }
}
/// <summary>
/// Manager for a single channel that is consumed by several worker threads.
/// </summary>
/// <typeparam name="ThreadT">Concrete worker thread type.</typeparam>
/// <typeparam name="ItemT">Type of the items carried by the channel.</typeparam>
public class ChannelThreadsManager<ThreadT, ItemT> : BaseChannelThreadManager<ItemT> where ThreadT : BaseChannelThread<ItemT>
{
    private readonly Channel<ItemT> channel;
    private readonly List<ThreadT> threads = new List<ThreadT>();

    /// <summary>Enqueue/dequeue counters shared with every registered worker.</summary>
    public ChannelManagementInfo Info { get; private set; } = new ChannelManagementInfo();

    /// <summary>
    /// Creates the manager together with its single shared channel.
    /// </summary>
    /// <param name="channelCapacity">0 for an unbounded channel, otherwise the bounded capacity.</param>
    public ChannelThreadsManager(int channelCapacity = 0)
    {
        channel = CreateChannel(channelCapacity);
    }

    /// <summary>
    /// Registers a worker and wires it to the shared channel and counters.
    /// </summary>
    /// <param name="thread">Worker to register; must be a <typeparamref name="ThreadT"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="thread"/> is null.</exception>
    /// <exception cref="InvalidCastException"><paramref name="thread"/> is not a <typeparamref name="ThreadT"/>.</exception>
    public void AddThread(BaseChannelThread<ItemT> thread)
    {
        if (thread == null)
        {
            throw new ArgumentNullException(nameof(thread));
        }

        // Cast before touching the worker so a wrong type is rejected without leaving it
        // half-configured.
        var worker = (ThreadT)thread;
        worker.Channel = channel;
        worker.Info = Info;
        threads.Add(worker);
    }

    /// <summary>Starts every registered worker thread.</summary>
    public void StartThread()
    {
        foreach (var thread in threads)
        {
            thread.Start();
        }
    }

    /// <summary>
    /// Completes the channel so that workers exit once it is drained, then waits for them.
    /// </summary>
    /// <param name="msec">Total time budget, in milliseconds, shared by all workers.</param>
    public void StopAndWaitThread(int msec = 10000)
    {
        channel.Writer.TryComplete();

        // One shared deadline: the workers drain the channel concurrently, so the budget
        // must not be multiplied by the number of threads.
        var elapsed = Stopwatch.StartNew();
        foreach (var thread in threads)
        {
            long remaining = msec - elapsed.ElapsedMilliseconds;
            thread.StopAndWait(remaining > 0 ? (int)remaining : 0);
        }
    }

    /// <summary>
    /// Tries to enqueue an item without blocking.
    /// </summary>
    /// <param name="item">Item to enqueue; null is rejected.</param>
    /// <returns>true when the item was written; false when it was null or the channel did not accept it
    /// (a bounded channel that is full, or a channel that has been completed).</returns>
    public bool AddItem(ItemT item)
    {
        if (item == null)
        {
            return false;
        }

        if (!channel.Writer.TryWrite(item))
        {
            return false;
        }

        // Count only items that were really written so rejected items do not inflate the
        // enqueue counter. A worker may record its dequeue just before this increment
        // becomes visible.
        Info.CountEnqueue();
        return true;
    }
}
/// <summary>
/// Channel and thread manager in which every worker thread owns its own channel.
/// </summary>
/// <typeparam name="ThreadT">Concrete worker thread type.</typeparam>
/// <typeparam name="ItemT">Type of the items carried by the channels.</typeparam>
public class ChannelsThreadsManager<ThreadT, ItemT> : BaseChannelThreadManager<ItemT> where ThreadT : BaseChannelThread<ItemT>
{
    /// <summary>Pairs a worker thread with the channel that feeds it.</summary>
    public class ThreadInfo
    {
        public ThreadT Thread { get; set; }
        public Channel<ItemT> Channel { get; set; }
    }

    private readonly List<ThreadInfo> threads = new List<ThreadInfo>();
    private readonly int _channelCapacity = 0;

    /// <summary>Enqueue/dequeue counters shared by all workers.</summary>
    public ChannelManagementInfo Info { get; private set; } = new ChannelManagementInfo();

    /// <summary>
    /// Creates the manager; channels are created later, one per added thread.
    /// </summary>
    /// <param name="channelCapacity">Capacity used for each channel; 0 means unbounded.</param>
    public ChannelsThreadsManager(int channelCapacity = 0)
    {
        _channelCapacity = channelCapacity;
    }

    /// <summary>
    /// Registers a worker and gives it a dedicated, newly created channel.
    /// </summary>
    /// <param name="thread">Worker to register; must be a <typeparamref name="ThreadT"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="thread"/> is null.</exception>
    /// <exception cref="InvalidCastException"><paramref name="thread"/> is not a <typeparamref name="ThreadT"/>.</exception>
    public void AddThread(BaseChannelThread<ItemT> thread)
    {
        if (thread == null)
        {
            throw new ArgumentNullException(nameof(thread));
        }

        // Cast before touching the worker so a wrong type is rejected without leaving it
        // half-configured.
        var worker = (ThreadT)thread;
        var channel = CreateChannel(_channelCapacity);
        worker.Info = Info;
        worker.Channel = channel;
        threads.Add(new ThreadInfo { Channel = channel, Thread = worker });
    }

    /// <summary>Starts every registered worker thread.</summary>
    public void StartThread()
    {
        foreach (var info in threads)
        {
            info.Thread.Start();
        }
    }

    /// <summary>
    /// Completes every channel so the workers exit once drained, then waits for them.
    /// </summary>
    /// <param name="msec">Total time budget, in milliseconds, shared by all workers.</param>
    public void StopAndWaitThread(int msec = 3000)
    {
        // Complete all channels first so every worker starts winding down at once.
        foreach (var info in threads)
        {
            info.Channel.Writer.TryComplete();
        }

        // One shared deadline: the workers finish concurrently, so the budget must not be
        // multiplied by the number of threads.
        var elapsed = Stopwatch.StartNew();
        foreach (var info in threads)
        {
            long remaining = msec - elapsed.ElapsedMilliseconds;
            info.Thread.StopAndWait(remaining > 0 ? (int)remaining : 0);
        }
    }

    /// <summary>
    /// Enqueues an item on the channel selected by <paramref name="i"/>, so that the same
    /// number is always handled by the same worker thread. When that channel is bounded
    /// and full, the call blocks until space is available instead of dropping the item.
    /// An item offered after the channel has been completed is discarded.
    /// </summary>
    /// <param name="i">Routing number; any value, including negative ones, maps to a valid worker.</param>
    /// <param name="item">Item to enqueue; null is ignored.</param>
    /// <exception cref="InvalidOperationException">No worker thread has been added.</exception>
    public void AddItem(int i, ItemT item)
    {
        if (item == null)
        {
            return;
        }

        int workerCount = threads.Count;
        if (workerCount == 0)
        {
            throw new InvalidOperationException("No worker thread has been added.");
        }

        // C#'s % keeps the sign of the dividend; normalise so negative numbers still
        // select a valid index.
        int idx = ((i % workerCount) + workerCount) % workerCount;
        var writer = threads[idx].Channel.Writer;

        while (!writer.TryWrite(item))
        {
            // The channel is full or completed. WaitToWriteAsync yields false once no
            // further writes can ever succeed.
            if (!writer.WaitToWriteAsync().AsTask().GetAwaiter().GetResult())
            {
                return;
            }
        }

        // Count only items that were really written. The worker may record its dequeue
        // just before this increment becomes visible.
        Info.CountEnqueue();
    }
}
/// <summary>
/// Sample/test class (for reference): feeds console input to two worker threads.
/// </summary>
class TestChannelThread
{
    /// <summary>Entry point of the sample.</summary>
    public static void Test()
    {
        Console.WriteLine("start...");
        new TestChannelThread().Run();
        Console.WriteLine("done...");
    }

    /// <summary>
    /// Starts two workers on one shared channel, enqueues ten sample items, then forwards
    /// each console line to the workers until "q" is entered or the input ends.
    /// </summary>
    public void Run()
    {
        // Create the manager.
        var test = new ChannelThreadsManager<WorkerThread, string>();

        // Create the worker threads.
        test.AddThread(new WorkerThread());
        test.AddThread(new WorkerThread());
        test.StartThread();

        // Feed the initial data.
        for (int i = 0; i < 10; i++)
        {
            test.AddItem("test");
        }

        while (true)
        {
            var input = Console.ReadLine();

            // ReadLine returns null at end of input (closed or redirected stdin); without
            // this exit the loop would spin forever.
            if (input == null)
            {
                break;
            }

            if (input == "q")
            {
                Console.WriteLine("quit");
                break;
            }

            test.AddItem(input);
        }

        test.StopAndWaitThread();
    }

    /// <summary>Sample worker: simulates 100 ms of work, then prints the job and the elapsed time.</summary>
    class WorkerThread : BaseChannelThread<string>
    {
        public override void Loop(string job)
        {
            Thread.Sleep(100);
            Console.WriteLine($"job: {job}, ElasedTime: {GetElasedTimeMsec()}");
        }
    }
}
}
'C#' 카테고리의 다른 글

| 글 | 작성일 |
|---|---|
| c# DebuggerDisplay, DebuggerTypeProxy attribute (0) | 2022.03.16 |
| c# method attribute 의 값을 해당 메소드에서 가져오기 (0) | 2022.03.15 |
| c# HttpClient (0) | 2022.03.15 |
| object pool, object type pool (0) | 2022.03.11 |
| GC TEST (0) | 2022.03.08 |
| GC EventPipe 모니터링 (0) | 2022.03.08 |
| C# .net core 빌드 및 powershell 전송 (0) | 2022.03.01 |
| c# stack size 확인 (0) | 2022.02.24 |