Skip to content

[ce-oem] Add rs485 config handle for multiple rs485 port (BugFix) #1897

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,8 @@
import argparse


def print_ports_config(string: str, rs485_conf: str = None):
def print_ports_config(string: str):
ports_config_list = string.split()
rs485_conf_lists = {}
"""
Parse RS485 config,
e.g.
Input:
RS485_CONFIG = "/dev/ttySC0:True:False:0.0:0.0
/dev/ttySC2:True:False:0.0:0.0"

Output:
rs485_conf_lists = {
"/dev/ttySC0": {
"rts_level_for_tx": True,
"rts_level_for_rx": False,
"delay_before_tx: 0.0,
"delay_before_rx: 0.0,
}
"/dev/ttySC2": {
"rts_level_for_tx": True,
"rts_level_for_rx": False,
"delay_before_tx: 0.0,
"delay_before_rx: 0.0,
}
}
"""
if rs485_conf:
for rs485_conf_list in rs485_conf.split():
node, rts_tx, rts_rx, delay_tx, delay_rx = rs485_conf_list.split(
":"
)
rs485_conf_lists[node] = {
"rts_level_for_tx": rts_tx,
"rts_level_for_rx": rts_rx,
"delay_before_tx": delay_tx,
"delay_before_rx": delay_rx,
}
serials = []
rs485_nodes = []
rs422_nodes = []
Expand All @@ -54,26 +19,8 @@ def print_ports_config(string: str, rs485_conf: str = None):
serial = {}
port_type, port_node, baud_rate = config_parts
serial["type"] = port_type

"""
Init a config dict if type is RS485, and the init value are refer
to the serial.rs485 module.
ref:
https://pyserial.readthedocs.io/en/latest/pyserial_api.html#serial.rs485.RS485Settings
"""
if port_type == "RS485":
serial["rs485_conf"] = {
"rts_level_for_tx": True,
"rts_level_for_rx": False,
"delay_before_tx": 0.0,
"delay_before_rx": 0.0,
}
serial["node"] = port_node
serial["baudrate"] = baud_rate

# Mapping rs485 configs with rs485 node name and update the config
if port_node in rs485_conf_lists.keys():
serial["rs485_conf"] = rs485_conf_lists[port_node]
serials.append(serial)
if port_type == "RS485":
rs485_nodes.append(port_node)
Expand All @@ -83,9 +30,6 @@ def print_ports_config(string: str, rs485_conf: str = None):
for serial in serials:
print("type: {}".format(serial["type"]))
print("node: {}".format(serial["node"]))
if serial["type"] == "RS485":
for key, value in serial["rs485_conf"].items():
print("{}: {}".format(key, value))
print("baudrate: {}".format(serial["baudrate"]))
print("group: ", end="")
if serial["type"] == "RS485":
Expand All @@ -106,15 +50,8 @@ def main():
type=str,
help="The string needed to be parsed",
)
parser.add_argument(
"--rs485-conf",
type=str,
help="RS485 specific configurations.",
default=None,
required=False,
)
args = parser.parse_args()
print_ports_config(args.string, args.rs485_conf)
print_ports_config(args.string)


if __name__ == "__main__":
Expand Down
154 changes: 97 additions & 57 deletions contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/serial_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,11 @@ def __init__(
self.stopbits = stopbits if stopbits else serial.STOPBITS_ONE
self.timeout = timeout if timeout else 3
self.datasize = datasize if datasize else 1024
self.rs485_settings = (
rs485_settings
if rs485_settings
else {
"rts_level_for_tx": True,
"rts_level_for_rx": False,
"delay_before_tx": 0.0,
"delay_before_rx": 0.0,
}
)
"""
Assign default config since no addtional RS485 config that
input by user.
"""
self.rs485_settings = rs485_settings
group = group if group else []
self.ser = self.serial_init(node)
self.group = []
Expand All @@ -104,12 +99,36 @@ def serial_init(self, node: str) -> serial.Serial:
stopbits=self.stopbits,
timeout=self.timeout,
)
if self.type == "RS485":
if self.rs485_settings:
"""
Mapping RS485 node with specific RS485 settings to
handle different rts_level.
"""
ser.rs485_mode = serial.rs485.RS485Settings(
rts_level_for_tx=self.rs485_settings.get("rts_level_for_tx"),
rts_level_for_rx=self.rs485_settings.get("rts_level_for_rx"),
delay_before_tx=self.rs485_settings.get("delay_before_tx"),
delay_before_rx=self.rs485_settings.get("delay_before_rx"),
rts_level_for_tx=self.rs485_settings[node].get(
"rts_level_for_tx"
),
rts_level_for_rx=self.rs485_settings[node].get(
"rts_level_for_rx"
),
delay_before_tx=self.rs485_settings[node].get(
"delay_before_tx"
),
delay_before_rx=self.rs485_settings[node].get(
"delay_before_rx"
),
)
logging.info(
"Init port %s with RS485 config "
"rts_level_for_tx: %s "
"rts_level_for_rx: %s "
"delay_befor_tx: %s "
"delay_befor_rx: %s ",
node,
ser.rs485_mode.rts_level_for_tx,
ser.rs485_mode.rts_level_for_rx,
ser.rs485_mode.delay_before_tx,
ser.rs485_mode.delay_before_rx,
)
ser.reset_input_buffer()
ser.reset_output_buffer()
Expand Down Expand Up @@ -142,6 +161,62 @@ def generate_random_string(length):
return "".join(random.choice(letters) for _ in range(length))


