Skip to content
This repository was archived by the owner on Aug 20, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
a7c7dc2
Casey Stella - elasticsearch rest client migration base work
cestella Oct 9, 2018
10410ea
Update shade plugin version
mmiklavc Oct 9, 2018
a33a168
Fix es update dao test
mmiklavc Oct 9, 2018
52c3c96
Merge with master. Fix es search integration tests
mmiklavc Oct 9, 2018
4742832
Merge branch 'master' into es-rest-client
mmiklavc Oct 11, 2018
4380996
Get shade plugin working with the new ES client and the ClassIndexTra…
mmiklavc Oct 11, 2018
af03f6f
Introduce config classes for managing ES client configuration. Transl…
mmiklavc Oct 19, 2018
1c8eac2
Resolve merge conflicts with master
mmiklavc Oct 23, 2018
1a47ded
Remove extra deps in metron-elasticsearch around log4j.
mmiklavc Oct 23, 2018
54870d6
Fixes for dep version issues.
mmiklavc Oct 24, 2018
554de87
METRON-1845 Correct Test Data Load in Elasticsearch Integration Tests
nickwallen Oct 25, 2018
3b95d1e
Removed dead function
nickwallen Oct 25, 2018
7d9ee25
Part way through Solr changes that I am going to roll-back
nickwallen Oct 26, 2018
e699386
Revert "Part way through Solr changes that I am going to roll-back"
nickwallen Oct 26, 2018
dd024b8
Merge branch 'master' into es-rest-client
mmiklavc Oct 26, 2018
4a7a837
Resolved integration test differences with Solr
nickwallen Oct 26, 2018
ebb7ef5
Start addressing pr feedback.
mmiklavc Oct 26, 2018
3581117
METRON-1849 Added integration test for UpdateDao.update() and UpdateD…
nickwallen Oct 29, 2018
b7ac957
Addressing review feedback
mmiklavc Oct 30, 2018
c0faab2
METRON-1849 Progress on shared write logic
nickwallen Oct 31, 2018
021810a
METRON-1849 Progress on tests, still integration tests to fix-up
nickwallen Oct 31, 2018
0564970
METRON-1845 Added license header
nickwallen Nov 1, 2018
f1c8ce4
Most tests working except for a few
nickwallen Nov 1, 2018
3e5c121
Refactor clientfactory. Updates to metron-common, metron-elasticsearc…
mmiklavc Nov 2, 2018
d4b02ce
Fixed all tests
nickwallen Nov 2, 2018
fb5610b
Merge remote-tracking branch 'mike/es-rest-client' into METRON-1849
nickwallen Nov 2, 2018
c3b2836
Fixed how client is created
nickwallen Nov 2, 2018
4539f43
Improved docs
nickwallen Nov 2, 2018
32467b3
Small cleanup
nickwallen Nov 5, 2018
55afca4
Removed the IndexedDocument class to simplify things
nickwallen Nov 5, 2018
b71d501
Merge remote-tracking branch 'apache/master' into METRON-1849
nickwallen Nov 13, 2018
118fd2a
Merge remote-tracking branch 'apache/master' into METRON-1849
nickwallen Nov 14, 2018
bcfec32
Merge remote-tracking branch 'apache/master' into METRON-1849
nickwallen Nov 15, 2018
21089a0
Merge remote-tracking branch 'apache/master' into METRON-1849
nickwallen Nov 16, 2018
5b6c6b0
ElasticsearchBulkDocumentWriter was not flushing the documents that w…
nickwallen Nov 8, 2018
6a506df
Removed use of Java streams
nickwallen Nov 27, 2018
684b815
Merge remote-tracking branch 'apache/master' into METRON-1849
nickwallen Nov 29, 2018
84691e3
Clean-up imports
nickwallen Nov 29, 2018
414b5dc
No need to change this test. Missed in merge with master
nickwallen Nov 29, 2018
7b4f82f
Removing an unnecessary use of Java streams
nickwallen Nov 29, 2018
bfd5138
Rename test method to be more descriptive
nickwallen Nov 30, 2018
213dc0f
ElasticsearchWriter should handle message timestamp expressed as a St…
nickwallen Nov 30, 2018
260ccc3
Added test for missing GUID
nickwallen Nov 30, 2018
a30b750
Restore original Document class from master
nickwallen Dec 5, 2018
1c1e470
Using ConversionUtils for String conversion
nickwallen Dec 5, 2018
da65a45
The BulkDocumentWriter now returns a list of write results, rather th…
nickwallen Dec 5, 2018
a474022
Merge remote-tracking branch 'apache/master' into METRON-1849
nickwallen Dec 8, 2018
57c1479
Fixed missing generic types per @jleet feedback
nickwallen Dec 11, 2018
316a7a6
Fixed javadoc per @jleet
nickwallen Dec 11, 2018
c323271
Simplifying iteration over the bulk item responses per @jleet
nickwallen Dec 11, 2018
30bc70a
Was able to remove the BulkDocumentWriterStub and use a mock instead.…
nickwallen Dec 11, 2018
2dadcd0
Fixed errant comment
nickwallen Dec 11, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.elasticsearch.bulk;

