Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
0811091
revert @Column
dmsolr Apr 14, 2020
edc80a1
Merge branch 'master' into v8/influxdb
dmsolr Apr 14, 2020
160ab95
fix checkstyle
dmsolr Apr 14, 2020
2f8a231
add influxdb to e2e test
dmsolr Apr 15, 2020
799c83c
Merge branch 'master' into v8/influxdb
wu-sheng Apr 15, 2020
dde4f2d
fix traffic
dmsolr Apr 15, 2020
8071831
Merge branch 'v8/influxdb' of https://github.com/dmsolr/skywalking in…
dmsolr Apr 15, 2020
50ba299
checkstyle
dmsolr Apr 16, 2020
85edf00
fix e2e configuration
dmsolr Apr 16, 2020
d582213
Merge branch 'master' into v8/influxdb
dmsolr Apr 16, 2020
b7f2f2d
fix e2e configuration
dmsolr Apr 16, 2020
61f813f
fix query issue
dmsolr Apr 17, 2020
f4b3b51
update
dmsolr Apr 17, 2020
ef1ce7d
checkstyleg
dmsolr Apr 17, 2020
5803272
polish
dmsolr Apr 17, 2020
8765d92
checkstyle
dmsolr Apr 17, 2020
5b99769
remove TAG_SERVER_ID
dmsolr Apr 17, 2020
f3f50b1
remove TAG_SERVER_ID
dmsolr Apr 17, 2020
17dff41
Revert unnecessary changes
kezhenxu94 Apr 18, 2020
060ab3d
remove image on Test finished
dmsolr Apr 18, 2020
316fcbc
polish
dmsolr Apr 18, 2020
c77f8e5
remove workspace of testscase
dmsolr Apr 18, 2020
813cfde
update logs when failed
dmsolr Apr 18, 2020
0e85466
move mysql to test.3.yaml
dmsolr Apr 18, 2020
6ee0f4c
test mysql-scenario
dmsolr Apr 18, 2020
88e245b
revert
dmsolr Apr 18, 2020
a2ea524
delete some versions of mysql temporarily
dmsolr Apr 18, 2020
4b2342f
Merge branch 'master' into v8/influxdb
kezhenxu94 Apr 19, 2020
365b7bb
fix encode
dmsolr Apr 19, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/e2e.cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
strategy:
matrix:
coordinator: ['zk']
storage: ['mysql', 'es6', 'es7'] #TODO: 'influxdb'
storage: ['mysql', 'es6', 'es7', 'influxdb']
env:
SW_COORDINATOR: ${{ matrix.coordinator }}
SW_STORAGE: ${{ matrix.storage }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/e2e.profiling.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
timeout-minutes: 90
strategy:
matrix:
storage: ['h2', 'mysql', 'es6', 'es7'] #TODO: 'influxdb'
storage: ['h2', 'mysql', 'es6', 'es7', 'influxdb']
env:
SW_STORAGE: ${{ matrix.storage }}
steps:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/e2e.storages.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
timeout-minutes: 90
strategy:
matrix:
storage: ['mysql', 'es6', 'es7'] #TODO: 'influxdb'
storage: ['mysql', 'es6', 'es7', 'influxdb']
env:
SW_STORAGE: ${{ matrix.storage }}
steps:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/e2e.ttl.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
timeout-minutes: 90
strategy:
matrix:
storage: ['es6', 'es7'] #TODO: 'influxdb'
storage: ['es6', 'es7', 'influxdb']
env:
SW_STORAGE: ${{ matrix.storage }}
steps:
Expand Down
1 change: 0 additions & 1 deletion .github/workflows/plugins-test.1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ jobs:
- { name: 'kotlin-coroutine-scenario', title: 'Kotlin Coroutine 1.0.1-1.3.3 (4)' }
- { name: 'lettuce-scenario', title: 'Lettuce 5.x (17)' }
- { name: 'mongodb-3.x-scenario', title: 'Mongodb 3.4.0-3.11.1 (22)' }
- { name: 'mysql-scenario', title: 'MySQL 5.1.2-8.0.15 (53)' }
- { name: 'netty-socketio-scenario', title: 'Netty-SocketIO 1.x (4)' }
steps:
- uses: actions/checkout@v2
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/plugins-test.3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ jobs:
fail-fast: true
matrix:
case:
- { name: 'mysql-scenario', title: 'MySQL 5.1.2-8.0.15 (30)' }
- { name: 'undertow-scenario', title: 'Undertow 1.3.0-2.0.27 (23)' }
- { name: 'webflux-scenario', title: 'Spring-WebFlux 2.x (7)' }
- { name: 'zookeeper-scenario', title: 'Zookeeper 3.4.x (14)' }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import okhttp3.OkHttpClient;
Expand Down Expand Up @@ -73,6 +74,7 @@ public void connect() {
InfluxDB.ResponseFormat.MSGPACK
);
influx.query(new Query("CREATE DATABASE " + database));
influx.enableGzip();

influx.enableBatch(config.getActions(), config.getDuration(), TimeUnit.MILLISECONDS);
influx.setDatabase(database);
Expand All @@ -99,7 +101,7 @@ public List<QueryResult.Result> query(Query query) throws IOException {
}

try {
QueryResult result = getInflux().query(query);
QueryResult result = getInflux().query(new Query(query.getCommand()));
if (result.hasError()) {
throw new IOException(result.getError());
}
Expand Down Expand Up @@ -136,6 +138,22 @@ public QueryResult.Series queryForSingleSeries(Query query) throws IOException {
return series.get(0);
}

/**
 * Execute a query against InfluxDB with a {@code select count(*)} statement and return the count only.
 *
 * @param query a query whose single-series result carries the count at value index 1
 *              (index 0 is the timestamp column)
 * @return the counter value, or 0 if the query matched no series or returned no rows
 * @throws IOException if there is an error on the InfluxDB server or communication error
 */
public int getCounter(Query query) throws IOException {
    QueryResult.Series series = queryForSingleSeries(query);
    if (log.isDebugEnabled()) {
        log.debug("SQL: {} result: {}", query.getCommand(), series);
    }
    // No matching series means nothing was counted.
    if (Objects.isNull(series)) {
        return 0;
    }
    List<List<Object>> values = series.getValues();
    // Guard against an empty or short result row instead of throwing IndexOutOfBoundsException.
    if (values == null || values.isEmpty() || values.get(0).size() < 2) {
        return 0;
    }
    // Column 0 is the timestamp, column 1 the count(*) value.
    return ((Number) values.get(0).get(1)).intValue();
}

/**
 * Data management: drops a time series identified by the specified measurement and series name. If no
 * exception is thrown, the execution succeeded. Note that dropping series by a time range is not supported
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,31 @@

package org.apache.skywalking.oap.server.storage.plugin.influxdb;

public interface InfluxModelConstants {
/**
* Override column because the 'duration' is the identifier of InfluxDB.
*/
String DURATION = "dur";
/**
 * Shared constant names used by the InfluxDB storage plugin: column identifiers,
 * sort-function names, and the tag names under which indexed columns are stored.
 */
public interface InfluxConstants {
    /** Storage column holding the entity id. */
    String ID_COLUMN = "id";

    /** Quoted because {@code name} collides with an InfluxDB keyword. */
    String NAME = "\"name\"";

    /** Selector matching every field (but no tag) of a measurement. */
    String ALL_FIELDS = "*::field";

    /** InfluxQL function returning the greatest values, i.e. descending order. */
    String SORT_DES = "top";

    /** InfluxQL function returning the smallest values, i.e. ascending order. */
    String SORT_ASC = "bottom";

    /** Quoted because {@code duration} is an InfluxDB identifier/keyword. */
    String DURATION = "\"duration\"";

    /**
     * Tag names (prefixed with {@code _}) under which indexed columns are written,
     * so they do not clash with the field of the same name.
     */
    interface TagName {
        String ID_COLUMN = "_id";

        String NAME = "_name";

        String ENTITY_ID = "_entity_id";

        String TIME_BUCKET = "_time_bucket";

        String NODE_TYPE = "_node_type";

        String SERVICE_ID = "_service_id";
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
Expand All @@ -46,10 +47,10 @@
import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.InfluxStorageDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.AggregationQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.AlarmQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.InfluxMetadataQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.InfluxNetworkAddressAlias;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.LogQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.MetadataQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.MetricsQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.NetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.ProfileTaskLogQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.ProfileTaskQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.ProfileThreadSnapshotQuery;
Expand All @@ -60,7 +61,7 @@
@Slf4j
public class InfluxStorageProvider extends ModuleProvider {
private InfluxStorageConfig config;
private InfluxClient influxClient;
private InfluxClient client;

public InfluxStorageProvider() {
config = new InfluxStorageConfig();
Expand All @@ -83,35 +84,42 @@ public ModuleConfig createConfigBeanIfAbsent() {

@Override
public void prepare() throws ServiceNotProvidedException {
influxClient = new InfluxClient(config);
client = new InfluxClient(config);

this.registerServiceImplementation(IBatchDAO.class, new BatchDAO(influxClient));
this.registerServiceImplementation(StorageDAO.class, new InfluxStorageDAO(influxClient));
this.registerServiceImplementation(IBatchDAO.class, new BatchDAO(client));
this.registerServiceImplementation(StorageDAO.class, new InfluxStorageDAO(client));

this.registerServiceImplementation(INetworkAddressAliasDAO.class, new InfluxNetworkAddressAlias(influxClient));
this.registerServiceImplementation(IMetadataQueryDAO.class, new InfluxMetadataQueryDAO(influxClient));
this.registerServiceImplementation(INetworkAddressAliasDAO.class, new NetworkAddressAliasDAO(client));
this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQuery(client));

this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQuery(influxClient));
this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQuery(influxClient));
this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQuery(influxClient));
this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQuery(influxClient));
this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQuery(influxClient));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQuery(influxClient));
this.registerServiceImplementation(ILogQueryDAO.class, new LogQuery(influxClient));
this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQuery(client));
this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQuery(client));
this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQuery(client));
this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQuery(client));
this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQuery(client));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQuery(client));
this.registerServiceImplementation(ILogQueryDAO.class, new LogQuery(client));

