Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions lib/ably/models/channel_options.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
module Ably::Models
# Convert token details argument to a {ChannelOptions} object
#
# @param attributes (see #initialize)
#
# @return [ChannelOptions]
def self.ChannelOptions(attributes)
case attributes
when ChannelOptions
return attributes
else
ChannelOptions.new(attributes)
end
end

# Represents options of a channel
class ChannelOptions
extend Ably::Modules::Enum
extend Forwardable
include Ably::Modules::ModelCommon

MODES = ruby_enum('MODES',
presence: 0,
publish: 1,
subscribe: 2,
presence_subscribe: 3
)

attr_reader :attributes

alias_method :to_h, :attributes

def_delegators :attributes, :fetch, :size, :empty?
# Initialize a new ChannelOptions
#
# @option params [Hash] (TB2c) params (for realtime client libraries only) a of key/value pairs
# @option modes [Hash] modes (for realtime client libraries only) an array of ChannelMode
# @option cipher [Hash,Ably::Models::CipherParams] :cipher A hash of options or a {Ably::Models::CipherParams} to configure the encryption. *:key* is required, all other options are optional.
#
def initialize(attrs)
@attributes = IdiomaticRubyWrapper(attrs.clone)

attributes[:modes] = modes.to_a.map { |mode| Ably::Models::ChannelOptions::MODES[mode] } if modes
attributes[:cipher] = Ably::Models::CipherParams(cipher) if cipher
attributes.clone
end

# @!attribute cipher
#
# @return [CipherParams]
def cipher
attributes[:cipher]
end

# @!attribute params
#
# @return [Hash]
def params
attributes[:params].to_h
end

# @!attribute modes
#
# @return [Array<ChannelOptions::MODES>]
def modes
attributes[:modes]
end

# Converts modes to a bitfield that coresponds to ProtocolMessage#flags
#
# @return [Integer]
def modes_to_flags
modes.map { |mode| Ably::Models::ProtocolMessage::ATTACH_FLAGS_MAPPING[mode.to_sym] }.reduce(:|)
end

# @return [Hash]
# @api private
def set_params(hash)
attributes[:params] = hash
end

# Sets modes from ProtocolMessage#flags
#
# @return [Array<ChannelOptions::MODES>]
# @api private
def set_modes_from_flags(flags)
return unless flags

message_modes = MODES.select do |mode|
flag = Ably::Models::ProtocolMessage::ATTACH_FLAGS_MAPPING[mode.to_sym]
flags & flag == flag
end

attributes[:modes] = message_modes.map { |mode| Ably::Models::ChannelOptions::MODES[mode] }
end
end
end
4 changes: 4 additions & 0 deletions lib/ably/models/idiomatic_ruby_wrapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ def size
attributes.size
end

def empty?
attributes.empty?
end

def keys
map { |key, value| key }
end
Expand Down
22 changes: 17 additions & 5 deletions lib/ably/models/protocol_message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ class ProtocolMessage
auth: 17
)

ATTACH_FLAGS_MAPPING = {
resume: 32, # 2^5
presence: 65536, # 2^16
publish: 131072, # 2^17
subscribe: 262144, # 2^18
presence_subscribe: 524288, # 2^19
}

# Indicates this protocol message action will generate an ACK response such as :message or :presence
# @api private
def self.ack_required?(for_action)
Expand Down Expand Up @@ -185,6 +193,10 @@ def has_correct_message_size?
message_size <= connection_details.max_message_size
end

def params
@params ||= attributes[:params].to_h
end

def flags
Integer(attributes[:flags])
rescue TypeError
Expand Down Expand Up @@ -218,27 +230,27 @@ def has_transient_flag?

# @api private
def has_attach_resume_flag?
flags & 32 == 32 # 2^5
flags & ATTACH_FLAGS_MAPPING[:resume] == ATTACH_FLAGS_MAPPING[:resume] # 2^5
end

# @api private
def has_attach_presence_flag?
flags & 65536 == 65536 # 2^16
flags & ATTACH_FLAGS_MAPPING[:presence] == ATTACH_FLAGS_MAPPING[:presence] # 2^16
end

# @api private
def has_attach_publish_flag?
flags & 131072 == 131072 # 2^17
flags & ATTACH_FLAGS_MAPPING[:publish] == ATTACH_FLAGS_MAPPING[:publish] # 2^17
end

