ulysseskao
2016-05-03 410dc2d04dbecf019129cd8cd65a3be2c62b4d0c
CCSTrace/CCS/CCSMain.cs
@@ -4,6 +4,7 @@
using System.Data;
using System.Data.OracleClient;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Web;
@@ -11,6 +12,7 @@
using Amib.Threading;
using CCSTrace.CCS.Domain;
using CCSTrace.CCS.Object;
using Iesi.Collections.Generic;
using NLog;
namespace CCSTrace.CCS
@@ -22,17 +24,15 @@
        private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
        private static CcsMain _instance;
        public static ArrayList ProcessCases = new ArrayList();
        public static Queue<int> ProcessCaseQueue = new Queue<int>();
        public static Hashtable ProcessFdr = new Hashtable();//Record the processing FDR
        public static ArrayList WaitingCases = new ArrayList();//Record the case when the case's FDR processed by another
        private int _connectionCount = 1;
        private readonly int _connectionCount = 1;
        private OracleConnection _mainConn = null;
        private SmartThreadPool _mainjobThreadPool = null;
        private readonly object _syncDbQueue = new object();
        private readonly Queue<OracleConnection> _dbQueue = new Queue<OracleConnection>();
        private readonly List<string> _processingCases = new List<string>();
        private readonly List<KeyValuePair<string, int>> _waitingCases = new List<KeyValuePair<string, int>>();//Record the case when the case's FDR processed by another
        public CcsMain()
        {
@@ -42,18 +42,13 @@
        public string AppDataPath { get; set; }
        public static CcsMain Instance()
        {
            // Uses lazy initialization.
        public SortedSet<int> ProcessingFdr { get; } = new SortedSet<int>();
            // Note: this is not thread safe.
        public static CcsMain Instance => _instance ?? (_instance = new CcsMain());
            if (_instance == null)
            {
                _instance = new CcsMain();
            }
            return _instance;
        }
        public List<string> ProcessingCases => _processingCases;
        public List<KeyValuePair<string, int>> WaitingCases => _waitingCases;
        public void Startup()
        {
@@ -66,13 +61,12 @@
                if (_mainConn == null)
                    _mainConn = CreateConnection();
                //MainConn = CreateMainConnection();
                Logger.Info("CCSMain has been connected database.");
                GlobalVariable.CcsCodelist = new CcsCodelist(_mainConn);
                GlobalVariable.EosCodelist = new EosCodelist(_mainConn);
                for (int i = 0; i < _connectionCount; i++)
                for (var i = 0; i < _connectionCount; i++)
                {
                    lock (_syncDbQueue) _dbQueue.Enqueue(CreateConnection());
                }
@@ -82,23 +76,25 @@
                Logger.Error(e, e.Message);
                throw;
            }
            finally
            {
            }
            CcsRecord mRecord = GetWaitRecord(_mainConn);
            var addrContrast = AddrContrast.Instance;
            addrContrast.initialize(_mainConn);
            if (mRecord != null)
            CCSEventRecord ccsEventRecord = GetWaitRecord(_mainConn);
            if (ccsEventRecord != null)
            {
                OracleConnection conn = null;
                OracleConnection conn;
                lock (_syncDbQueue) conn = _dbQueue.Dequeue();
                ProcessCase(mRecord, conn);
                ProcessCase(ccsEventRecord, conn);
            }
            STPStartInfo stpStartInfo = new STPStartInfo();
            stpStartInfo.IdleTimeout = GlobalVariable.IdleTimeout * 1000;
            stpStartInfo.MaxWorkerThreads = GlobalVariable.MaxThreadSize;
            stpStartInfo.MinWorkerThreads = GlobalVariable.MinThreadSize;
            STPStartInfo stpStartInfo = new STPStartInfo
            {
                IdleTimeout = GlobalVariable.IdleTimeout*1000,
                MaxWorkerThreads = GlobalVariable.MaxThreadSize,
                MinWorkerThreads = GlobalVariable.MinThreadSize
            };
            _mainjobThreadPool = new SmartThreadPool(stpStartInfo);
            Logger.Info("Create Thread Pool.");
        }
