Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions lib/ably/modules/conversions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,5 +115,39 @@ def ensure_supported_payload(payload)

raise Ably::Exceptions::UnsupportedDataType.new('Invalid data payload', 400, Ably::Exceptions::Codes::INVALID_MESSAGE_DATA_OR_ENCODING)
end

# Converts the name, data, attributes into the array of Message objects
#
# @return [Array<Ably::Models::Message>]
#
def build_messages(name, data = nil, attributes = {})
return [Ably::Models::Message(ensure_supported_name_and_payload(nil, data, attributes))] if name.nil?

Array(name).map do |item|
Ably::Models::Message(ensure_supported_name_and_payload(item, data, attributes))
end
end

# Ensures if the first argument (name) is a String, Hash or Ably::Models::Message object,
# second argument (data) should be a String, Hash, Array or nil (see ensure_supported_payload() method).
#
# @return [Hash] Contains :name, :data and other attributes
#
# (RSL1a, RSL1b)
#
def ensure_supported_name_and_payload(name, data = nil, attributes = {})
return name.attributes.dup if name.kind_of?(Ably::Models::Message)

payload = data
if (hash = name).kind_of?(Hash)
name, payload = hash[:name], (hash[:data] || payload)
attributes.merge!(hash)
end

name = ensure_utf_8(:name, name, allow_nil: true)
ensure_supported_payload payload

attributes.merge({ name: name, data: payload })
end
end
end
30 changes: 16 additions & 14 deletions lib/ably/realtime/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -129,23 +129,31 @@ def initialize(client, name, channel_options = {})
# @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callbacks
#
# @example
# # Publish a single message
# # Publish a single message form
# channel.publish 'click', { x: 1, y: 2 }
#
# # Publish an array of message Hashes
# # Publish a single message with single Hash form
# message = { name: 'click', data: { x: 1, y: 2 } }
# channel.publish message
#
# # Publish an array of message Hashes form
# messages = [
# { name: 'click', { x: 1, y: 2 } },
# { name: 'click', { x: 2, y: 3 } }
# { name: 'click', data: { x: 1, y: 2 } },
# { name: 'click', data: { x: 2, y: 3 } }
# ]
# channel.publish messages
#
# # Publish an array of Ably::Models::Message objects
# # Publish an array of Ably::Models::Message objects form
# messages = [
# Ably::Models::Message(name: 'click', { x: 1, y: 2 })
# Ably::Models::Message(name: 'click', { x: 2, y: 3 })
# Ably::Models::Message(name: 'click', data: { x: 1, y: 2 })
# Ably::Models::Message(name: 'click', data: { x: 2, y: 3 })
# ]
# channel.publish messages
#
# # Publish an array of Ably::Models::Message objects form
# message = Ably::Models::Message(name: 'click', data: { x: 1, y: 2 })
# channel.publish message
#
# channel.publish('click', 'body') do |message|
# puts "#{message.name} event received with #{message.data}"
# end
Expand All @@ -165,13 +173,7 @@ def publish(name, data = nil, attributes = {}, &success_block)
return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
end

messages = if name.kind_of?(Enumerable)
name
else
name = ensure_utf_8(:name, name, allow_nil: true)
ensure_supported_payload data
[{ name: name, data: data }.merge(attributes)]
end
messages = build_messages(name, data, attributes) # (RSL1a, RSL1b)

if messages.length > Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE
error = Ably::Exceptions::InvalidRequest.new("It is not possible to publish more than #{Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE} messages with a single publish request.")
Expand Down
43 changes: 17 additions & 26 deletions lib/ably/rest/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,52 +41,43 @@ def initialize(client, name, channel_options = {})
@push = PushChannel.new(self)
end

# Publish one or more messages to the channel. Three overloaded forms
# Publish one or more messages to the channel. Five overloaded forms
# @param name [String, Array<Ably::Models::Message|Hash>, Ably::Models::Message, nil] The event name of the message to publish, or an Array of [Ably::Model::Message] objects or [Hash] objects with +:name+ and +:data+ pairs, or a single Ably::Model::Message object
# @param data [String, ByteArray, Hash, nil] The message payload unless an Array of [Ably::Model::Message] objects passed in the first argument, in which case an optional hash of query parameters
# @param data [String, Array, Hash, nil] The message payload unless an Array of [Ably::Model::Message] objects passed in the first argument, in which case an optional hash of query parameters
# @param attributes [Hash, nil] Optional additional message attributes such as :extras, :id, :client_id or :connection_id, applied when name attribute is nil or a string (Deprecated, will be removed in 2.0 in favour of constructing a Message object)
# @return [Boolean] true if the message was published, otherwise false
#
# @example
# # Publish a single message with (name, data) form
# channel.publish 'click', { x: 1, y: 2 }
#
# # Publish an array of message Hashes
# # Publish a single message with single Hash form
# message = { name: 'click', data: { x: 1, y: 2 } }
# channel.publish message
#
# # Publish an array of message Hashes form
# messages = [
# { name: 'click', data: { x: 1, y: 2 } },
# { name: 'click', data: { x: 2, y: 3 } }
# ]
# channel.publish messages
#
# # Publish an array of Ably::Models::Message objects
# # Publish an array of Ably::Models::Message objects form
# messages = [
# Ably::Models::Message(name: 'click', { x: 1, y: 2 })
# Ably::Models::Message(name: 'click', { x: 2, y: 3 })
# Ably::Models::Message(name: 'click', data: { x: 1, y: 2 })
# Ably::Models::Message(name: 'click', data: { x: 2, y: 3 })
# ]
# channel.publish messages
#
# # Publish a single Ably::Models::Message object, with a query params
# # specifying quickAck: true
# message = Ably::Models::Message(name: 'click', { x: 1, y: 2 })
# channel.publish message, quickAck: 'true'
# # Publish a single Ably::Models::Message object form
# message = Ably::Models::Message(name: 'click', data: { x: 1, y: 2 })
# channel.publish message
#
def publish(first, second = nil, third = {})
messages, qs_params = if first.kind_of?(Enumerable)
# ([Message], qs_params) form
[first, second]
elsif first.kind_of?(Ably::Models::Message)
# (Message, qs_params) form
[[first], second]
else
# (name, data, attributes) form
first = ensure_utf_8(:name, first, allow_nil: true)
ensure_supported_payload second
# RSL1h - attributes as an extra method parameter is extra-spec but need to
# keep it for backcompat until version 2
[[{ name: first, data: second }.merge(third)], nil]
end
def publish(name, data = nil, attributes = {})
qs_params = nil
qs_params = data if name.kind_of?(Enumerable) || name.kind_of?(Ably::Models::Message)