# @api private
def has_attach_subscribe_flag?
flags & 262144 == 262144 # 2^18
flags & ATTACH_FLAGS_MAPPING[:subscribe] == ATTACH_FLAGS_MAPPING[:subscribe] # 2^18
end

# @api private
def has_attach_presence_subscribe_flag?
flags & 524288 == 524288 # 2^19
flags & ATTACH_FLAGS_MAPPING[:presence_subscribe] == ATTACH_FLAGS_MAPPING[:presence_subscribe] # 2^19
end

def connection_details
Expand Down
24 changes: 22 additions & 2 deletions lib/ably/modules/channels_collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,21 @@ def initialize(client, channel_klass)
# Return a Channel for the given name
#
# @param name [String] The name of the channel
# @param channel_options [Hash] Channel options including the encryption options
# @param channel_options [Hash, Ably::Models::ChannelOptions] A hash of options or a {Ably::Models::ChannelOptions}
#
# @return [Channel]
#
def get(name, channel_options = {})
if channels.has_key?(name)
channels[name].tap do |channel|
channel.update_options channel_options if channel_options && !channel_options.empty?
if channel_options && !channel_options.empty?
if channel.respond_to?(:need_reattach?) && channel.need_reattach?
raise_implicit_options_update
else
warn_implicit_options_update
channel.options = channel_options
end
end
end
else
channels[name] ||= channel_klass.new(client, name, channel_options)
Expand Down Expand Up @@ -70,6 +77,19 @@ def each(&block)
end

private

def raise_implicit_options_update
raise ArgumentError, "You are trying to indirectly update channel options which will trigger reattachment of the channel. Please use Channel#set_options directly if you wish to continue"
end

def warn_implicit_options_update
logger.warn { "Channels#get: Using this method to update channel options is deprecated and may be removed in a future version of ably-ruby. Please use Channel#setOptions instead" }
end

def logger
client.logger
end

def client
@client
end
Expand Down
32 changes: 22 additions & 10 deletions lib/ably/realtime/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class Channel
include Ably::Modules::MessageEmitter
include Ably::Realtime::Channel::Publisher
extend Ably::Modules::Enum
extend Forwardable

# ChannelState
# The permited states for this channel
Expand Down Expand Up @@ -92,17 +93,20 @@ class Channel
# @api private
attr_reader :manager

# ChannelOptions params attrribute (#RTL4k)
# return [Hash]
def_delegators :options, :params

# Initialize a new Channel object
#
# @param client [Ably::Rest::Client]
# @param name [String] The name of the channel
# @param channel_options [Hash] Channel options, currently reserved for Encryption options
# @option channel_options [Hash,Ably::Models::CipherParams] :cipher A hash of options or a {Ably::Models::CipherParams} to configure the encryption. *:key* is required, all other options are optional. See {Ably::Util::Crypto#initialize} for a list of +:cipher+ options
# @param channel_options [Hash, Ably::Models::ChannelOptions] A hash of options or a {Ably::Models::ChannelOptions}
#
def initialize(client, name, channel_options = {})
name = ensure_utf_8(:name, name)

update_options channel_options
@options = Ably::Models::ChannelOptions(channel_options)
@client = client
@name = name
@queue = []
Expand Down Expand Up @@ -309,6 +313,16 @@ def __incoming_msgbus__
)
end

# Sets or updates the stored channel options. (#RTL16)
# @param channel_options [Hash, Ably::Models::ChannelOptions] A hash of options or a {Ably::Models::ChannelOptions}
# @return [Ably::Models::ChannelOptions]
def set_options(channel_options)
@options = Ably::Models::ChannelOptions(channel_options)

manager.request_reattach if need_reattach?
end
alias options= set_options

# @api private
def set_channel_error_reason(error)
@error_reason = error
Expand All @@ -319,13 +333,6 @@ def clear_error_reason
@error_reason = nil
end

# @api private
def update_options(channel_options)
@options = channel_options.clone.freeze
end
alias set_options update_options # (RSL7)
alias options= update_options

# Used by {Ably::Modules::StateEmitter} to debug state changes
# @api private
def logger
Expand All @@ -336,7 +343,12 @@ def logger
# #transition_state_machine must be used instead
private :change_state

def need_reattach?
!!(attaching? || attached?) && !!(options.modes || options.params)
end

private

