88
99
1010def _get_grpc_header (engine_ip = None , cluster = None ):
11+ """
12+ Generate gRPC metadata headers for the request.
13+
14+ This function creates a list of metadata headers to be used in gRPC requests.
15+ It includes optional headers for the engine IP and cluster UUID.
16+
17+ Args:
18+ engine_ip (str, optional): The IP address of the engine. Defaults to None.
19+ cluster (str, optional): The UUID of the cluster. Defaults to None.
20+
21+ Returns:
22+ list: A list of tuples representing the gRPC metadata headers.
23+ """
1124 metadata = []
1225 if engine_ip :
1326 metadata .append (('plannerip' , engine_ip ))
@@ -123,7 +136,7 @@ class ClusterManager:
123136 cluster_uuid (str): The unique identifier for the target cluster.
124137 """
125138
126- def __init__ (self , host : str , port : int , user : str , password : str , secure_channel : bool = False , timeout = 60 * 3 , cluster_uuid = None ):
139+ def __init__ (self , host : str , port : int , user : str , password : str , secure_channel : bool = False , timeout = 60 * 3 , cluster_uuid = None , grpc_options = None ):
127140 """
128141 Initializes a new instance of the ClusterManager class.
129142
@@ -147,6 +160,9 @@ def __init__(self, host: str, port: int, user: str, password: str, secure_channe
147160 self ._timeout = time .time () + timeout
148161 self ._secure_channel = secure_channel
149162 self .cluster_uuid = cluster_uuid
163+ self ._grpc_options = grpc_options
164+ if grpc_options is None :
165+ self ._grpc_options = dict ()
150166
151167 @property
152168 def _get_connection (self ):
@@ -161,14 +177,34 @@ def _get_connection(self):
161177 if self ._secure_channel :
162178 self ._channel = grpc .secure_channel (
163179 target = '{}:{}' .format (self ._host , self ._port ),
180+ options = self ._grpc_options ,
164181 credentials = grpc .ssl_channel_credentials ()
165182 )
166183 else :
167184 self ._channel = grpc .insecure_channel (
168- target = '{}:{}' .format (self ._host , self ._port )
185+ target = '{}:{}' .format (self ._host , self ._port ),
186+ options = self ._grpc_options
169187 )
170188 return cluster_pb2_grpc .ClusterServiceStub (self ._channel )
171189
190+ def _check_cluster_status (self ):
191+ while True :
192+ try :
193+ # Create a status request payload with user credentials
194+ status_payload = cluster_pb2 .ClusterStatusRequest (
195+ user = self ._user ,
196+ password = self ._password
197+ )
198+ # Send the status request to the cluster service
199+ response = self ._get_connection .status (
200+ status_payload ,
201+ metadata = _get_grpc_header (cluster = self .cluster_uuid )
202+ )
203+ # Yield the current status
204+ yield response .status
205+ except _InactiveRpcError as e :
206+ yield None
207+
172208 def resume (self ) -> bool :
173209 """
174210 Resumes the cluster if it is currently suspended or not in the 'active' state.
@@ -229,27 +265,15 @@ def resume(self) -> bool:
229265 """
230266 return False
231267
232- # Wait for the cluster to become active
233- while True :
234- try :
235- status_payload = cluster_pb2 .ClusterStatusRequest (
236- user = self ._user ,
237- password = self ._password
238- )
239- response = self ._get_connection .status (
240- status_payload ,
241- metadata = _get_grpc_header (cluster = self .cluster_uuid )
242- )
243- if response .status == 'active' :
244- lock .set_active ()
245- return True
246- if response .status in ['suspended' , 'failed' ]:
247- return False
248- if time .time () > self ._timeout :
249- return False
250- except _InactiveRpcError as e :
251- pass
268+ for status in self ._check_cluster_status ():
269+ if status == 'active' :
270+ lock .set_active ()
271+ return True
272+ elif status == 'failed' or time .time () > self ._timeout :
273+ return False
274+ # Wait for 5 seconds before the next status check
252275 time .sleep (5 )
276+ return False
253277
254278 def suspend (self ):
255279 """
0 commit comments