From 540014a7702a9bae7a3b9c00098671a132e869e8 Mon Sep 17 00:00:00 2001
From: ulysseskao <ulysseskao@gmail.com>
Date: Thu, 05 May 2016 10:21:11 +0800
Subject: [PATCH] Merge remote-tracking branch 'origin/usestp'
---
CCSTrace/CCS/CCSMain.cs | 335 ++++++++++++++++++++++++++++++-------------------------
1 files changed, 182 insertions(+), 153 deletions(-)
diff --git a/CCSTrace/CCS/CCSMain.cs b/CCSTrace/CCS/CCSMain.cs
index a9a480b..81b520d 100644
--- a/CCSTrace/CCS/CCSMain.cs
+++ b/CCSTrace/CCS/CCSMain.cs
@@ -1,13 +1,9 @@
using System;
-using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Data.OracleClient;
using System.IO;
using System.Linq;
-using System.Runtime.CompilerServices;
-using System.Threading;
-using System.Web;
using System.Xml;
using Amib.Threading;
using CCSTrace.CCS.Domain;
@@ -24,31 +20,31 @@
private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
private static CcsMain _instance;
- private readonly int _connectionCount = 1;
- private OracleConnection _mainConn = null;
-
- private SmartThreadPool _mainjobThreadPool = null;
- private readonly object _syncDbQueue = new object();
- private readonly Queue<OracleConnection> _dbQueue = new Queue<OracleConnection>();
private readonly List<string> _processingCases = new List<string>();
- private readonly List<KeyValuePair<string, int>> _waitingCases = new List<KeyValuePair<string, int>>();//Record the case when the case's FDR processed by another
+ //Record the case when the case's FDR processed by another
+ private readonly SortedSet<int> _processingFdr = new SortedSet<int>();
+ // private readonly object _syncDbQueue = new object();
+ // private readonly Queue<OracleConnection> _dbQueue = new Queue<OracleConnection>();
+ private readonly object _syncProcessingCases = new object();
+ private readonly object _syncProcessingFdrs = new object();
+ private readonly object _syncWaitingCases = new object();
+ private readonly List<KeyValuePair<string, int>> _waitingCases = new List<KeyValuePair<string, int>>();
+
- public CcsMain()
- {
- _mainjobThreadPool = new SmartThreadPool();
- // Startup();
- }
+ // private readonly int _connectionCount = 1;
+ private OracleConnection _mainConn;
+
+ private SmartThreadPool _mainjobThreadPool;
+
+ private IWorkItemsGroup _wigFinsh;
+ private IWorkItemsGroup _wigNextJobs;
+ private IWorkItemsGroup _wigProcess;
+
public string AppDataPath { get; set; }
- public SortedSet<int> ProcessingFdr { get; } = new SortedSet<int>();
-
public static CcsMain Instance => _instance ?? (_instance = new CcsMain());
-
- public List<string> ProcessingCases => _processingCases;
-
- public List<KeyValuePair<string, int>> WaitingCases => _waitingCases;
public void Startup()
{
@@ -56,8 +52,8 @@
try
{
- ReadXml();
- Logger.Info("CCSMain ReadXml Complete.");
+ ReadConfigXml();
+ Logger.Info("CCSMain ReadConfigXml Complete.");
if (_mainConn == null)
_mainConn = CreateConnection();
@@ -66,10 +62,12 @@
GlobalVariable.CcsCodelist = new CcsCodelist(_mainConn);
GlobalVariable.EosCodelist = new EosCodelist(_mainConn);
+ /*
for (var i = 0; i < _connectionCount; i++)
{
lock (_syncDbQueue) _dbQueue.Enqueue(CreateConnection());
}
+ */
}
catch (Exception e)
{
@@ -79,7 +77,34 @@
AddrContrast.Instance.Initialize(_mainConn);
AlarmData.Instance.Initialize(_mainConn);
+ DeptContrast.Instance.Initialize(_mainConn);
+ var stpStartInfo = new STPStartInfo
+ {
+ StartSuspended = true,
+ IdleTimeout = GlobalVariable.IdleTimeout*1000,
+ MaxWorkerThreads = GlobalVariable.MaxThreadSize,
+ MinWorkerThreads = GlobalVariable.MinThreadSize
+ };
+ _mainjobThreadPool = new SmartThreadPool(stpStartInfo);
+ var wigStartInfo = new WIGStartInfo
+ {
+ FillStateWithArgs = true
+ };
+ _wigProcess = _mainjobThreadPool.CreateWorkItemsGroup(1, wigStartInfo);
+ _wigFinsh = _mainjobThreadPool.CreateWorkItemsGroup(1, wigStartInfo);
+ _wigNextJobs = _mainjobThreadPool.CreateWorkItemsGroup(1, wigStartInfo);
+
+ _mainjobThreadPool.Start();
+ _wigNextJobs.Start();
+ _wigProcess.Start();
+ _wigFinsh.Start();
+
+ Logger.Info("Create Thread Pool.");
+
+ // doNextJob
+ _wigNextJobs.QueueWorkItem(DoNextJobs);
+ /*
CCSEventRecord ccsEventRecord = GetWaitRecord(_mainConn);
if (ccsEventRecord != null)
@@ -88,15 +113,7 @@
lock (_syncDbQueue) conn = _dbQueue.Dequeue();
ProcessCase(ccsEventRecord, conn);
}
-
- STPStartInfo stpStartInfo = new STPStartInfo
- {
- IdleTimeout = GlobalVariable.IdleTimeout*1000,
- MaxWorkerThreads = GlobalVariable.MaxThreadSize,
- MinWorkerThreads = GlobalVariable.MinThreadSize
- };
- _mainjobThreadPool = new SmartThreadPool(stpStartInfo);
- Logger.Info("Create Thread Pool.");
+ */
}
public void Shutdown()
@@ -108,90 +125,65 @@
GC.WaitForPendingFinalizers();
}
- public void AcceptEvent2(CCSEventRecord ccsEventRecord)
+ private void DoNextJobs()
{
- // WorkItemCallback workItemCallback = new WorkItemCallback(this.DoEventWork);
- _mainjobThreadPool.QueueWorkItem(DoEventWork, ccsEventRecord);
- }
-
- private void DoEventWork(CCSEventRecord state)
- {
- OracleConnection conn;
- lock (_syncDbQueue) conn = _dbQueue.Dequeue();
+ Logger.Debug("Enter DoNextJobs");
+ var conn = CreateConnection();
try
{
- AcceptEvent(state);
+ var ccsEventRecord = GetWaitRecord(conn);
+
+ if (ccsEventRecord != null)
+ {
+ _wigProcess.QueueWorkItem(ProcessCase, ccsEventRecord);
+ }
}
finally
{
- lock (_syncDbQueue) _dbQueue.Enqueue(conn);
+ conn?.Close();
+ Logger.Debug("Leave DoNextJobs");
}
-
-
}
public void AcceptEvent(CCSEventRecord ccsEventRecord)
{
- /*
- WIGStartInfo wigStartInfo = new WIGStartInfo()
+ _mainjobThreadPool.QueueWorkItem(AcceptWebServiceEvent, ccsEventRecord);
+ }
+
+ private void AcceptWebServiceEvent(CCSEventRecord ccsEventRecord)
+ {
+ Logger.Debug("Enter AcceptWebServiceEvent");
+ var conn = CreateConnection();
+
+ try
{
- FillStateWithArgs = true,
- };
- _wig1 = _smartThreadPool.CreateWorkItemsGroup((int)spinCon1.Value, wigStartInfo);
- _wig2 = _smartThreadPool.CreateWorkItemsGroup((int)spinCon2.Value, wigStartInfo);
-
- */
-
- int i = 0;
- int reConnectCount = 1;
-
- while (i <= reConnectCount)
+ if (InsertCCSEventRecord(ccsEventRecord, conn))
+ {
+ _wigNextJobs.QueueWorkItem(DoNextJobs);
+ }
+ }
+ catch (Exception e)
{
- try
- {
- if (InsertCCSEventRecord(ccsEventRecord, _mainConn))
- {
- if (_dbQueue.Count > 0)
- {
- //將EVETNQUERY的案件狀態改為開始處理
- CCSEventRecord waitCcsEventRecord = GetWaitRecord(_mainConn);
-
- if (waitCcsEventRecord != null)
- {
- OracleConnection conn;
- lock (_syncDbQueue) conn = _dbQueue.Dequeue();
- ProcessCase(waitCcsEventRecord, conn);
- }
- }
-
- break;
- }
- }
- catch (Exception e)
- {
- Logger.Error(e, e.Message);
- if (_mainConn.State == ConnectionState.Closed)
- {
- i++;
- if (i > reConnectCount)
- throw;
- }
- else
- throw;
- }
+ Logger.Error(e, e.Message);
+ }
+ finally
+ {
+ conn?.Close();
+ Logger.Debug("Leave AcceptWebServiceEvent");
}
}
- private void ReadXml()
+
+ private void ReadConfigXml()
{
XmlReader reader = null;
try
{
- string file = Path.Combine(AppDataPath, DbConfigFilename);
+ var file = Path.Combine(AppDataPath, DbConfigFilename);
// 建立 XML 讀取器
- XmlReaderSettings settings = new XmlReaderSettings
+ var settings = new XmlReaderSettings
{
IgnoreComments = true, // 不處理註解
IgnoreWhitespace = true, // 跳過空白
@@ -205,7 +197,7 @@
switch (reader.NodeType)
{
case XmlNodeType.Element:
- string localName = reader.LocalName; // 取得標籤名稱
+ var localName = reader.LocalName; // 取得標籤名稱
// Step 3: 讀取 FileInfo 標籤的屬性
if (localName.Equals("DBSetting"))
@@ -214,7 +206,7 @@
$"Data source={reader["DataSource"]};User Id={reader["UserId"]};Password={reader["Password"]};";
GlobalVariable.TraceConnectionString =
$"{reader["UserId"]}/{reader["Password"]}@{reader["DataSource"]}";
- string token = reader["ConnectionCount"];
+ var token = reader["ConnectionCount"];
if (token != null)
{
GlobalVariable.MaxConnectionCount = int.Parse(token);
@@ -224,9 +216,10 @@
{
GlobalVariable.ShowError = bool.Parse(token);
}
- } else if (localName.Equals("ThreadSetting"))
+ }
+ else if (localName.Equals("ThreadSetting"))
{
- string token = reader["maxThreadSize"];
+ var token = reader["maxThreadSize"];
if (token != null)
{
GlobalVariable.MaxThreadSize = int.Parse(token);
@@ -252,22 +245,19 @@
private OracleConnection CreateConnection()
{
- OracleConnectionStringBuilder builder = new OracleConnectionStringBuilder(GlobalVariable.ConnectionString)
+ var builder = new OracleConnectionStringBuilder(GlobalVariable.ConnectionString)
{
MaxPoolSize = GlobalVariable.MaxConnectionCount,
MinPoolSize = 1,
Pooling = true
};
- string connectstring = builder.ToString();
+ var connectstring = builder.ToString();
- OracleConnection dbConn = new OracleConnection(connectstring);
-
+ var dbConn = new OracleConnection(connectstring);
dbConn.Open();
-
return dbConn;
}
- [MethodImpl(MethodImplOptions.Synchronized)]
private bool InsertCCSEventRecord(CCSEventRecord ccsEventRecord, OracleConnection conn)
{
OracleTransaction transaction = null;
@@ -330,80 +320,76 @@
return true;
}
- private delegate void WorkerThreadHandler();
-
- private void ProcessCase(CCSEventRecord ccsEventRecord, OracleConnection conn)
+ private void ProcessCase(CCSEventRecord ccsEventRecord)
{
+ Logger.Debug("Enter ProcessCase");
+ OracleConnection conn = CreateConnection();
try
{
if (conn.State == ConnectionState.Closed)
conn.Open();
- ProcessEvent processEvent = new ProcessEvent(ccsEventRecord, conn, GlobalVariable.TraceConnectionString);
- processEvent.ThreadFinish += ThreadEndEventProcess;
+ var processEvent = new ProcessEvent();
+ processEvent.Run(ccsEventRecord, conn, GlobalVariable.TraceConnectionString);
- ThreadStart threadStart = processEvent.Run;
- Thread thread = new Thread(threadStart);
- thread.Start();
+ _wigNextJobs.QueueWorkItem(DoNextJobs);
}
- catch
+ catch (Exception e)
{
- lock (_syncDbQueue) _dbQueue.Enqueue(conn);
+ Logger.Error(e, e.Message);
}
+ finally
+ {
+ conn?.Close();
+ }
+ Logger.Debug("Leave ProcessCase");
}
- private void ThreadEndEventProcess(object sender, ThreadEndEvent e)
- {
- //將EVETNQUERY的案件狀態改為開始處理
- CCSEventRecord waitCcsEventRecord = GetWaitRecord(e.GetConnection());
-
- try
- {
- if (waitCcsEventRecord != null)
- ProcessCase(waitCcsEventRecord, e.GetConnection());
- }
- finally
- {
- lock (_syncDbQueue) _dbQueue.Enqueue(e.GetConnection());
- }
- }
-
- [MethodImpl(MethodImplOptions.Synchronized)]
+ // [MethodImpl(MethodImplOptions.Synchronized)]
private CCSEventRecord GetWaitRecord(OracleConnection conn)
{
- string processCcsid = "";
- string ccsid = "";
+ var processCcsid = "";
+ var ccsid = "";
CCSEventRecord ccsEventRecord = null;
- foreach (var obj in _waitingCases)
+ lock (_syncWaitingCases)
{
- var ccsId = obj.Key;
- var fdrid = obj.Value;
-
- if (ProcessingFdr.Contains(fdrid)) //該條饋線仍有案件在處理中
- processCcsid = processCcsid + "'" + ccsid + "',";
- else
+ foreach (var obj in _waitingCases)
{
- ccsid = ccsId;
- _waitingCases.Remove(obj);
- break;
+ var ccsId = obj.Key;
+ var fdrid = obj.Value;
+
+ if (ContainProcessingFdr(fdrid)) //該條饋線仍有案件在處理中
+ processCcsid = processCcsid + "'" + ccsid + "',";
+ else
+ {
+ ccsid = ccsId;
+ _waitingCases.Remove(obj);
+ break;
+ }
}
}
- if (ccsid.Length == 0) //沒有因同饋線而在等候中的案件
+ if (ccsid.Length == 0) //沒有因同饋線而在等候中的案件
{
- var sqlStmt = "SELECT Q.CCSID AS CCSID FROM CCS.EVENTQUERY Q,CCS.EVENTRECORD R WHERE Q.CASESTATUS IN (" +
- (int) CCSCaseState.EventInitial + "," +
+ var sqlStmt =
+ "SELECT Q.CCSID AS CCSID FROM CCS.EVENTQUERY Q,CCS.EVENTRECORD R WHERE Q.CASESTATUS IN (" +
+ (int) CCSCaseState.EventInitial + "," +
(int) CCSCaseState.EventProcess + ")";
- processCcsid = _processingCases.Aggregate(processCcsid, (current, item) => current + "'" + item + "',");
+ lock (_processingCases)
+ {
+ processCcsid = _processingCases.Aggregate(processCcsid,
+ (current, item) => current + "'" + item + "',");
+ }
if (processCcsid.Length != 0)
- sqlStmt = sqlStmt + " AND Q.CCSID NOT IN (" + processCcsid.Substring(0, processCcsid.Length - 1) + ")";
+ sqlStmt = sqlStmt + " AND Q.CCSID NOT IN (" + processCcsid.Substring(0, processCcsid.Length - 1) +
+ ")";
sqlStmt = sqlStmt + " AND Q.CCSID = R.CCSID AND ROWNUM < 2 ORDER BY Q.CHANGETIME";
- OracleCommand command = new OracleCommand(sqlStmt, conn);
- OracleDataReader reader = command.ExecuteReader();
+ var command = new OracleCommand(sqlStmt, conn);
+ var reader = command.ExecuteReader();
try
{
@@ -426,14 +412,14 @@
if (ccsEventRecord != null)
{
- CCSEventQuery ccsEventQuery = new CCSEventQuery
+ var ccsEventQuery = new CCSEventQuery
{
CcsId = ccsEventRecord.CcsId,
CaseStatus = (int) CCSCaseState.EventProcess
};
//先將EVETNQUERY的案件狀態改為開始處理
- OracleTransaction transaction = conn.BeginTransaction();
+ var transaction = conn.BeginTransaction();
try
{
@@ -441,7 +427,10 @@
{
Logger.Info("更新EVENTQUERY的案件狀態為處理中.(CCSID = " + ccsEventRecord.CcsId + ")");
transaction.Commit();
- _processingCases.Add(ccsEventRecord.CcsId);
+ lock (_syncProcessingCases)
+ {
+ _processingCases.Add(ccsEventRecord.CcsId);
+ }
}
else
{
@@ -463,5 +452,45 @@
return ccsEventRecord;
}
+
+ public void AddWaitingCases(string ccsId, int fdrid)
+ {
+ lock (_syncWaitingCases)
+ {
+ _waitingCases.Add(new KeyValuePair<string, int>(ccsId, fdrid));
+ }
+ }
+
+ public void AddProcessingFdr(int fdrid)
+ {
+ lock (_syncProcessingFdrs)
+ {
+ _processingFdr.Add(fdrid);
+ }
+ }
+
+ public bool ContainProcessingFdr(int fdrid)
+ {
+ lock (_syncProcessingFdrs)
+ {
+ return _processingFdr.Contains(fdrid);
+ }
+ }
+
+ public void RemoveProcessingFdr(int fdrId)
+ {
+ lock (_syncProcessingFdrs)
+ {
+ _processingFdr.Remove(fdrId);
+ }
+ }
+
+ public void RemoveProcessingCases(string ccsId)
+ {
+ lock (_syncProcessingCases)
+ {
+ _processingCases.Remove(ccsId);
+ }
+ }
}
}
\ No newline at end of file
--
Gitblit v0.0.0-SNAPSHOT