using System;
using System.Collections.Generic;
using System.Data;
using System.Data.OracleClient;
using System.IO;
using System.Linq;
using System.Xml;
using Amib.Threading;
using CCSTrace.CCS.Domain;
using CCSTrace.CCS.Object;
using Iesi.Collections.Generic;
using NLog;

namespace CCSTrace.CCS
{
    /// <summary>
    /// Process-wide coordinator for CCS events: it stores incoming event records in the
    /// database, picks the next record that is ready to run, and hands it to a
    /// <see cref="SmartThreadPool"/> for processing.
    /// </summary>
    public class CcsMain
    {
        private const string DbConfigFilename = "DBConfig.xml";
        private static readonly Logger Logger = LogManager.GetCurrentClassLogger();

        // Guards the lazy creation of the singleton.
        private static readonly object InstanceLock = new object();
        private static CcsMain _instance;

        // CCS ids added by GetWaitRecord once their status was switched to EventProcess;
        // they are excluded from the next database pick. Guarded by _syncProcessingCases.
        private readonly List<string> _processingCases = new List<string>();

        // Feeder ids registered through AddProcessingFdr. Guarded by _syncProcessingFdrs.
        private readonly SortedSet<int> _processingFdr = new SortedSet<int>();

        private readonly object _syncProcessingCases = new object();
        private readonly object _syncProcessingFdrs = new object();
        private readonly object _syncWaitingCases = new object();

        // (CCS id, feeder id) pairs parked through AddWaitingCases; GetWaitRecord releases
        // the first pair whose feeder id is no longer in _processingFdr.
        // Guarded by _syncWaitingCases.
        private readonly List<KeyValuePair<string, int>> _waitingCases =
            new List<KeyValuePair<string, int>>();

        private OracleConnection _mainConn;
        private SmartThreadPool _mainjobThreadPool;
        private IWorkItemsGroup _wigFinsh;
        private IWorkItemsGroup _wigNextJobs;
        private IWorkItemsGroup _wigProcess;

        /// <summary>Directory that contains <c>DBConfig.xml</c>; must be set before <see cref="Startup"/>.</summary>
        public string AppDataPath { get; set; }

        /// <summary>Shared instance, created on first access.</summary>
        public static CcsMain Instance
        {
            get
            {
                // The class is used from several pool threads, so creation is serialized
                // to guarantee that only one instance is ever handed out.
                lock (InstanceLock)
                {
                    return _instance ?? (_instance = new CcsMain());
                }
            }
        }

        /// <summary>
        /// Reads the configuration file, opens the main connection, loads the code lists and
        /// lookup singletons, starts the thread pool and queues the first <c>DoNextJobs</c> run.
        /// </summary>
        /// <exception cref="Exception">
        /// Any failure while reading the configuration, connecting or loading the code lists
        /// is logged and rethrown.
        /// </exception>
        public void Startup()
        {
            Logger.Info("CCSMain Startup.");
            try
            {
                ReadConfigXml();
                Logger.Info("CCSMain ReadConfigXml Complete.");

                if (_mainConn == null)
                {
                    _mainConn = CreateConnection();
                }

                Logger.Info("CCSMain has been connected database.");
                GlobalVariable.CcsCodelist = new CcsCodelist(_mainConn);
                GlobalVariable.EosCodelist = new EosCodelist(_mainConn);
            }
            catch (Exception e)
            {
                Logger.Error(e, e.Message);
                throw;
            }

            AddrContrast.Instance.Initialize(_mainConn);
            AlarmData.Instance.Initialize(_mainConn);
            DeptContrast.Instance.Initialize(_mainConn);

            var stpStartInfo = new STPStartInfo
            {
                StartSuspended = true,
                // NOTE(review): presumably GlobalVariable.IdleTimeout is in seconds and the
                // pool expects milliseconds - confirm.
                IdleTimeout = GlobalVariable.IdleTimeout * 1000,
                MaxWorkerThreads = GlobalVariable.MaxThreadSize,
                MinWorkerThreads = GlobalVariable.MinThreadSize
            };
            _mainjobThreadPool = new SmartThreadPool(stpStartInfo);

            var wigStartInfo = new WIGStartInfo
            {
                FillStateWithArgs = true
            };

            // Each group is created with a concurrency of 1.
            _wigProcess = _mainjobThreadPool.CreateWorkItemsGroup(1, wigStartInfo);
            _wigFinsh = _mainjobThreadPool.CreateWorkItemsGroup(1, wigStartInfo);
            _wigNextJobs = _mainjobThreadPool.CreateWorkItemsGroup(1, wigStartInfo);

            _mainjobThreadPool.Start();
            _wigNextJobs.Start();
            _wigProcess.Start();
            _wigFinsh.Start();
            Logger.Info("Create Thread Pool.");

            // Kick off the first scan for pending records.
            _wigNextJobs.QueueWorkItem(DoNextJobs);
        }

