From 2291a8dab1844ceaa54008b26b47d480a0ef2f9e Mon Sep 17 00:00:00 2001 From: Stuart Ferguson Date: Thu, 12 Mar 2026 14:09:15 +0000 Subject: [PATCH] Automate replay of parked EventStore messages daily Added a Windows Task Scheduler XML to run a new PowerShell script (ReplayParkedQueue.ps1) every day at 1:00 AM. The script queries all persistent EventStore subscriptions, checks for parked messages, and triggers their replay if present. Includes logging with retention and error handling. Automates maintenance of parked queues for all subscriptions. Add scheduled task and script to replay parked EventStore msgs Introduced a Windows Task Scheduler XML to run a new PowerShell script (ReplayParkedQueue.ps1) daily at 1:00 AM. The script queries all EventStore persistent subscriptions, checks for parked messages, and triggers their replay if present. Includes logging, log retention, and error handling. Automates maintenance of parked queues for improved reliability. --- .../Replay Parked Queues.xml | Bin 0 -> 3730 bytes .../ReplayParkedQueue/ReplayParkedQueue.ps1 | 143 ++++++++++++++++++ 2 files changed, 143 insertions(+) create mode 100644 ScheduledTasks/ReplayParkedQueue/Replay Parked Queues.xml create mode 100644 ScheduledTasks/ReplayParkedQueue/ReplayParkedQueue.ps1 diff --git a/ScheduledTasks/ReplayParkedQueue/Replay Parked Queues.xml b/ScheduledTasks/ReplayParkedQueue/Replay Parked Queues.xml new file mode 100644 index 0000000000000000000000000000000000000000..8f1230e72963395d1efc437fbc682a586c5c28a9 GIT binary patch literal 3730 zcmbW4TW{J}5Xa|vrTq>l?+^&vq>5-N()5CqZZ9G2OI}!T2?~%1lcxFd-TwY_F!r%2 zrIlrD&dmJha^`aU|M!kPw{JGHsnzVOCAP5CCU$FcduDsqvW7Kz)}YL7$odHC+{RA5 zw7$KvI$VeFJhv13+y1f+s{>GOeu{YDh(YTdPHDQfCD)xD+dfx{u4{D89I3%`!pZ{u zDfbsR8*a@E-Uqj6#QFo&eLKM7rv1ZPllOg>UGiM6tlC@IS;M}LeMHL`gcSM$jwSE% zzQ_HAx8Rd_1OJ)xB&*ne*62tCRR#YsqIT2=aDmANBiTx5x_0clCz}6|6lj(u0>(o*LXE zul-^s7ppVhlS>elQ#~Y98|pcA>M2y6-VV82)YuV~-}b6(lLM;5Ln=~rs(P(MYjHLC z-$tShw@|bqZh0E(s>b-7OdNC1iOD1Wsu!rPQmzND-jU;M;;W*4^sGJyMcsD_)*G)O zbwBl_F6#qURKX!9Wn)#Uh%An3yz~6qOfT+PhZrYlS2t9*FVez(zV-YTUEL+|?3KL( z&jZ>1o>)ww4?XvLpv*lw2^}pVQnKQAQDQI8Ku^&)%2$reuutOsPaO&GvQZ$ z6zloV5MnE&^_D0|=WE_V#CFmCdy=u%?%L}&__&}h`tGGBrTUkAj(y!tBG!^e^s~?7 z`G$Nv!S7J>p-1i}6YB~2te8#77DZAyvRu{E%??`s1R=ITy+xV4J@Z5MXFDg0GdazM z^1Rr_SJWO*HEUD6)_yKMe>3cIjJI(XaaCg(#&*9t#JE~V^6w0*bPlU}x0^(4PkDkC ztFcUrCDj*4TnUzx;}O$hxoT!)5hpIPEo9Laj7x_fWC%v#{g?zi?{{u16cZp`Aa+CQv5V}OC@ZqE zQZt}Vf4TJ;sjmr5*91SpYB>9cnK4*-s@W`hHU7mZSM$@JPCz;fi2v0Zdk;&GIgc-~ ze9pAcx6IK0>#IXwL76VaGNY>>=zB~v_=?qPEaDS>@qC}3?i)y^oNo5qCe6eYe=e~> wUw!3oFU`^+JX8M`Q9n8+|8#0gEYifA$J#Z1ljI7k*VC(yK>eKDTc4<^D<2mk;8 literal 0 HcmV?d00001 diff --git a/ScheduledTasks/ReplayParkedQueue/ReplayParkedQueue.ps1 b/ScheduledTasks/ReplayParkedQueue/ReplayParkedQueue.ps1 new file mode 100644 index 0000000..1120f78 --- /dev/null +++ b/ScheduledTasks/ReplayParkedQueue/ReplayParkedQueue.ps1 @@ -0,0 +1,143 @@ +param ( + [Parameter(Mandatory = $true)] + [string]$BaseUrl, + + [Parameter] + [string]$Username, + + [Parameter] + [string]$Password, + + [string]$LogDirectory = "C:\home\txnproc\trace", + [int]$LogRetentionDays = 7 +) + +# ========================= +# Logging setup +# ========================= +if (-not (Test-Path $LogDirectory)) { + New-Item -ItemType Directory -Path $LogDirectory -Force | Out-Null +} + +function Get-LogFilePath { + $date = (Get-Date).ToString("yyyy-MM-dd") + Join-Path $LogDirectory "ReplayParkedSubscriptions-$date.log" +} + +function Write-Trace { + param ( + [string]$Level, + [string]$Message + ) + + $timestamp = (Get-Date).ToString("yyyy-MM-dd HH:mm:ss.fff") + $entry = "$timestamp [$Level] $Message" + Add-Content -Path (Get-LogFilePath) -Value $entry +} + +# ========================= +# Log retention cleanup +# ========================= +$cutoffDate = (Get-Date).Date.AddDays(-$LogRetentionDays) + +Get-ChildItem -Path $LogDirectory -Filter "ReplayParkedSubscriptions-*.log" -File | + Where-Object { + if ($_.Name -match 'ReplayParkedSubscriptions-(\d{4}-\d{2}-\d{2})\.log') { + [DateTime]::ParseExact($matches[1], 'yyyy-MM-dd', $null) -lt $cutoffDate + } + else { + $false + } + } | + ForEach-Object { + try { + Remove-Item $_.FullName -Force + } + catch { + # Avoid failing the run due to cleanup issues + } + } + +Write-Trace "INFO" "Script started. BaseUrl=$BaseUrl" + +# ========================= +# Auth header +# ========================= +$AuthHeader = @{ + Authorization = "Basic " + + [Convert]::ToBase64String( + [Text.Encoding]::ASCII.GetBytes("$Username`:$Password") + ) +} + +# ========================= +# Query subscriptions +# ========================= +try { + Write-Trace "INFO" "Querying persistent subscriptions" + $subscriptions = Invoke-RestMethod ` + -Method GET ` + -Uri "$BaseUrl/subscriptions" ` + -Headers $AuthHeader +} +catch { + Write-Trace "ERROR" "Failed to query subscriptions: $($_.Exception.Message)" + exit 1 +} + +# ========================= +# Process subscriptions +# ========================= +foreach ($sub in $subscriptions) { + + $stream = $sub.eventStreamId + $group = $sub.groupName + + #Write-Trace "INFO" "Processing subscription [$stream][$group]" + + $streamEncoded = if ($stream -eq '$all') { + '%24all' + } + else { + [System.Web.HttpUtility]::UrlEncode($stream) + } + + $infoUrl = "$BaseUrl/subscriptions/$streamEncoded/$group/info" + + try { + $info = Invoke-RestMethod ` + -Method GET ` + -Uri $infoUrl ` + -Headers $AuthHeader + } + catch { + Write-Trace "WARN" "Failed to get info for [$stream][$group]: $($_.Exception.Message)" + continue + } + + $parkedCount = $info.parkedMessageCount + #Write-Trace "INFO" "[$stream][$group] parkedMessageCount=$parkedCount" + + if ($parkedCount -gt 0) { + $replayUrl = "$BaseUrl/subscriptions/$streamEncoded/$group/replayParked?from=0" + + Write-Trace "INFO" "Replaying [$parkedCount] parked messages for [$stream][$group]" + + try { + Invoke-RestMethod ` + -Method POST ` + -Uri $replayUrl ` + -Headers $AuthHeader + + Write-Trace "INFO" "Replay triggered successfully for [$stream][$group]" + } + catch { + Write-Trace "ERROR" "Replay failed for [$stream][$group]: $($_.Exception.Message)" + } + } + #else { + #Write-Trace "INFO" "No parked messages for [$stream][$group]" + #} +} + +Write-Trace "INFO" "Script completed" \ No newline at end of file