diff --git a/lib/ably/models/connection_details.rb b/lib/ably/models/connection_details.rb index 0347bb790..58be50efc 100644 --- a/lib/ably/models/connection_details.rb +++ b/lib/ably/models/connection_details.rb @@ -21,6 +21,12 @@ def self.ConnectionDetails(attributes) class ConnectionDetails include Ably::Modules::ModelCommon + # Max message size + MAX_MESSAGE_SIZE = 65536 # See spec TO3l8 + + # Max frame size + MAX_FRAME_SIZE = 524288 # See spec TO3l9 + # @param attributes [Hash] # @option attributes [String] :client_id contains the client ID assigned to the connection # @option attributes [String] :connection_key the connection secret key string that is used to resume a connection and its state @@ -38,8 +44,8 @@ def initialize(attributes = {}) self.attributes[duration_field] = (self.attributes[duration_field].to_f / 1000).round end end - self.attributes[:max_message_size] ||= 65536 - self.attributes[:max_frame_size] ||= 524288 + self.attributes[:max_message_size] ||= MAX_MESSAGE_SIZE + self.attributes[:max_frame_size] ||= MAX_FRAME_SIZE self.attributes.freeze end diff --git a/lib/ably/realtime/channel/publisher.rb b/lib/ably/realtime/channel/publisher.rb index 26e444764..d7da60479 100644 --- a/lib/ably/realtime/channel/publisher.rb +++ b/lib/ably/realtime/channel/publisher.rb @@ -22,8 +22,9 @@ def enqueue_messages_on_connection(client, raw_messages, channel_name, channel_o end end - if messages.sum(&:size) > Ably::Realtime::Connection::MAX_MESSAGE_SIZE - error = Ably::Exceptions::MaxMessageSizeExceeded.new("Message size exceeded #{Ably::Realtime::Connection::MAX_MESSAGE_SIZE} bytes.") + max_message_size = connection.details && connection.details.max_message_size || Ably::Models::ConnectionDetails::MAX_MESSAGE_SIZE + if messages.sum(&:size) > max_message_size + error = Ably::Exceptions::MaxMessageSizeExceeded.new("Message size exceeded #{max_message_size} bytes.") return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error) end diff --git a/lib/ably/realtime/connection.rb b/lib/ably/realtime/connection.rb index c27537efe..246c79f53 100644 --- a/lib/ably/realtime/connection.rb +++ b/lib/ably/realtime/connection.rb @@ -82,9 +82,6 @@ class Connection # Max number of messages to bundle in a single ProtocolMessage MAX_PROTOCOL_MESSAGE_BATCH_SIZE = 50 - # Max message size - MAX_MESSAGE_SIZE = 65536 # See spec TO3l8 - # A unique public identifier for this connection, used to identify this member in presence events and messages # @return [String] attr_reader :id diff --git a/lib/ably/rest/channel.rb b/lib/ably/rest/channel.rb index 2a187fe2e..7c6b56bb0 100644 --- a/lib/ably/rest/channel.rb +++ b/lib/ably/rest/channel.rb @@ -23,7 +23,6 @@ class Channel attr_reader :push IDEMPOTENT_LIBRARY_GENERATED_ID_LENGTH = 9 # See spec RSL1k1 - MAX_MESSAGE_SIZE = 65536 # See spec TO3l8 # Initialize a new Channel object # @@ -88,8 +87,8 @@ def publish(first, second = nil, third = {}) messages.map! { |message| Ably::Models::Message(message.dup) } - if messages.sum(&:size) > Ably::Rest::Channel::MAX_MESSAGE_SIZE - raise Ably::Exceptions::MaxMessageSizeExceeded.new("Maximum message size exceeded #{Ably::Rest::Channel::MAX_MESSAGE_SIZE}.") + if messages.sum(&:size) > (max_message_size = client.max_message_size || Ably::Rest::Client::MAX_MESSAGE_SIZE) + raise Ably::Exceptions::MaxMessageSizeExceeded.new("Maximum message size exceeded #{max_message_size} bytes.") end payload = messages.map do |message| diff --git a/lib/ably/rest/client.rb b/lib/ably/rest/client.rb index 9e9992576..e1c76bf4e 100644 --- a/lib/ably/rest/client.rb +++ b/lib/ably/rest/client.rb @@ -25,7 +25,8 @@ class Client # Default Ably domain for REST DOMAIN = 'rest.ably.io' - MAX_FRAME_SIZE = 524288 + MAX_MESSAGE_SIZE = 65536 # See spec TO3l8 + MAX_FRAME_SIZE = 524288 # See spec TO3l8 # Configuration for HTTP timeouts and HTTP request reattempts to fallback hosts HTTP_DEFAULTS = { @@ -118,6 +119,14 @@ class Client # @return [Boolean] attr_reader :idempotent_rest_publishing + # Max message size (TO2, TO3l8) by default (65536 bytes) 64KiB + # @return [Integer] + attr_reader :max_message_size + + # Max frame size (TO2, TO3l8) by default (524288 bytes) 512KiB + # @return [Integer] + attr_reader :max_frame_size + # Creates a {Ably::Rest::Client Rest Client} and configures the {Ably::Auth} object for the connection. # # @param [Hash,String] options an options Hash used to configure the client and the authentication, or String with an API key or Token ID @@ -152,6 +161,8 @@ class Client # # @option options [Boolean] :add_request_ids (false) When true, adds a unique request_id to each request sent to Ably servers. This is handy when reporting issues, because you can refer to a specific request. # @option options [Boolean] :idempotent_rest_publishing (false if ver < 1.2) When true, idempotent publishing is enabled for all messages published via REST + # @option options [Integer] :max_message_size (65536 bytes) Maximum size of all messages when publishing via REST publish() + # @option options [Integer] :max_frame_size (524288 bytes) Maximum size of frame # # @return [Ably::Rest::Client] # @@ -189,7 +200,8 @@ def initialize(options) @add_request_ids = options.delete(:add_request_ids) @log_retries_as_info = options.delete(:log_retries_as_info) @idempotent_rest_publishing = options.delete(:idempotent_rest_publishing) || Ably.major_minor_version_numeric > 1.1 - + @max_message_size = options.delete(:max_message_size) || MAX_MESSAGE_SIZE + @max_frame_size = options.delete(:max_frame_size) || MAX_FRAME_SIZE if options[:fallback_hosts_use_default] && options[:fallback_hosts] raise ArgumentError, "fallback_hosts_use_default cannot be set to try when fallback_hosts is also provided" @@ -365,8 +377,8 @@ def request(method, path, params = {}, body = nil, headers = {}, options = {}) send_request(method, path, params, headers: headers) end when :post, :patch, :put - if body.to_json.bytesize > MAX_FRAME_SIZE - raise Ably::Exceptions::MaxFrameSizeExceeded.new("Maximum frame size exceeded #{MAX_FRAME_SIZE} bytes.") + if body.to_json.bytesize > max_frame_size + raise Ably::Exceptions::MaxFrameSizeExceeded.new("Maximum frame size exceeded #{max_frame_size} bytes.") end path_with_params = Addressable::URI.new path_with_params.query_values = params || {} diff --git a/spec/acceptance/realtime/channel_spec.rb b/spec/acceptance/realtime/channel_spec.rb index 8f437aad6..cba30b4fa 100644 --- a/spec/acceptance/realtime/channel_spec.rb +++ b/spec/acceptance/realtime/channel_spec.rb @@ -1181,7 +1181,7 @@ def disconnect_transport end context 'with more than allowed messages in a single publish' do - let(:channel_name) { random_str } + 65536 it 'rejects the publish' do messages = (Ably::Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE + 1).times.map do @@ -1405,15 +1405,98 @@ def disconnect_transport end context 'message size exceeded (#TO3l8)' do - let(:message) { 'x' * 700000 } - let(:client) { auto_close Ably::Realtime::Client.new(client_options) } let(:channel) { client.channels.get(channel_name) } - it 'should not allow to send a message' do - channel.publish('event', message).errback do |error| - expect(error).to be_instance_of(Ably::Exceptions::MaxMessageSizeExceeded) - stop_reactor + context 'and max_message_size is default (65536 bytes)' do + let(:channel_name) { random_str } + let(:max_message_size) { 65536 } + + it 'should allow to send a message (32 bytes)' do + client.connection.once(:connected) do + channel.subscribe('event') do |msg| + expect(msg.data).to eq('x' * 32) + stop_reactor + end + channel.publish('event', 'x' * 32) + end + end + + it 'should not allow to send a message (700000 bytes)' do + client.connection.once(:connected) do + connection_details = Ably::Models::ConnectionDetails.new( + client.connection.details.attributes.attributes.merge('maxMessageSize' => max_message_size) + ) + client.connection.set_connection_details(connection_details) + expect(client.connection.details.max_message_size).to eq(65536) + channel.publish('event', 'x' * 700000).errback do |error| + expect(error).to be_instance_of(Ably::Exceptions::MaxMessageSizeExceeded) + stop_reactor + end + end + end + end + + context 'and max_message_size is customized (11 bytes)' do + let(:max_message_size) { 11 } + + context 'and the message size is 30 bytes' do + let(:channel_name) { random_str } + + it 'should not allow to send a message' do + client.connection.once(:connected) do + connection_details = Ably::Models::ConnectionDetails.new( + client.connection.details.attributes.attributes.merge('maxMessageSize' => max_message_size) + ) + client.connection.set_connection_details(connection_details) + expect(client.connection.details.max_message_size).to eq(11) + channel.publish('event', 'x' * 30).errback do |error| + expect(error).to be_instance_of(Ably::Exceptions::MaxMessageSizeExceeded) + stop_reactor + end + end + end + end + end + + context 'and max_message_size is nil' do + let(:max_message_size) { nil } + + context 'and the message size is 30 bytes' do + let(:channel_name) { random_str } + + it 'should allow to send a message' do + client.connection.once(:connected) do + connection_details = Ably::Models::ConnectionDetails.new( + client.connection.details.attributes.attributes.merge('maxMessageSize' => max_message_size) + ) + client.connection.set_connection_details(connection_details) + expect(client.connection.details.max_message_size).to eq(65536) + channel.subscribe('event') do |msg| + expect(msg.data).to eq('x' * 30) + stop_reactor + end + channel.publish('event', 'x' * 30) + end + end + end + + context 'and the message size is 65537 bytes' do + let(:channel_name) { random_str } + + it 'should not allow to send a message' do + client.connection.once(:connected) do + connection_details = Ably::Models::ConnectionDetails.new( + client.connection.details.attributes.attributes.merge('maxMessageSize' => max_message_size) + ) + client.connection.set_connection_details(connection_details) + expect(client.connection.details.max_message_size).to eq(65536) + channel.publish('event', 'x' * 65537).errback do |error| + expect(error).to be_instance_of(Ably::Exceptions::MaxMessageSizeExceeded) + stop_reactor + end + end + end end end end diff --git a/spec/acceptance/rest/channel_spec.rb b/spec/acceptance/rest/channel_spec.rb index 867f5fe6f..691c44cb6 100644 --- a/spec/acceptance/rest/channel_spec.rb +++ b/spec/acceptance/rest/channel_spec.rb @@ -5,11 +5,13 @@ include Ably::Modules::Conversions vary_by_protocol do - let(:default_options) { { key: api_key, environment: environment, protocol: protocol} } + let(:default_options) { { key: api_key, environment: environment, protocol: protocol, max_frame_size: max_frame_size, max_message_size: max_message_size } } let(:client_options) { default_options } let(:client) do Ably::Rest::Client.new(client_options) end + let(:max_message_size) { nil } + let(:max_frame_size) { nil } describe '#publish' do let(:channel_name) { random_str } @@ -60,7 +62,8 @@ end it 'publishes an array of messages in one HTTP request' do - expect(messages.sum(&:size) < Ably::Rest::Channel::MAX_MESSAGE_SIZE).to eq(true) + expect(client.max_message_size).to eq(Ably::Rest::Client::MAX_MESSAGE_SIZE) + expect(messages.sum(&:size) < Ably::Rest::Client::MAX_MESSAGE_SIZE).to eq(true) expect(client).to receive(:post).once.and_call_original expect(channel.publish(messages)).to eql(true) @@ -70,19 +73,78 @@ end context 'with an array of Message objects' do - let(:messages) do - 10.times.map do |index| - Ably::Models::Message(name: index.to_s, data: { "index" => index + 10 }) + context 'when max_message_size and max_frame_size is not set' do + before do + expect(client.max_message_size).to eq(Ably::Rest::Client::MAX_MESSAGE_SIZE) + expect(client.max_frame_size).to eq(Ably::Rest::Client::MAX_FRAME_SIZE) + end + + context 'and messages size (130 bytes) is smaller than the max_message_size' do + let(:messages) do + 10.times.map do |index| + Ably::Models::Message(name: index.to_s, data: { "index" => index + 10 }) + end + end + + it 'publishes an array of messages in one HTTP request' do + expect(messages.sum &:size).to eq(130) + expect(client).to receive(:post).once.and_call_original + expect(channel.publish(messages)).to eql(true) + expect(channel.history.items.map(&:name)).to match_array(messages.map(&:name)) + expect(channel.history.items.map(&:data)).to match_array(messages.map(&:data)) + end + end + + context 'and messages size (177784 bytes) is bigger than the max_message_size' do + let(:messages) do + 10000.times.map do |index| + Ably::Models::Message(name: index.to_s, data: { "index" => index + 1 }) + end + end + + it 'should not publish and raise Ably::Exceptions::MaxMessageSizeExceeded' do + expect(messages.sum &:size).to eq(177784) + expect { channel.publish(messages) }.to raise_error(Ably::Exceptions::MaxMessageSizeExceeded) + end end end - it 'publishes an array of messages in one HTTP request' do - expect(messages.sum(&:size) < Ably::Rest::Channel::MAX_MESSAGE_SIZE).to eq(true) + context 'when max_message_size is 655 bytes' do + let(:max_message_size) { 655 } - expect(client).to receive(:post).once.and_call_original - expect(channel.publish(messages)).to eql(true) - expect(channel.history.items.map(&:name)).to match_array(messages.map(&:name)) - expect(channel.history.items.map(&:data)).to match_array(messages.map(&:data)) + before do + expect(client.max_message_size).to eq(max_message_size) + expect(client.max_frame_size).to eq(Ably::Rest::Client::MAX_FRAME_SIZE) + end + + context 'and messages size (130 bytes) is smaller than the max_message_size' do + let(:messages) do + 10.times.map do |index| + Ably::Models::Message(name: index.to_s, data: { "index" => index + 10 }) + end + end + + it 'publishes an array of messages in one HTTP request' do + expect(messages.sum &:size).to eq(130) + expect(client).to receive(:post).once.and_call_original + expect(channel.publish(messages)).to eql(true) + expect(channel.history.items.map(&:name)).to match_array(messages.map(&:name)) + expect(channel.history.items.map(&:data)).to match_array(messages.map(&:data)) + end + end + + context 'and messages size (177784 bytes) is bigger than the max_message_size' do + let(:messages) do + 10000.times.map do |index| + Ably::Models::Message(name: index.to_s, data: { "index" => index + 1 }) + end + end + + it 'should not publish and raise Ably::Exceptions::MaxMessageSizeExceeded' do + expect(messages.sum &:size).to eq(177784) + expect { channel.publish(messages) }.to raise_error(Ably::Exceptions::MaxMessageSizeExceeded) + end + end end end diff --git a/spec/unit/realtime/channel_spec.rb b/spec/unit/realtime/channel_spec.rb index b200971f7..97857b4a0 100644 --- a/spec/unit/realtime/channel_spec.rb +++ b/spec/unit/realtime/channel_spec.rb @@ -3,7 +3,7 @@ require 'shared/protocol_msgbus_behaviour' describe Ably::Realtime::Channel do - let(:client) { double('client').as_null_object } + let(:client) { Ably::Realtime::Client.new(token: 'valid') } let(:channel_name) { 'test' } subject do @@ -71,6 +71,7 @@ let(:message) { instance_double('Ably::Models::Message', client_id: nil, size: 0) } before do + allow(subject).to receive(:enqueue_messages_on_connection).and_return(message) allow(subject).to receive(:create_message).and_return(message) allow(subject).to receive(:attach).and_return(:true) end diff --git a/spec/unit/rest/channel_spec.rb b/spec/unit/rest/channel_spec.rb index d0460e9da..39703be2e 100644 --- a/spec/unit/rest/channel_spec.rb +++ b/spec/unit/rest/channel_spec.rb @@ -7,17 +7,14 @@ 'Ably::Rest::Client', encoders: [], post: instance_double('Faraday::Response', status: 201), - idempotent_rest_publishing: false, + idempotent_rest_publishing: false, max_message_size: max_message_size ) end let(:channel_name) { 'unique' } + let(:max_message_size) { nil } subject { Ably::Rest::Channel.new(client, channel_name) } - it 'should return Ably::Rest::Channel::MAX_MESSAGE_SIZE equal 65536 (TO3l8)' do - expect(Ably::Rest::Channel::MAX_MESSAGE_SIZE).to eq(65536) - end - describe '#initializer' do let(:channel_name) { random_str.encode(encoding) } @@ -132,8 +129,44 @@ end context 'max message size exceeded' do - it 'should raise Ably::Exceptions::MaxMessageSizeExceeded' do - expect { subject.publish('x' * 65537, 'data') }.to raise_error Ably::Exceptions::MaxMessageSizeExceeded + context 'when max_message_size is nil' do + context 'and a message size is 65537 bytes' do + it 'should raise Ably::Exceptions::MaxMessageSizeExceeded' do + expect { subject.publish('x' * 65537, 'data') }.to raise_error Ably::Exceptions::MaxMessageSizeExceeded + end + end + end + + context 'when max_message_size is 65536 bytes' do + let(:max_message_size) { 65536 } + + context 'and a message size is 65537 bytes' do + it 'should raise Ably::Exceptions::MaxMessageSizeExceeded' do + expect { subject.publish('x' * 65537, 'data') }.to raise_error Ably::Exceptions::MaxMessageSizeExceeded + end + end + + context 'and a message size is 10 bytes' do + it 'should send a message' do + expect(subject.publish('x' * 10, 'data')).to eq(true) + end + end + end + + context 'when max_message_size is 10 bytes' do + let(:max_message_size) { 10 } + + context 'and a message size is 11 bytes' do + it 'should raise Ably::Exceptions::MaxMessageSizeExceeded' do + expect { subject.publish('x' * 11, 'data') }.to raise_error Ably::Exceptions::MaxMessageSizeExceeded + end + end + + context 'and a message size is 2 bytes' do + it 'should send a message' do + expect(subject.publish('x' * 2, 'data')).to eq(true) + end + end end end end diff --git a/spec/unit/rest/client_spec.rb b/spec/unit/rest/client_spec.rb index d6e87c3d4..7b7da1638 100644 --- a/spec/unit/rest/client_spec.rb +++ b/spec/unit/rest/client_spec.rb @@ -87,6 +87,33 @@ end end end + + context 'max_message_size' do + context 'is not present' do + let(:client_options) { { key: 'appid.keyuid:keysecret' } } + + it 'should return default 65536 (#TO3l8)' do + expect(subject.max_message_size).to eq(Ably::Rest::Client::MAX_MESSAGE_SIZE) + end + end + + context 'is nil' do + let(:client_options) { { key: 'appid.keyuid:keysecret', max_message_size: nil } } + + it 'should return default 65536 (#TO3l8)' do + expect(Ably::Rest::Client::MAX_MESSAGE_SIZE).to eq(65536) + expect(subject.max_message_size).to eq(Ably::Rest::Client::MAX_MESSAGE_SIZE) + end + end + + context 'is customized 131072 bytes' do + let(:client_options) { { key: 'appid.keyuid:keysecret', max_message_size: 131072 } } + + it 'should return 131072' do + expect(subject.max_message_size).to eq(131072) + end + end + end end context 'request_id generation' do