        /// <summary>
        /// Shuts the thread pool down (force-abort, 1000 ms timeout) and disposes it.
        /// Does nothing when the pool was never started or has already been shut down.
        /// </summary>
        public void Shutdown()
        {
            var pool = _mainjobThreadPool;
            if (pool == null)
            {
                return;
            }

            pool.Shutdown(true, 1000);
            pool.Dispose();
            _mainjobThreadPool = null;

            GC.Collect();
            GC.WaitForPendingFinalizers();
        }

        /// <summary>
        /// Looks up the next record that is ready to run and, when one exists, queues it on
        /// the processing work-items group. Failures are logged and rethrown to the pool.
        /// </summary>
        private void DoNextJobs()
        {
            Logger.Debug("Enter DoNextJobs");
            OracleConnection conn = null;
            try
            {
                conn = CreateConnection();
                var ccsEventRecord = GetWaitRecord(conn);
                if (ccsEventRecord != null)
                {
                    _wigProcess.QueueWorkItem(ProcessCase, ccsEventRecord);
                }
            }
            catch (Exception e)
            {
                Logger.Error(e, e.Message);
                throw;
            }
            finally
            {
                conn?.Close();
                Logger.Debug("Leave DoNextJobs");
            }
        }

        /// <summary>Queues <paramref name="ccsEventRecord"/> to be stored on a pool thread.</summary>
        /// <param name="ccsEventRecord">The incoming event record.</param>
        public void AcceptEvent(CCSEventRecord ccsEventRecord)
        {
            _mainjobThreadPool.QueueWorkItem(AcceptWebServiceEvent, ccsEventRecord);
        }

        /// <summary>
        /// Stores the record and, on success, queues a <c>DoNextJobs</c> run.
        /// Every failure, including a failed connection attempt, is logged and swallowed.
        /// </summary>
        private void AcceptWebServiceEvent(CCSEventRecord ccsEventRecord)
        {
            Logger.Debug("Enter AcceptWebServiceEvent");
            OracleConnection conn = null;
            try
            {
                conn = CreateConnection();
                if (InsertCCSEventRecord(ccsEventRecord, conn))
                {
                    _wigNextJobs.QueueWorkItem(DoNextJobs);
                }
            }
            catch (Exception e)
            {
                Logger.Error(e, e.Message);
            }
            finally
            {
                conn?.Close();
                Logger.Debug("Leave AcceptWebServiceEvent");
            }
        }

