ROS 2 QoS
ROS 2 通訊層使用 DDS 的一大原因就是 DDS 有支援 QoS (Quality of Service)。 QoS 能夠指定通訊相關的設定,像是資料傳輸的可靠性、是否要保留歷史資料等等。 然而 DDS 所支援的 QoS 實在太多了,從官方規範就有 20 幾種,因此 ROS 2 只選擇跟機器人比較相關的其中六種出來:
QoS | 功能 |
---|---|
Reliability | 用來確認通訊的可靠性,有 best effort 和 reliable 兩種 |
History | 能夠保留多少通訊的歷史資料,可以選擇 keep_last N 或 keep_all |
Durability | 是否要將歷史資料提供給 late-joiner,可以選擇 volatile 或 transient_local |
Liveliness | 主動確認 node 是否存活 |
Lifespan | 設定資料的存活時間,如果超過時間未被收取將會捨棄,並且跳出警示 |
Deadline | 在連續的資料間,最多可以有多長的間隔時間 |
一個很重要的點是,各個 QoS 本身上是獨立運行不會受到其他人影響,舉例來說,reliability 的設定不會影響到 durability,甚至其他 QoS 的行為。 因此我們在看待這六個 QoS 的時候,可以當成六種可以調整的 config 來看帶。
官方有針對 Liveliness, Lifespan 和 Deadline 提供 ROS 2 demo,如果想要實際了解其中內容,可以直接操作看看。
RxO (Request vs Offered)
Publisher 和 Subscriber 要能夠通訊,兩者的 QoS 必須要先 compatible,也就是符合 RxO 才行。 一言以蔽之,RxO 就是 Publisher 的 QoS 設定必須要高於 Subscriber 才行。 舉個例子來說,Reliability 的 QoS 要能夠通訊,Publisher 和 Subscriber 必須要符合如下邏輯。
Publisher | Subscriber | Compatible |
---|---|---|
Best Effort | Best Effort | V |
Best Effort | Reliable | X |
Reliable | Best Effort | V |
Reliable | Reliable | V |
由於定義上 Reliable > Best Effort,所以如果 Publisher 是 Best Effort,Subscriber 是 Reliable,兩者就會跳出 incompatible QoS 的警告,並且無法通訊。
看到這邊一定會有個問題:要怎麼知道哪些設定是高,哪些設定是低? 除了靠直覺外 (Reliable 比起 Best Effort 來說要求更高),可以查詢 ROS 2 的 QoS Design Compatibility,或是直接去查詢 DDS 的說明,OpenSplice DDS 的文件就有詳細敘述。
實際範例
我放了一些範例在GitHub上,可以當作參考。
C++
C++ 的 QoS API 可以參考 rclcpp::QoS Class Reference
基本上作法是先 init QoS,然後在 create publisher / subscriber 的時候把 QoS 帶入即可。
- history, reliability, durability
這三者的設定還蠻容易的,所以就放一起來表示。
// 先 init QoS 的物件
rclcpp::QoS qos_settings(10);
// 如果要設定 keep_last 10, reliable, volatile
qos_settings.keep_last(10).reliable().durability_volatile();
// 如果要設定 keep_all, best_effort, transient local
qos_settings.keep_all().best_effort().transient_local();
// 然後再將 QoS 在 create publisher 時帶入即可
publisher_ = this->create_publisher<std_msgs::msg::String>("topic", qos_settings);
- liveliness
qos_settings.liveliness(RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC);
qos_settings.liveliness_lease_duration(std::chrono::milliseconds(2000));
rclcpp::SubscriptionOptions sub_options;
sub_options.event_callbacks.liveliness_callback =
[](rclcpp::QOSLivelinessChangedInfo & event)
{
printf("Liveliness changed event: \n");
printf(" alive_count: %d\n", event.alive_count);
printf(" not_alive_count: %d\n", event.not_alive_count);
printf(" alive_count_change: %d\n", event.alive_count_change);
printf(" not_alive_count_change: %d\n", event.not_alive_count_change);
};
- lifespan
可參考 官方 lifespan demo
- deadline
可參考 官方 deadline demo
qos_settings.deadline(std::chrono::milliseconds(2000));
// publisher callback
rclcpp::PublisherOptions pub_options;
pub_options.event_callbacks.deadline_callback =
[](rclcpp::QOSDeadlineOfferedInfo & event) -> void
{
printf("Offered deadline missed: total %d delta %d\n", event.total_count, event.total_count_change);
};
auto publisher_ = this->create_publisher<std_msgs::msg::String>("topic", qos_settings, pub_options);
// subscriber callback
rclcpp::SubscriptionOptions sub_options;
sub_options.event_callbacks.deadline_callback =
[](rclcpp::QOSDeadlineRequestedInfo & event) -> void
{
printf("Requested deadline missed: total %d delta %d\n", event.total_count, event.total_count_change);
};
auto subscription_ = this->create_subscription<std_msgs::msg::String>("topic", qos_settings, std::bind(&MinimalSubscriber::topic_callback, this, _1), sub_options);
Python
Python 的 QoS API 可以參考 rclpy Quality of Service
官方建議設定
官方有些建議的 QoS 設定,可參考 qos_profiles.h 舉個例子來說,sensor data 並不在乎資料的 reliable,所以會設定為 best effort,盡可能快速傳資料。 過去 sensor data 的歷史資料也不重要,所以 durability 的部分用 volatile 即可。