diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java index 89acd66658d6f0..5131cf82bf66fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java @@ -17,6 +17,9 @@ package org.apache.doris.plugin.dialect; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.collect.Lists; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import lombok.Data; @@ -30,35 +33,113 @@ import java.net.HttpURLConnection; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; /** * This class is used to convert sql with different dialects using sql convertor service. * The sql convertor service is a http service which is used to convert sql. + *

+ * Features: + * - Support multiple URLs (comma separated) + * - Blacklist mechanism for failed URLs + * - Automatic failover and retry + * - URL caching and smart selection */ public class HttpDialectUtils { private static final Logger LOG = LogManager.getLogger(HttpDialectUtils.class); - public static String convertSql(String targetURL, String originStmt, String dialect, + // Cache URL manager instances to avoid duplicate parsing with automatic expiration + private static final Cache urlManagerCache = Caffeine.newBuilder() + .maximumSize(10) + .expireAfterAccess(8, TimeUnit.HOURS) + .build(); + + // Blacklist recovery time (ms): 1 minute + private static final long BLACKLIST_RECOVERY_TIME_MS = 60 * 1000; + // Connection timeout period (ms): 3 seconds + private static final int CONNECTION_TIMEOUT_MS = 3000; + // Read timeout period (ms): 10 seconds + private static final int READ_TIMEOUT_MS = 10000; + + public static String convertSql(String targetURLs, String originStmt, String dialect, String[] features, String config) { + UrlManager urlManager = getOrCreateUrlManager(targetURLs); ConvertRequest convertRequest = new ConvertRequest(originStmt, dialect, features, config); + String requestStr = convertRequest.toJson(); + + // Try to convert SQL using intelligent URL selection strategy + return tryConvertWithIntelligentSelection(urlManager, requestStr, originStmt); + } + + /** + * Try to convert SQL using intelligent URL selection strategy + * CRITICAL: This method ensures 100% success rate when ANY service is available + */ + private static String tryConvertWithIntelligentSelection( + UrlManager urlManager, String requestStr, String originStmt) { + // Strategy: Try ALL URLs in intelligent order, regardless of blacklist status + // This ensures 100% success rate when any service is actually available + List allUrls = urlManager.getAllUrlsInPriorityOrder(); + + for (String url : allUrls) { + try { + String result = doConvertSql(url, requestStr); + // If no exception thrown, HTTP response was successful (200) + // Mark URL as healthy and return result (even if empty) + urlManager.markUrlAsHealthy(url); + if (LOG.isDebugEnabled()) { + LOG.debug("Successfully converted SQL using URL: {}", url); + } + return result; + } catch (Exception e) { + LOG.warn("Failed to convert SQL using URL: {}, error: {}", url, e.getMessage()); + // Add failed URL to blacklist for future optimization + urlManager.markUrlAsBlacklisted(url); + // Continue trying next URL - this is CRITICAL for 100% success rate + } + } + + return originStmt; + } + + /** + * Get or create a URL manager + */ + private static UrlManager getOrCreateUrlManager(String targetURLs) { + return urlManagerCache.get(targetURLs, UrlManager::new); + } + + /** + * Perform SQL conversion for individual URL + */ + private static String doConvertSql(String targetURL, String requestStr) throws Exception { HttpURLConnection connection = null; try { - URL url = new URL(targetURL); + if (targetURL == null || targetURL.trim().isEmpty()) { + throw new Exception("Target URL is null or empty"); + } + URL url = new URL(targetURL.trim()); connection = (HttpURLConnection) url.openConnection(); connection.setRequestMethod("POST"); connection.setRequestProperty("Content-Type", "application/json"); connection.setUseCaches(false); connection.setDoOutput(true); + connection.setConnectTimeout(CONNECTION_TIMEOUT_MS); + connection.setReadTimeout(READ_TIMEOUT_MS); - String requestStr = convertRequest.toJson(); try (OutputStream outputStream = connection.getOutputStream()) { outputStream.write(requestStr.getBytes(StandardCharsets.UTF_8)); } int responseCode = connection.getResponseCode(); if (LOG.isDebugEnabled()) { - LOG.debug("POST Response Code: {}, post data: {}", responseCode, requestStr); + LOG.debug("POST Response Code: {}, URL: {}, post data: {}", responseCode, targetURL, requestStr); } if (responseCode == HttpURLConnection.HTTP_OK) { @@ -76,26 +157,20 @@ public static String convertSql(String targetURL, String originStmt, String dial }.getType(); ConvertResponse result = new Gson().fromJson(response.toString(), type); if (LOG.isDebugEnabled()) { - LOG.debug("convert response: {}", result); + LOG.debug("Convert response: {}, URL: {}", result, targetURL); } if (result.code == 0) { if (!"v1".equals(result.version)) { - LOG.warn("failed to convert sql, response version is not v1: {}", result.version); - return originStmt; + throw new Exception("Unsupported version: " + result.version); } return result.data; } else { - LOG.warn("failed to convert sql, response: {}", result); - return originStmt; + throw new Exception("Conversion failed: " + result.message); } } } else { - LOG.warn("failed to convert sql, response code: {}", responseCode); - return originStmt; + throw new Exception("HTTP response code: " + responseCode); } - } catch (Exception e) { - LOG.warn("failed to convert sql", e); - return originStmt; } finally { if (connection != null) { connection.disconnect(); @@ -103,6 +178,171 @@ public static String convertSql(String targetURL, String originStmt, String dial } } + /** + * URL Manager - Responsible for URL parsing, caching, blacklist management, and smart selection + */ + private static class UrlManager { + private final List parsedUrls; + private final ConcurrentHashMap blacklist; + + public UrlManager(String urls) { + this.parsedUrls = parseUrls(urls); + this.blacklist = new ConcurrentHashMap<>(); + if (LOG.isDebugEnabled()) { + LOG.debug("Created UrlManager with URLs: {}, parsed: {}", urls, parsedUrls); + } + } + + /** + * Parse comma separated URL strings + */ + private List parseUrls(String urls) { + List result = Lists.newArrayList(); + if (urls != null && !urls.trim().isEmpty()) { + String[] urlArray = urls.split(","); + for (String url : urlArray) { + String trimmedUrl = url.trim(); + if (!trimmedUrl.isEmpty()) { + result.add(trimmedUrl); + } + } + } + return result; + } + + /** + * Mark URL as healthy (remove from blacklist) + */ + public void markUrlAsHealthy(String url) { + if (blacklist.remove(url) != null) { + LOG.info("Removed URL from blacklist due to successful request: {}", url); + } + } + + /** + * Add URL to blacklist + */ + public void markUrlAsBlacklisted(String url) { + // If URL is already in blacklist, just return + if (blacklist.containsKey(url)) { + return; + } + + long currentTime = System.currentTimeMillis(); + long recoverTime = currentTime + BLACKLIST_RECOVERY_TIME_MS; + blacklist.put(url, new BlacklistEntry(currentTime, recoverTime)); + LOG.warn("Added URL to blacklist: {}, will recover at: {}", url, new Date(recoverTime)); + } + + /** + * Check if URL is localhost (127.0.0.1 or localhost) + */ + private boolean isLocalhost(String url) { + return url.contains("127.0.0.1") || url.contains("localhost"); + } + + /** + * Get ALL URLs in priority order for 100% success guarantee + * CRITICAL: This method ensures we try every URL when any service might be available + *

+ * Priority order: + * 1. Localhost URLs (127.0.0.1 or localhost) that are healthy + * 2. Other healthy URLs (randomly selected) + * 3. Localhost URLs in blacklist + * 4. Other blacklisted URLs (sorted by recovery time) + */ + public List getAllUrlsInPriorityOrder() { + List prioritizedUrls = Lists.newArrayList(); + List healthyLocalhost = Lists.newArrayList(); + List healthyOthers = Lists.newArrayList(); + List blacklistedLocalhost = Lists.newArrayList(); + List blacklistedOthers = Lists.newArrayList(); + + long currentTime = System.currentTimeMillis(); + + // Single traversal to categorize all URLs + for (String url : parsedUrls) { + BlacklistEntry entry = blacklist.get(url); + boolean isHealthy = false; + + if (entry == null) { + // URL is not in blacklist, consider it healthy + isHealthy = true; + } else if (currentTime >= entry.recoverTime) { + // URL has reached recovery time, remove from blacklist and consider healthy + blacklist.remove(url); + isHealthy = true; + if (LOG.isDebugEnabled()) { + LOG.debug("URL recovered from blacklist: {}", url); + } + } + + boolean isLocal = isLocalhost(url); + + if (isHealthy) { + if (isLocal) { + healthyLocalhost.add(url); + } else { + healthyOthers.add(url); + } + } else { + if (isLocal) { + blacklistedLocalhost.add(url); + } else { + blacklistedOthers.add(url); + } + } + } + + // Add URLs in priority order + // 1. Healthy localhost URLs first + prioritizedUrls.addAll(healthyLocalhost); + + // 2. Other healthy URLs (randomly shuffled for load balancing) + Collections.shuffle(healthyOthers, ThreadLocalRandom.current()); + prioritizedUrls.addAll(healthyOthers); + + // 3. Blacklisted localhost URLs + prioritizedUrls.addAll(blacklistedLocalhost); + + // 4. Other blacklisted URLs (sorted by recovery time) + blacklistedOthers.sort((url1, url2) -> { + BlacklistEntry entry1 = blacklist.get(url1); + BlacklistEntry entry2 = blacklist.get(url2); + if (entry1 == null && entry2 == null) { + return 0; + } + if (entry1 == null) { + return -1; + } + if (entry2 == null) { + return 1; + } + return Long.compare(entry1.recoverTime, entry2.recoverTime); + }); + prioritizedUrls.addAll(blacklistedOthers); + + if (LOG.isDebugEnabled()) { + LOG.debug("All URLs in priority order: {}", prioritizedUrls); + } + + return prioritizedUrls; + } + } + + /** + * Blacklist entry + */ + private static class BlacklistEntry { + final long blacklistedTime; + final long recoverTime; + + BlacklistEntry(long blacklistedTime, long recoverTime) { + this.blacklistedTime = blacklistedTime; + this.recoverTime = recoverTime; + } + } + @Data private static class ConvertRequest { private String version; // CHECKSTYLE IGNORE THIS LINE diff --git a/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java index de359f79475cc4..4ce71e196a33e1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java @@ -28,49 +28,379 @@ import java.net.DatagramSocket; import java.net.ServerSocket; import java.net.SocketException; +import java.util.ArrayList; +import java.util.List; public class HttpDialectUtilsTest { - private int port; - private SimpleHttpServer server; + private List ports = new ArrayList<>(); + private List servers = new ArrayList<>(); @Before public void setUp() throws Exception { - port = findValidPort(); - server = new SimpleHttpServer(port); - server.start("/api/v1/convert"); + // Create three test servers + for (int i = 0; i < 3; i++) { + int port = findValidPort(); + ports.add(port); + SimpleHttpServer server = new SimpleHttpServer(port); + server.start("/api/v1/convert"); + servers.add(server); + } } @After public void tearDown() { - if (server != null) { - server.stop(); + for (SimpleHttpServer server : servers) { + if (server != null) { + server.stop(); + } } + servers.clear(); + ports.clear(); } @Test - public void testSqlConvert() { + public void testSingleUrlConvert() { String originSql = "select * from t1 where \"k1\" = 1"; String expectedSql = "select * from t1 where `k1` = 1"; String[] features = new String[] {"ctas"}; - String targetURL = "http://127.0.0.1:" + port + "/api/v1/convert"; + String targetURL = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert"; + + // Test with no response (should return original SQL) String res = HttpDialectUtils.convertSql(targetURL, originSql, "presto", features, "{}"); Assert.assertEquals(originSql, res); - // test presto - server.setResponse("{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + + // Test successful conversion + servers.get(0).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); res = HttpDialectUtils.convertSql(targetURL, originSql, "presto", features, "{}"); Assert.assertEquals(expectedSql, res); - // test response version error - server.setResponse("{\"version\": \"v2\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + + // Test version error + servers.get(0).setResponse( + "{\"version\": \"v2\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); res = HttpDialectUtils.convertSql(targetURL, originSql, "presto", features, "{}"); Assert.assertEquals(originSql, res); - // test response code error - server.setResponse( + + // Test code error + servers.get(0).setResponse( "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 400, \"message\": \"\"}"); res = HttpDialectUtils.convertSql(targetURL, originSql, "presto", features, "{}"); Assert.assertEquals(originSql, res); } + @Test + public void testMultipleUrlsConvert() { + String originSql = "select * from t1 where \"k1\" = 1"; + String expectedSql = "select * from t1 where `k1` = 1"; + String[] features = new String[] {"ctas"}; + + String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert," + + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert," + + "http://127.0.0.1:" + ports.get(2) + "/api/v1/convert"; + + servers.get(0).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + + String res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); + Assert.assertEquals(expectedSql, res); + } + + @Test + public void testFailoverMechanism() throws InterruptedException { + String originSql = "select * from t1 where \"k1\" = 1"; + String expectedSql = "select * from t1 where `k1` = 1"; + String[] features = new String[] {"ctas"}; + + String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert," + + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert"; + + // First server returns error, second server succeeds + servers.get(0).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 500, \"message\": \"error\"}"); + servers.get(1).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + + String res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); + Assert.assertEquals(expectedSql, res); + } + + @Test + public void testBlacklistMechanism() throws InterruptedException { + String originSql = "select * from t1 where \"k1\" = 1"; + String expectedSql = "select * from t1 where `k1` = 1"; + String[] features = new String[] {"ctas"}; + + String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert," + + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert"; + + // Stop first server, set second server to work + servers.get(0).stop(); + servers.get(1).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + + // First call should succeed via second server + String res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); + Assert.assertEquals(expectedSql, res); + + // Restart first server + servers.set(0, new SimpleHttpServer(ports.get(0))); + try { + servers.get(0).start("/api/v1/convert"); + servers.get(0).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + } catch (IOException e) { + return; // Skip test if port is occupied + } + + // Should still work with blacklist recovery + res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); + Assert.assertEquals(expectedSql, res); + } + + @Test + public void testAllUrlsFailure() { + String originSql = "select * from t1 where \"k1\" = 1"; + String[] features = new String[] {"ctas"}; + + String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert," + + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert"; + + // All servers return error + servers.get(0).setResponse("{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); + servers.get(1).setResponse("{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); + + String res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); + Assert.assertEquals(originSql, res); + } + + @Test + public void testUrlParsing() { + String originSql = "select * from t1 where \"k1\" = 1"; + String expectedSql = "select * from t1 where `k1` = 1"; + String[] features = new String[] {"ctas"}; + + // Test URL parsing with spaces and empty items + String targetURLs = " http://127.0.0.1:" + ports.get(0) + "/api/v1/convert , ," + + " http://127.0.0.1:" + ports.get(1) + "/api/v1/convert "; + + servers.get(0).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + + String res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); + Assert.assertEquals(expectedSql, res); + } + + @Test + public void testSeamlessFailover() throws IOException { + String originSql = "select * from t1 where \"k1\" = 1"; + String expectedSql = "select * from t1 where `k1` = 1"; + String[] features = new String[] {"ctas"}; + + String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert," + + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert"; + + // Both servers start healthy + servers.get(0).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + servers.get(1).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + + String res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); + Assert.assertEquals(expectedSql, res); + + // Stop first server + servers.get(0).stop(); + res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); + Assert.assertEquals(expectedSql, res); + + // Restart first server, stop second + servers.set(0, new SimpleHttpServer(ports.get(0))); + servers.get(0).start("/api/v1/convert"); + servers.get(0).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + servers.get(1).stop(); + + // Should seamlessly switch to first server + res = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); + Assert.assertEquals(expectedSql, res); + } + + @Test + public void testConcurrentRequests() throws InterruptedException { + String originSql = "select * from t1 where \"k1\" = 1"; + String expectedSql = "select * from t1 where `k1` = 1"; + String[] features = new String[] {"ctas"}; + + String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert," + + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert"; + + servers.get(0).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + servers.get(1).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + + // Test with multiple concurrent threads + Thread[] threads = new Thread[10]; + String[] results = new String[10]; + + for (int i = 0; i < 10; i++) { + final int index = i; + threads[i] = new Thread(() -> { + results[index] = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); + }); + threads[i].start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + // Verify all results + for (String result : results) { + Assert.assertEquals(expectedSql, result); + } + } + + @Test + public void testZeroFailureGuarantee() throws InterruptedException { + String originSql = "select * from t1 where \"k1\" = 1"; + String expectedSql = "select * from t1 where `k1` = 1"; + String[] features = new String[] {"ctas"}; + + String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert," + + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert," + + "http://127.0.0.1:" + ports.get(2) + "/api/v1/convert"; + + int totalRequests = 30; // Reduced for faster testing with production timeouts + int successCount = 0; + + // Test various failure scenarios while ensuring at least one service is always available + for (int i = 0; i < totalRequests; i++) { + if (i < 6) { + // All servers healthy + setAllServersHealthy(expectedSql); + } else if (i < 12) { + // Server 0 fails, others healthy + servers.get(0) + .setResponse("{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); + servers.get(1).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + servers.get(2).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + } else if (i < 18) { + // Servers 0,1 fail, server 2 healthy + servers.get(0) + .setResponse("{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); + servers.get(1) + .setResponse("{\"version\": \"v1\", \"data\": \"\", \"code\": 503, \"message\": \"error\"}"); + servers.get(2).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + } else if (i < 24) { + // Only server 1 healthy + servers.get(0) + .setResponse("{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); + servers.get(1).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + servers.get(2) + .setResponse("{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); + } else { + // Alternating recovery + if (i % 2 == 0) { + servers.get(0).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + servers.get(1).setResponse( + "{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); + servers.get(2).setResponse( + "{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); + } else { + servers.get(0).setResponse( + "{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); + servers.get(1).setResponse( + "{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"error\"}"); + servers.get(2).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + } + } + + String result = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); + if (expectedSql.equals(result)) { + successCount++; + } + + Thread.sleep(50); // Small delay between requests + } + + System.out.println("Zero Failure Guarantee Test Results:"); + System.out.println("Total requests: " + totalRequests); + System.out.println("Successful: " + successCount); + System.out.println("Success rate: " + (successCount * 100.0 / totalRequests) + "%"); + + // Must achieve 100% success rate when at least one service is available + Assert.assertEquals("Must achieve 100% success rate when service is available", + totalRequests, successCount); + } + + @Test + public void testNetworkJitterStress() throws InterruptedException { + String originSql = "select * from t1 where \"k1\" = 1"; + String expectedSql = "select * from t1 where `k1` = 1"; + String[] features = new String[] {"ctas"}; + + String targetURLs = "http://127.0.0.1:" + ports.get(0) + "/api/v1/convert," + + "http://127.0.0.1:" + ports.get(1) + "/api/v1/convert"; + + int totalRequests = 15; // Reduced for faster testing with production timeouts + int successCount = 0; + + // Simulate network jitter while ensuring at least one server is always available + for (int i = 0; i < totalRequests; i++) { + double random = Math.random(); + if (random < 0.3) { + // Server 0 fails, Server 1 works + servers.get(0) + .setResponse("{\"version\": \"v1\", \"data\": \"\", \"code\": 500, \"message\": \"timeout\"}"); + servers.get(1).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + } else if (random < 0.5) { + // Server 1 fails, Server 0 works + servers.get(0).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + servers.get(1).setResponse( + "{\"version\": \"v1\", \"data\": \"\", \"code\": 503, \"message\": \"service unavailable\"}"); + } else { + // Both servers work + servers.get(0).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + servers.get(1).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + } + + String result = HttpDialectUtils.convertSql(targetURLs, originSql, "presto", features, "{}"); + if (expectedSql.equals(result)) { + successCount++; + } + + Thread.sleep(100); // Delay between requests for production timeouts + } + + System.out.println("Network Jitter Test Results:"); + System.out.println("Total requests: " + totalRequests); + System.out.println("Successful: " + successCount); + System.out.println("Success rate: " + (successCount * 100.0 / totalRequests) + "%"); + + // Must achieve 100% success rate since we ensure at least one server is always available + Assert.assertEquals("Must handle network jitter with 100% success when service is available", + totalRequests, successCount); + } + + private void setAllServersHealthy(String expectedSql) { + for (int i = 0; i < 3; i++) { + servers.get(i).setResponse( + "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); + } + } + private static int findValidPort() { int port; while (true) {