change rtps_handler output format

While previously we sent the EntityID before the GUID Prefix (for some
reason), we changed swaped the order to allow easier decoding (by having
uniform format for all inputs)
This commit is contained in:
Greek 2020-12-18 16:19:57 +01:00
parent 3566eb1282
commit a00f942826
5 changed files with 132 additions and 77 deletions

View File

@ -45,8 +45,8 @@ Note that the processing of this message uses the reply locators in the RTPS Rec
This is the only source of information for the StatelessWriter to determine where to send the reply to.
Proper functioning of the protocol requires that the RTPS Reader inserts an InfoReply Submessage ahead of the AckNack such that these fields are properly set.
Writer Liveness Protocol
========================
Writer Liveliness Protocol
==========================
ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER
ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER
@ -96,14 +96,14 @@ ENDPOINT FIFO PACKET FORMAT
+-------------+---------------+---------------+---------------+
| SRC_IPv4_ADDR |
+-------------------------------------------------------------+
| SRC_ENTITYID |
+-------------------------------------------------------------+
| |
+ +
| SRC_GUIDPREFIX |
+ +
| |
+-------------------------------------------------------------+
| SRC_ENTITYID |
+-------------------------------------------------------------+
| DEST_ENTITYID [only for Builtin Destinations] |
+-------------------------------------------------------------+
| |
@ -184,9 +184,6 @@ BUILT-IN ENDPOINTS
The QoS of the built-in Subscriber and DataReader objects is given by the following table:
HISTORY: kind = KEEP_LAST, depth = 1
Since the Built-In Endpoints have a history depth of 1, we can safely reduce the processing complexity of ACKNACK and
HEARTBEAT messages (and even ignore GAP messages).
PARTICICPANT DATA
=================
31............24..............16..............8...............0
@ -309,34 +306,39 @@ LOCAL ENDPOINT BUFFER
31............24..............16..............8...............0
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+-------------------------------------------------------------+
| |
00| ENTITYID |
+-------------------------------------------------------------+
01| |
+ +
| GUIDPREFIX |
02| GUIDPREFIX |
+ +
| |
03| |
+-------------------------------------------------------------+
| ENTITYID |
04| IPv4_ADDRESS | [Reliable only]
+-------------------------------------------------------------+
| IPv4_ADDRESS |
05| UDP_PORT | UNUSED |Q| [Reliable only]
+-------------------------------------------------------------+
| UDP_PORT | EXTRA_FLAGS |
06| |
+ HIGHEST_SEQ_NR +
07| |
+-------------------------------------------------------------+
| LIFESPAN |
+ +
| (READER_ONLY) |
08| |
+ LEASE_DEADLINE +
09| |
+-------------------------------------------------------------+
OUTPUT DATA
===========
31............24..............16..............8...............0
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
01| SRC_IPv4_ADDRESS |
00| SRC_IPv4_ADDRESS |
+-------------------------------------------------------------+
02| DEST_IPv4_ADDRESS |
01| DEST_IPv4_ADDRESS |
+-------------------------------------------------------------+
03| SRC_UDP_PORT | DEST_UDP_PORT |
02| SRC_UDP_PORT | DEST_UDP_PORT |
+-------------------------------------------------------------+
04| |
03| |
~ RTPS_MESSAGE ~
| |
+-------------------------------------------------------------+
@ -350,43 +352,63 @@ Using the key hash benefits implementations by providing a faster alternative th
MISC
====
8.2.9 Relation to DDS Entities
8.2.9 Relation to DDS Entities (DDSI-RTPS)
How exactly a DDS Entity interacts with the HistoryCache however, is implementation specific and not
formally modeled by the RTPS protocol. Instead, the Behavior Module of the RTPS protocol only specifies how
CacheChange changes are transferred from the HistoryCache of the RTPS Writer to the HistoryCache of
each matching RTPS Reader.
8.2.9.1
8.2.9.1 (DDSI-RTPS)
When using strict reliable communication, a change can
only be removed when it has been acknowledged by all readers the change was sent to and which are still
active and alive.
8.2.9.2
8.2.9.2 (DDSI-RTPS)
Each matching Writer will attempt to transfer all relevant samples from its HistoryCache to the HistoryCache of the Reader.
8.4.3
8.4.3 (DDSI-RTPS)
It is also not able to drop out-of-order samples on the Reader side as this
requires keeping track of the largest sequence number received from each remote Writer.
--> Suggests there is multiple writers
8.5.4.2
8.5.4.2 (DDSI-RTPS)
For the purpose of interoperability, it is sufficient that an
implementation provides the required built-in Endpoints and reliable communication that satisfies the general
requirements listed in 8.4.2.
8.5.4.4
8.5.4.4 (DDSI-RTPS)
An implementation of the protocol need not necessarily send all information contained in the DataTypes. If any
information is not present, the implementation can assume the default values, as defined by the PSM.
8.7.2.2.6
8.7.2.2.6 (DDSI-RTPS)
In order to implement the DDS_BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS policy,
implementations must include an InfoTimestamp Submessage with every update from a Writer.
9.4.2.11
9.4.2.11 (DDSI-RTPS)
The ParameterList may contain multiple Parameters with the same value for the parameterId.
This is used to provide a collection of values for that kind of Parameter.
DDS_Advanced_Tutorial_2006_00-T1-2_Pardo.pdf (P.16)
DDS_Advanced_Tutorial_2006_00-T1-2_Pardo.pdf
P.16
OpenDDS Developer's Guide
It should be noted that if multiple data writers write to the same instance, care should be taken to ensure that clocks are synchronized to prevent incorrect ordering on the data reader.
https://community.rti.com/forum-topic/dropping-out-order-messages
Messages from a single DataWriter always presserve their order when put into the DataReader cache. The middlware would drop a message rather than accept it out of order.
If you call the regular read/take APIs then you will get them in the same order as they are in the cache which wil preserve the writer order. But if you start taking them using QueryConditions or read by insance etc. Then you will could be skipping some messages that are in the DataReader cache and access them later out of order.
In "Best efforts" there is a little bit of effort to ensure there are no duplicates or our-of-order samples. Basically the DataReader keeps the higuest accepted sequence number from each DataWriter. If a sample arrives with sequence number less or equal to that higuest accepted sequence number it will be dropped. This avoids both duplicates and out of order samples.
https://community.rti.com/forum-topic/strict-reliability-and-destination-order-qos
The samples written by each DataWriter are guaranteed to have timestamps that are monotonically increasing (each equal or greater than the previous) and this the DataReader will not reject them based on source timestamp. The fact that a sample may be dropped on the wire and the reliable protocol may repair it does not affect this fact. The out-of-order sample will be staged in the "reliability queue" of the DataReader until the repair comes and by the time they are pushed to the DataReader cache they will be pushed in the correct order and the source timestamps will not cause a problem.
https://community.rti.com/static/documentation/connext-dds/5.2.3/doc/manuals/connext_dds/html_files/RTI_ConnextDDS_CoreLibraries_UsersManual/Content/UsersManual/The_SampleInfo_Structure.htm
In reliable communication, if DDS data samples are received out received of order, Connext DDS will not deliver them until all the previous DDS data samples have been received. For example, if DDS sample 2 arrives before DDS sample 1, DDS sample 2 cannot be delivered until DDS sample 1 is received. The reception_timestamp is the time when all previous DDS samples has been received—the time at which the DDS sample is committed. If DDS samples are all received in order, the committed time will be same as reception time. However, if DDS samples are lost on the wire, then the committed time will be later than the initial reception time.
2.2.3.16 LIFESPAN (DDS)
This QoS relies on the sender and receiving applications having their clocks sufficiently synchronized. If this is not the case
and the Service can detect it, the DataReader is allowed to use the reception timestamp instead of the source timestamp in its
computation of the expiration time.
INVALIDATION
============

