Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions lib/ably/models/connection_details.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ def self.ConnectionDetails(attributes)
class ConnectionDetails
include Ably::Modules::ModelCommon

# Max message size
MAX_MESSAGE_SIZE = 65536 # See spec TO3l8

# Max frame size
MAX_FRAME_SIZE = 524288 # See spec TO3l9

# @param attributes [Hash]
# @option attributes [String] :client_id contains the client ID assigned to the connection
# @option attributes [String] :connection_key the connection secret key string that is used to resume a connection and its state
Expand All @@ -38,8 +44,8 @@ def initialize(attributes = {})
self.attributes[duration_field] = (self.attributes[duration_field].to_f / 1000).round
end
end
self.attributes[:max_message_size] ||= 65536
self.attributes[:max_frame_size] ||= 524288
self.attributes[:max_message_size] ||= MAX_MESSAGE_SIZE
self.attributes[:max_frame_size] ||= MAX_FRAME_SIZE
self.attributes.freeze
end

Expand Down
5 changes: 3 additions & 2 deletions lib/ably/realtime/channel/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ def enqueue_messages_on_connection(client, raw_messages, channel_name, channel_o
end
end

if messages.sum(&:size) > Ably::Realtime::Connection::MAX_MESSAGE_SIZE
error = Ably::Exceptions::MaxMessageSizeExceeded.new("Message size exceeded #{Ably::Realtime::Connection::MAX_MESSAGE_SIZE} bytes.")
max_message_size = connection.details && connection.details.max_message_size || Ably::Models::ConnectionDetails::MAX_MESSAGE_SIZE
if messages.sum(&:size) > max_message_size
error = Ably::Exceptions::MaxMessageSizeExceeded.new("Message size exceeded #{max_message_size} bytes.")
return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
end

Expand Down
3 changes: 0 additions & 3 deletions lib/ably/realtime/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@ class Connection
# Max number of messages to bundle in a single ProtocolMessage
MAX_PROTOCOL_MESSAGE_BATCH_SIZE = 50

# Max message size
MAX_MESSAGE_SIZE = 65536 # See spec TO3l8

# A unique public identifier for this connection, used to identify this member in presence events and messages
# @return [String]
attr_reader :id
Expand Down
5 changes: 2 additions & 3 deletions lib/ably/rest/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ class Channel
attr_reader :push

IDEMPOTENT_LIBRARY_GENERATED_ID_LENGTH = 9 # See spec RSL1k1
MAX_MESSAGE_SIZE = 65536 # See spec TO3l8

# Initialize a new Channel object
#
Expand Down Expand Up @@ -88,8 +87,8 @@ def publish(first, second = nil, third = {})

messages.map! { |message| Ably::Models::Message(message.dup) }

if messages.sum(&:size) > Ably::Rest::Channel::MAX_MESSAGE_SIZE
raise Ably::Exceptions::MaxMessageSizeExceeded.new("Maximum message size exceeded #{Ably::Rest::Channel::MAX_MESSAGE_SIZE}.")
if messages.sum(&:size) > (max_message_size = client.max_message_size || Ably::Rest::Client::MAX_MESSAGE_SIZE)
raise Ably::Exceptions::MaxMessageSizeExceeded.new("Maximum message size exceeded #{max_message_size} bytes.")
end

payload = messages.map do |message|
Expand Down
20 changes: 16 additions & 4 deletions lib/ably/rest/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ class Client
# Default Ably domain for REST
DOMAIN = 'rest.ably.io'

MAX_FRAME_SIZE = 524288
MAX_MESSAGE_SIZE = 65536 # See spec TO3l8
MAX_FRAME_SIZE = 524288 # See spec TO3l8

# Configuration for HTTP timeouts and HTTP request reattempts to fallback hosts
HTTP_DEFAULTS = {
Expand Down Expand Up @@ -118,6 +119,14 @@ class Client
# @return [Boolean]
attr_reader :idempotent_rest_publishing

# Max message size (TO2, TO3l8) by default (65536 bytes) 64KiB
# @return [Integer]
attr_reader :max_message_size

# Max frame size (TO2, TO3l8) by default (524288 bytes) 512KiB
# @return [Integer]
attr_reader :max_frame_size

# Creates a {Ably::Rest::Client Rest Client} and configures the {Ably::Auth} object for the connection.
#
# @param [Hash,String] options an options Hash used to configure the client and the authentication, or String with an API key or Token ID
Expand Down Expand Up @@ -152,6 +161,8 @@ class Client
#
# @option options [Boolean] :add_request_ids (false) When true, adds a unique request_id to each request sent to Ably servers. This is handy when reporting issues, because you can refer to a specific request.
# @option options [Boolean] :idempotent_rest_publishing (false if ver < 1.2) When true, idempotent publishing is enabled for all messages published via REST
# @option options [Integer] :max_message_size (65536 bytes) Maximum size of all messages when publishing via REST publish()
# @option options [Integer] :max_frame_size (524288 bytes) Maximum size of frame
#
# @return [Ably::Rest::Client]
#
Expand Down Expand Up @@ -189,7 +200,8 @@ def initialize(options)
@add_request_ids = options.delete(:add_request_ids)
@log_retries_as_info = options.delete(:log_retries_as_info)
@idempotent_rest_publishing = options.delete(:idempotent_rest_publishing) || Ably.major_minor_version_numeric > 1.1

@max_message_size = options.delete(:max_message_size) || MAX_MESSAGE_SIZE
@max_frame_size = options.delete(:max_frame_size) || MAX_FRAME_SIZE

if options[:fallback_hosts_use_default] && options[:fallback_hosts]
raise ArgumentError, "fallback_hosts_use_default cannot be set to try when fallback_hosts is also provided"
Expand Down Expand Up @@ -365,8 +377,8 @@ def request(method, path, params = {}, body = nil, headers = {}, options = {})
send_request(method, path, params, headers: headers)
end
when :post, :patch, :put
if body.to_json.bytesize > MAX_FRAME_SIZE
raise Ably::Exceptions::MaxFrameSizeExceeded.new("Maximum frame size exceeded #{MAX_FRAME_SIZE} bytes.")
if body.to_json.bytesize > max_frame_size
raise Ably::Exceptions::MaxFrameSizeExceeded.new("Maximum frame size exceeded #{max_frame_size} bytes.")
end
path_with_params = Addressable::URI.new
path_with_params.query_values = params || {}
Expand Down
97 changes: 90 additions & 7 deletions spec/acceptance/realtime/channel_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1181,7 +1181,7 @@ def disconnect_transport
end

context 'with more than allowed messages in a single publish' do
let(:channel_name) { random_str }
65536

it 'rejects the publish' do
messages = (Ably::Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE + 1).times.map do
Expand Down Expand Up @@ -1405,15 +1405,98 @@ def disconnect_transport
end

context 'message size exceeded (#TO3l8)' do
let(:message) { 'x' * 700000 }

let(:client) { auto_close Ably::Realtime::Client.new(client_options) }
let(:channel) { client.channels.get(channel_name) }

it 'should not allow to send a message' do
channel.publish('event', message).errback do |error|
expect(error).to be_instance_of(Ably::Exceptions::MaxMessageSizeExceeded)
stop_reactor
context 'and max_message_size is default (65536 bytes)' do
let(:channel_name) { random_str }
let(:max_message_size) { 65536 }

it 'should allow to send a message (32 bytes)' do
client.connection.once(:connected) do
channel.subscribe('event') do |msg|
expect(msg.data).to eq('x' * 32)
stop_reactor
end
channel.publish('event', 'x' * 32)
end
end

it 'should not allow to send a message (700000 bytes)' do
client.connection.once(:connected) do
connection_details = Ably::Models::ConnectionDetails.new(
client.connection.details.attributes.attributes.merge('maxMessageSize' => max_message_size)
)
client.connection.set_connection_details(connection_details)
expect(client.connection.details.max_message_size).to eq(65536)
channel.publish('event', 'x' * 700000).errback do |error|
expect(error).to be_instance_of(Ably::Exceptions::MaxMessageSizeExceeded)
stop_reactor
end
end
end
end

context 'and max_message_size is customized (11 bytes)' do
let(:max_message_size) { 11 }

context 'and the message size is 30 bytes' do
let(:channel_name) { random_str }

it 'should not allow to send a message' do
client.connection.once(:connected) do
connection_details = Ably::Models::ConnectionDetails.new(
client.connection.details.attributes.attributes.merge('maxMessageSize' => max_message_size)
)
client.connection.set_connection_details(connection_details)
expect(client.connection.details.max_message_size).to eq(11)
channel.publish('event', 'x' * 30).errback do |error|
expect(error).to be_instance_of(Ably::Exceptions::MaxMessageSizeExceeded)
stop_reactor
end
end
end
end
end

context 'and max_message_size is nil' do
let(:max_message_size) { nil }

context 'and the message size is 30 bytes' do
let(:channel_name) { random_str }

it 'should allow to send a message' do
client.connection.once(:connected) do
connection_details = Ably::Models::ConnectionDetails.new(
client.connection.details.attributes.attributes.merge('maxMessageSize' => max_message_size)
)
client.connection.set_connection_details(connection_details)
expect(client.connection.details.max_message_size).to eq(65536)
channel.subscribe('event') do |msg|
expect(msg.data).to eq('x' * 30)
stop_reactor
end
channel.publish('event', 'x' * 30)
end
end
end

context 'and the message size is 65537 bytes' do
let(:channel_name) { random_str }

it 'should not allow to send a message' do
client.connection.once(:connected) do
connection_details = Ably::Models::ConnectionDetails.new(
client.connection.details.attributes.attributes.merge('maxMessageSize' => max_message_size)
)
client.connection.set_connection_details(connection_details)
expect(client.connection.details.max_message_size).to eq(65536)
channel.publish('event', 'x' * 65537).errback do |error|
expect(error).to be_instance_of(Ably::Exceptions::MaxMessageSizeExceeded)
stop_reactor
end
end
end
end
end
end
Expand Down
84 changes: 73 additions & 11 deletions spec/acceptance/rest/channel_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
include Ably::Modules::Conversions

vary_by_protocol do
let(:default_options) { { key: api_key, environment: environment, protocol: protocol} }
let(:default_options) { { key: api_key, environment: environment, protocol: protocol, max_frame_size: max_frame_size, max_message_size: max_message_size } }
let(:client_options) { default_options }
let(:client) do
Ably::Rest::Client.new(client_options)
end
let(:max_message_size) { nil }
let(:max_frame_size) { nil }

describe '#publish' do
let(:channel_name) { random_str }
Expand Down Expand Up @@ -60,7 +62,8 @@
end

it 'publishes an array of messages in one HTTP request' do
expect(messages.sum(&:size) < Ably::Rest::Channel::MAX_MESSAGE_SIZE).to eq(true)
expect(client.max_message_size).to eq(Ably::Rest::Client::MAX_MESSAGE_SIZE)
expect(messages.sum(&:size) < Ably::Rest::Client::MAX_MESSAGE_SIZE).to eq(true)

expect(client).to receive(:post).once.and_call_original
expect(channel.publish(messages)).to eql(true)
Expand All @@ -70,19 +73,78 @@
end

context 'with an array of Message objects' do
let(:messages) do
10.times.map do |index|
Ably::Models::Message(name: index.to_s, data: { "index" => index + 10 })
context 'when max_message_size and max_frame_size is not set' do
before do
expect(client.max_message_size).to eq(Ably::Rest::Client::MAX_MESSAGE_SIZE)
expect(client.max_frame_size).to eq(Ably::Rest::Client::MAX_FRAME_SIZE)
end

context 'and messages size (130 bytes) is smaller than the max_message_size' do
let(:messages) do
10.times.map do |index|
Ably::Models::Message(name: index.to_s, data: { "index" => index + 10 })
end
end

it 'publishes an array of messages in one HTTP request' do
expect(messages.sum &:size).to eq(130)
expect(client).to receive(:post).once.and_call_original
expect(channel.publish(messages)).to eql(true)
expect(channel.history.items.map(&:name)).to match_array(messages.map(&:name))
expect(channel.history.items.map(&:data)).to match_array(messages.map(&:data))
end
end

context 'and messages size (177784 bytes) is bigger than the max_message_size' do
let(:messages) do
10000.times.map do |index|
Ably::Models::Message(name: index.to_s, data: { "index" => index + 1 })
end
end

it 'should not publish and raise Ably::Exceptions::MaxMessageSizeExceeded' do
expect(messages.sum &:size).to eq(177784)
expect { channel.publish(messages) }.to raise_error(Ably::Exceptions::MaxMessageSizeExceeded)
end
end
end

it 'publishes an array of messages in one HTTP request' do
expect(messages.sum(&:size) < Ably::Rest::Channel::MAX_MESSAGE_SIZE).to eq(true)
context 'when max_message_size is 655 bytes' do
let(:max_message_size) { 655 }

expect(client).to receive(:post).once.and_call_original
expect(channel.publish(messages)).to eql(true)
expect(channel.history.items.map(&:name)).to match_array(messages.map(&:name))
expect(channel.history.items.map(&:data)).to match_array(messages.map(&:data))
before do
expect(client.max_message_size).to eq(max_message_size)
expect(client.max_frame_size).to eq(Ably::Rest::Client::MAX_FRAME_SIZE)
end

context 'and messages size (130 bytes) is smaller than the max_message_size' do
let(:messages) do
10.times.map do |index|
Ably::Models::Message(name: index.to_s, data: { "index" => index + 10 })
end
end

it 'publishes an array of messages in one HTTP request' do
expect(messages.sum &:size).to eq(130)
expect(client).to receive(:post).once.and_call_original
expect(channel.publish(messages)).to eql(true)
expect(channel.history.items.map(&:name)).to match_array(messages.map(&:name))
expect(channel.history.items.map(&:data)).to match_array(messages.map(&:data))
end
end

context 'and messages size (177784 bytes) is bigger than the max_message_size' do
let(:messages) do
10000.times.map do |index|
Ably::Models::Message(name: index.to_s, data: { "index" => index + 1 })
end
end

it 'should not publish and raise Ably::Exceptions::MaxMessageSizeExceeded' do
expect(messages.sum &:size).to eq(177784)
expect { channel.publish(messages) }.to raise_error(Ably::Exceptions::MaxMessageSizeExceeded)
end
end
end
end

Expand Down
3 changes: 2 additions & 1 deletion spec/unit/realtime/channel_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
require 'shared/protocol_msgbus_behaviour'

describe Ably::Realtime::Channel do
let(:client) { double('client').as_null_object }
let(:client) { Ably::Realtime::Client.new(token: 'valid') }
let(:channel_name) { 'test' }

subject do
Expand Down Expand Up @@ -71,6 +71,7 @@
let(:message) { instance_double('Ably::Models::Message', client_id: nil, size: 0) }

before do
allow(subject).to receive(:enqueue_messages_on_connection).and_return(message)
allow(subject).to receive(:create_message).and_return(message)
allow(subject).to receive(:attach).and_return(:true)
end
Expand Down
Loading