Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import io.opentracing.util.GlobalTracer;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;

import org.apache.ratis.thirdparty.com.google.protobuf.ProtocolMessageEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -113,7 +114,7 @@ public HddsDispatcher(ConfigurationSource config, ContainerSet contSet,
this.tokenVerifier = tokenVerifier;

protocolMetrics =
new ProtocolMessageMetrics<ProtocolMessageEnum>(
new ProtocolMessageMetrics<>(
"HddsDispatcher",
"HDDS dispatcher metrics",
ContainerProtos.Type.values());
Expand Down Expand Up @@ -175,7 +176,7 @@ private ContainerCommandResponseProto dispatchRequest(
ContainerCommandRequestProto msg, DispatcherContext dispatcherContext) {
Preconditions.checkNotNull(msg);
if (LOG.isTraceEnabled()) {
LOG.trace("Command {}, trace ID: {} ", msg.getCmdType().toString(),
LOG.trace("Command {}, trace ID: {} ", msg.getCmdType(),
msg.getTraceID());
}

Expand Down Expand Up @@ -490,10 +491,9 @@ public void validateContainerCommand(
try {
validateBlockToken(msg);
} catch (IOException ioe) {
StorageContainerException sce = new StorageContainerException(
throw new StorageContainerException(
"Block token verification failed. " + ioe.getMessage(), ioe,
ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED);
throw sce;
}
}

Expand Down Expand Up @@ -583,14 +583,16 @@ private void audit(AuditAction action, EventType eventType,
AuditMessage amsg;
switch (result) {
case SUCCESS:
if(eventType == EventType.READ &&
AUDIT.getLogger().isInfoEnabled(AuditMarker.READ.getMarker())) {
amsg = buildAuditMessageForSuccess(action, params);
AUDIT.logReadSuccess(amsg);
} else if(eventType == EventType.WRITE &&
AUDIT.getLogger().isInfoEnabled(AuditMarker.WRITE.getMarker())) {
amsg = buildAuditMessageForSuccess(action, params);
AUDIT.logWriteSuccess(amsg);
if(isAllowed(action.getAction())) {
if(eventType == EventType.READ &&
AUDIT.getLogger().isInfoEnabled(AuditMarker.READ.getMarker())) {
amsg = buildAuditMessageForSuccess(action, params);
AUDIT.logReadSuccess(amsg);
} else if(eventType == EventType.WRITE &&
AUDIT.getLogger().isInfoEnabled(AuditMarker.WRITE.getMarker())) {
amsg = buildAuditMessageForSuccess(action, params);
AUDIT.logWriteSuccess(amsg);
}
}
break;

Expand Down Expand Up @@ -627,7 +629,6 @@ public AuditMessage buildAuditMessageForSuccess(AuditAction op,
.build();
}

//TODO: use GRPC to fetch user and ip details
@Override
public AuditMessage buildAuditMessageForFailure(AuditAction op,
Map<String, String> auditMap, Throwable throwable) {
Expand All @@ -646,4 +647,23 @@ enum EventType {
READ,
WRITE
}

/**
* Checks if the action is allowed for audit.
* @param action
* @return true or false accordingly.
*/
private boolean isAllowed(String action) {
switch(action) {
case "CLOSE_CONTAINER":
case "CREATE_CONTAINER":
case "LIST_CONTAINER":
case "DELETE_CONTAINER":
case "READ_CONTAINER":
case "UPDATE_CONTAINER":
case "DELETE_BLOCK":
return true;
default: return false;
}
}
}