diff --git a/.github/workflows/e2e.cluster.yaml b/.github/workflows/e2e.cluster.yaml index b1eb89f5c6ed..d2f82f268705 100644 --- a/.github/workflows/e2e.cluster.yaml +++ b/.github/workflows/e2e.cluster.yaml @@ -37,7 +37,7 @@ jobs: strategy: matrix: coordinator: ['zk'] - storage: ['mysql', 'es6', 'es7'] #TODO: 'influxdb' + storage: ['mysql', 'es6', 'es7', 'influxdb'] env: SW_COORDINATOR: ${{ matrix.coordinator }} SW_STORAGE: ${{ matrix.storage }} diff --git a/.github/workflows/e2e.profiling.yaml b/.github/workflows/e2e.profiling.yaml index 04de1767d1a1..229468d1a3db 100644 --- a/.github/workflows/e2e.profiling.yaml +++ b/.github/workflows/e2e.profiling.yaml @@ -36,7 +36,7 @@ jobs: timeout-minutes: 90 strategy: matrix: - storage: ['h2', 'mysql', 'es6', 'es7'] #TODO: 'influxdb' + storage: ['h2', 'mysql', 'es6', 'es7', 'influxdb'] env: SW_STORAGE: ${{ matrix.storage }} steps: diff --git a/.github/workflows/e2e.storages.yaml b/.github/workflows/e2e.storages.yaml index c346f4d0ddca..c5c380b94486 100644 --- a/.github/workflows/e2e.storages.yaml +++ b/.github/workflows/e2e.storages.yaml @@ -36,7 +36,7 @@ jobs: timeout-minutes: 90 strategy: matrix: - storage: ['mysql', 'es6', 'es7'] #TODO: 'influxdb' + storage: ['mysql', 'es6', 'es7', 'influxdb'] env: SW_STORAGE: ${{ matrix.storage }} steps: diff --git a/.github/workflows/e2e.ttl.yaml b/.github/workflows/e2e.ttl.yaml index 8bce3fac6cef..c1dce8234509 100644 --- a/.github/workflows/e2e.ttl.yaml +++ b/.github/workflows/e2e.ttl.yaml @@ -36,7 +36,7 @@ jobs: timeout-minutes: 90 strategy: matrix: - storage: ['es6', 'es7'] #TODO: 'influxdb' + storage: ['es6', 'es7', 'influxdb'] env: SW_STORAGE: ${{ matrix.storage }} steps: diff --git a/.github/workflows/plugins-test.1.yaml b/.github/workflows/plugins-test.1.yaml index b9fc6c20a7d5..a9a46b795b3e 100644 --- a/.github/workflows/plugins-test.1.yaml +++ b/.github/workflows/plugins-test.1.yaml @@ -46,7 +46,6 @@ jobs: - { name: 'kotlin-coroutine-scenario', title: 'Kotlin Coroutine 1.0.1-1.3.3 (4)' } - { name: 'lettuce-scenario', title: 'Lettuce 5.x (17)' } - { name: 'mongodb-3.x-scenario', title: 'Mongodb 3.4.0-3.11.1 (22)' } - - { name: 'mysql-scenario', title: 'MySQL 5.1.2-8.0.15 (53)' } - { name: 'netty-socketio-scenario', title: 'Netty-SocketIO 1.x (4)' } steps: - uses: actions/checkout@v2 diff --git a/.github/workflows/plugins-test.3.yaml b/.github/workflows/plugins-test.3.yaml index 4870180bc68a..80af4c7a1d89 100644 --- a/.github/workflows/plugins-test.3.yaml +++ b/.github/workflows/plugins-test.3.yaml @@ -33,6 +33,7 @@ jobs: fail-fast: true matrix: case: + - { name: 'mysql-scenario', title: 'MySQL 5.1.2-8.0.15 (30)' } - { name: 'undertow-scenario', title: 'Undertow 1.3.0-2.0.27 (23)' } - { name: 'webflux-scenario', title: 'Spring-WebFlux 2.x (7)' } - { name: 'zookeeper-scenario', title: 'Zookeeper 3.4.x (14)' } diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxClient.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxClient.java index cc4c06ebc5ba..fe96dd4dbde6 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxClient.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxClient.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.List; +import java.util.Objects; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import okhttp3.OkHttpClient; @@ -73,6 +74,7 @@ public void connect() { InfluxDB.ResponseFormat.MSGPACK ); influx.query(new Query("CREATE DATABASE " + database)); + influx.enableGzip(); influx.enableBatch(config.getActions(), config.getDuration(), TimeUnit.MILLISECONDS); influx.setDatabase(database); @@ -99,7 +101,7 @@ public List query(Query query) throws IOException { } try { - QueryResult result = getInflux().query(query); + QueryResult result = getInflux().query(new Query(query.getCommand())); if (result.hasError()) { throw new IOException(result.getError()); } @@ -136,6 +138,22 @@ public QueryResult.Series queryForSingleSeries(Query query) throws IOException { return series.get(0); } + /** + * Execute a query against InfluxDB with a `select count(*)` statement and return the count only. + * + * @throws IOException if there is an error on the InfluxDB server or communication error + */ + public int getCounter(Query query) throws IOException { + QueryResult.Series series = queryForSingleSeries(query); + if (log.isDebugEnabled()) { + log.debug("SQL: {} result: {}", query.getCommand(), series); + } + if (Objects.isNull(series)) { + return 0; + } + return ((Number) series.getValues().get(0).get(1)).intValue(); + } + /** * Data management, to drop a time-series by measurement and time-series name specified. If an exception isn't * thrown, it means execution success. Notice, drop series don't support to drop series by range diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxModelConstants.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxConstants.java similarity index 62% rename from oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxModelConstants.java rename to oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxConstants.java index 18dd47dc2816..33bc7dbc09f5 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxModelConstants.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxConstants.java @@ -18,9 +18,31 @@ package org.apache.skywalking.oap.server.storage.plugin.influxdb; -public interface InfluxModelConstants { - /** - * Override column because the 'duration' is the identifier of InfluxDB. - */ - String DURATION = "dur"; +public interface InfluxConstants { + String ID_COLUMN = "id"; + + String NAME = "\"name\""; + + String ALL_FIELDS = "*::field"; + + String SORT_DES = "top"; + + String SORT_ASC = "bottom"; + + String DURATION = "\"" + "duration" + "\""; + + interface TagName { + + String ID_COLUMN = "_id"; + + String NAME = "_name"; + + String ENTITY_ID = "_entity_id"; + + String TIME_BUCKET = "_time_bucket"; + + String NODE_TYPE = "_node_type"; + + String SERVICE_ID = "_service_id"; + } } diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxStorageProvider.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxStorageProvider.java index e501b4f41920..ae677488cac7 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxStorageProvider.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxStorageProvider.java @@ -23,6 +23,7 @@ import org.apache.skywalking.oap.server.core.storage.IBatchDAO; import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO; import org.apache.skywalking.oap.server.core.storage.StorageDAO; +import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.StorageModule; import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO; @@ -46,10 +47,10 @@ import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.InfluxStorageDAO; import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.AggregationQuery; import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.AlarmQuery; -import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.InfluxMetadataQueryDAO; -import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.InfluxNetworkAddressAlias; import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.LogQuery; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.MetadataQuery; import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.MetricsQuery; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.NetworkAddressAliasDAO; import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.ProfileTaskLogQuery; import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.ProfileTaskQuery; import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.ProfileThreadSnapshotQuery; @@ -60,7 +61,7 @@ @Slf4j public class InfluxStorageProvider extends ModuleProvider { private InfluxStorageConfig config; - private InfluxClient influxClient; + private InfluxClient client; public InfluxStorageProvider() { config = new InfluxStorageConfig(); @@ -83,35 +84,42 @@ public ModuleConfig createConfigBeanIfAbsent() { @Override public void prepare() throws ServiceNotProvidedException { - influxClient = new InfluxClient(config); + client = new InfluxClient(config); - this.registerServiceImplementation(IBatchDAO.class, new BatchDAO(influxClient)); - this.registerServiceImplementation(StorageDAO.class, new InfluxStorageDAO(influxClient)); + this.registerServiceImplementation(IBatchDAO.class, new BatchDAO(client)); + this.registerServiceImplementation(StorageDAO.class, new InfluxStorageDAO(client)); - this.registerServiceImplementation(INetworkAddressAliasDAO.class, new InfluxNetworkAddressAlias(influxClient)); - this.registerServiceImplementation(IMetadataQueryDAO.class, new InfluxMetadataQueryDAO(influxClient)); + this.registerServiceImplementation(INetworkAddressAliasDAO.class, new NetworkAddressAliasDAO(client)); + this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQuery(client)); - this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQuery(influxClient)); - this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQuery(influxClient)); - this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQuery(influxClient)); - this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQuery(influxClient)); - this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQuery(influxClient)); - this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQuery(influxClient)); - this.registerServiceImplementation(ILogQueryDAO.class, new LogQuery(influxClient)); + this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQuery(client)); + this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQuery(client)); + this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQuery(client)); + this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQuery(client)); + this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQuery(client)); + this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQuery(client)); + this.registerServiceImplementation(ILogQueryDAO.class, new LogQuery(client)); - this.registerServiceImplementation(IProfileTaskQueryDAO.class, new ProfileTaskQuery(influxClient)); + this.registerServiceImplementation(IProfileTaskQueryDAO.class, new ProfileTaskQuery(client)); this.registerServiceImplementation( - IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQuery(influxClient)); + IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQuery(client)); this.registerServiceImplementation( - IProfileTaskLogQueryDAO.class, new ProfileTaskLogQuery(influxClient, config.getFetchTaskLogMaxSize())); + IProfileTaskLogQueryDAO.class, new ProfileTaskLogQuery(client, config.getFetchTaskLogMaxSize())); this.registerServiceImplementation( - IHistoryDeleteDAO.class, new HistoryDeleteDAO(influxClient)); + IHistoryDeleteDAO.class, new HistoryDeleteDAO(client)); } @Override public void start() throws ServiceNotProvidedException, ModuleStartException { - influxClient.connect(); + client.connect(); + + InfluxTableInstaller installer = new InfluxTableInstaller(getManager()); + try { + installer.install(client); + } catch (StorageException e) { + throw new ModuleStartException(e.getMessage(), e); + } } @Override diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/InfluxNetworkAddressAlias.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxTableInstaller.java similarity index 52% rename from oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/InfluxNetworkAddressAlias.java rename to oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxTableInstaller.java index 90b47e22d9ba..585df52d0f5c 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/InfluxNetworkAddressAlias.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxTableInstaller.java @@ -16,22 +16,28 @@ * */ -package org.apache.skywalking.oap.server.storage.plugin.influxdb.query; +package org.apache.skywalking.oap.server.storage.plugin.influxdb; -import java.util.List; -import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias; -import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO; -import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; +import org.apache.skywalking.oap.server.core.storage.StorageException; +import org.apache.skywalking.oap.server.core.storage.model.Model; +import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller; +import org.apache.skywalking.oap.server.library.client.Client; +import org.apache.skywalking.oap.server.library.module.ModuleManager; -public class InfluxNetworkAddressAlias implements INetworkAddressAliasDAO { - private InfluxClient client; +public class InfluxTableInstaller extends ModelInstaller { - public InfluxNetworkAddressAlias(final InfluxClient client) { - this.client = client; + public InfluxTableInstaller(ModuleManager moduleManager) { + super(moduleManager); } @Override - public List loadLastUpdate(final long timeBucket) { - return null; + protected boolean isExists(final Client client, final Model model) throws StorageException { + TableMetaInfo.addModel(model); + return true; + } + + @Override + protected void createTable(final Client client, final Model model) throws StorageException { + // Automatically create table } } diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java index 4cc9e806acf3..3d73663172ee 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java @@ -18,18 +18,80 @@ package org.apache.skywalking.oap.server.storage.plugin.influxdb; +import com.google.common.collect.Maps; import java.util.HashMap; +import java.util.List; import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic; +import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; +import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic; +import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; +import org.apache.skywalking.oap.server.core.analysis.record.Record; +import org.apache.skywalking.oap.server.core.storage.model.ColumnName; import org.apache.skywalking.oap.server.core.storage.model.Model; +import org.apache.skywalking.oap.server.core.storage.model.ModelColumn; +@Getter +@Builder +@AllArgsConstructor public class TableMetaInfo { - private static Map TABLES = new HashMap<>(); + private static final Map TABLES = new HashMap<>(); + + private Map storageAndColumnMap; + private Map storageAndTagMap; + private Model model; public static void addModel(Model model) { - TABLES.put(model.getName(), model); + final List columns = model.getColumns(); + final Map storageAndTagMap = Maps.newHashMap(); + final Map storageAndColumnMap = Maps.newHashMap(); + columns.forEach(column -> { + ColumnName columnName = column.getColumnName(); + storageAndColumnMap.put(columnName.getStorageName(), columnName.getName()); + }); + + if (model.getName().endsWith("_traffic")) { + // instance_traffic name, service_id + // endpoint_traffic name, service_id + storageAndTagMap.put(InstanceTraffic.NAME, InfluxConstants.TagName.NAME); + if (InstanceTraffic.INDEX_NAME.equals(model.getName()) + || EndpointTraffic.INDEX_NAME.equals(model.getName())) { + storageAndTagMap.put(EndpointTraffic.SERVICE_ID, InfluxConstants.TagName.SERVICE_ID); + } else { + // service_traffic name, node_type + storageAndTagMap.put(ServiceTraffic.NODE_TYPE, InfluxConstants.TagName.NODE_TYPE); + } + } else { + + // Specifies ENTITY_ID, TIME_BUCKET, NODE_TYPE, SERVICE_ID as tag + if (storageAndColumnMap.containsKey(Metrics.ENTITY_ID)) { + storageAndTagMap.put(Metrics.ENTITY_ID, InfluxConstants.TagName.ENTITY_ID); + } + if (storageAndColumnMap.containsKey(Record.TIME_BUCKET)) { + storageAndTagMap.put(Record.TIME_BUCKET, InfluxConstants.TagName.TIME_BUCKET); + } + if (storageAndColumnMap.containsKey(ServiceTraffic.NODE_TYPE)) { + storageAndTagMap.put(ServiceTraffic.NODE_TYPE, InfluxConstants.TagName.NODE_TYPE); + } + if (storageAndColumnMap.containsKey(SegmentRecord.SERVICE_ID)) { + storageAndTagMap.put(SegmentRecord.SERVICE_ID, InfluxConstants.TagName.SERVICE_ID); + } + } + + TableMetaInfo info = TableMetaInfo.builder() + .model(model) + .storageAndTagMap(storageAndTagMap) + .storageAndColumnMap(storageAndColumnMap) + .build(); + TABLES.put(model.getName(), info); } - public static Model get(String moduleName) { + public static TableMetaInfo get(String moduleName) { return TABLES.get(moduleName); } + } diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/InfluxInsertRequest.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/InfluxInsertRequest.java index eaa0f4dbe738..f98437216543 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/InfluxInsertRequest.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/InfluxInsertRequest.java @@ -21,7 +21,6 @@ import com.google.common.collect.Maps; import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.StorageData; import org.apache.skywalking.oap.server.core.storage.model.Model; @@ -29,15 +28,13 @@ import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; import org.apache.skywalking.oap.server.library.client.request.UpdateRequest; -import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants; import org.influxdb.dto.Point; /** * InfluxDB Point wrapper. */ public class InfluxInsertRequest implements InsertRequest, UpdateRequest { - public static final String ID = "id"; - private Point.Builder builder; private Map fields = Maps.newHashMap(); @@ -57,9 +54,8 @@ public InfluxInsertRequest(Model model, StorageData storageData, StorageBuilder } } builder = Point.measurement(model.getName()) - .addField(ID, storageData.id()) - .fields(fields) - .tag(InfluxClient.TAG_TIME_BUCKET, String.valueOf(fields.get(Metrics.TIME_BUCKET))); + .addField(InfluxConstants.ID_COLUMN, storageData.id()) + .fields(fields); } public InfluxInsertRequest time(long time, TimeUnit unit) { @@ -68,9 +64,7 @@ public InfluxInsertRequest time(long time, TimeUnit unit) { } public InfluxInsertRequest addFieldAsTag(String fieldName, String tagName) { - if (fields.containsKey(fieldName)) { - builder.tag(tagName, String.valueOf(fields.get(fieldName))); - } + builder.tag(tagName, String.valueOf(fields.get(fieldName))); return this; } diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/MetricsDAO.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/MetricsDAO.java index eb3008f5ddfe..86f5c8c0d543 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/MetricsDAO.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/MetricsDAO.java @@ -26,30 +26,27 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.core.analysis.TimeBucket; -import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic; -import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic; -import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.model.Model; -import org.apache.skywalking.oap.server.core.storage.model.ModelColumn; import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; import org.apache.skywalking.oap.server.library.client.request.UpdateRequest; import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.TableMetaInfo; import org.influxdb.dto.QueryResult; import org.influxdb.querybuilder.SelectQueryImpl; import org.influxdb.querybuilder.WhereQueryImpl; +import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.ALL_FIELDS; import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.contains; import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select; +@Slf4j public class MetricsDAO implements IMetricsDAO { - public static final String TAG_ENTITY_ID = "_entity_id"; - public static final String TAG_ENDPOINT_OWNER_SERVICE = "_service_id"; - public static final String TAG_ENDPOINT_NAME = "_endpoint_name"; private final StorageBuilder storageBuilder; private final InfluxClient client; @@ -62,10 +59,13 @@ public MetricsDAO(InfluxClient client, StorageBuilder storageBuilder) { @Override public List multiGet(Model model, List ids) throws IOException { WhereQueryImpl query = select() - .regex("*::field") + .raw(ALL_FIELDS) .from(client.getDatabase(), model.getName()) .where(contains("id", Joiner.on("|").join(ids))); QueryResult.Series series = client.queryForSingleSeries(query); + if (log.isDebugEnabled()) { + log.debug("SQL: {} result: {}", query.getCommand(), series); + } if (series == null) { return Collections.emptyList(); @@ -73,10 +73,9 @@ public List multiGet(Model model, List ids) throws IOException final List metrics = Lists.newArrayList(); List columns = series.getColumns(); - Map storageAndColumnNames = Maps.newHashMap(); - for (ModelColumn column : model.getColumns()) { - storageAndColumnNames.put(column.getColumnName().getStorageName(), column.getColumnName().getName()); - } + + TableMetaInfo metaInfo = TableMetaInfo.get(model.getName()); + Map storageAndColumnMap = metaInfo.getStorageAndColumnMap(); series.getValues().forEach(values -> { Map data = Maps.newHashMap(); @@ -87,7 +86,7 @@ public List multiGet(Model model, List ids) throws IOException value = ((StorageDataComplexObject) value).toStorageData(); } - data.put(storageAndColumnNames.get(columns.get(i)), value); + data.put(storageAndColumnMap.get(columns.get(i)), value); } metrics.add(storageBuilder.map2Data(data)); @@ -99,16 +98,15 @@ public List multiGet(Model model, List ids) throws IOException @Override public InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException { final long timestamp = TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling()); - if (metrics instanceof EndpointTraffic || metrics instanceof ServiceTraffic || metrics instanceof InstanceTraffic) { - return new InfluxInsertRequest(model, metrics, storageBuilder) - .time(timestamp, TimeUnit.MILLISECONDS) - .addFieldAsTag(EndpointTraffic.SERVICE_ID, TAG_ENDPOINT_OWNER_SERVICE) - .addFieldAsTag(EndpointTraffic.NAME, TAG_ENDPOINT_NAME); - } else { - return new InfluxInsertRequest(model, metrics, storageBuilder) - .time(timestamp, TimeUnit.MILLISECONDS) - .addFieldAsTag(Metrics.ENTITY_ID, TAG_ENTITY_ID); - } + TableMetaInfo tableMetaInfo = TableMetaInfo.get(model.getName()); + + final InfluxInsertRequest request = new InfluxInsertRequest(model, metrics, storageBuilder) + .time(timestamp, TimeUnit.MILLISECONDS); + + tableMetaInfo.getStorageAndTagMap().forEach((field, tag) -> { + request.addFieldAsTag(field, tag); + }); + return request; } @Override diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/NoneStreamDAO.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/NoneStreamDAO.java index ae00288aa677..3d89e419fa0a 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/NoneStreamDAO.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/NoneStreamDAO.java @@ -23,15 +23,13 @@ import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger; import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.analysis.config.NoneStream; -import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord; import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO; import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; -import org.influxdb.dto.Point; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.TableMetaInfo; public class NoneStreamDAO implements INoneStreamDAO { - public static final String TAG_SERVICE_ID = "_service_id"; private static final int PADDING_SIZE = 1_000_000; private static final AtomicRangeInteger SUFFIX = new AtomicRangeInteger(0, PADDING_SIZE); @@ -45,13 +43,14 @@ public NoneStreamDAO(InfluxClient client, StorageBuilder storageBuil @Override public void insert(final Model model, final NoneStream noneStream) throws IOException { - final long timestamp = TimeBucket.getTimestamp( - noneStream.getTimeBucket(), model.getDownsampling()) * PADDING_SIZE + SUFFIX.getAndIncrement(); - - Point point = new InfluxInsertRequest(model, noneStream, storageBuilder) - .time(timestamp, TimeUnit.NANOSECONDS) - .addFieldAsTag(ProfileTaskRecord.SERVICE_ID, TAG_SERVICE_ID).getPoint(); - - client.write(point); + final long timestamp = TimeBucket.getTimestamp(noneStream.getTimeBucket(), model.getDownsampling()) + * PADDING_SIZE + SUFFIX.getAndIncrement(); + + final InfluxInsertRequest request = new InfluxInsertRequest(model, noneStream, storageBuilder) + .time(timestamp, TimeUnit.NANOSECONDS); + TableMetaInfo.get(model.getName()).getStorageAndTagMap().forEach((field, tag) -> { + request.addFieldAsTag(field, tag); + }); + client.write(request.getPoint()); } } diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/RecordDAO.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/RecordDAO.java index fae2e4109cd8..63739c1708d3 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/RecordDAO.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/RecordDAO.java @@ -22,16 +22,15 @@ import java.util.concurrent.TimeUnit; import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger; import org.apache.skywalking.oap.server.core.analysis.TimeBucket; -import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.storage.IRecordDAO; import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.TableMetaInfo; public class RecordDAO implements IRecordDAO { - public static final String TAG_SERVICE_ID = "_service_id"; private static final int PADDING_SIZE = 1_000_000; private static final AtomicRangeInteger SUFFIX = new AtomicRangeInteger(0, PADDING_SIZE); @@ -45,11 +44,14 @@ public RecordDAO(InfluxClient client, StorageBuilder storageBuilder) { @Override public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException { - final long timestamp = TimeBucket.getTimestamp( - record.getTimeBucket(), model.getDownsampling()) * PADDING_SIZE + SUFFIX.getAndIncrement(); + final long timestamp = TimeBucket.getTimestamp(record.getTimeBucket(), model.getDownsampling()) + * PADDING_SIZE + SUFFIX.getAndIncrement(); - return new InfluxInsertRequest(model, record, storageBuilder) - .time(timestamp, TimeUnit.NANOSECONDS) - .addFieldAsTag(SegmentRecord.SERVICE_ID, TAG_SERVICE_ID); + final InfluxInsertRequest request = new InfluxInsertRequest(model, record, storageBuilder) + .time(timestamp, TimeUnit.NANOSECONDS); + TableMetaInfo.get(model.getName()).getStorageAndTagMap().forEach((field, tag) -> { + request.addFieldAsTag(field, tag); + }); + return request; } } diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/AggregationQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/AggregationQuery.java index 2ab6b213710f..48cd1efc47ec 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/AggregationQuery.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/AggregationQuery.java @@ -25,13 +25,11 @@ import java.util.List; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.core.analysis.DownSampling; -import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic; -import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic; import org.apache.skywalking.oap.server.core.query.entity.Order; import org.apache.skywalking.oap.server.core.query.entity.TopNEntity; import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; -import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.MetricsDAO; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants; import org.influxdb.dto.QueryResult; import org.influxdb.querybuilder.SelectQueryImpl; import org.influxdb.querybuilder.SelectSubQueryImpl; @@ -68,7 +66,7 @@ public List getServiceInstanceTopN(String serviceId, String indName, long startTB, long endTB, Order order) throws IOException { return getTopNEntity( downsampling, indName, - subQuery(InstanceTraffic.SERVICE_ID, serviceId, indName, valueCName, startTB, endTB), order, topN + subQuery(InfluxConstants.TagName.SERVICE_ID, serviceId, indName, valueCName, startTB, endTB), order, topN ); } @@ -84,7 +82,7 @@ public List getEndpointTopN(String serviceId, String indName, String long startTB, long endTB, Order order) throws IOException { return getTopNEntity( downsampling, indName, - subQuery(EndpointTraffic.SERVICE_ID, serviceId, indName, valueCName, startTB, endTB), order, topN + subQuery(InfluxConstants.TagName.SERVICE_ID, serviceId, indName, valueCName, startTB, endTB), order, topN ); } @@ -95,14 +93,14 @@ private List getTopNEntity(DownSampling downsampling, int topN) throws IOException { // Have to re-sort here. Because the function, top()/bottom(), get the result ordered by the `time`. Comparator comparator = DESCENDING; - String functionName = "top"; + String functionName = InfluxConstants.SORT_DES; if (order == Order.ASC) { - functionName = "bottom"; + functionName = InfluxConstants.SORT_ASC; comparator = ASCENDING; } SelectQueryImpl query = select().function(functionName, "mean", topN).as("value") - .column(MetricsDAO.TAG_ENTITY_ID) + .column(InfluxConstants.TagName.ENTITY_ID) .from(client.getDatabase(), measurement); query.setSubQuery(subQuery); @@ -135,7 +133,7 @@ private SelectSubQueryImpl subQuery(String serviceColumnName, S .and(eq(serviceColumnName, serviceId)) .and(gte(InfluxClient.TIME, InfluxClient.timeInterval(startTB))) .and(lte(InfluxClient.TIME, InfluxClient.timeInterval(endTB))) - .groupBy(MetricsDAO.TAG_ENTITY_ID); + .groupBy(InfluxConstants.TagName.ENTITY_ID); } private SelectSubQueryImpl subQuery(String name, String columnName, long startTB, long endTB) { @@ -143,7 +141,7 @@ private SelectSubQueryImpl subQuery(String name, String columnN .where() .and(gte(InfluxClient.TIME, InfluxClient.timeInterval(startTB))) .and(lte(InfluxClient.TIME, InfluxClient.timeInterval(endTB))) - .groupBy(MetricsDAO.TAG_ENTITY_ID); + .groupBy(InfluxConstants.TagName.ENTITY_ID); } private static final Comparator ASCENDING = Comparator.comparingLong(TopNEntity::getValue); diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/InfluxMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/InfluxMetadataQueryDAO.java deleted file mode 100644 index b300f85f3292..000000000000 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/InfluxMetadataQueryDAO.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.oap.server.storage.plugin.influxdb.query; - -import com.google.common.base.Strings; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import org.apache.skywalking.oap.server.core.analysis.IDManager; -import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic; -import org.apache.skywalking.oap.server.core.query.entity.Database; -import org.apache.skywalking.oap.server.core.query.entity.Endpoint; -import org.apache.skywalking.oap.server.core.query.entity.Service; -import org.apache.skywalking.oap.server.core.query.entity.ServiceInstance; -import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO; -import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; -import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.MetricsDAO; -import org.influxdb.dto.Query; -import org.influxdb.dto.QueryResult; -import org.influxdb.querybuilder.SelectQueryImpl; -import org.influxdb.querybuilder.WhereQueryImpl; - -import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.contains; -import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq; -import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select; - -public class InfluxMetadataQueryDAO implements IMetadataQueryDAO { - private InfluxClient client; - // 'name' is InfluxDB keyword, so escapes it - private static final String ENDPOINT_NAME = '\"' + EndpointTraffic.NAME + '\"'; - - public InfluxMetadataQueryDAO(final InfluxClient client) { - this.client = client; - } - - @Override - public int numOfService(final long startTimestamp, final long endTimestamp) throws IOException { - return 0; - } - - @Override - public int numOfEndpoint() throws IOException { - final SelectQueryImpl selectQuery = select() - .count(EndpointTraffic.ENTITY_ID) - .from(client.getDatabase(), EndpointTraffic.INDEX_NAME); - - Query query = new Query(selectQuery.getCommand()); - - final QueryResult.Series series = client.queryForSingleSeries(query); - if (series == null) { - return 0; - } - - return ((Number) series.getValues().get(0).get(1)).intValue(); - } - - @Override - public int numOfConjectural(final int nodeTypeValue) throws IOException { - return 0; - } - - @Override - public List getAllServices(final long startTimestamp, final long endTimestamp) throws IOException { - return null; - } - - @Override - public List getAllBrowserServices(final long startTimestamp, final long endTimestamp) throws IOException { - return null; - } - - @Override - public List getAllDatabases() throws IOException { - return null; - } - - @Override - public List searchServices(final long startTimestamp, - final long endTimestamp, - final String keyword) throws IOException { - return null; - } - - @Override - public Service searchService(final String serviceCode) throws IOException { - return null; - } - - @Override - public List searchEndpoint(final String keyword, - final String serviceId, - final int limit) throws IOException { - WhereQueryImpl endpointQuery = select() - .column(EndpointTraffic.SERVICE_ID) - .column(ENDPOINT_NAME) - .from(client.getDatabase(), EndpointTraffic.INDEX_NAME) - .where(); - endpointQuery.where(eq(MetricsDAO.TAG_ENDPOINT_OWNER_SERVICE, String.valueOf(serviceId))); - if (!Strings.isNullOrEmpty(keyword)) { - endpointQuery.where(contains(MetricsDAO.TAG_ENDPOINT_NAME, keyword.replaceAll("/", "\\\\/"))); - } - endpointQuery.limit(limit); - - Query query = new Query(endpointQuery.getCommand()); - - final QueryResult.Series series = client.queryForSingleSeries(query); - - List list = new ArrayList<>(limit); - if (series != null) { - series.getValues().forEach(values -> { - EndpointTraffic endpointTraffic = new EndpointTraffic(); - endpointTraffic.setServiceId((String) values.get(1)); - endpointTraffic.setName((String) values.get(2)); - - Endpoint endpoint = new Endpoint(); - endpoint.setId(IDManager.EndpointID.buildId(endpointTraffic.getServiceId(), endpointTraffic.getName())); - endpoint.setName(endpointTraffic.getName()); - list.add(endpoint); - }); - } - return list; - } - - @Override - public List getServiceInstances(final long startTimestamp, - final long endTimestamp, - final String serviceId) throws IOException { - return null; - } -} diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/LogQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/LogQuery.java index fe69ac04e9c7..6b6019d394c7 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/LogQuery.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/LogQuery.java @@ -32,8 +32,9 @@ import org.apache.skywalking.oap.server.core.query.entity.Logs; import org.apache.skywalking.oap.server.core.query.entity.Pagination; import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO; +import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject; import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; -import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.RecordDAO; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants; import org.elasticsearch.common.Strings; import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; @@ -51,6 +52,7 @@ import static org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord.STATUS_CODE; import static org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord.TIMESTAMP; import static org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord.TRACE_ID; +import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.ALL_FIELDS; import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq; import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte; import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.lte; @@ -68,11 +70,11 @@ public LogQuery(InfluxClient client) { public Logs queryLogs(String metricName, int serviceId, int serviceInstanceId, String endpointId, String traceId, LogState state, String stateCode, Pagination paging, int from, int limit, long startTB, long endTB) throws IOException { - WhereQueryImpl recallQuery = select().regex("*::field") + WhereQueryImpl recallQuery = select().raw(ALL_FIELDS) .from(client.getDatabase(), metricName) .where(); if (serviceId != Const.NONE) { - recallQuery.and(eq(RecordDAO.TAG_SERVICE_ID, String.valueOf(serviceId))); + recallQuery.and(eq(InfluxConstants.TagName.SERVICE_ID, String.valueOf(serviceId))); } if (serviceInstanceId != Const.NONE) { recallQuery.and(eq(SERVICE_INSTANCE_ID, serviceInstanceId)); @@ -131,8 +133,12 @@ public Logs queryLogs(String metricName, int serviceId, int serviceInstanceId, S Map data = Maps.newHashMap(); Log log = new Log(); - for (int i = 0; i < columns.size(); i++) { - data.put(columns.get(i), values.get(i)); + for (int i = 1; i < columns.size(); i++) { + Object value = values.get(i); + if (value instanceof StorageDataComplexObject) { + value = ((StorageDataComplexObject) value).toStorageData(); + } + data.put(columns.get(i), value); } log.setContent((String) data.get(CONTENT)); log.setContentType(ContentType.instanceOf((int) data.get(CONTENT_TYPE))); diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetadataQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetadataQuery.java new file mode 100644 index 000000000000..bd041974aee5 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetadataQuery.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.influxdb.query; + +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.oap.server.core.analysis.NodeType; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; +import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic; +import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic; +import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic; +import org.apache.skywalking.oap.server.core.query.entity.Attribute; +import org.apache.skywalking.oap.server.core.query.entity.Database; +import org.apache.skywalking.oap.server.core.query.entity.Endpoint; +import org.apache.skywalking.oap.server.core.query.entity.Language; +import org.apache.skywalking.oap.server.core.query.entity.LanguageTrans; +import org.apache.skywalking.oap.server.core.query.entity.Service; +import org.apache.skywalking.oap.server.core.query.entity.ServiceInstance; +import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import org.influxdb.querybuilder.SelectQueryImpl; +import org.influxdb.querybuilder.SelectSubQueryImpl; +import org.influxdb.querybuilder.WhereQueryImpl; +import org.influxdb.querybuilder.WhereSubQueryImpl; + +import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.ID_COLUMN; +import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.NAME; +import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.TagName; +import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.contains; +import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq; +import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte; +import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select; + +@Slf4j +public class MetadataQuery implements IMetadataQueryDAO { + private static final Gson GSON = new Gson(); + private final InfluxClient client; + + public MetadataQuery(final InfluxClient client) { + this.client = client; + } + + @Override + public int numOfService(final long startTimestamp, final long endTimestamp) throws IOException { + WhereQueryImpl query = select().raw("count(distinct " + ID_COLUMN + ")") + .from(client.getDatabase(), ServiceTraffic.INDEX_NAME) + .where() + .and( + eq(InfluxConstants.TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value()) + )); + return client.getCounter(query); + } + + @Override + public int numOfEndpoint() throws IOException { + SelectQueryImpl query = select() + .raw("count(distinct " + ID_COLUMN + ")") + .from(client.getDatabase(), EndpointTraffic.INDEX_NAME); + return client.getCounter(query); + } + + @Override + public int numOfConjectural(final int nodeTypeValue) throws IOException { + WhereQueryImpl query = select().raw("count(distinct " + ID_COLUMN + ")") + .from(client.getDatabase(), ServiceTraffic.INDEX_NAME) + .where(eq( + InfluxConstants.TagName.NODE_TYPE, + String.valueOf(nodeTypeValue) + )); + return client.getCounter(query); + } + + @Override + public List getAllServices(final long startTimestamp, final long endTimestamp) throws IOException { + SelectSubQueryImpl subQuery = select() + .fromSubQuery(client.getDatabase()) + .column(ID_COLUMN).column(NAME) + .from(ServiceTraffic.INDEX_NAME) + .where(eq(InfluxConstants.TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value()))) + .groupBy(TagName.NAME, TagName.NODE_TYPE); + SelectQueryImpl query = select(ID_COLUMN, NAME).from(client.getDatabase()); + query.setSubQuery(subQuery); + return buildServices(query); + } + + @Override + public List getAllBrowserServices(long startTimestamp, long endTimestamp) throws IOException { + WhereQueryImpl query = select(ID_COLUMN, NAME) + .from(client.getDatabase(), ServiceTraffic.INDEX_NAME) + .where(eq(InfluxConstants.TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value()))); + return buildServices(query); + } + + @Override + public List getAllDatabases() throws IOException { + SelectSubQueryImpl subQuery = select() + .fromSubQuery(client.getDatabase()) + .column(ID_COLUMN).column(NAME) + .from(ServiceTraffic.INDEX_NAME) + .where(eq(InfluxConstants.TagName.NODE_TYPE, NodeType.Database.value())) + .groupBy(TagName.NAME, TagName.NODE_TYPE); + SelectQueryImpl query = select(ID_COLUMN, NAME).from(client.getDatabase()); + query.setSubQuery(subQuery); + QueryResult.Series series = client.queryForSingleSeries(query); + if (log.isDebugEnabled()) { + log.debug("SQL: {} result: {}", query.getCommand(), series); + } + + List databases = Lists.newArrayList(); + if (Objects.nonNull(series)) { + for (List values : series.getValues()) { + Database database = new Database(); + database.setId((String) values.get(1)); + database.setName((String) values.get(2)); + databases.add(database); + } + } + return databases; + } + + @Override + public List searchServices(long startTimestamp, long endTimestamp, String keyword) throws IOException { + WhereSubQueryImpl, SelectQueryImpl> subQuery = select() + .fromSubQuery(client.getDatabase()) + .column(ID_COLUMN) + .column(NAME) + .from(ServiceTraffic.INDEX_NAME) + .where(eq(InfluxConstants.TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value()))); + if (!Strings.isNullOrEmpty(keyword)) { + subQuery.and(contains(ServiceTraffic.NAME, keyword)); + } + subQuery.groupBy(TagName.NAME, TagName.NODE_TYPE); + + SelectQueryImpl query = select(ID_COLUMN, NAME).from(client.getDatabase()); + query.setSubQuery(subQuery); + return buildServices(query); + } + + @Override + public Service searchService(String serviceCode) throws IOException { + WhereQueryImpl query = select(ID_COLUMN, NAME) + .from(client.getDatabase(), ServiceTraffic.INDEX_NAME) + .where(eq(InfluxConstants.TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value()))); + query.and(eq(ServiceTraffic.NODE_TYPE, serviceCode)); + return buildServices(query).get(0); + } + + @Override + public List searchEndpoint(final String keyword, + final String serviceId, + final int limit) throws IOException { + WhereSubQueryImpl, SelectQueryImpl> subQuery = select() + .fromSubQuery(client.getDatabase()) + .column(ID_COLUMN) + .column(NAME) + .from(EndpointTraffic.INDEX_NAME) + .where(eq(InfluxConstants.TagName.SERVICE_ID, String.valueOf(serviceId))); + if (!Strings.isNullOrEmpty(keyword)) { + subQuery.where(contains(EndpointTraffic.NAME, keyword.replaceAll("/", "\\\\/"))); + } + subQuery.groupBy(TagName.NAME, TagName.SERVICE_ID); + SelectQueryImpl query = select(ID_COLUMN, NAME) + .from(client.getDatabase()); + query.setSubQuery(subQuery); + query.limit(limit); + + final QueryResult.Series series = client.queryForSingleSeries(query); + if (log.isDebugEnabled()) { + log.debug("SQL: {} result: {}", query.getCommand(), series); + } + + List list = new ArrayList<>(limit); + if (series != null) { + series.getValues().forEach(values -> { + Endpoint endpoint = new Endpoint(); + endpoint.setId((String) values.get(1)); + endpoint.setName((String) values.get(2)); + list.add(endpoint); + }); + } + return list; + } + + @Override + public List getServiceInstances(final long startTimestamp, + final long endTimestamp, + final String serviceId) throws IOException { + final long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(startTimestamp); + + SelectSubQueryImpl subQuery = select() + .fromSubQuery(client.getDatabase()) + .column(ID_COLUMN).column(NAME).column(InstanceTraffic.PROPERTIES) + .from(InstanceTraffic.INDEX_NAME) + .where() + .and(gte(InstanceTraffic.LAST_PING_TIME_BUCKET, minuteTimeBucket)) + .and(eq(InfluxConstants.TagName.SERVICE_ID, serviceId)) + .groupBy(TagName.NAME, TagName.SERVICE_ID); + + SelectQueryImpl query = select().column(ID_COLUMN) + .column(NAME) + .column(InstanceTraffic.PROPERTIES) + .from(client.getDatabase(), InstanceTraffic.INDEX_NAME); + query.setSubQuery(subQuery); + + QueryResult.Series series = client.queryForSingleSeries(query); + if (log.isDebugEnabled()) { + log.debug("SQL: {} result: {}", query.getCommand(), series); + } + + if (Objects.isNull(series)) { + return Collections.EMPTY_LIST; + } + + List> result = series.getValues(); + List instances = Lists.newArrayList(); + for (List values : result) { + ServiceInstance serviceInstance = new ServiceInstance(); + + serviceInstance.setId((String) values.get(1)); + serviceInstance.setName((String) values.get(2)); + serviceInstance.setInstanceUUID(serviceInstance.getId()); + + String propertiesString = (String) values.get(3); + if (!Strings.isNullOrEmpty(propertiesString)) { + JsonObject properties = GSON.fromJson(propertiesString, JsonObject.class); + for (Map.Entry property : properties.entrySet()) { + String key = property.getKey(); + String value = property.getValue().getAsString(); + if (key.equals(InstanceTraffic.PropertyUtil.LANGUAGE)) { + serviceInstance.setLanguage(LanguageTrans.INSTANCE.value(value)); + } else { + serviceInstance.getAttributes().add(new Attribute(key, value)); + } + + } + } else { + serviceInstance.setLanguage(Language.UNKNOWN); + } + instances.add(serviceInstance); + } + return instances; + } + + private List buildServices(Query query) throws IOException { + QueryResult.Series series = client.queryForSingleSeries(query); + if (log.isDebugEnabled()) { + log.debug("SQL: {} result: {}", query.getCommand(), series); + } + + ArrayList services = Lists.newArrayList(); + if (Objects.nonNull(series)) { + for (List values : series.getValues()) { + Service service = new Service(); + service.setId((String) values.get(1)); + service.setName((String) values.get(2)); + services.add(service); + } + } + return services; + } +} diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetricsQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetricsQuery.java index 09b2107e6377..a28f1c6d2040 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetricsQuery.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetricsQuery.java @@ -38,14 +38,16 @@ import org.apache.skywalking.oap.server.core.query.sql.Where; import org.apache.skywalking.oap.server.core.storage.model.ModelColumn; import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO; +import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants; import org.apache.skywalking.oap.server.storage.plugin.influxdb.TableMetaInfo; -import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.MetricsDAO; import org.influxdb.dto.QueryResult; import org.influxdb.querybuilder.SelectQueryImpl; import org.influxdb.querybuilder.SelectionQueryImpl; import org.influxdb.querybuilder.WhereQueryImpl; +import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.ID_COLUMN; import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.contains; import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq; import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte; @@ -74,7 +76,7 @@ public IntValues getValues(String measurement, DownSampling downsampling, long s WhereQueryImpl queryWhereQuery = query.from(client.getDatabase(), measurement).where(); Map> columnTypes = Maps.newHashMap(); - for (ModelColumn column : TableMetaInfo.get(measurement).getColumns()) { + for (ModelColumn column : TableMetaInfo.get(measurement).getModel().getColumns()) { columnTypes.put(column.getColumnName().getStorageName(), column.getType()); } @@ -84,6 +86,7 @@ public IntValues getValues(String measurement, DownSampling downsampling, long s StringBuilder clauseBuilder = new StringBuilder(); for (KeyValues kv : whereKeyValues) { final List values = kv.getValues(); + ids.addAll(values); Class type = columnTypes.get(kv.getKey()); if (values.size() == 1) { @@ -93,16 +96,15 @@ public IntValues getValues(String measurement, DownSampling downsampling, long s } clauseBuilder.append(kv.getKey()).append("=").append(value).append(" OR "); } else { - ids.addAll(values); if (type == String.class) { clauseBuilder.append(kv.getKey()) .append(" =~ /") .append(Joiner.on("|").join(values)) .append("/ OR "); - continue; - } - for (String value : values) { - clauseBuilder.append(kv.getKey()).append(" = '").append(value).append("' OR "); + } else { + for (String value : values) { + clauseBuilder.append(kv.getKey()).append(" = '").append(value).append("' OR "); + } } } } @@ -111,17 +113,17 @@ public IntValues getValues(String measurement, DownSampling downsampling, long s queryWhereQuery .and(gte(InfluxClient.TIME, InfluxClient.timeInterval(startTB, downsampling))) .and(lte(InfluxClient.TIME, InfluxClient.timeInterval(endTB, downsampling))) - .groupBy(MetricsDAO.TAG_ENTITY_ID); + .groupBy(InfluxConstants.TagName.ENTITY_ID); IntValues intValues = new IntValues(); List seriesList = client.queryForSeries(queryWhereQuery); if (log.isDebugEnabled()) { log.debug("SQL: {} result set: {}", queryWhereQuery.getCommand(), seriesList); } - if (!(seriesList == null || seriesList.isEmpty())) { + if (CollectionUtils.isNotEmpty(seriesList)) { for (QueryResult.Series series : seriesList) { KVInt kv = new KVInt(); - kv.setId(series.getTags().get(MetricsDAO.TAG_ENTITY_ID)); + kv.setId(series.getTags().get(InfluxConstants.TagName.ENTITY_ID)); Number value = (Number) series.getValues().get(0).get(1); kv.setValue(value.longValue()); @@ -139,16 +141,16 @@ public IntValues getLinearIntValues(String measurement, String valueCName) throws IOException { WhereQueryImpl query = select() - .column("id") + .column(ID_COLUMN) .column(valueCName) .from(client.getDatabase(), measurement) .where(); - if (ids != null && !ids.isEmpty()) { + if (CollectionUtils.isNotEmpty(ids)) { if (ids.size() == 1) { - query.where(eq("id", ids.get(0))); + query.where(eq(ID_COLUMN, ids.get(0))); } else { - query.where(contains("id", Joiner.on("|").join(ids))); + query.where(contains(ID_COLUMN, Joiner.on("|").join(ids))); } } List seriesList = client.queryForSeries(query); @@ -157,7 +159,7 @@ public IntValues getLinearIntValues(String measurement, } IntValues intValues = new IntValues(); - if (!(seriesList == null || seriesList.isEmpty())) { + if (CollectionUtils.isNotEmpty(seriesList)) { seriesList.get(0).getValues().forEach(values -> { KVInt kv = new KVInt(); kv.setValue(((Number) values.get(2)).longValue()); @@ -197,7 +199,7 @@ public IntValues[] getMultipleLinearIntValues(String measurement, DownSampling d .from(client.getDatabase(), measurement) .where(); - if (ids != null && !ids.isEmpty()) { + if (CollectionUtils.isNotEmpty(ids)) { if (ids.size() == 1) { query.where(eq("id", ids.get(0))); } else { @@ -212,7 +214,7 @@ public IntValues[] getMultipleLinearIntValues(String measurement, DownSampling d for (int i = 0; i < intValues.length; i++) { intValues[i] = new IntValues(); } - if (series == null || series.isEmpty()) { + if (CollectionUtils.isEmpty(series)) { return intValues; } series.get(0).getValues().forEach(values -> { @@ -253,9 +255,9 @@ public Thermodynamic getThermodynamic(String measurement, DownSampling downsampl .column(ThermodynamicMetrics.STEP) .column(ThermodynamicMetrics.NUM_OF_STEPS) .column(ThermodynamicMetrics.DETAIL_GROUP) - .column("id") + .column(ID_COLUMN) .from(client.getDatabase(), measurement) - .where(contains("id", Joiner.on("|").join(ids))); + .where(contains(ID_COLUMN, Joiner.on("|").join(ids))); Map> thermodynamicValueMatrix = new HashMap<>(); QueryResult.Series series = client.queryForSingleSeries(query); diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/NetworkAddressAliasDAO.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/NetworkAddressAliasDAO.java new file mode 100644 index 000000000000..ef60af82a0fe --- /dev/null +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/NetworkAddressAliasDAO.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.influxdb.query; + +import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias; +import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.TableMetaInfo; +import org.influxdb.dto.QueryResult; +import org.influxdb.querybuilder.SelectQueryImpl; +import org.influxdb.querybuilder.WhereQueryImpl; + +import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte; +import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select; + +@Slf4j +public class NetworkAddressAliasDAO implements INetworkAddressAliasDAO { + private final NetworkAddressAlias.Builder builder = new NetworkAddressAlias.Builder(); + private InfluxClient client; + + public NetworkAddressAliasDAO(final InfluxClient client) { + this.client = client; + } + + @Override + public List loadLastUpdate(final long timeBucket) { + List networkAddressAliases = new ArrayList<>(); + + WhereQueryImpl query = select().raw(InfluxConstants.ALL_FIELDS) + .from(client.getDatabase(), NetworkAddressAlias.INDEX_NAME) + .where(gte( + NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET, + timeBucket + )); + try { + QueryResult.Series series = client.queryForSingleSeries(query); + if (log.isDebugEnabled()) { + log.debug("SQL: {} result: {}", query.getCommand(), series); + } + if (Objects.isNull(series)) { + return networkAddressAliases; + } + + List> result = series.getValues(); + List columns = series.getColumns(); + + Map columnAndFieldMap = TableMetaInfo.get(NetworkAddressAlias.INDEX_NAME) + .getStorageAndColumnMap(); + for (List values : result) { + Map map = Maps.newHashMap(); + for (int i = 1; i < columns.size(); i++) { + map.put(columnAndFieldMap.get(columns.get(i)), values.get(i)); + } + networkAddressAliases.add(builder.map2Data(map)); + } + } catch (IOException e) { + log.error(e.getMessage(), e); + } + + return networkAddressAliases; + } +} diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileTaskLogQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileTaskLogQuery.java index cfac9db7a5d7..59982d834b3d 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileTaskLogQuery.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileTaskLogQuery.java @@ -19,17 +19,16 @@ package org.apache.skywalking.oap.server.storage.plugin.influxdb.query; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import java.io.IOException; import java.util.Collections; import java.util.List; -import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord; import org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLog; import org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLogOperationType; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants; import org.influxdb.dto.QueryResult; import org.influxdb.querybuilder.SelectQueryImpl; import org.influxdb.querybuilder.WhereQueryImpl; @@ -49,8 +48,8 @@ public ProfileTaskLogQuery(InfluxClient client, int fetchTaskLogMaxSize) { @Override public List getTaskLogList() throws IOException { WhereQueryImpl query = select() - .function("top", ProfileTaskLogRecord.OPERATION_TIME, fetchTaskLogMaxSize) - .column("id") + .function(InfluxConstants.SORT_DES, ProfileTaskLogRecord.OPERATION_TIME, fetchTaskLogMaxSize) + .column(InfluxConstants.ID_COLUMN) .column(ProfileTaskLogRecord.TASK_ID) .column(ProfileTaskLogRecord.INSTANCE_ID) .column(ProfileTaskLogRecord.OPERATION_TIME) @@ -65,26 +64,18 @@ public List getTaskLogList() throws IOException { if (series == null) { return Collections.emptyList(); } - List columns = series.getColumns(); - Map columnsMap = Maps.newHashMap(); - for (int i = 0; i < columns.size(); i++) { - columnsMap.put(columns.get(i), i); - } - - List taskLogs = Lists.newArrayList(); + final List taskLogs = Lists.newArrayList(); series.getValues().stream() // re-sort by self, because of the result order by time. .sorted((a, b) -> Long.compare(((Number) b.get(1)).longValue(), ((Number) a.get(1)).longValue())) .forEach(values -> { taskLogs.add(ProfileTaskLog.builder() - .id((String) values.get(columnsMap.get("id"))) - .taskId((String) values.get(columnsMap.get(ProfileTaskLogRecord.TASK_ID))) - .instanceId( - (String) values.get(columnsMap.get(ProfileTaskLogRecord.INSTANCE_ID))) - .operationTime( - (Long) values.get(columnsMap.get(ProfileTaskLogRecord.OPERATION_TIME))) + .id((String) values.get(2)) + .taskId((String) values.get(3)) + .instanceId((String) values.get(4)) + .operationTime(((Number) values.get(5)).longValue()) .operationType(ProfileTaskLogOperationType.parse( - (int) values.get(columnsMap.get(ProfileTaskLogRecord.OPERATION_TYPE)))) + ((Number) values.get(6)).intValue())) .build()); }); return taskLogs; diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileTaskQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileTaskQuery.java index db2b2f2da858..43099ef99b4e 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileTaskQuery.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileTaskQuery.java @@ -22,13 +22,13 @@ import java.io.IOException; import java.util.List; import java.util.Objects; +import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.util.StringUtil; import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord; import org.apache.skywalking.oap.server.core.query.entity.ProfileTask; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; -import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxModelConstants; -import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.NoneStreamDAO; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants; import org.influxdb.dto.QueryResult; import org.influxdb.querybuilder.SelectQueryImpl; import org.influxdb.querybuilder.WhereQueryImpl; @@ -38,8 +38,9 @@ import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.lte; import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select; +@Slf4j public class ProfileTaskQuery implements IProfileTaskQueryDAO { - private InfluxClient client; + private final InfluxClient client; public ProfileTaskQuery(InfluxClient client) { this.client = client; @@ -52,19 +53,22 @@ public List getTaskList(final String serviceId, final Long endTimeBucket, final Integer limit) throws IOException { WhereQueryImpl query = - select("id", ProfileTaskRecord.SERVICE_ID, - ProfileTaskRecord.ENDPOINT_NAME, ProfileTaskRecord.START_TIME, - ProfileTaskRecord.CREATE_TIME, - InfluxModelConstants.DURATION, - ProfileTaskRecord.MIN_DURATION_THRESHOLD, - ProfileTaskRecord.DUMP_PERIOD, - ProfileTaskRecord.MAX_SAMPLING_COUNT + select( + InfluxConstants.ID_COLUMN, + ProfileTaskRecord.SERVICE_ID, + ProfileTaskRecord.ENDPOINT_NAME, + ProfileTaskRecord.START_TIME, + ProfileTaskRecord.CREATE_TIME, + InfluxConstants.DURATION, + ProfileTaskRecord.MIN_DURATION_THRESHOLD, + ProfileTaskRecord.DUMP_PERIOD, + ProfileTaskRecord.MAX_SAMPLING_COUNT ) .from(client.getDatabase(), ProfileTaskRecord.INDEX_NAME) .where(); if (StringUtil.isNotEmpty(serviceId)) { - query.and(eq(NoneStreamDAO.TAG_SERVICE_ID, serviceId)); + query.and(eq(InfluxConstants.TagName.SERVICE_ID, serviceId)); } if (StringUtil.isNotEmpty(endpointName)) { query.and(eq(ProfileTaskRecord.ENDPOINT_NAME, endpointName)); @@ -81,6 +85,9 @@ public List getTaskList(final String serviceId, List tasks = Lists.newArrayList(); QueryResult.Series series = client.queryForSingleSeries(query); + if (log.isDebugEnabled()) { + log.debug("SQL: {} result: {}", query.getCommand(), series); + } if (series != null) { series.getValues().forEach(values -> { tasks.add(profileTaskBuilder(values)); @@ -94,20 +101,26 @@ public ProfileTask getById(final String id) throws IOException { if (StringUtil.isEmpty(id)) { return null; } - SelectQueryImpl query = select("id", ProfileTaskRecord.SERVICE_ID, - ProfileTaskRecord.ENDPOINT_NAME, ProfileTaskRecord.START_TIME, - ProfileTaskRecord.CREATE_TIME, - InfluxModelConstants.DURATION, - ProfileTaskRecord.MIN_DURATION_THRESHOLD, - ProfileTaskRecord.DUMP_PERIOD, - ProfileTaskRecord.MAX_SAMPLING_COUNT + SelectQueryImpl query = select( + InfluxConstants.ID_COLUMN, + ProfileTaskRecord.SERVICE_ID, + ProfileTaskRecord.ENDPOINT_NAME, + ProfileTaskRecord.START_TIME, + ProfileTaskRecord.CREATE_TIME, + InfluxConstants.DURATION, + ProfileTaskRecord.MIN_DURATION_THRESHOLD, + ProfileTaskRecord.DUMP_PERIOD, + ProfileTaskRecord.MAX_SAMPLING_COUNT ) .from(client.getDatabase(), ProfileTaskRecord.INDEX_NAME) .where() - .and(eq("id", id)) + .and(eq(InfluxConstants.ID_COLUMN, id)) .limit(1); QueryResult.Series series = client.queryForSingleSeries(query); + if (log.isDebugEnabled()) { + log.debug("SQL: {} result: {}", query.getCommand(), series); + } if (Objects.nonNull(series)) { return profileTaskBuilder(series.getValues().get(0)); } diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileThreadSnapshotQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileThreadSnapshotQuery.java index 36a99371ce70..25dfc0e8d8fe 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileThreadSnapshotQuery.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileThreadSnapshotQuery.java @@ -26,6 +26,8 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Objects; +import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.util.StringUtil; import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord; @@ -33,6 +35,7 @@ import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO; import org.apache.skywalking.oap.server.library.util.BooleanUtils; import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants; import org.elasticsearch.common.Strings; import org.influxdb.dto.QueryResult; import org.influxdb.querybuilder.WhereQueryImpl; @@ -43,6 +46,7 @@ import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.lte; import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select; +@Slf4j public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDAO { private final InfluxClient client; @@ -60,7 +64,7 @@ public List queryProfiledSegments(String taskId) throws IOException final LinkedList segments = new LinkedList<>(); QueryResult.Series series = client.queryForSingleSeries(query); - if (series == null) { + if (Objects.isNull(series)) { return Collections.emptyList(); } series.getValues().forEach(values -> { @@ -72,7 +76,7 @@ public List queryProfiledSegments(String taskId) throws IOException } query = select() - .function("bottom", SegmentRecord.START_TIME, segments.size()) + .function(InfluxConstants.SORT_ASC, SegmentRecord.START_TIME, segments.size()) .column(SegmentRecord.SEGMENT_ID) .column(SegmentRecord.START_TIME) .column(SegmentRecord.ENDPOINT_NAME) @@ -130,8 +134,15 @@ public List queryRecords(String segmentId, int minS .and(gte(ProfileThreadSnapshotRecord.SEQUENCE, minSequence)) .and(lte(ProfileThreadSnapshotRecord.SEQUENCE, maxSequence)); + QueryResult.Series series = client.queryForSingleSeries(query); + if (log.isDebugEnabled()) { + log.debug("SQL: {} result: {}", query.getCommand(), series); + } + if (Objects.isNull(series)) { + return Collections.EMPTY_LIST; + } ArrayList result = new ArrayList<>(maxSequence - minSequence); - client.queryForSingleSeries(query).getValues().forEach(values -> { + series.getValues().forEach(values -> { ProfileThreadSnapshotRecord record = new ProfileThreadSnapshotRecord(); record.setTaskId((String) values.get(1)); @@ -165,7 +176,10 @@ public SegmentRecord getProfiledSegment(String segmentId) throws IOException { .where() .and(eq(SegmentRecord.SEGMENT_ID, segmentId)); List series = client.queryForSeries(query); - if (series == null || series.isEmpty()) { + if (log.isDebugEnabled()) { + log.debug("SQL: {} result set: {}", query.getCommand(), series); + } + if (Objects.isNull(series) || series.isEmpty()) { return null; } @@ -198,10 +212,6 @@ private int querySequenceWithAgg(String function, String segmentId, long start, .and(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId)) .and(gte(ProfileThreadSnapshotRecord.DUMP_TIME, start)) .and(lte(ProfileThreadSnapshotRecord.DUMP_TIME, end)); - QueryResult.Series series = client.queryForSingleSeries(query); - if (series == null) { - return -1; - } - return ((Number) series.getValues().get(0).get(1)).intValue(); + return client.getCounter(query); } } diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TopNRecordsQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TopNRecordsQuery.java index c247dd6c3d13..1dda92c58c52 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TopNRecordsQuery.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TopNRecordsQuery.java @@ -30,7 +30,7 @@ import org.apache.skywalking.oap.server.core.query.entity.TopNRecord; import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; -import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.RecordDAO; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants; import org.influxdb.dto.QueryResult; import org.influxdb.querybuilder.WhereQueryImpl; @@ -50,11 +50,11 @@ public TopNRecordsQuery(InfluxClient client) { @Override public List getTopNRecords(long startSecondTB, long endSecondTB, String metricName, String serviceId, int topN, Order order) throws IOException { - String function = "bottom"; + String function = InfluxConstants.SORT_ASC; // Have to re-sort here. Because the function, top()/bottom(), get the result ordered by the `time`. Comparator comparator = Comparator.comparingLong(TopNRecord::getLatency); if (order.equals(Order.DES)) { - function = "top"; + function = InfluxConstants.SORT_DES; comparator = (a, b) -> Long.compare(b.getLatency(), a.getLatency()); } @@ -68,7 +68,7 @@ public List getTopNRecords(long startSecondTB, long endSecondTB, Str .and(lte(TopN.TIME_BUCKET, endSecondTB)); if (StringUtil.isNotEmpty(serviceId)) { - query.and(eq(RecordDAO.TAG_SERVICE_ID, serviceId)); + query.and(eq(InfluxConstants.TagName.SERVICE_ID, serviceId)); } QueryResult.Series series = client.queryForSingleSeries(query); diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TopologyQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TopologyQuery.java index 804a78510481..67b6f4a7b735 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TopologyQuery.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TopologyQuery.java @@ -29,14 +29,17 @@ import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationServerSideMetrics; import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics; import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics; -import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.query.entity.Call; import org.apache.skywalking.oap.server.core.source.DetectPoint; import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants; +import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; +import org.influxdb.querybuilder.SelectQueryImpl; +import org.influxdb.querybuilder.SelectSubQueryImpl; import org.influxdb.querybuilder.WhereNested; -import org.influxdb.querybuilder.WhereQueryImpl; +import org.influxdb.querybuilder.WhereSubQueryImpl; import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq; import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte; @@ -56,7 +59,7 @@ public List loadServiceRelationsDetectedAtServerSide(DownSampli long endTB, List serviceIds) throws IOException { String measurement = ServiceRelationServerSideMetrics.INDEX_NAME; - WhereQueryImpl query = buildServiceCallsQuery( + WhereSubQueryImpl, SelectQueryImpl> subQuery = buildServiceCallsQuery( measurement, startTB, endTB, @@ -64,7 +67,8 @@ public List loadServiceRelationsDetectedAtServerSide(DownSampli ServiceRelationServerSideMetrics.DEST_SERVICE_ID, serviceIds ); - return buildServiceCalls(query, DetectPoint.SERVER); + + return buildServiceCalls(buildQuery(subQuery), DetectPoint.SERVER); } @Override @@ -72,7 +76,7 @@ public List loadServiceRelationDetectedAtClientSide(DownSamplin long endTB, List serviceIds) throws IOException { String measurement = ServiceRelationClientSideMetrics.INDEX_NAME; - WhereQueryImpl query = buildServiceCallsQuery( + WhereSubQueryImpl, SelectQueryImpl> subQuery = buildServiceCallsQuery( measurement, startTB, endTB, @@ -80,14 +84,14 @@ public List loadServiceRelationDetectedAtClientSide(DownSamplin ServiceRelationServerSideMetrics.DEST_SERVICE_ID, serviceIds ); - return buildServiceCalls(query, DetectPoint.CLIENT); + return buildServiceCalls(buildQuery(subQuery), DetectPoint.CLIENT); } @Override public List loadServiceRelationsDetectedAtServerSide(DownSampling downsampling, long startTB, long endTB) throws IOException { String measurement = ServiceRelationServerSideMetrics.INDEX_NAME; - WhereQueryImpl query = buildServiceCallsQuery( + WhereSubQueryImpl, SelectQueryImpl> subQuery = buildServiceCallsQuery( measurement, startTB, endTB, @@ -95,14 +99,14 @@ public List loadServiceRelationsDetectedAtServerSide(DownSampli ServiceRelationServerSideMetrics.DEST_SERVICE_ID, new ArrayList<>(0) ); - return buildServiceCalls(query, DetectPoint.SERVER); + return buildServiceCalls(buildQuery(subQuery), DetectPoint.SERVER); } @Override public List loadServiceRelationDetectedAtClientSide(DownSampling downsampling, long startTB, long endTB) throws IOException { String tableName = ServiceRelationClientSideMetrics.INDEX_NAME; - WhereQueryImpl query = buildServiceCallsQuery( + WhereSubQueryImpl, SelectQueryImpl> subQuery = buildServiceCallsQuery( tableName, startTB, endTB, @@ -110,7 +114,7 @@ public List loadServiceRelationDetectedAtClientSide(DownSamplin ServiceRelationServerSideMetrics.DEST_SERVICE_ID, new ArrayList<>(0) ); - return buildServiceCalls(query, DetectPoint.CLIENT); + return buildServiceCalls(buildQuery(subQuery), DetectPoint.CLIENT); } @Override @@ -120,14 +124,15 @@ public List loadInstanceRelationDetectedAtServerSide(String cli long startTB, long endTB) throws IOException { String measurement = ServiceInstanceRelationServerSideMetrics.INDEX_NAME; - WhereQueryImpl query = buildServiceInstanceCallsQuery(measurement, - startTB, - endTB, - ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID, - ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID, - clientServiceId, serverServiceId + WhereSubQueryImpl, SelectQueryImpl> subQuery = buildServiceInstanceCallsQuery( + measurement, + startTB, + endTB, + ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID, + ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID, + clientServiceId, serverServiceId ); - return buildInstanceCalls(query, DetectPoint.SERVER); + return buildInstanceCalls(buildQuery(subQuery), DetectPoint.SERVER); } @Override @@ -137,14 +142,15 @@ public List loadInstanceRelationDetectedAtClientSide(String cli long startTB, long endTB) throws IOException { String measurement = ServiceInstanceRelationClientSideMetrics.INDEX_NAME; - WhereQueryImpl query = buildServiceInstanceCallsQuery(measurement, - startTB, - endTB, - ServiceInstanceRelationClientSideMetrics.SOURCE_SERVICE_ID, - ServiceInstanceRelationClientSideMetrics.DEST_SERVICE_ID, - clientServiceId, serverServiceId + WhereSubQueryImpl, SelectQueryImpl> subQuery = buildServiceInstanceCallsQuery( + measurement, + startTB, + endTB, + ServiceInstanceRelationClientSideMetrics.SOURCE_SERVICE_ID, + ServiceInstanceRelationClientSideMetrics.DEST_SERVICE_ID, + clientServiceId, serverServiceId ); - return buildInstanceCalls(query, DetectPoint.CLIENT); + return buildInstanceCalls(buildQuery(subQuery), DetectPoint.CLIENT); } @Override @@ -154,7 +160,7 @@ public List loadEndpointRelation(DownSampling downsampling, String destEndpointId) throws IOException { String measurement = EndpointRelationServerSideMetrics.INDEX_NAME; - WhereQueryImpl query = buildServiceCallsQuery( + WhereSubQueryImpl, SelectQueryImpl> subQuery = buildServiceCallsQuery( measurement, startTB, endTB, @@ -162,9 +168,9 @@ public List loadEndpointRelation(DownSampling downsampling, EndpointRelationServerSideMetrics.DEST_ENDPOINT, Collections.emptyList() ); - query.and(eq(EndpointRelationServerSideMetrics.DEST_ENDPOINT, destEndpointId)); + subQuery.and(eq(EndpointRelationServerSideMetrics.DEST_ENDPOINT, destEndpointId)); - WhereQueryImpl query2 = buildServiceCallsQuery( + WhereSubQueryImpl, SelectQueryImpl> subQuery2 = buildServiceCallsQuery( measurement, startTB, endTB, @@ -172,61 +178,73 @@ public List loadEndpointRelation(DownSampling downsampling, EndpointRelationServerSideMetrics.DEST_ENDPOINT, Collections.emptyList() ); - query2.and(eq(EndpointRelationServerSideMetrics.SOURCE_ENDPOINT, destEndpointId)); + subQuery2.and(eq(EndpointRelationServerSideMetrics.SOURCE_ENDPOINT, destEndpointId)); - List calls = buildEndpointCalls(query, DetectPoint.SERVER); - calls.addAll(buildEndpointCalls(query2, DetectPoint.CLIENT)); + List calls = buildEndpointCalls(buildQuery(subQuery), DetectPoint.SERVER); + calls.addAll(buildEndpointCalls(buildQuery(subQuery), DetectPoint.CLIENT)); return calls; } - private WhereQueryImpl buildServiceCallsQuery(String measurement, long startTB, long endTB, String sourceCName, - String destCName, List serviceIds) { - WhereQueryImpl query = select() - .function("distinct", Metrics.ENTITY_ID, ServiceRelationServerSideMetrics.COMPONENT_ID) - .from(client.getDatabase(), measurement) + private WhereSubQueryImpl, SelectQueryImpl> buildServiceCallsQuery( + String measurement, + long startTB, + long endTB, + String sourceCName, + String destCName, + List serviceIds) { + WhereSubQueryImpl, SelectQueryImpl> subQuery = select() + .fromSubQuery(client.getDatabase()) + .function("distinct", ServiceInstanceRelationServerSideMetrics.COMPONENT_ID) + .as(ServiceInstanceRelationClientSideMetrics.COMPONENT_ID) + .from(measurement) .where() .and(gte(InfluxClient.TIME, InfluxClient.timeInterval(startTB))) .and(lte(InfluxClient.TIME, InfluxClient.timeInterval(endTB))); if (!serviceIds.isEmpty()) { - WhereNested whereNested = query.andNested(); + WhereNested whereNested = subQuery.andNested(); for (String id : serviceIds) { whereNested.or(eq(sourceCName, id)) .or(eq(destCName, id)); } whereNested.close(); } - return query; + return subQuery; } - private WhereQueryImpl buildServiceInstanceCallsQuery(String measurement, - long startTB, - long endTB, - String sourceCName, - String destCName, - String sourceServiceId, - String destServiceId) { - WhereQueryImpl query = select() - .function("distinct", Metrics.ENTITY_ID, ServiceInstanceRelationServerSideMetrics.COMPONENT_ID) - .from(client.getDatabase(), measurement) + private WhereSubQueryImpl, SelectQueryImpl> buildServiceInstanceCallsQuery( + String measurement, + long startTB, + long endTB, + String sourceCName, + String destCName, + String sourceServiceId, + String destServiceId) { + + WhereSubQueryImpl, SelectQueryImpl> subQuery = select() + .fromSubQuery(client.getDatabase()) + .function("distinct", ServiceInstanceRelationServerSideMetrics.COMPONENT_ID) + .as(ServiceInstanceRelationClientSideMetrics.COMPONENT_ID) + .from(measurement) .where() .and(gte(InfluxClient.TIME, InfluxClient.timeInterval(startTB))) .and(lte(InfluxClient.TIME, InfluxClient.timeInterval(endTB))); StringBuilder builder = new StringBuilder("(("); - builder.append(sourceCName).append("=").append(sourceServiceId) - .append(" and ") - .append(destCName).append("=").append(destServiceId) - .append(") or (") - .append(sourceCName).append("=").append(destServiceId) - .append(") and (") - .append(destCName).append("=").append(sourceServiceId) - .append("))"); - query.where(builder.toString()); - return query; + builder.append(sourceCName).append("='").append(sourceServiceId) + .append("' and ") + .append(destCName).append("='").append(destServiceId) + .append("') or (") + .append(sourceCName).append("='").append(destServiceId) + .append("') and (") + .append(destCName).append("='").append(sourceServiceId) + .append("'))"); + subQuery.where(builder.toString()); + subQuery.groupBy(InfluxConstants.TagName.ENTITY_ID); + return subQuery; } - private List buildServiceCalls(WhereQueryImpl query, + private List buildServiceCalls(Query query, DetectPoint detectPoint) throws IOException { QueryResult.Series series = client.queryForSingleSeries(query); @@ -240,7 +258,7 @@ private List buildServiceCalls(WhereQueryImpl query, List calls = new ArrayList<>(); series.getValues().forEach(values -> { Call.CallDetail call = new Call.CallDetail(); - String entityId = (String) values.get(1); + String entityId = String.valueOf(values.get(1)); int componentId = (int) values.get(2); call.buildFromServiceRelation(entityId, componentId, detectPoint); calls.add(call); @@ -248,7 +266,15 @@ private List buildServiceCalls(WhereQueryImpl query, return calls; } - private List buildInstanceCalls(WhereQueryImpl query, + private Query buildQuery(WhereSubQueryImpl, SelectQueryImpl> subQuery) { + SelectQueryImpl query = select().column(InfluxConstants.TagName.ENTITY_ID) + .column(ServiceInstanceRelationClientSideMetrics.COMPONENT_ID) + .from(client.getDatabase()); + query.setSubQuery(subQuery.groupBy(InfluxConstants.TagName.ENTITY_ID)); + return query; + } + + private List buildInstanceCalls(Query query, DetectPoint detectPoint) throws IOException { QueryResult.Series series = client.queryForSingleSeries(query); @@ -270,7 +296,7 @@ private List buildInstanceCalls(WhereQueryImpl query, return calls; } - private List buildEndpointCalls(WhereQueryImpl query, + private List buildEndpointCalls(Query query, DetectPoint detectPoint) throws IOException { QueryResult.Series series = client.queryForSingleSeries(query); diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TraceQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TraceQuery.java index eac9706d5207..9e577fa2ef5d 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TraceQuery.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TraceQuery.java @@ -34,7 +34,7 @@ import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO; import org.apache.skywalking.oap.server.library.util.BooleanUtils; import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; -import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.RecordDAO; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants; import org.elasticsearch.common.Strings; import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; @@ -78,7 +78,7 @@ public TraceBrief queryBasicTraces(long startSecondTB, } WhereQueryImpl recallQuery = select() - .function("top", orderBy, limit + from) + .function(InfluxConstants.SORT_DES, orderBy, limit + from) .column(SegmentRecord.SEGMENT_ID) .column(SegmentRecord.START_TIME) .column(SegmentRecord.ENDPOINT_NAME) @@ -102,7 +102,7 @@ public TraceBrief queryBasicTraces(long startSecondTB, recallQuery.and(contains(SegmentRecord.ENDPOINT_NAME, endpointName.replaceAll("/", "\\\\/"))); } if (StringUtil.isNotEmpty(serviceId)) { - recallQuery.and(eq(RecordDAO.TAG_SERVICE_ID, String.valueOf(serviceId))); + recallQuery.and(eq(InfluxConstants.TagName.SERVICE_ID, serviceId)); } if (StringUtil.isNotEmpty(serviceInstanceId)) { recallQuery.and(eq(SegmentRecord.SERVICE_INSTANCE_ID, serviceInstanceId)); @@ -201,9 +201,9 @@ public List queryByTraceId(String traceId) throws IOException { segmentRecord.setEndTime((long) values.get(7)); segmentRecord.setLatency((int) values.get(8)); segmentRecord.setIsError((int) values.get(9)); - segmentRecord.setVersion((int) values.get(10)); + segmentRecord.setVersion((int) values.get(11)); - String base64 = (String) values.get(9); + String base64 = (String) values.get(10); if (!Strings.isNullOrEmpty(base64)) { segmentRecord.setDataBinary(Base64.getDecoder().decode(base64)); } diff --git a/test/e2e/e2e-test/docker/profile/docker-compose.influxdb.yml b/test/e2e/e2e-test/docker/profile/docker-compose.influxdb.yml index 4f0dc477f5fc..377ea791e024 100644 --- a/test/e2e/e2e-test/docker/profile/docker-compose.influxdb.yml +++ b/test/e2e/e2e-test/docker/profile/docker-compose.influxdb.yml @@ -22,8 +22,6 @@ services: - 8086 networks: - e2e - depends_on: - - h2db healthcheck: test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/8086"] interval: 5s @@ -37,8 +35,6 @@ services: environment: SW_STORAGE: influxdb depends_on: - h2db: - condition: service_healthy influxdb: condition: service_healthy diff --git a/test/plugin/run.sh b/test/plugin/run.sh index 56b6c628478d..d81ecfffc95d 100755 --- a/test/plugin/run.sh +++ b/test/plugin/run.sh @@ -187,7 +187,6 @@ fi supported_versions=`grep -v -E "^$|^#" ${supported_version_file}` for version in ${supported_versions} do - waitForAvailable testcase_name="${scenario_name}-${version}" # testcase working directory, there are logs, data and packages. @@ -218,8 +217,10 @@ do [[ $? -ne 0 ]] && exitWithMessage "${testcase_name}, generate script failure!" echo "start container of testcase.name=${testcase_name}" - bash ${case_work_base}/scenario.sh ${task_state_house} 1>${case_work_logs_dir}/${testcase_name}.log & + bash ${case_work_base}/scenario.sh ${task_state_house} 1>${case_work_logs_dir}/${testcase_name}.log sleep 3 + waitForAvailable + rm -rf ${case_work_base} done echo -e "\033[33m${scenario_name} has already sumbitted\033[0m" diff --git a/test/plugin/scenarios/mysql-scenario/support-version.list b/test/plugin/scenarios/mysql-scenario/support-version.list index 4768268ed6e1..bf4ddf6ff3c7 100644 --- a/test/plugin/scenarios/mysql-scenario/support-version.list +++ b/test/plugin/scenarios/mysql-scenario/support-version.list @@ -36,13 +36,4 @@ 5.1.26 5.1.24 5.1.22 -5.1.20 -5.1.18 -5.1.16 -5.1.14 -5.1.12 -5.1.10 -5.1.8 -5.1.6 -5.1.4 -5.1.2 \ No newline at end of file +5.1.20 \ No newline at end of file diff --git a/test/plugin/scenarios/solrj-7.x-scenario/configuration.yml b/test/plugin/scenarios/solrj-7.x-scenario/configuration.yml index aea4ef8281d2..be366df04c3b 100644 --- a/test/plugin/scenarios/solrj-7.x-scenario/configuration.yml +++ b/test/plugin/scenarios/solrj-7.x-scenario/configuration.yml @@ -25,6 +25,7 @@ dependencies: solr-server: image: solr:${CASE_SERVER_IMAGE_VERSION} hostname: solr-server + removeOnExit: true entrypoint: - docker-entrypoint.sh - solr-precreate