Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/usr/bin/env python3
from building_control_py_pkg_interfaces.msg import TempUnit
from building_control_py_pkg_interfaces.msg import FanAck
from building_control_py_pkg_interfaces.msg import FanCmd

#========================================================
# Re-running Codegen will overwrite changes to this file
#========================================================

def enumToString(value):
typedValue = TempUnit()
typedValue.data = value
match (typedValue.temp_unit):
case TempUnit.TEMP_UNIT_FAHRENHEIT:
return "TempUnit Fahrenheit"
case TempUnit.TEMP_UNIT_CELSIUS:
return "TempUnit Celsius"
case TempUnit.TEMP_UNIT_KELVIN:
return "TempUnit Kelvin"
case default:
return "Unknown value for TempUnit"

def enumToString(value):
typedValue = FanAck()
typedValue.data = value
match (typedValue.fan_ack):
case FanAck.FAN_ACK_OK:
return "FanAck Ok"
case FanAck.FAN_ACK_ERROR:
return "FanAck Error"
case default:
return "Unknown value for FanAck"

def enumToString(value):
typedValue = FanCmd()
typedValue.data = value
match (typedValue.fan_cmd):
case FanCmd.FAN_CMD_ON:
return "FanCmd On"
case FanCmd.FAN_CMD_OFF:
return "FanCmd Off"
case default:
return "Unknown value for FanCmd"

Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from collections import deque
from building_control_py_pkg.user_code.tcp_fan_src import *
from rclpy.callback_groups import ReentrantCallbackGroup
from building_control_py_pkg_interfaces.msg import FanCmd
from building_control_py_pkg_interfaces.msg import FanAck

#========================================================
# Re-running Codegen will overwrite changes to this file
#========================================================

class tcp_fan_base(Node):
def __init__(self):
super().__init__("tcp_fan")

self.cb_group_ = ReentrantCallbackGroup()

# Setting up connections
self.tcp_fan_fanCmd_subscription_ = self.create_subscription(
FanCmd,
"tcp_fan_fanCmd",
self.handle_fanCmd,
1,
callback_group=self.cb_group_)

self.tcp_fan_fanAck_publisher_ = self.create_publisher(
FanAck,
"tcp_tempControl_fanAck",
1)

def timeTriggered(self):
raise NotImplementedError("Subclasses must implement this method")

#=================================================
# C o m m u n i c a t i o n
#=================================================

def put_fanAck(self, msg):
self.tcp_fan_fanAck_publisher_.publish(msg)


#=================================================
# C o m p u t e E n t r y P o i n t
#=================================================
def handle_fanCmd(self, msg):
raise NotImplementedError("Subclasses must implement this method")

Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
from building_control_py_pkg.user_code.tcp_fan_src import tcp_fan
#========================================================
# Re-running Codegen will overwrite changes to this file
#========================================================
def main(args=None):
rclpy.init(args=args)
node = tcp_fan()
executor = MultiThreadedExecutor()
executor.add_node(node)
executor.spin()
rclpy.shutdown()

if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from collections import deque
from building_control_py_pkg.user_code.tcp_tempControl_src import *
from rclpy.callback_groups import ReentrantCallbackGroup
from building_control_py_pkg_interfaces.msg import Temperatureimpl
from building_control_py_pkg_interfaces.msg import FanAck
from building_control_py_pkg_interfaces.msg import SetPointimpl
from building_control_py_pkg_interfaces.msg import FanCmd
from building_control_py_pkg_interfaces.msg import Empty

#========================================================
# Re-running Codegen will overwrite changes to this file
#========================================================

class tcp_tempControl_base(Node):
def __init__(self):
super().__init__("tcp_tempControl")

self.cb_group_ = ReentrantCallbackGroup()

# Setting up connections
self.tcp_tempControl_currentTemp_subscription_ = self.create_subscription(
Temperatureimpl,
"tcp_tempControl_currentTemp",
self.handle_currentTemp,
1,
callback_group=self.cb_group_)

self.tcp_tempControl_fanAck_subscription_ = self.create_subscription(
FanAck,
"tcp_tempControl_fanAck",
self.handle_fanAck,
1,
callback_group=self.cb_group_)

self.tcp_tempControl_setPoint_subscription_ = self.create_subscription(
SetPointimpl,
"tcp_tempControl_setPoint",
self.handle_setPoint,
1,
callback_group=self.cb_group_)

self.tcp_tempControl_tempChanged_subscription_ = self.create_subscription(
Empty,
"tcp_tempControl_tempChanged",
self.event_handle_tempChanged,
1,
callback_group=self.cb_group_)