import org.apache.metron.indexing.dao.update.Document;

/**
 * Contract for a writer that collects documents into a batch and then
 * writes the whole batch to an index in one operation.
 *
 * @param <D> The type of document to write.
 */
public interface BulkDocumentWriter<D extends Document> {

  /**
   * Queues a document so that it is included in the next call to {@link #write()}.
   *
   * @param document The document to write.
   * @param index The name of the index the document should be written to.
   */
  void addDocument(D document, String index);

  /**
   * Reports how many documents are currently queued.
   *
   * @return The number of documents waiting to be written.
   */
  int size();

  /**
   * Writes every document that is currently in the batch.
   *
   * @return The results of the write, separating the documents that were
   *         written successfully from those that failed.
   */
  BulkDocumentWriterResults<D> write();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.elasticsearch.bulk;

import org.apache.metron.indexing.dao.update.Document;

import java.util.ArrayList;
import java.util.List;

/**
 * The result of writing documents in bulk using a {@link BulkDocumentWriter}.
 *
 * <p>Holds two lists: one entry per document that was written successfully and
 * one entry per document that failed. Entries are kept in the order they were added.
 *
 * @param <D> The type of documents to write.
 */
public class BulkDocumentWriterResults<D extends Document> {

  private final List<WriteSuccess<D>> successes;
  private final List<WriteFailure<D>> failures;

  /**
   * Creates an empty set of results.
   */
  public BulkDocumentWriterResults() {
    this.successes = new ArrayList<>();
    this.failures = new ArrayList<>();
  }

  /**
   * Records a successful write.
   *
   * @param success The successful write to record.
   */
  public void add(WriteSuccess<D> success) {
    this.successes.add(success);
  }

  /**
   * Records that a document was written successfully.
   *
   * @param success The document that was written.
   */
  public void addSuccess(D success) {
    add(new WriteSuccess<>(success));
  }

  /**
   * Records that each of the given documents was written successfully.
   *
   * @param successes The documents that were written.
   */
  public void addSuccesses(List<D> successes) {
    for (D success : successes) {
      addSuccess(success);
    }
  }

  /**
   * @return The successful writes recorded so far. This is the internal list,
   *         not a copy.
   */
  public List<WriteSuccess<D>> getSuccesses() {
    return successes;
  }

  /**
   * Records a failed write.
   *
   * @param failure The failed write to record.
   */
  public void add(WriteFailure<D> failure) {
    this.failures.add(failure);
  }

  /**
   * Records that a document failed to be written.
   *
   * @param document The document that failed.
   * @param cause The cause of the failure.
   * @param message A message describing the failure.
   */
  public void addFailure(D document, Throwable cause, String message) {
    // use the diamond rather than a raw WriteFailure so the element type is checked
    add(new WriteFailure<>(document, cause, message));
  }

