From ba78b42968069b51d25867f9ca72583c1f8f8623 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20Brod=C3=A9n?= Date: Tue, 12 Dec 2023 13:54:22 +0100 Subject: [PATCH 1/4] Add and update peering with retries --- api/vpc_gcp_peering.go | 200 +++++++++++++++++++++---------- api/vpc_gcp_peering_withvpcid.go | 128 ++++++-------------- 2 files changed, 172 insertions(+), 156 deletions(-) diff --git a/api/vpc_gcp_peering.go b/api/vpc_gcp_peering.go index 8eb1bb7..2b50bee 100644 --- a/api/vpc_gcp_peering.go +++ b/api/vpc_gcp_peering.go @@ -7,76 +7,111 @@ import ( "time" ) -func (api *API) waitForGcpPeeringStatus(instanceID int, peerID string) error { - for { - time.Sleep(10 * time.Second) - data, err := api.ReadVpcGcpPeering(instanceID, peerID) +// RequestVpcGcpPeering: requests a VPC peering from an instance. +func (api *API) RequestVpcGcpPeering(instanceID int, params map[string]interface{}, + waitOnStatus bool, sleep, timeout int) (map[string]interface{}, error) { + + path := fmt.Sprintf("api/instances/%v/vpc-peering", instanceID) + attempt, data, err := api.requestVpcGcpPeeringWithRetry(path, params, waitOnStatus, 1, sleep, timeout) + if err != nil { + return nil, err + } + + if waitOnStatus { + log.Printf("[DEBUG] go-api::vpc_gcp_peering_withvpcid::request waiting for active state") + err := api.waitForGcpPeeringStatus(path, data["peering"].(string), attempt, sleep, timeout) if err != nil { - return err - } - rows := data["rows"].([]interface{}) - if len(rows) > 0 { - for _, row := range rows { - tempRow := row.(map[string]interface{}) - if tempRow["name"] != peerID { - continue - } - if tempRow["state"] == "ACTIVE" { - return nil - } - } + return nil, err } } + + return data, nil } -func (api *API) RequestVpcGcpPeering(instanceID int, params map[string]interface{}, - waitOnStatus bool) (map[string]interface{}, error) { +// requestVpcGcpPeeringWithRetry: requests a VPC peering from a path with retry logic +func (api *API) requestVpcGcpPeeringWithRetry(path string, params map[string]interface{}, + waitOnStatus bool, attempt, sleep, timeout int) (int, map[string]interface{}, error) { var ( data map[string]interface{} failed map[string]interface{} - path = fmt.Sprintf("api/instances/%v/vpc-peering", instanceID) ) - log.Printf("[DEBUG] go-api::vpc_gcp_peering::request params: %v", params) + log.Printf("[DEBUG] go-api::vpc_gcp_peering::request path: %s, params: %v", path, params) response, err := api.sling.New().Post(path).BodyJSON(params).Receive(&data, &failed) if err != nil { - return nil, err - } - if response.StatusCode != 200 { - return nil, fmt.Errorf("request VPC peering failed, status: %v, message: %s", response.StatusCode, failed) + return attempt, nil, err + } else if attempt*sleep > timeout { + return attempt, nil, + fmt.Errorf("request VPC peering failed, reached timeout of %d seconds", timeout) } - if waitOnStatus { - log.Printf("[DEBUG] go-api::vpc_gcp_peering::request waiting for active state") - api.waitForGcpPeeringStatus(instanceID, data["peering"].(string)) + switch response.StatusCode { + case 200: + return attempt, data, nil + case 400: + if strings.Compare(failed["error"].(string), "Timeout talking to backend") == 0 { + log.Printf("[INFO] go-api::vpc_gcp_peering::request Timeout talking to backend "+ + "attempt %d until timeout: %d", attempt, (timeout - (attempt * sleep))) + attempt++ + time.Sleep(time.Duration(sleep) * time.Second) + return api.requestVpcGcpPeeringWithRetry(path, params, waitOnStatus, attempt, sleep, timeout) + } } - return data, nil + return attempt, nil, fmt.Errorf("request VPC peering failed, status: %v, message: %s", + response.StatusCode, failed) } -func (api *API) ReadVpcGcpPeering(instanceID int, peerID string) (map[string]interface{}, error) { +// ReadVpcGcpPeering: reads the VPC peering from the API +func (api *API) ReadVpcGcpPeering(instanceID, sleep, timeout int) ( + map[string]interface{}, error) { + + path := fmt.Sprintf("/api/instances/%v/vpc-peering", instanceID) + _, data, err := api.readVpcGcpPeeringWithRetry(path, 1, sleep, timeout) + return data, err +} + +// readVpcGcpPeeringWithRetry: reads the VPC peering from the API with retry logic +func (api *API) readVpcGcpPeeringWithRetry(path string, attempt, sleep, timeout int) ( + int, map[string]interface{}, error) { + var ( data map[string]interface{} failed map[string]interface{} - path = fmt.Sprintf("/api/instances/%v/vpc-peering", instanceID) ) - log.Printf("[DEBUG] go-api::vpc_gcp_peering::read instance_id: %v, peer_id: %v", instanceID, peerID) + log.Printf("[DEBUG] go-api::vpc_gcp_peering::read path: %s", path) response, err := api.sling.New().Get(path).Receive(&data, &failed) - log.Printf("[DEBUG] go-api::vpc_gcp_peering::read data: %v", data) if err != nil { - return nil, err - } - if response.StatusCode != 200 { - return nil, fmt.Errorf("ReadRequest failed, status: %v, message: %s", response.StatusCode, failed) + return attempt, nil, err + } else if attempt*sleep > timeout { + return attempt, nil, fmt.Errorf("read plugins reached timeout of %d seconds", timeout) } - return data, nil + switch response.StatusCode { + case 200: + return attempt, data, nil + case 400: + if strings.Compare(failed["error"].(string), "Timeout talking to backend") == 0 { + log.Printf("[INFO] go-api::vpc_gcp_peering::read Timeout talking to backend "+ + "attempt %d until timeout: %d", attempt, (timeout - (attempt * sleep))) + attempt++ + time.Sleep(time.Duration(sleep) * time.Second) + return api.readVpcGcpPeeringWithRetry(path, attempt, sleep, timeout) + } + } + return attempt, nil, fmt.Errorf("read plugin with retry failed, status: %v, message: %s", + response.StatusCode, failed) } -func (api *API) UpdateVpcGcpPeering(instanceID int, peerID string) (map[string]interface{}, error) { - return api.ReadVpcGcpPeering(instanceID, peerID) +// UpdateVpcGcpPeering: updates a VPC peering from an instance. +func (api *API) UpdateVpcGcpPeering(instanceID int, sleep, timeout int) ( + map[string]interface{}, error) { + + // NOP just read out the VPC peering + return api.ReadVpcGcpPeering(instanceID, sleep, timeout) } +// RemoveVpcGcpPeering: removes a VPC peering from an instance. func (api *API) RemoveVpcGcpPeering(instanceID int, peerID string) error { var ( failed map[string]interface{} @@ -88,45 +123,86 @@ func (api *API) RemoveVpcGcpPeering(instanceID int, peerID string) error { if err != nil { return err } - if response.StatusCode != 204 { - return fmt.Errorf("RemoveVpcPeering failed, status: %v, message: %s", response.StatusCode, failed) + + switch response.StatusCode { + case 204: + return nil + default: + return fmt.Errorf("remove VPC peering failed, status: %v, message: %s", + response.StatusCode, failed) } - return nil } -func (api *API) ReadVpcGcpInfo(instanceID int) (map[string]interface{}, error) { - // Initiale values, 5 attempts and 20 second sleep - return api.readVpcGcpInfoWithRetry(instanceID, 5, 20) +// ReadVpcGcpInfo: reads the VPC info from the API +func (api *API) ReadVpcGcpInfo(instanceID, sleep, timeout int) (map[string]interface{}, error) { + path := fmt.Sprintf("/api/instances/%v/vpc-peering/info", instanceID) + return api.readVpcGcpInfoWithRetry(path, 1, sleep, timeout) } -func (api *API) readVpcGcpInfoWithRetry(instanceID, attempts, sleep int) (map[string]interface{}, error) { +// readVpcGcpInfoWithRetry: reads the VPC info from the API with retry logic +func (api *API) readVpcGcpInfoWithRetry(path string, attempt, sleep, timeout int) ( + map[string]interface{}, error) { + var ( data map[string]interface{} failed map[string]interface{} - path = fmt.Sprintf("/api/instances/%v/vpc-peering/info", instanceID) ) - log.Printf("[DEBUG] go-api::vpc_gcp_peering::info instance id: %v", instanceID) + log.Printf("[DEBUG] go-api::vpc_gcp_peering::info path: %s", path) response, err := api.sling.New().Get(path).Receive(&data, &failed) - log.Printf("[DEBUG] go-api::vpc_gcp_peering::info data: %v", data) if err != nil { return nil, err + } else if attempt*sleep > timeout { + return nil, fmt.Errorf("read VPC info, reached timeout of %d seconds", timeout) } - statusCode := response.StatusCode - log.Printf("[DEBUG] go-api::vpc_gcp_peering::info statusCode: %d", statusCode) - switch { - case statusCode == 400: + switch response.StatusCode { + case 200: + return data, nil + case 400: if strings.Compare(failed["error"].(string), "Timeout talking to backend") == 0 { - if attempts--; attempts > 0 { - log.Printf("[INFO] go-api::vpc_gcp_peering::info Timeout talking to backend "+ - "attempts left %d and retry in %d seconds", attempts, sleep) - time.Sleep(time.Duration(sleep) * time.Second) - return api.readVpcGcpInfoWithRetry(instanceID, attempts, 2*sleep) - } else { - return nil, fmt.Errorf("ReadInfo failed, status: %v, message: %s", response.StatusCode, failed) + log.Printf("[INFO] go-api::vpc_gcp_peering::info Timeout talking to backend "+ + "attempt %d until timeout: %d", attempt, (timeout - (attempt * sleep))) + attempt++ + time.Sleep(time.Duration(sleep) * time.Second) + return api.readVpcGcpInfoWithRetry(path, attempt, sleep, timeout) + } + } + return nil, fmt.Errorf("read VPC info failed, status: %v, message: %s", + response.StatusCode, failed) +} + +// waitForGcpPeeringStatus: waits for the VPC peering status to be ACTIVE or until timed out +func (api *API) waitForGcpPeeringStatus(path, peerID string, + attempt, sleep, timeout int) error { + + var ( + data map[string]interface{} + err error + ) + + for { + time.Sleep(time.Duration(sleep) * time.Second) + if attempt*sleep > timeout { + return fmt.Errorf("wait until GCP VPC peering status reached timeout of %d seconds", timeout) + } + + attempt, data, err = api.readVpcGcpPeeringWithRetry(path, attempt, sleep, timeout) + if err != nil { + return err + } + rows := data["rows"].([]interface{}) + if len(rows) > 0 { + for _, row := range rows { + tempRow := row.(map[string]interface{}) + if tempRow["name"] != peerID { + continue + } + if tempRow["state"] == "ACTIVE" { + return nil + } } } + attempt++ } - return data, nil } diff --git a/api/vpc_gcp_peering_withvpcid.go b/api/vpc_gcp_peering_withvpcid.go index 7b88ce3..47dc7b7 100644 --- a/api/vpc_gcp_peering_withvpcid.go +++ b/api/vpc_gcp_peering_withvpcid.go @@ -5,131 +5,71 @@ package api import ( "fmt" "log" - "strings" - "time" ) -func (api *API) waitForGcpPeeringStatusWithVpcId(vpcID, peerID string) error { - for { - time.Sleep(10 * time.Second) - data, err := api.ReadVpcGcpPeeringWithVpcId(vpcID, peerID) - if err != nil { - return err - } - rows := data["rows"].([]interface{}) - if len(rows) > 0 { - for _, row := range rows { - tempRow := row.(map[string]interface{}) - if tempRow["name"] != peerID { - continue - } - if tempRow["state"] == "ACTIVE" { - return nil - } - } - } - } -} - +// RequestVpcGcpPeeringWithVpcId: requests a VPC peering from an instance. func (api *API) RequestVpcGcpPeeringWithVpcId(vpcID string, params map[string]interface{}, - waitOnStatus bool) (map[string]interface{}, error) { - var ( - data map[string]interface{} - failed map[string]interface{} - path = fmt.Sprintf("api/vpcs/%s/vpc-peering", vpcID) - ) + waitOnStatus bool, sleep, timeout int) (map[string]interface{}, error) { - log.Printf("[DEBUG] go-api::vpc_gcp_peering_withvpcid::request params: %v", params) - response, err := api.sling.New().Post(path).BodyJSON(params).Receive(&data, &failed) + path := fmt.Sprintf("api/vpcs/%s/vpc-peering", vpcID) + attempt, data, err := api.requestVpcGcpPeeringWithRetry(path, params, waitOnStatus, 1, sleep, timeout) if err != nil { return nil, err } - if response.StatusCode != 200 { - return nil, fmt.Errorf("request VPC peering failed, status: %v, message: %s", response.StatusCode, failed) - } if waitOnStatus { log.Printf("[DEBUG] go-api::vpc_gcp_peering_withvpcid::request waiting for active state") - api.waitForGcpPeeringStatusWithVpcId(vpcID, data["peering"].(string)) + api.waitForGcpPeeringStatus(vpcID, data["peering"].(string), attempt, sleep, timeout) } + return data, nil } -func (api *API) ReadVpcGcpPeeringWithVpcId(vpcID, peerID string) (map[string]interface{}, error) { - var ( - data map[string]interface{} - failed map[string]interface{} - path = fmt.Sprintf("/api/vpcs/%s/vpc-peering", vpcID) - ) - - log.Printf("[DEBUG] go-api::vpc_gcp_peering_withvpcid::read instance_id: %s, peer_id: %s", vpcID, peerID) - response, err := api.sling.New().Get(path).Receive(&data, &failed) - log.Printf("[DEBUG] go-api::vpc_gcp_peering_withvpcid::read data: %v", data) - if err != nil { - return nil, err - } - if response.StatusCode != 200 { - return nil, fmt.Errorf("ReadRequest failed, status: %v, message: %s", response.StatusCode, failed) - } +// func (api *API) ReadVpcGcpPeering(instanceID, sleep, timeout int) ( +func (api *API) ReadVpcGcpPeeringWithVpcId(vpcID string, sleep, timeout int) ( + map[string]interface{}, error) { - return data, nil + path := fmt.Sprintf("/api/vpcs/%s/vpc-peering", vpcID) + _, data, err := api.readVpcGcpPeeringWithRetry(path, 1, sleep, timeout) + return data, err } -func (api *API) UpdateVpcGcpPeeringWithVpcId(vpcID, peerID string) (map[string]interface{}, error) { - return api.ReadVpcGcpPeeringWithVpcId(vpcID, peerID) +// UpdateVpcGcpPeeringWithVpcId: updates the VPC peering from the API +func (api *API) UpdateVpcGcpPeeringWithVpcId(vpcID string, sleep, timeout int) ( + map[string]interface{}, error) { + + // NOP just read out the VPC peering + return api.ReadVpcGcpPeeringWithVpcId(vpcID, sleep, timeout) } +// RemoveVpcGcpPeeringWithVpcId: removes the VPC peering from the API func (api *API) RemoveVpcGcpPeeringWithVpcId(vpcID, peerID string) error { var ( failed map[string]interface{} path = fmt.Sprintf("/api/vpcs/%s/vpc-peering/%s", vpcID, peerID) ) - log.Printf("[DEBUG] go-api::vpc_gcp_peering_withvpcid::remove vpc id: %s, peering id: %s", vpcID, peerID) + log.Printf("[DEBUG] go-api::vpc_gcp_peering_withvpcid::remove vpc id: %s, peering id: %s", + vpcID, peerID) response, err := api.sling.New().Delete(path).Receive(nil, &failed) if err != nil { return err } - if response.StatusCode != 204 { - return fmt.Errorf("RemoveVpcPeering failed, status: %v, message: %s", response.StatusCode, failed) + + switch response.StatusCode { + case 204: + return nil + default: + return fmt.Errorf("remove VPC peering failed, status: %v, message: %s", + response.StatusCode, failed) } - return nil } -func (api *API) ReadVpcGcpInfoWithVpcId(vpcID string) (map[string]interface{}, error) { - // Initiale values, 5 attempts and 20 second sleep - return api.readVpcGcpInfoWithRetryWithVpcId(vpcID, 5, 20) -} +// ReadVpcGcpInfoWithVpcId: reads the VPC info from the API +func (api *API) ReadVpcGcpInfoWithVpcId(vpcID string, sleep, timeout int) ( + map[string]interface{}, error) { -func (api *API) readVpcGcpInfoWithRetryWithVpcId(vpcID string, attempts, sleep int) (map[string]interface{}, error) { - var ( - data map[string]interface{} - failed map[string]interface{} - path = fmt.Sprintf("/api/vpcs/%s/vpc-peering/info", vpcID) - ) - - log.Printf("[DEBUG] go-api::vpc_gcp_peering_withvpcid::info vpc id: %s", vpcID) - response, err := api.sling.New().Get(path).Receive(&data, &failed) - log.Printf("[DEBUG] go-api::vpc_gcp_peering_withvpcid::info data: %v", data) - if err != nil { - return nil, err - } - - statusCode := response.StatusCode - log.Printf("[DEBUG] go-api::vpc_gcp_peering_withvpcid::info statusCode: %d", statusCode) - switch { - case statusCode == 400: - // Todo: Add error code to avoid using string comparison - if strings.Compare(failed["error"].(string), "Timeout talking to backend") == 0 { - if attempts--; attempts > 0 { - log.Printf("[INFO] go-api::vpc_gcp_peering_withvpcid::info Timeout talking to backend "+ - "attempts left %d and retry in %d seconds", attempts, sleep) - time.Sleep(time.Duration(sleep) * time.Second) - return api.readVpcGcpInfoWithRetryWithVpcId(vpcID, attempts, 2*sleep) - } else { - return nil, fmt.Errorf("ReadInfo failed, status: %v, message: %s", response.StatusCode, failed) - } - } - } - return data, nil + path := fmt.Sprintf("/api/vpcs/%s/vpc-peering/info", vpcID) + _, data, err := api.readVpcGcpPeeringWithRetry(path, 1, sleep, timeout) + return data, err } From 18a76c379268e37f51e01f65ca9c8bc72e5e459c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20Brod=C3=A9n?= Date: Wed, 13 Dec 2023 09:56:20 +0100 Subject: [PATCH 2/4] Move waitForGcpPeeringStatus back to its earlier position. Instead move it in a separated PR/commit to ease this diff. --- api/vpc_gcp_peering.go | 70 +++++++++++++++++++++--------------------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/api/vpc_gcp_peering.go b/api/vpc_gcp_peering.go index 2b50bee..1e7d600 100644 --- a/api/vpc_gcp_peering.go +++ b/api/vpc_gcp_peering.go @@ -7,6 +7,41 @@ import ( "time" ) +// waitForGcpPeeringStatus: waits for the VPC peering status to be ACTIVE or until timed out +func (api *API) waitForGcpPeeringStatus(path, peerID string, + attempt, sleep, timeout int) error { + + var ( + data map[string]interface{} + err error + ) + + for { + time.Sleep(time.Duration(sleep) * time.Second) + if attempt*sleep > timeout { + return fmt.Errorf("wait until GCP VPC peering status reached timeout of %d seconds", timeout) + } + + attempt, data, err = api.readVpcGcpPeeringWithRetry(path, attempt, sleep, timeout) + if err != nil { + return err + } + rows := data["rows"].([]interface{}) + if len(rows) > 0 { + for _, row := range rows { + tempRow := row.(map[string]interface{}) + if tempRow["name"] != peerID { + continue + } + if tempRow["state"] == "ACTIVE" { + return nil + } + } + } + attempt++ + } +} + // RequestVpcGcpPeering: requests a VPC peering from an instance. func (api *API) RequestVpcGcpPeering(instanceID int, params map[string]interface{}, waitOnStatus bool, sleep, timeout int) (map[string]interface{}, error) { @@ -171,38 +206,3 @@ func (api *API) readVpcGcpInfoWithRetry(path string, attempt, sleep, timeout int return nil, fmt.Errorf("read VPC info failed, status: %v, message: %s", response.StatusCode, failed) } - -// waitForGcpPeeringStatus: waits for the VPC peering status to be ACTIVE or until timed out -func (api *API) waitForGcpPeeringStatus(path, peerID string, - attempt, sleep, timeout int) error { - - var ( - data map[string]interface{} - err error - ) - - for { - time.Sleep(time.Duration(sleep) * time.Second) - if attempt*sleep > timeout { - return fmt.Errorf("wait until GCP VPC peering status reached timeout of %d seconds", timeout) - } - - attempt, data, err = api.readVpcGcpPeeringWithRetry(path, attempt, sleep, timeout) - if err != nil { - return err - } - rows := data["rows"].([]interface{}) - if len(rows) > 0 { - for _, row := range rows { - tempRow := row.(map[string]interface{}) - if tempRow["name"] != peerID { - continue - } - if tempRow["state"] == "ACTIVE" { - return nil - } - } - } - attempt++ - } -} From 48d5ae362195fdb9f93a93c81c80ebc4138acf99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20Brod=C3=A9n?= Date: Fri, 15 Dec 2023 14:22:42 +0100 Subject: [PATCH 3/4] Use correct input, concat path and not just VPC ID --- api/vpc_gcp_peering_withvpcid.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/api/vpc_gcp_peering_withvpcid.go b/api/vpc_gcp_peering_withvpcid.go index 47dc7b7..c886b63 100644 --- a/api/vpc_gcp_peering_withvpcid.go +++ b/api/vpc_gcp_peering_withvpcid.go @@ -19,7 +19,10 @@ func (api *API) RequestVpcGcpPeeringWithVpcId(vpcID string, params map[string]in if waitOnStatus { log.Printf("[DEBUG] go-api::vpc_gcp_peering_withvpcid::request waiting for active state") - api.waitForGcpPeeringStatus(vpcID, data["peering"].(string), attempt, sleep, timeout) + err = api.waitForGcpPeeringStatus(path, data["peering"].(string), attempt, sleep, timeout) + if err != nil { + return nil, err + } } return data, nil From e5e1a523a4d0e4d4e0f256f25d5518b4ec36282f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20Brod=C3=A9n?= Date: Fri, 15 Dec 2023 14:23:04 +0100 Subject: [PATCH 4/4] Log number of attempts --- api/vpc_gcp_peering.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/api/vpc_gcp_peering.go b/api/vpc_gcp_peering.go index 1e7d600..823b8ed 100644 --- a/api/vpc_gcp_peering.go +++ b/api/vpc_gcp_peering.go @@ -26,6 +26,7 @@ func (api *API) waitForGcpPeeringStatus(path, peerID string, if err != nil { return err } + rows := data["rows"].([]interface{}) if len(rows) > 0 { for _, row := range rows { @@ -38,6 +39,8 @@ func (api *API) waitForGcpPeeringStatus(path, peerID string, } } } + log.Printf("[INFO] go-api::vpc_gcp_peering::waitForGcpPeeringStatus Waiting for state = ACTIVE "+ + "attempt %d until timeout: %d", attempt, (timeout - (attempt * sleep))) attempt++ } } @@ -54,7 +57,7 @@ func (api *API) RequestVpcGcpPeering(instanceID int, params map[string]interface if waitOnStatus { log.Printf("[DEBUG] go-api::vpc_gcp_peering_withvpcid::request waiting for active state") - err := api.waitForGcpPeeringStatus(path, data["peering"].(string), attempt, sleep, timeout) + err = api.waitForGcpPeeringStatus(path, data["peering"].(string), attempt, sleep, timeout) if err != nil { return nil, err }