def setup_event_handlers
__incoming_msgbus__.subscribe(:message) do |message|
message.decode(client.encoders, options) do |encode_error, error_message|
Expand Down
17 changes: 13 additions & 4 deletions lib/ably/realtime/channel/channel_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ def attached(attached_protocol_message)
if attached_protocol_message
update_presence_sync_state_following_attached attached_protocol_message
channel.properties.set_attach_serial(attached_protocol_message.channel_serial)
channel.options.set_modes_from_flags(attached_protocol_message.flags)
channel.options.set_params(attached_protocol_message.params)
end
end

Expand All @@ -64,6 +66,7 @@ def duplicate_attached_received(protocol_message)
end

channel.properties.set_attach_serial(protocol_message.channel_serial)
channel.options.set_modes_from_flags(protocol_message.flags)

if protocol_message.has_channel_resumed_flag?
logger.debug { "ChannelManager: Additional resumed ATTACHED message received for #{channel.state} channel '#{channel.name}'" }
Expand Down Expand Up @@ -199,14 +202,18 @@ def channel_retry_timeout
end

def send_attach_protocol_message
send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Attach, :suspended # move to suspended
message_options = {}
message_options[:flags] = channel.options.modes_to_flags if channel.options.modes
message_options[:params] = channel.options.params if channel.options.params.any?

send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Attach, :suspended, message_options
end

def send_detach_protocol_message(previous_state)
send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Detach, previous_state # return to previous state if failed
end

def send_state_change_protocol_message(new_state, state_if_failed)
def send_state_change_protocol_message(new_state, state_if_failed, message_options = {})
state_at_time_of_request = channel.state
@pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do
if channel.state == state_at_time_of_request
Expand All @@ -227,7 +234,8 @@ def send_state_change_protocol_message(new_state, state_if_failed)
next unless pending_state_change_timer
connection.send_protocol_message(
action: new_state.to_i,
channel: channel.name
channel: channel.name,
**message_options.to_h
)
resend_if_disconnected_and_connected.call
end
Expand All @@ -237,7 +245,8 @@ def send_state_change_protocol_message(new_state, state_if_failed)

connection.send_protocol_message(
action: new_state.to_i,
channel: channel.name
channel: channel.name,
**message_options.to_h
)
end

Expand Down
2 changes: 1 addition & 1 deletion lib/ably/realtime/channels.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def initialize(client)
# Return a {Ably::Realtime::Channel} for the given name
#
# @param name [String] The name of the channel
# @param channel_options [Hash] Channel options, currently reserved for Encryption options
# @param channel_options [Hash, Ably::Models::ChannelOptions] A hash of options or a {Ably::Models::ChannelOptions}
# @return [Ably::Realtime::Channel}
#
def get(*args)
Expand Down
17 changes: 9 additions & 8 deletions lib/ably/rest/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,12 @@ class Channel
#
# @param client [Ably::Rest::Client]
# @param name [String] The name of the channel
# @param channel_options [Hash] Channel options, currently reserved for Encryption options
# @option channel_options [Hash,Ably::Models::CipherParams] :cipher A hash of options or a {Ably::Models::CipherParams} to configure the encryption. *:key* is required, all other options are optional. See {Ably::Util::Crypto#initialize} for a list of +:cipher+ options
# @param channel_options [Hash, Ably::Models::ChannelOptions] A hash of options or a {Ably::Models::ChannelOptions}
#
def initialize(client, name, channel_options = {})
name = (ensure_utf_8 :name, name)

update_options channel_options
@options = Ably::Models::ChannelOptions(channel_options)
@client = client
@name = name
@push = PushChannel.new(self)
Expand Down Expand Up @@ -164,14 +163,16 @@ def presence
@presence ||= Presence.new(client, self)
end

# @api private
def update_options(channel_options)
@options = channel_options.clone.freeze
# Sets or updates the stored channel options. (#RSL7)
# @param channel_options [Hash, Ably::Models::ChannelOptions] A hash of options or a {Ably::Models::ChannelOptions}
# @return [Ably::Models::ChannelOptions]
def set_options(channel_options)
@options = Ably::Models::ChannelOptions(channel_options)
end
alias set_options update_options # (RSL7)
alias options= update_options
alias options= set_options

private

def base_path
"/channels/#{URI.encode_www_form_component(name)}"
end
Expand Down
Loading