  /**
   * @return The failed writes recorded so far. This is the internal list,
   *         not a copy.
   */
  public List<WriteFailure<D>> getFailures() {
    return failures;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.elasticsearch.bulk;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.metron.elasticsearch.client.ElasticsearchClient;
import org.apache.metron.indexing.dao.update.Document;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Writes documents to an Elasticsearch index in bulk.
 *
 * <p>Documents are queued with {@link #addDocument(Document, String)} and submitted
 * together as a single bulk request by {@link #write()}. Every call to {@link #write()}
 * that submits a request empties the queue afterwards, whether the documents succeeded
 * or failed. The queue is a plain list and no synchronization is performed on it.
 *
 * @param <D> The type of document to write.
 */
public class ElasticsearchBulkDocumentWriter<D extends Document> implements BulkDocumentWriter<D> {

  /**
   * A {@link Document} along with the index it will be written to.
   */
  private class Indexable {
    final D document;
    final String index;

    Indexable(D document, String index) {
      this.document = document;
      this.index = index;
    }
  }

  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
  private final ElasticsearchClient client;
  private final List<Indexable> documents;
  private WriteRequest.RefreshPolicy refreshPolicy;

  /**
   * Creates a writer with an empty batch and a refresh policy of
   * {@link WriteRequest.RefreshPolicy#NONE}.
   *
   * @param client The client used to submit bulk requests to Elasticsearch.
   */
  public ElasticsearchBulkDocumentWriter(ElasticsearchClient client) {
    this.client = client;
    this.documents = new ArrayList<>();
    this.refreshPolicy = WriteRequest.RefreshPolicy.NONE;
  }

  /**
   * Adds a document to the batch. Nothing is sent to Elasticsearch until
   * {@link #write()} is called.
   *
   * @param document The document to write.
   * @param indexName The name of the index to write to.
   */
  @Override
  public void addDocument(D document, String indexName) {
    documents.add(new Indexable(document, indexName));
    LOG.debug("Adding document to batch; document={}, index={}", document, indexName);
  }

  /**
   * Submits all queued documents to Elasticsearch as one bulk request.
   *
   * <p>If the request cannot be submitted because of an {@link IOException}, every
   * document in the batch is reported as failed with that exception as the cause.
   * Any other exception, including the {@link IllegalArgumentException} raised for
   * a document without a timestamp, propagates to the caller. In all of these cases
   * the batch is cleared.
   *
   * @return The documents that were written and those that failed. Empty when
   *         there was nothing queued.
   */
  @Override
  public BulkDocumentWriterResults<D> write() {
    BulkDocumentWriterResults<D> results = new BulkDocumentWriterResults<>();

    // remember the batch size up front; the batch is cleared before the summary is logged
    final int batchSize = documents.size();
    if (batchSize == 0) {
      // nothing queued, so avoid submitting a bulk request that contains no actions
      LOG.debug("No documents in batch; nothing to write");
      return results;
    }

    try {
      // create an index request for each document
      BulkRequest bulkRequest = new BulkRequest();
      bulkRequest.setRefreshPolicy(refreshPolicy);
      for (Indexable doc : documents) {
        bulkRequest.add(createRequest(doc.document, doc.index));
      }

      // submit the request and handle the response
      BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequest);
      handleBulkResponse(bulkResponse, documents, results);

    } catch (IOException e) {
      // assume all documents have failed
      String message = ExceptionUtils.getRootCauseMessage(e);
      for (Indexable indexable : documents) {
        results.addFailure(indexable.document, e, message);
      }
      LOG.error("Failed to submit bulk request; all documents failed", e);

    } finally {
      // flush all documents no matter which ones succeeded or failed
      documents.clear();
    }

    LOG.debug("Wrote document(s) to Elasticsearch; batchSize={}, success={}, failed={}",
            batchSize, results.getSuccesses().size(), results.getFailures().size());
    return results;
  }

  /**
   * @return The number of documents waiting to be written.
   */
  @Override
  public int size() {
    return documents.size();
  }

  /**
   * Sets the refresh policy applied to subsequent bulk requests.
   *
   * @param refreshPolicy The refresh policy to use.
   * @return This writer, to allow call chaining.
   */
  public ElasticsearchBulkDocumentWriter<D> withRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) {
    this.refreshPolicy = refreshPolicy;
    return this;
  }

  /**
   * Builds the index request for a single document.
   *
   * @param document The document to index.
   * @param index The name of the index to write to.
   * @return The request that indexes the document.
   * @throws IllegalArgumentException If the document has no timestamp.
   */
  private IndexRequest createRequest(D document, String index) {
    if (document.getTimestamp() == null) {
      throw new IllegalArgumentException("Document must contain the timestamp");
    }
    return new IndexRequest()
            .source(document.getDocument())
            .type(document.getSensorType() + "_doc")
            .id(document.getGuid())
            .index(index)
            .timestamp(document.getTimestamp().toString());
  }

  /**
   * Handles the {@link BulkResponse} received from Elasticsearch.
   *
   * <p>Each item in the response is matched to its document by
   * {@link BulkItemResponse#getItemId()}, which is used as the position of the
   * document in {@code documents}.
   *
   * @param bulkResponse The response received from Elasticsearch.
   * @param documents The documents included in the bulk request, in request order.
   * @param results The writer results to populate.
   */
  private void handleBulkResponse(BulkResponse bulkResponse, List<Indexable> documents, BulkDocumentWriterResults<D> results) {
    if (bulkResponse.hasFailures()) {

      // interrogate the response to distinguish between those that succeeded and those that failed
      for (BulkItemResponse response : bulkResponse) {
        D document = documents.get(response.getItemId()).document;
        if (response.isFailed()) {
          // request failed
          Exception cause = response.getFailure().getCause();
          String message = response.getFailureMessage();
          results.addFailure(document, cause, message);

        } else {
          // request succeeded
          results.addSuccess(document);
        }
      }
    } else {
      // all requests succeeded
      for (Indexable success : documents) {
        results.addSuccess(success.document);
      }
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.elasticsearch.bulk;

import org.apache.metron.indexing.dao.update.Document;

/**
 * Describes a document that a {@link BulkDocumentWriter} was unable to write,
 * together with the reason it failed.
 *
 * @param <D> The type of document that failed to write.
 */
public class WriteFailure<D extends Document> {

  private final D document;
  private final Throwable cause;
  private final String message;

  /**
   * @param document The document that failed to write.
   * @param cause The cause of the failure.
   * @param message A message describing the failure.
   */
  public WriteFailure(D document, Throwable cause, String message) {
    this.document = document;
    this.cause = cause;
    this.message = message;
  }

  /**
   * @return The document that failed to write.
   */
  public D getDocument() {
    return this.document;
  }

  /**
   * @return The cause of the failure.
   */
  public Throwable getCause() {
    return this.cause;
  }

  /**
   * @return The message describing the failure.
   */
  public String getMessage() {
    return this.message;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.elasticsearch.bulk;

import org.apache.metron.indexing.dao.update.Document;

/**
 * Describes a document that a {@link BulkDocumentWriter} wrote successfully.
 *
 * @param <D> The type of document written.
 */
public class WriteSuccess<D extends Document> {

  private final D document;

  /**
   * @param document The document that was written.
   */
  public WriteSuccess(D document) {
    this.document = document;
  }

  /**
   * @return The document that was written.
   */
  public D getDocument() {
    return this.document;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public ElasticsearchDao withRefreshPolicy(WriteRequest.RefreshPolicy refreshPoli
}

protected Optional<String> getIndexName(String guid, String sensorType) throws IOException {
return updateDao.getIndexName(guid, sensorType);
return updateDao.findIndexNameByGUID(guid, sensorType);
}

protected SearchResponse search(SearchRequest request, QueryBuilder queryBuilder)
Expand Down
Loading