using System; using System.Collections; using System.Collections.Generic; using System.Data; using System.Data.OracleClient; using System.IO; using System.Linq; using System.Runtime.CompilerServices; using System.Threading; using System.Web; using System.Xml; using Amib.Threading; using CCSTrace.CCS.Domain; using CCSTrace.CCS.Object; using Iesi.Collections.Generic; using NLog; namespace CCSTrace.CCS { public class CcsMain { private const string DbConfigFilename = "DBConfig.xml"; private static readonly Logger Logger = LogManager.GetCurrentClassLogger(); private static CcsMain _instance; private readonly int _connectionCount = 1; private OracleConnection _mainConn = null; private SmartThreadPool _mainjobThreadPool = null; private readonly object _syncDbQueue = new object(); private readonly Queue _dbQueue = new Queue(); private readonly List _processingCases = new List(); private readonly List> _waitingCases = new List>();//Record the case when the case's FDR processed by another public CcsMain() { _mainjobThreadPool = new SmartThreadPool(); // Startup(); } public string AppDataPath { get; set; } public SortedSet ProcessingFdr { get; } = new SortedSet(); public static CcsMain Instance => _instance ?? (_instance = new CcsMain()); public List ProcessingCases => _processingCases; public List> WaitingCases => _waitingCases; public void Startup() { Logger.Info("CCSMain Startup."); try { ReadXml(); Logger.Info("CCSMain ReadXml Complete."); if (_mainConn == null) _mainConn = CreateConnection(); Logger.Info("CCSMain has been connected database."); GlobalVariable.CcsCodelist = new CcsCodelist(_mainConn); GlobalVariable.EosCodelist = new EosCodelist(_mainConn); for (var i = 0; i < _connectionCount; i++) { lock (_syncDbQueue) _dbQueue.Enqueue(CreateConnection()); } } catch (Exception e) { Logger.Error(e, e.Message); throw; } var addrContrast = AddrContrast.Instance; addrContrast.initialize(_mainConn); CCSEventRecord ccsEventRecord = GetWaitRecord(_mainConn); if (ccsEventRecord != null) { OracleConnection conn; lock (_syncDbQueue) conn = _dbQueue.Dequeue(); ProcessCase(ccsEventRecord, conn); } STPStartInfo stpStartInfo = new STPStartInfo { IdleTimeout = GlobalVariable.IdleTimeout*1000, MaxWorkerThreads = GlobalVariable.MaxThreadSize, MinWorkerThreads = GlobalVariable.MinThreadSize }; _mainjobThreadPool = new SmartThreadPool(stpStartInfo); Logger.Info("Create Thread Pool."); } public void Shutdown() { _mainjobThreadPool.Shutdown(true, 1000); _mainjobThreadPool.Dispose(); _mainjobThreadPool = null; GC.Collect(); GC.WaitForPendingFinalizers(); } public void AcceptEvent(CCSEventRecord ccsEventRecord) { /* WorkItemCallback workItemCallback = new WorkItemCallback(this.DoWork); _mainjobThreadPool.QueueWorkItem(workItemCallback, ccsEventRecord); WIGStartInfo wigStartInfo = new WIGStartInfo() { FillStateWithArgs = true, }; _wig1 = _smartThreadPool.CreateWorkItemsGroup((int)spinCon1.Value, wigStartInfo); _wig2 = _smartThreadPool.CreateWorkItemsGroup((int)spinCon2.Value, wigStartInfo); */ int i = 0; int reConnectCount = 1; while (i <= reConnectCount) { try { if (InsertCCSEventRecord(ccsEventRecord, _mainConn)) { if (_dbQueue.Count > 0) { //將EVETNQUERY的案件狀態改為開始處理 CCSEventRecord waitCcsEventRecord = GetWaitRecord(_mainConn); if (waitCcsEventRecord != null) { OracleConnection conn = null; lock (_syncDbQueue) conn = _dbQueue.Dequeue(); ProcessCase(waitCcsEventRecord, conn); } } break; } } catch (Exception e) { Logger.Error(e, e.Message); if (_mainConn.State == ConnectionState.Closed) { i++; if (i > reConnectCount) throw; } else throw; } } } private void ReadXml() { XmlReader reader = null; try { string file = Path.Combine(AppDataPath, DbConfigFilename); // 建立 XML 讀取器 XmlReaderSettings settings = new XmlReaderSettings(); settings.IgnoreComments = true; // 不處理註解 settings.IgnoreWhitespace = true; // 跳過空白 settings.ValidationType = ValidationType.None; // 不驗證任何資料 reader = XmlReader.Create(file, settings); // 進入讀取主要部分 while (reader.Read()) { switch (reader.NodeType) { case XmlNodeType.Element: string localName = reader.LocalName; // 取得標籤名稱 // Step 3: 讀取 FileInfo 標籤的屬性 if (localName.Equals("DBSetting")) { GlobalVariable.ConnectionString = $"Data source={reader["DataSource"]};User Id={reader["UserId"]};Password={reader["Password"]};"; GlobalVariable.TraceConnectionString = $"{reader["UserId"]}/{reader["Password"]}@{reader["DataSource"]}"; string token = reader["ConnectionCount"]; if (token != null) { GlobalVariable.MaxConnectionCount = int.Parse(token); } token = reader["ShowError"]; if (token != null) { GlobalVariable.ShowError = bool.Parse(token); } } else if (localName.Equals("ThreadSetting")) { string token = reader["maxThreadSize"]; if (token != null) { GlobalVariable.MaxThreadSize = int.Parse(token); } token = reader["minThreadSize"]; if (token != null) { GlobalVariable.MinThreadSize = int.Parse(token); } } break; } } reader.Close(); } catch (XmlException xe) { Logger.Error(xe, xe.Message); reader?.Close(); } } private OracleConnection CreateConnection() { OracleConnectionStringBuilder builder = new OracleConnectionStringBuilder(GlobalVariable.ConnectionString) { MaxPoolSize = 20, MinPoolSize = 5, Pooling = true }; string connectstring = builder.ToString(); OracleConnection dbConn = new OracleConnection(connectstring); dbConn.Open(); return dbConn; } [MethodImpl(MethodImplOptions.Synchronized)] private bool InsertCCSEventRecord(CCSEventRecord ccsEventRecord, OracleConnection conn) { OracleTransaction transaction = null; try { if (conn.State == ConnectionState.Closed) conn.Open(); transaction = conn.BeginTransaction(); if (ccsEventRecord.InsertDb(conn, transaction)) { var ccsEventQuery = new CCSEventQuery { CcsId = ccsEventRecord.CcsId, Meter = ccsEventRecord.Meter, CaseStatus = (int) CCSCaseState.EventInitial, ChangeTime = ccsEventRecord.AcceptTime }; if (ccsEventQuery.Insert(conn, transaction)) transaction.Commit(); else { if (transaction.Connection.State == ConnectionState.Open) transaction.Rollback(); throw new Exception("案件未受理成功。"); } } else { if (transaction.Connection.State == ConnectionState.Open) transaction.Rollback(); throw new Exception("案件未受理成功。"); } } catch (OracleException e) { Logger.Error(e, e.Message); if (transaction != null && (transaction.Connection.State == ConnectionState.Open)) transaction.Rollback(); throw; } catch (Exception ex) { Logger.Error(ex, ex.Message); if (transaction != null && (transaction.Connection.State == ConnectionState.Open)) transaction.Rollback(); throw; } return true; } private delegate void WorkerThreadHandler(); private void ProcessCase(CCSEventRecord ccsEventRecord, OracleConnection conn) { try { if (conn.State == ConnectionState.Closed) conn.Open(); ProcessEvent processEvent = new ProcessEvent(ccsEventRecord, conn, GlobalVariable.TraceConnectionString); processEvent.ThreadFinish += ThreadEndEventProcess; ThreadStart threadStart = processEvent.Run; Thread thread = new Thread(threadStart); thread.Start(); } catch { lock (_syncDbQueue) _dbQueue.Enqueue(conn); } } private void ThreadEndEventProcess(object sender, ThreadEndEvent e) { //將EVETNQUERY的案件狀態改為開始處理 CCSEventRecord waitCcsEventRecord = GetWaitRecord(e.GetConnection()); try { if (waitCcsEventRecord != null) ProcessCase(waitCcsEventRecord, e.GetConnection()); } finally { lock (_syncDbQueue) _dbQueue.Enqueue(e.GetConnection()); } } [MethodImpl(MethodImplOptions.Synchronized)] private CCSEventRecord GetWaitRecord(OracleConnection conn) { string processCcsid = ""; string ccsid = ""; CCSEventRecord ccsEventRecord = null; foreach (var obj in _waitingCases) { var ccsId = obj.Key; var fdrid = obj.Value; if (ProcessingFdr.Contains(fdrid)) //該條饋線仍有案件在處理中 processCcsid = processCcsid + "'" + ccsid + "',"; else { ccsid = ccsId; _waitingCases.Remove(obj); break; } } if (ccsid.Length == 0) //沒有因同饋線而在等候中的案件 { var sqlStmt = "SELECT Q.CCSID AS CCSID FROM CCS.EVENTQUERY Q,CCS.EVENTRECORD R WHERE Q.CASESTATUS IN (" + (int) CCSCaseState.EventInitial + "," + (int) CCSCaseState.EventProcess + ")"; processCcsid = _processingCases.Aggregate(processCcsid, (current, item) => current + "'" + item + "',"); if (processCcsid.Length != 0) sqlStmt = sqlStmt + " AND Q.CCSID NOT IN (" + processCcsid.Substring(0, processCcsid.Length - 1) + ")"; sqlStmt = sqlStmt + " AND Q.CCSID = R.CCSID AND ROWNUM < 2 ORDER BY Q.CHANGETIME"; OracleCommand command = new OracleCommand(sqlStmt, conn); OracleDataReader reader = command.ExecuteReader(); try { if (reader.Read()) ccsid = reader["CCSID"].ToString(); } catch (Exception e) { Logger.Error(e, e.Message); } finally { reader.Close(); command.Dispose(); } } if (ccsid.Length != 0) ccsEventRecord = new CCSEventRecord(ccsid, conn); if (ccsEventRecord != null) { CCSEventQuery ccsEventQuery = new CCSEventQuery { CcsId = ccsEventRecord.CcsId, CaseStatus = (int) CCSCaseState.EventProcess }; //先將EVETNQUERY的案件狀態改為開始處理 OracleTransaction transaction = conn.BeginTransaction(); try { if (ccsEventQuery.UpdateCaseStatus(conn, transaction)) { Logger.Info("更新EVENTQUERY的案件狀態為處理中.(CCSID = " + ccsEventRecord.CcsId + ")"); transaction.Commit(); _processingCases.Add(ccsEventRecord.CcsId); } else { Logger.Error("無法更新EVENTQUERY的案件狀態.(CCSID = " + ccsEventRecord.CcsId + ")"); if (transaction.Connection.State == ConnectionState.Open) transaction.Rollback(); } } catch (Exception e) { if (transaction.Connection.State == ConnectionState.Open) transaction.Rollback(); Logger.Error(e, e.Message); ccsEventRecord = null; } } return ccsEventRecord; } } }