diff --git a/be/src/util/thrift_server.cpp b/be/src/util/thrift_server.cpp index 3b37597ef8abc9..aa3a678f51d17e 100644 --- a/be/src/util/thrift_server.cpp +++ b/be/src/util/thrift_server.cpp @@ -33,7 +33,6 @@ #include #include #include -#include #include namespace palo { @@ -42,12 +41,16 @@ namespace palo { // Helper class that starts a server in a separate thread, and handles // the inter-thread communication to monitor whether it started // correctly. -class ThriftServer::ThriftServerEventProcessor +class ThriftServer::ThriftServerEventProcessor : public apache::thrift::server::TServerEventHandler { public: - ThriftServerEventProcessor(ThriftServer* thrift_server) : + ThriftServerEventProcessor(ThriftServer* thrift_server) : _thrift_server(thrift_server), - _signal_fired(false) { + _signal_fired(false) { + } + + // friendly to code style + virtual ~ThriftServerEventProcessor() { } // Called by TNonBlockingServer when server has acquired its resources and is ready to @@ -103,7 +106,7 @@ Status ThriftServer::ThriftServerEventProcessor::start_and_wait_for_server() { _thrift_server->_server_thread.reset( new boost::thread(&ThriftServer::ThriftServerEventProcessor::supervise, this)); - boost::system_time deadline = boost::get_system_time() + boost::system_time deadline = boost::get_system_time() + boost::posix_time::milliseconds(TIMEOUT_MS); // Loop protects against spurious wakeup. Locks provide necessary fences to ensure @@ -265,7 +268,7 @@ ThriftServer::ThriftServer( int port, MetricGroup* metrics, int num_worker_threads, - ServerType server_type) : + ServerType server_type) : _started(false), _port(port), _num_worker_threads(num_worker_threads), @@ -291,10 +294,10 @@ ThriftServer::ThriftServer( Status ThriftServer::start() { DCHECK(!_started); - boost::shared_ptr + boost::shared_ptr protocol_factory(new apache::thrift::protocol::TBinaryProtocolFactory()); boost::shared_ptr thread_mgr; - boost::shared_ptr + boost::shared_ptr thread_factory(new apache::thrift::concurrency::PosixThreadFactory()); boost::shared_ptr fe_server_transport; boost::shared_ptr transport_factory; diff --git a/fe/src/com/baidu/palo/common/ThriftServer.java b/fe/src/com/baidu/palo/common/ThriftServer.java index 3360d1f41519d8..100a0d692e0ff2 100644 --- a/fe/src/com/baidu/palo/common/ThriftServer.java +++ b/fe/src/com/baidu/palo/common/ThriftServer.java @@ -1,13 +1,8 @@ -// Modifications copyright (C) 2017, Baidu.com, Inc. -// Copyright 2017 The Apache Software Foundation - -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // @@ -20,8 +15,12 @@ package com.baidu.palo.common; -import org.apache.logging.log4j.Logger; +import com.baidu.palo.thrift.TNetworkAddress; + +import com.google.common.collect.Sets; + import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.server.TServer; @@ -33,20 +32,25 @@ import org.apache.thrift.transport.TTransportException; import java.io.IOException; +import java.util.Set; -/** - * Created by zhaochun on 14-9-2. - */ public class ThriftServer { - private static final Logger LOG = LogManager.getLogger(ThriftServer.class); - private ThriftServerType type = ThriftServerType.THREAD_POOL; - private int port; + private static final Logger LOG = LogManager.getLogger(ThriftServer.class); + private ThriftServerType type = ThriftServerType.THREAD_POOL; + private int port; private TProcessor processor; - private TServer server; - private Thread serverThread; + private TServer server; + private Thread serverThread; + private Set connects; + public ThriftServer(int port, TProcessor processor) { this.port = port; this.processor = processor; + this.connects = Sets.newConcurrentHashSet(); + } + + public ThriftServerType getType() { + return type; } private void createSimpleServer() throws TTransportException { @@ -86,6 +90,10 @@ public void start() throws IOException { LOG.warn("create thrift server failed.", ex); throw new IOException("create thrift server failed.", ex); } + + ThriftServerEventProcessor eventProcessor = new ThriftServerEventProcessor(this); + server.setServerEventHandler(eventProcessor); + serverThread = new Thread(new Runnable() { @Override public void run() { @@ -109,6 +117,14 @@ public void join() throws InterruptedException { serverThread.join(); } + public void addConnect(TNetworkAddress clientAddress) { + connects.add(clientAddress); + } + + public void removeConnect(TNetworkAddress clientAddress) { + connects.remove(clientAddress); + } + public static enum ThriftServerType { SIMPLE, THREADED, diff --git a/fe/src/com/baidu/palo/common/ThriftServerContext.java b/fe/src/com/baidu/palo/common/ThriftServerContext.java new file mode 100644 index 00000000000000..70f7d6a487b036 --- /dev/null +++ b/fe/src/com/baidu/palo/common/ThriftServerContext.java @@ -0,0 +1,35 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.baidu.palo.common; + +import com.baidu.palo.thrift.TNetworkAddress; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.thrift.server.ServerContext; + +public class ThriftServerContext implements ServerContext { + private static final Logger LOG = LogManager.getLogger(ThriftServerEventProcessor.class); + private TNetworkAddress client; + + public ThriftServerContext(TNetworkAddress clientAddress) { + this.client = clientAddress; + } + + public TNetworkAddress getClient() { + return client; + } +} diff --git a/fe/src/com/baidu/palo/common/ThriftServerEventProcessor.java b/fe/src/com/baidu/palo/common/ThriftServerEventProcessor.java new file mode 100644 index 00000000000000..4c326b5eb98612 --- /dev/null +++ b/fe/src/com/baidu/palo/common/ThriftServerEventProcessor.java @@ -0,0 +1,123 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.baidu.palo.common; + +import com.baidu.palo.thrift.TNetworkAddress; + +import com.google.common.base.Preconditions; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.server.ServerContext; +import org.apache.thrift.server.TServerEventHandler; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +public class ThriftServerEventProcessor implements TServerEventHandler { + private static final Logger LOG = LogManager.getLogger(ThriftServerEventProcessor.class); + + private ThriftServer thriftServer; + + private static ThreadLocal connectionContext; + + public ThriftServerEventProcessor(ThriftServer thriftServer) { + this.thriftServer = thriftServer; + connectionContext = new ThreadLocal<>(); + } + + public static ThriftServerContext getConnectionContext() { + return connectionContext.get(); + } + + @Override + public void preServe() { + } + + @Override + public ServerContext createContext(TProtocol input, TProtocol output) { + // param input is class org.apache.thrift.protocol.TBinaryProtocol + TSocket tSocket = null; + TTransport transport = input.getTransport(); + + switch (thriftServer.getType()) { + case THREADED: + // class org.apache.thrift.transport.TFramedTransport + Preconditions.checkState(transport instanceof TFramedTransport); + TFramedTransport framedTransport = (TFramedTransport) transport; + // NOTE: we need patch code in TNonblockingServer, we don't use for now. + // see https://issues.apache.org/jira/browse/THRIFT-1053 + break; + case SIMPLE: + case THREAD_POOL: + // org.apache.thrift.transport.TSocket + Preconditions.checkState(transport instanceof TSocket); + tSocket = (TSocket) transport; + break; + } + if (tSocket == null) { + LOG.info("fail to get client socket. server type: {}", thriftServer.getType()); + return null; + } + SocketAddress socketAddress = tSocket.getSocket().getRemoteSocketAddress(); + InetSocketAddress inetSocketAddress = null; + if (socketAddress instanceof InetSocketAddress) { + inetSocketAddress = (InetSocketAddress) socketAddress; + } else { + LOG.info("fail to get client socket address. server type: {}", + thriftServer.getType()); + return null; + } + TNetworkAddress clientAddress = new TNetworkAddress( + inetSocketAddress.getHostString(), + inetSocketAddress.getPort()); + + thriftServer.addConnect(clientAddress); + + LOG.info("create thrift context. client: {}", clientAddress); + return new ThriftServerContext(clientAddress); + } + + @Override + public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) { + if (serverContext == null) { + return; + } + + Preconditions.checkState(serverContext instanceof ThriftServerContext); + ThriftServerContext thriftServerContext = (ThriftServerContext) serverContext; + TNetworkAddress clientAddress = thriftServerContext.getClient(); + connectionContext.remove(); + thriftServer.removeConnect(clientAddress); + LOG.info("delete thrift context. client: {}", clientAddress); + } + + @Override + public void processContext(ServerContext serverContext, TTransport inputTransport, TTransport outputTransport) { + if (serverContext == null) { + return; + } + + ThriftServerContext thriftServerContext = (ThriftServerContext) serverContext; + TNetworkAddress clientAddress = thriftServerContext.getClient(); + Preconditions.checkState(serverContext instanceof ThriftServerContext); + connectionContext.set(new ThriftServerContext(clientAddress)); + } +} diff --git a/fe/src/com/baidu/palo/service/FrontendServiceImpl.java b/fe/src/com/baidu/palo/service/FrontendServiceImpl.java index 474de2f74b3eb2..8b146239e97fd4 100644 --- a/fe/src/com/baidu/palo/service/FrontendServiceImpl.java +++ b/fe/src/com/baidu/palo/service/FrontendServiceImpl.java @@ -28,9 +28,11 @@ import com.baidu.palo.common.Config; import com.baidu.palo.common.DdlException; import com.baidu.palo.common.PatternMatcher; -import com.baidu.palo.load.MiniEtlTaskInfo; +import com.baidu.palo.common.ThriftServerContext; +import com.baidu.palo.common.ThriftServerEventProcessor; import com.baidu.palo.load.EtlStatus; import com.baidu.palo.load.LoadJob; +import com.baidu.palo.load.MiniEtlTaskInfo; import com.baidu.palo.master.MasterImpl; import com.baidu.palo.mysql.MysqlPassword; import com.baidu.palo.qe.AuditBuilder; @@ -38,12 +40,10 @@ import com.baidu.palo.qe.ConnectProcessor; import com.baidu.palo.qe.QeProcessor; import com.baidu.palo.qe.VariableMgr; -import com.baidu.palo.service.FrontendOptions; +import com.baidu.palo.system.Frontend; import com.baidu.palo.system.SystemInfoService; import com.baidu.palo.thrift.FrontendService; import com.baidu.palo.thrift.FrontendServiceVersion; -import com.baidu.palo.thrift.TMiniLoadEtlStatusResult; -import com.baidu.palo.thrift.TMiniLoadRequest; import com.baidu.palo.thrift.TColumnDef; import com.baidu.palo.thrift.TColumnDesc; import com.baidu.palo.thrift.TDescribeTableParams; @@ -60,6 +60,9 @@ import com.baidu.palo.thrift.TMasterOpRequest; import com.baidu.palo.thrift.TMasterOpResult; import com.baidu.palo.thrift.TMasterResult; +import com.baidu.palo.thrift.TMiniLoadEtlStatusResult; +import com.baidu.palo.thrift.TMiniLoadRequest; +import com.baidu.palo.thrift.TNetworkAddress; import com.baidu.palo.thrift.TReportExecStatusParams; import com.baidu.palo.thrift.TReportExecStatusResult; import com.baidu.palo.thrift.TReportRequest; @@ -69,8 +72,8 @@ import com.baidu.palo.thrift.TStatusCode; import com.baidu.palo.thrift.TTableStatus; import com.baidu.palo.thrift.TUniqueId; -import com.baidu.palo.thrift.TUpdateMiniEtlTaskStatusRequest; import com.baidu.palo.thrift.TUpdateExportTaskStatusRequest; +import com.baidu.palo.thrift.TUpdateMiniEtlTaskStatusRequest; import com.google.common.base.Joiner; import com.google.common.collect.Lists; @@ -399,6 +402,20 @@ public TFeResult updateMiniEtlTaskStatus(TUpdateMiniEtlTaskStatusRequest request @Override public TMasterOpResult forward(TMasterOpRequest params) throws TException { + ThriftServerContext connectionContext = ThriftServerEventProcessor.getConnectionContext(); + // For NonBlockingServer, we can not get client ip. + if (connectionContext != null) { + TNetworkAddress clientAddress = connectionContext.getClient(); + LOG.debug("debug: client address in forward: {}", clientAddress); + + Frontend fe = Catalog.getInstance().checkFeExist( + clientAddress.getHostname(), + clientAddress.getPort()); + if (fe == null) { + throw new TException("request from invalid host, reject."); + } + } + ConnectContext context = new ConnectContext(null); ConnectProcessor processor = new ConnectProcessor(context); TMasterOpResult result = processor.proxyExecute(params);