From 35266c8d7d5578f426a44704de1fcbeb74975f78 Mon Sep 17 00:00:00 2001 From: BlueMatthew Date: Sat, 18 Nov 2023 12:28:06 +0800 Subject: [PATCH] =?UTF-8?q?=E9=95=BF=E6=B3=B0=E5=8A=9F=E8=83=BD=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- etc/ht_iec104.conf | 16 +- etc/ht_service.conf | 9 +- include/HTCP56Time.h | 2 + include/HTDataStruct.h | 5 +- include/HTGlobal.h | 6 + include/HTHP104Table.h | 41 ++++ include/HTIEC104.h | 3 +- include/HTPublic.h | 3 + src/HTCP56Time.cpp | 15 ++ src/HTGlobal.cpp | 16 +- src/HTIEC104.cpp | 386 ++++++++++++++++++++++++++++++++++++- src/HTInitUtils.cpp | 3 + src/HTMemCacheData.cpp | 80 ++++++++ src/HTPublic.cpp | 13 ++ src/HTService.cpp | 2 + vsproj/iec104.vcxproj | 7 +- vsproj/iec104.vcxproj.user | 5 +- 17 files changed, 590 insertions(+), 22 deletions(-) diff --git a/etc/ht_iec104.conf b/etc/ht_iec104.conf index 1cef11e..fbdf339 100644 --- a/etc/ht_iec104.conf +++ b/etc/ht_iec104.conf @@ -5,4 +5,18 @@ # Copyright (C) 2018, huatek, Inc. /********************************************************************/ [IEC_DEVCOUNT] -# 涓诲彉璁惧鎬绘暟閲 \ No newline at end of file +iec_byq_count = 1 +iec_break_count = 1 +iec_site_id = 1 + +[COMM_SAVE_INTERNAL] +DATA_SAVE_INTERNAL = 30 + +[IEC_OBJS_ADDR] +iec_global_addr = 1 +yx_start_addr = 1 +yx_stop_addr = 16000 +jb_start_addr = 16001 +jb_stop_addr = 16384 +yc_start_addr = 16385 +yc_stop_addr = 30000 \ No newline at end of file diff --git a/etc/ht_service.conf b/etc/ht_service.conf index 1a79799..504f73b 100644 --- a/etc/ht_service.conf +++ b/etc/ht_service.conf @@ -32,11 +32,12 @@ iec_w = 3 # application database config parameter [HT_DATABASE] -db_host_addr = 192.168.128.86 +#db_host_addr = 192.168.128.86 +db_host_addr = 127.0.0.1 db_host_port = 3306 -db_name = hmf_0617 -db_username = root -db_password = 123456 +db_name = iec104 +db_username = cac +db_password = Cac@123 db_min_conn = 5 db_max_conn = 8 diff --git a/include/HTCP56Time.h b/include/HTCP56Time.h index 8c5e2e9..7c792d1 100644 --- a/include/HTCP56Time.h +++ b/include/HTCP56Time.h @@ -70,6 +70,8 @@ public: void GetTimeString(char *buf, size_t size); FILETIME _GetFileTime(void); SYSTEMTIME _GetSystemTime(void); + + time_t GetTime(void); void ActualTimeToCP56Time(unsigned char *data); diff --git a/include/HTDataStruct.h b/include/HTDataStruct.h index 78c557f..98dc0ee 100644 --- a/include/HTDataStruct.h +++ b/include/HTDataStruct.h @@ -15,7 +15,7 @@ #pragma pack (push ,1) -#define DEF_EQM_CODE_SIZE 32+1 // 设备编码eqm_code最大长度 +#define DEF_EQM_CODE_SIZE (32+1) // 设备编码eqm_code最大长度 //#define _SITE_ID_TYPE_INT // 站点ID数据类型定义 #ifdef _SITE_ID_TYPE_INT @@ -46,6 +46,9 @@ typedef struct { time_t lastTime; // 最近一次入库时间 time_t last_yx_time; time_t last_yc_time; + + time_t last_origin_time; + unsigned long m_gis_fault_count; // gis故障次数 //unsigned long m_gis_count; // gis开合次数 bool m_gis_change; // 断路器断开时,记录入库 diff --git a/include/HTGlobal.h b/include/HTGlobal.h index 08dabba..a77ef28 100644 --- a/include/HTGlobal.h +++ b/include/HTGlobal.h @@ -80,6 +80,10 @@ extern mutex g_map_relation_mutex; // GIS extern map g_map_iec; // 104报文解析缓存数据 extern mutex g_map_iec_mutex; // 104报文解析缓存数据信息队列锁 +extern map g_map_devices; // 104报文解析缓存数据 +extern map g_map_iec_new; // 104报文解析缓存数据 +extern mutex g_map_iec_mutex_new; // 104报文解析缓存数据信息队列锁 + extern map g_map_sadr; // 点表匹配关系 extern mutex g_map_sadr_mutex; @@ -107,6 +111,8 @@ extern pthread_t thread_handle_timer; // 应答线程 extern pthread_t thread_handle_client; // 客户端链接线程 extern pthread_t thread_handle_warning; // 报警处理及入库线程句柄 extern pthread_t thread_handle_origin; // 原始数据如何线程句柄 +extern pthread_t thread_handle_origin_new; // 原始数据如何线程句柄 +extern pthread_t thread_handle_busi_data; // 业务数据如何线程句柄 extern pthread_t thread_handle_pingce; // IEC104评测数据入库线程句柄 extern pthread_t thread_handle_gishold; // 断路器断开数据入库线程句柄 diff --git a/include/HTHP104Table.h b/include/HTHP104Table.h index 710df95..d9be2a9 100644 --- a/include/HTHP104Table.h +++ b/include/HTHP104Table.h @@ -24,6 +24,47 @@ typedef struct { unsigned int objval; // 值域 }ST_OBJVAL; +typedef struct { + unsigned char stype; // 类型 + time_t dtime; + unsigned int sadr; // 信息对象地址(3) + union { // 值域 + unsigned int ival; + float fval; + }; +}IEC_OBJVAL_NEW; + +typedef struct tagIEC_DEVICE +{ + unsigned int site_id; // 站点ID + unsigned int sensor_id; // 装置ID + unsigned char eqm_code[DEF_EQM_CODE_SIZE]; // eqm_code,主设备ID,对应busi_site_tree.id + unsigned char sys_code[DEF_EQM_CODE_SIZE]; // sys_code,主设备ID,对应iec_sites.system_code + unsigned char tableName[DEF_EQM_CODE_SIZE * 2]; // 阈值字段名称,transformer_runing_status_threshold表字段 + unsigned char devidFieldName[DEF_EQM_CODE_SIZE * 2]; // 数据表sensor id字段名称 + unsigned char dtimeFieldName[DEF_EQM_CODE_SIZE * 2]; // 数据表采集时间字段名称 + unsigned char one_dtime; +}IEC_DEVICE; + +typedef struct tagIEC_POINT +{ + unsigned int sadr; // 信息体地址,map.key + unsigned char stype; // 地址类型 1:遥信 2:遥测 + unsigned int sensor_id; // 装置ID + unsigned char fieldName[DEF_EQM_CODE_SIZE * 2]; // 阈值字段名称,transformer_runing_status_threshold表字段 + unsigned char isget; // 评测字段提取标志,YX:{1=总开关状态 2=发电状态 3=抽水状态 4=事故状态} YC:{1=提取 0:不提取} + unsigned short unit; // 104协议数据单位 + unsigned short sysunit; // 系统固定的字段单位,参与评测数据 + double devrate; // 偏差率,当值在±之间时,保存数据 + unsigned char eqm_type; // 设备类型 1:主变 2:GIS 3:测温 4:避雷器 + unsigned char warnFlag; // 告警判断, =1:>预警值时告警 =2:<预警值时告警(压力) 油位如何告警(最低/高)? + time_t dtime; // 告警时间 + unsigned char wstate; // 工作状态:1-发电状态 2:抽水状态 3:空闲状态 + unsigned char bfault; // 主变事故状态:0-正常 1-事故状态 + // unsigned char poidesc[DEF_BUFFER_256]; // 点表字段描述 +}IEC_POINT; + + // 变压器设备唯一编码 typedef struct _BYQ_EQM_CODE { unsigned char szEqmCode[DEF_EQM_CODE_SIZE]; // 设备编码,16位 diff --git a/include/HTIEC104.h b/include/HTIEC104.h index 9192f4c..33d1f4b 100644 --- a/include/HTIEC104.h +++ b/include/HTIEC104.h @@ -60,7 +60,6 @@ void UpdateLastTime(); void *thread_listen_proc(void *arg); void *thread_client_proc(void *arg); void *thread_warn_proc(void *arg); -void *thread_origin_proc(void *arg); void * thread_pingce_proc(void * arg); void *thread_recv_proc(void *arg) ; @@ -71,6 +70,8 @@ void *thread_cache_proc(void *arg) ; // 同步内存数据线程 //void *thread_setdb_proc(void *arg); // 数据入库线程 //void *thread_getdata_proc(void *arg); void *thread_origin_proc(void *arg); // 原始数据入库现场 +void *thread_origin_proc_new(void *arg); // 原始数据入库现场 +void *thread_busi_data_proc(void* arg); void *thread_gis_hold_proc(void * arg); // 刷新机组的工作状态,1:发电、2:抽水、3:空闲 diff --git a/include/HTPublic.h b/include/HTPublic.h index 925ccf3..2f19e67 100644 --- a/include/HTPublic.h +++ b/include/HTPublic.h @@ -222,5 +222,8 @@ char* strSwap(char* szT,int len); #ifdef __cplusplus } #endif + +int replaceAll(std::string& input, const std::string& search, const std::string& replace); + #endif // end __HT_PUBLIC_H diff --git a/src/HTCP56Time.cpp b/src/HTCP56Time.cpp index a6a38a1..29de9fa 100644 --- a/src/HTCP56Time.cpp +++ b/src/HTCP56Time.cpp @@ -53,6 +53,21 @@ FILETIME CP56Time::_GetFileTime(void) return ft; } +// +// return time_t of CP56Time decoded +// +time_t CP56Time::GetTime(void) +{ + struct tm ptm = { 0 }; + ptm.tm_year = stime.wYear - 1900; + ptm.tm_mon = stime.wMonth - 1; + ptm.tm_mday = stime.wDay; + ptm.tm_hour = stime.wHour; + ptm.tm_min = stime.wMinute; + ptm.tm_sec = stime.wSecond; + return mktime(&ptm); +} + // // return CP56Time data of actual time. // diff --git a/src/HTGlobal.cpp b/src/HTGlobal.cpp index 2ea43f8..5eab328 100644 --- a/src/HTGlobal.cpp +++ b/src/HTGlobal.cpp @@ -46,6 +46,10 @@ mutex g_map_relation_mutex; // GIS缓存数 map g_map_iec; // 104点表报文解析缓存数据 mutex g_map_iec_mutex; // 104报文解析缓存数据信息队列锁 +map g_map_devices; // 104报文解析缓存数据 +map g_map_iec_new; // 104点表报文解析缓存数据 +mutex g_map_iec_mutex_new; // 104报文解析缓存数据信息队列锁 + map g_map_sadr; // 点表匹配关系 mutex g_map_sadr_mutex; @@ -64,11 +68,13 @@ pthread_t thread_handle_parse; // 104报文解析线程 pthread_t thread_handle_timer; // 超时监测线程 //pthread_t thread_handle_getdata; // 数据提取线程 -pthread_t thread_handle_client; // 客户端链接线程 -pthread_t thread_handle_warning; // 报警处理及入库线程句柄 -pthread_t thread_handle_origin; // 原始数据如何线程句柄 -pthread_t thread_handle_pingce; // IEC104评测数据入库线程句柄 -pthread_t thread_handle_gishold; // 断路器断开数据入库线程句柄 +pthread_t thread_handle_client; // 客户端链接线程 +pthread_t thread_handle_warning; // 报警处理及入库线程句柄 +pthread_t thread_handle_origin; // 原始数据如何线程句柄 +pthread_t thread_handle_origin_new; // 原始数据如何线程句柄 +pthread_t thread_handle_busi_data; // 业务数据如何线程句柄 +pthread_t thread_handle_pingce; // IEC104评测数据入库线程句柄 +pthread_t thread_handle_gishold; // 断路器断开数据入库线程句柄 pthread_t thread_handle_active; // 心跳线程 pthread_t thread_handle_linkmgr; // 链路管理线程 diff --git a/src/HTIEC104.cpp b/src/HTIEC104.cpp index ddaeac6..d76a955 100644 --- a/src/HTIEC104.cpp +++ b/src/HTIEC104.cpp @@ -28,13 +28,18 @@ mutex g_list_dbset_mutex; list g_list_origin; mutex g_list_origin_mutex; +list g_list_origin_new; +mutex g_list_origin_mutex_new; + +list g_list_busi_data; +mutex g_list_busi_data_mutex; + list g_list_pingce; mutex g_list_pingce_mutex; list g_list_warn; mutex g_list_warn_mutex; - mutex g_sendno_mutex; CIEC104::CIEC104() @@ -720,11 +725,15 @@ void IEC104EnvLoad() mutex_create(g_list_dbset_mutex); mutex_create(g_sendno_mutex); mutex_create(g_list_origin_mutex); + mutex_create(g_list_origin_mutex_new); + mutex_create(g_list_busi_data_mutex); mutex_create(g_list_pingce_mutex); mutex_create(g_list_warn_mutex); g_list_pack.clear(); g_list_dbset.clear(); g_list_origin.clear(); + g_list_origin_new.clear(); + g_list_busi_data.clear(); g_list_pingce.clear(); g_list_warn.clear(); InitIECENV(); @@ -741,6 +750,8 @@ void IEC104EnvFree() mutex_close(g_list_dbset_mutex); mutex_close(g_sendno_mutex); mutex_close(g_list_origin_mutex); + mutex_close(g_list_origin_mutex_new); + mutex_close(g_list_busi_data_mutex); mutex_close(g_list_pingce_mutex); mutex_close(g_list_warn_mutex); g_list_pack.clear(); @@ -852,6 +863,20 @@ static void addOriginDataList(ST_IECPOINT_TABLE &stData) mutex_unlock(g_list_origin_mutex); } +// 添加原始数据入库队列 +static void addOriginDataListNew(const IEC_OBJVAL_NEW &stData) +{ + int i = 0; + IEC_OBJVAL_NEW stSetData; + + memset(&stSetData, 0x00, sizeof(IEC_OBJVAL_NEW)); + memcpy(&stSetData, &stData, sizeof(IEC_OBJVAL_NEW)); + + mutex_lock(g_list_origin_mutex_new); + g_list_origin_new.push_front(stSetData); + mutex_unlock(g_list_origin_mutex_new); +} + // 获取发送序列号 void vAutoSendSeqNo(int b) { @@ -1267,6 +1292,14 @@ void vYaoKongUnexe(unsigned char *msgbuf, unsigned int len) // 遥信值入map bool bSetPointTableValueYX(unsigned char v, unsigned int adr) { + IEC_OBJVAL_NEW objVal = { 0 }; + objVal.stype = 1; + objVal.dtime = time(NULL); + objVal.ival = v; + objVal.sadr = adr; + + addOriginDataListNew(objVal); + bool is_state_opend = false; unsigned char ovl = 0; map::iterator m_pIter; @@ -1313,6 +1346,20 @@ bool bSetPointTableValueYX(unsigned char v, unsigned int adr) return true; } +bool bSetPointTableValueYX(const std::vector& values) +{ + /* + IEC_OBJVAL_NEW objVal = { 0 }; + objVal.stype = 1; + objVal.dtime = time(NULL); + objVal.ival = v; + objVal.sadr = adr; + + addOriginDataListNew(objVal); + */ + return true; +} + //遥测值入map bool bSetPointTableValueYC(float v, unsigned int adr) { @@ -1369,10 +1416,136 @@ bool bSetPointTableValueYC(float v, unsigned int adr) } } } + else + { + vPrtLogMsg(LOG_DEBUG, RET_OK, "bSetPointTableValueYC:adr Not found, size=%d", (int)g_map_iec.size()); + + } mutex_unlock(g_map_iec_mutex); return true; } +bool bSetPointTableValueYC(const std::vector& values) +{ + // Save the origin data into database first + for (std::vector::const_iterator it = values.begin(); it != values.end(); ++it) + { + addOriginDataListNew(*it); + } + + std::string tableName; + std::string dtimeFildName; + std::string devidFildName; + unsigned int devId = 0; + bool one_dtime = true; + time_t ts = 0; + std::map::const_iterator itPoint; + std::map::const_iterator itDev; + std::string fields, fieldValues; + + char dataBuf[32] = { 0 }; + mutex_lock(g_map_iec_mutex_new); + for (std::vector::const_iterator it = values.begin(); it != values.end(); ++it) + { + itPoint = g_map_iec_new.find(it->sadr); + if (itPoint != g_map_iec_new.end()) + { + continue; + } + itDev = g_map_devices.find(itPoint->second.sensor_id); + if (itDev != g_map_devices.end()) + { + continue; + } + + if (tableName.empty()) + { + tableName = (const char *)itDev->second.tableName; + dtimeFildName = (const char *)itDev->second.dtimeFieldName; + devidFildName = (const char *)itDev->second.devidFieldName; + devId = itDev->second.sensor_id; + one_dtime = (itDev->second.one_dtime != 0); + ts = it->dtime; + + vPrtLogMsg(LOG_DEBUG, RET_OK, "Table: %s oneTime=%u", tableName.c_str(), (unsigned int)itDev->second.one_dtime); + if (tableName.empty()) + { + break; + } + } + else + { + if (tableName.compare((const char *)itDev->second.tableName) != 0) + { + continue; + } + } + + fields.append((const char *)itPoint->second.fieldName); + fields.append(","); + + fieldValues.append("'"); + snprintf(dataBuf, sizeof(dataBuf), "%0.4f", it->fval); + fieldValues.append(dataBuf); + fieldValues.append("',"); + + if (!one_dtime) + { + fields.append((const char *)itPoint->second.fieldName); + fields.append("_time,"); + + fieldValues.append("FROM_UNIXTIME("); + snprintf(dataBuf, sizeof(dataBuf), "%lld", (long long)it->dtime); + fieldValues.append(dataBuf); + fieldValues.append("),"); + } + } + mutex_unlock(g_map_iec_mutex_new); + + // Assume they belongs the same object + std::string sql = "INSERT INTO "; + sql.append(tableName); + sql.append("("); + sql.append(devidFildName); + sql.append(","); + + sql.append(fields, 0, fields.size() - 1); + sql.append(" VALUES("); + + snprintf(dataBuf, sizeof(dataBuf), "%u", devId); + sql.append(dataBuf); + sql.append(","); + + if (one_dtime) + { + sql.append("FROM_UNIXTIME("); + snprintf(dataBuf, sizeof(dataBuf), "%lld", (long long)ts); + sql.append(dataBuf); + sql.append("),"); + } + + sql.append(fieldValues, 0, fieldValues.size() - 1); + sql.append(")"); + + std::list::iterator it; + mutex_lock(g_list_busi_data_mutex); + it = g_list_busi_data.insert(g_list_busi_data.end(), std::string()); + it->swap(sql); + mutex_unlock(g_list_busi_data_mutex); + + /* + IEC_OBJVAL_NEW objVal = { 0 }; + objVal.stype = 2; + objVal.dtime = time(NULL); + objVal.fval = v; + objVal.sadr = adr; + + addOriginDataListNew(objVal); + */ + + return true; +} + // I-Formar message decodification I格式帧解码 // msgbuf : asdu_header + asdu_body // len : asdu length @@ -1454,6 +1627,10 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen // 类型标识值<0>未用,在本配套标准中定义了 1 至 127 的值,128 至 255 未定义。136 // 至 255 可以由此标准的使用者彼此独立的进行定义,仅当使用具有类型标识号为 1 至 127 // 的范围的应用服务数据单元才能达到全部互操作。 + std::vector ycItems; + IEC_OBJVAL_NEW ycItem = { 2, 0 }; + time_t ts = time(NULL); + switch (header->type & 0xff) { case 1: // Single-point information @@ -1469,6 +1646,7 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen { adr++; pi = (SIQ104*)&msgbuf[pos]; + // bSetPointTableValueYX(pi->spi, adr); if (pi->spi) vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SP_%d: val:%d Valid: %d Blocked: %d", header->type, adr, pi->spi, pi->iv, pi->bl); @@ -1496,6 +1674,12 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen adr = pf->addr2 << 16 | pf->addr1 << 8 | pf->addr0; short *val = (short*)&pf->val[0]; bSetPointTableValueYC(*val, adr); + + ycItem.dtime = ts; + ycItem.sadr = adr; + ycItem.fval = *val; + ycItems.push_back(ycItem); + vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%d OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pf->qds.ov, pf->qds.iv, pf->qds.bl); pos += sizeof(SH104); adr++; @@ -1504,6 +1688,12 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen SHP104 *pfv = (SHP104*)&msgbuf[pos]; short *val = (short*)&pfv->val[0]; bSetPointTableValueYC(*val, adr); + + ycItem.dtime = ts; + ycItem.sadr = adr; + ycItem.fval = *val; + ycItems.push_back(ycItem); + if (*val) vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%d OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pfv->qds.ov, pfv->qds.iv, pfv->qds.bl); pos += sizeof(SHP104); @@ -1517,6 +1707,12 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen adr = pf->addr2 << 16 | pf->addr1 << 8 | pf->addr0; short *val = (short*)&pf->val[0]; bSetPointTableValueYC(*val, adr); + + ycItem.dtime = ts; + ycItem.sadr = adr; + ycItem.fval = *val; + ycItems.push_back(ycItem); + vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%d OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pf->qds.ov, pf->qds.iv, pf->qds.bl); pos += sizeof(SH104); } @@ -1530,7 +1726,13 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen adr = pf->addr2 << 16 | pf->addr1 << 8 | pf->addr0; float *val = (float*)&pf->val[0]; bSetPointTableValueYC(*val, adr); - vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%-.4f OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pf->qds.ov, pf->qds.iv, pf->qds.bl); + + ycItem.dtime = ts; + ycItem.sadr = adr; + ycItem.fval = *val; + ycItems.push_back(ycItem); + + vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%-.4f OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pf->qds.ov, pf->qds.iv, pf->qds.bl); pos += sizeof(SFP104); adr++; while (pos < len) @@ -1538,6 +1740,12 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen SFP104V *pfv = (SFP104V*)&msgbuf[pos]; float *val = (float*)&pfv->val[0]; bSetPointTableValueYC(*val, adr); + + ycItem.dtime = ts; + ycItem.sadr = adr; + ycItem.fval = *val; + ycItems.push_back(ycItem); + vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%-.4f OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pfv->qds.ov, pfv->qds.iv, pfv->qds.bl); pos += sizeof(SFP104V); adr++; @@ -1550,7 +1758,13 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen adr = pf->addr2 << 16 | pf->addr1 << 8 | pf->addr0; float *val = (float*)&pf->val[0]; bSetPointTableValueYC(*val, adr); - vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%-.4f OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pf->qds.ov, pf->qds.iv, pf->qds.bl); + + ycItem.dtime = ts; + ycItem.sadr = adr; + ycItem.fval = *val; + ycItems.push_back(ycItem); + + vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%-.4f OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pf->qds.ov, pf->qds.iv, pf->qds.bl); pos += sizeof(SFP104); } } @@ -1561,9 +1775,9 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen { SP104_T *ps = (SP104_T*)&msgbuf[pos]; adr = ps->addr2 << 16 | ps->addr1 << 8 | ps->addr0; - CP56Time *t = new CP56Time(ps->time); + CP56Time t56(ps->time); char buf[32]; - t->GetTimeString(buf, sizeof(buf)); + t56.GetTimeString(buf, sizeof(buf)); bSetPointTableValueYX(ps->siq.spi, adr); vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SP_%d: val:%d Valid: %d Blocked: %d time:%s", header->type, adr, ps->siq.spi, ps->siq.iv, ps->siq.bl, buf); pos += sizeof(SP104_T); @@ -1578,9 +1792,16 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen adr = pf->addr2 << 16 | pf->addr1 << 8 | pf->addr0; float *val = (float*)&pf->val[0]; bSetPointTableValueYC(*val, adr); - CP56Time *t = new CP56Time(pf->time); + + CP56Time t56(pf->time); + + ycItem.dtime = t56.GetTime(); + ycItem.sadr = adr; + ycItem.fval = *val; + ycItems.push_back(ycItem); + char buf[32]; - t->GetTimeString(buf, sizeof(buf)); + t56.GetTimeString(buf, sizeof(buf)); vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d: val:%-.4f OFlow: %d Valid: %d Blocked: %d time:%s", header->type, adr, *val, pf->qds.ov, pf->qds.iv, pf->qds.bl, buf); pos += sizeof(SFP104_T); } @@ -1627,6 +1848,11 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen vPrtLogMsg(LOG_WARNG, RET_OK, "Not Implemented!,type=%d", header->type); break; } + + if (!ycItems.empty()) + { + bSetPointTableValueYC(ycItems); + } return RET_OK; } // 发送召唤激活 @@ -1866,6 +2092,152 @@ void * thread_origin_proc(void * arg) return NULL; } +/*************************************************************************** +** function name : thread_origin_proc_new +** deacription : 104 origin data save database thread +** parameter : none +** return code : NULL +***************************************************************************/ +void * thread_origin_proc_new(void * arg) +{ + int count = 0, n = 0; + IEC_OBJVAL_NEW pData; + char szSql[512] = { 0 }, szWaringTime[32] = { 0 }; + MYSQL *pMySql = NULL; + g_IecCtrl.last_origin_time = time(NULL); + vPrtLogMsg(LOG_DEBUG, 0, "thread_origin_proc_new = %d startup...", GETTID()); + while (g_Running) + { + pthread_testcancels(); + memset(&pData, 0x00, sizeof(ST_IECPOINT_TABLE)); + mutex_lock(g_list_origin_mutex_new); + count = g_list_origin_new.size(); + n = 0; + if (g_list_origin_new.empty() || g_list_origin_new.size() <= 0) { + mutex_unlock(g_list_origin_mutex_new); + _SLEEP(MAX_SLEEP_EMPTY * 10); + continue; + } + //if (time(NULL) - g_IecCtrl.last_yc_time <= g_iec_conf.save_internal) // 超过设定保存时间间隔时 + //{ + // g_list_origin_new.clear(); + // mutex_unlock(g_list_origin_mutex_new); + // _SLEEP(MAX_SLEEP_EMPTY * 10); + // continue; + //} + //g_IecCtrl.last_yc_time = time(NULL); + + CDBMySQL *pdbHandle = CDBMySQL::Instance(); + if (!pdbHandle) { + mutex_unlock(g_list_origin_mutex_new); + _SLEEP(MAX_SLEEP_EMPTY * 5); + continue; + } + pMySql = pdbHandle->GetIdleMySql(); + if (pMySql == NULL) { + mutex_unlock(g_list_origin_mutex_new); + vPrtLogMsg(LOG_WARNG, RET_FAIL, "GetIdleMySql handle failed."); + _SLEEP(MAX_SLEEP_EMPTY); + continue; + } + pdbHandle->dbAutoCommit(pMySql, false); + for (int n = 0; n < (count > 500 ? 500 : count); n++) + { + memcpy(&pData, &(g_list_origin_new.back()), sizeof(IEC_OBJVAL_NEW)); // 由尾取出 + g_list_origin_new.pop_back(); // 由尾删除 + memset(szSql, 0x00, sizeof(szSql)); + vTranHostTimeFmt(pData.dtime, szWaringTime); + if (pData.stype == 1) // 遥信 + { + snprintf(szSql, sizeof(szSql), "INSERT INTO niec_origin_data(d_time,stype,sadr,ival) " + "VALUES('%s', '%d', '%d', '%u')", + szWaringTime, pData.stype, pData.sadr, pData.ival); + } + else { // 遥测 + snprintf(szSql, sizeof(szSql), "INSERT INTO niec_origin_data(d_time,stype,sadr,fval) " + "VALUES('%s', '%d', '%d', '%.4f')", + szWaringTime, pData.stype, pData.sadr, pData.fval); + } + pdbHandle->AddInsertRecord(pMySql, szSql); + } + pdbHandle->dbCommit(pMySql); + pdbHandle->dbAutoCommit(pMySql, true); + + time_t now = time(NULL); + if (now - g_IecCtrl.last_origin_time >= g_iec_conf.save_internal) // 超过设定保存时间间隔时 + { + g_IecCtrl.last_origin_time = now; + memset(szSql, 0x00, sizeof(szSql)); + vTranHostTimeFmt(now, szWaringTime); + snprintf(szSql, sizeof(szSql), "DELETE FROM iec_origin_data_new WHERE d_time<'%s' ", + szWaringTime); + pdbHandle->AddInsertRecord(pMySql, szSql); + } + + pdbHandle->SetIdleMysql(pMySql); + mutex_unlock(g_list_origin_mutex_new); + _SLEEP(MAX_SLEEP_EMPTY * 10); + } + return NULL; +} + +/*************************************************************************** +** function name : thread_busi_data_proc +** deacription : 104 business data save database thread +** parameter : none +** return code : NULL +***************************************************************************/ +void * thread_busi_data_proc(void * arg) +{ + int count = 0, n = 0; + ST_IECPOINT_TABLE pData; + char szSql[512] = { 0 }, szWaringTime[32] = { 0 }; + std::string sql; + MYSQL *pMySql = NULL; + vPrtLogMsg(LOG_DEBUG, 0, "thread_busi_data_proc = %d startup...", GETTID()); + while (g_Running) + { + pthread_testcancels(); + memset(&pData, 0x00, sizeof(ST_IECPOINT_TABLE)); + mutex_lock(g_list_busi_data_mutex); + count = g_list_busi_data.size(); + n = 0; + if (g_list_busi_data.empty() || g_list_busi_data.size() <= 0) { + mutex_unlock(g_list_busi_data_mutex); + _SLEEP(MAX_SLEEP_EMPTY * 10); + continue; + } + + CDBMySQL *pdbHandle = CDBMySQL::Instance(); + if (!pdbHandle) { + mutex_unlock(g_list_busi_data_mutex); + _SLEEP(MAX_SLEEP_EMPTY * 5); + continue; + } + pMySql = pdbHandle->GetIdleMySql(); + if (pMySql == NULL) { + mutex_unlock(g_list_busi_data_mutex); + vPrtLogMsg(LOG_WARNG, RET_FAIL, "GetIdleMySql handle failed."); + _SLEEP(MAX_SLEEP_EMPTY); + continue; + } + pdbHandle->dbAutoCommit(pMySql, false); + for (int n = 0; n < (count > 500 ? 500 : count); n++) + { + sql.swap(g_list_busi_data.back()); + g_list_busi_data.pop_back(); // Remove the tail + // memset(szSql, 0x00, sizeof(szSql)); + pdbHandle->AddInsertRecord(pMySql, sql.c_str()); + } + pdbHandle->dbCommit(pMySql); + pdbHandle->dbAutoCommit(pMySql, true); + pdbHandle->SetIdleMysql(pMySql); + mutex_unlock(g_list_busi_data_mutex); + _SLEEP(MAX_SLEEP_EMPTY * 10); + } + return NULL; +} + /*************************************************************************** ** function name : iGetSadrMatchRecord ** deacription : 104 addr point match list diff --git a/src/HTInitUtils.cpp b/src/HTInitUtils.cpp index 1577df9..9b0379f 100644 --- a/src/HTInitUtils.cpp +++ b/src/HTInitUtils.cpp @@ -38,6 +38,7 @@ int loadServiceEvent() //mutex_create(g_map_blq_mutex); //mutex_create(g_map_gis_104_mutex); mutex_create(g_map_iec_mutex); + mutex_create(g_map_iec_mutex_new); mutex_create(g_map_relation_mutex); mutex_create(g_map_sadr_mutex); mutex_create(g_map_img_thres_mutex); @@ -57,6 +58,7 @@ int loadServiceEvent() //mutex_close(g_map_blq_mutex); //mutex_close(g_map_gis_104_mutex); mutex_close(g_map_iec_mutex); + mutex_close(g_map_iec_mutex_new); mutex_close(g_map_relation_mutex); mutex_close(g_map_sadr_mutex); mutex_close(g_map_img_thres_mutex); @@ -91,6 +93,7 @@ int unLoadServiceEvent() // mutex_close(g_map_blq_mutex); //mutex_close(g_map_gis_104_mutex); mutex_close(g_map_iec_mutex); + mutex_close(g_map_iec_mutex_new); mutex_close(g_map_relation_mutex); mutex_close(g_map_sadr_mutex); mutex_close(g_map_img_thres_mutex); diff --git a/src/HTMemCacheData.cpp b/src/HTMemCacheData.cpp index a27d952..d2b68cf 100644 --- a/src/HTMemCacheData.cpp +++ b/src/HTMemCacheData.cpp @@ -381,6 +381,85 @@ static void vLoadIECPointTable() return; } +static void vLoadIECPointTableNew() +{ + char szSql[512] = { 0 }; + + CDBMySQL *pdbHandle = CDBMySQL::Instance(); + if (!pdbHandle) return; + strncpy(szSql, "SELECT t1.sadr,t1.stype,t1.site_id,t1.sensor_id,t2.table_name,t1.filed,t2.sensor_code,t2.system_code,t2.dtime_field_name,t2.devid_field_name,t2.one_dtime" + " FROM niec_points AS t1 LEFT JOIN niec_sensors AS t2 ON t1.sensor_id=t2.`id`" + " WHERE t1.sadr>0 AND t1.`status`=1 AND t2.`status`=1 " + " ORDER BY t1.site_id,t1.sadr", sizeof(szSql)); + MYSQL_RES *res = pdbHandle->SelectRecord(szSql); + if (!res) return; + + MYSQL_ROW row = NULL; + IEC_POINT stPoint; + IEC_DEVICE stSensor; + size_t idx = 0; + while (row = pdbHandle->GetRecord(res)) + { + memset(&stPoint, 0x00, sizeof(IEC_POINT)); + memset(&stSensor, 0x00, sizeof(IEC_DEVICE)); + // memset(&stDevStat, 0x00, sizeof(ST_DEVICE_TIME_STAT)); + + idx = 0; + if (row[idx] && strlen(row[idx]) > 0) { + stPoint.sadr = strtoul(row[idx], NULL, 10); + } + idx++; + if (row[idx] && strlen(row[idx]) > 0) { + stPoint.stype = strtoul(row[idx], NULL, 10); + } + idx++; + if (row[idx] && strlen(row[idx]) > 0) + { + stSensor.site_id = strtoul(row[idx], NULL, 10); + } + idx++; + if (row[idx] && strlen(row[idx]) > 0) + { + stPoint.sensor_id = strtoul(row[idx], NULL, 10); + } + idx++; + if (row[idx] && strlen(row[idx]) > 0) { + strncpy((char*)stSensor.tableName, (const char*)row[idx], sizeof(stSensor.tableName)); + } + idx++; + if (row[idx] && strlen(row[idx]) > 0) { + strncpy((char*)stPoint.fieldName, (const char*)row[idx], sizeof(stPoint.fieldName)); + } + // t2.sensor_code,t2.system_code,t2.dtime_field_name,t2.devid_field_name,t2.one_dtime + idx++; + idx++; + if (row[idx] && strlen(row[idx]) > 0) { + strncpy((char*)stSensor.sys_code, (const char*)row[idx], sizeof(stSensor.sys_code)); + } + idx++; + if (row[idx] && strlen(row[idx]) > 0) { + strncpy((char*)stSensor.dtimeFieldName, (const char*)row[idx], sizeof(stSensor.dtimeFieldName)); + } + idx++; + if (row[idx] && strlen(row[idx]) > 0) { + strncpy((char*)stSensor.devidFieldName, (const char*)row[idx], sizeof(stSensor.devidFieldName)); + } + idx++; + if (row[idx] && strlen(row[idx]) > 0) { + stSensor.one_dtime = (unsigned char)strtoul(row[idx], NULL, 10); + } + stSensor.sensor_id = stPoint.sensor_id; + + // vPrtLogMsg(LOG_WARNG, RET_OK, "----TABLE %s oneTime=%u", (const char*)stSensor.tableName, (unsigned int)stSensor.one_dtime); + + mutex_lock(g_map_iec_mutex_new); + g_map_devices[stSensor.sensor_id] = stSensor; + g_map_iec_new[stPoint.sadr] = stPoint; + mutex_unlock(g_map_iec_mutex_new); + } + pdbHandle->FreeRecord(res); +} + // 刷新设备更新时间 void SethDevTimeStat(unsigned char *sys_code, unsigned char ws) { @@ -864,6 +943,7 @@ void *thread_cache_proc(void *arg) // 同步内存数据线程 vLoadBYQThresholdTable(false); vLoadDeviceRelationsTable(); vLoadIECPointTable(); + vLoadIECPointTableNew(); vLoadImageThreshold(); //vPrtGisCacheRelation(); //vPrtBlqCacheRelation(); diff --git a/src/HTPublic.cpp b/src/HTPublic.cpp index 2057695..f5b3b5a 100644 --- a/src/HTPublic.cpp +++ b/src/HTPublic.cpp @@ -1285,3 +1285,16 @@ double getRandByTime() return (double)((double)(rand()%100 + 1) / (double)(RAND_MAX / rand())); } +int replaceAll(std::string& input, const std::string& search, const std::string& replace) +{ + int matched = 0; + size_t pos = 0; + while ((pos = input.find(search, pos)) != std::string::npos) + { + input.replace(pos, search.length(), replace); + pos += replace.length(); + ++matched; + } + + return matched; +} \ No newline at end of file diff --git a/src/HTService.cpp b/src/HTService.cpp index 6c54398..48692af 100644 --- a/src/HTService.cpp +++ b/src/HTService.cpp @@ -319,6 +319,8 @@ int main(int argc, char* argv[]) #ifdef _HT_IEC104_APP ht_pthread_create_background(&thread_handle_warning, thread_warn_proc, NULL); // 告警处理线程 ht_pthread_create_background(&thread_handle_origin, thread_origin_proc, NULL); // 原始数据入库线程 + ht_pthread_create_background(&thread_handle_origin_new, thread_origin_proc_new, NULL); // 原始数据入库线程 + ht_pthread_create_background(&thread_handle_busi_data, thread_busi_data_proc, NULL); // 业务数据入库线程 ht_pthread_create_background(&thread_handle_pingce, thread_pingce_proc, NULL); // 评测数据入库线程 ht_pthread_create_background(&thread_handle_gishold, thread_gis_hold_proc, NULL); // 评测数据入库线程 diff --git a/vsproj/iec104.vcxproj b/vsproj/iec104.vcxproj index fcfcc5a..f95b9b7 100644 --- a/vsproj/iec104.vcxproj +++ b/vsproj/iec104.vcxproj @@ -121,8 +121,8 @@ true - $(ProjectDir)..\include;D:\Workspace\deps\opencv-3.4.16\build\include;C:\Program Files\MariaDB 10.11\include;$(VC_IncludePath);$(WindowsSDK_IncludePath); - D:\Workspace\deps\opencv-3.4.16\build\x64\vc15\lib;D:\Workspace\deps\x64\dbg;$(VC_LibraryPath_x64);C:\Program Files\MariaDB 10.11\lib;$(WindowsSDK_LibraryPath_x64);$(NETFXKitsDir)Lib\um\x64 + $(ProjectDir)..\include;D:\Workspace\deps\opencv-3.4.16\build\include;C:\Program Files\MariaDB 10.10\include;$(VC_IncludePath);$(WindowsSDK_IncludePath); + D:\Workspace\deps\opencv-3.4.16\build\x64\vc15\lib;D:\Workspace\deps\x64\dbg;$(VC_LibraryPath_x64);C:\Program Files\MariaDB 10.10\lib;$(WindowsSDK_LibraryPath_x64);$(NETFXKitsDir)Lib\um\x64 false @@ -162,6 +162,9 @@ true kernel32.lib;user32.lib;gdi32.lib;winspool.lib;comdlg32.lib;advapi32.lib;shell32.lib;ole32.lib;oleaut32.lib;uuid.lib;odbc32.lib;odbccp32.lib;Shlwapi.lib;Secur32.lib;Crypt32.lib;zlibd.lib;%(AdditionalDependencies) + + copy /Y "D:\Workspace\deps\opencv-3.4.16\build\x64\vc15\bin\opencv_world3416d.dll" "$(OutDir)\opencv_world3416d.dll" + diff --git a/vsproj/iec104.vcxproj.user b/vsproj/iec104.vcxproj.user index be25078..ca2a4a1 100644 --- a/vsproj/iec104.vcxproj.user +++ b/vsproj/iec104.vcxproj.user @@ -1,4 +1,7 @@  - + + start + WindowsLocalDebugger + \ No newline at end of file