View File

@ -13,6 +13,7 @@
computation of the expiration time.
2.2.3.17
The mechanism to set the source timestamp is middleware dependent.
- Well, it has to be synchronized with every writer updating the same instance.
* Are the Builtin Endpoints part of the DDS Specification or the RTPS specification?
- Both
* ALL Data Fields are sent on each change:
@ -21,6 +22,7 @@
* Does a RTPS Reader subscribe and receive ALL Instances of a keyed Topic?
i.e. where are the instances differentiated?
S.29 - Topic Kind
- When a reader "subscribes" to a topic, it receives all the instances of the matched writers (May not be ALL instances of the topic in general)
* Does a RTPS reader subscribe to more than one Writer (If not using Groups)?
- Yes
* Since only positive Sequence Numbers are valid, why is the type signed?
@ -47,15 +49,24 @@
But then, in 8.4.15.7 it says:
"So, an implementation should ensure that same logical HEARTBEATs are tagged with the same Count."
Does that mean there are cases were I have to put the same count? What is a logical HEARTBEAT?
* Should a "Keyed" Endpoint communicate with a "Non-Keyed"?
* Should a "Keyed" Endpoint communicate with a "Non-Keyed"? (In the sense of Entity Kind)
* Is the empty String a valid Topic and Type Name?
* We can determine if a Endpoint is a Reader or Writer via the Entity ID. Is it illegal to get a SEDP with incompatible source (Reader Entity ID from Publications Announcer?)
* Can we make an array of records of uncontrained strings? That we we could make an array of variable sized strings...
* Should I also check for Minor_Version >= 4?
- Yes: 8.6 Implementations of this version of the RTPS protocol should be able to process RTPS Messages not only with the same major version but possibly higher minor versions.
* If a DATA Submessage is invalid in any way, the Sequence Number is never marked as received, and thus processing of remote Endpoints could stall on corrupt Messages.
* If we have a memory collision during an Endpoint insertion, the Sequence Number anouncing the Endpoint is marked as processed,
but the endpoint will never be inserted even when later on there is memory space (Except if the endpoint send a DATA message we a newer Sequence Number)
* Can a Participant unmatch an Endpoint by marking it's announcing sequence number in a GAP message?
* Is DEADLINE per-INSTANCE or per-INSTANCE-and-WRITER?
- Since the matching is per-WRITER the assumption would be per-INSTANCE-and-WRITER
* Only a sub-part of the DDS QOS are actually relevant for the RTPS. Should I remove the QoS Specifications from the RTPS Package?
* What happens if we get a sample with a source timestamp earlier than the last sample that was accessed by the DataReader when using DESTINATION ORDER BY_SOURCE_TIMESTAMP? Is the smaple dropped?
* The spec does not define the serialized Key (KEY=1 DATA MESSAGE)
- fast-rtps assumes it is the Key Hash
-
* Currently the builtin-endpoint does only acknowledge SN, but does not negatively acknowledge any SN (Bitamp is always empty).
A writer usually responds with repqirs only to negative acknowledgements.
* Fast-RTPS doen not follow DDSI-RTPS Specification
- Open Github Issue
@ -63,6 +74,10 @@
- Seems that Fast-RTPS is also not checking the Validity of Submessages according to Spec
* DDSI-RTPS 2.3 ISSUES
- 8.2.2.4 get_seq_num_min
8.2.2.5 get_seq_num_max
This asume a history cache with duplicate-free sequence numbers, but because sequence number are generated per writer, and a reader can be matched with mutliple writers, we can get duplicate sequence numbers of different changes from different writers.
Ergo the functions are non-deterministic.
- 8.3.7.7 InfoDestination
'This message is sent from an RTPS Writer to an RTPS Reader to modify the GuidPrefix used to interpret the Reader entityIds appearing in the Submessages that follow it.'
But state is changed as follows 'Receiver.destGuidPrefix = InfoDestination.guidPrefix'.
@ -97,6 +112,7 @@
* DDS 1.4 ISSUES
- 2.2.3 Supported QoS
Partition is marked as RxO=No, but should be RxO=Yes? Or not?
- Existing Issue: https://issues.omg.org/issues/DDS15-245
* Source Port of SPDP is irrelevant, since it is BEST EFFORT and we do not reply (only Destination Port is of significance)
@ -138,9 +154,21 @@ DESIGN DECISIONS
* The HEARTBEATs are sent out together with the liveliness assertions. This adds a 96-Byte overhead to the output RTPS Message.
This was done to prevent having to loop through the memory to find remote participant destination more than once.
* The Publisher, Subscriber, and Message Data is written on separate RTPS Messages, even though they are sent simutanously.
This decision was made to support as many local Endpoints as possible. We could make a compile-time if check and sent them in
the same RTPS Message/UDP Packet, but the overhead is quite small and not worth the hassle.
* The Publisher, Subscriber, and Message Data is written on separate RTPS Messages, even though they are
sent simutanously. This decision was made to support as many local Endpoints as possible. We could
make a compile-time if check and sent them in the same RTPS Message/UDP Packet, but the overhead is
quite small and not worth the hassle.
* Even though the Reader does not need to keep track of received SN with respect to each Writer with
exception of the Highest/Last received (since it only keeps the SN in order and does only need to
request from the last stored SN on), the writer does need to keep track of the requested SN (and
possibly also the acknowledgements).
This could be solved by either storing the SN in a bitmap in the endpoint data, or be storing the
requester bitmap (endpoint data address) in the change data.
But since the writer might drop SN in any order, the highest and lowest SN inside the cache history
is unbounded. We can thus only reference to still available SN, and not to GAPs.
In order to acoomodate for that, we could store the lowest (and possibly highest) SN of a requested
lost SN and always send ALL GAPs in that range.
PROTOCOL UNCOMPLIANCE
=====================
@ -155,3 +183,9 @@ PROTOCOL UNCOMPLIANCE
-- Input FIFO Guard
-- Output FIFO Guard
-- Deferred to package Body
RTPS ENDPOINT
=============
* 8.2.6
topicKind Used to indicate whether the Endpoint supports instance lifecycle management operations (see 8.7.4).

