diff --git a/lib/ably/realtime/channel.rb b/lib/ably/realtime/channel.rb index f52aeece1..b2b110c7c 100644 --- a/lib/ably/realtime/channel.rb +++ b/lib/ably/realtime/channel.rb @@ -93,6 +93,11 @@ class Channel # @api private attr_reader :manager + # Flag that specifies whether channel is resuming attachment(reattach) or is doing a 'clean attach' RTL4j1 + # @return [Bolean] + # @api private + attr_reader :attach_resume + # ChannelOptions params attrribute (#RTL4k) # return [Hash] def_delegators :options, :params @@ -116,6 +121,7 @@ def initialize(client, name, channel_options = {}) @manager = ChannelManager.new(self, client.connection) @push = PushChannel.new(self) @properties = ChannelProperties.new(self) + @attach_resume = false setup_event_handlers setup_presence @@ -341,6 +347,16 @@ def logger client.logger end + # @api private + def attach_resume! + @attach_resume = true + end + + # @api private + def reset_attach_resume! + @attach_resume = false + end + # As we are using a state machine, do not allow change_state to be used # #transition_state_machine must be used instead private :change_state diff --git a/lib/ably/realtime/channel/channel_manager.rb b/lib/ably/realtime/channel/channel_manager.rb index 5cec145be..5b1761087 100644 --- a/lib/ably/realtime/channel/channel_manager.rb +++ b/lib/ably/realtime/channel/channel_manager.rb @@ -203,8 +203,11 @@ def channel_retry_timeout def send_attach_protocol_message message_options = {} - message_options[:flags] = channel.options.modes_to_flags if channel.options.modes message_options[:params] = channel.options.params if channel.options.params.any? + message_options[:flags] = channel.options.modes_to_flags if channel.options.modes + if channel.attach_resume + message_options[:flags] = message_options[:flags].to_i | Ably::Models::ProtocolMessage::ATTACH_FLAGS_MAPPING[:resume] + end send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Attach, :suspended, message_options end diff --git a/lib/ably/realtime/channel/channel_state_machine.rb b/lib/ably/realtime/channel/channel_state_machine.rb index 12526e18a..fb91deee1 100644 --- a/lib/ably/realtime/channel/channel_state_machine.rb +++ b/lib/ably/realtime/channel/channel_state_machine.rb @@ -42,6 +42,11 @@ class ChannelStateMachine before_transition(to: [:attached]) do |channel, current_transition| channel.manager.attached current_transition.metadata.protocol_message + channel.attach_resume! + end + + before_transition(to: [:detaching, :failed]) do |channel, _current_transition| + channel.reset_attach_resume! end after_transition(to: [:detaching]) do |channel, current_transition| diff --git a/spec/acceptance/realtime/channel_spec.rb b/spec/acceptance/realtime/channel_spec.rb index f3a6161d4..d2c4250e1 100644 --- a/spec/acceptance/realtime/channel_spec.rb +++ b/spec/acceptance/realtime/channel_spec.rb @@ -134,6 +134,25 @@ def disconnect_transport channel.attach end end + + context 'when channel is reattaching' do + it 'sends ATTACH_RESUME flag along with other modes (RTL4j)' do + channel.attach do + channel.on(:suspended) do + client.connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| + next if protocol_message.action != :attach + + expect(protocol_message.has_attach_resume_flag?).to eq(true) + expect(protocol_message.has_attach_publish_flag?).to eq(true) + stop_reactor + end + + client.connection.connect + end + client.connection.transition_state_machine :suspended + end + end + end end context 'context when channel options contain params' do @@ -437,6 +456,42 @@ def disconnect_transport end end end + + describe 'clean attach (RTL4j)' do + context "when channel wasn't previously attached" do + it "doesn't send ATTACH_RESUME" do + client.connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| + next if protocol_message.action != :attach + + expect(protocol_message.has_attach_resume_flag?).to eq(false) + stop_reactor + end + + channel.attach + end + end + + context "when channel was explicitly detached" do + it "doesn't send ATTACH_RESUME" do + channel.once(:detached) do + client.connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| + next if protocol_message.action != :attach + + expect(protocol_message.has_attach_resume_flag?).to eq(false) + stop_reactor + end + + channel.attach + end + + channel.once(:attached) do + channel.detach + end + + channel.attach + end + end + end end describe '#detach' do @@ -2005,15 +2060,33 @@ def fake_error(error) end end - it 'transitions state automatically to :attaching once the connection is re-established (#RTN15c3)' do - channel.attach do - channel.on(:suspended) do - client.connection.connect - channel.once(:attached) do - stop_reactor + describe 'reattaching (#RTN15c3)' do + it 'transitions state automatically to :attaching once the connection is re-established ' do + channel.attach do + channel.on(:suspended) do + client.connection.connect + channel.once(:attached) do + stop_reactor + end end + client.connection.transition_state_machine :suspended + end + end + + it 'sends ATTACH_RESUME flag when reattaching (RTL4j)' do + channel.attach do + channel.on(:suspended) do + client.connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| + next if protocol_message.action != :attach + + expect(protocol_message.has_attach_resume_flag?).to eq(true) + stop_reactor + end + + client.connection.connect + end + client.connection.transition_state_machine :suspended end - client.connection.transition_state_machine :suspended end end end @@ -2209,14 +2282,18 @@ def fake_error(error) { modes: modes } end - shared_examples 'an update that sends ATTACH message' do |state| + def self.build_flags(flags) + flags.map { |flag| Ably::Models::ProtocolMessage::ATTACH_FLAGS_MAPPING[flag] }.reduce(:|) + end + + shared_examples 'an update that sends ATTACH message' do |state, flags| it 'sends an ATTACH message on options change' do attach_sent = nil client.connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| if protocol_message.action == :attach && protocol_message.flags.nonzero? attach_sent = true - expect(protocol_message.flags).to eq(262144) + expect(protocol_message.flags).to eq(flags) end end @@ -2238,11 +2315,11 @@ def fake_error(error) end context 'when channel is attaching' do - it_behaves_like 'an update that sends ATTACH message', :attaching + it_behaves_like 'an update that sends ATTACH message', :attaching, build_flags(%i[subscribe]) end context 'when channel is attaching' do - it_behaves_like 'an update that sends ATTACH message', :attached + it_behaves_like 'an update that sends ATTACH message', :attached, build_flags(%i[resume subscribe]) end context 'when channel is initialized' do @@ -2502,11 +2579,20 @@ def fake_error(error) end context 'and channel is attached' do - it 'reattaches immediately (#RTL13a)' do - channel.attach do + it 'reattaches immediately (#RTL13a) with ATTACH_RESUME flag(RTL4j)' do + resume_flag = false + + channel.once(:attached) do + client.connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| + next if protocol_message.action != :attach + + resume_flag = protocol_message.has_attach_resume_flag? + end + channel.once(:attaching) do |state_change| expect(state_change.reason.code).to eql(50505) channel.once(:attached) do + expect(resume_flag).to eq(true) stop_reactor end end @@ -2514,26 +2600,39 @@ def fake_error(error) detach_message = Ably::Models::ProtocolMessage.new(action: detached_action, channel: channel_name, error: { code: 50505 }) client.connection.__incoming_protocol_msgbus__.publish :protocol_message, detach_message end + + channel.attach end end context 'and channel is suspended' do - it 'reattaches immediately (#RTL13a)' do - channel.attach do - channel.once(:suspended) do - channel.once(:attaching) do |state_change| - expect(state_change.reason.code).to eql(50505) - channel.once(:attached) do - stop_reactor - end - end + it 'reattaches immediately (#RTL13a) with ATTACH_RESUME flag(RTL4j)' do + resume_flag = false - detach_message = Ably::Models::ProtocolMessage.new(action: detached_action, channel: channel_name, error: { code: 50505 }) - client.connection.__incoming_protocol_msgbus__.publish :protocol_message, detach_message + channel.once(:attached) do + channel.transition_state_machine! :suspended + end + + channel.once(:suspended) do + client.connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| + next if protocol_message.action != :attach + + resume_flag = protocol_message.has_attach_resume_flag? end - channel.transition_state_machine! :suspended + channel.once(:attaching) do |state_change| + expect(state_change.reason.code).to eql(50505) + channel.once(:attached) do + expect(resume_flag).to eq(true) + stop_reactor + end + end + + detach_message = Ably::Models::ProtocolMessage.new(action: detached_action, channel: channel_name, error: { code: 50505 }) + client.connection.__incoming_protocol_msgbus__.publish :protocol_message, detach_message end + + channel.attach end context 'when connection is no longer connected' do