From 540014a7702a9bae7a3b9c00098671a132e869e8 Mon Sep 17 00:00:00 2001
From: ulysseskao <ulysseskao@gmail.com>
Date: Thu, 05 May 2016 10:21:11 +0800
Subject: [PATCH] Merge remote-tracking branch 'origin/usestp'

---
 CCSTrace/CCS/CCSMain.cs |  335 ++++++++++++++++++++++++++++++-------------------------
 1 files changed, 182 insertions(+), 153 deletions(-)

diff --git a/CCSTrace/CCS/CCSMain.cs b/CCSTrace/CCS/CCSMain.cs
index a9a480b..81b520d 100644
--- a/CCSTrace/CCS/CCSMain.cs
+++ b/CCSTrace/CCS/CCSMain.cs
@@ -1,13 +1,9 @@
 using System;
-using System.Collections;
 using System.Collections.Generic;
 using System.Data;
 using System.Data.OracleClient;
 using System.IO;
 using System.Linq;
-using System.Runtime.CompilerServices;
-using System.Threading;
-using System.Web;
 using System.Xml;
 using Amib.Threading;
 using CCSTrace.CCS.Domain;
@@ -24,31 +20,31 @@
         private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
         private static CcsMain _instance;
 
-        private readonly int _connectionCount = 1;
-        private OracleConnection _mainConn = null;
-
-        private SmartThreadPool _mainjobThreadPool = null;
-        private readonly object _syncDbQueue = new object();
-        private readonly Queue<OracleConnection> _dbQueue = new Queue<OracleConnection>();
         private readonly List<string> _processingCases = new List<string>();
-        private readonly List<KeyValuePair<string, int>> _waitingCases = new List<KeyValuePair<string, int>>();//Record the case when the case's FDR processed by another
+        //Record the case when the case's FDR processed by another
+        private readonly SortedSet<int> _processingFdr = new SortedSet<int>();
+        // private readonly object _syncDbQueue = new object();
+        // private readonly Queue<OracleConnection> _dbQueue = new Queue<OracleConnection>();
+        private readonly object _syncProcessingCases = new object();
+        private readonly object _syncProcessingFdrs = new object();
+        private readonly object _syncWaitingCases = new object();
 
+        private readonly List<KeyValuePair<string, int>> _waitingCases = new List<KeyValuePair<string, int>>();
+ 
 
-        public CcsMain()
-        {
-            _mainjobThreadPool = new SmartThreadPool();
-            // Startup();
-        }
+        // private readonly int _connectionCount = 1;
+        private OracleConnection _mainConn;
+
+        private SmartThreadPool _mainjobThreadPool;
+
+        private IWorkItemsGroup _wigFinsh;
+        private IWorkItemsGroup _wigNextJobs;
+        private IWorkItemsGroup _wigProcess;
+
 
         public string AppDataPath { get; set; }
 
-        public SortedSet<int> ProcessingFdr { get; } = new SortedSet<int>();
-
         public static CcsMain Instance => _instance ?? (_instance = new CcsMain());
