* Implemented Participant Message Handling

This commit is contained in:
Greek 2020-10-21 19:40:20 +02:00
parent 386fd38aa6
commit 0bfedc82df
4 changed files with 463 additions and 77 deletions

308
src/REF.txt Normal file
View File

@ -0,0 +1,308 @@
RULES 8.4.2
===========
GENERAL
-------
* All communications must take place using RTPS Messages
* All implementations must implement the RTPS Message Receiver
* The timing characteristics of all implementations must be tunable
* Implementations must implement the Simple Participant and Endpoint Discovery Protocols
WRITER
------
* Writers must not send data out-of-order
* Writers must include in-line QoS values if requested by a Reader
* Writers must send periodic HEARTBEAT Messages (reliable only)
* Writers must eventually respond to a negative acknowledgment (reliable only)
* Sending Heartbeats and Gaps with Writer Group Information (Writer belonging to a Group)
READER
------
A best-effort Reader is completely passive as it only receives data and does not send messages itself.
Therefore, the requirements below only apply to reliable Readers.
* Readers must respond eventually after receiving a HEARTBEAT with final flag not set
* Readers must respond eventually after receiving a HEARTBEAT that indicates a sample is missing
* Once acknowledged, always acknowledged
* Readers can only send an ACKNACK Message in response to a HEARTBEAT Message
RELIABILITY
===========
* Best Effort Writer can only be matched with Best Effort Reader
* Stateless Reader can only be Best Effort (maintains absolutely no state, does not handle duplicate and out-of-order changes)
STATELESS WRITER
================
Note that the processing of this message uses the reply locators in the RTPS Receiver.
This is the only source of information for the StatelessWriter to determine where to send the reply to.
Proper functioning of the protocol requires that the RTPS Reader inserts an InfoReply Submessage ahead of the AckNack such that these fields are properly set.
Writer Liveness Protocol
========================
ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER
ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER
OPTIONAL 8.4.14
===============
Optional features may not be supported by all RTPS implementations.
* LARGE DATA (Fragmented Data)
--------------------------------------------
ENTITYID_UKNOWN also for Built-In?
Ignore Participant/Topic/Publication/Subscription (handle argument of Sampleinfo)
ENDIANNESS
==========
You have to see what datatypes PSM maps to each element.
If the datatype is bigger than a byte, byte swaping has to occur.
The elements of an array are in order (but the elements themselves may need to be swapped if bigger than a Byte)
ENDPOINT FIFO PACKET FORMAT
===========================
31............24..............16..............8...............0
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+-------------+---------------+---------------+---------------+
| OPCODE | FLAGS | SRC_UDP_PORT |
+-------------+---------------+---------------+---------------+
| SRC_IPv4_ADDR |
+-------------------------------------------------------------+
| SRC_ENTITYID |
+-------------------------------------------------------------+
| |
+ +
| SRC_GUIDPREFIX |
+ +
| |
+-------------------------------------------------------------+
| DEST_ENTITYID [only for Builtin Destinations] |
+-------------------------------------------------------------+
| |
+ Sequence Number [only for DATA Submessage] +
| |
+-------------------------------------------------------------+
| |
~ PAYLOAD (SUBMESSAGE CONTENT) ~
| |
+-------------------------------------------------------------+
ENDPOINT_ID
===========
0...MAX_ENDPOINTS
READERS...WRITERS
BUILT-IN ENDPOINTS
==================
2.2.5 Built-In Topics
The QoS of the built-in Subscriber and DataReader objects is given by the following table:
HISTORY: kind = KEEP_LAST, depth = 1
Since the Built-In Endpoints have a history depth of 1, we can safely reduce the processing complexity of ACKNACK and
HEARTBEAT messages (and even ignore GAP messages).
PARTICICPANT DATA
=================
31............24..............16..............8...............0
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+-------------------------------------------------------------+
01| |
+ +
02| GUIDPREFIX |
+ +
03| |
+-------------------------------------------------------------+
04| META_IPv4_ADDRESS |
+-------------------------------------------------------------+
05| DEFAULT_IPv4_ADDRESS |
+-------------------------------------------------------------+
06| META_UDP_PORT | DEFAULT_UDP_PORT |
+-------------------------------------------------------------+
07| |
+ SPDP_SEQ_NR +
08| |
+-------------------------------------------------------------+
09| |
+ LEASE_DURATION +
10| |
+-------------------------------------------------------------+
11| |
+ LEASE_DEADLINE +
12| |
+-------------------------------------------------------------+
13|P|S|M| EXTRA_FLAGS |Q|
+-------------------------------------------------------------+
14| |
+ ACKNACK_RES_TIME +
15| |
+-------------------------------------------------------------+
16| |
+ HEARTBEAT_RES_TIME +
17| |
+-------------------------------------------------------------+
18| |
+ PUBLICATION_SEQ_NR +
19| |
+-------------------------------------------------------------+
20| |
+ SUBSCRIPTION_SEQ_NR +
21| |
+-------------------------------------------------------------+
22| |
+ MESSAGE_SEQ_NR +
23| |
+-------------------------------------------------------------+
ENDPOINT DATA
=============
31............24..............16..............8...............0
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+-------------------------------------------------------------+
01| ENTITYID |
+-------------------------------------------------------------+
02| |
+ +
03| GUIDPREFIX |
+ +
04| |
+-------------------------------------------------------------+
05| |
~ ENDPOINT_BITMASK ~
**| |
+-------------------------------------------------------------+
ENDPOINT MATCH FRAME
====================
31............24..............16..............8...............0
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+-------------------------------------------------------------+
01| OPCODE |
+-------------------------------------------------------------+
02| |
+ +
03| GUIDPREFIX |
+ +
04| |
+-------------------------------------------------------------+
05| ENTITYID |
+-------------------------------------------------------------+
06| IPv4_ADDRESS |
+-------------------------------------------------------------+
07| UDP_PORT | EXTRA_FLAGS |Q|
+-------------------------------------------------------------+
ENDPOINT UNMATCH FRAME
======================
31............24..............16..............8...............0
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+-------------------------------------------------------------+
01| OPCODE |
+-------------------------------------------------------------+
02| |
+ +
03| GUIDPREFIX |
+ +
04| |
+-------------------------------------------------------------+
05| ENTITYID |
+-------------------------------------------------------------+
ENDPOINT LIVELINESS UPDATE
==========================
31............24..............16..............8...............0
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+-------------------------------------------------------------+
01| OPCODE |
+-------------------------------------------------------------+
02| |
+ +
03| GUIDPREFIX |
+ +
04| |
+-------------------------------------------------------------+
LOCAL ENDPOINT BUFFER
=====================
31............24..............16..............8...............0
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+-------------------------------------------------------------+
| |
+ +
| GUIDPREFIX |
+ +
| |
+-------------------------------------------------------------+
| ENTITYID |
+-------------------------------------------------------------+
| IPv4_ADDRESS |
+-------------------------------------------------------------+
| UDP_PORT | EXTRA_FLAGS |
+-------------------------------------------------------------+
| LIFESPAN |
+ +
| (READER_ONLY) |
+-------------------------------------------------------------+
OUTPUT DATA
===========
31............24..............16..............8...............0
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
01| SRC_IPv4_ADDRESS |
+-------------------------------------------------------------+
02| DEST_IPv4_ADDRESS |
+-------------------------------------------------------------+
03| SRC_UDP_PORT | DEST_UDP_PORT |
+-------------------------------------------------------------+
04| |
~ RTPS_MESSAGE ~
| |
+-------------------------------------------------------------+
TOPIC KEYS
==========
Nominally the key is part of the serialized data of a data submessage.
Using the key hash benefits implementations by providing a faster alternative than deserializing the full key from the received data-object.
MISC
====
8.2.9 Relation to DDS Entities
How exactly a DDS Entity interacts with the HistoryCache however, is implementation specific and not
formally modeled by the RTPS protocol. Instead, the Behavior Module of the RTPS protocol only specifies how
CacheChange changes are transferred from the HistoryCache of the RTPS Writer to the HistoryCache of
each matching RTPS Reader.
8.2.9.1
When using strict reliable communication, a change can
only be removed when it has been acknowledged by all readers the change was sent to and which are still
active and alive.
8.2.9.2
Each matching Writer will attempt to transfer all relevant samples from its HistoryCache to the HistoryCache of the Reader.
8.4.3
It is also not able to drop out-of-order samples on the Reader side as this
requires keeping track of the largest sequence number received from each remote Writer.
--> Suggests there is multiple writers
8.5.4.2
For the purpose of interoperability, it is sufficient that an
implementation provides the required built-in Endpoints and reliable communication that satisfies the general
requirements listed in 8.4.2.
8.5.4.4
An implementation of the protocol need not necessarily send all information contained in the DataTypes. If any
information is not present, the implementation can assume the default values, as defined by the PSM.
8.7.2.2.6
In order to implement the DDS_BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS policy,
implementations must include an InfoTimestamp Submessage with every update from a Writer.
DDS_Advanced_Tutorial_2006_00-T1-2_Pardo.pdf (P.16)

