ulysseskao
2016-05-05 540014a7702a9bae7a3b9c00098671a132e869e8
CCSTrace/CCS/CCSMain.cs
@@ -1,13 +1,9 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Data.OracleClient;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Web;
using System.Xml;
using Amib.Threading;
using CCSTrace.CCS.Domain;
@@ -24,31 +20,31 @@
        private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
        private static CcsMain _instance;
        private readonly int _connectionCount = 1;
        private OracleConnection _mainConn = null;
        private SmartThreadPool _mainjobThreadPool = null;
        private readonly object _syncDbQueue = new object();
        private readonly Queue<OracleConnection> _dbQueue = new Queue<OracleConnection>();
        private readonly List<string> _processingCases = new List<string>();
        private readonly List<KeyValuePair<string, int>> _waitingCases = new List<KeyValuePair<string, int>>();//Record the case when the case's FDR processed by another
        //Record the case when the case's FDR processed by another
        private readonly SortedSet<int> _processingFdr = new SortedSet<int>();
        // private readonly object _syncDbQueue = new object();
        // private readonly Queue<OracleConnection> _dbQueue = new Queue<OracleConnection>();
        private readonly object _syncProcessingCases = new object();
        private readonly object _syncProcessingFdrs = new object();
        private readonly object _syncWaitingCases = new object();
        private readonly List<KeyValuePair<string, int>> _waitingCases = new List<KeyValuePair<string, int>>();
        public CcsMain()
        {
            _mainjobThreadPool = new SmartThreadPool();
            // Startup();
        }
        // private readonly int _connectionCount = 1;
        private OracleConnection _mainConn;
        private SmartThreadPool _mainjobThreadPool;
        private IWorkItemsGroup _wigFinsh;
        private IWorkItemsGroup _wigNextJobs;
        private IWorkItemsGroup _wigProcess;
        public string AppDataPath { get; set; }
        public SortedSet<int> ProcessingFdr { get; } = new SortedSet<int>();
        public static CcsMain Instance => _instance ?? (_instance = new CcsMain());
        public List<string> ProcessingCases => _processingCases;
        public List<KeyValuePair<string, int>> WaitingCases => _waitingCases;
        public void Startup()
        {
@@ -56,8 +52,8 @@
            try
            {
                ReadXml();
                Logger.Info("CCSMain ReadXml Complete.");
                ReadConfigXml();
                Logger.Info("CCSMain ReadConfigXml Complete.");
                if (_mainConn == null)
                    _mainConn = CreateConnection();
@@ -66,10 +62,12 @@
                GlobalVariable.CcsCodelist = new CcsCodelist(_mainConn);
                GlobalVariable.EosCodelist = new EosCodelist(_mainConn);
                /*
                for (var i = 0; i < _connectionCount; i++)
                {
                    lock (_syncDbQueue) _dbQueue.Enqueue(CreateConnection());
                }
                */
            }
            catch (Exception e)
            {
@@ -79,7 +77,34 @@
            AddrContrast.Instance.Initialize(_mainConn);
            AlarmData.Instance.Initialize(_mainConn);
            DeptContrast.Instance.Initialize(_mainConn);
            var stpStartInfo = new STPStartInfo
            {
                StartSuspended = true,
                IdleTimeout = GlobalVariable.IdleTimeout*1000,
                MaxWorkerThreads = GlobalVariable.MaxThreadSize,
                MinWorkerThreads = GlobalVariable.MinThreadSize
            };
            _mainjobThreadPool = new SmartThreadPool(stpStartInfo);
            var wigStartInfo = new WIGStartInfo
            {
                FillStateWithArgs = true
            };
            _wigProcess = _mainjobThreadPool.CreateWorkItemsGroup(1, wigStartInfo);
            _wigFinsh = _mainjobThreadPool.CreateWorkItemsGroup(1, wigStartInfo);
            _wigNextJobs = _mainjobThreadPool.CreateWorkItemsGroup(1, wigStartInfo);
            _mainjobThreadPool.Start();
            _wigNextJobs.Start();
            _wigProcess.Start();
            _wigFinsh.Start();
            Logger.Info("Create Thread Pool.");
            // doNextJob
            _wigNextJobs.QueueWorkItem(DoNextJobs);
            /*
            CCSEventRecord ccsEventRecord = GetWaitRecord(_mainConn);
            if (ccsEventRecord != null)
@@ -88,15 +113,7 @@
                lock (_syncDbQueue) conn = _dbQueue.Dequeue();
                ProcessCase(ccsEventRecord, conn);
            }
            STPStartInfo stpStartInfo = new STPStartInfo
            {
                IdleTimeout = GlobalVariable.IdleTimeout*1000,
                MaxWorkerThreads = GlobalVariable.MaxThreadSize,
                MinWorkerThreads = GlobalVariable.MinThreadSize
            };
            _mainjobThreadPool = new SmartThreadPool(stpStartInfo);
            Logger.Info("Create Thread Pool.");
            */
        }
        public void Shutdown()
@@ -108,90 +125,65 @@
            GC.WaitForPendingFinalizers();
        }
        public void AcceptEvent2(CCSEventRecord ccsEventRecord)
        private void DoNextJobs()
        {
            // WorkItemCallback workItemCallback = new WorkItemCallback(this.DoEventWork);
            _mainjobThreadPool.QueueWorkItem(DoEventWork, ccsEventRecord);
        }
        private void DoEventWork(CCSEventRecord state)
        {
            OracleConnection conn;
            lock (_syncDbQueue) conn = _dbQueue.Dequeue();
            Logger.Debug("Enter DoNextJobs");
            var conn = CreateConnection();
            try
            {
                AcceptEvent(state);
                var ccsEventRecord = GetWaitRecord(conn);
                if (ccsEventRecord != null)
                {
                    _wigProcess.QueueWorkItem(ProcessCase, ccsEventRecord);
                }
            }
            finally
            {
                lock (_syncDbQueue) _dbQueue.Enqueue(conn);
                conn?.Close();
                Logger.Debug("Leave DoNextJobs");
            }
        }
        public void AcceptEvent(CCSEventRecord ccsEventRecord)
        {
            /*
            WIGStartInfo wigStartInfo = new WIGStartInfo()
            _mainjobThreadPool.QueueWorkItem(AcceptWebServiceEvent, ccsEventRecord);
        }
        private void AcceptWebServiceEvent(CCSEventRecord ccsEventRecord)
        {
            Logger.Debug("Enter AcceptWebServiceEvent");
            var conn = CreateConnection();
            try
            {
                FillStateWithArgs = true,
            };
            _wig1 = _smartThreadPool.CreateWorkItemsGroup((int)spinCon1.Value, wigStartInfo);
            _wig2 = _smartThreadPool.CreateWorkItemsGroup((int)spinCon2.Value, wigStartInfo);
            */
            int i = 0;
            int reConnectCount = 1;
            while (i <= reConnectCount)
                if (InsertCCSEventRecord(ccsEventRecord, conn))
                {
                    _wigNextJobs.QueueWorkItem(DoNextJobs);
                }
            }
            catch (Exception e)
            {
                try
                {
                    if (InsertCCSEventRecord(ccsEventRecord, _mainConn))
                    {
                        if (_dbQueue.Count > 0)
                        {
                            //將EVETNQUERY的案件狀態改為開始處理
                            CCSEventRecord waitCcsEventRecord = GetWaitRecord(_mainConn);
                            if (waitCcsEventRecord != null)
                            {
                                OracleConnection conn;
                                lock (_syncDbQueue) conn = _dbQueue.Dequeue();
                                ProcessCase(waitCcsEventRecord, conn);
                            }
                        }
                        break;
                    }
                }
                catch (Exception e)
                {
                    Logger.Error(e, e.Message);
                    if (_mainConn.State == ConnectionState.Closed)
                    {
                        i++;
                        if (i > reConnectCount)
                            throw;
                    }
                    else
                        throw;
                }
                Logger.Error(e, e.Message);
            }
            finally
            {
                conn?.Close();
                Logger.Debug("Leave AcceptWebServiceEvent");
            }
        }
        private void ReadXml()
        private void ReadConfigXml()
        {
            XmlReader reader = null;
            try
            {
                string file = Path.Combine(AppDataPath, DbConfigFilename);
                var file = Path.Combine(AppDataPath, DbConfigFilename);
                // 建立 XML 讀取器
                XmlReaderSettings settings = new XmlReaderSettings
                var settings = new XmlReaderSettings
                {
                    IgnoreComments = true, // 不處理註解
                    IgnoreWhitespace = true, // 跳過空白
@@ -205,7 +197,7 @@
                    switch (reader.NodeType)
                    {
                        case XmlNodeType.Element:
                            string localName = reader.LocalName; // 取得標籤名稱
                            var localName = reader.LocalName; // 取得標籤名稱
                            // Step 3: 讀取 FileInfo 標籤的屬性
                            if (localName.Equals("DBSetting"))
@@ -214,7 +206,7 @@
                                    $"Data source={reader["DataSource"]};User Id={reader["UserId"]};Password={reader["Password"]};";
                                GlobalVariable.TraceConnectionString =
                                    $"{reader["UserId"]}/{reader["Password"]}@{reader["DataSource"]}";
                                string token = reader["ConnectionCount"];
                                var token = reader["ConnectionCount"];
                                if (token != null)
                                {
                                    GlobalVariable.MaxConnectionCount = int.Parse(token);
@@ -224,9 +216,10 @@
                                {
                                    GlobalVariable.ShowError = bool.Parse(token);
                                }
                            } else if (localName.Equals("ThreadSetting"))
                            }
                            else if (localName.Equals("ThreadSetting"))
                            {
                                string token = reader["maxThreadSize"];
                                var token = reader["maxThreadSize"];
                                if (token != null)
                                {
                                    GlobalVariable.MaxThreadSize = int.Parse(token);
@@ -252,22 +245,19 @@
        private OracleConnection CreateConnection()
        {
            OracleConnectionStringBuilder builder = new OracleConnectionStringBuilder(GlobalVariable.ConnectionString)
            var builder = new OracleConnectionStringBuilder(GlobalVariable.ConnectionString)
            {
                MaxPoolSize = GlobalVariable.MaxConnectionCount,
                MinPoolSize = 1,
                Pooling = true
            };
            string connectstring = builder.ToString();
            var connectstring = builder.ToString();
            OracleConnection dbConn = new OracleConnection(connectstring);
            var dbConn = new OracleConnection(connectstring);
            dbConn.Open();
            return dbConn;
        }
        [MethodImpl(MethodImplOptions.Synchronized)]
        private bool InsertCCSEventRecord(CCSEventRecord ccsEventRecord, OracleConnection conn)
        {
            OracleTransaction transaction = null;
@@ -330,80 +320,76 @@
            return true;
        }
        private delegate void WorkerThreadHandler();
        private void ProcessCase(CCSEventRecord ccsEventRecord, OracleConnection conn)
        private void ProcessCase(CCSEventRecord ccsEventRecord)
        {
            Logger.Debug("Enter ProcessCase");
            OracleConnection conn = CreateConnection();
            try
            {
                if (conn.State == ConnectionState.Closed)
                    conn.Open();
                ProcessEvent processEvent = new ProcessEvent(ccsEventRecord, conn, GlobalVariable.TraceConnectionString);
                processEvent.ThreadFinish += ThreadEndEventProcess;
                var processEvent = new ProcessEvent();
                processEvent.Run(ccsEventRecord, conn, GlobalVariable.TraceConnectionString);
                ThreadStart threadStart = processEvent.Run;
                Thread thread = new Thread(threadStart);
                thread.Start();
                _wigNextJobs.QueueWorkItem(DoNextJobs);
            }
            catch
            catch (Exception e)
            {
                lock (_syncDbQueue) _dbQueue.Enqueue(conn);
                Logger.Error(e, e.Message);
            }
            finally
            {
                conn?.Close();
            }
            Logger.Debug("Leave ProcessCase");
        }
        private void ThreadEndEventProcess(object sender, ThreadEndEvent e)
        {
            //將EVETNQUERY的案件狀態改為開始處理
            CCSEventRecord waitCcsEventRecord = GetWaitRecord(e.GetConnection());
            try
            {
                if (waitCcsEventRecord != null)
                    ProcessCase(waitCcsEventRecord, e.GetConnection());
            }
            finally
            {
                lock (_syncDbQueue) _dbQueue.Enqueue(e.GetConnection());
            }
        }
        [MethodImpl(MethodImplOptions.Synchronized)]
        // [MethodImpl(MethodImplOptions.Synchronized)]
        private CCSEventRecord GetWaitRecord(OracleConnection conn)
        {
            string processCcsid = "";
            string ccsid = "";
            var processCcsid = "";
            var ccsid = "";
            CCSEventRecord ccsEventRecord = null;
            foreach (var obj in _waitingCases)
            lock (_syncWaitingCases)
            {
                var ccsId = obj.Key;
                var fdrid = obj.Value;
                if (ProcessingFdr.Contains(fdrid)) //該條饋線仍有案件在處理中
                    processCcsid = processCcsid + "'" + ccsid + "',";
                else
                foreach (var obj in _waitingCases)
                {
                    ccsid = ccsId;
                    _waitingCases.Remove(obj);
                    break;
                    var ccsId = obj.Key;
                    var fdrid = obj.Value;
                    if (ContainProcessingFdr(fdrid)) //該條饋線仍有案件在處理中
                        processCcsid = processCcsid + "'" + ccsid + "',";
                    else
                    {
                        ccsid = ccsId;
                        _waitingCases.Remove(obj);
                        break;
                    }
                }
            }
            if (ccsid.Length == 0)  //沒有因同饋線而在等候中的案件
            if (ccsid.Length == 0) //沒有因同饋線而在等候中的案件
            {
                var sqlStmt = "SELECT Q.CCSID AS CCSID FROM CCS.EVENTQUERY Q,CCS.EVENTRECORD R WHERE Q.CASESTATUS IN (" +
                    (int) CCSCaseState.EventInitial + "," +
                var sqlStmt =
                    "SELECT Q.CCSID AS CCSID FROM CCS.EVENTQUERY Q,CCS.EVENTRECORD R WHERE Q.CASESTATUS IN (" +
                    (int) CCSCaseState.EventInitial + "," +
                    (int) CCSCaseState.EventProcess + ")";
                processCcsid = _processingCases.Aggregate(processCcsid, (current, item) => current + "'" + item + "',");
                lock (_processingCases)
                {
                    processCcsid = _processingCases.Aggregate(processCcsid,
                        (current, item) => current + "'" + item + "',");
                }
                if (processCcsid.Length != 0)
                    sqlStmt = sqlStmt + " AND Q.CCSID NOT IN (" + processCcsid.Substring(0, processCcsid.Length - 1) + ")";
                    sqlStmt = sqlStmt + " AND Q.CCSID NOT IN (" + processCcsid.Substring(0, processCcsid.Length - 1) +
                              ")";
                sqlStmt = sqlStmt + " AND Q.CCSID = R.CCSID AND ROWNUM < 2 ORDER BY Q.CHANGETIME";
                OracleCommand command = new OracleCommand(sqlStmt, conn);
                OracleDataReader reader = command.ExecuteReader();
                var command = new OracleCommand(sqlStmt, conn);
                var reader = command.ExecuteReader();
                try
                {
@@ -426,14 +412,14 @@
            if (ccsEventRecord != null)
            {
                CCSEventQuery ccsEventQuery = new CCSEventQuery
                var ccsEventQuery = new CCSEventQuery
                {
                    CcsId = ccsEventRecord.CcsId,
                    CaseStatus = (int) CCSCaseState.EventProcess
                };
                //先將EVETNQUERY的案件狀態改為開始處理
                OracleTransaction transaction = conn.BeginTransaction();
                var transaction = conn.BeginTransaction();
                try
                {
@@ -441,7 +427,10 @@
                    {
                        Logger.Info("更新EVENTQUERY的案件狀態為處理中.(CCSID = " + ccsEventRecord.CcsId + ")");
                        transaction.Commit();
                        _processingCases.Add(ccsEventRecord.CcsId);
                        lock (_syncProcessingCases)
                        {
                            _processingCases.Add(ccsEventRecord.CcsId);
                        }
                    }
                    else
                    {
@@ -463,5 +452,45 @@
            return ccsEventRecord;
        }
        public void AddWaitingCases(string ccsId, int fdrid)
        {
            lock (_syncWaitingCases)
            {
                _waitingCases.Add(new KeyValuePair<string, int>(ccsId, fdrid));
            }
        }
        public void AddProcessingFdr(int fdrid)
        {
            lock (_syncProcessingFdrs)
            {
                _processingFdr.Add(fdrid);
            }
        }
        public bool ContainProcessingFdr(int fdrid)
        {
            lock (_syncProcessingFdrs)
            {
                return _processingFdr.Contains(fdrid);
            }
        }
        public void RemoveProcessingFdr(int fdrId)
        {
            lock (_syncProcessingFdrs)
            {
                _processingFdr.Remove(fdrId);
            }
        }
        public void RemoveProcessingCases(string ccsId)
        {
            lock (_syncProcessingCases)
            {
                _processingCases.Remove(ccsId);
            }
        }
    }
}