-
-        public List<string> ProcessingCases => _processingCases;
-
-        public List<KeyValuePair<string, int>> WaitingCases => _waitingCases;
 
         public void Startup()
         {
@@ -56,8 +52,8 @@
 
             try
             {
-                ReadXml();
-                Logger.Info("CCSMain ReadXml Complete.");
+                ReadConfigXml();
+                Logger.Info("CCSMain ReadConfigXml Complete.");
 
                 if (_mainConn == null)
                     _mainConn = CreateConnection();
@@ -66,10 +62,12 @@
                 GlobalVariable.CcsCodelist = new CcsCodelist(_mainConn);
                 GlobalVariable.EosCodelist = new EosCodelist(_mainConn);
 
+                /*
                 for (var i = 0; i < _connectionCount; i++)
                 {
                     lock (_syncDbQueue) _dbQueue.Enqueue(CreateConnection());
                 }
+                */
             }
             catch (Exception e)
             {
@@ -79,7 +77,34 @@
 
             AddrContrast.Instance.Initialize(_mainConn);
             AlarmData.Instance.Initialize(_mainConn);
+            DeptContrast.Instance.Initialize(_mainConn);
 
+            var stpStartInfo = new STPStartInfo
+            {
+                StartSuspended = true,
+                IdleTimeout = GlobalVariable.IdleTimeout*1000,
+                MaxWorkerThreads = GlobalVariable.MaxThreadSize,
+                MinWorkerThreads = GlobalVariable.MinThreadSize
+            };
+            _mainjobThreadPool = new SmartThreadPool(stpStartInfo);
+            var wigStartInfo = new WIGStartInfo
+            {
+                FillStateWithArgs = true
+            };
+            _wigProcess = _mainjobThreadPool.CreateWorkItemsGroup(1, wigStartInfo);
+            _wigFinsh = _mainjobThreadPool.CreateWorkItemsGroup(1, wigStartInfo);
+            _wigNextJobs = _mainjobThreadPool.CreateWorkItemsGroup(1, wigStartInfo);
+
+            _mainjobThreadPool.Start();
+            _wigNextJobs.Start();
+            _wigProcess.Start();
+            _wigFinsh.Start();
+
+            Logger.Info("Create Thread Pool.");
+
+            // doNextJob
+            _wigNextJobs.QueueWorkItem(DoNextJobs);
+            /*
             CCSEventRecord ccsEventRecord = GetWaitRecord(_mainConn);
 
             if (ccsEventRecord != null)
@@ -88,15 +113,7 @@
                 lock (_syncDbQueue) conn = _dbQueue.Dequeue();
                 ProcessCase(ccsEventRecord, conn);
             }
-
-            STPStartInfo stpStartInfo = new STPStartInfo
-            {
-                IdleTimeout = GlobalVariable.IdleTimeout*1000,
-                MaxWorkerThreads = GlobalVariable.MaxThreadSize,
-                MinWorkerThreads = GlobalVariable.MinThreadSize
-            };
-            _mainjobThreadPool = new SmartThreadPool(stpStartInfo);
-            Logger.Info("Create Thread Pool.");
+            */
         }
 
         public void Shutdown()
@@ -108,90 +125,65 @@
             GC.WaitForPendingFinalizers();
         }
 
