Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 65 additions & 10 deletions umatiGateway/Core/PubSub/PubSubProvider.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 FVA GmbH - interop4x. All rights reserved.
using MQTTnet.Packets;
using NLog;
using Opc.Ua;
using Opc.Ua.Client;
Expand Down Expand Up @@ -35,6 +36,10 @@ public class PubSubProvider
private readonly System.Timers.Timer pubTestTimer = new System.Timers.Timer(5000);
private ReferenceDescriptionResolver referenceDescriptionResolver;
private bool alwaysIncludeBrowsePathIndex = false;
private Dictionary<string, string> metaTopics = new Dictionary<string, string>();
private Dictionary<string, string> topics = new Dictionary<string, string>();
private List<Replacement> topicReplacements = new List<Replacement>();
private List<Replacement> metaTopicReplacements = new List<Replacement>();
private readonly BlockingCollection<(MonitoredItem, MonitoredItemNotificationEventArgs)> notificationqueue = new BlockingCollection<(MonitoredItem, MonitoredItemNotificationEventArgs)>();
public List<MachineNode> MachineNodes { get; set; } = new List<MachineNode>();

Expand All @@ -47,10 +52,12 @@ public PubSubProvider(UmatiGatewayApp app)
}
public void Connect()
{
this.metaTopics.Clear();
this.topics.Clear();
this.metaTopicReplacements.Clear();
this.topicReplacements.Clear();
Session session = this.client.CheckSession();
Console.WriteLine("FetchTypeTree start");
session.FetchTypeTree(ObjectTypeIds.BaseObjectType);
Console.WriteLine("FetchTypeTree end");
referenceDescriptionResolver = new ReferenceDescriptionResolver(client);
CreateSubscriptions();
client.SubscribeToDataChanges(subscriptionIds, updateDataValue);
Expand Down Expand Up @@ -132,25 +139,62 @@ private void PreSubscribe(NodeId nodeId)
public void Subscribe(HierarchicalNode hierarchicalNode)
{
PreSubscribe(hierarchicalNode.NodeId);
createDataSetAndWritersNew(hierarchicalNode);
foreach (HierarchicalNode hierarchicalChildNode in hierarchicalNode.hierarchicalChilds.Values)
{
Subscribe(hierarchicalChildNode);
}
createDataSetAndWritersNew(hierarchicalNode);
//createDataSetAndWritersNew(hierarchicalNode);
}
public string CreateTopic(HierarchicalNode hierarchicalProperty)
{
UmatiConfiguration config = app.ActiveConfiguration;
string nstopic = $"{config.PubSubProviderConfig.Prefix}/json/data/{config.PubSubProviderConfig.ClientId}/{GetBrowsePath(hierarchicalProperty, true)}";
string topic = $"{config.PubSubProviderConfig.Prefix}/json/data/{config.PubSubProviderConfig.ClientId}/{GetBrowsePath(hierarchicalProperty)}";
foreach (KeyValuePair<string, string> knownTopic in this.topics)
{
if (topic == knownTopic.Value)
{
this.topicReplacements.Add(new Replacement(nstopic, topic, $"{config.PubSubProviderConfig.Prefix}/json/data/{config.PubSubProviderConfig.ClientId}/{GetBrowsePath(hierarchicalProperty, true, true)}"));
}
}
topics.TryAdd(nstopic, topic);
for (int i = this.topicReplacements.Count - 1; i >= 0; i--)
{
Replacement replacement = this.topicReplacements[i];
if (nstopic.StartsWith(replacement.nsTopic))
{
topic = topic.Substring(replacement.topic.Length);
topic = replacement.replaceTopic + topic;
}
}
return topic;
}
public string CreateMetaTopic(HierarchicalNode hierarchicalProperty)
{
UmatiConfiguration config = app.ActiveConfiguration;
string topic = $"{config.PubSubProviderConfig.Prefix}/json/metadata/{config.PubSubProviderConfig.ClientId}/{GetBrowsePath(hierarchicalProperty)}";
return topic;
string nsMetaTopic = $"{config.PubSubProviderConfig.Prefix}/json/metadata/{config.PubSubProviderConfig.ClientId}/{GetBrowsePath(hierarchicalProperty, true)}";
string metaTopic = $"{config.PubSubProviderConfig.Prefix}/json/metadata/{config.PubSubProviderConfig.ClientId}/{GetBrowsePath(hierarchicalProperty)}";
foreach (KeyValuePair<string, string> knownMetaTopic in this.metaTopics)
{
if (metaTopic == knownMetaTopic.Value)
{
this.metaTopicReplacements.Add(new Replacement(nsMetaTopic, metaTopic, $"{config.PubSubProviderConfig.Prefix}/json/metadata/{config.PubSubProviderConfig.ClientId}/{GetBrowsePath(hierarchicalProperty, true, true)}"));
}
}
this.metaTopics.TryAdd(nsMetaTopic, metaTopic);
for (int i = this.metaTopicReplacements.Count - 1; i >= 0; i--)
{
Replacement replacement = this.metaTopicReplacements[i];
if (nsMetaTopic.StartsWith(replacement.nsTopic))
{
metaTopic = metaTopic.Substring(replacement.topic.Length);
metaTopic = replacement.replaceTopic + metaTopic;
}
}
return metaTopic;
}
public string GetBrowsePath(HierarchicalNode hierarchicalNode, bool includeNamespaceIndex = false, string delimeter = "/")
public string GetBrowsePath(HierarchicalNode hierarchicalNode, bool includeNamespaceIndex = false, bool onlyIndexCurrentNode = false, string delimeter = "/")
{
string browsePath = "";
if (includeNamespaceIndex || alwaysIncludeBrowsePathIndex)
Expand All @@ -164,7 +208,7 @@ public string GetBrowsePath(HierarchicalNode hierarchicalNode, bool includeNames
HierarchicalNode? parentNode = hierarchicalNode.Parent;
while (parentNode != null)
{
if (includeNamespaceIndex || alwaysIncludeBrowsePathIndex)
if ((includeNamespaceIndex && !onlyIndexCurrentNode) || alwaysIncludeBrowsePathIndex)
{
browsePath = parentNode.BrowseName.ToString() + delimeter + browsePath;
}
Expand Down Expand Up @@ -202,7 +246,7 @@ public void createDataSetAndWritersForObject(HierarchicalNode hierarchicalNode)
}
}
//Add a Virtaul Id
DataValue dataValue = new DataValue(GetBrowsePath(hierarchicalNode, true, "."));
DataValue dataValue = new DataValue(GetBrowsePath(hierarchicalNode, true, false, "."));
VirtualId virtualId = new VirtualId(new NodeId("virtualId_" + uniqueint, 1), dataValue);
virtualIds.Add(virtualId);
KeyValuePairCollection keyValuePairs = GetRealationsAsKeyValuePair(hierarchicalNode);
Expand Down Expand Up @@ -673,10 +717,9 @@ public void createDataSetAndWritersForVariable(HierarchicalNode hierarchicalNode
return;
}
int uniqueint = ++counter;
topic = CreateTopic(hierarchicalNode);
string dataSetName = new ExpandedNodeId(hierarchicalNode.NodeId).ToString();
//Add a Virtaul Id
DataValue dataValue = new DataValue(GetBrowsePath(hierarchicalNode, true, "."));
DataValue dataValue = new DataValue(GetBrowsePath(hierarchicalNode, true, false, "."));
VirtualId virtualId = new VirtualId(new NodeId("virtualId_" + uniqueint, 1), dataValue);
virtualIds.Add(virtualId);
KeyValuePairCollection keyValuePairs = GetRealationsAsKeyValuePair(hierarchicalNode);
Expand Down Expand Up @@ -970,4 +1013,16 @@ public void StartWorker()
return hierarchicalNode;
}
}
public class Replacement
{
public string nsTopic = "";
public string topic = "";
public string replaceTopic = "";
public Replacement(string nsTopic, string topic, string replaceTopic)
{
this.nsTopic = nsTopic;
this.topic = topic;
this.replaceTopic = replaceTopic;
}
}
}
Loading