        /// <summary>
        /// Loads connection and thread-pool settings from <c>DBConfig.xml</c> under
        /// <see cref="AppDataPath"/> into <c>GlobalVariable</c>.
        /// XML syntax errors are logged and swallowed; other exceptions propagate.
        /// </summary>
        private void ReadConfigXml()
        {
            try
            {
                var file = Path.Combine(AppDataPath, DbConfigFilename);

                var settings = new XmlReaderSettings
                {
                    IgnoreComments = true, // skip comments
                    IgnoreWhitespace = true, // skip insignificant whitespace
                    ValidationType = ValidationType.None // no validation
                };

                // "using" closes the reader on every exit path, not only on XmlException.
                using (var reader = XmlReader.Create(file, settings))
                {
                    while (reader.Read())
                    {
                        if (reader.NodeType != XmlNodeType.Element)
                        {
                            continue;
                        }

                        var localName = reader.LocalName;
                        if (localName.Equals("DBSetting"))
                        {
                            ReadDbSetting(reader);
                        }
                        else if (localName.Equals("ThreadSetting"))
                        {
                            ReadThreadSetting(reader);
                        }
                    }
                }
            }
            catch (XmlException xe)
            {
                Logger.Error(xe, xe.Message);
            }
        }

        /// <summary>Applies the attributes of a <c>DBSetting</c> element.</summary>
        private static void ReadDbSetting(XmlReader reader)
        {
            GlobalVariable.ConnectionString =
                $"Data source={reader["DataSource"]};User Id={reader["UserId"]};Password={reader["Password"]};";
            GlobalVariable.TraceConnectionString =
                $"{reader["UserId"]}/{reader["Password"]}@{reader["DataSource"]}";

            // Optional attributes: leave the current value untouched when absent.
            var token = reader["ConnectionCount"];
            if (token != null)
            {
                GlobalVariable.MaxConnectionCount = int.Parse(token);
            }

            token = reader["ShowError"];
            if (token != null)
            {
                GlobalVariable.ShowError = bool.Parse(token);
            }
        }

        /// <summary>Applies the attributes of a <c>ThreadSetting</c> element.</summary>
        private static void ReadThreadSetting(XmlReader reader)
        {
            var token = reader["maxThreadSize"];
            if (token != null)
            {
                GlobalVariable.MaxThreadSize = int.Parse(token);
            }

            token = reader["minThreadSize"];
            if (token != null)
            {
                GlobalVariable.MinThreadSize = int.Parse(token);
            }
        }

        /// <summary>Opens a pooled connection built from <c>GlobalVariable.ConnectionString</c>.</summary>
        /// <returns>An open connection; the caller is responsible for closing it.</returns>
        private OracleConnection CreateConnection()
        {
            var builder = new OracleConnectionStringBuilder(GlobalVariable.ConnectionString)
            {
                MaxPoolSize = GlobalVariable.MaxConnectionCount,
                MinPoolSize = 1,
                Pooling = true
            };

            var dbConn = new OracleConnection(builder.ToString());
            try
            {
                dbConn.Open();
            }
            catch
            {
                // Nobody else holds a reference yet, so release it before propagating.
                dbConn.Dispose();
                throw;
            }

            return dbConn;
        }

        /// <summary>
        /// Inserts the event record and its initial <c>CCSEventQuery</c> row in one transaction.
        /// </summary>
        /// <param name="ccsEventRecord">Record to store.</param>
        /// <param name="conn">Connection to use; opened here when it is closed.</param>
        /// <returns><c>true</c> when both inserts were committed.</returns>
        /// <exception cref="Exception">
        /// Thrown (after logging and rollback) when either insert reports failure or a
        /// database error occurs.
        /// </exception>
        private bool InsertCCSEventRecord(CCSEventRecord ccsEventRecord, OracleConnection conn)
        {
            OracleTransaction transaction = null;
            try
            {
                if (conn.State == ConnectionState.Closed)
                {
                    conn.Open();
                }

                transaction = conn.BeginTransaction();

                if (!ccsEventRecord.InsertDb(conn, transaction))
                {
                    throw new Exception("案件未受理成功。");
                }

                var ccsEventQuery = new CCSEventQuery
                {
                    CcsId = ccsEventRecord.CcsId,
                    Meter = ccsEventRecord.Meter,
                    CaseStatus = (int) CCSCaseState.EventInitial,
                    ChangeTime = ccsEventRecord.AcceptTime
                };

                if (!ccsEventQuery.Insert(conn, transaction))
                {
                    throw new Exception("案件未受理成功。");
                }

                transaction.Commit();
                return true;
            }
            catch (Exception ex)
            {
                // Single rollback point for every failure path.
                Logger.Error(ex, ex.Message);
                RollbackIfOpen(transaction);
                throw;
            }
            finally
            {
                transaction?.Dispose();
            }
        }

