From 7e31f7ea53f27ac348a9acf2bfffe3b5607bb03d Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 23 May 2024 17:18:22 +0530 Subject: [PATCH 1/6] refactored code for duplicate message received from the server --- lib/ably/realtime/channel/channel_manager.rb | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/ably/realtime/channel/channel_manager.rb b/lib/ably/realtime/channel/channel_manager.rb index d10bc66c..94dac0bf 100644 --- a/lib/ably/realtime/channel/channel_manager.rb +++ b/lib/ably/realtime/channel/channel_manager.rb @@ -34,7 +34,7 @@ def detach(error, previous_state) # Channel is attached, notify presence if sync is expected def attached(attached_protocol_message) # If no attached ProtocolMessage then this attached request was triggered by the client - # library, such as returning to attached whne detach has failed + # library, such as returning to attached when detach has failed if attached_protocol_message update_presence_sync_state_following_attached attached_protocol_message channel.properties.set_attach_serial(attached_protocol_message.channel_serial) @@ -60,6 +60,7 @@ def request_reattach(options = {}) end def duplicate_attached_received(protocol_message) + logger.debug { "Server initiated ATTACHED message received for channel '#{channel.name}' with state #{channel.state}" } if protocol_message.error channel.set_channel_error_reason protocol_message.error log_channel_error protocol_message.error @@ -68,9 +69,7 @@ def duplicate_attached_received(protocol_message) channel.properties.set_attach_serial(protocol_message.channel_serial) channel.options.set_modes_from_flags(protocol_message.flags) - if protocol_message.has_channel_resumed_flag? - logger.debug { "ChannelManager: Additional resumed ATTACHED message received for #{channel.state} channel '#{channel.name}'" } - else + unless protocol_message.has_channel_resumed_flag? channel.emit :update, Ably::Models::ChannelStateChange.new( current: channel.state, previous: channel.state, From 8faef723dcfee24bddb5cabcc5404c8207554280 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 24 May 2024 15:25:08 +0530 Subject: [PATCH 2/6] refactored has presence flag for channel attached/re-attached --- lib/ably/realtime/channel/channel_manager.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/ably/realtime/channel/channel_manager.rb b/lib/ably/realtime/channel/channel_manager.rb index 94dac0bf..8c51580c 100644 --- a/lib/ably/realtime/channel/channel_manager.rb +++ b/lib/ably/realtime/channel/channel_manager.rb @@ -36,7 +36,7 @@ def attached(attached_protocol_message) # If no attached ProtocolMessage then this attached request was triggered by the client # library, such as returning to attached when detach has failed if attached_protocol_message - update_presence_sync_state_following_attached attached_protocol_message + update_presence_sync_state_following_attached attached_protocol_message.has_presence_flag? channel.properties.set_attach_serial(attached_protocol_message.channel_serial) channel.options.set_modes_from_flags(attached_protocol_message.flags) channel.options.set_params(attached_protocol_message.params) @@ -77,7 +77,7 @@ def duplicate_attached_received(protocol_message) reason: protocol_message.error, resumed: false, ) - update_presence_sync_state_following_attached protocol_message + update_presence_sync_state_following_attached protocol_message.has_presence_flag? end end @@ -253,8 +253,8 @@ def send_state_change_protocol_message(new_state, state_if_failed, message_optio ) end - def update_presence_sync_state_following_attached(attached_protocol_message) - if attached_protocol_message.has_presence_flag? + def update_presence_sync_state_following_attached(has_presence_flag) + if has_presence_flag channel.presence.manager.sync_expected else channel.presence.manager.sync_not_expected From 6ff1efe8eb2f6c62abd61c9609ac2291f9cfa398 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 24 May 2024 17:52:42 +0530 Subject: [PATCH 3/6] Added missing spec id for channel member map --- lib/ably/realtime/presence/members_map.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/ably/realtime/presence/members_map.rb b/lib/ably/realtime/presence/members_map.rb index ef2d8df3..d55fd1ca 100644 --- a/lib/ably/realtime/presence/members_map.rb +++ b/lib/ably/realtime/presence/members_map.rb @@ -213,6 +213,7 @@ def setup_event_handlers update_members_and_emit_events presence_message end + # RTP5a channel.unsafe_on(:failed, :detached) do reset_members reset_local_members From 43397368a12ff3e85efa792029b772184363aff9 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Tue, 28 May 2024 13:42:12 +0530 Subject: [PATCH 4/6] Refactored in_sync ruby constant as a part of ruby enum --- lib/ably/realtime/presence/members_map.rb | 42 +++++++++-------------- spec/acceptance/realtime/presence_spec.rb | 20 +++++------ 2 files changed, 26 insertions(+), 36 deletions(-) diff --git a/lib/ably/realtime/presence/members_map.rb b/lib/ably/realtime/presence/members_map.rb index d55fd1ca..9659fb53 100644 --- a/lib/ably/realtime/presence/members_map.rb +++ b/lib/ably/realtime/presence/members_map.rb @@ -23,7 +23,7 @@ class MembersMap :sync_starting, # Indicates the client is waiting for SYNC ProtocolMessages from Ably :sync_none, # Indicates the ATTACHED ProtocolMessage had no presence flag and thus no members on the channel :finalizing_sync, - :in_sync, + :sync_complete, # Indicates completion of server initiated sync :failed ) include Ably::Modules::StateEmitter @@ -49,16 +49,6 @@ def initialize(presence) setup_event_handlers end - # When attaching to a channel that has members present, the server - # initiates a sync automatically so that the client has a complete list of members. - # - # Until this sync is complete, this method returns false - # - # @return [Boolean] - def sync_complete? - in_sync? - end - # Update the SYNC serial from the ProtocolMessage so that SYNC can be resumed. # If the serial is nil, or the part after the first : is empty, then the SYNC is complete # @@ -110,27 +100,27 @@ def get(options = {}, &block) # Must be defined before subsequent procs reference this callback reset_callbacks = nil - in_sync_callback = lambda do + sync_complete_callback = lambda do reset_callbacks.call if reset_callbacks result_block.call end - failed_callback = lambda do |error| + sync_failed_callback = lambda do |error| reset_callbacks.call if reset_callbacks deferrable.fail error end reset_callbacks = lambda do - off(&in_sync_callback) - off(&failed_callback) - channel.off(&failed_callback) + off(&sync_complete_callback) + off(&sync_failed_callback) + channel.off(&sync_failed_callback) end - unsafe_once(:in_sync, &in_sync_callback) - unsafe_once(:failed, &failed_callback) + unsafe_once(:sync_complete, &sync_complete_callback) + unsafe_once(:failed, &sync_failed_callback) channel.unsafe_once(:detaching, :detached, :failed) do |error_reason| - failed_callback.call error_reason + sync_failed_callback.call error_reason end end @@ -228,7 +218,7 @@ def setup_event_handlers connection.on_resume(&resume_sync_proc) end - unsafe_once(:in_sync, :failed) do + unsafe_once(:sync_complete, :failed) do connection.off_resume(&resume_sync_proc) end end @@ -241,11 +231,11 @@ def setup_event_handlers unsafe_on(:finalizing_sync) do clean_up_absent_members - clean_up_members_not_present_in_sync - change_state :in_sync + clean_up_members_not_present_after_sync + change_state :sync_complete end - unsafe_on(:in_sync) do + unsafe_on(:sync_complete) do update_local_member_state end end @@ -376,7 +366,7 @@ def add_presence_member(presence_message) def remove_presence_member(presence_message) logger.debug { "#{self.class.name}: Member '#{presence_message.member_key}' removed.\n#{presence_message.to_json}" } - if in_sync? + if sync_complete? member_set_delete presence_message else member_set_upsert presence_message, false @@ -402,7 +392,7 @@ def member_set_upsert(presence_message, present) def member_set_delete(presence_message) members.delete presence_message.member_key - if in_sync? + if sync_complete? # If not in SYNC, then local members missing may need to be re-entered # Let #update_local_member_state handle missing members local_members.delete presence_message.member_key @@ -432,7 +422,7 @@ def clean_up_absent_members end end - def clean_up_members_not_present_in_sync + def clean_up_members_not_present_after_sync members.select do |member_key, member| member.fetch(:sync_session_id) != sync_session_id end.each do |member_key, member| diff --git a/spec/acceptance/realtime/presence_spec.rb b/spec/acceptance/realtime/presence_spec.rb index 8f3a5e54..ac87036e 100644 --- a/spec/acceptance/realtime/presence_spec.rb +++ b/spec/acceptance/realtime/presence_spec.rb @@ -516,11 +516,11 @@ def presence_action(method_name, data) stop_reactor end - it 'will emit an :in_sync event when synchronisation is complete' do + it 'will emit an :sync_complete event when synchronisation is complete' do presence_client_one.enter presence_client_two.enter - presence_anonymous_client.members.once(:in_sync) do + presence_anonymous_client.members.once(:sync_complete) do stop_reactor end @@ -545,7 +545,7 @@ def presence_action(method_name, data) entered += 1 next unless entered == 2 - presence_anonymous_client.members.once(:in_sync) do + presence_anonymous_client.members.once(:sync_complete) do expect(presence_anonymous_client.members.count).to eql(2) member_ids = presence_anonymous_client.members.map(&:member_key) expect(member_ids.count).to eql(2) @@ -848,7 +848,7 @@ def setup_members_on(presence) # Hacky accessing a private method, but absent members are intentionally not exposed to any public APIs expect(presence_anonymous_client.members.send(:absent_members).length).to eql(1) - presence_anonymous_client.members.once(:in_sync) do + presence_anonymous_client.members.once(:sync_complete) do # Check that members count is exact indicating the members with LEAVE action after sync are removed expect(presence_anonymous_client).to be_sync_complete expect(presence_anonymous_client.members.length).to eql(enter_expected_count - 1) @@ -1004,7 +1004,7 @@ def setup_members_on(presence) channel_anonymous_client.attach do presence_anonymous_client.get(wait_for_sync: false) do |members| - expect(presence_anonymous_client.members).to_not be_in_sync + expect(presence_anonymous_client.members).to_not be_sync_complete expect(members.count).to eql(0) stop_reactor end @@ -1211,7 +1211,7 @@ def setup_members_on(presence) presence_client_one.subscribe(:enter) do presence_client_one.unsubscribe :enter EventMachine.add_timer(0.5) do - expect(presence_client_one.members).to be_in_sync + expect(presence_client_one.members).to be_sync_complete expect(presence_client_one.members.send(:members).count).to eql(1) presence_client_one.leave data end @@ -2525,7 +2525,7 @@ def cripple_websocket_transport let(:member_data) { random_str } it 'immediately resends all local presence members (#RTP5c2, #RTP19a)' do - in_sync_confirmed_no_local_members = false + sync_complete_confirmed_no_local_members = false local_member_leave_event_fired = false presence_client_one.enter(member_data) @@ -2546,16 +2546,16 @@ def cripple_websocket_transport EventMachine.next_tick do expect(presence_client_one.members.length).to eql(1) expect(presence_client_one.members.local_members.length).to eql(1) - expect(in_sync_confirmed_no_local_members).to be_truthy + expect(sync_complete_confirmed_no_local_members).to be_truthy stop_reactor end end - presence_client_one.members.once(:in_sync) do + presence_client_one.members.once(:sync_complete) do # Immediately after SYNC (no sync actually occurred, but this event fires immediately after a channel SYNCs or is not expecting to SYNC) expect(presence_client_one.members.length).to eql(0) expect(presence_client_one.members.local_members.length).to eql(0) - in_sync_confirmed_no_local_members = true + sync_complete_confirmed_no_local_members = true end # ATTACHED ProtocolMessage with no presence flag will clear the presence set immediately, #RTP19a From a6afa8dbabd90bf174939aa6e0701d1768c98de2 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Tue, 28 May 2024 17:31:32 +0530 Subject: [PATCH 5/6] Updated local memebers to use client id as a key --- lib/ably/realtime/presence/members_map.rb | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/ably/realtime/presence/members_map.rb b/lib/ably/realtime/presence/members_map.rb index 9659fb53..8e9390cc 100644 --- a/lib/ably/realtime/presence/members_map.rb +++ b/lib/ably/realtime/presence/members_map.rb @@ -243,15 +243,15 @@ def setup_event_handlers # Listen for events that change the PresenceMap state and thus # need to be replicated to the local member set def update_local_member_state - new_local_members = members.select do |member_key, member| + new_local_members = members.select do |_, member| member.fetch(:message).connection_id == connection.id - end.each_with_object({}) do |(member_key, member), hash_object| - hash_object[member_key] = member.fetch(:message) + end.each_with_object({}) do |(_, member), hash_object| + hash_object[member.fetch(:client_id)] = member.fetch(:message) end - @local_members.reject do |member_key, message| + @local_members.reject do |member_key, _| new_local_members.keys.include?(member_key) - end.each do |member_key, message| + end.values.each do |message| re_enter_local_member_missing_from_presence_map message end From c33867e176fe4cd2afd39bca3473d1368fb127e1 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Mon, 3 Jun 2024 17:30:00 +0530 Subject: [PATCH 6/6] Implemented local members enter on channel attach. Removed unnecessary client initiated sync defined using resume_sync method. --- lib/ably/realtime/channel/channel_manager.rb | 12 +-- lib/ably/realtime/presence.rb | 18 +++- lib/ably/realtime/presence/members_map.rb | 91 ++++++------------- .../realtime/presence/presence_manager.rb | 29 +++--- 4 files changed, 54 insertions(+), 96 deletions(-) diff --git a/lib/ably/realtime/channel/channel_manager.rb b/lib/ably/realtime/channel/channel_manager.rb index 8c51580c..9c0fdc02 100644 --- a/lib/ably/realtime/channel/channel_manager.rb +++ b/lib/ably/realtime/channel/channel_manager.rb @@ -36,7 +36,7 @@ def attached(attached_protocol_message) # If no attached ProtocolMessage then this attached request was triggered by the client # library, such as returning to attached when detach has failed if attached_protocol_message - update_presence_sync_state_following_attached attached_protocol_message.has_presence_flag? + channel.presence.manager.on_attach attached_protocol_message.has_presence_flag? channel.properties.set_attach_serial(attached_protocol_message.channel_serial) channel.options.set_modes_from_flags(attached_protocol_message.flags) channel.options.set_params(attached_protocol_message.params) @@ -77,7 +77,7 @@ def duplicate_attached_received(protocol_message) reason: protocol_message.error, resumed: false, ) - update_presence_sync_state_following_attached protocol_message.has_presence_flag? + channel.presence.manager.on_attach protocol_message.has_presence_flag? end end @@ -253,14 +253,6 @@ def send_state_change_protocol_message(new_state, state_if_failed, message_optio ) end - def update_presence_sync_state_following_attached(has_presence_flag) - if has_presence_flag - channel.presence.manager.sync_expected - else - channel.presence.manager.sync_not_expected - end - end - def logger connection.logger end diff --git a/lib/ably/realtime/presence.rb b/lib/ably/realtime/presence.rb index 0c4fcd3c..2cf0fc2e 100644 --- a/lib/ably/realtime/presence.rb +++ b/lib/ably/realtime/presence.rb @@ -110,6 +110,15 @@ def enter_client(client_id, data = nil, &success_block) send_presence_action_for_client(Ably::Models::PresenceMessage::ACTION.Enter, client_id, data, &success_block) end + def enter_client_with_id(id, client_id, data = nil, &success_block) + ensure_supported_client_id client_id + ensure_supported_payload data + + send_presence_action_for_client(Ably::Models::PresenceMessage::ACTION.Enter, client_id, data, id, &success_block) + end + private :enter_client + + # Leave this client from this channel. This client will be removed from the presence # set and presence subscribers will see a leave message for this client. # @@ -338,8 +347,11 @@ def sync_complete? private # @return [Ably::Models::PresenceMessage] presence message is returned allowing callbacks to be added - def send_presence_protocol_message(presence_action, client_id, data) + def send_presence_protocol_message(presence_action, client_id, data, id = nil) presence_message = create_presence_message(presence_action, client_id, data) + unless id.nil? + presence_message.id = id + end unless presence_message.client_id raise Ably::Exceptions::Standard.new('Unable to enter create presence message without a client_id', 400, Ably::Exceptions::Codes::UNABLE_TO_ENTER_PRESENCE_CHANNEL_NO_CLIENTID) end @@ -433,13 +445,13 @@ def deferrable_fail(deferrable, *args, &block) deferrable end - def send_presence_action_for_client(action, client_id, data, &success_block) + def send_presence_action_for_client(action, client_id, data, id = nil, &success_block) requirements_failed_deferrable = ensure_presence_publishable_on_connection_deferrable return requirements_failed_deferrable if requirements_failed_deferrable deferrable = create_deferrable ensure_channel_attached(deferrable) do - send_presence_protocol_message(action, client_id, data).tap do |protocol_message| + send_presence_protocol_message(action, client_id, data, id).tap do |protocol_message| protocol_message.callback { |message| deferrable_succeed deferrable, &success_block } protocol_message.errback { |error| deferrable_fail deferrable, error } end diff --git a/lib/ably/realtime/presence/members_map.rb b/lib/ably/realtime/presence/members_map.rb index 8e9390cc..6ed001eb 100644 --- a/lib/ably/realtime/presence/members_map.rb +++ b/lib/ably/realtime/presence/members_map.rb @@ -146,7 +146,7 @@ def each(&block) # and thus the responsibility of this library to re-enter on the channel automatically if the # channel loses continuity # - # @return [Array] + # @return [Hash] # @api private def local_members @local_members @@ -209,18 +209,8 @@ def setup_event_handlers reset_local_members end - resume_sync_proc = method(:resume_sync).to_proc - unsafe_on(:sync_starting) do @sync_session_id += 1 - - channel.unsafe_once(:attached) do - connection.on_resume(&resume_sync_proc) - end - - unsafe_once(:sync_complete, :failed) do - connection.off_resume(&resume_sync_proc) - end end unsafe_on(:sync_none) do @@ -234,59 +224,31 @@ def setup_event_handlers clean_up_members_not_present_after_sync change_state :sync_complete end - - unsafe_on(:sync_complete) do - update_local_member_state - end end - # Listen for events that change the PresenceMap state and thus - # need to be replicated to the local member set - def update_local_member_state - new_local_members = members.select do |_, member| - member.fetch(:message).connection_id == connection.id - end.each_with_object({}) do |(_, member), hash_object| - hash_object[member.fetch(:client_id)] = member.fetch(:message) - end - - @local_members.reject do |member_key, _| - new_local_members.keys.include?(member_key) - end.values.each do |message| - re_enter_local_member_missing_from_presence_map message - end - - @local_members = new_local_members - end - - def re_enter_local_member_missing_from_presence_map(presence_message) - local_client_id = presence_message.client_id || client.auth.client_id - logger.debug { "#{self.class.name}: Manually re-entering local presence member, client ID: #{local_client_id} with data: #{presence_message.data}" } - presence.enter_client(local_client_id, presence_message.data).tap do |deferrable| - deferrable.errback do |error| - presence_message_client_id = presence_message.client_id || client.auth.client_id - re_enter_error = Ably::Models::ErrorInfo.new( - message: "unable to automatically re-enter presence channel for client_id '#{presence_message_client_id}'. Source error code #{error.code} and message '#{error.message}'", - code: Ably::Exceptions::Codes::UNABLE_TO_AUTOMATICALLY_REENTER_PRESENCE_CHANNEL - ) - channel.emit :update, Ably::Models::ChannelStateChange.new( - current: channel.state, - previous: channel.state, - event: Ably::Realtime::Channel::EVENT(:update), - reason: re_enter_error, - resumed: true - ) + def _enter_local_members + local_members.values.each do |member| + local_client_id = member.client_id || client.auth.client_id + logger.debug { "#{self.class.name}: Manually re-entering local presence member, client ID: #{local_client_id} with data: #{member.data}" } + presence.enter_client_with_id(member.id, local_client_id, member.data).tap do |deferrable| + deferrable.errback do |error| + presence_message_client_id = member.client_id || client.auth.client_id + re_enter_error = Ably::Models::ErrorInfo.new( + message: "unable to automatically re-enter presence channel for client_id '#{presence_message_client_id}'. Source error code #{error.code} and message '#{error.message}'", + code: Ably::Exceptions::Codes::UNABLE_TO_AUTOMATICALLY_REENTER_PRESENCE_CHANNEL + ) + channel.emit :update, Ably::Models::ChannelStateChange.new( + current: channel.state, + previous: channel.state, + event: Ably::Realtime::Channel::EVENT(:update), + reason: re_enter_error, + resumed: true + ) + end end end end - - # Trigger a manual SYNC operation to resume member synchronisation from last known cursor position - def resume_sync - connection.send_protocol_message( - action: Ably::Models::ProtocolMessage::ACTION.Sync.to_i, - channel: channel.name, - channel_serial: sync_serial - ) if channel.attached? - end + public :_enter_local_members def update_members_and_emit_events(presence_message) return unless ensure_presence_message_is_valid(presence_message) @@ -385,17 +347,16 @@ def touch_presence_member(presence_message) def member_set_upsert(presence_message, present) members[presence_message.member_key] = { present: present, message: presence_message, sync_session_id: sync_session_id } if presence_message.connection_id == connection.id - local_members[presence_message.member_key] = presence_message - logger.debug { "#{self.class.name}: Local member '#{presence_message.member_key}' added" } + local_members[presence_message.client_id] = presence_message + logger.debug { "#{self.class.name}: Local member '#{presence_message.client_id}' added" } end end def member_set_delete(presence_message) members.delete presence_message.member_key - if sync_complete? - # If not in SYNC, then local members missing may need to be re-entered - # Let #update_local_member_state handle missing members - local_members.delete presence_message.member_key + if sync_complete? and presence_message.connection_id == connection.id + local_members.delete presence_message.client_id + logger.debug { "#{self.class.name}: Local member '#{presence_message.client_id}' deleted" } end end diff --git a/lib/ably/realtime/presence/presence_manager.rb b/lib/ably/realtime/presence/presence_manager.rb index 54c27f9b..ef8f8db4 100644 --- a/lib/ably/realtime/presence/presence_manager.rb +++ b/lib/ably/realtime/presence/presence_manager.rb @@ -19,13 +19,17 @@ def initialize(presence) setup_channel_event_handlers end - # Expect SYNC ProtocolMessages from the server with a list of current members on this channel - # - # @return [void] - # - # @api private - def sync_expected - presence.members.change_state :sync_starting + def on_attach(has_presence_flag) + if has_presence_flag + # Expect SYNC ProtocolMessages from the server with a list of current members on this channel + presence.members.change_state :sync_starting + else + # There server has indicated that there are no SYNC ProtocolMessages to come because + # there are no members on this channel + logger.debug { "#{self.class.name}: Emitting leave events for all members as a SYNC is not expected and thus there are no members on the channel" } + presence.members.change_state :sync_none + end + presence.members._enter_local_members end # Process presence messages from SYNC messages. Sync can be server-initiated or triggered following ATTACH @@ -47,17 +51,6 @@ def sync_process_messages(serial, presence_messages) presence.members.change_state :finalizing_sync if presence.members.sync_serial_cursor_at_end? end - # There server has indicated that there are no SYNC ProtocolMessages to come because - # there are no members on this channel - # - # @return [void] - # - # @api private - def sync_not_expected - logger.debug { "#{self.class.name}: Emitting leave events for all members as a SYNC is not expected and thus there are no members on the channel" } - presence.members.change_state :sync_none - end - private def_delegators :presence, :members, :channel