@@ -112,11 +108,18 @@
            GC.WaitForPendingFinalizers();
        }
        public void AcceptEvent(CcsRecord record)
        public void AcceptEvent(CCSEventRecord ccsEventRecord)
        {
            /*
            WorkItemCallback workItemCallback = new WorkItemCallback(this.DoWork);
            _mainjobThreadPool.QueueWorkItem(workItemCallback, record);
            _mainjobThreadPool.QueueWorkItem(workItemCallback, ccsEventRecord);
            WIGStartInfo wigStartInfo = new WIGStartInfo()
            {
                FillStateWithArgs = true,
            };
            _wig1 = _smartThreadPool.CreateWorkItemsGroup((int)spinCon1.Value, wigStartInfo);
            _wig2 = _smartThreadPool.CreateWorkItemsGroup((int)spinCon2.Value, wigStartInfo);
            */
            int i = 0;
@@ -126,18 +129,18 @@
            {
                try
                {
                    if (InsertEventRecord(record, _mainConn))
                    if (InsertCCSEventRecord(ccsEventRecord, _mainConn))
                    {
                        if (_dbQueue.Count > 0)
                        {
                            // Change the case status in EVENTQUERY to "processing started".
                            CcsRecord mRecord = GetWaitRecord(_mainConn);
                            CCSEventRecord waitCcsEventRecord = GetWaitRecord(_mainConn);
                            if (mRecord != null)
                            if (waitCcsEventRecord != null)
                            {
                                OracleConnection conn = null;
                                lock (_syncDbQueue) conn = _dbQueue.Dequeue();
                                ProcessCase(mRecord, conn);
                                ProcessCase(waitCcsEventRecord, conn);
                            }
                        }
@@ -226,7 +229,15 @@
        private OracleConnection CreateConnection()
        {
            OracleConnection dbConn = new OracleConnection(GlobalVariable.ConnectionString);
            OracleConnectionStringBuilder builder = new OracleConnectionStringBuilder(GlobalVariable.ConnectionString)
            {
                MaxPoolSize = 20,
                MinPoolSize = 5,
                Pooling = true
            };
            string connectstring = builder.ToString();
            OracleConnection dbConn = new OracleConnection(connectstring);
            dbConn.Open();
@@ -234,7 +245,7 @@
        }
        [MethodImpl(MethodImplOptions.Synchronized)]
        private bool InsertEventRecord(CcsRecord record, OracleConnection conn)
        private bool InsertCCSEventRecord(CCSEventRecord ccsEventRecord, OracleConnection conn)
        {
            OracleTransaction transaction = null;
@@ -245,18 +256,18 @@
                transaction = conn.BeginTransaction();
                if (record.InsertDb(conn, transaction))
                if (ccsEventRecord.InsertDb(conn, transaction))
                {
                    var eventQuery = new EventQuery
                    var ccsEventQuery = new CCSEventQuery
                    {
                        CcsId = record.CcsId,
                        Meter = record.Meter,
                        CcsId = ccsEventRecord.CcsId,
                        Meter = ccsEventRecord.Meter,
                        CaseStatus = (int) CCSCaseState.EventInitial,
                        ChangeTime = record.AcceptTime
                        ChangeTime = ccsEventRecord.AcceptTime
                    };
                    if (eventQuery.Insert(conn, transaction))
                    if (ccsEventQuery.Insert(conn, transaction))
                        transaction.Commit();
                    else
                    {
@@ -298,14 +309,14 @@
        private delegate void WorkerThreadHandler();
        private void ProcessCase(CcsRecord record, OracleConnection conn)
        private void ProcessCase(CCSEventRecord ccsEventRecord, OracleConnection conn)
        {
            try
            {
                if (conn.State.ToString().Equals("Closed"))
                if (conn.State == ConnectionState.Closed)
                    conn.Open();
                ProcessEvent processEvent = new ProcessEvent(record, conn, GlobalVariable.TraceConnectionString);
                ProcessEvent processEvent = new ProcessEvent(ccsEventRecord, conn, GlobalVariable.TraceConnectionString);
                processEvent.ThreadFinish += ThreadEndEventProcess;
                ThreadStart threadStart = processEvent.Run;
@@ -321,12 +332,12 @@
        private void ThreadEndEventProcess(object sender, ThreadEndEvent e)
        {
            // Change the case status in EVENTQUERY to "processing started".
            CcsRecord mRecord = GetWaitRecord(e.GetConnection());
            CCSEventRecord waitCcsEventRecord = GetWaitRecord(e.GetConnection());
            try
            {
                if (mRecord != null)
                    ProcessCase(mRecord, e.GetConnection());
                if (waitCcsEventRecord != null)
                    ProcessCase(waitCcsEventRecord, e.GetConnection());
            }
            finally 
            {
@@ -335,39 +346,39 @@
        }
        [MethodImpl(MethodImplOptions.Synchronized)]
        private CcsRecord GetWaitRecord(OracleConnection conn)
        private CCSEventRecord GetWaitRecord(OracleConnection conn)
        {
            string processCcsid = "";
            string ccsid = "";
            CcsRecord record = null;
            CCSEventRecord ccsEventRecord = null;
            foreach (string[] obj in WaitingCases)
            foreach (var obj in _waitingCases)
            {
                string ccsId = obj[0];
                string fdrid = obj[1];
                var ccsId = obj.Key;
                var fdrid = obj.Value;
                if (ProcessFdr.ContainsKey(fdrid)) //該條饋線仍有案件在處理中
                if (ProcessingFdr.Contains(fdrid)) //該條饋線仍有案件在處理中
                    processCcsid = processCcsid + "'" + ccsid + "',";
                else
                {
                    ccsid = ccsId;
                    WaitingCases.Remove(obj);
                    _waitingCases.Remove(obj);
                    break;
                }
            }
            if (ccsid.Length == 0)  //沒有因同饋線而在等候中的案件
            {
                string sqlStmt = "SELECT Q.CCSID AS CCSID FROM CCS.EVENTQUERY Q,CCS.EVENTRECORD R WHERE Q.CASESTATUS IN (" + (int) CCSCaseState.EventInitial + "," + (int) CCSCaseState.EventProcess + ")";
                IEnumerator Enum = ProcessCases.GetEnumerator();
                var sqlStmt = "SELECT Q.CCSID AS CCSID FROM CCS.EVENTQUERY Q,CCS.EVENTRECORD R WHERE Q.CASESTATUS IN (" +
                    (int) CCSCaseState.EventInitial + "," +
                    (int) CCSCaseState.EventProcess + ")";
                while (Enum.MoveNext())
                    if (Enum.Current != null) processCcsid = processCcsid + "'" + Enum.Current + "',";
                processCcsid = _processingCases.Aggregate(processCcsid, (current, item) => current + "'" + item + "',");
                if (processCcsid.Length != 0)
                    sqlStmt = sqlStmt + " AND Q.CCSID NOT IN (" + processCcsid.Substring(0, processCcsid.Length - 1) + ")";
                sqlStmt = sqlStmt + " AND Q.CCSID = R.CCSID AND ROWNUM < 2 ORDER BY Q.ChangeTime";
                sqlStmt = sqlStmt + " AND Q.CCSID = R.CCSID AND ROWNUM < 2 ORDER BY Q.CHANGETIME";
                OracleCommand command = new OracleCommand(sqlStmt, conn);
                OracleDataReader reader = command.ExecuteReader();
@@ -388,44 +399,46 @@
            }
            if (ccsid.Length != 0)
                record = new CcsRecord(ccsid, conn);
                ccsEventRecord = new CCSEventRecord(ccsid, conn);
            if (record != null)
            if (ccsEventRecord != null)
            {
                EventQuery eventQuery = new EventQuery();
                CCSEventQuery ccsEventQuery = new CCSEventQuery
                {
                    CcsId = ccsEventRecord.CcsId,
                    CaseStatus = (int) CCSCaseState.EventProcess
                };
                // First, change the case status in EVENTQUERY to "processing started".
                eventQuery.CcsId = record.CcsId;
                eventQuery.CaseStatus = (int) CCSCaseState.EventProcess;
                OracleTransaction transaction = conn.BeginTransaction();
                try
                {
                    if (eventQuery.UpdateCaseStatus(conn, transaction))
                    if (ccsEventQuery.UpdateCaseStatus(conn, transaction))
                    {
                        Logger.Info("更新EVENTQUERY的案件狀態為處理中.(CCSID = " + record.CcsId + ")");
                        Logger.Info("更新EVENTQUERY的案件狀態為處理中.(CCSID = " + ccsEventRecord.CcsId + ")");
                        transaction.Commit();
                        ProcessCases.Add(record.CcsId);
                        _processingCases.Add(ccsEventRecord.CcsId);
                    }
                    else
                    {
                        Logger.Error("無法更新EVENTQUERY的案件狀態.(CCSID = " + record.CcsId + ")");
                        Logger.Error("無法更新EVENTQUERY的案件狀態.(CCSID = " + ccsEventRecord.CcsId + ")");
                        if (transaction.Connection.State.ToString().Equals("Open"))
                        if (transaction.Connection.State == ConnectionState.Open)
                            transaction.Rollback();
                    }
                }
                catch (Exception e)
                {
                    if (transaction.Connection.State.ToString().Equals("Open"))
                    if (transaction.Connection.State == ConnectionState.Open)
                        transaction.Rollback();
                    Logger.Error(e, e.Message);
                    record = null;
                    ccsEventRecord = null;
                }
            }
            return record;
            return ccsEventRecord;
        }
    }
}