본문 바로가기

프로그래밍/└ .NET

[C#] Pipe 통신

PipeBase.cs

using System;
using System.Collections.Generic;
using System.IO.Pipes;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Common.Pipe
{
    public abstract class PipeBase
    {
        // protected
        protected PipeStream pipeStream;
        protected Action<PipeBase> asyncReaderStart;
        protected void StartByteReaderAsync(Action<byte[]> packetReceived)
        {
            int count = 4;
            byte[] bDataLength = new byte[count];
            this.pipeStream.ReadAsync(bDataLength, 0, count).ContinueWith((Action<Task<int>>)(t =>
            {
                int len = t.Result;
                if (len == 0)
                {
                    EventHandler<EventArgs> pipeClosed = this.PipeClosed;
                    if (pipeClosed == null)
                        return;
                    pipeClosed((object)this, EventArgs.Empty);
                }
                else
                {
                    int int32 = BitConverter.ToInt32(bDataLength, 0);
                    byte[] data = new byte[int32];
                    this.pipeStream.ReadAsync(data, 0, int32).ContinueWith((Action<Task<int>>)(t2 =>
                    {
                        len = t2.Result;
                        if (len == 0)
                        {
                            EventHandler<EventArgs> pipeClosed = this.PipeClosed;
                            if (pipeClosed == null)
                                return;
                            pipeClosed((object)this, EventArgs.Empty);
                        }
                        else
                        {
                            packetReceived(data);
                            this.StartByteReaderAsync(packetReceived);
                        }
                    }));
                }
            }));
        }

        // public
        public event EventHandler<PipeBaseEventArgs> DataReceived;
        public event EventHandler<EventArgs> PipeClosed;
        public void Close()
        {
            this.pipeStream.WaitForPipeDrain();
            this.pipeStream.Close();
            this.pipeStream.Dispose();
            this.pipeStream = null;
            this.DataReceived = null;
        }
        public void StartByteReaderAsync()
        {
            this.StartByteReaderAsync((Action<byte[]>)(b =>
            {
                EventHandler<PipeBaseEventArgs> dataReceived = this.DataReceived;
                if (dataReceived == null)
                    return;
                dataReceived((object)this, new PipeBaseEventArgs(b, b.Length));
            }));
        }
        public void StartStringReaderAsync()
        {
            this.StartByteReaderAsync((Action<byte[]>)(b =>
            {
                string str = Encoding.UTF8.GetString(b).TrimEnd(new char[1]);
                EventHandler<PipeBaseEventArgs> dataReceived = this.DataReceived;
                if (dataReceived == null)
                    return;
                dataReceived((object)this, new PipeBaseEventArgs(str));
            }));
        }
        public void Flush()
        {
            this.pipeStream.Flush();
        }
        public Task WriteString(string str)
        {
            return this.WriteBytes(Encoding.UTF8.GetBytes(str));
        }
        public Task WriteBytes(byte[] bytes)
        {
            byte[] array = ((IEnumerable<byte>)BitConverter.GetBytes(bytes.Length)).Concat<byte>((IEnumerable<byte>)bytes).ToArray<byte>();
            return this.pipeStream.WriteAsync(array, 0, array.Length);
        }
        public void ClearReceivedHandler()
        {
            this.DataReceived = null;
        }
    }
}

 

 

PipeBaseEventArgs.cs

namespace Common.Pipe
{
    public class PipeBaseEventArgs
    {
        public byte[] Data { get; protected set; }
        public int Len { get; protected set; }
        public string String { get; protected set; }
        public object ClientManager { get; set; }
        public PipeBaseEventArgs(string str)
        {
            this.String = str;
        }
        public PipeBaseEventArgs(byte[] data, int len)
        {
            this.Data = data;
            this.Len = len;
        }
    }
}

 

PieBaseForServer.cs

using System;
using System.IO.Pipes;

namespace Common.Pipe
{
    public class PipeBaseForServer : PipeBase
    {
        // protected
        protected NamedPipeServerStream serverPipeStream;
        protected string PipeName { get; set; }
        protected void PipeConnected(IAsyncResult ar)
        {
            this.serverPipeStream.EndWaitForConnection(ar);
            EventHandler<EventArgs> connected = this.Connected;
            if (connected != null)
            {
                connected(this, new EventArgs());
            }

            this.asyncReaderStart(this);
        }

        // public
        public event EventHandler<EventArgs> Connected;
        public PipeBaseForServer(string pipeName, Action<PipeBase> asyncReaderStart)
        {
            this.asyncReaderStart = asyncReaderStart;
            this.PipeName = pipeName;
            this.serverPipeStream = new NamedPipeServerStream(pipeName, PipeDirection.InOut, CommonConstant.PipeMaxInstancesCount, 
                PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
            this.pipeStream = this.serverPipeStream;
            this.serverPipeStream.BeginWaitForConnection(new AsyncCallback(this.PipeConnected), null);
        }
        public PipeBaseForServer(string pipeName, Action<PipeBase> asyncReaderStart, PipeSecurity security)
        {
            this.asyncReaderStart = asyncReaderStart;
            this.PipeName = pipeName;
            this.serverPipeStream = new NamedPipeServerStream(pipeName, PipeDirection.InOut, CommonConstant.PipeMaxInstancesCount, 
                PipeTransmissionMode.Byte, PipeOptions.Asynchronous, CommonConstant.PipeBufferSize, CommonConstant.PipeBufferSize, security);
            this.pipeStream = this.serverPipeStream;
            this.serverPipeStream.BeginWaitForConnection(new AsyncCallback(this.PipeConnected), null);
        }
        public void ClearConnectedEvent()
        {
            this.Connected = null;
        }
    }
}

 

PipeBaseForClient.cs

using System;
using System.IO.Pipes;

namespace Common.Pipe
{
    public class PipeBaseForClient : PipeBase
    {
        // protected
        protected NamedPipeClientStream clientPipeStream;

        // public
        public PipeBaseForClient(string serverName, string pipeName, Action<PipeBase> asyncReaderStart)
        {
            this.asyncReaderStart = asyncReaderStart;
            this.clientPipeStream = new NamedPipeClientStream(serverName, pipeName, PipeDirection.InOut, PipeOptions.Asynchronous);
            this.pipeStream =  this.clientPipeStream;
        }
        public void Connect()
        {
            this.clientPipeStream.Connect();
            this.asyncReaderStart(this);
        }
        public void Connect(int milliseconds)
        {
            this.clientPipeStream.Connect(milliseconds);
            this.asyncReaderStart(this);
        }
    }
}

 

PipeServer.cs

using Common.Pipe.Packet;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.IO.Pipes;
using System.Text;

namespace Common.Pipe
{
    public class PipeServer
    {
        // private
        private List<PipeBaseForServer> serverList = null;
        private PipeBaseForServer lastServer = null;
        private string pipeName = "";
        private PipeFunction pipeFunction = null;
        private PipeCloseFunction pipeCloseFunction = null;
        private void CreateServer()
        {
            PipeSecurity pipeSecurity = new PipeSecurity();
            pipeSecurity.AddAccessRule(new PipeAccessRule("Users", PipeAccessRights.FullControl, System.Security.AccessControl.AccessControlType.Allow));
            pipeSecurity.AddAccessRule(new PipeAccessRule("SYSTEM", PipeAccessRights.FullControl, System.Security.AccessControl.AccessControlType.Allow));

            lastServer = new PipeBaseForServer(this.pipeName, p => p.StartByteReaderAsync(), pipeSecurity);
            lastServer.Connected += new EventHandler<EventArgs>(Connected);
            lastServer.DataReceived += new EventHandler<PipeBaseEventArgs>(DataReceived);
            lastServer.PipeClosed += new EventHandler<EventArgs>(PipeClosed);
        }
        private void Connected(object sender, EventArgs args)
        {
            // loop 구조의 pipe 서버가 아닌 한건씩 처리하고 종료후 새로생성
            serverList.Add(lastServer);
            lastServer.ClearConnectedEvent();
            this.CreateServer();
        }
        private void PipeClosed(object sender, EventArgs args)
        {
            this.pipeCloseFunction();
        }
        private void DataReceived(object sender, PipeBaseEventArgs args)
        {
            PipeBaseForServer server = (PipeBaseForServer)sender;
            byte[] reply = args.Data;
            this.pipeFunction(ref reply);
            server.WriteBytes(reply);
        }

        // pubilc
        public delegate void PipeFunction(ref byte[] reply);    // data received 함수포인터
        public delegate void PipeCloseFunction();               // pipe closed 함수포인터
        public PipeServer(string pipeName, PipeFunction pipeFunction, PipeCloseFunction pipeCloseFunction = null)
        {
            this.serverList = new List<PipeBaseForServer>();
            this.pipeName = pipeName;
            this.pipeFunction = pipeFunction;
            this.pipeCloseFunction = pipeCloseFunction;

            CreateServer();
        }
        public void Close()
        {
            try
            {
                for (int i = 0; i < this.serverList.Count; i++)
                {
                    this.serverList[i].Close();
                }
            }
            catch (Exception e)
            {
                CommonLog.WriteDebugView(LogLevel.Error, "Except during closing pipe server. err: " + e.Message);
            }
        }
    }
}

 

PipeClient.cs

using System;
using System.Text;
using System.Threading;
using System.Diagnostics;

using Common.Pipe.Packet;
using Newtonsoft.Json.Linq;

namespace Common.Pipe
{
    public class PipeClient
    {
        // private
        private PipeBaseForClient client = null;
        private string pipeName = "";
        private object locker = new object();
        private string replyData;
        private PipePacketStatus pipeStatus = PipePacketStatus.Error;
        private bool CheckStatusSuccess()
        {
            lock (locker)
            {
                if (this.pipeStatus == PipePacketStatus.Success)
                {
                    return true;
                }

                return false;
            }
        }
        private void OnReceive(object sender, PipeBaseEventArgs args)
        {
            byte[] data = args.Data;
            string received = Encoding.UTF8.GetString(data);
            this.replyData = received;
            SetStatus(PipePacketStatus.Success);
            //CommonLog.WriteDebugView(LogLevel.Debug, "Client received: " + received);
        }
        private void SetStatus(PipePacketStatus pipeStatus)
        {
            lock (locker)
            {
                this.pipeStatus = pipeStatus;
            }
        }

        // public
        public JObject ReceiveData(ref PipePacketStatus status)
        {
            JObject whole = JObject.Parse(this.replyData);
            status = (PipePacketStatus)Convert.ToInt32(((string)whole["status"]));

            return whole;
        }
        public PipeClient(string pipeName)
        {
            this.pipeName = pipeName;
            client = new PipeBaseForClient(".", this.pipeName, p => p.StartByteReaderAsync());
            client.Connect();
            client.DataReceived += OnReceive;
        }
        public PipeClient(string pipeName, int timeoutMilliseconds)
        {
            this.pipeName = pipeName;
            client = new PipeBaseForClient(".", this.pipeName, p => p.StartByteReaderAsync());
            client.Connect(timeoutMilliseconds);
            client.DataReceived += OnReceive;
        }
        public void SendData(byte[] data, bool closeConnection = false)
        {
            SetStatus(PipePacketStatus.Error);
            byte[] request = data;
            client.WriteBytes(request);
            if (closeConnection == true) client.Close();
        }
        public void SendData(string data, bool closeConnection = false)
        {
            SetStatus(PipePacketStatus.Error);
            byte[] request = Encoding.UTF8.GetBytes(data);
            client.WriteBytes(request);
            if(closeConnection == true) client.Close();
        }
        public void Close()
        {
            this.Release();
            client.ClearReceivedHandler();
            client.Close();
        }
        public void Release()
        {
            ////this.RecvData = null;
        }
        public bool Wait(int timeout = 10)
        {
            timeout *= 40;
            while (timeout > 0)
            {
                Thread.Sleep(25);
                if (CheckStatusSuccess() == true)
                {
                    return true;
                }

                timeout -= 1;
            }

            if (this.pipeStatus == PipePacketStatus.Pending)
            {
                client.Close();
                client = new PipeBaseForClient(".", pipeName, p => p.StartByteReaderAsync());
                client.Connect();
                client.DataReceived += OnReceive;
            }

            return false;
        }
    }
}