-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathkafkaHWMapper.cxx
154 lines (132 loc) · 5.72 KB
/
kafkaHWMapper.cxx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
/** © Copyright 2019 CERN
*
* This software is distributed under the terms of the
* GNU Lesser General Public Licence version 3 (LGPL Version 3),
* copied verbatim in the file “LICENSE”
*
* In applying this licence, CERN does not waive the privileges
* and immunities granted to it by virtue of its status as an
* Intergovernmental Organization or submit itself to any jurisdiction.
*
* Author: Alexandru Savulescu (HSE-CEN-CO)
*
**/
#include "kafkaHWMapper.hxx"
#include "Transformations/kafkaStringTrans.hxx"
#include "Transformations/kafkaInt32Trans.hxx"
#include "Transformations/kafkaInt64Trans.hxx"
#include "Transformations/kafkaFloatTrans.hxx"
#include "Transformations/kafkaBoolTrans.hxx"
#include "Transformations/kafkaUint8Trans.hxx"
#include "Transformations/kafkaTimeTrans.hxx"
#include "Common/Logger.hxx"
#include "Common/Utils.hxx"
#include <PVSSMacros.hxx> // DEBUG macros
//--------------------------------------------------------------------------------
// We get new configs here. Create a new HW-Object on arrival and insert it.
// Handles a newly arrived peripheral address config.
//
// Installs the Transformation object matching the configured transformation
// type into the PeriphAddr, registers the config with the base HWMapper,
// creates a HWObject for the address and, for input addresses, announces the
// topic part of the address via addTopic().
//
// dpId    - datapoint identifier, passed through to HWMapper::addDpPa
// confPtr - the peripheral address config; dereferenced without a null check
//
// Returns the result of HWMapper::addDpPa directly (no HWObject is created)
// when the transformation type is undefined or not one of the kafka types,
// PVSS_FALSE when the base class rejects the config, PVSS_TRUE otherwise.
PVSSboolean kafkaHWMapper::addDpPa(DpIdentifier &dpId, PeriphAddr *confPtr)
{
  // No subindices are used, so every config gets its own HWObject and there is
  // no need to look for an existing one and adapt its length.
  Common::Logger::globalInfo(Common::Logger::L1,"addDpPa called for ", confPtr->getName().c_str());
  Common::Logger::globalInfo(Common::Logger::L1,"addDpPa direction ", CharString(confPtr->getDirection()));

  // The transformation type is already stored in the PeriphAddr (the original
  // notes say it is chosen in the configuration panel); here the matching
  // Transformation object is installed so data can be converted to/from the device.
  switch (static_cast<uint32_t>(confPtr->getTransformationType()))
  {
    case TransUndefinedType:
      // Nothing to install: hand the config to the base class as-is.
      Common::Logger::globalInfo(Common::Logger::L3,"Undefined transformation" + CharString(confPtr->getTransformationType()));
      return HWMapper::addDpPa(dpId, confPtr);

    case kafkaDrvBoolTransType:
      Common::Logger::globalInfo(Common::Logger::L3,"Bool transformation");
      confPtr->setTransform(new Transformations::kafkaBoolTrans);
      break;

    case kafkaDrvUint8TransType:
      Common::Logger::globalInfo(Common::Logger::L3,"Uint8 transformation");
      confPtr->setTransform(new Transformations::kafkaUint8Trans);
      break;

    case kafkaDrvInt32TransType:
      Common::Logger::globalInfo(Common::Logger::L3,"Int32 transformation");
      confPtr->setTransform(new Transformations::kafkaInt32Trans);
      break;

    case kafkaDrvInt64TransType:
      Common::Logger::globalInfo(Common::Logger::L3,"Int64 transformation");
      confPtr->setTransform(new Transformations::kafkaInt64Trans);
      break;

    case kafkaDrvFloatTransType:
      Common::Logger::globalInfo(Common::Logger::L3,"Float transformation");
      confPtr->setTransform(new Transformations::kafkaFloatTrans);
      break;

    case kafkaDrvStringTransType:
      Common::Logger::globalInfo(Common::Logger::L3,"String transformation");
      confPtr->setTransform(new Transformations::kafkaStringTrans);
      break;

    case kafkaDrvTimeTransType:
      Common::Logger::globalInfo(Common::Logger::L3,"Time transformation");
      confPtr->setTransform(new Transformations::kafkaTimeTrans);
      break;

    default:
      // Unknown type: report it, then still let the base class see the config.
      Common::Logger::globalError("kafkaHWMapper::addDpPa", CharString("Illegal transformation type ") + CharString(static_cast<int>(confPtr->getTransformationType())));
      return HWMapper::addDpPa(dpId, confPtr);
  }

  // The config has to be registered before the HW-Object is created.
  if ( !HWMapper::addDpPa(dpId, confPtr) )
  {
    return PVSS_FALSE;
  }

  HWObject *newObject = new HWObject;
  Common::Logger::globalInfo(Common::Logger::L3, "New Object", "name:" + confPtr->getName());

  // Connection id and address are copied from the config.
  newObject->setConnectionId(confPtr->getConnectionId());
  newObject->setAddress(confPtr->getName());

  // Data type and data length come from the transformation installed above.
  // Only one item is accounted for (the transformation's itemSize).
  // TODO - number of elements?
  newObject->setType(confPtr->getTransform()->isA());
  newObject->setDlen(confPtr->getTransform()->itemSize());

  addHWObject(newObject);

  // Only input addresses lead to a topic registration.
  if (confPtr->getDirection() != DIRECTION_IN)
  {
    return PVSS_TRUE;
  }

  // An input address is expected to split into exactly TOPIC + KEY;
  // anything else is ignored without an error.
  const std::vector<std::string> addressParts = Common::Utils::split(newObject->getAddress().c_str());
  if (addressParts.size() == 2)
  {
    addTopic(addressParts[0]);
  }

  return PVSS_TRUE;
}
//--------------------------------------------------------------------------------
// Handles removal of a peripheral address config.
//
// Looks up the HWObject whose address equals the config's name, removes it
// from the mapper and deletes it if found, then lets the base class remove
// the config itself.
//
// dpId    - datapoint identifier, passed through to HWMapper::clrDpPa
// confPtr - the peripheral address config being removed
//
// Returns whatever HWMapper::clrDpPa returns.
PVSSboolean kafkaHWMapper::clrDpPa(DpIdentifier &dpId, PeriphAddr *confPtr)
{
  DEBUG_DRV_USR1("clrDpPa called for " << confPtr->getName());

  // Temporary object used purely as a search key: only its address is set.
  HWObject searchKey;
  searchKey.setAddress(confPtr->getName());

  if ( HWObject *registered = findHWAddr(&searchKey) )
  {
    // Unregister before deleting so the mapper is not left with a stale pointer.
    clrHWObject(registered);
    delete registered;
  }

  // The base class takes care of removing the config.
  return HWMapper::clrDpPa(dpId, confPtr);
}
// Records a consumer topic the first time it is seen.
//
// A topic already present in consumer_topics is ignored. A new topic is
// stored and, if a new-topic callback has been set, reported through it.
//
// topic - name of the topic to register
void kafkaHWMapper::addTopic(const std::string &topic)
{
  // Already known: nothing to store and nobody to notify.
  if (consumer_topics.find(topic) != consumer_topics.end())
  {
    return;
  }

  consumer_topics.insert(topic);

  // The callback is optional; skip the notification when none is installed.
  if (!_newConsumerTopicCB)
  {
    return;
  }

  _newConsumerTopicCB(topic);
}
//--------------------------------------------------------------------------------