self.tcp_tempControl_fanCmd_publisher_ = self.create_publisher(
FanCmd,
"tcp_fan_fanCmd",
1)

self.currentTemp_msg_holder = None

def init_currentTemp(self, val):
self.currentTemp_msg_holder = val

def timeTriggered(self):
raise NotImplementedError("Subclasses must implement this method")

#=================================================
# C o m m u n i c a t i o n
#=================================================

def handle_currentTemp(self, msg):
self.currentTemp_msg_holder = msg

def get_currentTemp(self):
return self.currentTemp_msg_holder

def event_handle_tempChanged(self, msg):
self.handle_tempChanged()

def put_fanCmd(self, msg):
self.tcp_tempControl_fanCmd_publisher_.publish(msg)


#=================================================
# C o m p u t e E n t r y P o i n t
#=================================================
def handle_fanAck(self, msg):
raise NotImplementedError("Subclasses must implement this method")

def handle_setPoint(self, msg):
raise NotImplementedError("Subclasses must implement this method")

def handle_tempChanged(self):
raise NotImplementedError("Subclasses must implement this method")

Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
from building_control_py_pkg.user_code.tcp_tempControl_src import tcp_tempControl
#========================================================
# Re-running Codegen will overwrite changes to this file
#========================================================
def main(args=None):
rclpy.init(args=args)
node = tcp_tempControl()
executor = MultiThreadedExecutor()
executor.add_node(node)
executor.spin()
rclpy.shutdown()

if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from collections import deque
from rclpy.callback_groups import ReentrantCallbackGroup
from building_control_py_pkg_interfaces.msg import Temperatureimpl
from building_control_py_pkg_interfaces.msg import Empty

#========================================================
# Re-running Codegen will overwrite changes to this file
#========================================================

class tcp_tempSensor_base(Node):
def __init__(self):
super().__init__("tcp_tempSensor")

self.cb_group_ = ReentrantCallbackGroup()

# Setting up connections
self.tcp_tempSensor_currentTemp_publisher_ = self.create_publisher(
Temperatureimpl,
"tcp_tempControl_currentTemp",
1)

self.tcp_tempSensor_tempChanged_publisher_ = self.create_publisher(
Empty,
"tcp_tempControl_tempChanged",
1)

# timeTriggered callback timer
self.periodTimer_ = self.create_timer(1, self.timeTriggered, callback_group=self.cb_group_)

def timeTriggered(self):
raise NotImplementedError("Subclasses must implement this method")

#=================================================
# C o m m u n i c a t i o n
#=================================================

def put_currentTemp(self, msg):
self.tcp_tempSensor_currentTemp_publisher_.publish(msg)

def put_tempChanged(self):
msg = Empty()
self.tcp_tempSensor_tempChanged_publisher_.publish(msg)

Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
from building_control_py_pkg.user_code.tcp_tempSensor_src import tcp_tempSensor
#========================================================
# Re-running Codegen will overwrite changes to this file
#========================================================
def main(args=None):
rclpy.init(args=args)
node = tcp_tempSensor()
executor = MultiThreadedExecutor()
executor.add_node(node)
executor.spin()
rclpy.shutdown()

if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rosidl_runtime_py.convert import message_to_yaml
from building_control_py_pkg.base_code.tcp_fan_base_src import tcp_fan_base
from building_control_py_pkg_interfaces.msg import FanCmd
from building_control_py_pkg_interfaces.msg import FanAck
from building_control_py_pkg.base_code.enum_converter import *

#===========================================================
# This file will not be overwritten when re-running Codegen
#===========================================================
class tcp_fan(tcp_fan_base):
def __init__(self):
super().__init__()
# invoke initialize entry point
self.initialize()

self.get_logger().info("tcp_fan infrastructure set up")

#=================================================
# I n i t i a l i z e E n t r y P o i n t
#=================================================
def initialize(self):
self.get_logger().info("Initialize Entry Point invoked")

# Initialize the node

# Initialize the node's incoming data port values here


#=================================================
# C o m p u t e E n t r y P o i n t
#=================================================
def message_to_string(self, msg):
yaml_str = message_to_yaml(msg)
return yaml_str

def handle_fanCmd(self, msg):
# Handle fanCmd msg
self.get_logger().info(f"Received fanCmd: {self.message_to_string(msg)}")


#=================================================
# Include any additional declarations here
#=================================================
# Additions within these tags will be preserved when re-running Codegen

# Additions within these tags will be preserved when re-running Codegen
Loading