From 540014a7702a9bae7a3b9c00098671a132e869e8 Mon Sep 17 00:00:00 2001 From: ulysseskao <ulysseskao@gmail.com> Date: Thu, 05 May 2016 10:21:11 +0800 Subject: [PATCH] Merge remote-tracking branch 'origin/usestp' --- CCSTrace/CCS/CCSMain.cs | 335 ++++++++++++++++++++++++++++++------------------------- 1 files changed, 182 insertions(+), 153 deletions(-) diff --git a/CCSTrace/CCS/CCSMain.cs b/CCSTrace/CCS/CCSMain.cs index a9a480b..81b520d 100644 --- a/CCSTrace/CCS/CCSMain.cs +++ b/CCSTrace/CCS/CCSMain.cs @@ -1,13 +1,9 @@ using System; -using System.Collections; using System.Collections.Generic; using System.Data; using System.Data.OracleClient; using System.IO; using System.Linq; -using System.Runtime.CompilerServices; -using System.Threading; -using System.Web; using System.Xml; using Amib.Threading; using CCSTrace.CCS.Domain; @@ -24,31 +20,31 @@ private static readonly Logger Logger = LogManager.GetCurrentClassLogger(); private static CcsMain _instance; - private readonly int _connectionCount = 1; - private OracleConnection _mainConn = null; - - private SmartThreadPool _mainjobThreadPool = null; - private readonly object _syncDbQueue = new object(); - private readonly Queue<OracleConnection> _dbQueue = new Queue<OracleConnection>(); private readonly List<string> _processingCases = new List<string>(); - private readonly List<KeyValuePair<string, int>> _waitingCases = new List<KeyValuePair<string, int>>();//Record the case when the case's FDR processed by another + //Record the case when the case's FDR processed by another + private readonly SortedSet<int> _processingFdr = new SortedSet<int>(); + // private readonly object _syncDbQueue = new object(); + // private readonly Queue<OracleConnection> _dbQueue = new Queue<OracleConnection>(); + private readonly object _syncProcessingCases = new object(); + private readonly object _syncProcessingFdrs = new object(); + private readonly object _syncWaitingCases = new object(); + private readonly List<KeyValuePair<string, int>> _waitingCases = new List<KeyValuePair<string, int>>(); + - public CcsMain() - { - _mainjobThreadPool = new SmartThreadPool(); - // Startup(); - } + // private readonly int _connectionCount = 1; + private OracleConnection _mainConn; + + private SmartThreadPool _mainjobThreadPool; + + private IWorkItemsGroup _wigFinsh; + private IWorkItemsGroup _wigNextJobs; + private IWorkItemsGroup _wigProcess; + public string AppDataPath { get; set; } - public SortedSet<int> ProcessingFdr { get; } = new SortedSet<int>(); - public static CcsMain Instance => _instance ?? (_instance = new CcsMain()); - - public List<string> ProcessingCases => _processingCases; - - public List<KeyValuePair<string, int>> WaitingCases => _waitingCases; public void Startup() { @@ -56,8 +52,8 @@ try { - ReadXml(); - Logger.Info("CCSMain ReadXml Complete."); + ReadConfigXml(); + Logger.Info("CCSMain ReadConfigXml Complete."); if (_mainConn == null) _mainConn = CreateConnection(); @@ -66,10 +62,12 @@ GlobalVariable.CcsCodelist = new CcsCodelist(_mainConn); GlobalVariable.EosCodelist = new EosCodelist(_mainConn); + /* for (var i = 0; i < _connectionCount; i++) { lock (_syncDbQueue) _dbQueue.Enqueue(CreateConnection()); } + */ } catch (Exception e) { @@ -79,7 +77,34 @@ AddrContrast.Instance.Initialize(_mainConn); AlarmData.Instance.Initialize(_mainConn); + DeptContrast.Instance.Initialize(_mainConn); + var stpStartInfo = new STPStartInfo + { + StartSuspended = true, + IdleTimeout = GlobalVariable.IdleTimeout*1000, + MaxWorkerThreads = GlobalVariable.MaxThreadSize, + MinWorkerThreads = GlobalVariable.MinThreadSize + }; + _mainjobThreadPool = new SmartThreadPool(stpStartInfo); + var wigStartInfo = new WIGStartInfo + { + FillStateWithArgs = true + }; + _wigProcess = _mainjobThreadPool.CreateWorkItemsGroup(1, wigStartInfo); + _wigFinsh = _mainjobThreadPool.CreateWorkItemsGroup(1, wigStartInfo); + _wigNextJobs = _mainjobThreadPool.CreateWorkItemsGroup(1, wigStartInfo); + + _mainjobThreadPool.Start(); + _wigNextJobs.Start(); + _wigProcess.Start(); + _wigFinsh.Start(); + + Logger.Info("Create Thread Pool."); + + // doNextJob + _wigNextJobs.QueueWorkItem(DoNextJobs); + /* CCSEventRecord ccsEventRecord = GetWaitRecord(_mainConn); if (ccsEventRecord != null) @@ -88,15 +113,7 @@ lock (_syncDbQueue) conn = _dbQueue.Dequeue(); ProcessCase(ccsEventRecord, conn); } - - STPStartInfo stpStartInfo = new STPStartInfo - { - IdleTimeout = GlobalVariable.IdleTimeout*1000, - MaxWorkerThreads = GlobalVariable.MaxThreadSize, - MinWorkerThreads = GlobalVariable.MinThreadSize - }; - _mainjobThreadPool = new SmartThreadPool(stpStartInfo); - Logger.Info("Create Thread Pool."); + */ } public void Shutdown() @@ -108,90 +125,65 @@ GC.WaitForPendingFinalizers(); } - public void AcceptEvent2(CCSEventRecord ccsEventRecord) + private void DoNextJobs() { - // WorkItemCallback workItemCallback = new WorkItemCallback(this.DoEventWork); - _mainjobThreadPool.QueueWorkItem(DoEventWork, ccsEventRecord); - } - - private void DoEventWork(CCSEventRecord state) - { - OracleConnection conn; - lock (_syncDbQueue) conn = _dbQueue.Dequeue(); + Logger.Debug("Enter DoNextJobs"); + var conn = CreateConnection(); try { - AcceptEvent(state); + var ccsEventRecord = GetWaitRecord(conn); + + if (ccsEventRecord != null) + { + _wigProcess.QueueWorkItem(ProcessCase, ccsEventRecord); + } } finally { - lock (_syncDbQueue) _dbQueue.Enqueue(conn); + conn?.Close(); + Logger.Debug("Leave DoNextJobs"); } - - } public void AcceptEvent(CCSEventRecord ccsEventRecord) { - /* - WIGStartInfo wigStartInfo = new WIGStartInfo() + _mainjobThreadPool.QueueWorkItem(AcceptWebServiceEvent, ccsEventRecord); + } + + private void AcceptWebServiceEvent(CCSEventRecord ccsEventRecord) + { + Logger.Debug("Enter AcceptWebServiceEvent"); + var conn = CreateConnection(); + + try { - FillStateWithArgs = true, - }; - _wig1 = _smartThreadPool.CreateWorkItemsGroup((int)spinCon1.Value, wigStartInfo); - _wig2 = _smartThreadPool.CreateWorkItemsGroup((int)spinCon2.Value, wigStartInfo); - - */ - - int i = 0; - int reConnectCount = 1; - - while (i <= reConnectCount) + if (InsertCCSEventRecord(ccsEventRecord, conn)) + { + _wigNextJobs.QueueWorkItem(DoNextJobs); + } + } + catch (Exception e) { - try - { - if (InsertCCSEventRecord(ccsEventRecord, _mainConn)) - { - if (_dbQueue.Count > 0) - { - //將EVETNQUERY的案件狀態改為開始處理 - CCSEventRecord waitCcsEventRecord = GetWaitRecord(_mainConn); - - if (waitCcsEventRecord != null) - { - OracleConnection conn; - lock (_syncDbQueue) conn = _dbQueue.Dequeue(); - ProcessCase(waitCcsEventRecord, conn); - } - } - - break; - } - } - catch (Exception e) - { - Logger.Error(e, e.Message); - if (_mainConn.State == ConnectionState.Closed) - { - i++; - if (i > reConnectCount) - throw; - } - else - throw; - } + Logger.Error(e, e.Message); + } + finally + { + conn?.Close(); + Logger.Debug("Leave AcceptWebServiceEvent"); } } - private void ReadXml() + + private void ReadConfigXml() { XmlReader reader = null; try { - string file = Path.Combine(AppDataPath, DbConfigFilename); + var file = Path.Combine(AppDataPath, DbConfigFilename); // 建立 XML 讀取器 - XmlReaderSettings settings = new XmlReaderSettings + var settings = new XmlReaderSettings { IgnoreComments = true, // 不處理註解 IgnoreWhitespace = true, // 跳過空白 @@ -205,7 +197,7 @@ switch (reader.NodeType) { case XmlNodeType.Element: - string localName = reader.LocalName; // 取得標籤名稱 + var localName = reader.LocalName; // 取得標籤名稱 // Step 3: 讀取 FileInfo 標籤的屬性 if (localName.Equals("DBSetting")) @@ -214,7 +206,7 @@ $"Data source={reader["DataSource"]};User Id={reader["UserId"]};Password={reader["Password"]};"; GlobalVariable.TraceConnectionString = $"{reader["UserId"]}/{reader["Password"]}@{reader["DataSource"]}"; - string token = reader["ConnectionCount"]; + var token = reader["ConnectionCount"]; if (token != null) { GlobalVariable.MaxConnectionCount = int.Parse(token); @@ -224,9 +216,10 @@ { GlobalVariable.ShowError = bool.Parse(token); } - } else if (localName.Equals("ThreadSetting")) + } + else if (localName.Equals("ThreadSetting")) { - string token = reader["maxThreadSize"]; + var token = reader["maxThreadSize"]; if (token != null) { GlobalVariable.MaxThreadSize = int.Parse(token); @@ -252,22 +245,19 @@ private OracleConnection CreateConnection() { - OracleConnectionStringBuilder builder = new OracleConnectionStringBuilder(GlobalVariable.ConnectionString) + var builder = new OracleConnectionStringBuilder(GlobalVariable.ConnectionString) { MaxPoolSize = GlobalVariable.MaxConnectionCount, MinPoolSize = 1, Pooling = true }; - string connectstring = builder.ToString(); + var connectstring = builder.ToString(); - OracleConnection dbConn = new OracleConnection(connectstring); - + var dbConn = new OracleConnection(connectstring); dbConn.Open(); - return dbConn; } - [MethodImpl(MethodImplOptions.Synchronized)] private bool InsertCCSEventRecord(CCSEventRecord ccsEventRecord, OracleConnection conn) { OracleTransaction transaction = null; @@ -330,80 +320,76 @@ return true; } - private delegate void WorkerThreadHandler(); - - private void ProcessCase(CCSEventRecord ccsEventRecord, OracleConnection conn) + private void ProcessCase(CCSEventRecord ccsEventRecord) { + Logger.Debug("Enter ProcessCase"); + OracleConnection conn = CreateConnection(); try { if (conn.State == ConnectionState.Closed) conn.Open(); - ProcessEvent processEvent = new ProcessEvent(ccsEventRecord, conn, GlobalVariable.TraceConnectionString); - processEvent.ThreadFinish += ThreadEndEventProcess; + var processEvent = new ProcessEvent(); + processEvent.Run(ccsEventRecord, conn, GlobalVariable.TraceConnectionString); - ThreadStart threadStart = processEvent.Run; - Thread thread = new Thread(threadStart); - thread.Start(); + _wigNextJobs.QueueWorkItem(DoNextJobs); } - catch + catch (Exception e) { - lock (_syncDbQueue) _dbQueue.Enqueue(conn); + Logger.Error(e, e.Message); } + finally + { + conn?.Close(); + } + Logger.Debug("Leave ProcessCase"); } - private void ThreadEndEventProcess(object sender, ThreadEndEvent e) - { - //將EVETNQUERY的案件狀態改為開始處理 - CCSEventRecord waitCcsEventRecord = GetWaitRecord(e.GetConnection()); - - try - { - if (waitCcsEventRecord != null) - ProcessCase(waitCcsEventRecord, e.GetConnection()); - } - finally - { - lock (_syncDbQueue) _dbQueue.Enqueue(e.GetConnection()); - } - } - - [MethodImpl(MethodImplOptions.Synchronized)] + // [MethodImpl(MethodImplOptions.Synchronized)] private CCSEventRecord GetWaitRecord(OracleConnection conn) { - string processCcsid = ""; - string ccsid = ""; + var processCcsid = ""; + var ccsid = ""; CCSEventRecord ccsEventRecord = null; - foreach (var obj in _waitingCases) + lock (_syncWaitingCases) { - var ccsId = obj.Key; - var fdrid = obj.Value; - - if (ProcessingFdr.Contains(fdrid)) //該條饋線仍有案件在處理中 - processCcsid = processCcsid + "'" + ccsid + "',"; - else + foreach (var obj in _waitingCases) { - ccsid = ccsId; - _waitingCases.Remove(obj); - break; + var ccsId = obj.Key; + var fdrid = obj.Value; + + if (ContainProcessingFdr(fdrid)) //該條饋線仍有案件在處理中 + processCcsid = processCcsid + "'" + ccsid + "',"; + else + { + ccsid = ccsId; + _waitingCases.Remove(obj); + break; + } } } - if (ccsid.Length == 0) //沒有因同饋線而在等候中的案件 + if (ccsid.Length == 0) //沒有因同饋線而在等候中的案件 { - var sqlStmt = "SELECT Q.CCSID AS CCSID FROM CCS.EVENTQUERY Q,CCS.EVENTRECORD R WHERE Q.CASESTATUS IN (" + - (int) CCSCaseState.EventInitial + "," + + var sqlStmt = + "SELECT Q.CCSID AS CCSID FROM CCS.EVENTQUERY Q,CCS.EVENTRECORD R WHERE Q.CASESTATUS IN (" + + (int) CCSCaseState.EventInitial + "," + (int) CCSCaseState.EventProcess + ")"; - processCcsid = _processingCases.Aggregate(processCcsid, (current, item) => current + "'" + item + "',"); + lock (_processingCases) + { + processCcsid = _processingCases.Aggregate(processCcsid, + (current, item) => current + "'" + item + "',"); + } if (processCcsid.Length != 0) - sqlStmt = sqlStmt + " AND Q.CCSID NOT IN (" + processCcsid.Substring(0, processCcsid.Length - 1) + ")"; + sqlStmt = sqlStmt + " AND Q.CCSID NOT IN (" + processCcsid.Substring(0, processCcsid.Length - 1) + + ")"; sqlStmt = sqlStmt + " AND Q.CCSID = R.CCSID AND ROWNUM < 2 ORDER BY Q.CHANGETIME"; - OracleCommand command = new OracleCommand(sqlStmt, conn); - OracleDataReader reader = command.ExecuteReader(); + var command = new OracleCommand(sqlStmt, conn); + var reader = command.ExecuteReader(); try { @@ -426,14 +412,14 @@ if (ccsEventRecord != null) { - CCSEventQuery ccsEventQuery = new CCSEventQuery + var ccsEventQuery = new CCSEventQuery { CcsId = ccsEventRecord.CcsId, CaseStatus = (int) CCSCaseState.EventProcess }; //先將EVETNQUERY的案件狀態改為開始處理 - OracleTransaction transaction = conn.BeginTransaction(); + var transaction = conn.BeginTransaction(); try { @@ -441,7 +427,10 @@ { Logger.Info("更新EVENTQUERY的案件狀態為處理中.(CCSID = " + ccsEventRecord.CcsId + ")"); transaction.Commit(); - _processingCases.Add(ccsEventRecord.CcsId); + lock (_syncProcessingCases) + { + _processingCases.Add(ccsEventRecord.CcsId); + } } else { @@ -463,5 +452,45 @@ return ccsEventRecord; } + + public void AddWaitingCases(string ccsId, int fdrid) + { + lock (_syncWaitingCases) + { + _waitingCases.Add(new KeyValuePair<string, int>(ccsId, fdrid)); + } + } + + public void AddProcessingFdr(int fdrid) + { + lock (_syncProcessingFdrs) + { + _processingFdr.Add(fdrid); + } + } + + public bool ContainProcessingFdr(int fdrid) + { + lock (_syncProcessingFdrs) + { + return _processingFdr.Contains(fdrid); + } + } + + public void RemoveProcessingFdr(int fdrId) + { + lock (_syncProcessingFdrs) + { + _processingFdr.Remove(fdrId); + } + } + + public void RemoveProcessingCases(string ccsId) + { + lock (_syncProcessingCases) + { + _processingCases.Remove(ccsId); + } + } } } \ No newline at end of file -- Gitblit v0.0.0-SNAPSHOT