        /// <summary>
        /// Rolls <paramref name="transaction"/> back when it is still attached to an open
        /// connection. A failing rollback is logged and never thrown, so it cannot hide the
        /// error that triggered it.
        /// </summary>
        private static void RollbackIfOpen(OracleTransaction transaction)
        {
            try
            {
                // Connection may be null once the transaction has been completed, hence "?.".
                if (transaction?.Connection?.State == ConnectionState.Open)
                {
                    transaction.Rollback();
                }
            }
            catch (Exception rollbackError)
            {
                Logger.Error(rollbackError, rollbackError.Message);
            }
        }

        /// <summary>
        /// Runs <c>ProcessEvent</c> for the record on its own connection and then queues the
        /// next <c>DoNextJobs</c> run. Failures are logged and swallowed; in that case no
        /// follow-up run is queued.
        /// </summary>
        private void ProcessCase(CCSEventRecord ccsEventRecord)
        {
            Logger.Debug("Enter ProcessCase");
            OracleConnection conn = null;
            try
            {
                conn = CreateConnection();
                if (conn.State == ConnectionState.Closed)
                {
                    conn.Open();
                }

                var processEvent = new ProcessEvent();
                processEvent.Run(ccsEventRecord, conn, GlobalVariable.TraceConnectionString);
                _wigNextJobs.QueueWorkItem(DoNextJobs);
            }
            catch (Exception e)
            {
                Logger.Error(e, e.Message);
            }
            finally
            {
                conn?.Close();
            }

            Logger.Debug("Leave ProcessCase");
        }

        /// <summary>
        /// Picks the next record to process: first a parked case whose feeder id is no longer
        /// registered as processing, otherwise the oldest (by CHANGETIME) EVENTQUERY row in
        /// state EventInitial/EventProcess that is neither parked-and-blocked nor already
        /// being processed. The chosen case is switched to EventProcess and remembered in
        /// <c>_processingCases</c>.
        /// </summary>
        /// <param name="conn">Open connection used for the query and the status update.</param>
        /// <returns>
        /// The record to process; <c>null</c> when nothing is pending or the status update
        /// threw an exception.
        /// </returns>
        private CCSEventRecord GetWaitRecord(OracleConnection conn)
        {
            var excludedIds = new List<string>();
            var ccsid = "";

            lock (_syncWaitingCases)
            {
                foreach (var waiting in _waitingCases)
                {
                    if (ContainProcessingFdr(waiting.Value))
                    {
                        // This feeder still has a case in progress: keep the case parked
                        // and keep the database query from picking it up.
                        excludedIds.Add(waiting.Key);
                    }
                    else
                    {
                        ccsid = waiting.Key;
                        // Safe although we are enumerating: the loop ends right after.
                        _waitingCases.Remove(waiting);
                        break;
                    }
                }
            }

            if (ccsid.Length == 0)
            {
                // No parked case could be released; fall back to the database.
                lock (_syncProcessingCases)
                {
                    excludedIds.AddRange(_processingCases);
                }

                ccsid = QueryNextCcsId(conn, excludedIds);
            }

            if (ccsid.Length == 0)
            {
                return null;
            }

            var ccsEventRecord = new CCSEventRecord(ccsid, conn);
            var ccsEventQuery = new CCSEventQuery
            {
                CcsId = ccsEventRecord.CcsId,
                CaseStatus = (int) CCSCaseState.EventProcess
            };

            // Mark the case as "processing" in EVENTQUERY before handing it out.
            var transaction = conn.BeginTransaction();
            try
            {
                if (ccsEventQuery.UpdateCaseStatus(conn, transaction))
                {
                    Logger.Info("更新EVENTQUERY的案件狀態為處理中.(CCSID = " + ccsEventRecord.CcsId + ")");
                    transaction.Commit();
                    lock (_syncProcessingCases)
                    {
                        _processingCases.Add(ccsEventRecord.CcsId);
                    }
                }
                else
                {
                    // NOTE(review): the record is still returned on this path although its
                    // status was not updated and it is not tracked in _processingCases -
                    // confirm whether that is intended.
                    Logger.Error("無法更新EVENTQUERY的案件狀態.(CCSID = " + ccsEventRecord.CcsId + ")");
                    RollbackIfOpen(transaction);
                }
            }
            catch (Exception e)
            {
                RollbackIfOpen(transaction);
                Logger.Error(e, e.Message);
                ccsEventRecord = null;
            }
            finally
            {
                transaction.Dispose();
            }

            return ccsEventRecord;
        }