-        public void AcceptEvent2(CCSEventRecord ccsEventRecord)
+        private void DoNextJobs()
         {
-            // WorkItemCallback workItemCallback = new WorkItemCallback(this.DoEventWork);
-            _mainjobThreadPool.QueueWorkItem(DoEventWork, ccsEventRecord);
-        }
-
-        private void DoEventWork(CCSEventRecord state)
-        {
-            OracleConnection conn;
-            lock (_syncDbQueue) conn = _dbQueue.Dequeue();
+            Logger.Debug("Enter DoNextJobs");
+            var conn = CreateConnection();
 
             try
             {
-                AcceptEvent(state);
+                var ccsEventRecord = GetWaitRecord(conn);
+
+                if (ccsEventRecord != null)
+                {
+                    _wigProcess.QueueWorkItem(ProcessCase, ccsEventRecord);
+                }
             }
             finally
             {
-                lock (_syncDbQueue) _dbQueue.Enqueue(conn);
+                conn?.Close();
+                Logger.Debug("Leave DoNextJobs");
             }
-
-
         }
 
         public void AcceptEvent(CCSEventRecord ccsEventRecord)
         {
-            /*
-            WIGStartInfo wigStartInfo = new WIGStartInfo()
+            _mainjobThreadPool.QueueWorkItem(AcceptWebServiceEvent, ccsEventRecord);
+        }
+
+        private void AcceptWebServiceEvent(CCSEventRecord ccsEventRecord)
+        {
+            Logger.Debug("Enter AcceptWebServiceEvent");
+            var conn = CreateConnection();
+
+            try
             {
-                FillStateWithArgs = true,
-            };
-            _wig1 = _smartThreadPool.CreateWorkItemsGroup((int)spinCon1.Value, wigStartInfo);
-            _wig2 = _smartThreadPool.CreateWorkItemsGroup((int)spinCon2.Value, wigStartInfo);
-
-            */
-
-            int i = 0;
-            int reConnectCount = 1;
-
-            while (i <= reConnectCount)
+                if (InsertCCSEventRecord(ccsEventRecord, conn))
+                {
+                    _wigNextJobs.QueueWorkItem(DoNextJobs);
+                }
+            }
+            catch (Exception e)
             {
-                try
-                {
-                    if (InsertCCSEventRecord(ccsEventRecord, _mainConn))
-                    {
-                        if (_dbQueue.Count > 0)
-                        {
-                            //將EVETNQUERY的案件狀態改為開始處理
-                            CCSEventRecord waitCcsEventRecord = GetWaitRecord(_mainConn);
-
-                            if (waitCcsEventRecord != null)
-                            {
-                                OracleConnection conn;
-                                lock (_syncDbQueue) conn = _dbQueue.Dequeue();
-                                ProcessCase(waitCcsEventRecord, conn);
-                            }
-                        }
-
-                        break;
-                    }
-                }
-                catch (Exception e)
-                {
-                    Logger.Error(e, e.Message);
-                    if (_mainConn.State == ConnectionState.Closed)
-                    {
-                        i++;
-                        if (i > reConnectCount)
-                            throw;
-                    }
-                    else
-                        throw;
-                }
+                Logger.Error(e, e.Message);
+            }
+            finally
+            {
+                conn?.Close();
+                Logger.Debug("Leave AcceptWebServiceEvent");
             }
         }
 