View File

@ -40,7 +40,7 @@
* 32-bit Timestamps? Seriously? Ever heard of Y2k38?
* Use generic package with unconstrained arrays (VHDL-2008), and assert bounds/length inside the package.
* Count repository lines
git ls-files | grep vhd | xargs wc -l
git ls-files | grep .vhd | xargs wc -l
* Count Field in Heartbeat/Acknack
The following sentence is quite self-explanatory:
"A counter that is incremented each time a new message is sent. Provides the means to detect

View File

@ -343,6 +343,10 @@ begin
if (wr_sig = '1') then
endpoint_wr <= endpoint_unmatch;
end if;
when LIVELINESS_UPDATE =>
if (wr_sig = '1') then
endpoint_wr <= endpoint_mask;
end if;
when others =>
null;
end case;
@ -534,7 +538,7 @@ begin
is_subscriber_next <= '0';
-- Reset Endpoint Mask (ALL READERS)
endpoint_mask_next <= (others => '0');
endpoint_mask_next(MAX_ENDPOINTS-1 downto MAX_ENDPOINTS-NUM_READERS) <= (others => '1');
endpoint_mask_next(0 to NUM_READERS-1) <= (others => '1');
end case;
end if;
when ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_ANNOUNCER =>
@ -565,10 +569,12 @@ begin
is_subscriber_next <= '1';
-- Reset Endpoint Mask (ALL WRITERS)
endpoint_mask_next <= (others => '0');
endpoint_mask_next(NUM_WRITERS-1 downto 0) <= (others => '1');
endpoint_mask_next(NUM_READERS to NUM_READERS+NUM_WRITERS-1) <= (others => '1');
end case;
end if;
when ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER =>
-- SANITY CHECK: Ignore if no Writers
if (NUM_WRITERS /= 0) then
-- Only ACKNACKs are relevant
if (opcode = SID_ACKNACK) then
message_type_next <= MESSAGE;
@ -576,7 +582,10 @@ begin
-- Initialise counter
cnt_next <= 0;
end if;
end if;
when ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER =>
-- SANITY CHECK: Ignore if no Readers
if (NUM_READERS /= 0) then
-- Only HEARTBEATs and DATA are relevant (GAPs irrelevant, since depth is 1)
case (opcode) is
when SID_HEARTBEAT =>
@ -589,7 +598,11 @@ begin
stage_next <= LATCH_SEQ_NR; -- DATA Processing
-- Initialise counter
cnt_next <= 0;
-- Reset Endpoint Mask (ALL READERS)
endpoint_mask_next <= (others => '0');
endpoint_mask_next(0 to NUM_READERS-1) <= (others => '1');
end case;
end if;
when others =>
null;
end case;
@ -640,7 +653,7 @@ begin
is_subscriber_next <= '0';
-- Reset Endpoint Mask (ALL READERS)
endpoint_mask_next <= (others => '0');
endpoint_mask_next(MAX_ENDPOINTS-1 downto MAX_ENDPOINTS-NUM_READERS) <= (others => '1');
endpoint_mask_next(0 to NUM_READERS-1) <= (others => '1');
end case;
end if;
when ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_DETECTOR =>
@ -671,10 +684,12 @@ begin
is_subscriber_next <= '1';
-- Reset Endpoint Mask (ALL WRITERS)
endpoint_mask_next <= (others => '0');
endpoint_mask_next(NUM_WRITERS-1 downto 0) <= (others => '1');
endpoint_mask_next(NUM_READERS to NUM_READERS+NUM_WRITERS-1) <= (others => '1');
end case;
end if;
when ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER =>
-- SANITY CHECK: Ignore if no Writers
if (NUM_WRITERS /= 0) then
-- Only ACKNACKs are relevant
if (opcode = SID_ACKNACK) then
message_type_next <= MESSAGE;
@ -682,7 +697,10 @@ begin
-- Initialise counter
cnt_next <= 0;
end if;
end if;
when ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER =>
-- SANITY CHECK: Ignore if no Readers
if (NUM_READERS /= 0) then
-- Only HEARTBEATs and DATA are relevant (GAPs irrelevant, since depth is 1)
case (opcode) is
when SID_HEARTBEAT =>
@ -695,7 +713,11 @@ begin
stage_next <= LATCH_SEQ_NR; -- DATA Processing
-- Initialise counter
cnt_next <= 0;
-- Reset Endpoint Mask (ALL READERS)
endpoint_mask_next <= (others => '0');
endpoint_mask_next(0 to NUM_READERS-1) <= (others => '1');
end case;
end if;
when others =>
null;
end case;
@ -750,6 +772,61 @@ begin
end case;
end if;
end if;
when PROCESS_MESSAGE =>
if (empty = '0') then
rd_sig <= '1';
--Increment Counter
cnt_next <= cnt + 1;
case (cnt) is
-- Check if GUID Prefix valid (Should be the same as the GUID Prefix of the Packet)
-- GUID Prefix 1/3
when 0 =>
if (data_in /= guid(0)) then
stage_next <= SKIP_PACKET;
end if;
-- GUID Prefix 2/3
when 1 =>
if (data_in /= guid(1)) then
stage_next <= SKIP_PACKET;
end if;
-- GUID Prefix 3/3
when 2 =>
if (data_in /= guid(2)) then
stage_next <= SKIP_PACKET;
end if;
-- Participant Mesasge Kind
when 3 =>
-- XXX: The Participant Message Data may contain additional data, and according to DDSI-RTPS 2.3
-- implementations must be able to support up to 128 Bytes of additional data. Due to the unnecessary
-- high complexity this would add to this entity this is not supported.
case (data_in) is
-- Automatic Liveliness Assertion
when PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE =>
-- Unmark all readers without automatic liveliness QoS
for i in 0 to NUM_READERS-1 loop
if (AUTOMATIC_LIVELINESS_QOS /= ENDPOINT_LIVELINESS(i)) then
endpoint_mask_next(i) <= '0';
end if;
end loop;
stage_next <= LIVELINESS_UPDATE;
cnt_next <= 0;
-- Manual by Participant Liveliness Assertion
when PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE =>
-- Unmark all readers without manual by participant liveliness QoS
for i in 0 to NUM_READERS-1 loop
if (MANUAL_BY_PARTICIPANT_LIVELINESS_QOS /= ENDPOINT_LIVELINESS(i)) then
endpoint_mask_next(i) <= '0';
end if;
end loop;
stage_next <= LIVELINESS_UPDATE;
cnt_next <= 0;
-- No Liveliness Update, skip packet
when others =>
stage_next <= SKIP_PACKET;
end case;
end case;
end if;
when PROCESS_PL =>
if (empty = '0') then
rd_sig <= '1';
@ -1859,23 +1936,7 @@ begin
-- Count
when 28 =>
output_sig <= count;
-- Signal Last Word
last_word_out <= '1';
-- DONE
stage_next <= IDLE;
end case;
end if;
when PROCESS_ACKNACK =>
if (empty = '0') then
rd_sig <= '1';
--Increment Counter
cnt_next <= cnt + 1;
-- Latch Sequence Number
-- NOTE: Because we always sent the entire history cache, we only need to look at the SequenceNumberSetBase to determine if we need to sent data or not
case (cnt) is
when 0 =>
first_seq_nr_next(0) <= data_in;
when 1 =>
-- Signal Last Word7
first_seq_nr_next(1) <= data_in;
stage_next <= PROCESS_ACKNACK_SEQUENCE_NUMBERS
end case;
@ -1993,9 +2054,7 @@ begin
-- If Publisher Data not scheduled for response or no Writers available, skip
if (mem_participant_data.extra_flags(PUB_DATA_FLAG) = '0' or NUM_WRITERS = 0) then
stage_next <= SEND_SUB_DATA;
end if;
if (rtps_full = '0') then
elsif (rtps_full = '0') then
wr_sig <= '1';
--Increment Counter
cnt_next <= cnt + 1;
@ -2012,9 +2071,7 @@ begin
-- If Subscriber Data not scheduled for response or no Readers available, skip
if (mem_participant_data.extra_flags(PUB_DATA_FLAG) = '0' or NUM_READERS = 0) then
stage_next <= SEND_MES_DATA;
end if;
if (rtps_full = '0') then
elsif (rtps_full = '0') then
wr_sig <= '1';
--Increment Counter
cnt_next <= cnt + 1;
@ -2031,9 +2088,7 @@ begin
-- If Message Data not scheduled for response, skip
if (mem_participant_data.extra_flags(MES_DATA_FLAG) = '0') then
stage_next <= IDLE;
end if;
if (rtps_full = '0') then
elsif (rtps_full = '0') then
wr_sig <= '1';
--Increment Counter
cnt_next <= cnt + 1;
@ -2205,6 +2260,28 @@ begin
stage_next <= IDLE;
end case;
end if;
when LIVELINESS_UPDATE =>
-- If no endpoint interested for liveliness, skip
if (endpoint_mask = (endpoint_mask'range => '0')) then
stage_next <= SKIP_PACKET;
elsif ((endpoint_mask and endpoint_full) = (endpoint_full'range => '0')) then
-- Increment Counter
cnt_next <= cnt + 1;
-- Enable Write
wr_sig <= '1';
case (cnt) is
when 0 =>
output_sig <= OPCODE_LIVELINESS_UPDATE;
when 1 =>
output_sig <= guid(0);
when 2 =>
output_sig <= guid(1);
when 3 =>
output_sig <= guid(2);
-- DONE
stage_next <= SKIP_PACKET;
end case;
end if;
--#############################
when SKIP_PARAMETER =>
-- End of Parameter

View File

@ -339,6 +339,7 @@ package rtps_package is
constant OPCODE_MATCH : std_logic_vector(ENDPOINT_MATCH_OPCODE_WIDTH-1 downto 0) := x"55000000";
constant OPCODE_UNMATCH : std_logic_vector(ENDPOINT_MATCH_OPCODE_WIDTH-1 downto 0) := x"55000001";
constant OPCODE_LIVELINESS_UPDATE : std_logic_vector(ENDPOINT_MATCH_OPCODE_WIDTH-1 downto 0) := x"55000002";
type USER_ENDPOINT_OUTPUT is array (0 to MAX_ENDPOINTS-1) of std_logic_vector(31 downto 0);
type ENDPOINT_BITMASK_ARRAY_TYPE is array (0 to ENDPOINT_BITMASK_SIZE-1) of std_logic_vector(31 downto 0);