Skip to content
Merged

RTL4j #341

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions lib/ably/realtime/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ class Channel
# @api private
attr_reader :manager

# Flag that specifies whether channel is resuming attachment(reattach) or is doing a 'clean attach' RTL4j1
# @return [Bolean]
# @api private
attr_reader :attach_resume

# ChannelOptions params attrribute (#RTL4k)
# return [Hash]
def_delegators :options, :params
Expand All @@ -116,6 +121,7 @@ def initialize(client, name, channel_options = {})
@manager = ChannelManager.new(self, client.connection)
@push = PushChannel.new(self)
@properties = ChannelProperties.new(self)
@attach_resume = false

setup_event_handlers
setup_presence
Expand Down Expand Up @@ -341,6 +347,16 @@ def logger
client.logger
end

# @api private
def attach_resume!
@attach_resume = true
end

# @api private
def reset_attach_resume!
@attach_resume = false
end

# As we are using a state machine, do not allow change_state to be used
# #transition_state_machine must be used instead
private :change_state
Expand Down
5 changes: 4 additions & 1 deletion lib/ably/realtime/channel/channel_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,11 @@ def channel_retry_timeout

def send_attach_protocol_message
message_options = {}
message_options[:flags] = channel.options.modes_to_flags if channel.options.modes
message_options[:params] = channel.options.params if channel.options.params.any?
message_options[:flags] = channel.options.modes_to_flags if channel.options.modes
if channel.attach_resume
message_options[:flags] = message_options[:flags].to_i | Ably::Models::ProtocolMessage::ATTACH_FLAGS_MAPPING[:resume]
end

send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Attach, :suspended, message_options
end
Expand Down
5 changes: 5 additions & 0 deletions lib/ably/realtime/channel/channel_state_machine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ class ChannelStateMachine

before_transition(to: [:attached]) do |channel, current_transition|
channel.manager.attached current_transition.metadata.protocol_message
channel.attach_resume!
end

before_transition(to: [:detaching, :failed]) do |channel, _current_transition|
channel.reset_attach_resume!
end

after_transition(to: [:detaching]) do |channel, current_transition|
Expand Down
149 changes: 124 additions & 25 deletions spec/acceptance/realtime/channel_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,25 @@ def disconnect_transport
channel.attach
end
end

context 'when channel is reattaching' do
it 'sends ATTACH_RESUME flag along with other modes (RTL4j)' do
channel.attach do
channel.on(:suspended) do
client.connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
next if protocol_message.action != :attach

expect(protocol_message.has_attach_resume_flag?).to eq(true)
expect(protocol_message.has_attach_publish_flag?).to eq(true)
stop_reactor
end

client.connection.connect
end
client.connection.transition_state_machine :suspended
end
end
end
end

context 'context when channel options contain params' do
Expand Down Expand Up @@ -437,6 +456,42 @@ def disconnect_transport
end
end
end

describe 'clean attach (RTL4j)' do
context "when channel wasn't previously attached" do
it "doesn't send ATTACH_RESUME" do
client.connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
next if protocol_message.action != :attach

expect(protocol_message.has_attach_resume_flag?).to eq(false)
stop_reactor
end

channel.attach
end
end

context "when channel was explicitly detached" do
it "doesn't send ATTACH_RESUME" do
channel.once(:detached) do
client.connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
next if protocol_message.action != :attach

expect(protocol_message.has_attach_resume_flag?).to eq(false)
stop_reactor
end

channel.attach
end

channel.once(:attached) do
channel.detach
end

channel.attach
end
end
end
end

describe '#detach' do
Expand Down Expand Up @@ -2005,15 +2060,33 @@ def fake_error(error)
end
end

it 'transitions state automatically to :attaching once the connection is re-established (#RTN15c3)' do
channel.attach do
channel.on(:suspended) do
client.connection.connect
channel.once(:attached) do
stop_reactor
describe 'reattaching (#RTN15c3)' do
it 'transitions state automatically to :attaching once the connection is re-established ' do
channel.attach do
channel.on(:suspended) do
client.connection.connect
channel.once(:attached) do
stop_reactor
end
end
client.connection.transition_state_machine :suspended
end
end