def parse_rs485_config(
target_node: str, rs485_conf: str = "", group: list = []
):
rs485_conf_lists = {}
"""
Parse RS485 config,
e.g.
Input:
RS485_CONFIG = "/dev/ttySC0:True:False:0.0:0.0
/dev/ttySC2:True:False:0.0:0.0"

Output:
rs485_conf_lists = {
"/dev/ttySC0": {
"rts_level_for_tx": True,
"rts_level_for_rx": False,
"delay_before_tx: 0.0,
"delay_before_rx: 0.0,
}
"/dev/ttySC2": {
"rts_level_for_tx": True,
"rts_level_for_rx": False,
"delay_before_tx: 0.0,
"delay_before_rx: 0.0,
}
}
"""
# Mapping rs485 config
for rs485_conf_list in rs485_conf.split():
node, rts_tx, rts_rx, delay_tx, delay_rx = rs485_conf_list.split(":")
rs485_conf_lists[node] = {
"rts_level_for_tx": True if rts_tx == "True" else False,
"rts_level_for_rx": True if rts_rx == "True" else False,
"delay_before_tx": float(delay_tx),
"delay_before_rx": float(delay_rx),
}
# Asign default value to the RS485 in group but not defined in RS485_CONFIG
for group_port in group:
if group_port not in rs485_conf_lists.keys():
rs485_conf_lists[group_port] = {
"rts_level_for_tx": True,
"rts_level_for_rx": False,
"delay_before_tx": 0.0,
"delay_before_rx": 0.0,
}
# mapping target port
if target_node not in rs485_conf_lists.keys():
rs485_conf_lists[target_node] = {
"rts_level_for_tx": True,
"rts_level_for_rx": False,
"delay_before_tx": 0.0,
"delay_before_rx": 0.0,
}
return rs485_conf_lists


def server_mode(
node,
type=None,
Expand Down Expand Up @@ -360,40 +435,12 @@ def create_args():
help="Timeout to receive",
default=3,
)

# Create RS485 subparser that only activates when --type=RS485
rs485_group = parser.add_argument_group(
"RS485 Options", "RS485-specific configuration options"
)
rs485_group.add_argument(
"--rts-level-for-tx",
choices=["True", "False"],
type=str,
help="RTS level for transmission." "Equal to RTS_ON_SEND",
default="True",
required=False,
)
rs485_group.add_argument(
"--rts-level-for-rx",
choices=["True", "False"],
parser.add_argument(
"--rs485-config",
type=str,
help="RTS level for reception." "Equal to RTS_AFTER_SEND",
default="False",
required=False,
)
rs485_group.add_argument(
"--rts-delay-before-tx",
type=float,
help="Delay after setting RTS but before transmission starts.",
default=0.0,
required=False,
)
rs485_group.add_argument(
"--rts-delay-before-rx",
type=float,
help="Delay after transmission ends and resetting RTS.",
default=0.0,
help="RS485 configuration",
required=False,
default="",
)
return parser

Expand All @@ -404,16 +451,9 @@ def main():

init_logger()
if args.type == "RS485":
rs485_settings = {
"rts_level_for_tx": (
True if args.rts_level_for_tx == "True" else False
),
"rts_level_for_rx": (
True if args.rts_level_for_rx == "True" else False
),
"delay_before_tx": args.rts_delay_before_tx,
"delay_before_rx": args.rts_delay_before_rx,
}
rs485_settings = parse_rs485_config(
args.node, args.rs485_config, args.group
)
else:
rs485_settings = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,15 @@ _description:
This is to allow template jobs to then be instantiated.
TYPE:NODE:BAUDRATE
SERIAL_PORTS="RS485:/dev/ttyS0:9600 RS485:/dev/ttyS1:9600 RS232:/dev/ttyS2:115200"
For RS485 specific configuration:
NODE:rts_level_for_tx{True|False}:rts_level_for_rx{True|False}:delay_before_tx{float}:delay_before_rx{float}
RS485_CONFIG="/dev/ttySC0:True:False:0.0:0.0"
plugin: resource
estimated_duration: 1.0
environ:
SERIAL_PORTS RS485_CONFIG
SERIAL_PORTS
command:
if [ -z "$SERIAL_PORTS" ]; then
exit 0
fi
serial_config_parser.py "$SERIAL_PORTS" --rs485-conf "$RS485_CONFIG"
serial_config_parser.py "$SERIAL_PORTS"

unit: template
template-resource: ce-oem-serial/serial-list
Expand All @@ -84,10 +81,11 @@ user: root
category_id: com.canonical.certification::serial
estimated_duration: 30
flags: also-after-suspend
environ: RS485_CONFIG
command:
# shellcheck disable=SC2050
if [ {{ type }} == "RS485" ]; then
serial_test.py {{ node }} --mode client --type {{ type }} --group {{ group }} --baudrate {{ baudrate }} --rts-level-for-tx {{ rts_level_for_tx }} --rts-level-for-rx {{ rts_level_for_rx }} --rts-delay-before-tx {{ delay_before_tx }} --rts-delay-before-rx {{ delay_before_rx }}
serial_test.py {{ node }} --mode client --type {{ type }} --group {{ group }} --baudrate {{ baudrate }} --rs485-config "$RS485_CONFIG"
else
serial_test.py {{ node }} --mode client --type {{ type }} --group {{ group }} --baudrate {{ baudrate }}
fi
Loading