-        private void ReadXml()
+
+        private void ReadConfigXml()
         {
             XmlReader reader = null;
 
             try
             {
-                string file = Path.Combine(AppDataPath, DbConfigFilename);
+                var file = Path.Combine(AppDataPath, DbConfigFilename);
                 // 建立 XML 讀取器
-                XmlReaderSettings settings = new XmlReaderSettings
+                var settings = new XmlReaderSettings
                 {
                     IgnoreComments = true, // 不處理註解
                     IgnoreWhitespace = true, // 跳過空白
@@ -205,7 +197,7 @@
                     switch (reader.NodeType)
                     {
                         case XmlNodeType.Element:
-                            string localName = reader.LocalName; // 取得標籤名稱
+                            var localName = reader.LocalName; // 取得標籤名稱
 
                             // Step 3: 讀取 FileInfo 標籤的屬性
                             if (localName.Equals("DBSetting"))
@@ -214,7 +206,7 @@
                                     $"Data source={reader["DataSource"]};User Id={reader["UserId"]};Password={reader["Password"]};";
                                 GlobalVariable.TraceConnectionString =
                                     $"{reader["UserId"]}/{reader["Password"]}@{reader["DataSource"]}";
-                                string token = reader["ConnectionCount"];
+                                var token = reader["ConnectionCount"];
                                 if (token != null)
                                 {
                                     GlobalVariable.MaxConnectionCount = int.Parse(token);
@@ -224,9 +216,10 @@
                                 {
                                     GlobalVariable.ShowError = bool.Parse(token);
                                 }
-                            } else if (localName.Equals("ThreadSetting"))
+                            }
+                            else if (localName.Equals("ThreadSetting"))
                             {
-                                string token = reader["maxThreadSize"];
+                                var token = reader["maxThreadSize"];
                                 if (token != null)
                                 {
                                     GlobalVariable.MaxThreadSize = int.Parse(token);
@@ -252,22 +245,19 @@
 
         private OracleConnection CreateConnection()
         {
-            OracleConnectionStringBuilder builder = new OracleConnectionStringBuilder(GlobalVariable.ConnectionString)
+            var builder = new OracleConnectionStringBuilder(GlobalVariable.ConnectionString)
             {
                 MaxPoolSize = GlobalVariable.MaxConnectionCount,
                 MinPoolSize = 1,
                 Pooling = true
             };
-            string connectstring = builder.ToString();
+            var connectstring = builder.ToString();
 
-            OracleConnection dbConn = new OracleConnection(connectstring);
-
+            var dbConn = new OracleConnection(connectstring);
             dbConn.Open();
-
             return dbConn;
         }
 
-        [MethodImpl(MethodImplOptions.Synchronized)]
         private bool InsertCCSEventRecord(CCSEventRecord ccsEventRecord, OracleConnection conn)
         {
             OracleTransaction transaction = null;
@@ -330,80 +320,76 @@
             return true;
         }
 
-        private delegate void WorkerThreadHandler();
-
-        private void ProcessCase(CCSEventRecord ccsEventRecord, OracleConnection conn)
+        private void ProcessCase(CCSEventRecord ccsEventRecord)
         {
+            Logger.Debug("Enter ProcessCase");
+            OracleConnection conn = CreateConnection();
             try
             {
                 if (conn.State == ConnectionState.Closed)
                     conn.Open();
 
-                ProcessEvent processEvent = new ProcessEvent(ccsEventRecord, conn, GlobalVariable.TraceConnectionString);
-                processEvent.ThreadFinish += ThreadEndEventProcess;
+                var processEvent = new ProcessEvent();
+                processEvent.Run(ccsEventRecord, conn, GlobalVariable.TraceConnectionString);
 
-                ThreadStart threadStart = processEvent.Run;
-                Thread thread = new Thread(threadStart);
-                thread.Start();
+                _wigNextJobs.QueueWorkItem(DoNextJobs);
             }
-            catch
+            catch (Exception e)
             {
-                lock (_syncDbQueue) _dbQueue.Enqueue(conn);
+                Logger.Error(e, e.Message);
             }
+            finally
+            {
+                conn?.Close();
+            }
+            Logger.Debug("Leave ProcessCase");
         }
 
-        private void ThreadEndEventProcess(object sender, ThreadEndEvent e)
-        {
-            //將EVETNQUERY的案件狀態改為開始處理
-            CCSEventRecord waitCcsEventRecord = GetWaitRecord(e.GetConnection());
-
-            try
-            {
-                if (waitCcsEventRecord != null)
-                    ProcessCase(waitCcsEventRecord, e.GetConnection());
-            }
-            finally 
-            {
-                lock (_syncDbQueue) _dbQueue.Enqueue(e.GetConnection());
-            }
-        }
-
-        [MethodImpl(MethodImplOptions.Synchronized)]
+        // [MethodImpl(MethodImplOptions.Synchronized)]
         private CCSEventRecord GetWaitRecord(OracleConnection conn)
         {
-            string processCcsid = "";
-            string ccsid = "";
+            var processCcsid = "";
+            var ccsid = "";
             CCSEventRecord ccsEventRecord = null;
 
-            foreach (var obj in _waitingCases)
+            lock (_syncWaitingCases)
             {
-                var ccsId = obj.Key;
-                var fdrid = obj.Value;
-
-                if (ProcessingFdr.Contains(fdrid)) //該條饋線仍有案件在處理中
-                    processCcsid = processCcsid + "'" + ccsid + "',";
-                else
+                foreach (var obj in _waitingCases)
                 {
-                    ccsid = ccsId;
-                    _waitingCases.Remove(obj);
-                    break;
+                    var ccsId = obj.Key;
+                    var fdrid = obj.Value;
+
+                    if (ContainProcessingFdr(fdrid)) //該條饋線仍有案件在處理中
+                        processCcsid = processCcsid + "'" + ccsid + "',";
+                    else
+                    {
+                        ccsid = ccsId;
+                        _waitingCases.Remove(obj);
+                        break;
+                    }
                 }
             }
 
-            if (ccsid.Length == 0)  //沒有因同饋線而在等候中的案件
+            if (ccsid.Length == 0) //沒有因同饋線而在等候中的案件
             {
-                var sqlStmt = "SELECT Q.CCSID AS CCSID FROM CCS.EVENTQUERY Q,CCS.EVENTRECORD R WHERE Q.CASESTATUS IN (" + 
-                    (int) CCSCaseState.EventInitial + "," + 
+                var sqlStmt =
+                    "SELECT Q.CCSID AS CCSID FROM CCS.EVENTQUERY Q,CCS.EVENTRECORD R WHERE Q.CASESTATUS IN (" +
+                    (int) CCSCaseState.EventInitial + "," +
                     (int) CCSCaseState.EventProcess + ")";
 
-                processCcsid = _processingCases.Aggregate(processCcsid, (current, item) => current + "'" + item + "',");
+                lock (_processingCases)
+                {
+                    processCcsid = _processingCases.Aggregate(processCcsid,
+                        (current, item) => current + "'" + item + "',");
+                }
 
                 if (processCcsid.Length != 0)
-                    sqlStmt = sqlStmt + " AND Q.CCSID NOT IN (" + processCcsid.Substring(0, processCcsid.Length - 1) + ")";
+                    sqlStmt = sqlStmt + " AND Q.CCSID NOT IN (" + processCcsid.Substring(0, processCcsid.Length - 1) +
+                              ")";
 
                 sqlStmt = sqlStmt + " AND Q.CCSID = R.CCSID AND ROWNUM < 2 ORDER BY Q.CHANGETIME";
-                OracleCommand command = new OracleCommand(sqlStmt, conn);
-                OracleDataReader reader = command.ExecuteReader();
+                var command = new OracleCommand(sqlStmt, conn);
+                var reader = command.ExecuteReader();
 
                 try
                 {
@@ -426,14 +412,14 @@
 
             if (ccsEventRecord != null)
             {
-                CCSEventQuery ccsEventQuery = new CCSEventQuery
+                var ccsEventQuery = new CCSEventQuery
                 {
                     CcsId = ccsEventRecord.CcsId,
                     CaseStatus = (int) CCSCaseState.EventProcess
                 };
                 //先將EVETNQUERY的案件狀態改為開始處理
 
-                OracleTransaction transaction = conn.BeginTransaction();
+                var transaction = conn.BeginTransaction();
 
                 try
                 {
@@ -441,7 +427,10 @@
                     {
                         Logger.Info("更新EVENTQUERY的案件狀態為處理中.(CCSID = " + ccsEventRecord.CcsId + ")");
                         transaction.Commit();
-                        _processingCases.Add(ccsEventRecord.CcsId);
+                        lock (_syncProcessingCases)
+                        {
+                            _processingCases.Add(ccsEventRecord.CcsId);
+                        }
                     }
                     else
                     {
@@ -463,5 +452,45 @@
 
             return ccsEventRecord;
         }
+
+        public void AddWaitingCases(string ccsId, int fdrid)
+        {
+            lock (_syncWaitingCases)
+            {
+                _waitingCases.Add(new KeyValuePair<string, int>(ccsId, fdrid));
+            }
+        }
+
+        public void AddProcessingFdr(int fdrid)
+        {
+            lock (_syncProcessingFdrs)
+            {
+                _processingFdr.Add(fdrid);
+            }
+        }
+
+        public bool ContainProcessingFdr(int fdrid)
+        {
+            lock (_syncProcessingFdrs)
+            {
+                return _processingFdr.Contains(fdrid);
+            }
+        }
+
+        public void RemoveProcessingFdr(int fdrId)
+        {
+            lock (_syncProcessingFdrs)
+            {
+                _processingFdr.Remove(fdrId);
+            }
+        }
+
+        public void RemoveProcessingCases(string ccsId)
+        {
+            lock (_syncProcessingCases)
+            {
+                _processingCases.Remove(ccsId);
+            }
+        }
     }
 }
\ No newline at end of file

--
Gitblit v0.0.0-SNAPSHOT