this.registerServiceImplementation(IProfileTaskQueryDAO.class, new ProfileTaskQuery(influxClient));
this.registerServiceImplementation(IProfileTaskQueryDAO.class, new ProfileTaskQuery(client));
this.registerServiceImplementation(
IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQuery(influxClient));
IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQuery(client));
this.registerServiceImplementation(
IProfileTaskLogQueryDAO.class, new ProfileTaskLogQuery(influxClient, config.getFetchTaskLogMaxSize()));
IProfileTaskLogQueryDAO.class, new ProfileTaskLogQuery(client, config.getFetchTaskLogMaxSize()));

this.registerServiceImplementation(
IHistoryDeleteDAO.class, new HistoryDeleteDAO(influxClient));
IHistoryDeleteDAO.class, new HistoryDeleteDAO(client));
}

@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
influxClient.connect();
client.connect();

InfluxTableInstaller installer = new InfluxTableInstaller(getManager());
try {
installer.install(client);
} catch (StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,28 @@
*
*/

package org.apache.skywalking.oap.server.storage.plugin.influxdb.query;
package org.apache.skywalking.oap.server.storage.plugin.influxdb;

import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.module.ModuleManager;

public class InfluxNetworkAddressAlias implements INetworkAddressAliasDAO {
private InfluxClient client;
public class InfluxTableInstaller extends ModelInstaller {

public InfluxNetworkAddressAlias(final InfluxClient client) {
this.client = client;
public InfluxTableInstaller(ModuleManager moduleManager) {
super(moduleManager);
}

@Override
public List<NetworkAddressAlias> loadLastUpdate(final long timeBucket) {
return null;
protected boolean isExists(final Client client, final Model model) throws StorageException {
TableMetaInfo.addModel(model);
return true;
}

@Override
protected void createTable(final Client client, final Model model) throws StorageException {
    // Intentionally a no-op: InfluxDB creates a measurement implicitly on the first
    // point written to it, so no explicit schema/table creation is needed here.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,80 @@

package org.apache.skywalking.oap.server.storage.plugin.influxdb;

import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.model.ColumnName;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;

@Getter
@Builder
@AllArgsConstructor
public class TableMetaInfo {
private static Map<String, Model> TABLES = new HashMap<>();
private static final Map<String, TableMetaInfo> TABLES = new HashMap<>();

private Map<String, String> storageAndColumnMap;
private Map<String, String> storageAndTagMap;
private Model model;

public static void addModel(Model model) {
TABLES.put(model.getName(), model);
final List<ModelColumn> columns = model.getColumns();
final Map<String, String> storageAndTagMap = Maps.newHashMap();
final Map<String, String> storageAndColumnMap = Maps.newHashMap();
columns.forEach(column -> {
ColumnName columnName = column.getColumnName();
storageAndColumnMap.put(columnName.getStorageName(), columnName.getName());
});

if (model.getName().endsWith("_traffic")) {
// instance_traffic name, service_id
// endpoint_traffic name, service_id
storageAndTagMap.put(InstanceTraffic.NAME, InfluxConstants.TagName.NAME);
if (InstanceTraffic.INDEX_NAME.equals(model.getName())
|| EndpointTraffic.INDEX_NAME.equals(model.getName())) {
storageAndTagMap.put(EndpointTraffic.SERVICE_ID, InfluxConstants.TagName.SERVICE_ID);
} else {
// service_traffic name, node_type
storageAndTagMap.put(ServiceTraffic.NODE_TYPE, InfluxConstants.TagName.NODE_TYPE);
}
} else {

// Specifies ENTITY_ID, TIME_BUCKET, NODE_TYPE, SERVICE_ID as tag
if (storageAndColumnMap.containsKey(Metrics.ENTITY_ID)) {
storageAndTagMap.put(Metrics.ENTITY_ID, InfluxConstants.TagName.ENTITY_ID);
}
if (storageAndColumnMap.containsKey(Record.TIME_BUCKET)) {
storageAndTagMap.put(Record.TIME_BUCKET, InfluxConstants.TagName.TIME_BUCKET);
}
if (storageAndColumnMap.containsKey(ServiceTraffic.NODE_TYPE)) {
storageAndTagMap.put(ServiceTraffic.NODE_TYPE, InfluxConstants.TagName.NODE_TYPE);
}
if (storageAndColumnMap.containsKey(SegmentRecord.SERVICE_ID)) {
storageAndTagMap.put(SegmentRecord.SERVICE_ID, InfluxConstants.TagName.SERVICE_ID);
}
}

TableMetaInfo info = TableMetaInfo.builder()
.model(model)
.storageAndTagMap(storageAndTagMap)
.storageAndColumnMap(storageAndColumnMap)
.build();
TABLES.put(model.getName(), info);
}

public static Model get(String moduleName) {
public static TableMetaInfo get(String moduleName) {
return TABLES.get(moduleName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,20 @@
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.influxdb.dto.Point;

/**
* InfluxDB Point wrapper.
*/
public class InfluxInsertRequest implements InsertRequest, UpdateRequest {
public static final String ID = "id";

private Point.Builder builder;
private Map<String, Object> fields = Maps.newHashMap();

Expand All @@ -57,9 +54,8 @@ public InfluxInsertRequest(Model model, StorageData storageData, StorageBuilder
}
}
builder = Point.measurement(model.getName())
.addField(ID, storageData.id())
.fields(fields)
.tag(InfluxClient.TAG_TIME_BUCKET, String.valueOf(fields.get(Metrics.TIME_BUCKET)));
.addField(InfluxConstants.ID_COLUMN, storageData.id())
.fields(fields);
}

public InfluxInsertRequest time(long time, TimeUnit unit) {
Expand All @@ -68,9 +64,7 @@ public InfluxInsertRequest time(long time, TimeUnit unit) {
}

public InfluxInsertRequest addFieldAsTag(String fieldName, String tagName) {
if (fields.containsKey(fieldName)) {
builder.tag(tagName, String.valueOf(fields.get(fieldName)));
}
builder.tag(tagName, String.valueOf(fields.get(fieldName)));
return this;
}

Expand Down
Loading