perf: 优化订阅的永远断线重连以及间隔

main
huangfeng 6 months ago
parent 02db7284cb
commit 7acb7e4d9c

@ -10,6 +10,7 @@ import com.xydl.cac.iec.RealTimeDataService;
import com.xydl.cac.model.ColumnModel;
import com.xydl.cac.model.IcdAttUpdateModel;
import com.xydl.cac.model.Response;
import com.xydl.cac.repository.IcdIedRepository;
import com.xydl.cac.service.DataService;
import com.xydl.cac.service.IcdFileConfigService;
import io.swagger.annotations.Api;
@ -25,6 +26,7 @@ import javax.annotation.Resource;
import javax.validation.constraints.NotNull;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Optional;
@RestController
@Api(tags = {"IcdConfig相关接口"})
@ -38,6 +40,8 @@ public class IcdConfigController extends BasicController {
DataService dataService;
@Resource
RealTimeDataService realTimeDataService;
@Resource
IcdIedRepository iedRepository;
@PostMapping("upload")
@ApiOperation("上传客户端icd文件")
@ -161,8 +165,22 @@ public class IcdConfigController extends BasicController {
if (RealTimeDataService.inDoing) {
throw new BusinessException("请稍后再操作");
}
realTimeDataService.startCollect(iedId);
Optional<IcdIed> optional = iedRepository.findById(iedId);
if (!optional.isPresent()) {
throw new BusinessException("未找到该IED");
}
IcdIed ied = optional.get();
try {
realTimeDataService.startCollect(ied);
ied.setStart(Constants.TRUE);
iedRepository.save(ied);
return Response.success("OK");
} catch (Exception ex) {
realTimeDataService.stopCollect(ied.getId());
ied.setStart(Constants.FALSE);
iedRepository.save(ied);
throw ex;
}
}
@PostMapping("stopCollect")
@ -174,7 +192,14 @@ public class IcdConfigController extends BasicController {
if (RealTimeDataService.inDoing) {
throw new BusinessException("请稍后再操作");
}
realTimeDataService.stopCollect(iedId);
Optional<IcdIed> optional = iedRepository.findById(iedId);
if (!optional.isPresent()) {
throw new BusinessException("未找到该IED");
}
IcdIed ied = optional.get();
ied.setStart(Constants.FALSE);
iedRepository.save(ied);
realTimeDataService.stopCollect(ied.getId());
return Response.success("OK");
}

@ -55,7 +55,8 @@ public class IEDCollectService {
}
public void connect() throws Exception {
iecClient.connect(ied, xml);
iecClient.init(ied, xml);
iecClient.connect();
}
public void disconnect() {

@ -29,9 +29,11 @@ public class IecClient implements ClientEventListener {
ServerModel serverModel;
public boolean keep = false;
public boolean connected = false;
private RealTimeDataService realTimeDataService;
private WebSocketServer webSocketServer;
public int retry = 10;
public int retry = 0;
public int seconds = 0;
public Date lastReportTime;
public IecClient() {
@ -42,16 +44,20 @@ public class IecClient implements ClientEventListener {
webSocketServer = _webSocketServer;
}
public void connect(IcdIed _ied, String xml) throws Exception {
public void init(IcdIed _ied, String xml) throws Exception {
InputStream in = IOUtils.toInputStream(xml, StandardCharsets.UTF_8);
this.connect(_ied, in);
this.init(_ied, in);
}
public void connect(IcdIed _ied, InputStream in) throws Exception {
public void init(IcdIed _ied, InputStream in) throws Exception {
ied = _ied;
serverModel = SclParser.parse(in).get(0);
}
public void connect() throws Exception {
title = new int[]{1, 3, 9999, 33};
if (StringUtils.isNotBlank(_ied.getApTitle())) {
String[] strs = _ied.getApTitle().replaceAll(" ", ",").split(",");
if (StringUtils.isNotBlank(ied.getApTitle())) {
String[] strs = ied.getApTitle().replaceAll(" ", ",").split(",");
if (strs.length == 4) {
title[0] = Integer.parseInt(strs[0]);
title[1] = Integer.parseInt(strs[1]);
@ -69,14 +75,15 @@ public class IecClient implements ClientEventListener {
clientSap.setTSelRemote(new byte[]{0, 1});
clientSap.setTSelLocal(new byte[]{0, 0});
clientSap.setApTitleCalled(title);
serverModel = SclParser.parse(in).get(0);
clientAssociation = clientSap.associate(InetAddress.getByName(ied.getIp()), ied.getPort(), null, this);
clientAssociation.setServerModel(serverModel);
connected = true;
}
public void disconnect() {
try {
connected = false;
clientAssociation.disconnect();
clientAssociation = null;
} catch (Exception ignore) {
@ -87,11 +94,13 @@ public class IecClient implements ClientEventListener {
if (StaticVariable.shutdown == 1) {
return;
}
retry++;
seconds = 0;
clientAssociation = clientSap.associate(InetAddress.getByName(ied.getIp()), ied.getPort(), null, this);
clientAssociation.setServerModel(serverModel);
log.info("61850订阅断线重连成功, ied=" + ied.getName() + ", ip=" + ied.getIp() + ", port=" + ied.getPort());
retry = 10;
keep = true;
connected = true;
retry = 0;
}
public String getValue(String paramindex, String fc) throws Exception {
@ -126,7 +135,6 @@ public class IecClient implements ClientEventListener {
}
}
log.info("61850订阅成功, ied=" + ied.getName() + ", ip=" + ied.getIp() + ", port=" + ied.getPort());
keep = true;
}
public void disableReporting() {
@ -151,7 +159,6 @@ public class IecClient implements ClientEventListener {
} else {
log.info("61850停止订阅, ied=" + ied.getName() + ", ip=" + ied.getIp() + ", port=" + ied.getPort());
}
keep = false;
}
public List<FileInformation> listFile(String path) throws Exception {
@ -178,34 +185,27 @@ public class IecClient implements ClientEventListener {
@Override
public void associationClosed(IOException e) {
if (keep) {
retry = 0;
seconds = 0;
this.disableReporting();
this.disconnect();
while (retry > 0 && StaticVariable.shutdown == 0) {
retry--;
while (!connected && StaticVariable.shutdown == 0) {
try {
if (retry >= 5) {
StaticVariable.wait(10);
this.reconnect();
} else if (retry == 4) {
log.warn("61850订阅断线重连已失败5次, ied=" + ied.getName() + ", ip=" + ied.getIp() + ", port=" + ied.getPort());
StaticVariable.wait(60);
Thread.sleep(1000);
seconds++;
if (retry < 10 && seconds > 60) {
this.reconnect();
} else if (retry >= 1) {
StaticVariable.wait(60);
} else if (seconds > 60 * 60) {
this.reconnect();
} else {
String err = "61850订阅断线重连已失败多次不再重连. ied=" + ied.getName() + ", ip=" + ied.getIp() + ", port=" + ied.getPort();
log.warn(err);
if (realTimeDataService != null && ied != null) {
realTimeDataService.stopCollect(ied.getId());
}
if (webSocketServer != null) {
} catch (Exception ignore) {
String err = "61850订阅断线重连失败" + retry + "次, ied="
+ ied.getName() + ", ip=" + ied.getIp() + ", port=" + ied.getPort();
log.warn(err);
if (webSocketServer != null && retry >= 10) {
webSocketServer.sendMessage(err, null);
}
}
break;
} catch (Exception ignore) {
}
}
}
}
@ -218,7 +218,8 @@ public class IecClient implements ClientEventListener {
.port(102)
.apTitle("1 3 9999 33")
.build();
iecClient.connect(ied, new FileInputStream("C:/资料/om.SCD"));
iecClient.init(ied, new FileInputStream("C:/资料/om.SCD"));
iecClient.connect();
String str = iecClient.getValue("OMDLMONT/SPDC1.MaxDsch.mag.f", "MX");
System.out.println(str);
str = iecClient.getValue("OMDLMONT/SPDC1.MaxDsch.t", "MX");

@ -36,52 +36,37 @@ public class RealTimeDataService {
if (!CollectionUtils.isEmpty(list)) {
for (IcdIed ied : list) {
try {
this.startCollect(ied.getId());
this.startCollect(ied);
} catch (Exception ignore) {
}
}
}
}
public void startCollect(Integer iedId) throws BusinessException {
Optional<IcdIed> optional = iedRepository.findById(iedId);
if (!optional.isPresent()) {
throw new BusinessException("未找到该IED");
}
IcdIed ied = optional.get();
public void startCollect(IcdIed ied) throws BusinessException {
Optional<IcdFile> optionalFile = fileRepository.findById(ied.getIcdFileId());
if (!optionalFile.isPresent()) {
throw new BusinessException("未找到该icd文件");
}
inDoing = true;
IcdFile icdFile = optionalFile.get();
try {
IecClient iecClient = new IecClient(this, webSocketServer);
iecClient.connect(ied, icdFile.getXml());
iecClient.enableReporting();
ied.setStart(Constants.TRUE);
iedRepository.save(ied);
try {
iecClient.init(ied, icdFile.getXml());
iecClient.keep = true;
StaticVariable.realTimeClientMap.put(ied.getId(), iecClient);
iecClient.connect();
iecClient.enableReporting();
} catch (Exception ex) {
String err = "61850订阅异常, ied=" + ied.getName() + ", ip=" + ied.getIp() + ", port=" + ied.getPort();
log.error(err, ex);
this.stopCollect(iedId);
iecClient.associationClosed(null);
throw new BusinessException(err);
} finally {
inDoing = false;
}
}
public void stopCollect(Integer iedId) {
this.onlyStop(iedId);
Optional<IcdIed> optional = iedRepository.findById(iedId);
if (optional.isPresent()) {
IcdIed ied = optional.get();
ied.setStart(Constants.FALSE);
iedRepository.save(ied);
}
}
public static void onErrorCheck(Integer iedId) {
IecClient iecClient = StaticVariable.realTimeClientMap.get(iedId);
if (iecClient != null) {
@ -89,13 +74,13 @@ public class RealTimeDataService {
}
}
private void onlyStop(Integer iedId) {
public void stopCollect(Integer iedId) {
IecClient iecClient = StaticVariable.realTimeClientMap.get(iedId);
if (iecClient != null) {
StaticVariable.realTimeClientMap.remove(iedId);
iecClient.keep = false;
iecClient.disableReporting();
iecClient.disconnect();
StaticVariable.realTimeClientMap.remove(iedId);
}
}
@ -109,7 +94,7 @@ public class RealTimeDataService {
idList.add(it.next());
}
for (Integer iedId : idList) {
this.onlyStop(iedId);
this.stopCollect(iedId);
}
}

@ -100,7 +100,8 @@ public class IedDlConfigServiceImpl implements IedDlConfigService {
IcdFile icdFile = optionalFile.get();
IecClient iecClient = new IecClient();
try {
iecClient.connect(ied, icdFile.getXml());
iecClient.init(ied, icdFile.getXml());
iecClient.connect();
return iecClient.listFile(path);
} catch (Exception ex) {
throw new BusinessException(ex.getMessage());

Loading…
Cancel
Save