From a3a51e2f52ed751ef73dd81bf9449d0a6849d77c Mon Sep 17 00:00:00 2001 From: Greek Date: Sun, 14 Feb 2021 19:58:09 +0100 Subject: [PATCH] Various Fixes in RTPS Reader/Writer Amongst other changes the RTPS Reader now supportes VOLATILE Durability (Will only request most recent change from new matched Writers) --- src/rtps_package.vhd | 1 + src/rtps_reader.vhd | 60 ++++++++---- src/rtps_writer.vhd | 217 +++++++++++++++++++++---------------------- 3 files changed, 150 insertions(+), 128 deletions(-) diff --git a/src/rtps_package.vhd b/src/rtps_package.vhd index a5d8c58..b1de52a 100644 --- a/src/rtps_package.vhd +++ b/src/rtps_package.vhd @@ -105,6 +105,7 @@ package rtps_package is constant TIME_ZERO : TIME_TYPE := (others => (others => '0')); constant TIME_INVALID : TIME_TYPE := (others => (others => '1')); constant TIME_INFINITE : TIME_TYPE := (x"ffffffff", x"fffffffe"); + constant FIRST_SEQUENCENUMBER : SEQUENCENUMBER_TYPE := (x"00000000", x"00000001"); constant SEQUENCENUMBER_UNKNOWN : SEQUENCENUMBER_TYPE := (x"ffffffff", x"00000000"); constant PROTOCOL_RTPS : std_logic_vector(PROTOCOL_WIDTH-1 downto 0) := x"52545053"; -- 'RTPS' in Ascii code constant PROTOCOLVERSION_1_0 : std_logic_vector(PROTOCOLVERSION_WIDTH-1 downto 0) := x"0100"; diff --git a/src/rtps_reader.vhd b/src/rtps_reader.vhd index 5b5c98e..f9704a4 100644 --- a/src/rtps_reader.vhd +++ b/src/rtps_reader.vhd @@ -13,6 +13,7 @@ entity rtps_reader is generic ( RELIABILTY_QOS : std_logic_vector(CDR_ENUMERATION_WIDTH-1 downto 0) := DEFAULT_RELIABILTY_QOS; LIVELINESS_QOS : std_logic_vector(CDR_ENUMERATION_WIDTH-1 downto 0) := DEFAULT_LIVELINESS_QOS; + DURABILITY_QOS : std_logic_vector(CDR_ENUMERATION_WIDTH-1 downto 0) := DEFAULT_DURABILITY_QOS; HEARTBEAT_RESPONSE_DELAY : DURATION_TYPE := TODO; HEARTBEAT_SUPPRESSION_DELAY : DURATION_TYPE := TODO; LEASE_DURATION : DURATION_TYPE := DEFAULT_LEASE_DURATION; @@ -587,7 +588,7 @@ begin case (opcode) is when SID_DATA => stage_next <= LATCH_EXTRA_DATA; - mem_field_flags <= (others => '0'); + mem_field_flags <= EMF_NEXT_SEQ_NR_FLAG; cnt_next <= 0; when SID_HEARTBEAT => stage_next <= LATCH_HEARTBEAT; @@ -850,8 +851,23 @@ begin -- No scheduled Heartbeat Response if (mem_endpoint_data.res_time = TIME_INVALID) then + -- If Reader is Volatile and we have not received anything from the writer yet + if (DURABILITY_QOS = VOLATILE_DURABILITY_QOS and mem_endpoint_data.next_seq_nr = SEQUENCENUMBER_UNKNOWN) then + -- Mark last available SN as next expected (Ignore historical data) + next_seq_nr_next <= last_seq_nr; + mem_op_start <= '1'; + mem_opcode <= UPDATE_ENDPOINT; + tmp_flags := tmp_flags or EMF_NEXT_SEQ_NR_FLAG or EMF_RES_TIME_FLAG; + if (HEARTBEAT_RESPONSE_DELAY /= DURATION_INFINITE) then + res_time <= time + HEARTBEAT_RESPONSE_DELAY; + -- NOTE: Last Bit denotes if this is Response or Suppression Delay + res_time(1)(0) <= '0'; + tmp_dw := (time + HEARTBEAT_RESPONSE_DELAY) when ((time + HEARTBEAT_RESPONSE_DELAY) < tmp_dw); + else + res_time <= TIME_INVALID; + end if; -- If current Sequence Number obsolete (removed from source history cache) - if (first_seq_nr > mem_endpoint_data.next_seq_nr and first_seq_nr <= last_seq_nr) then + elsif (first_seq_nr > mem_endpoint_data.next_seq_nr and first_seq_nr <= last_seq_nr) then -- Store new expected Sequence Number and set Response Dealy next_seq_nr_next <= first_seq_nr; mem_op_start <= '1'; @@ -1146,16 +1162,26 @@ begin stage_next <= SKIP_PARAMETER; end if; when INITIATE_ADD_CACHE_CHANGE_REQUEST => - -- SANITY CHECK: Skip if no Hash Key and no payload - if (WITH_KEY and key_hash_rcvd = '0' and data_flag = '0' and key_flag = '0') then - stage_next <= SKIP_PACKET; - else - start_hc <= '1'; - opcode_hc <= ADD_CACHE_CHANGE; - -- Wait until History Cache acknowledges request - if (ack_hc = '1') then - stage_next <= ADD_CACHE_CHANGE; - cnt_next <= 0; + -- Wait for Endpoint Data + if (mem_op_done = '1') then + -- Data is Next expected Sequence Number + if ((RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS and seq_nr = mem_endpoint_data.next_seq_nr) or (RELIABILTY_QOS = BEST_EFFORT_RELIABILITY_QOS and seq_nr >= mem_endpoint_data.next_seq_nr) or (DURABILITY_QOS = VOLATILE_DURABILITY_QOS and mem_endpoint_data.next_seq_nr = SEQUENCENUMBER_UNKNOWN)) then + -- SANITY CHECK: Skip if no Hash Key and no Payload + if (WITH_KEY and key_hash_rcvd = '0' and data_flag = '0' and key_flag = '0') then + -- Ignore + stage_next <= SKIP_PACKET; + else + start_hc <= '1'; + opcode_hc <= ADD_CACHE_CHANGE; + -- Wait until History Cache acknowledges request + if (ack_hc = '1') then + stage_next <= ADD_CACHE_CHANGE; + cnt_next <= 0; + end if; + end if; + else + -- Ignore + stage_next <= SKIP_PACKET; end if; end if; when ADD_CACHE_CHANGE => @@ -1529,12 +1555,8 @@ begin -- Reset Parameter End parameter_end_next <= (others => '1'); - -- Stale Check Exit - if (stale_check = '1') then - -- DONE - stage_next <= IDLE; -- Consumed last word of Packet - elsif (last_word_in_latch = '1' and last_word_in_user = '0') then + if (last_word_in_latch = '1' and last_word_in_user = '0') then -- Reset Last Word In Latch last_word_in_latch_next <= '0'; -- DONE @@ -2340,7 +2362,7 @@ begin when 6 => mem_valid_in <= '1'; mem_addr <= mem_addr_base + EMF_NEXT_SEQ_NR_OFFSET; - mem_write_data <= (others => '0'); + mem_write_data <= SEQUENCENUMBER_UNKNOWN(0) when (DURABILITY_QOS = VOLATILE_DURABILITY_QOS) else FIRST_SEQUENCENUMBER(0); if (mem_ready_in = '1') then mem_cnt_next <= mem_cnt + 1; end if; @@ -2349,7 +2371,7 @@ begin mem_write_data <= (others => '0'); mem_valid_in <= '1'; mem_addr <= mem_addr_base + EMF_NEXT_SEQ_NR_OFFSET + 1; - mem_write_data <= to_unsigned(1,CDR_LONG_WIDTH); + mem_write_data <= SEQUENCENUMBER_UNKNOWN(1) when (DURABILITY_QOS = VOLATILE_DURABILITY_QOS) else FIRST_SEQUENCENUMBER(1); if (mem_ready_in = '1') then mem_cnt_next <= mem_cnt + 1; end if; diff --git a/src/rtps_writer.vhd b/src/rtps_writer.vhd index 9f7b3ce..0d8e747 100644 --- a/src/rtps_writer.vhd +++ b/src/rtps_writer.vhd @@ -713,7 +713,7 @@ begin seq_nr <= SEQUENCENUMBER_UNKNOWN when (reader_flags(BEST_EFFORT_FLAG) = '1') else (others => (others => '0')); -- Reader needs Historical Data - if (reader_flags(SEND_HISTORICAL_DATA_FLAG) = '1') then + if (DURABILITY_QOS /= VOLATILE_DURABILITY_QOS and reader_flags(SEND_HISTORICAL_DATA_FLAG) = '1') then -- Send Historical Data historical_push_next <= '1'; @@ -1229,7 +1229,7 @@ begin min_sn_next(1) <= data_in_hc; assert (last_word_in_hc = '1') severity FAILURE; - if (historical_push = '1') then + if (DURABILITY_QOS /= VOLATILE_DURABILITY_QOS and historical_push = '1') then stage_next <= HANDLE_HISTORICAL; cnt_next <= 4; -- Pre-Check Sub-state elsif (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS or LIVELINESS_QOS = MANUAL_BY_TOPIC_LIVELINESS_QOS) then @@ -1501,105 +1501,108 @@ begin end case; end if; when HANDLE_HISTORICAL => - case (cnt) is - -- Next SN - when 0 => - -- End of Historical Data - if (next_seq_nr = last_seq_nr) then - if (gap_in_progress) then - -- NOTE: We close the GAP, than come back here, and exit through the else branch. - -- Close GAP Message - gap_in_progress_next <= '0'; - stage_next <= SEND_GAP_B; - return_stage_next <= HANDLE_HISTORICAL; - cnt_next <= 0; + -- Synthesis Guard + if (DURABILITY_QOS /= VOLATILE_DURABILITY_QOS) then + case (cnt) is + -- Next SN + when 0 => + -- End of Historical Data + if (next_seq_nr = last_seq_nr) then + if (gap_in_progress) then + -- NOTE: We close the GAP, than come back here, and exit through the else branch. + -- Close GAP Message + gap_in_progress_next <= '0'; + stage_next <= SEND_GAP_B; + return_stage_next <= HANDLE_HISTORICAL; + cnt_next <= 0; + else + -- Reset + historical_push_next <= '0'; + + -- DONE + stage_next <= IDLE; + end if; else - -- Reset - historical_push_next <= '0'; - - -- DONE - stage_next <= IDLE; + next_seq_nr_next <= next_seq_nr + 1; + -- If remote Reader is RELIABLE, NACK the SN + cnt_next <= (cnt + 1) when (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS and reader_flags(BEST_EFFORT_FLAG) = '0') else (cnt + 2); end if; - else - next_seq_nr_next <= next_seq_nr + 1; - -- If remote Reader is RELIABLE, NACK the SN - cnt_next <= (cnt + 1) when (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS and reader_flags(BEST_EFFORT_FLAG) = '0') else (cnt + 2); - end if; - -- NACK SN - when 1 => - -- Synthesis Guard - if (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS) then + -- NACK SN + when 1 => + -- Synthesis Guard + if (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS) then + start_hc <= '1'; + opcode_hc <= NACK_CACHE_CHANGE; + seq_nr_hc <= next_seq_nr; + + -- NOTE: Ignore Return Code + + -- Wait until Operation Acknowledgement + if (ack_hc = '1') then + cnt_next <= cnt + 1; + end if; + end if; + -- Get SN + when 2 => start_hc <= '1'; - opcode_hc <= NACK_CACHE_CHANGE; + opcode_hc <= GET_CACHE_CHANGE; seq_nr_hc <= next_seq_nr; - -- NOTE: Ignore Return Code - -- Wait until Operation Acknowledgement if (ack_hc = '1') then cnt_next <= cnt + 1; end if; - end if; - -- Get SN - when 2 => - start_hc <= '1'; - opcode_hc <= GET_CACHE_CHANGE; - seq_nr_hc <= next_seq_nr; - - -- Wait until Operation Acknowledgement - if (ack_hc = '1') then - cnt_next <= cnt + 1; - end if; - -- Handle Request - when 3 => - -- Wait until Operation Response - if (done_hc = '1') then - -- Wait for Operation Response - if (ret_hc = OK) then - -- Only request Payload if necessary (Contains DATA or Serialized Key) - if (cc_kind = ALIVE or (WITH_KEY and cc_kind /= ALIVE)) then - get_data_hc <= '1'; - end if; - - if (gap_in_progress = '1') then - -- Close GAP and send DATA - gap_in_progress_next <= '0'; - stage_next <= SEND_GAP_B; - return_stage_next <= SEND_INFO_TS when (DESTINATION_ORDER_QOS = BY_SOURCE_TIMESTAMP_DESTINATION_ORDER_QOS) else SEND_DATA_A; - else - -- Send DATA - stage_next <= SEND_HEADER; - return_stage_next <= SEND_INFO_TS when (DESTINATION_ORDER_QOS = BY_SOURCE_TIMESTAMP_DESTINATION_ORDER_QOS) else SEND_DATA_A; - cnt_next <= 0; - end if; - elsif (ret_hc = INVALID) then - if (gap_in_progress = '0') then - -- Open new GAP - gap_in_progress_next <= '1'; - stage_next <= SEND_HEADER; - return_stage_next <= SEND_GAP_A; - cnt_next <= 0; + -- Handle Request + when 3 => + -- Wait until Operation Response + if (done_hc = '1') then + -- Wait for Operation Response + if (ret_hc = OK) then + -- Only request Payload if necessary (Contains DATA or Serialized Key) + if (cc_kind = ALIVE or (WITH_KEY and cc_kind /= ALIVE)) then + get_data_hc <= '1'; + end if; + + if (gap_in_progress = '1') then + -- Close GAP and send DATA + gap_in_progress_next <= '0'; + stage_next <= SEND_GAP_B; + return_stage_next <= SEND_INFO_TS when (DESTINATION_ORDER_QOS = BY_SOURCE_TIMESTAMP_DESTINATION_ORDER_QOS) else SEND_DATA_A; + else + -- Send DATA + stage_next <= SEND_HEADER; + return_stage_next <= SEND_INFO_TS when (DESTINATION_ORDER_QOS = BY_SOURCE_TIMESTAMP_DESTINATION_ORDER_QOS) else SEND_DATA_A; + cnt_next <= 0; + end if; + elsif (ret_hc = INVALID) then + if (gap_in_progress = '0') then + -- Open new GAP + gap_in_progress_next <= '1'; + stage_next <= SEND_HEADER; + return_stage_next <= SEND_GAP_A; + cnt_next <= 0; + end if; end if; end if; - end if; - -- Pre-Check - when 4 => - -- No Cache CHanges Available - if (min_sn = SEQUENCENUMBER_UNKNOWN) then - -- Reset - historical_push_next <= '0'; - - -- DONE - stage_next <= IDLE; - else - next_seq_nr_next <= min_sn; - cnt_next <= cnt + 2; -- Skip Increment - -- If remote Reader is RELIABLE, NACK the SN - cnt_next <= 1 when (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS and reader_flags(BEST_EFFORT_FLAG) = '0') else 2; - end if; - when others => - null; - end case; + -- Pre-Check + when 4 => + -- No Cache CHanges Available + if (min_sn = SEQUENCENUMBER_UNKNOWN) then + -- Reset + historical_push_next <= '0'; + + -- DONE + stage_next <= IDLE; + else + next_seq_nr_next <= min_sn; + cnt_next <= cnt + 2; -- Skip Increment + -- If remote Reader is RELIABLE, NACK the SN + cnt_next <= 1 when (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS and reader_flags(BEST_EFFORT_FLAG) = '0') else 2; + end if; + when others => + null; + end case; + end if; when SEND_HEADER => -- Synthesis Guard if (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS) then @@ -1715,15 +1718,15 @@ begin cnt_next <= 0; else -- Continue - if (stale_check = '1') then + if (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS and stale_check = '1') then stage_next <= HANDLE_REQUESTS; cnt_next <= 0; req_bitmap_pos_next <= req_bitmap_pos + 1; next_seq_nr_next <= next_seq_nr + 1; - elsif (new_push = '1') then + elsif (PUSH_MODE and new_push = '1') then stage_next <= HANDLE_NEW; cnt_next <= 4; - elsif (historical_push = '1') then + elsif (DURABILITY_QOS /= VOLATILE_DURABILITY_QOS and historical_push = '1') then stage_next <= HANDLE_HISTORICAL; cnt_next <= 0; else @@ -1804,15 +1807,15 @@ begin stage_next <= SEND_DATA_B; else -- Continue - if (stale_check = '1') then + if (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS and stale_check = '1') then stage_next <= HANDLE_REQUESTS; cnt_next <= 0; req_bitmap_pos_next <= req_bitmap_pos + 1; next_seq_nr_next <= next_seq_nr + 1; - elsif (new_push = '1') then + elsif (PUSH_MODE and new_push = '1') then stage_next <= HANDLE_NEW; cnt_next <= 4; - elsif (historical_push = '1') then + elsif (DURABILITY_QOS /= VOLATILE_DURABILITY_QOS and historical_push = '1') then stage_next <= HANDLE_HISTORICAL; cnt_next <= 0; else @@ -1837,15 +1840,15 @@ begin stage_next <= SEND_DATA_B; else -- Continue - if (stale_check = '1') then + if (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS and stale_check = '1') then stage_next <= HANDLE_REQUESTS; cnt_next <= 0; req_bitmap_pos_next <= req_bitmap_pos + 1; next_seq_nr_next <= next_seq_nr + 1; - elsif (new_push = '1') then + elsif (PUSH_MODE and new_push = '1') then stage_next <= HANDLE_NEW; cnt_next <= 4; - elsif (historical_push = '1') then + elsif (DURABILITY_QOS /= VOLATILE_DURABILITY_QOS and historical_push = '1') then stage_next <= HANDLE_HISTORICAL; cnt_next <= 0; else @@ -1867,15 +1870,15 @@ begin last_word_out_rtps = '1'; -- Continue - if (stale_check = '1') then + if (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS and stale_check = '1') then stage_next <= HANDLE_REQUESTS; cnt_next <= 0; req_bitmap_pos_next <= req_bitmap_pos + 1; next_seq_nr_next <= next_seq_nr + 1; - elsif (new_push = '1') then + elsif (PUSH_MODE and new_push = '1') then stage_next <= HANDLE_NEW; cnt_next <= 4; - elsif (historical_push = '1') then + elsif (DURABILITY_QOS /= VOLATILE_DURABILITY_QOS and historical_push = '1') then stage_next <= HANDLE_HISTORICAL; cnt_next <= 0; else @@ -1909,12 +1912,12 @@ begin data_out <= std_logic_vector(next_seq_nr(1)); -- Continue - if (stale_check = '1') then + if (RELIABILTY_QOS = RELIABLE_RELIABILITY_QOS and stale_check = '1') then stage_next <= HANDLE_REQUESTS; cnt_next <= 0; req_bitmap_pos_next <= req_bitmap_pos + 1; next_seq_nr_next <= next_seq_nr + 1; - elsif (historical_push = '1') then + elsif (DURABILITY_QOS /= VOLATILE_DURABILITY_QOS and historical_push = '1') then stage_next <= HANDLE_HISTORICAL; cnt_next <= 0; end if; @@ -1997,12 +2000,8 @@ begin -- Reset Parameter End parameter_end_next <= (others => '1'); - -- Stale Check Exit - if (stale_check = '1') then - -- DONE - stage_next <= IDLE; -- Consumed last word of Packet - elsif (last_word_in_latch = '1' and last_word_in_user = '0') then + if (last_word_in_latch = '1' and last_word_in_user = '0') then -- Reset Last Word In Latch last_word_in_latch_next <= '0'; -- DONE