diff --git a/Emulsion.Tests/Emulsion.Tests.fsproj b/Emulsion.Tests/Emulsion.Tests.fsproj index d92458df..2300bc39 100644 --- a/Emulsion.Tests/Emulsion.Tests.fsproj +++ b/Emulsion.Tests/Emulsion.Tests.fsproj @@ -7,8 +7,10 @@ - + + + @@ -16,8 +18,11 @@ + - + + + diff --git a/Emulsion.Tests/ExceptionUtilsTests.fs b/Emulsion.Tests/ExceptionUtilsTests.fs new file mode 100644 index 00000000..1bf8a38b --- /dev/null +++ b/Emulsion.Tests/ExceptionUtilsTests.fs @@ -0,0 +1,35 @@ +module Emulsion.Tests.ExceptionUtilsTests + +open System + +open Emulsion +open Xunit + +[] +let ``reraise works in sync code``(): unit = + let nestedStacktrace() = + raise <| Exception("Foo") + let thrown = + try + nestedStacktrace() + null + with + | ex -> ex + let rethrown = Assert.Throws(fun () -> ExceptionUtils.reraise thrown |> ignore) + Assert.Contains("nestedStacktrace", rethrown.StackTrace) + +[] +let ``reraise works in async code``(): unit = + let nestedStacktrace() = + raise <| Exception("Foo") + + let ex = Assert.Throws(fun () -> + async { + try + nestedStacktrace() + with + | ex -> + ExceptionUtils.reraise ex + } |> Async.RunSynchronously + ) + Assert.Contains("nestedStacktrace", ex.StackTrace) diff --git a/Emulsion.Tests/LifetimesTests.fs b/Emulsion.Tests/LifetimesTests.fs new file mode 100644 index 00000000..3fec5367 --- /dev/null +++ b/Emulsion.Tests/LifetimesTests.fs @@ -0,0 +1,16 @@ +module Emulsion.Tests.LifetimesTests + +open JetBrains.Lifetimes + +open Xunit + +open System.Threading.Tasks +open Emulsion.Lifetimes + +[] +let ``awaitTermination completes after the parent lifetime is terminated``(): unit = + use ld = Lifetime.Define() + let task = Async.StartAsTask <| awaitTermination ld.Lifetime + Assert.False task.IsCompleted + ld.Terminate() + task.GetAwaiter().GetResult() |> ignore diff --git a/Emulsion.Tests/Settings.fs b/Emulsion.Tests/SettingsTests.fs similarity index 96% rename from Emulsion.Tests/Settings.fs rename to Emulsion.Tests/SettingsTests.fs index 2045bc31..596c1b80 100644 --- a/Emulsion.Tests/Settings.fs +++ b/Emulsion.Tests/SettingsTests.fs @@ -1,4 +1,4 @@ -module Emulsion.Tests.Settings +module Emulsion.Tests.SettingsTests open System open System.IO diff --git a/Emulsion.Tests/Xmpp/EmulsionXmppTests.fs b/Emulsion.Tests/Xmpp/EmulsionXmppTests.fs new file mode 100644 index 00000000..9ba8d14f --- /dev/null +++ b/Emulsion.Tests/Xmpp/EmulsionXmppTests.fs @@ -0,0 +1,170 @@ +module Emulsion.Tests.Xmpp.EmulsionXmppTests + +open System +open System.Threading.Tasks + +open JetBrains.Lifetimes +open SharpXMPP +open Xunit +open Xunit.Abstractions + +open Emulsion.Settings +open Emulsion +open Emulsion.Lifetimes +open Emulsion.Tests.TestUtils +open Emulsion.Tests.Xmpp +open Emulsion.Xmpp +open Emulsion.Xmpp.SharpXmppHelper.Elements + +let private settings = { + Login = "user@example.org" + Password = "password" + Room = "room@conference.example.org" + Nickname = "nickname" +} + +type RunTests(outputHelper: ITestOutputHelper) = + let logger = Logging.xunitLogger outputHelper + + [] + member __.``EmulsionXmpp connects the server``(): unit = + let mutable connectionFailedHandler = ignore + let disconnect() = connectionFailedHandler(ConnFailedArgs()) + let mutable connectCalled = false + let client = + XmppClientFactory.create( + addConnectionFailedHandler = (fun _ h -> connectionFailedHandler <- h), + connect = (fun () -> async { + connectCalled <- true + disconnect() + }) + ) + Assert.ThrowsAny(fun() -> Async.RunSynchronously <| EmulsionXmpp.run settings logger client ignore) + |> ignore + Assert.True connectCalled + + [] + member __.``EmulsionXmpp connects the room``(): unit = + let mutable connectionFailedHandler = ignore + let disconnect() = connectionFailedHandler(ConnFailedArgs()) + let mutable joinRoomArgs = Unchecked.defaultof<_> + let client = + XmppClientFactory.create( + addConnectionFailedHandler = (fun _ h -> connectionFailedHandler <- h), + joinMultiUserChat = (fun roomJid nickname -> + joinRoomArgs <- (roomJid.FullJid, nickname) + disconnect() + ) + ) + Assert.ThrowsAny(fun() -> Async.RunSynchronously <| EmulsionXmpp.run settings logger client ignore) + |> ignore + Assert.Equal((settings.Room, settings.Nickname), joinRoomArgs) + +type ReceiveMessageTests(outputHelper: ITestOutputHelper) = + let logger = Logging.xunitLogger outputHelper + + let runReceiveMessageTest message = + let mutable connectionFailedHandler = ignore + let receiveHandlers = ResizeArray() + + let sendMessage msg = receiveHandlers |> Seq.iter (fun h -> h msg) + let disconnect() = connectionFailedHandler(ConnFailedArgs()) + + let mutable messageReceived = None + let onMessageReceived = fun m -> messageReceived <- Some m + + let client = + XmppClientFactory.create( + addConnectionFailedHandler = (fun _ h -> connectionFailedHandler <- h), + addMessageHandler = (fun _ h -> receiveHandlers.Add h), + joinMultiUserChat = fun _ _ -> + sendMessage message + disconnect() + ) + Assert.ThrowsAny(fun() -> + Async.RunSynchronously <| EmulsionXmpp.run settings logger client onMessageReceived + ) |> ignore + + messageReceived + + [] + member __.``Ordinary message gets received by the client``(): unit = + let incomingMessage = XmppMessageFactory.create("room@conference.example.org/sender", + "test", + messageType = "groupchat") + let receivedMessage = runReceiveMessageTest incomingMessage + Assert.Equal(Some <| XmppMessage { author = "sender"; text = "test" }, receivedMessage) + + [] + member __.``Own message gets skipped by the client``(): unit = + let ownMessage = XmppMessageFactory.create("room@conference.example.org/nickname", + "test", + messageType = "groupchat") + let receivedMessage = runReceiveMessageTest ownMessage + Assert.Equal(None, receivedMessage) + + [] + member __.``Historical message gets skipped by the client``(): unit = + let historicalMessage = XmppMessageFactory.create("room@conference.example.org/sender", + "test", + messageType = "groupchat", + delayDate = "2019-01-01") + let receivedMessage = runReceiveMessageTest historicalMessage + Assert.Equal(None, receivedMessage) + + [] + member __.``Empty message gets skipped by the client``(): unit = + let emptyMessage = XmppMessageFactory.create("room@conference.example.org/sender", + "", + messageType = "groupchat") + let receivedMessage = runReceiveMessageTest emptyMessage + Assert.Equal(None, receivedMessage) + +type SendTests(outputHelper: ITestOutputHelper) = + let logger = Logging.xunitLogger outputHelper + + [] + member __.``send function calls the Send method on the client``(): unit = + use ld = Lifetime.Define() + let lt = ld.Lifetime + let mutable sentMessage = Unchecked.defaultof<_> + let client = XmppClientFactory.create(send = fun m -> + sentMessage <- m + ld.Terminate() + ) + + let outgoingMessage = { author = "author"; text = "text" } + Assert.Throws(fun () -> + Async.RunSynchronously <| EmulsionXmpp.send logger client lt settings outgoingMessage + ) |> ignore + + let text = sentMessage.Element(Body).Value + Assert.Equal(" text", text) + + [] + member __.``send function awaits the message delivery``(): Task = + upcast (async { + use ld = Lifetime.Define() + let lt = ld.Lifetime + let messageId = lt.CreateTaskCompletionSource() + let messageHandlers = ResizeArray() + let onMessage msg = messageHandlers |> Seq.iter (fun h -> h msg) + + let client = + XmppClientFactory.create( + addMessageHandler = (fun _ h -> messageHandlers.Add h), + send = fun m -> messageId.SetResult(Option.get <| SharpXmppHelper.getMessageId m) + ) + let outgoingMessage = { author = "author"; text = "text" } + + let! receival = Async.StartChild <| EmulsionXmpp.send logger client lt settings outgoingMessage + let receivalTask = Async.StartAsTask receival + let! messageId = Async.AwaitTask messageId.Task // the send has been completed + + // Wait for 100 ms to check that the receival is not completed yet: + Assert.False(receivalTask.Wait(TimeSpan.FromMilliseconds 100.0)) + + let deliveryMessage = SharpXmppHelper.message messageId "" "" + onMessage deliveryMessage + do! receival + } |> Async.StartAsTask) diff --git a/Emulsion.Tests/Xmpp/SharpXmppHelper.fs b/Emulsion.Tests/Xmpp/SharpXmppHelperTests.fs similarity index 95% rename from Emulsion.Tests/Xmpp/SharpXmppHelper.fs rename to Emulsion.Tests/Xmpp/SharpXmppHelperTests.fs index 2cd38324..75fcf1ef 100644 --- a/Emulsion.Tests/Xmpp/SharpXmppHelper.fs +++ b/Emulsion.Tests/Xmpp/SharpXmppHelperTests.fs @@ -1,4 +1,4 @@ -module Emulsion.Tests.Xmpp.SharpXmppHelper +module Emulsion.Tests.Xmpp.SharpXmppHelperTests open System.Xml.Linq @@ -10,7 +10,7 @@ open Emulsion.Xmpp [] let ``Message body has a proper namespace``() = - let message = SharpXmppHelper.message "cthulhu@test" "text" + let message = SharpXmppHelper.message "" "cthulhu@test" "text" let body = Seq.exactlyOne(message.Descendants()) Assert.Equal(XNamespace.Get "jabber:client", body.Name.Namespace) diff --git a/Emulsion.Tests/Xmpp/XmppClientFactory.fs b/Emulsion.Tests/Xmpp/XmppClientFactory.fs new file mode 100644 index 00000000..410a3278 --- /dev/null +++ b/Emulsion.Tests/Xmpp/XmppClientFactory.fs @@ -0,0 +1,27 @@ +namespace Emulsion.Tests.Xmpp + +open Emulsion.Xmpp.XmppClient + +type XmppClientFactory = + static member create(?connect, + ?joinMultiUserChat, + ?send, + ?addConnectionFailedHandler, + ?addPresenceHandler, + ?addMessageHandler): IXmppClient = + let connect = defaultArg connect <| fun () -> async { return () } + let joinMultiUserChat = defaultArg joinMultiUserChat <| fun _ _ -> () + let send = defaultArg send ignore + let addConnectionFailedHandler = defaultArg addConnectionFailedHandler <| fun _ _ -> () + let addPresenceHandler = defaultArg addPresenceHandler <| fun _ _ -> () + let addMessageHandler = defaultArg addMessageHandler <| fun _ _ -> () + { new IXmppClient with + member __.Connect() = connect() + member __.JoinMultiUserChat roomJid nickname = joinMultiUserChat roomJid nickname + member __.Send m = send m + member __.AddConnectionFailedHandler lt handler = addConnectionFailedHandler lt handler + member __.AddSignedInHandler _ _ = () + member __.AddElementHandler _ _ = () + member __.AddPresenceHandler lt handler = addPresenceHandler lt handler + member __.AddMessageHandler lt handler = addMessageHandler lt handler + } diff --git a/Emulsion.Tests/Xmpp/XmppClientTests.fs b/Emulsion.Tests/Xmpp/XmppClientTests.fs new file mode 100644 index 00000000..7395937b --- /dev/null +++ b/Emulsion.Tests/Xmpp/XmppClientTests.fs @@ -0,0 +1,225 @@ +module Emulsion.Tests.Xmpp.XmppClientTests + +open System +open System.Threading.Tasks +open System.Xml.Linq + +open JetBrains.Lifetimes +open SharpXMPP +open SharpXMPP.XMPP +open SharpXMPP.XMPP.Client.Elements +open Xunit + +open Emulsion.Xmpp +open Emulsion.Xmpp.SharpXmppHelper.Attributes +open Emulsion.Xmpp.SharpXmppHelper.Elements + +let private createPresenceFor (roomJid: JID) nickname = + let presence = XMPPPresence() + let participantJid = JID(roomJid.FullJid) + participantJid.Resource <- nickname + presence.SetAttributeValue(From, participantJid.FullJid) + presence + +let private createSelfPresence roomJid nickname (statusCode: int) = + let presence = createPresenceFor roomJid nickname + let x = XElement X + let status = XElement Status + status.SetAttributeValue(Code, statusCode) + x.Add status + presence.Add x + presence + +let private createErrorPresence roomJid nickname errorXml = + let presence = createPresenceFor roomJid nickname + presence.SetAttributeValue(Type, "error") + let error = XElement Error + let errorChild = XElement.Parse errorXml + error.Add errorChild + presence.Add error + presence + +let private createLeavePresence roomJid nickname = + let presence = createSelfPresence roomJid nickname 307 + presence.SetAttributeValue(Type, "unavailable") + presence + +let private sendPresence presence handlers = + Seq.iter (fun h -> h presence) handlers + +let private createErrorMessage (message: XMPPMessage) errorXml = + // An error message is an exact copy of the original with the "error" element added: + let errorMessage = XMPPMessage() + message.Attributes() |> Seq.iter (fun a -> errorMessage.SetAttributeValue(a.Name, a.Value)) + message.Elements() |> Seq.iter (fun e -> errorMessage.Add e) + + let error = XElement Error + let errorChild = XElement.Parse errorXml + error.Add errorChild + errorMessage.Add error + errorMessage + +[] +let ``connect function calls the Connect method of the client passed``(): unit = + let mutable connectCalled = false + let client = XmppClientFactory.create(fun () -> async { connectCalled <- true }) + Async.RunSynchronously <| XmppClient.connect client |> ignore + Assert.True connectCalled + +[] +let ``connect function returns a lifetime terminated whenever the ConnectionFailed callback is triggered``(): unit = + let mutable callback = ignore + let client = XmppClientFactory.create(addConnectionFailedHandler = fun _ h -> callback <- h) + let lt = Async.RunSynchronously <| XmppClient.connect client + Assert.True lt.IsAlive + callback(ConnFailedArgs()) + Assert.False lt.IsAlive + +[] +let ``enterRoom function calls JoinMultiUserChat``(): unit = + let mutable called = false + let mutable presenceHandlers = ResizeArray() + let client = + XmppClientFactory.create( + addPresenceHandler = (fun _ h -> presenceHandlers.Add h), + joinMultiUserChat = fun roomJid nickname -> + called <- true + Seq.iter (fun h -> h (createSelfPresence roomJid nickname 110)) presenceHandlers + ) + let roomInfo = { RoomJid = JID("room@conference.example.org"); Nickname = "testuser" } + Lifetime.Using(fun lt -> + Async.RunSynchronously <| XmppClient.enterRoom client lt roomInfo |> ignore + Assert.True called + ) + +[] +let ``enterRoom throws an exception in case of an error presence``(): unit = + let mutable presenceHandlers = ResizeArray() + let client = + XmppClientFactory.create( + addPresenceHandler = (fun _ h -> presenceHandlers.Add h), + joinMultiUserChat = fun roomJid nickname -> + sendPresence (createErrorPresence roomJid nickname "") presenceHandlers + ) + let roomInfo = { RoomJid = JID("room@conference.example.org"); Nickname = "testuser" } + Lifetime.Using(fun lt -> + let ae = Assert.Throws(fun () -> + Async.RunSynchronously <| XmppClient.enterRoom client lt roomInfo |> ignore + ) + let ex = Seq.exactlyOne ae.InnerExceptions + Assert.Contains("", ex.Message) + ) + +[] +let ``Lifetime returned from enterRoom terminates by a room leave presence``(): unit = + let mutable presenceHandlers = ResizeArray() + let client = + XmppClientFactory.create( + addPresenceHandler = (fun _ h -> presenceHandlers.Add h), + joinMultiUserChat = fun roomJid nickname -> + sendPresence (createSelfPresence roomJid nickname 110) presenceHandlers + ) + let roomInfo = { RoomJid = JID("room@conference.example.org"); Nickname = "testuser" } + Lifetime.Using(fun lt -> + let roomLt = Async.RunSynchronously <| XmppClient.enterRoom client lt roomInfo + Assert.True roomLt.IsAlive + sendPresence (createLeavePresence roomInfo.RoomJid roomInfo.Nickname) presenceHandlers + Assert.False roomLt.IsAlive + ) + +[] +let ``Lifetime returned from enterRoom terminates by an external lifetime termination``(): unit = + let mutable presenceHandlers = ResizeArray() + let client = + XmppClientFactory.create( + addPresenceHandler = (fun _ h -> presenceHandlers.Add h), + joinMultiUserChat = fun roomJid nickname -> + sendPresence (createSelfPresence roomJid nickname 110) presenceHandlers + ) + let roomInfo = { RoomJid = JID("room@conference.example.org"); Nickname = "testuser" } + use ld = Lifetime.Define() + let lt = ld.Lifetime + let roomLt = Async.RunSynchronously <| XmppClient.enterRoom client lt roomInfo + Assert.True roomLt.IsAlive + ld.Terminate() + Assert.False roomLt.IsAlive + +[] +let ``sendRoomMessage calls Send method on the client``(): unit = + let mutable message = Unchecked.defaultof + let client = XmppClientFactory.create(send = fun m -> message <- m) + let messageInfo = { RecipientJid = JID("room@conference.example.org"); Text = "foo bar" } + Lifetime.Using(fun lt -> + Async.RunSynchronously <| XmppClient.sendRoomMessage client lt messageInfo |> ignore + Assert.Equal(messageInfo.RecipientJid.FullJid, message.To.FullJid) + Assert.Equal(messageInfo.Text, message.Text) + ) + +[] +let ``sendRoomMessage's result gets resolved after the message receival``(): unit = + let mutable messageHandler = ignore + let mutable message = Unchecked.defaultof + let client = + XmppClientFactory.create( + addMessageHandler = (fun _ h -> messageHandler <- h), + send = fun m -> message <- m + ) + let messageInfo = { RecipientJid = JID("room@conference.example.org"); Text = "foo bar" } + Lifetime.Using(fun lt -> + let deliveryInfo = Async.RunSynchronously <| XmppClient.sendRoomMessage client lt messageInfo + Assert.Equal(message.ID, deliveryInfo.MessageId) + let deliveryTask = Async.StartAsTask deliveryInfo.Delivery + Assert.False deliveryTask.IsCompleted + messageHandler message + deliveryTask.Wait() + ) + +[] +let ``sendRoomMessage's result doesn't get resolved after receiving other message``(): unit = + let mutable messageHandler = ignore + let client = XmppClientFactory.create(addMessageHandler = fun _ h -> messageHandler <- h) + let messageInfo = { RecipientJid = JID("room@conference.example.org"); Text = "foo bar" } + Lifetime.Using(fun lt -> + let deliveryInfo = Async.RunSynchronously <| XmppClient.sendRoomMessage client lt messageInfo + let deliveryTask = Async.StartAsTask deliveryInfo.Delivery + Assert.False deliveryTask.IsCompleted + + let otherMessage = SharpXmppHelper.message "xxx" "nickname@example.org" "foo bar" + messageHandler otherMessage + Assert.False deliveryTask.IsCompleted + ) + +[] +let ``sendRoomMessage's result gets resolved with an error if an error response is received``(): unit = + let mutable messageHandler = ignore + let client = + XmppClientFactory.create( + addMessageHandler = (fun _ h -> messageHandler <- h), + send = fun m -> messageHandler(createErrorMessage m "") + ) + let messageInfo = { RecipientJid = JID("room@conference.example.org"); Text = "foo bar" } + Lifetime.Using(fun lt -> + let deliveryInfo = Async.RunSynchronously <| XmppClient.sendRoomMessage client lt messageInfo + let ae = Assert.Throws(fun () -> Async.RunSynchronously deliveryInfo.Delivery) + let ex = Seq.exactlyOne ae.InnerExceptions + Assert.Contains("", ex.Message) + ) + +[] +let ``sendRoomMessage's result gets terminated after parent lifetime termination``(): unit = + let client = XmppClientFactory.create() + let messageInfo = { RecipientJid = JID("room@conference.example.org"); Text = "foo bar" } + use ld = Lifetime.Define() + let lt = ld.Lifetime + let deliveryInfo = Async.RunSynchronously <| XmppClient.sendRoomMessage client lt messageInfo + let deliveryTask = Async.StartAsTask deliveryInfo.Delivery + Assert.False deliveryTask.IsCompleted + ld.Terminate() + Assert.Throws(fun () -> deliveryTask.GetAwaiter().GetResult()) |> ignore + +[] +let ``awaitMessageDelivery just returns an async from the delivery info``(): unit = + let async = async { return () } + let deliveryInfo = { MessageId = ""; Delivery = async } + let result = XmppClient.awaitMessageDelivery deliveryInfo + Assert.True(Object.ReferenceEquals(async, result)) diff --git a/Emulsion.Tests/Xmpp/XmppMessageFactory.fs b/Emulsion.Tests/Xmpp/XmppMessageFactory.fs index f0dbc00c..fa86e0c9 100644 --- a/Emulsion.Tests/Xmpp/XmppMessageFactory.fs +++ b/Emulsion.Tests/Xmpp/XmppMessageFactory.fs @@ -3,6 +3,7 @@ namespace Emulsion.Tests.Xmpp open System.Xml.Linq open SharpXMPP.XMPP.Client.Elements +open Emulsion.Xmpp.SharpXmppHelper.Attributes open Emulsion.Xmpp.SharpXmppHelper.Elements type XmppMessageFactory = diff --git a/Emulsion/Emulsion.fsproj b/Emulsion/Emulsion.fsproj index 92dfc71b..adb752df 100644 --- a/Emulsion/Emulsion.fsproj +++ b/Emulsion/Emulsion.fsproj @@ -11,12 +11,17 @@ + + + - - + + + + @@ -27,6 +32,7 @@ + diff --git a/Emulsion/ExceptionUtils.fs b/Emulsion/ExceptionUtils.fs new file mode 100644 index 00000000..28f54d27 --- /dev/null +++ b/Emulsion/ExceptionUtils.fs @@ -0,0 +1,8 @@ +module Emulsion.ExceptionUtils + +open System.Runtime.ExceptionServices + +let reraise (ex: exn): 'a = + let edi = ExceptionDispatchInfo.Capture ex + edi.Throw() + failwith "Impossible" diff --git a/Emulsion/Lifetimes.fs b/Emulsion/Lifetimes.fs new file mode 100644 index 00000000..26d06fe7 --- /dev/null +++ b/Emulsion/Lifetimes.fs @@ -0,0 +1,10 @@ +module Emulsion.Lifetimes + +open System.Threading.Tasks + +open JetBrains.Lifetimes + +let awaitTermination(lifetime: Lifetime): Async = + let tcs = TaskCompletionSource() + lifetime.OnTermination(fun () -> tcs.SetResult()) |> ignore + Async.AwaitTask tcs.Task diff --git a/Emulsion/Program.fs b/Emulsion/Program.fs index 3e5ac016..456c2ce4 100644 --- a/Emulsion/Program.fs +++ b/Emulsion/Program.fs @@ -10,6 +10,7 @@ open Serilog open Emulsion.Actors open Emulsion.MessageSystem open Emulsion.Settings +open Emulsion.Xmpp let private getConfiguration directory fileName = let config = @@ -47,7 +48,7 @@ let private startApp config = let telegramLogger = Logging.telegramLogger logger let! cancellationToken = Async.CancellationToken - let xmpp = createClient xmppLogger Xmpp.Client cancellationToken config.Xmpp + let xmpp = createClient xmppLogger XmppMessageSystem cancellationToken config.Xmpp let telegram = createClient telegramLogger Telegram.Client cancellationToken config.Telegram let factories = { xmppFactory = Xmpp.spawn xmppLogger xmpp telegramFactory = Telegram.spawn telegramLogger telegram } diff --git a/Emulsion/Xmpp/Client.fs b/Emulsion/Xmpp/Client.fs deleted file mode 100644 index a60888ec..00000000 --- a/Emulsion/Xmpp/Client.fs +++ /dev/null @@ -1,28 +0,0 @@ -namespace Emulsion.Xmpp - -open System.Threading - -open Emulsion -open Emulsion.MessageSystem -open Emulsion.Settings - -type Client(ctx: ServiceContext, cancellationToken: CancellationToken, settings: XmppSettings) = - inherit MessageSystemBase(ctx, cancellationToken) - - let client = ref None - - override __.RunUntilError receiver = async { - use newClient = XmppClient.create ctx.Logger settings receiver - try - Volatile.Write(client, Some newClient) - do! XmppClient.run ctx.Logger newClient - finally - Volatile.Write(client, None) - } - - override __.Send (OutgoingMessage message) = async { - match Volatile.Read(client) with - | None -> failwith "Client is offline" - | Some client -> - return XmppClient.send settings client message - } diff --git a/Emulsion/Xmpp/EmulsionXmpp.fs b/Emulsion/Xmpp/EmulsionXmpp.fs new file mode 100644 index 00000000..c198d8cd --- /dev/null +++ b/Emulsion/Xmpp/EmulsionXmpp.fs @@ -0,0 +1,72 @@ +/// Main business logic for an XMPP part of the Emulsion application. +module Emulsion.Xmpp.EmulsionXmpp + +open JetBrains.Lifetimes +open Serilog +open SharpXMPP.XMPP + +open Emulsion +open Emulsion.Settings +open Emulsion.MessageSystem +open Emulsion.Xmpp.XmppClient + +let private shouldProcessMessage (settings: XmppSettings) message = + let isGroup = SharpXmppHelper.isGroupChatMessage message + let shouldSkip = lazy ( + SharpXmppHelper.isOwnMessage (settings.Nickname) message + || SharpXmppHelper.isHistoricalMessage message + || SharpXmppHelper.isEmptyMessage message + ) + isGroup && not shouldSkip.Value + +let private addMessageHandler (client: IXmppClient) lt settings receiver = + client.AddMessageHandler lt (fun xmppMessage -> + if shouldProcessMessage settings xmppMessage then + let message = SharpXmppHelper.parseMessage xmppMessage + receiver(XmppMessage message) + ) + +let initializeLogging (logger: ILogger) (client: IXmppClient): IXmppClient = + let lt = Lifetime.Eternal + client.AddConnectionFailedHandler lt (fun e -> logger.Error(e.Exception, "Connection failed: {Message}", e.Message)) + client.AddSignedInHandler lt (fun e -> logger.Information("Signed in to the server")) + client.AddElementHandler lt (fun e -> + let direction = if e.IsInput then "incoming" else "outgoing" + logger.Verbose("XMPP stanza ({Direction}): {Stanza}", direction, e.Stanza) + ) + client + +let run (settings: XmppSettings) + (logger: ILogger) + (client: IXmppClient) + (messageReceiver: IncomingMessageReceiver): Async = async { + logger.Information "Connecting to the server" + let! sessionLifetime = XmppClient.connect client + sessionLifetime.ThrowIfNotAlive() + logger.Information "Connection succeeded" + + logger.Information "Initializing client handler" + addMessageHandler client sessionLifetime settings messageReceiver + logger.Information "Client handler initialized" + + let roomInfo = { RoomJid = JID(settings.Room); Nickname = settings.Nickname } + logger.Information("Entering the room {RoomInfo}", roomInfo) + let! roomLifetime = XmppClient.enterRoom client sessionLifetime roomInfo + logger.Information "Entered the room" + + logger.Information "Ready, waiting for room lifetime termination" + do! Lifetimes.awaitTermination roomLifetime + logger.Information "Room lifetime has been terminated" +} + +let send (logger: ILogger) + (client: IXmppClient) + (lifetime: Lifetime) + (settings: XmppSettings) + (message: Message): Async = async { + let text = sprintf "<%s> %s" message.author message.text + let message = { RecipientJid = JID(settings.Room); Text = text } + let! deliveryInfo = XmppClient.sendRoomMessage client lifetime message + logger.Information("Message {MessageId} has been sent; awaiting delivery", deliveryInfo.MessageId) + do! XmppClient.awaitMessageDelivery deliveryInfo +} diff --git a/Emulsion/Xmpp/SharpXmppClient.fs b/Emulsion/Xmpp/SharpXmppClient.fs new file mode 100644 index 00000000..0767cbf1 --- /dev/null +++ b/Emulsion/Xmpp/SharpXmppClient.fs @@ -0,0 +1,44 @@ +/// An implementation of an IXmppClient based on SharpXMPP library. +module Emulsion.Xmpp.SharpXmppClient + +open SharpXMPP +open SharpXMPP.XMPP + +open Emulsion.Xmpp +open Emulsion.Xmpp.XmppClient +open Emulsion.Settings + +type Wrapper(client: XmppClient) = + interface IXmppClient with + member ___.Connect() = async { + let! ct = Async.CancellationToken + return! Async.AwaitTask(client.ConnectAsync ct) + } + member __.JoinMultiUserChat roomJid nickname = SharpXmppHelper.joinRoom client roomJid.BareJid nickname + member __.Send message = client.Send message + member __.AddSignedInHandler lt handler = + let handlerDelegate = XmppClient.SignedInHandler(fun _ args -> handler args) + client.add_SignedIn handlerDelegate + lt.OnTermination(fun () -> client.remove_SignedIn handlerDelegate) |> ignore + member __.AddElementHandler lt handler = + let handlerDelegate = XmppClient.ElementHandler(fun _ args -> handler args) + client.add_Element handlerDelegate + lt.OnTermination(fun () -> client.remove_Element handlerDelegate) |> ignore + member __.AddConnectionFailedHandler lt handler = + let handlerDelegate = XmppClient.ConnectionFailedHandler(fun _ args -> handler args) + client.add_ConnectionFailed handlerDelegate + lt.OnTermination(fun () -> client.remove_ConnectionFailed handlerDelegate) |> ignore + member __.AddPresenceHandler lt handler = + let handlerDelegate = XmppClient.PresenceHandler(fun _ args -> handler args) + client.add_Presence handlerDelegate + lt.OnTermination(fun () -> client.remove_Presence handlerDelegate) |> ignore + member __.AddMessageHandler lt handler = + let handlerDelegate = XmppClient.MessageHandler(fun _ args -> handler args) + client.add_Message handlerDelegate + lt.OnTermination(fun () -> client.remove_Message handlerDelegate) |> ignore + +let create (settings: XmppSettings): XmppClient = + new XmppClient(JID(settings.Login), settings.Password) + +let wrap(client: XmppClient): IXmppClient = + upcast Wrapper client diff --git a/Emulsion/Xmpp/SharpXmppHelper.fs b/Emulsion/Xmpp/SharpXmppHelper.fs index 6740df6a..4c52de8e 100644 --- a/Emulsion/Xmpp/SharpXmppHelper.fs +++ b/Emulsion/Xmpp/SharpXmppHelper.fs @@ -1,3 +1,4 @@ +/// Helper functions to deal with SharpXMPP low-level details (such as XML stuff). module Emulsion.Xmpp.SharpXmppHelper open System @@ -9,17 +10,30 @@ open SharpXMPP.XMPP.Client.MUC.Bookmarks.Elements open SharpXMPP.XMPP.Client.Elements open Emulsion +open Emulsion.Xmpp -module Elements = - let Body = XName.Get("body", Namespaces.JabberClient) - let Delay = XName.Get("delay", "urn:xmpp:delay") +module Namespaces = + let MucUser = "http://jabber.org/protocol/muc#user" + +module Attributes = + let Code = XName.Get "code" let From = XName.Get "from" + let Id = XName.Get "id" let Jid = XName.Get "jid" - let Nick = XName.Get("nick", Namespaces.StorageBookmarks) let Stamp = XName.Get "stamp" let To = XName.Get "to" let Type = XName.Get "type" +open Attributes + +module Elements = + let Body = XName.Get("body", Namespaces.JabberClient) + let Delay = XName.Get("delay", "urn:xmpp:delay") + let Error = XName.Get "error" + let Nick = XName.Get("nick", Namespaces.StorageBookmarks) + let Status = XName.Get("status", Namespaces.MucUser) + let X = XName.Get("x", Namespaces.MucUser) + open Elements let private bookmark (roomJid: string) (nickname: string): BookmarkedConference = @@ -33,8 +47,9 @@ let joinRoom (client: XmppClient) (roomJid: string) (nickname: string): unit = let room = bookmark roomJid nickname client.BookmarkManager.Join(room) -let message (toAddr : string) (text : string) = +let message (id: string) (toAddr: string) (text: string): XMPPMessage = let m = XMPPMessage() + m.SetAttributeValue(Id, id) m.SetAttributeValue(Type, "groupchat") m.SetAttributeValue(To, toAddr) let body = XElement(Body) @@ -69,9 +84,36 @@ let isGroupChatMessage(message: XMPPMessage): bool = let isEmptyMessage(message: XMPPMessage): bool = String.IsNullOrWhiteSpace message.Text +/// See https://xmpp.org/registrar/mucstatus.html +let private removalCodes = Set.ofArray [| 301; 307; 321; 322; 332 |] +let hasRemovalCode(states: int[]): bool = + states |> Array.exists (fun x -> Set.contains x removalCodes) + +let getMessageId(message: XMPPMessage): string option = + getAttributeValue message Id + +let getMessageError(message: XMPPMessage): XElement option = + message.Element Error |> Option.ofObj + let parseMessage (message: XMPPMessage): Message = let nickname = getAttributeValue message From |> Option.map getResource |> Option.defaultValue "[UNKNOWN USER]" { author = nickname; text = message.Text } + +let parsePresence(presence: XMPPPresence): Presence = + let from = getAttributeValue presence From |> Option.defaultValue "" + let presenceType = getAttributeValue presence Type + let states = + presence.Element X + |> Option.ofObj + |> Option.map (fun x -> + x.Elements Status + |> Seq.choose (fun s -> getAttributeValue s Code) + |> Seq.map int + ) + |> Option.map Seq.toArray + |> Option.defaultWith(fun () -> Array.empty) + let error = presence.Element Error |> Option.ofObj + { From = from; Type = presenceType; States = states; Error = error } diff --git a/Emulsion/Xmpp/Types.fs b/Emulsion/Xmpp/Types.fs new file mode 100644 index 00000000..e213671e --- /dev/null +++ b/Emulsion/Xmpp/Types.fs @@ -0,0 +1,29 @@ +namespace Emulsion.Xmpp + +open System.Xml.Linq + +open SharpXMPP.XMPP + +type Presence = { + From: string + States: int[] + Error: XElement option + Type: string option +} + +type RoomInfo = { + RoomJid: JID + Nickname: string +} + +type MessageInfo = { + RecipientJid: JID + Text: string +} + +type MessageDeliveryInfo = { + MessageId: string + + /// Resolves after the message is guaranteed to be delivered to the recipient. + Delivery: Async +} diff --git a/Emulsion/Xmpp/XmppClient.fs b/Emulsion/Xmpp/XmppClient.fs index 7c20b2b3..d47086ed 100644 --- a/Emulsion/Xmpp/XmppClient.fs +++ b/Emulsion/Xmpp/XmppClient.fs @@ -1,83 +1,137 @@ +/// A general abstraction around an XMPP client and common functions. module Emulsion.Xmpp.XmppClient open System -open System.Threading.Tasks -open Serilog +open JetBrains.Lifetimes + open SharpXMPP open SharpXMPP.XMPP +open SharpXMPP.XMPP.Client.Elements open Emulsion -open Emulsion.Settings - -let private connectionFailedHandler (logger: ILogger) = XmppConnection.ConnectionFailedHandler(fun s e -> - logger.Error(e.Exception, "XMPP connection failed: {Message}", e.Message) - ()) - -let private signedInHandler (logger: ILogger) (settings: XmppSettings) (client: XmppClient) = - XmppConnection.SignedInHandler(fun s e -> - logger.Information("Connecting to {Room} as {Nickname}", settings.Room, settings.Nickname) - SharpXmppHelper.joinRoom client settings.Room settings.Nickname +open Emulsion.Lifetimes +open Emulsion.Xmpp + +type IXmppClient = + abstract member Connect: unit -> Async + abstract member JoinMultiUserChat: roomJid: JID -> nickname: string -> unit + abstract member Send: XMPPMessage -> unit + abstract member AddConnectionFailedHandler: Lifetime -> (ConnFailedArgs -> unit) -> unit + abstract member AddSignedInHandler: Lifetime -> (SignedInArgs -> unit) -> unit + abstract member AddElementHandler: Lifetime -> (ElementArgs -> unit) -> unit + abstract member AddPresenceHandler: Lifetime -> (XMPPPresence -> unit) -> unit + abstract member AddMessageHandler: Lifetime -> (XMPPMessage -> unit) -> unit + +/// Establish a connection to the server and log in. Returns a connection lifetime that will terminate if the connection +/// terminates. +let connect (client: IXmppClient): Async = async { + let connectionLifetime = new LifetimeDefinition() + client.AddConnectionFailedHandler connectionLifetime.Lifetime <| fun _ -> + connectionLifetime.Terminate() + + do! client.Connect() + return connectionLifetime.Lifetime +} + +let private isSelfPresence (roomInfo: RoomInfo) (presence: XMPPPresence) = + let presence = SharpXmppHelper.parsePresence presence + let expectedJid = sprintf "%s/%s" roomInfo.RoomJid.BareJid roomInfo.Nickname + presence.Type = None && presence.From = expectedJid && Array.contains 110 presence.States + +let private isLeavePresence (roomInfo: RoomInfo) (presence: XMPPPresence) = + let presence = SharpXmppHelper.parsePresence presence + let expectedJid = sprintf "%s/%s" roomInfo.RoomJid.BareJid roomInfo.Nickname + presence.From = expectedJid && presence.Type = Some "unavailable" && SharpXmppHelper.hasRemovalCode presence.States + +let private extractPresenceException (roomInfo: RoomInfo) (presence: XMPPPresence) = + let presence = SharpXmppHelper.parsePresence presence + let expectedJid = sprintf "%s/%s" roomInfo.RoomJid.BareJid roomInfo.Nickname + if presence.From = expectedJid then + presence.Error + |> Option.map (fun e -> Exception(sprintf "Error: %A" e)) + else None + +/// Enter the room, returning the in-room lifetime. Will terminate if kicked or left the room. +let enterRoom (client: IXmppClient) (lifetime: Lifetime) (roomInfo: RoomInfo): Async = async { + use connectionLifetimeDefinition = lifetime.CreateNested() + let connectionLifetime = connectionLifetimeDefinition.Lifetime + + let roomLifetimeDefinition = lifetime.CreateNested() + let roomLifetime = roomLifetimeDefinition.Lifetime + + let tcs = connectionLifetime.CreateTaskCompletionSource() + + // Success and error handlers: + client.AddPresenceHandler connectionLifetime (fun presence -> + if isSelfPresence roomInfo presence + then tcs.SetResult() + else + match extractPresenceException roomInfo presence with + | Some ex -> tcs.SetException ex + | None -> () ) -let private shouldProcessMessage settings message = - let isGroup = SharpXmppHelper.isGroupChatMessage message - let shouldSkip = lazy ( - SharpXmppHelper.isOwnMessage (settings.Nickname) message - || SharpXmppHelper.isHistoricalMessage message - || SharpXmppHelper.isEmptyMessage message + // Room leave handler: + client.AddPresenceHandler roomLifetime (fun presence -> + if isLeavePresence roomInfo presence + then roomLifetimeDefinition.Terminate() ) - isGroup && not shouldSkip.Value - -let private messageHandler (logger: ILogger) settings onMessage = XmppConnection.MessageHandler(fun _ element -> - logger.Verbose("Incoming XMPP message: {Message}", element) - if shouldProcessMessage settings element then - onMessage(XmppMessage (SharpXmppHelper.parseMessage element)) -) - -let private elementHandler (logger: ILogger) = XmppConnection.ElementHandler(fun s e -> - let direction = if e.IsInput then "incoming" else "outgoing" - logger.Verbose("XMPP stanza ({Direction}): {Stanza}", direction, e.Stanza) -) - -let private presenceHandler (logger: ILogger) = XmppConnection.PresenceHandler(fun s e -> - logger.Verbose("XMPP presence: {Presence}", e) -) - -let create (logger: ILogger) (settings: XmppSettings) (onMessage: IncomingMessage -> unit): XmppClient = - let client = new XmppClient(JID(settings.Login), settings.Password) - client.add_ConnectionFailed(connectionFailedHandler logger) - client.add_SignedIn(signedInHandler logger settings client) - client.add_Element(elementHandler logger) - client.add_Presence(presenceHandler logger) - client.add_Message(messageHandler logger settings onMessage) - client - -type ConnectionFailedError(message: string, innerException: Exception) = - inherit Exception(message, innerException) - -let run (logger: ILogger) (client: XmppClient): Async = - logger.Information("Running XMPP bot: {Jid}", client.Jid.FullJid) - let connectionFinished = TaskCompletionSource() - let connectionFailedHandler = - XmppConnection.ConnectionFailedHandler( - fun _ error -> connectionFinished.SetException(ConnectionFailedError(error.Message, error.Exception)) - ) + try + // Start the join process, wait for a result: + client.JoinMultiUserChat roomInfo.RoomJid roomInfo.Nickname + do! Async.AwaitTask tcs.Task + return roomLifetime + with + | ex -> + // In case of an error, terminate the room lifetime (but leave it intact in case of success): + roomLifetimeDefinition.Terminate() + return ExceptionUtils.reraise ex +} + +let private hasMessageId messageId message = + SharpXmppHelper.getMessageId message = Some messageId + +let private extractMessageException message = + SharpXmppHelper.getMessageError message + |> Option.map(fun e -> Exception(sprintf "Error: %A" e)) + +let private awaitMessageReceival (client: IXmppClient) (lifetime: Lifetime) messageId = + // We need to perform this part synchronously to avoid the race condition between adding a message handler and + // actually sending a message. + let messageLifetimeDefinition = lifetime.CreateNested() + let messageLifetime = messageLifetimeDefinition.Lifetime + let messageReceivedTask = messageLifetime.CreateTaskCompletionSource() + client.AddMessageHandler lifetime (fun message -> + if hasMessageId messageId message then + match extractMessageException message with + | Some ex -> messageReceivedTask.SetException ex + | None -> messageReceivedTask.SetResult() + ) async { try - let! token = Async.CancellationToken - use _ = token.Register(fun () -> client.Close()) + do! Async.AwaitTask messageReceivedTask.Task + finally + messageLifetimeDefinition.Dispose() + } - client.add_ConnectionFailed connectionFailedHandler - do! Async.AwaitTask(client.ConnectAsync token) +let private newMessageId(): string = + Guid.NewGuid().ToString() - do! Async.AwaitTask connectionFinished.Task - finally - client.remove_ConnectionFailed connectionFailedHandler +/// Sends the message to the room. Returns an object that allows to track the message receival. +let sendRoomMessage (client: IXmppClient) (lifetime: Lifetime) (messageInfo: MessageInfo): Async = + async { + let messageId = newMessageId() + let message = SharpXmppHelper.message messageId messageInfo.RecipientJid.FullJid messageInfo.Text + let! delivery = Async.StartChild <| awaitMessageReceival client lifetime messageId + client.Send message + return { + MessageId = messageId + Delivery = delivery + } } -let send (settings: XmppSettings) (client: XmppClient) (message: Message): unit = - let text = sprintf "<%s> %s" message.author message.text - SharpXmppHelper.message settings.Room text - |> client.Send +/// Waits for the message to be delivered. +let awaitMessageDelivery (deliveryInfo: MessageDeliveryInfo): Async = + deliveryInfo.Delivery diff --git a/Emulsion/Xmpp/XmppMessageSystem.fs b/Emulsion/Xmpp/XmppMessageSystem.fs new file mode 100644 index 00000000..2e0e1569 --- /dev/null +++ b/Emulsion/Xmpp/XmppMessageSystem.fs @@ -0,0 +1,32 @@ +namespace Emulsion.Xmpp + +open System.Threading + +open JetBrains.Lifetimes + +open Emulsion +open Emulsion.MessageSystem +open Emulsion.Settings + +type XmppMessageSystem(ctx: ServiceContext, cancellationToken: CancellationToken, settings: XmppSettings) = + inherit MessageSystemBase(ctx, cancellationToken) + + let client = ref None + + override __.RunUntilError receiver = async { + use sharpXmpp = SharpXmppClient.create settings + let newClient = SharpXmppClient.wrap sharpXmpp |> EmulsionXmpp.initializeLogging ctx.Logger + use newClientLifetimeDef = Lifetime.Define() + try + Volatile.Write(client, Some (newClient, newClientLifetimeDef.Lifetime)) + do! EmulsionXmpp.run settings ctx.Logger newClient receiver + finally + Volatile.Write(client, None) + } + + override __.Send (OutgoingMessage message) = async { + match Volatile.Read(client) with + | None -> failwith "Client is offline" + | Some (client, lt) -> + return! EmulsionXmpp.send ctx.Logger client lt settings message + }