        /// <summary>
        /// Queries the CCS id of the oldest pending case, skipping <paramref name="excludedIds"/>.
        /// </summary>
        /// <returns>
        /// The CCS id, or an empty string when no row matched or reading the row failed
        /// (the read failure is logged).
        /// </returns>
        private static string QueryNextCcsId(OracleConnection conn, List<string> excludedIds)
        {
            // ROWNUM is assigned before ORDER BY is applied, so the ordering has to happen in
            // an inline view for "ROWNUM < 2" to really return the oldest row.
            var sqlStmt = "SELECT CCSID FROM (SELECT Q.CCSID AS CCSID FROM CCS.EVENTQUERY Q,CCS.EVENTRECORD R WHERE Q.CASESTATUS IN (" +
                          (int) CCSCaseState.EventInitial + "," + (int) CCSCaseState.EventProcess + ")";

            if (excludedIds.Count != 0)
            {
                // Ids are quoted literals; embedded quotes are doubled so the list stays valid SQL.
                var quotedIds = excludedIds.Select(id => "'" + id.Replace("'", "''") + "'").ToArray();
                sqlStmt = sqlStmt + " AND Q.CCSID NOT IN (" + string.Join(",", quotedIds) + ")";
            }

            sqlStmt = sqlStmt + " AND Q.CCSID = R.CCSID ORDER BY Q.CHANGETIME) WHERE ROWNUM < 2";

            var ccsid = "";
            using (var command = new OracleCommand(sqlStmt, conn))
            using (var reader = command.ExecuteReader())
            {
                try
                {
                    if (reader.Read())
                    {
                        ccsid = reader["CCSID"].ToString();
                    }
                }
                catch (Exception e)
                {
                    Logger.Error(e, e.Message);
                }
            }

            return ccsid;
        }

        /// <summary>Parks a case together with the feeder id it is waiting on.</summary>
        public void AddWaitingCases(string ccsId, int fdrid)
        {
            lock (_syncWaitingCases)
            {
                _waitingCases.Add(new KeyValuePair<string, int>(ccsId, fdrid));
            }
        }

        /// <summary>Registers a feeder id as currently being processed.</summary>
        public void AddProcessingFdr(int fdrid)
        {
            lock (_syncProcessingFdrs)
            {
                _processingFdr.Add(fdrid);
            }
        }

        /// <summary>Returns whether the feeder id is registered as being processed.</summary>
        public bool ContainProcessingFdr(int fdrid)
        {
            lock (_syncProcessingFdrs)
            {
                return _processingFdr.Contains(fdrid);
            }
        }

        /// <summary>Removes a feeder id from the set of feeders being processed.</summary>
        public void RemoveProcessingFdr(int fdrId)
        {
            lock (_syncProcessingFdrs)
            {
                _processingFdr.Remove(fdrId);
            }
        }

        /// <summary>Removes a CCS id from the list of cases being processed.</summary>
        public void RemoveProcessingCases(string ccsId)
        {
            lock (_syncProcessingCases)
            {
                _processingCases.Remove(ccsId);
            }
        }
    }
}