Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions be/src/util/thrift_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/server/TThreadedServer.h>
#include <thrift/transport/TSocket.h>
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/transport/TServerSocket.h>

namespace palo {
Expand All @@ -42,12 +41,16 @@ namespace palo {
// Helper class that starts a server in a separate thread, and handles
// the inter-thread communication to monitor whether it started
// correctly.
class ThriftServer::ThriftServerEventProcessor
class ThriftServer::ThriftServerEventProcessor
: public apache::thrift::server::TServerEventHandler {
public:
ThriftServerEventProcessor(ThriftServer* thrift_server) :
ThriftServerEventProcessor(ThriftServer* thrift_server) :
_thrift_server(thrift_server),
_signal_fired(false) {
_signal_fired(false) {
}

// friendly to code style
virtual ~ThriftServerEventProcessor() {
}

// Called by TNonBlockingServer when server has acquired its resources and is ready to
Expand Down Expand Up @@ -103,7 +106,7 @@ Status ThriftServer::ThriftServerEventProcessor::start_and_wait_for_server() {
_thrift_server->_server_thread.reset(
new boost::thread(&ThriftServer::ThriftServerEventProcessor::supervise, this));

boost::system_time deadline = boost::get_system_time()
boost::system_time deadline = boost::get_system_time()
+ boost::posix_time::milliseconds(TIMEOUT_MS);

// Loop protects against spurious wakeup. Locks provide necessary fences to ensure
Expand Down Expand Up @@ -265,7 +268,7 @@ ThriftServer::ThriftServer(
int port,
MetricGroup* metrics,
int num_worker_threads,
ServerType server_type) :
ServerType server_type) :
_started(false),
_port(port),
_num_worker_threads(num_worker_threads),
Expand All @@ -291,10 +294,10 @@ ThriftServer::ThriftServer(

Status ThriftServer::start() {
DCHECK(!_started);
boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>
boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>
protocol_factory(new apache::thrift::protocol::TBinaryProtocolFactory());
boost::shared_ptr<apache::thrift::concurrency::ThreadManager> thread_mgr;
boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>
boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>
thread_factory(new apache::thrift::concurrency::PosixThreadFactory());
boost::shared_ptr<apache::thrift::transport::TServerTransport> fe_server_transport;
boost::shared_ptr<apache::thrift::transport::TTransportFactory> transport_factory;
Expand Down
54 changes: 35 additions & 19 deletions fe/src/com/baidu/palo/common/ThriftServer.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
// Modifications copyright (C) 2017, Baidu.com, Inc.
// Copyright 2017 The Apache Software Foundation

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
Expand All @@ -20,8 +15,12 @@

package com.baidu.palo.common;

import org.apache.logging.log4j.Logger;
import com.baidu.palo.thrift.TNetworkAddress;

import com.google.common.collect.Sets;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
Expand All @@ -33,20 +32,25 @@
import org.apache.thrift.transport.TTransportException;

import java.io.IOException;
import java.util.Set;

/**
* Created by zhaochun on 14-9-2.
*/
public class ThriftServer {
private static final Logger LOG = LogManager.getLogger(ThriftServer.class);
private ThriftServerType type = ThriftServerType.THREAD_POOL;
private int port;
private static final Logger LOG = LogManager.getLogger(ThriftServer.class);
private ThriftServerType type = ThriftServerType.THREAD_POOL;
private int port;
private TProcessor processor;
private TServer server;
private Thread serverThread;
private TServer server;
private Thread serverThread;
private Set<TNetworkAddress> connects;

public ThriftServer(int port, TProcessor processor) {
this.port = port;
this.processor = processor;
this.connects = Sets.newConcurrentHashSet();
}

public ThriftServerType getType() {
return type;
}

private void createSimpleServer() throws TTransportException {
Expand Down Expand Up @@ -86,6 +90,10 @@ public void start() throws IOException {
LOG.warn("create thrift server failed.", ex);
throw new IOException("create thrift server failed.", ex);
}

ThriftServerEventProcessor eventProcessor = new ThriftServerEventProcessor(this);
server.setServerEventHandler(eventProcessor);

serverThread = new Thread(new Runnable() {
@Override
public void run() {
Expand All @@ -109,6 +117,14 @@ public void join() throws InterruptedException {
serverThread.join();
}

public void addConnect(TNetworkAddress clientAddress) {
connects.add(clientAddress);
}

public void removeConnect(TNetworkAddress clientAddress) {
connects.remove(clientAddress);
}

public static enum ThriftServerType {
SIMPLE,
THREADED,
Expand Down
35 changes: 35 additions & 0 deletions fe/src/com/baidu/palo/common/ThriftServerContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package com.baidu.palo.common;

import com.baidu.palo.thrift.TNetworkAddress;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.server.ServerContext;

public class ThriftServerContext implements ServerContext {
private static final Logger LOG = LogManager.getLogger(ThriftServerEventProcessor.class);
private TNetworkAddress client;

public ThriftServerContext(TNetworkAddress clientAddress) {
this.client = clientAddress;
}

public TNetworkAddress getClient() {
return client;
}
}
123 changes: 123 additions & 0 deletions fe/src/com/baidu/palo/common/ThriftServerEventProcessor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package com.baidu.palo.common;

import com.baidu.palo.thrift.TNetworkAddress;

import com.google.common.base.Preconditions;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

import java.net.InetSocketAddress;
import java.net.SocketAddress;

public class ThriftServerEventProcessor implements TServerEventHandler {
private static final Logger LOG = LogManager.getLogger(ThriftServerEventProcessor.class);

private ThriftServer thriftServer;

private static ThreadLocal<ThriftServerContext> connectionContext;

public ThriftServerEventProcessor(ThriftServer thriftServer) {
this.thriftServer = thriftServer;
connectionContext = new ThreadLocal<>();
}

public static ThriftServerContext getConnectionContext() {
return connectionContext.get();
}

@Override
public void preServe() {
}

@Override
public ServerContext createContext(TProtocol input, TProtocol output) {
// param input is class org.apache.thrift.protocol.TBinaryProtocol
TSocket tSocket = null;
TTransport transport = input.getTransport();

switch (thriftServer.getType()) {
case THREADED:
// class org.apache.thrift.transport.TFramedTransport
Preconditions.checkState(transport instanceof TFramedTransport);
TFramedTransport framedTransport = (TFramedTransport) transport;
// NOTE: we need patch code in TNonblockingServer, we don't use for now.
// see https://issues.apache.org/jira/browse/THRIFT-1053
break;
case SIMPLE:
case THREAD_POOL:
// org.apache.thrift.transport.TSocket
Preconditions.checkState(transport instanceof TSocket);
tSocket = (TSocket) transport;
break;
}
if (tSocket == null) {
LOG.info("fail to get client socket. server type: {}", thriftServer.getType());
return null;
}
SocketAddress socketAddress = tSocket.getSocket().getRemoteSocketAddress();
InetSocketAddress inetSocketAddress = null;
if (socketAddress instanceof InetSocketAddress) {
inetSocketAddress = (InetSocketAddress) socketAddress;
} else {
LOG.info("fail to get client socket address. server type: {}",
thriftServer.getType());
return null;
}
TNetworkAddress clientAddress = new TNetworkAddress(
inetSocketAddress.getHostString(),
inetSocketAddress.getPort());

thriftServer.addConnect(clientAddress);

LOG.info("create thrift context. client: {}", clientAddress);
return new ThriftServerContext(clientAddress);
}

@Override
public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
if (serverContext == null) {
return;
}

Preconditions.checkState(serverContext instanceof ThriftServerContext);
ThriftServerContext thriftServerContext = (ThriftServerContext) serverContext;
TNetworkAddress clientAddress = thriftServerContext.getClient();
connectionContext.remove();
thriftServer.removeConnect(clientAddress);
LOG.info("delete thrift context. client: {}", clientAddress);
}

@Override
public void processContext(ServerContext serverContext, TTransport inputTransport, TTransport outputTransport) {
if (serverContext == null) {
return;
}

ThriftServerContext thriftServerContext = (ThriftServerContext) serverContext;
TNetworkAddress clientAddress = thriftServerContext.getClient();
Preconditions.checkState(serverContext instanceof ThriftServerContext);
connectionContext.set(new ThriftServerContext(clientAddress));
}
}
27 changes: 22 additions & 5 deletions fe/src/com/baidu/palo/service/FrontendServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,22 @@
import com.baidu.palo.common.Config;
import com.baidu.palo.common.DdlException;
import com.baidu.palo.common.PatternMatcher;
import com.baidu.palo.load.MiniEtlTaskInfo;
import com.baidu.palo.common.ThriftServerContext;
import com.baidu.palo.common.ThriftServerEventProcessor;
import com.baidu.palo.load.EtlStatus;
import com.baidu.palo.load.LoadJob;
import com.baidu.palo.load.MiniEtlTaskInfo;
import com.baidu.palo.master.MasterImpl;
import com.baidu.palo.mysql.MysqlPassword;
import com.baidu.palo.qe.AuditBuilder;
import com.baidu.palo.qe.ConnectContext;
import com.baidu.palo.qe.ConnectProcessor;
import com.baidu.palo.qe.QeProcessor;
import com.baidu.palo.qe.VariableMgr;
import com.baidu.palo.service.FrontendOptions;
import com.baidu.palo.system.Frontend;
import com.baidu.palo.system.SystemInfoService;
import com.baidu.palo.thrift.FrontendService;
import com.baidu.palo.thrift.FrontendServiceVersion;
import com.baidu.palo.thrift.TMiniLoadEtlStatusResult;
import com.baidu.palo.thrift.TMiniLoadRequest;
import com.baidu.palo.thrift.TColumnDef;
import com.baidu.palo.thrift.TColumnDesc;
import com.baidu.palo.thrift.TDescribeTableParams;
Expand All @@ -60,6 +60,9 @@
import com.baidu.palo.thrift.TMasterOpRequest;
import com.baidu.palo.thrift.TMasterOpResult;
import com.baidu.palo.thrift.TMasterResult;
import com.baidu.palo.thrift.TMiniLoadEtlStatusResult;
import com.baidu.palo.thrift.TMiniLoadRequest;
import com.baidu.palo.thrift.TNetworkAddress;
import com.baidu.palo.thrift.TReportExecStatusParams;
import com.baidu.palo.thrift.TReportExecStatusResult;
import com.baidu.palo.thrift.TReportRequest;
Expand All @@ -69,8 +72,8 @@
import com.baidu.palo.thrift.TStatusCode;
import com.baidu.palo.thrift.TTableStatus;
import com.baidu.palo.thrift.TUniqueId;
import com.baidu.palo.thrift.TUpdateMiniEtlTaskStatusRequest;
import com.baidu.palo.thrift.TUpdateExportTaskStatusRequest;
import com.baidu.palo.thrift.TUpdateMiniEtlTaskStatusRequest;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -399,6 +402,20 @@ public TFeResult updateMiniEtlTaskStatus(TUpdateMiniEtlTaskStatusRequest request

@Override
public TMasterOpResult forward(TMasterOpRequest params) throws TException {
ThriftServerContext connectionContext = ThriftServerEventProcessor.getConnectionContext();
// For NonBlockingServer, we can not get client ip.
if (connectionContext != null) {
TNetworkAddress clientAddress = connectionContext.getClient();
LOG.debug("debug: client address in forward: {}", clientAddress);

Frontend fe = Catalog.getInstance().checkFeExist(
clientAddress.getHostname(),
clientAddress.getPort());
if (fe == null) {
throw new TException("request from invalid host, reject.");
}
}

ConnectContext context = new ConnectContext(null);
ConnectProcessor processor = new ConnectProcessor(context);
TMasterOpResult result = processor.proxyExecute(params);
Expand Down