diff --git a/README.md b/README.md index bb961c707..97edeb6a7 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ _[Ably](https://ably.com) is the platform that powers synchronized digital experiences in realtime. Whether attending an event in a virtual venue, receiving realtime financial information, or monitoring live car performance data – consumers simply expect realtime digital experiences as standard. Ably provides a suite of APIs to build, extend, and deliver powerful digital experiences in realtime for more than 250 million devices across 80 countries each month. Organizations like Bloomberg, HubSpot, Verizon, and Hopin depend on Ably’s platform to offload the growing complexity of business-critical realtime data synchronization at global scale. For more information, see the [Ably documentation](https://ably.com/documentation)._ -This is a Ruby client library for Ably. The library currently targets the [Ably 1.2 client library specification](https://ably.com/documentation/client-lib-development-guide/features/). You can see the complete list of features this client library supports in [our client library SDKs feature support matrix](https://ably.com/download/sdk-feature-support-matrix). +This is a Ruby client library for Ably. The library currently targets the [Ably 2.0.0 client library specification](https://ably.com/documentation/client-lib-development-guide/features/). You can see the complete list of features this client library supports in [our client library SDKs feature support matrix](https://ably.com/download/sdk-feature-support-matrix). ## Supported platforms diff --git a/lib/ably/models/protocol_message.rb b/lib/ably/models/protocol_message.rb index 8490ebc33..0088fded7 100644 --- a/lib/ably/models/protocol_message.rb +++ b/lib/ably/models/protocol_message.rb @@ -20,8 +20,6 @@ module Ably::Models # @return [String] Contains a serial number for a message on the current channel # @!attribute [r] connection_id # @return [String] Contains a string private connection key used to recover this connection - # @!attribute [r] connection_serial - # @return [Bignum] Contains a serial number for a message sent from the server to the client # @!attribute [r] message_serial # @return [Bignum] Contains a serial number for a message sent from the client to the server # @!attribute [r] timestamp @@ -129,12 +127,6 @@ def message_serial raise TypeError, "msg_serial '#{attributes[:msg_serial]}' is invalid, a positive Integer is expected for a ProtocolMessage" end - def connection_serial - Integer(attributes[:connection_serial]) - rescue TypeError - raise TypeError, "connection_serial '#{attributes[:connection_serial]}' is invalid, a positive Integer is expected for a ProtocolMessage" - end - def count [1, attributes[:count].to_i].max end @@ -146,26 +138,12 @@ def has_message_serial? false end - # @api private - def has_connection_serial? - connection_serial && true + def has_channel_serial? + channel_serial && true rescue TypeError false end - def serial - if has_connection_serial? - connection_serial - else - message_serial - end - end - - # @api private - def has_serial? - has_connection_serial? || has_message_serial? - end - def messages @messages ||= Array(attributes[:messages]).map do |message| @@ -271,7 +249,7 @@ def attributes # Return a JSON ready object from the underlying #attributes using Ably naming conventions for keys def as_json(*args) raise TypeError, ':action is missing, cannot generate a valid Hash for ProtocolMessage' unless action - raise TypeError, ':msg_serial or :connection_serial is missing, cannot generate a valid Hash for ProtocolMessage' if ack_required? && !has_serial? + raise TypeError, ':msg_serial is missing, cannot generate a valid Hash for ProtocolMessage' if ack_required? && !has_message_serial? attributes.dup.tap do |hash_object| hash_object['action'] = action.to_i @@ -296,11 +274,12 @@ def to_s end # True if the ProtocolMessage appears to be invalid, however this is not a guarantee + # Used for validating incoming protocol messages, so no need to add unnecessary checks # @return [Boolean] # @api private def invalid? action_enum = action rescue nil - !action_enum || (ack_required? && !has_serial?) + !action_enum end # @!attribute [r] logger diff --git a/lib/ably/modules/safe_deferrable.rb b/lib/ably/modules/safe_deferrable.rb index c011b4375..1849544db 100644 --- a/lib/ably/modules/safe_deferrable.rb +++ b/lib/ably/modules/safe_deferrable.rb @@ -39,7 +39,7 @@ def errback(&block) end end - # Mark the Deferrable as succeeded and trigger all callbacks. + # Mark the Deferrable as succeeded and trigger all success callbacks. # See http://www.rubydoc.info/gems/eventmachine/1.0.7/EventMachine/Deferrable#succeed-instance_method # # @return [void] @@ -48,7 +48,7 @@ def succeed(*args) super(*args) end - # Mark the Deferrable as failed and trigger all callbacks. + # Mark the Deferrable as failed and trigger all error callbacks. # See http://www.rubydoc.info/gems/eventmachine/1.0.7/EventMachine/Deferrable#fail-instance_method # # @return [void] diff --git a/lib/ably/modules/state_emitter.rb b/lib/ably/modules/state_emitter.rb index c638cb9a4..2a5f8b516 100644 --- a/lib/ably/modules/state_emitter.rb +++ b/lib/ably/modules/state_emitter.rb @@ -3,7 +3,7 @@ module Ably::Modules # the instance variable @state is used exclusively, the {Enum} STATE is defined prior to inclusion of this # module, and the class is an {EventEmitter}. It then emits state changes. # - # It also ensures the EventEmitter is configured to retrict permitted events to the + # It also ensures the EventEmitter is configured to restrict permitted events to the # the available STATEs or EVENTs if defined i.e. if EVENTs includes an additional type such as # :update, then it will support all EVENTs being emitted. EVENTs must be a superset of STATEs # diff --git a/lib/ably/realtime/channel.rb b/lib/ably/realtime/channel.rb index 6192cf666..162b153bf 100644 --- a/lib/ably/realtime/channel.rb +++ b/lib/ably/realtime/channel.rb @@ -42,7 +42,7 @@ class Channel # # @spec RTL2b # - # The permited states for this channel + # The permitted states for this channel STATE = ruby_enum('STATE', :initialized, :attaching, diff --git a/lib/ably/realtime/channel/channel_manager.rb b/lib/ably/realtime/channel/channel_manager.rb index 5b1761087..d10bc66ce 100644 --- a/lib/ably/realtime/channel/channel_manager.rb +++ b/lib/ably/realtime/channel/channel_manager.rb @@ -209,6 +209,7 @@ def send_attach_protocol_message message_options[:flags] = message_options[:flags].to_i | Ably::Models::ProtocolMessage::ATTACH_FLAGS_MAPPING[:resume] end + message_options[:channelSerial] = channel.properties.channel_serial # RTL4c1 send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Attach, :suspended, message_options end diff --git a/lib/ably/realtime/channel/channel_properties.rb b/lib/ably/realtime/channel/channel_properties.rb index 26ddf2622..60f1911a8 100644 --- a/lib/ably/realtime/channel/channel_properties.rb +++ b/lib/ably/realtime/channel/channel_properties.rb @@ -18,6 +18,15 @@ class ChannelProperties # attr_reader :attach_serial + # ChannelSerial contains the channelSerial from latest ProtocolMessage of action type + # Message/PresenceMessage received on the channel. + # + # @spec CP2b, RTL15b + # + # @return [String] + # + attr_accessor :channel_serial + def initialize(channel) @channel = channel end diff --git a/lib/ably/realtime/channel/channel_state_machine.rb b/lib/ably/realtime/channel/channel_state_machine.rb index fb91deee1..a346fec28 100644 --- a/lib/ably/realtime/channel/channel_state_machine.rb +++ b/lib/ably/realtime/channel/channel_state_machine.rb @@ -55,6 +55,7 @@ class ChannelStateMachine end after_transition(to: [:detached, :failed, :suspended]) do |channel, current_transition| + channel.properties.channel_serial = nil # RTP5a1 err = error_from_state_change(current_transition) channel.manager.fail_queued_messages(err) if channel.failed? or channel.suspended? #RTL11 channel.manager.log_channel_error err if err diff --git a/lib/ably/realtime/channels.rb b/lib/ably/realtime/channels.rb index 9b814db12..e7cf788dd 100644 --- a/lib/ably/realtime/channels.rb +++ b/lib/ably/realtime/channels.rb @@ -46,6 +46,26 @@ def release(channel) @channels.delete(channel) end if @channels.has_key?(channel) end + + # Sets channel serial to each channel from given serials hashmap + # @param [Hash] serials - map of channel name to respective channel serial + # @api private + def set_channel_serials(serials) + serials.each do |channel_name, channel_serial| + get(channel_name).properties.channel_serial = channel_serial + end + end + + # @return [Hash] serials - map of channel name to respective channel serial + # @api private + def get_channel_serials + channel_serials = {} + self.each do |channel| + channel_serials[channel.name] = channel.properties.channel_serial if channel.state == :attached + end + channel_serials + end + end end end diff --git a/lib/ably/realtime/client.rb b/lib/ably/realtime/client.rb index bf04776c2..b35aaa94b 100644 --- a/lib/ably/realtime/client.rb +++ b/lib/ably/realtime/client.rb @@ -1,5 +1,6 @@ require 'uri' require 'ably/realtime/channel/publisher' +require 'ably/realtime/recovery_key_context' module Ably module Realtime @@ -11,6 +12,7 @@ class Client include Ably::Modules::Conversions extend Forwardable + using Ably::Util::AblyExtensions DOMAIN = 'realtime.ably.io' @@ -120,17 +122,23 @@ def initialize(options) acc[key.to_s] = value.to_s end @rest_client = Ably::Rest::Client.new(options.merge(realtime_client: self)) - @echo_messages = rest_client.options.fetch(:echo_messages, true) == false ? false : true - @queue_messages = rest_client.options.fetch(:queue_messages, true) == false ? false : true + @echo_messages = rest_client.options.fetch_with_default(:echo_messages, true) + @queue_messages = rest_client.options.fetch_with_default(:queue_messages, true) @custom_realtime_host = rest_client.options[:realtime_host] || rest_client.options[:ws_host] - @auto_connect = rest_client.options.fetch(:auto_connect, true) == false ? false : true - @recover = rest_client.options[:recover] - - raise ArgumentError, "Recovery key '#{recover}' is invalid" if recover && !recover.match(Connection::RECOVER_REGEX) + @auto_connect = rest_client.options.fetch_with_default(:auto_connect, true) + @recover = rest_client.options.fetch_with_default(:recover, '') @auth = Ably::Realtime::Auth.new(self) @channels = Ably::Realtime::Channels.new(self) @connection = Ably::Realtime::Connection.new(self, options) + + unless @recover.nil_or_empty? + recovery_context = RecoveryKeyContext.from_json(@recover, logger) + unless recovery_context.nil? + @channels.set_channel_serials recovery_context.channel_serials # RTN16j + @connection.set_msg_serial_from_recover = recovery_context.msg_serial # RTN16f + end + end end # Return a {Ably::Realtime::Channel Realtime Channel} for the given name diff --git a/lib/ably/realtime/client/incoming_message_dispatcher.rb b/lib/ably/realtime/client/incoming_message_dispatcher.rb index 292a713b7..435d70660 100644 --- a/lib/ably/realtime/client/incoming_message_dispatcher.rb +++ b/lib/ably/realtime/client/incoming_message_dispatcher.rb @@ -43,19 +43,24 @@ def dispatch_protocol_message(*args) raise ArgumentError, "Expected a ProtocolMessage. Received #{protocol_message}" end - unless protocol_message.action.match_any?(:nack, :error) - logger.debug { "#{protocol_message.action} received: #{protocol_message}" } + # RTL15b + if protocol_message.has_channel_serial? && + ( + protocol_message.action == :message || + protocol_message.action == :presence || + protocol_message.action == :attached + ) + get_channel(protocol_message.channel).tap do |channel| + logger.info "Setting channel serial for channel #{channel.name}, " << + "Previous: #{channel.properties.channel_serial}, New: #{protocol_message.channel_serial}" + channel.properties.channel_serial = protocol_message.channel_serial + end end - if protocol_message.action.match_any?(:sync, :presence, :message) - if connection.serial && protocol_message.has_connection_serial? && protocol_message.connection_serial <= connection.serial - error_message = "Protocol error, duplicate message received for serial #{protocol_message.connection_serial}" - logger.error error_message - return - end + unless protocol_message.action.match_any?(:nack, :error) + logger.debug { "#{protocol_message.action} received: #{protocol_message}" } end - update_connection_recovery_info protocol_message connection.set_connection_confirmed_alive case protocol_message.action @@ -172,10 +177,6 @@ def process_connected_update_message(protocol_message) end end - def update_connection_recovery_info(protocol_message) - connection.update_connection_serial protocol_message.connection_serial if protocol_message.has_connection_serial? - end - def ack_pending_queue_for_message_serial(ack_protocol_message) drop_pending_queue_from_ack(ack_protocol_message) do |protocol_message| ack_messages protocol_message.messages diff --git a/lib/ably/realtime/connection.rb b/lib/ably/realtime/connection.rb index 195abe383..aa0f1ada2 100644 --- a/lib/ably/realtime/connection.rb +++ b/lib/ably/realtime/connection.rb @@ -9,6 +9,8 @@ class Connection include Ably::Modules::Conversions include Ably::Modules::SafeYield extend Ably::Modules::Enum + using Ably::Util::AblyExtensions + # The current {Ably::Realtime::Connection::STATE} of the connection. # Describes the realtime [Connection]{@link Connection} object states. @@ -77,9 +79,6 @@ class Connection include Ably::Modules::UsesStateMachine ensure_state_machine_emits 'Ably::Models::ConnectionStateChange' - # Expected format for a connection recover key - RECOVER_REGEX = /^(?[^:]+):(?[^:]+):(?\-?\d+)$/ - # Defaults for automatic connection recovery and timeouts DEFAULTS = { channel_retry_timeout: 15, # when a channel becomes SUSPENDED, after this delay in seconds, the channel will automatically attempt to reattach if the connection is CONNECTED @@ -113,16 +112,6 @@ class Connection # attr_reader :key - # The serial number of the last message to be received on this connection, used automatically by the library when - # recovering or resuming a connection. When recovering a connection explicitly, the recoveryKey is used in - # the recover client options as it contains both the key and the last message serial. - # - # @spec RTN10 - # - # @return [Integer] - # - attr_reader :serial - # An {Ably::Models::ErrorInfo} object describing the last error received if a connection failure occurs. # # @spec RTN14a @@ -177,17 +166,6 @@ def initialize(client, options) end if options.kind_of?(Hash) @defaults.freeze - # If a recover client options is provided, then we need to ensure that the msgSerial matches the - # recover serial immediately at client library instantiation. This is done immediately so that any queued - # publishes use the correct serial number for these queued messages as well. - # There is no harm if the msgSerial is higher than expected if the recover fails. - recovery_msg_serial = connection_recover_parts && connection_recover_parts[:msg_serial].to_i - if recovery_msg_serial - @client_msg_serial = recovery_msg_serial - else - reset_client_msg_serial - end - Client::IncomingMessageDispatcher.new client, self Client::OutgoingMessageDispatcher.new client, self @@ -196,6 +174,8 @@ def initialize(client, options) @manager = ConnectionManager.new(self) @current_host = client.endpoint.host + + reset_client_msg_serial end # Causes the connection to close, entering the {Ably::Realtime::Connection::STATE} CLOSING state. @@ -347,33 +327,40 @@ def internet_up? end end - # The recovery key string can be used by another client to recover this connection's state in the recover client options property. See connection state recover options for more information. + # The recovery key string can be used by another client to recover this connection's state in the + # recover client options property. See connection state recover options for more information. # # @spec RTN16b, RTN16c # - # @return [String] + # @deprecated Use {#create_recovery_key} instead # def recovery_key - "#{key}:#{serial}:#{client_msg_serial}" if connection_resumable? + logger.warn "[DEPRECATION] recovery_key is deprecated, use create_recovery_key method instead" + create_recovery_key + end + + # The recovery key string can be used by another client to recover this connection's state in the recover client + # options property. See connection state recover options for more information. + # + # @spec RTN16g, RTN16c + # + # @return [String] a json string which incorporates the @connectionKey@, the current @msgSerial@ and collection + # of pairs of channel @name@ and current @channelSerial@ for every currently attached channel + def create_recovery_key + if key.nil_or_empty? || state == :closing || state == :closed || state == :failed || state == :suspended + return nil #RTN16g2 + end + RecoveryKeyContext.new(key, client_msg_serial, client.channels.get_channel_serials).to_json end # Following a new connection being made, the connection ID, connection key - # and connection serial need to match the details provided by the server. + # need to match the details provided by the server. # # @return [void] # @api private - def configure_new(connection_id, connection_key, connection_serial) + def configure_new(connection_id, connection_key) @id = connection_id @key = connection_key - - update_connection_serial connection_serial - end - - # Store last received connection serial so that the connection can be resumed from the last known point-in-time - # @return [void] - # @api private - def update_connection_serial(connection_serial) - @serial = connection_serial end # Disable automatic resume of a connection @@ -381,7 +368,7 @@ def update_connection_serial(connection_serial) # @api private def reset_resume_info @key = nil - @serial = nil + @id = nil end # @!attribute [r] __outgoing_protocol_msgbus__ @@ -472,7 +459,7 @@ def create_websocket_transport url_params = auth_params.merge( 'format' => client.protocol, 'echo' => client.echo_messages, - 'v' => Ably::PROTOCOL_VERSION, + 'v' => Ably::PROTOCOL_VERSION, # RSC7a 'agent' => client.rest_client.agent ) @@ -486,14 +473,15 @@ def create_websocket_transport url_params['clientId'] = client.auth.client_id if client.auth.has_client_id? url_params.merge!(client.transport_params) - if connection_resumable? - url_params.merge! resume: key, connection_serial: serial - logger.debug { "Resuming connection key #{key} with serial #{serial}" } - elsif connection_recoverable? - url_params.merge! recover: connection_recover_parts[:recover], connectionSerial: connection_recover_parts[:connection_serial] - logger.debug { "Recovering connection with key #{client.recover}" } - unsafe_once(:connected, :closed, :failed) do - client.disable_automatic_connection_recovery + if !key.nil_or_empty? and connection_state_available? + url_params.merge! resume: key + logger.debug { "Resuming connection with key #{key}" } + elsif !client.recover.nil_or_empty? + recovery_context = RecoveryKeyContext.from_json(client.recover, logger) + unless recovery_context.nil? + key = recovery_context.connection_key + logger.debug { "Recovering connection with key #{key}" } + url_params.merge! recover: key end end @@ -600,6 +588,12 @@ def reset_client_msg_serial @client_msg_serial = -1 end + # Sets the client message serial from recover clientOption. + # @api private + def set_msg_serial_from_recover=(value) + @client_msg_serial = value + end + # When a hearbeat or any other message from Ably is received # we know it's alive, see #RTN23 # @api private @@ -620,11 +614,7 @@ def time_since_connection_confirmed_alive? private # The client message serial (msgSerial) is incremented for every message that is published that requires an ACK. - # Note that this is different to the connection serial that contains the last known serial number - # received from the server. - # # A message serial number does not guarantee a message has been received, only sent. - # A connection serial guarantees the server has received the message and is thus used for connection recovery and resumes. # @return [Integer] starting at -1 indicating no messages sent, 0 when the first message is sent def client_msg_serial @client_msg_serial @@ -665,10 +655,6 @@ def when_initialized EventMachine.next_tick { yield } end - def connection_resumable? - !key.nil? && !serial.nil? && connection_state_available? - end - def connection_state_available? return true if connected? @@ -682,14 +668,6 @@ def connection_state_available? end end - def connection_recoverable? - connection_recover_parts - end - - def connection_recover_parts - client.recover.to_s.match(RECOVER_REGEX) - end - def production? client.environment.nil? || client.environment == :production end @@ -740,3 +718,4 @@ def second_reconnect_attempt_for(state, first_attempt_count) require 'ably/realtime/connection/connection_manager' require 'ably/realtime/connection/connection_state_machine' require 'ably/realtime/connection/websocket_transport' +require 'ably/realtime/recovery_key_context' diff --git a/lib/ably/realtime/connection/connection_manager.rb b/lib/ably/realtime/connection/connection_manager.rb index 8d514b9f9..cfbfe87d1 100644 --- a/lib/ably/realtime/connection/connection_manager.rb +++ b/lib/ably/realtime/connection/connection_manager.rb @@ -14,12 +14,14 @@ class ConnectionManager RESOLVABLE_ERROR_CODES = { token_expired: Ably::Exceptions::TOKEN_EXPIRED_CODE } + using Ably::Util::AblyExtensions def initialize(connection) @connection = connection @timers = Hash.new { |hash, key| hash[key] = [] } - connection.unsafe_on(:closed) do + # RTN8c, RTN9c + connection.unsafe_on(:closing, :closed, :suspended, :failed) do connection.reset_resume_info end @@ -111,23 +113,29 @@ def connected(protocol_message) # Update the connection details and any associated defaults connection.set_connection_details protocol_message.connection_details + is_connection_resume_or_recover_attempt = !connection.key.nil_or_empty? || !client.recover.nil_or_empty? + + # RTN15c7, RTN16d + failed_resume_or_recover = !protocol_message.connection_id == connection.id && !protocol_message.error.nil? + if is_connection_resume_or_recover_attempt and failed_resume_or_recover # RTN15c7 + connection.reset_client_msg_serial + end + client.disable_automatic_connection_recovery # RTN16k, explicitly setting null, so it won't be used for subsequent connection requests + if connection.key if protocol_message.connection_id == connection.id logger.debug { "ConnectionManager: Connection resumed successfully - ID #{connection.id} and key #{connection.key}" } EventMachine.next_tick { connection.trigger_resumed } resend_pending_message_ack_queue else - logger.debug { "ConnectionManager: Connection was not resumed, old connection ID #{connection.id} has been updated with new connection ID #{protocol_message.connection_id} and key #{protocol_message.connection_details.connection_key}" } nack_messages_on_all_channels protocol_message.error - force_reattach_on_channels protocol_message.error end else logger.debug { "ConnectionManager: New connection created with ID #{protocol_message.connection_id} and key #{protocol_message.connection_details.connection_key}" } end - reattach_suspended_channels protocol_message.error - - connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key, protocol_message.connection_serial + connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key + force_reattach_on_channels protocol_message.error # irrespective of connection success/failure, reattach channels end # When connection is CONNECTED and receives an update @@ -139,7 +147,7 @@ def connected_update(protocol_message) # Update the connection details and any associated defaults connection.set_connection_details protocol_message.connection_details - connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key, protocol_message.connection_serial + connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key state_change = Ably::Models::ConnectionStateChange.new( current: connection.state, @@ -281,7 +289,7 @@ def retry_count_for_state(state) # Any message sent before an ACK/NACK was received on the previous transport # need to be resent to the Ably service so that a subsequent ACK/NACK is received. # It is up to Ably to ensure that duplicate messages are not retransmitted on the channel - # base on the serial numbers + # based on the message serial numbers # # @api private def resend_pending_message_ack_queue @@ -568,20 +576,12 @@ def currently_renewing_token? client.auth.authorization_in_flight? end - def reattach_suspended_channels(error) - channels.select do |channel| - channel.suspended? - end.each do |channel| - channel.transition_state_machine :attaching - end - end - - # When continuity on a connection is lost all messages - # Channels in the ATTACHED or ATTACHING state should explicitly be re-attached - # by sending a new ATTACH to Ably + # When reconnected if channel is in ATTACHING, ATTACHED or SUSPENDED state + # it should explicitly be re-attached by sending a new ATTACH to Ably + # Spec : RTN15c6, RTN15c7 def force_reattach_on_channels(error) channels.select do |channel| - channel.attached? || channel.attaching? + channel.attached? || channel.attaching? || channel.suspended? end.each do |channel| channel.manager.request_reattach reason: error end diff --git a/lib/ably/realtime/recovery_key_context.rb b/lib/ably/realtime/recovery_key_context.rb new file mode 100644 index 000000000..30ec3bc2e --- /dev/null +++ b/lib/ably/realtime/recovery_key_context.rb @@ -0,0 +1,36 @@ +require 'json' +# frozen_string_literal: true + +module Ably + module Realtime + class RecoveryKeyContext + attr_reader :connection_key + attr_reader :msg_serial + attr_reader :channel_serials + + def initialize(connection_key, msg_serial, channel_serials) + @connection_key = connection_key + @msg_serial = msg_serial + @channel_serials = channel_serials + if @channel_serials.nil? + @channel_serials = {} + end + end + + def to_json + { 'connection_key' => @connection_key, 'msg_serial' => @msg_serial, 'channel_serials' => @channel_serials }.to_json + end + + def self.from_json(obj, logger = nil) + begin + data = JSON.load obj + self.new data['connection_key'], data['msg_serial'], data['channel_serials'] + rescue => e + logger.warn "unable to decode recovery key, found error #{e}" unless logger.nil? + return nil + end + end + + end + end +end diff --git a/lib/ably/rest/client.rb b/lib/ably/rest/client.rb index 235bd81f0..7aa34e3cb 100644 --- a/lib/ably/rest/client.rb +++ b/lib/ably/rest/client.rb @@ -16,6 +16,7 @@ class Client include Ably::Modules::Conversions include Ably::Modules::HttpHelpers extend Forwardable + using Ably::Util::AblyExtensions # Default Ably domain for REST DOMAIN = 'rest.ably.io' @@ -186,7 +187,7 @@ def initialize(options) @agent = options.delete(:agent) || Ably::AGENT @realtime_client = options.delete(:realtime_client) - @tls = options.delete(:tls) == false ? false : true + @tls = options.delete_with_default(:tls, true) @environment = options.delete(:environment) # nil is production @environment = nil if [:production, 'production'].include?(@environment) @protocol = options.delete(:protocol) || :msgpack @@ -200,10 +201,7 @@ def initialize(options) @log_retries_as_info = options.delete(:log_retries_as_info) @max_message_size = options.delete(:max_message_size) || MAX_MESSAGE_SIZE @max_frame_size = options.delete(:max_frame_size) || MAX_FRAME_SIZE - - if (@idempotent_rest_publishing = options.delete(:idempotent_rest_publishing)).nil? - @idempotent_rest_publishing = Ably::PROTOCOL_VERSION.to_f > 1.1 - end + @idempotent_rest_publishing = options.delete_with_default(:idempotent_rest_publishing, true) if options[:fallback_hosts_use_default] && options[:fallback_hosts] raise ArgumentError, "fallback_hosts_use_default cannot be set to try when fallback_hosts is also provided" diff --git a/lib/ably/util/ably_extensions.rb b/lib/ably/util/ably_extensions.rb new file mode 100644 index 000000000..c4304eb8a --- /dev/null +++ b/lib/ably/util/ably_extensions.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +module Ably::Util + module AblyExtensions + refine Object do + def nil_or_empty? + self.nil? || self.empty? + end + end + + refine Hash do + def fetch_with_default(key, default) + value = self.fetch(key, default) + if value.nil? + return default + end + return value + end + + def delete_with_default(key, default) + value = self.delete(key) + if value.nil? + return default + end + return value + end + end + end +end diff --git a/lib/ably/util/safe_deferrable.rb b/lib/ably/util/safe_deferrable.rb index 05b13c1cd..36cdd1cf8 100644 --- a/lib/ably/util/safe_deferrable.rb +++ b/lib/ably/util/safe_deferrable.rb @@ -1,6 +1,6 @@ module Ably::Util # SafeDeferrable class provides a Deferrable that is safe to use for for public interfaces - # of this client library. Any exceptions raised in the success or failure callbacks are + # of this client library. Any exceptions raised in the success or failure callbacks are # caught and logged to the provided logger. # # An exception in a callback provided by a developer should not break this client library diff --git a/lib/ably/version.rb b/lib/ably/version.rb index c0265f5c9..c0144ed69 100644 --- a/lib/ably/version.rb +++ b/lib/ably/version.rb @@ -1,9 +1,7 @@ module Ably VERSION = '1.2.5' - PROTOCOL_VERSION = '1.2' - - # @api private - def self.major_minor_version_numeric - VERSION.gsub(/\.\d+$/, '').to_f - end + # The level of compatibility with the Ably service that this SDK supports. + # Also referred to as the 'wire protocol version'. + # spec : CSV2 + PROTOCOL_VERSION = '2' end diff --git a/spec/acceptance/realtime/connection_spec.rb b/spec/acceptance/realtime/connection_spec.rb index 53fc9c752..edfb8f702 100644 --- a/spec/acceptance/realtime/connection_spec.rb +++ b/spec/acceptance/realtime/connection_spec.rb @@ -744,58 +744,6 @@ def expect_ordered_phases end end - describe '#serial connection serial' do - let(:channel) { client.channel(random_str) } - - it 'is set to -1 when a new connection is opened' do - connection.connect do - expect(connection.serial).to eql(-1) - stop_reactor - end - end - - context 'when a message is sent but the ACK has not yet been received' do - it 'the sent message msgSerial is 0 but the connection serial remains at -1' do - channel.attach do - connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| - if protocol_message.action == :message - connection.__outgoing_protocol_msgbus__.unsubscribe - expect(protocol_message['msgSerial']).to eql(0) - expect(connection.serial).to eql(-1) - stop_reactor - end - end - channel.publish('event', 'data') - end - end - end - - it 'is set to 0 when a message is received back' do - channel.publish('event', 'data') - channel.subscribe do - expect(connection.serial).to eql(0) - stop_reactor - end - end - - it 'is set to 1 when the second message is received' do - channel.attach do - messages = [] - channel.subscribe do |message| - messages << message - if messages.length == 2 - expect(connection.serial).to eql(1) - stop_reactor - end - end - - channel.publish('event', 'data') do - channel.publish('event', 'data') - end - end - end - end - describe '#msgSerial' do context 'when messages are queued for publish before a connection is established' do let(:batches) { 6 } @@ -922,7 +870,6 @@ def log_connection_changes let(:protocol_message_attributes) do { action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i, - connection_serial: 55, connection_details: { max_idle_interval: 2 * 1000 } @@ -1232,7 +1179,6 @@ def log_connection_changes let(:protocol_message_attributes) do { action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i, - connection_serial: 55, connection_details: { max_idle_interval: 2 * 1000 } @@ -1403,34 +1349,6 @@ def self.available_states let(:states) { Hash.new } let(:channel) { client.channel(random_str) } - it 'is composed of connection key and serial that is kept up to date with each message ACK received' do - connection.on(:connected) do - expected_serial = -1 - expect(connection.key).to_not be_nil - expect(connection.serial).to eql(expected_serial) - - channel.attach do - channel.publish('event', 'data') - channel.subscribe do - channel.unsubscribe - - expected_serial += 1 # attach message received - expect(connection.serial).to eql(expected_serial) - - channel.publish('event', 'data') - channel.subscribe do - channel.unsubscribe - expected_serial += 1 # attach message received - expect(connection.serial).to eql(expected_serial) - - expect(connection.recovery_key).to eql("#{connection.key}:#{connection.serial}:#{connection.send(:client_msg_serial)}") - stop_reactor - end - end - end - end - end - it "is available when connection is in one of the states: #{available_states.join(', ')}" do connection.once(:connected) do allow(client).to receive(:endpoint).and_return( @@ -2001,7 +1919,6 @@ def self.available_states let(:protocol_message_attributes) do { action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i, - connection_serial: 55, connection_details: { client_id: 'bob', connection_key: connection_key, @@ -2037,7 +1954,6 @@ def self.available_states connection.once(:update) do |connection_state_change| expect(client.auth.client_id).to eql('bob') expect(connection.key).to eql(connection_key) - expect(connection.serial).to eql(55) expect(connection.connection_state_ttl).to eql(33) expect(connection.details.client_id).to eql('bob') @@ -2060,7 +1976,6 @@ def self.available_states let(:protocol_message_attributes) do { action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i, - connection_serial: 22, error: { code: 50000, message: 'Internal failure' }, } end @@ -2089,7 +2004,7 @@ def self.available_states it 'sends the protocol version param v (#G4, #RTN2f)' do expect(EventMachine).to receive(:connect) do |host, port, transport, object, url| uri = URI.parse(url) - expect(CGI::parse(uri.query)['v'][0]).to eql('1.2') + expect(CGI::parse(uri.query)['v'][0]).to eql('2') stop_reactor end client diff --git a/spec/acceptance/realtime/presence_spec.rb b/spec/acceptance/realtime/presence_spec.rb index 2250a14b1..8f3a5e54c 100644 --- a/spec/acceptance/realtime/presence_spec.rb +++ b/spec/acceptance/realtime/presence_spec.rb @@ -580,7 +580,6 @@ def presence_action(method_name, data) action = Ably::Models::ProtocolMessage::ACTION.Presence presence_msg = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: 20, channel: channel_name, presence: presence_data, timestamp: Time.now.to_i * 1000 @@ -633,7 +632,6 @@ def allow_sync_fabricate_data_final_sync_and_assert_members action = Ably::Models::ProtocolMessage::ACTION.Presence presence_msg = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: anonymous_client.connection.serial + 1, channel: channel_name, presence: presence_data, timestamp: Time.now.to_i * 1000 @@ -644,7 +642,6 @@ def allow_sync_fabricate_data_final_sync_and_assert_members action = Ably::Models::ProtocolMessage::ACTION.Sync sync_msg = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: anonymous_client.connection.serial + 2, channel: channel_name, channel_serial: 'validserialprefix:', # with no part after the `:` this indicates the end to the SYNC presence: [], @@ -2243,7 +2240,6 @@ def connect_members_deferrables action = Ably::Models::ProtocolMessage::ACTION.Sync sync_message = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: 10, channel_serial: 'sequenceid:cursor', channel: channel_name, presence: presence_sync_1, @@ -2253,7 +2249,6 @@ def connect_members_deferrables sync_message = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: 11, channel_serial: 'sequenceid:', # indicates SYNC is complete channel: channel_name, presence: presence_sync_2, @@ -2294,7 +2289,6 @@ def connect_members_deferrables action = Ably::Models::ProtocolMessage::ACTION.Sync sync_message = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: 10, channel: channel_name, presence: presence_sync, timestamp: Time.now.to_i * 1000 @@ -2348,7 +2342,6 @@ def connect_members_deferrables action = Ably::Models::ProtocolMessage::ACTION.Sync sync_message = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: 10, channel: channel_name, presence: presence_sync_protocol_message, timestamp: Time.now.to_i * 1000 diff --git a/spec/acceptance/rest/client_spec.rb b/spec/acceptance/rest/client_spec.rb index db80bb65f..e1619c6ef 100644 --- a/spec/acceptance/rest/client_spec.rb +++ b/spec/acceptance/rest/client_spec.rb @@ -1095,9 +1095,16 @@ def encode64(text) end it 'sends a protocol version and lib version header (#G4, #RSC7a, #RSC7b)' do - client.channels.get('foo').publish("event") + response = client.channels.get('foo').publish("event") + expect(response).to eql true expect(publish_message_stub).to have_been_requested - expect(Ably::PROTOCOL_VERSION).to eql('1.2') + if agent.nil? + expect(publish_message_stub.to_s).to include("'Ably-Agent'=>'#{Ably::AGENT}'") + expect(publish_message_stub.to_s).to include("'X-Ably-Version'=>'2'") + else + expect(publish_message_stub.to_s).to include("'Ably-Agent'=>'ably-ruby/1.1.1 ruby/3.1.1'") + expect(publish_message_stub.to_s).to include("'X-Ably-Version'=>'2'") + end end end end diff --git a/spec/acceptance/rest/message_spec.rb b/spec/acceptance/rest/message_spec.rb index 4291d6aa6..49960226e 100644 --- a/spec/acceptance/rest/message_spec.rb +++ b/spec/acceptance/rest/message_spec.rb @@ -204,20 +204,17 @@ end end - specify 'idempotent publishing is disabled by default with <= 1.1 (#TO3n)' do - stub_const 'Ably::PROTOCOL_VERSION', '1.0' - client = Ably::Rest::Client.new(key: api_key, protocol: protocol) - expect(client.idempotent_rest_publishing).to be_falsey - stub_const 'Ably::PROTOCOL_VERSION', '1.1' - client = Ably::Rest::Client.new(key: api_key, protocol: protocol) + specify 'idempotent publishing is set as per clientOptions' do + # set idempotent_rest_publishing to false + client = Ably::Rest::Client.new(key: api_key, protocol: protocol, idempotent_rest_publishing: false) expect(client.idempotent_rest_publishing).to be_falsey - end - specify 'idempotent publishing is enabled by default with >= 1.2 (#TO3n)' do - stub_const 'Ably::PROTOCOL_VERSION', '1.2' - client = Ably::Rest::Client.new(key: api_key, protocol: protocol) + # set idempotent_rest_publishing to true + client = Ably::Rest::Client.new(key: api_key, protocol: protocol, idempotent_rest_publishing: true) expect(client.idempotent_rest_publishing).to be_truthy - stub_const 'Ably::PROTOCOL_VERSION', '1.3' + end + + specify 'idempotent publishing is enabled by default (#TO3n)' do client = Ably::Rest::Client.new(key: api_key, protocol: protocol) expect(client.idempotent_rest_publishing).to be_truthy end diff --git a/spec/unit/models/protocol_message_spec.rb b/spec/unit/models/protocol_message_spec.rb index ddfea5733..b279b3c54 100644 --- a/spec/unit/models/protocol_message_spec.rb +++ b/spec/unit/models/protocol_message_spec.rb @@ -127,14 +127,6 @@ def new_protocol_message(options) end end - context '#connection_serial' do - let(:protocol_message) { new_protocol_message(connection_serial: "55") } - it 'converts :connection_serial to an Integer' do - expect(protocol_message.connection_serial).to be_a(Integer) - expect(protocol_message.connection_serial).to eql(55) - end - end - context '#flags (#TR4i)' do context 'when nil' do let(:protocol_message) { new_protocol_message({}) } @@ -241,76 +233,6 @@ def new_protocol_message(options) end end - context '#has_connection_serial?' do - context 'without connection_serial' do - let(:protocol_message) { new_protocol_message({}) } - - it 'returns false' do - expect(protocol_message.has_connection_serial?).to eql(false) - end - end - - context 'with connection_serial' do - let(:protocol_message) { new_protocol_message(connection_serial: "55") } - - it 'returns true' do - expect(protocol_message.has_connection_serial?).to eql(true) - end - end - end - - context '#serial' do - context 'with underlying msg_serial' do - let(:protocol_message) { new_protocol_message(msg_serial: "55") } - it 'converts :msg_serial to an Integer' do - expect(protocol_message.serial).to be_a(Integer) - expect(protocol_message.serial).to eql(55) - end - end - - context 'with underlying connection_serial' do - let(:protocol_message) { new_protocol_message(connection_serial: "55") } - it 'converts :connection_serial to an Integer' do - expect(protocol_message.serial).to be_a(Integer) - expect(protocol_message.serial).to eql(55) - end - end - - context 'with underlying connection_serial and msg_serial' do - let(:protocol_message) { new_protocol_message(connection_serial: "99", msg_serial: "11") } - it 'prefers connection_serial and converts :connection_serial to an Integer' do - expect(protocol_message.serial).to be_a(Integer) - expect(protocol_message.serial).to eql(99) - end - end - end - - context '#has_serial?' do - context 'without msg_serial or connection_serial' do - let(:protocol_message) { new_protocol_message({}) } - - it 'returns false' do - expect(protocol_message.has_serial?).to eql(false) - end - end - - context 'with msg_serial' do - let(:protocol_message) { new_protocol_message(msg_serial: "55") } - - it 'returns true' do - expect(protocol_message.has_serial?).to eql(true) - end - end - - context 'with connection_serial' do - let(:protocol_message) { new_protocol_message(connection_serial: "55") } - - it 'returns true' do - expect(protocol_message.has_serial?).to eql(true) - end - end - end - context '#error' do context 'with no error attribute' do let(:protocol_message) { new_protocol_message(action: 1) } diff --git a/spec/unit/realtime/recovery_key_context_spec.rb b/spec/unit/realtime/recovery_key_context_spec.rb new file mode 100644 index 000000000..fd373a395 --- /dev/null +++ b/spec/unit/realtime/recovery_key_context_spec.rb @@ -0,0 +1,36 @@ +require 'spec_helper' +require 'ably/realtime/recovery_key_context' + +describe Ably::Realtime::RecoveryKeyContext do + + context 'connection recovery key' do + + it 'should encode recovery key - RTN16i, RTN16f, RTN16j' do + connection_key = 'key' + msg_serial = 123 + channel_serials = { + 'channel1' => 'serial1', + 'channel2' => 'serial2' + } + recovery_context = Ably::Realtime::RecoveryKeyContext.new(connection_key, msg_serial, channel_serials) + encoded_recovery_key = recovery_context.to_json + expect(encoded_recovery_key).to eq "{\"connection_key\":\"key\",\"msg_serial\":123," << + "\"channel_serials\":{\"channel1\":\"serial1\",\"channel2\":\"serial2\"}}" + end + + it 'should decode recovery key - RTN16i, RTN16f, RTN16j' do + encoded_recovery_key = "{\"connection_key\":\"key\",\"msg_serial\":123," << + "\"channel_serials\":{\"channel1\":\"serial1\",\"channel2\":\"serial2\"}}" + decoded_recovery_key = Ably::Realtime::RecoveryKeyContext.from_json(encoded_recovery_key) + expect(decoded_recovery_key.connection_key).to eq("key") + expect(decoded_recovery_key.msg_serial).to eq(123) + end + + it 'should return nil for invalid recovery key - RTN16i, RTN16f, RTN16j' do + encoded_recovery_key = "{\"invalid key\"}" + decoded_recovery_key = Ably::Realtime::RecoveryKeyContext.from_json(encoded_recovery_key) + expect(decoded_recovery_key).to be_nil + end + + end +end