@@ -79,16 +79,7 @@ def __init__(
79
79
Assign default config since no addtional RS485 config that
80
80
input by user.
81
81
"""
82
- self .rs485_settings = (
83
- rs485_settings
84
- if rs485_settings
85
- else {
86
- "rts_level_for_tx" : True ,
87
- "rts_level_for_rx" : False ,
88
- "delay_before_tx" : 0.0 ,
89
- "delay_before_rx" : 0.0 ,
90
- }
91
- )
82
+ self .rs485_settings = rs485_settings
92
83
group = group if group else []
93
84
self .ser = self .serial_init (node )
94
85
self .group = []
@@ -108,42 +99,25 @@ def serial_init(self, node: str) -> serial.Serial:
108
99
stopbits = self .stopbits ,
109
100
timeout = self .timeout ,
110
101
)
111
- if self .type == "RS485" :
102
+ if self .rs485_settings :
112
103
"""
113
104
Mapping RS485 node with specific RS485 settings to
114
105
handle different rts_level.
115
106
"""
116
- if node in self .rs485_settings :
117
- ser .rs485_mode = serial .rs485 .RS485Settings (
118
- rts_level_for_tx = self .rs485_settings [node ].get (
119
- "rts_level_for_tx"
120
- )
121
- == "True" ,
122
- rts_level_for_rx = self .rs485_settings [node ].get (
123
- "rts_level_for_rx"
124
- )
125
- == "True" ,
126
- delay_before_tx = float (
127
- self .rs485_settings [node ].get ("delay_before_tx" )
128
- ),
129
- delay_before_rx = float (
130
- self .rs485_settings [node ].get ("delay_before_rx" )
131
- ),
132
- )
133
- else :
134
- """
135
- Use default RS485 setting if node does not have it own config
136
- """
137
- ser .rs485_mode = serial .rs485 .RS485Settings (
138
- rts_level_for_tx = self .rs485_settings .get (
139
- "rts_level_for_tx"
140
- ),
141
- rts_level_for_rx = self .rs485_settings .get (
142
- "rts_level_for_rx"
143
- ),
144
- delay_before_tx = self .rs485_settings .get ("delay_before_tx" ),
145
- delay_before_rx = self .rs485_settings .get ("delay_before_rx" ),
146
- )
107
+ ser .rs485_mode = serial .rs485 .RS485Settings (
108
+ rts_level_for_tx = self .rs485_settings [node ].get (
109
+ "rts_level_for_tx"
110
+ ),
111
+ rts_level_for_rx = self .rs485_settings [node ].get (
112
+ "rts_level_for_rx"
113
+ ),
114
+ delay_before_tx = self .rs485_settings [node ].get (
115
+ "delay_before_tx"
116
+ ),
117
+ delay_before_rx = self .rs485_settings [node ].get (
118
+ "delay_before_rx"
119
+ ),
120
+ )
147
121
logging .info (
148
122
"Init port %s with RS485 config "
149
123
"rts_level_for_tx: %s "
@@ -187,7 +161,9 @@ def generate_random_string(length):
187
161
return "" .join (random .choice (letters ) for _ in range (length ))
188
162
189
163
190
- def parse_rs485_config (rs485_conf : str = None ):
164
+ def parse_rs485_config (
165
+ target_node : str , rs485_conf : str = "" , group : list = []
166
+ ):
191
167
rs485_conf_lists = {}
192
168
"""
193
169
Parse RS485 config,
@@ -212,13 +188,31 @@ def parse_rs485_config(rs485_conf: str = None):
212
188
}
213
189
}
214
190
"""
191
+ # Mapping rs485 config
215
192
for rs485_conf_list in rs485_conf .split ():
216
193
node , rts_tx , rts_rx , delay_tx , delay_rx = rs485_conf_list .split (":" )
217
194
rs485_conf_lists [node ] = {
218
- "rts_level_for_tx" : rts_tx ,
219
- "rts_level_for_rx" : rts_rx ,
220
- "delay_before_tx" : delay_tx ,
221
- "delay_before_rx" : delay_rx ,
195
+ "rts_level_for_tx" : True if rts_tx == "True" else False ,
196
+ "rts_level_for_rx" : True if rts_rx == "True" else False ,
197
+ "delay_before_tx" : float (delay_tx ),
198
+ "delay_before_rx" : float (delay_rx ),
199
+ }
200
+ # Asign default value to the RS485 in group but not defined in RS485_CONFIG
201
+ for group_port in group :
202
+ if group_port not in rs485_conf_lists .keys ():
203
+ rs485_conf_lists [group_port ] = {
204
+ "rts_level_for_tx" : True ,
205
+ "rts_level_for_rx" : False ,
206
+ "delay_before_tx" : 0.0 ,
207
+ "delay_before_rx" : 0.0 ,
208
+ }
209
+ # mapping target port
210
+ if target_node not in rs485_conf_lists .keys ():
211
+ rs485_conf_lists [target_node ] = {
212
+ "rts_level_for_tx" : True ,
213
+ "rts_level_for_rx" : False ,
214
+ "delay_before_tx" : 0.0 ,
215
+ "delay_before_rx" : 0.0 ,
222
216
}
223
217
return rs485_conf_lists
224
218
@@ -445,6 +439,8 @@ def create_args():
445
439
"--rs485-config" ,
446
440
type = str ,
447
441
help = "RS485 configuration" ,
442
+ required = False ,
443
+ default = "" ,
448
444
)
449
445
return parser
450
446
@@ -454,8 +450,10 @@ def main():
454
450
args = parser .parse_args ()
455
451
456
452
init_logger ()
457
- if args .rs485_config :
458
- rs485_settings = parse_rs485_config (args .rs485_config )
453
+ if args .type == "RS485" :
454
+ rs485_settings = parse_rs485_config (
455
+ args .node , args .rs485_config , args .group
456
+ )
459
457
else :
460
458
rs485_settings = None
461
459
0 commit comments