-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathWebSocketClient.cs
More file actions
202 lines (163 loc) · 7.24 KB
/
WebSocketClient.cs
File metadata and controls
202 lines (163 loc) · 7.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
#region
using System;
using System.IO;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using RustRcon.Types;
using RustRcon.Utility;
#endregion
namespace Services.Client
{
public class WebSocketClient : IDisposable, ILogger
{
private readonly Uri _uri;
private readonly ClientWebSocket _webSocket;
private readonly SemaphoreSlim _sendSemaphore;
private readonly SemaphoreSlim _receiveSemaphore;
private readonly ILogger _logger;
private readonly ReactiveProperty<WebSocketState> _state;
private readonly Encoding _textEncoding;
private readonly JsonSerializer _jsonSerializer;
protected CancellationTokenSource? ConnectionCts { get; private set; }
public readonly ReadOnlyReactiveProperty<WebSocketState> State;
public WebSocketClient(Uri uri, ILogger? logger)
{
_uri = uri;
_logger = logger ?? this;
_webSocket = new ClientWebSocket();
_sendSemaphore = new SemaphoreSlim(1);
_receiveSemaphore = new SemaphoreSlim(1);
_state = new ReactiveProperty<WebSocketState>();
_textEncoding = new UTF8Encoding(false);
_jsonSerializer = JsonSerializer.CreateDefault();
State = new ReadOnlyReactiveProperty<WebSocketState>(_state);
}
public virtual async Task ConnectAsync(CancellationToken cancellation = default)
{
if (_webSocket.State == WebSocketState.Connecting || _webSocket.State == WebSocketState.Open)
throw new InvalidOperationException("Client already connected.");
ConnectionCts = new CancellationTokenSource();
Task handleTask = new Task(() => HandleStateChanges(ConnectionCts.Token), ConnectionCts.Token);
handleTask.Start();
await _webSocket.ConnectAsync(_uri, cancellation);
}
public async Task DisconnectAsync(string reason, CancellationToken cancellation = default)
{
ConnectionCts?.Cancel();
_state.Value = WebSocketState.Closed;
if (!CanSendWebSocketMessage())
return;
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, reason, cancellation);
}
protected async Task SendAsync<T>(T command, CancellationToken cancellation)
{
if (_webSocket.State != WebSocketState.Open)
throw new InvalidOperationException("Client is not connected.");
await _sendSemaphore.WaitAsync(cancellation);
if (!CanSendWebSocketMessage())
return;
var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(ConnectionCts!.Token, cancellation);
try
{
await using MemoryStream stream = new MemoryStream();
await using (StreamWriter streamWriter = new StreamWriter(stream, _textEncoding, 1024, true))
using (JsonWriter jsonWriter = new JsonTextWriter(streamWriter))
{
_jsonSerializer.Serialize(jsonWriter, command);
await jsonWriter.FlushAsync(linkedCts.Token);
}
ArraySegment<byte> buffer = new ArraySegment<byte>(stream.GetBuffer(), 0, (int)stream.Length);
await _webSocket.SendAsync(buffer, WebSocketMessageType.Text, true, linkedCts.Token);
}
catch (WebSocketException ex) when (ex.WebSocketErrorCode == WebSocketError.ConnectionClosedPrematurely)
{
await DisconnectAsync(ex.ErrorCode.ToString(), CancellationToken.None);
_logger.Log($"Error closing web socket: {ex.Message}");
}
finally
{
_sendSemaphore.Release();
}
}
protected async Task<T?> ReadAsync<T>(CancellationToken cancellation = default) where T : class
{
await _receiveSemaphore.WaitAsync(cancellation);
if (!CanSendWebSocketMessage())
return null;
var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(ConnectionCts!.Token, cancellation);
try
{
await using MemoryStream stream = new MemoryStream();
byte[] receiveBuffer = new byte[1024];
ArraySegment<byte> buffer = new ArraySegment<byte>(receiveBuffer);
WebSocketReceiveResult result;
do
{
linkedCts.Token.ThrowIfCancellationRequested();
result = await _webSocket.ReceiveAsync(buffer, linkedCts.Token);
switch (result.MessageType)
{
case WebSocketMessageType.Close:
{
_logger.Log($"Close message received: ${result.CloseStatusDescription}");
await DisconnectAsync(result.CloseStatusDescription, CancellationToken.None);
break;
}
case WebSocketMessageType.Text:
{
await stream.WriteAsync(buffer.Array, buffer.Offset, result.Count, linkedCts.Token);
break;
}
}
linkedCts.Token.ThrowIfCancellationRequested();
} while (!linkedCts.Token.IsCancellationRequested && !result.EndOfMessage);
stream.Seek(0, SeekOrigin.Begin);
using StreamReader streamReader = new StreamReader(stream);
using JsonTextReader reader = new JsonTextReader(streamReader);
return _jsonSerializer.Deserialize<T>(reader);
}
catch (WebSocketException ex) when (ex.WebSocketErrorCode == WebSocketError.ConnectionClosedPrematurely)
{
await DisconnectAsync(ex.ErrorCode.ToString(), CancellationToken.None);
_logger.Log($"Error closing web socket: {ex.Message}");
}
finally
{
_receiveSemaphore.Release();
}
return default;
}
private void HandleStateChanges(CancellationToken cancellation)
{
while (!cancellation.IsCancellationRequested)
{
WebSocketState state = _webSocket.State;
if (state == _state.Value)
{
Thread.Sleep(500);
continue;
}
_state.Value = _webSocket.State;
_logger.Log($"Connection state changed: {_state.Value}");
}
}
private bool CanSendWebSocketMessage()
{
return _webSocket.State != WebSocketState.Aborted && _webSocket.State != WebSocketState.Closed &&
_webSocket.State != WebSocketState.CloseSent;
}
public void Dispose()
{
_webSocket?.Dispose();
_sendSemaphore?.Dispose();
_receiveSemaphore?.Dispose();
}
public void Log(string message)
{
Console.WriteLine($"[WebSocketClient] {message}");
}
}
}