it 'sends ATTACH_RESUME flag when reattaching (RTL4j)' do
channel.attach do
channel.on(:suspended) do
client.connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
next if protocol_message.action != :attach

expect(protocol_message.has_attach_resume_flag?).to eq(true)
stop_reactor
end

client.connection.connect
end
client.connection.transition_state_machine :suspended
end
client.connection.transition_state_machine :suspended
end
end
end
Expand Down Expand Up @@ -2209,14 +2282,18 @@ def fake_error(error)
{ modes: modes }
end

shared_examples 'an update that sends ATTACH message' do |state|
def self.build_flags(flags)
flags.map { |flag| Ably::Models::ProtocolMessage::ATTACH_FLAGS_MAPPING[flag] }.reduce(:|)
end

shared_examples 'an update that sends ATTACH message' do |state, flags|
it 'sends an ATTACH message on options change' do
attach_sent = nil

client.connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
if protocol_message.action == :attach && protocol_message.flags.nonzero?
attach_sent = true
expect(protocol_message.flags).to eq(262144)
expect(protocol_message.flags).to eq(flags)
end
end

Expand All @@ -2238,11 +2315,11 @@ def fake_error(error)
end

context 'when channel is attaching' do
it_behaves_like 'an update that sends ATTACH message', :attaching
it_behaves_like 'an update that sends ATTACH message', :attaching, build_flags(%i[subscribe])
end

context 'when channel is attaching' do
it_behaves_like 'an update that sends ATTACH message', :attached
it_behaves_like 'an update that sends ATTACH message', :attached, build_flags(%i[resume subscribe])
end

context 'when channel is initialized' do
Expand Down Expand Up @@ -2502,38 +2579,60 @@ def fake_error(error)
end

context 'and channel is attached' do
it 'reattaches immediately (#RTL13a)' do
channel.attach do
it 'reattaches immediately (#RTL13a) with ATTACH_RESUME flag(RTL4j)' do
resume_flag = false

channel.once(:attached) do
client.connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
next if protocol_message.action != :attach

resume_flag = protocol_message.has_attach_resume_flag?
end

channel.once(:attaching) do |state_change|
expect(state_change.reason.code).to eql(50505)
channel.once(:attached) do
expect(resume_flag).to eq(true)
stop_reactor
end
end

detach_message = Ably::Models::ProtocolMessage.new(action: detached_action, channel: channel_name, error: { code: 50505 })
client.connection.__incoming_protocol_msgbus__.publish :protocol_message, detach_message
end

channel.attach
end
end

context 'and channel is suspended' do
it 'reattaches immediately (#RTL13a)' do
channel.attach do
channel.once(:suspended) do
channel.once(:attaching) do |state_change|
expect(state_change.reason.code).to eql(50505)
channel.once(:attached) do
stop_reactor
end
end
it 'reattaches immediately (#RTL13a) with ATTACH_RESUME flag(RTL4j)' do
resume_flag = false

detach_message = Ably::Models::ProtocolMessage.new(action: detached_action, channel: channel_name, error: { code: 50505 })
client.connection.__incoming_protocol_msgbus__.publish :protocol_message, detach_message
channel.once(:attached) do
channel.transition_state_machine! :suspended
end

channel.once(:suspended) do
client.connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
next if protocol_message.action != :attach

resume_flag = protocol_message.has_attach_resume_flag?
end

channel.transition_state_machine! :suspended
channel.once(:attaching) do |state_change|
expect(state_change.reason.code).to eql(50505)
channel.once(:attached) do
expect(resume_flag).to eq(true)
stop_reactor
end
end

detach_message = Ably::Models::ProtocolMessage.new(action: detached_action, channel: channel_name, error: { code: 50505 })
client.connection.__incoming_protocol_msgbus__.publish :protocol_message, detach_message
end

channel.attach
end

context 'when connection is no longer connected' do
Expand Down