View File

@ -9,8 +9,7 @@ use work.rtps_config_package.all;
-- TODO: Initialise RAM to zeroes
-- TODO: Skip Packet while we are waiting for memory operation to complete
-- TODO: add_res is still valid after removal. So we could continue searching the next orphan endpoint from that address on
-- TODO: is it really necessary to process Sequence Numbers in sequence? Could we not just accept any higher Sequence Number?
-- FIXME: ACNACK response does not nack, which is necessary to get the missing SNs.
entity rtps_builtin_endpoint is
port (
@ -439,8 +438,8 @@ begin
-- IDLE Idle state. Initiates Participant Announcements, Heartbeat/Liveliness Assertions, Stale Participant Entry Checks ,and Packet Processing, in that priority order.
-- PACKET_HEADER Read the Endpoint FIFO Packet Format Header (RTPS Submessage ID/Opcode, Submessage Flags, Source UDPv4 Port)
-- PACKET_SRC_ADDR Read Source IPv4 Address
-- PACKET_SRC_ENTITYID Read Source Entity ID
-- PACKET_SRC_GUIDPREFIX Read Source GUID Prefix
-- PACKET_SRC_ENTITYID Read Source Entity ID
-- PACKET_DEST_ENTITYID Read Destination Entity ID. Determine content of Packet (Participant Data, Publisher/Subscriber/Message Data/GAP/HEARTBEAT/ACKNACK) and initiate processing.
-- CHECK_SRC_ENTITYID Read Source Entity ID. Determine content of Packet (Participant Data, Publisher/Subscriber/Message Data/GAP/HEARTBEAT/ACKNACK) and initiate processing.
-- This state is taken if the Packet content could not be determined in the PACKET_DEST_ENTITYID state.
@ -680,15 +679,6 @@ begin
-- Latch Source IP Address
src_addr_next <= data_in;
stage_next <= PACKET_SRC_ENTITYID;
end if;
when PACKET_SRC_ENTITYID =>
-- Input FIFO Guard
if (empty = '0') then
rd_guard := '1';
-- Latch Source Entity ID
src_entityid_next <= data_in;
stage_next <= PACKET_SRC_GUIDPREFIX;
cnt_next <= 0;
end if;
@ -711,11 +701,20 @@ begin
mem_opcode <= SEARCH_PARTICIPANT;
mem_op_start <= '1';
stage_next <= PACKET_DEST_ENTITYID;
stage_next <= PACKET_SRC_ENTITYID;
when others =>
null;
end case;
end if;
when PACKET_SRC_ENTITYID =>
-- Input FIFO Guard
if (empty = '0') then
rd_guard := '1';
-- Latch Source Entity ID
src_entityid_next <= data_in;
stage_next <= PACKET_DEST_ENTITYID;
end if;
when PACKET_DEST_ENTITYID =>
-- Input FIFO Guard
if (empty = '0') then

View File

@ -1005,22 +1005,22 @@ begin
when 1 =>
data_out <= src_addr;
wr_sig <= '1';
-- Source Entity ID
when 2 =>
data_out <= src_entityid;
wr_sig <= '1';
-- GUID Prefix 1/3
when 3 =>
when 2 =>
data_out <= src_guidprefix(0);
wr_sig <= '1';
-- GUID Prefix 2/3
when 4 =>
when 3 =>
data_out <= src_guidprefix(1);
wr_sig <= '1';
-- GUID Prefix 3/3
when 5 =>
when 4 =>
data_out <= src_guidprefix(2);
wr_sig <= '1';
-- Source Entity ID
when 5 =>
data_out <= src_entityid;
wr_sig <= '1';
-- Destination ID [only for Built-in Endpoints]
when 6 =>
if (builtin_endpoint = '1') then

View File

@ -846,6 +846,13 @@ package body rtps_test_package is
-- Source Address
output.data(output.length) := loc.addr(IPv4_ADDRESS_WIDTH-1 downto 0);
output.length := output.length + 1;
-- Source GUID Prefix
output.data(output.length) := src_guid(0);
output.length := output.length + 1;
output.data(output.length) := src_guid(1);
output.length := output.length + 1;
output.data(output.length) := src_guid(2);
output.length := output.length + 1;
-- Source Entity ID
if (ref.submessageID = SID_ACKNACK or ref.submessageID = SID_NACK_FRAG) then
output.data(output.length) := ref.readerId;
@ -854,13 +861,6 @@ package body rtps_test_package is
output.data(output.length) := ref.writerId;
output.length := output.length + 1;
end if;
-- Source GUID Prefix
output.data(output.length) := src_guid(0);
output.length := output.length + 1;
output.data(output.length) := src_guid(1);
output.length := output.length + 1;
output.data(output.length) := src_guid(2);
output.length := output.length + 1;
-- Destination Enity ID
if (is_meta) then
if (ref.submessageID = SID_ACKNACK or ref.submessageID = SID_NACK_FRAG) then