messages.map! { |message| Ably::Models::Message(message.dup) }
messages = build_messages(name, data, attributes) # (RSL1a, RSL1b)

if messages.sum(&:size) > Ably::Rest::Channel::MAX_MESSAGE_SIZE
raise Ably::Exceptions::MaxMessageSizeExceeded.new("Maximum message size exceeded #{Ably::Rest::Channel::MAX_MESSAGE_SIZE}.")
Expand Down
77 changes: 77 additions & 0 deletions spec/acceptance/realtime/message_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,83 @@ def publish_and_check_data(data)
end
end

context 'a single Message object (#RSL1a)' do
let(:name) { random_str }
let(:data) { random_str }
let(:message) { Ably::Models::Message.new(name: name, data: data) }

it 'publishes the message' do
channel.attach
channel.publish(message)
channel.subscribe do |msg|
expect(msg.name).to eq(message.name)
expect(msg.data).to eq(message.data)
stop_reactor
end
end
end

context 'an array of Message objects (#RSL1a)' do
let(:data) { random_str }
let(:message1) { Ably::Models::Message.new(name: random_str, data: data) }
let(:message2) { Ably::Models::Message.new(name: random_str, data: data) }
let(:message3) { Ably::Models::Message.new(name: random_str, data: data) }

it 'publishes three messages' do
channel.attach
channel.publish([message1, message2, message3])
counter = 0
channel.subscribe do |message|
counter += 1
expect(message.data).to eq(data)
expect(message.name).to eq(message1.name) if counter == 1
expect(message.name).to eq(message2.name) if counter == 2
if counter == 3
expect(message.name).to eq(message3.name)
stop_reactor
end
end
end
end

context 'an array of hashes (#RSL1a)' do
let(:data) { random_str }
let(:message1) { { name: random_str, data: data } }
let(:message2) { { name: random_str, data: data } }
let(:message3) { { name: random_str, data: data } }

it 'publishes three messages' do
channel.attach
channel.publish([message1, message2, message3])
counter = 0
channel.subscribe do |message|
counter += 1
expect(message.data).to eq(data)
expect(message.name).to eq(message1[:name]) if counter == 1
expect(message.name).to eq(message2[:name]) if counter == 2
if counter == 3
expect(message.name).to eq(message3[:name])
stop_reactor
end
end
end
end

context 'a name with data payload (#RSL1a, #RSL1b)' do
let(:name) { random_str }
let(:data) { random_str }

it 'publishes a message' do
channel.attach
channel.publish(name, data)
channel.subscribe do |message|
expect(message.name).to eql(name)
expect(message.data).to eq(data)
stop_reactor
end
end
end

context 'with supported extra payload content type (#RTL6h, #RSL6a2)' do
let(:channel) { client.channel("pushenabled:#{random_str}") }

Expand Down
51 changes: 51 additions & 0 deletions spec/acceptance/rest/message_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,57 @@
end
end

context 'a single Message object (#RSL1a)' do
let(:name) { random_str }
let(:data) { random_str }
let(:message) { Ably::Models::Message.new(name: name, data: data) }

it 'publishes the message' do
channel.publish(message)
expect(channel.history.items.length).to eql(1)
message = channel.history.items.first
expect(message.name).to eq(name)
expect(message.data).to eq(data)
end
end

context 'an array of Message objects (#RSL1a)' do
let(:data) { random_str }
let(:message1) { Ably::Models::Message.new(name: random_str, data: data) }
let(:message2) { Ably::Models::Message.new(name: random_str, data: data) }
let(:message3) { Ably::Models::Message.new(name: random_str, data: data) }

it 'publishes three messages' do
channel.publish([message1, message2, message3])
expect(channel.history.items.length).to eql(3)
end
end

context 'an array of hashes (#RSL1a)' do
let(:data) { random_str }
let(:message1) { { name: random_str, data: data } }
let(:message2) { { name: random_str, data: data } }
let(:message3) { { name: random_str, data: data } }

it 'publishes three messages' do
channel.publish([message1, message2, message3])
expect(channel.history.items.length).to eql(3)
end
end

context 'a name with data payload (#RSL1a, #RSL1b)' do
let(:name) { random_str }
let(:data) { random_str }

it 'publishes the message' do
channel.publish(name, data)
expect(channel.history.items.length).to eql(1)
message = channel.history.items.first
expect(message.name).to eq(name)
expect(message.data).to eq(data)
end
end

context 'with supported data payload content type' do
context 'JSON Object (Hash)' do
let(:data) { { 'Hash' => 'true' } }
Expand Down