Various Fixes in RTPS Reader/Writer

Amongst other changes the RTPS Reader now supportes VOLATILE Durability
(Will only request most recent change from new matched Writers)
This commit is contained in:
Greek 2021-02-14 19:58:09 +01:00
parent 348d036659
commit a3a51e2f52
3 changed files with 150 additions and 128 deletions

View File

@ -105,6 +105,7 @@ package rtps_package is
constant TIME_ZERO : TIME_TYPE := (others => (others => '0'));
constant TIME_INVALID : TIME_TYPE := (others => (others => '1'));
constant TIME_INFINITE : TIME_TYPE := (x"ffffffff", x"fffffffe");
constant FIRST_SEQUENCENUMBER : SEQUENCENUMBER_TYPE := (x"00000000", x"00000001");
constant SEQUENCENUMBER_UNKNOWN : SEQUENCENUMBER_TYPE := (x"ffffffff", x"00000000");
constant PROTOCOL_RTPS : std_logic_vector(PROTOCOL_WIDTH-1 downto 0) := x"52545053"; -- 'RTPS' in Ascii code
constant PROTOCOLVERSION_1_0 : std_logic_vector(PROTOCOLVERSION_WIDTH-1 downto 0) := x"0100";

View File

@ -13,6 +13,7 @@ entity rtps_reader is
generic (
RELIABILTY_QOS : std_logic_vector(CDR_ENUMERATION_WIDTH-1 downto 0) := DEFAULT_RELIABILTY_QOS;
LIVELINESS_QOS : std_logic_vector(CDR_ENUMERATION_WIDTH-1 downto 0) := DEFAULT_LIVELINESS_QOS;
DURABILITY_QOS : std_logic_vector(CDR_ENUMERATION_WIDTH-1 downto 0) := DEFAULT_DURABILITY_QOS;
HEARTBEAT_RESPONSE_DELAY : DURATION_TYPE := TODO;
HEARTBEAT_SUPPRESSION_DELAY : DURATION_TYPE := TODO;
LEASE_DURATION : DURATION_TYPE := DEFAULT_LEASE_DURATION;
@ -587,7 +588,7 @@ begin
case (opcode) is
when SID_DATA =>
stage_next <= LATCH_EXTRA_DATA;
mem_field_flags <= (others => '0');
mem_field_flags <= EMF_NEXT_SEQ_NR_FLAG;
cnt_next <= 0;
when SID_HEARTBEAT =>
stage_next <= LATCH_HEARTBEAT;
@ -850,8 +851,23 @@ begin
-- No scheduled Heartbeat Response
if (mem_endpoint_data.res_time = TIME_INVALID) then
-- If Reader is Volatile and we have not received anything from the writer yet
if (DURABILITY_QOS = VOLATILE_DURABILITY_QOS and mem_endpoint_data.next_seq_nr = SEQUENCENUMBER_UNKNOWN) then
-- Mark last available SN as next expected (Ignore historical data)
next_seq_nr_next <= last_seq_nr;
mem_op_start <= '1';
mem_opcode <= UPDATE_ENDPOINT;
tmp_flags := tmp_flags or EMF_NEXT_SEQ_NR_FLAG or EMF_RES_TIME_FLAG;
if (HEARTBEAT_RESPONSE_DELAY /= DURATION_INFINITE) then
res_time <= time + HEARTBEAT_RESPONSE_DELAY;
-- NOTE: Last Bit denotes if this is Response or Suppression Delay
res_time(1)(0) <= '0';
tmp_dw := (time + HEARTBEAT_RESPONSE_DELAY) when ((time + HEARTBEAT_RESPONSE_DELAY) < tmp_dw);
else
res_time <= TIME_INVALID;
end if;
-- If current Sequence Number obsolete (removed from source history cache)
if (first_seq_nr > mem_endpoint_data.next_seq_nr and first_seq_nr <= last_seq_nr) then
elsif (first_seq_nr > mem_endpoint_data.next_seq_nr and first_seq_nr <= last_seq_nr) then
-- Store new expected Sequence Number and set Response Dealy
next_seq_nr_next <= first_seq_nr;
mem_op_start <= '1';
@ -1146,16 +1162,26 @@ begin
stage_next <= SKIP_PARAMETER;
end if;
when INITIATE_ADD_CACHE_CHANGE_REQUEST =>
-- SANITY CHECK: Skip if no Hash Key and no payload
if (WITH_KEY and key_hash_rcvd = '0' and data_flag = '0' and key_flag = '0') then
stage_next <= SKIP_PACKET;
else
start_hc <= '1';
opcode_hc <= ADD_CACHE_CHANGE;
-- Wait until History Cache acknowledges request
if (ack_hc = '1') then
stage_next <= ADD_CACHE_CHANGE;
cnt_next <= 0;
-- Wait for Endpoint Data
if (mem_op_done = '1') then
-- Data is Next expected Sequence Number
if ((RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS and seq_nr = mem_endpoint_data.next_seq_nr) or (RELIABILTY_QOS = BEST_EFFORT_RELIABILITY_QOS and seq_nr >= mem_endpoint_data.next_seq_nr) or (DURABILITY_QOS = VOLATILE_DURABILITY_QOS and mem_endpoint_data.next_seq_nr = SEQUENCENUMBER_UNKNOWN)) then
-- SANITY CHECK: Skip if no Hash Key and no Payload
if (WITH_KEY and key_hash_rcvd = '0' and data_flag = '0' and key_flag = '0') then
-- Ignore
stage_next <= SKIP_PACKET;
else
start_hc <= '1';
opcode_hc <= ADD_CACHE_CHANGE;
-- Wait until History Cache acknowledges request
if (ack_hc = '1') then
stage_next <= ADD_CACHE_CHANGE;
cnt_next <= 0;
end if;
end if;
else
-- Ignore
stage_next <= SKIP_PACKET;
end if;
end if;
when ADD_CACHE_CHANGE =>
@ -1529,12 +1555,8 @@ begin
-- Reset Parameter End
parameter_end_next <= (others => '1');
-- Stale Check Exit
if (stale_check = '1') then
-- DONE
stage_next <= IDLE;
-- Consumed last word of Packet
elsif (last_word_in_latch = '1' and last_word_in_user = '0') then
if (last_word_in_latch = '1' and last_word_in_user = '0') then
-- Reset Last Word In Latch
last_word_in_latch_next <= '0';
-- DONE
@ -2340,7 +2362,7 @@ begin
when 6 =>
mem_valid_in <= '1';
mem_addr <= mem_addr_base + EMF_NEXT_SEQ_NR_OFFSET;
mem_write_data <= (others => '0');
mem_write_data <= SEQUENCENUMBER_UNKNOWN(0) when (DURABILITY_QOS = VOLATILE_DURABILITY_QOS) else FIRST_SEQUENCENUMBER(0);
if (mem_ready_in = '1') then
mem_cnt_next <= mem_cnt + 1;
end if;
@ -2349,7 +2371,7 @@ begin
mem_write_data <= (others => '0');
mem_valid_in <= '1';
mem_addr <= mem_addr_base + EMF_NEXT_SEQ_NR_OFFSET + 1;
mem_write_data <= to_unsigned(1,CDR_LONG_WIDTH);
mem_write_data <= SEQUENCENUMBER_UNKNOWN(1) when (DURABILITY_QOS = VOLATILE_DURABILITY_QOS) else FIRST_SEQUENCENUMBER(1);
if (mem_ready_in = '1') then
mem_cnt_next <= mem_cnt + 1;
end if;

View File

@ -713,7 +713,7 @@ begin
seq_nr <= SEQUENCENUMBER_UNKNOWN when (reader_flags(BEST_EFFORT_FLAG) = '1') else (others => (others => '0'));
-- Reader needs Historical Data
if (reader_flags(SEND_HISTORICAL_DATA_FLAG) = '1') then
if (DURABILITY_QOS /= VOLATILE_DURABILITY_QOS and reader_flags(SEND_HISTORICAL_DATA_FLAG) = '1') then
-- Send Historical Data
historical_push_next <= '1';
@ -1229,7 +1229,7 @@ begin
min_sn_next(1) <= data_in_hc;
assert (last_word_in_hc = '1') severity FAILURE;
if (historical_push = '1') then
if (DURABILITY_QOS /= VOLATILE_DURABILITY_QOS and historical_push = '1') then
stage_next <= HANDLE_HISTORICAL;
cnt_next <= 4; -- Pre-Check Sub-state
elsif (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS or LIVELINESS_QOS = MANUAL_BY_TOPIC_LIVELINESS_QOS) then
@ -1501,105 +1501,108 @@ begin
end case;
end if;
when HANDLE_HISTORICAL =>
case (cnt) is
-- Next SN
when 0 =>
-- End of Historical Data
if (next_seq_nr = last_seq_nr) then
if (gap_in_progress) then
-- NOTE: We close the GAP, than come back here, and exit through the else branch.
-- Close GAP Message
gap_in_progress_next <= '0';
stage_next <= SEND_GAP_B;
return_stage_next <= HANDLE_HISTORICAL;
cnt_next <= 0;
-- Synthesis Guard
if (DURABILITY_QOS /= VOLATILE_DURABILITY_QOS) then
case (cnt) is
-- Next SN
when 0 =>
-- End of Historical Data
if (next_seq_nr = last_seq_nr) then
if (gap_in_progress) then
-- NOTE: We close the GAP, than come back here, and exit through the else branch.
-- Close GAP Message
gap_in_progress_next <= '0';
stage_next <= SEND_GAP_B;
return_stage_next <= HANDLE_HISTORICAL;
cnt_next <= 0;
else
-- Reset
historical_push_next <= '0';
-- DONE
stage_next <= IDLE;
end if;
else
-- Reset
historical_push_next <= '0';
-- DONE
stage_next <= IDLE;
next_seq_nr_next <= next_seq_nr + 1;
-- If remote Reader is RELIABLE, NACK the SN
cnt_next <= (cnt + 1) when (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS and reader_flags(BEST_EFFORT_FLAG) = '0') else (cnt + 2);
end if;
else
next_seq_nr_next <= next_seq_nr + 1;
-- If remote Reader is RELIABLE, NACK the SN
cnt_next <= (cnt + 1) when (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS and reader_flags(BEST_EFFORT_FLAG) = '0') else (cnt + 2);
end if;
-- NACK SN
when 1 =>
-- Synthesis Guard
if (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS) then
start_hc <= '1';
opcode_hc <= NACK_CACHE_CHANGE;
seq_nr_hc <= next_seq_nr;
-- NACK SN
when 1 =>
-- Synthesis Guard
if (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS) then
start_hc <= '1';
opcode_hc <= NACK_CACHE_CHANGE;
seq_nr_hc <= next_seq_nr;
-- NOTE: Ignore Return Code
-- NOTE: Ignore Return Code
-- Wait until Operation Acknowledgement
if (ack_hc = '1') then
cnt_next <= cnt + 1;
end if;
end if;
-- Get SN
when 2 =>
start_hc <= '1';
opcode_hc <= GET_CACHE_CHANGE;
seq_nr_hc <= next_seq_nr;
-- Wait until Operation Acknowledgement
if (ack_hc = '1') then
cnt_next <= cnt + 1;
end if;
end if;
-- Get SN
when 2 =>
start_hc <= '1';
opcode_hc <= GET_CACHE_CHANGE;
seq_nr_hc <= next_seq_nr;
-- Handle Request
when 3 =>
-- Wait until Operation Response
if (done_hc = '1') then
-- Wait for Operation Response
if (ret_hc = OK) then
-- Only request Payload if necessary (Contains DATA or Serialized Key)
if (cc_kind = ALIVE or (WITH_KEY and cc_kind /= ALIVE)) then
get_data_hc <= '1';
end if;
-- Wait until Operation Acknowledgement
if (ack_hc = '1') then
cnt_next <= cnt + 1;
end if;
-- Handle Request
when 3 =>
-- Wait until Operation Response
if (done_hc = '1') then
-- Wait for Operation Response
if (ret_hc = OK) then
-- Only request Payload if necessary (Contains DATA or Serialized Key)
if (cc_kind = ALIVE or (WITH_KEY and cc_kind /= ALIVE)) then
get_data_hc <= '1';
end if;
if (gap_in_progress = '1') then
-- Close GAP and send DATA
gap_in_progress_next <= '0';
stage_next <= SEND_GAP_B;
return_stage_next <= SEND_INFO_TS when (DESTINATION_ORDER_QOS = BY_SOURCE_TIMESTAMP_DESTINATION_ORDER_QOS) else SEND_DATA_A;
else
-- Send DATA
stage_next <= SEND_HEADER;
return_stage_next <= SEND_INFO_TS when (DESTINATION_ORDER_QOS = BY_SOURCE_TIMESTAMP_DESTINATION_ORDER_QOS) else SEND_DATA_A;
cnt_next <= 0;
end if;
elsif (ret_hc = INVALID) then
if (gap_in_progress = '0') then
-- Open new GAP
gap_in_progress_next <= '1';
stage_next <= SEND_HEADER;
return_stage_next <= SEND_GAP_A;
cnt_next <= 0;
if (gap_in_progress = '1') then
-- Close GAP and send DATA
gap_in_progress_next <= '0';
stage_next <= SEND_GAP_B;
return_stage_next <= SEND_INFO_TS when (DESTINATION_ORDER_QOS = BY_SOURCE_TIMESTAMP_DESTINATION_ORDER_QOS) else SEND_DATA_A;
else
-- Send DATA
stage_next <= SEND_HEADER;
return_stage_next <= SEND_INFO_TS when (DESTINATION_ORDER_QOS = BY_SOURCE_TIMESTAMP_DESTINATION_ORDER_QOS) else SEND_DATA_A;
cnt_next <= 0;
end if;
elsif (ret_hc = INVALID) then
if (gap_in_progress = '0') then
-- Open new GAP
gap_in_progress_next <= '1';
stage_next <= SEND_HEADER;
return_stage_next <= SEND_GAP_A;
cnt_next <= 0;
end if;
end if;
end if;
end if;
-- Pre-Check
when 4 =>
-- No Cache CHanges Available
if (min_sn = SEQUENCENUMBER_UNKNOWN) then
-- Reset
historical_push_next <= '0';
-- Pre-Check
when 4 =>
-- No Cache CHanges Available
if (min_sn = SEQUENCENUMBER_UNKNOWN) then
-- Reset
historical_push_next <= '0';
-- DONE
stage_next <= IDLE;
else
next_seq_nr_next <= min_sn;
cnt_next <= cnt + 2; -- Skip Increment
-- If remote Reader is RELIABLE, NACK the SN
cnt_next <= 1 when (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS and reader_flags(BEST_EFFORT_FLAG) = '0') else 2;
end if;
when others =>
null;
end case;
-- DONE
stage_next <= IDLE;
else
next_seq_nr_next <= min_sn;
cnt_next <= cnt + 2; -- Skip Increment
-- If remote Reader is RELIABLE, NACK the SN
cnt_next <= 1 when (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS and reader_flags(BEST_EFFORT_FLAG) = '0') else 2;
end if;
when others =>
null;
end case;
end if;
when SEND_HEADER =>
-- Synthesis Guard
if (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS) then
@ -1715,15 +1718,15 @@ begin
cnt_next <= 0;
else
-- Continue
if (stale_check = '1') then
if (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS and stale_check = '1') then
stage_next <= HANDLE_REQUESTS;
cnt_next <= 0;
req_bitmap_pos_next <= req_bitmap_pos + 1;
next_seq_nr_next <= next_seq_nr + 1;
elsif (new_push = '1') then
elsif (PUSH_MODE and new_push = '1') then
stage_next <= HANDLE_NEW;
cnt_next <= 4;
elsif (historical_push = '1') then
elsif (DURABILITY_QOS /= VOLATILE_DURABILITY_QOS and historical_push = '1') then
stage_next <= HANDLE_HISTORICAL;
cnt_next <= 0;
else
@ -1804,15 +1807,15 @@ begin
stage_next <= SEND_DATA_B;
else
-- Continue
if (stale_check = '1') then
if (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS and stale_check = '1') then
stage_next <= HANDLE_REQUESTS;
cnt_next <= 0;
req_bitmap_pos_next <= req_bitmap_pos + 1;
next_seq_nr_next <= next_seq_nr + 1;
elsif (new_push = '1') then
elsif (PUSH_MODE and new_push = '1') then
stage_next <= HANDLE_NEW;
cnt_next <= 4;
elsif (historical_push = '1') then
elsif (DURABILITY_QOS /= VOLATILE_DURABILITY_QOS and historical_push = '1') then
stage_next <= HANDLE_HISTORICAL;
cnt_next <= 0;
else
@ -1837,15 +1840,15 @@ begin
stage_next <= SEND_DATA_B;
else
-- Continue
if (stale_check = '1') then
if (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS and stale_check = '1') then
stage_next <= HANDLE_REQUESTS;
cnt_next <= 0;
req_bitmap_pos_next <= req_bitmap_pos + 1;
next_seq_nr_next <= next_seq_nr + 1;
elsif (new_push = '1') then
elsif (PUSH_MODE and new_push = '1') then
stage_next <= HANDLE_NEW;
cnt_next <= 4;
elsif (historical_push = '1') then
elsif (DURABILITY_QOS /= VOLATILE_DURABILITY_QOS and historical_push = '1') then
stage_next <= HANDLE_HISTORICAL;
cnt_next <= 0;
else
@ -1867,15 +1870,15 @@ begin
last_word_out_rtps = '1';
-- Continue
if (stale_check = '1') then
if (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS and stale_check = '1') then
stage_next <= HANDLE_REQUESTS;
cnt_next <= 0;
req_bitmap_pos_next <= req_bitmap_pos + 1;
next_seq_nr_next <= next_seq_nr + 1;
elsif (new_push = '1') then
elsif (PUSH_MODE and new_push = '1') then
stage_next <= HANDLE_NEW;
cnt_next <= 4;
elsif (historical_push = '1') then
elsif (DURABILITY_QOS /= VOLATILE_DURABILITY_QOS and historical_push = '1') then
stage_next <= HANDLE_HISTORICAL;
cnt_next <= 0;
else
@ -1909,12 +1912,12 @@ begin
data_out <= std_logic_vector(next_seq_nr(1));
-- Continue
if (stale_check = '1') then
if (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS and stale_check = '1') then
stage_next <= HANDLE_REQUESTS;
cnt_next <= 0;
req_bitmap_pos_next <= req_bitmap_pos + 1;
next_seq_nr_next <= next_seq_nr + 1;
elsif (historical_push = '1') then
elsif (DURABILITY_QOS /= VOLATILE_DURABILITY_QOS and historical_push = '1') then
stage_next <= HANDLE_HISTORICAL;
cnt_next <= 0;
end if;
@ -1997,12 +2000,8 @@ begin
-- Reset Parameter End
parameter_end_next <= (others => '1');
-- Stale Check Exit
if (stale_check = '1') then
-- DONE
stage_next <= IDLE;
-- Consumed last word of Packet
elsif (last_word_in_latch = '1' and last_word_in_user = '0') then
if (last_word_in_latch = '1' and last_word_in_user = '0') then
-- Reset Last Word In Latch
last_word_in_latch_next <= '0';
-- DONE