From 33f48cbe1f27db8c4a2d2ced6d6656ee8c1fb69b Mon Sep 17 00:00:00 2001
From: ksobrenat32
Date: Wed, 20 Aug 2025 08:00:25 -0600
Subject: [PATCH 1/6] Add the base log_analyzer

---
 infra/security/README.md        |  84 ++++++++++
 infra/security/config.yml       |  43 +++++
 infra/security/log_analyzer.py  | 277 ++++++++++++++++++++++++++++++++
 infra/security/requirements.txt |  19 +++
 4 files changed, 423 insertions(+)
 create mode 100644 infra/security/README.md
 create mode 100644 infra/security/config.yml
 create mode 100644 infra/security/log_analyzer.py
 create mode 100644 infra/security/requirements.txt

diff --git a/infra/security/README.md b/infra/security/README.md
new file mode 100644
index 000000000000..0e60c4b33043
--- /dev/null
+++ b/infra/security/README.md
@@ -0,0 +1,84 @@
+
+
+# GCP Security Analyzer
+
+This document describes the implementation of a security analyzer for Google Cloud Platform (GCP) resources. The analyzer is designed to enhance security monitoring within our GCP environment by capturing critical events and generating alerts for specific security-sensitive actions.
+
+## How It Works
+
+1. **Log Sinks**: The system uses [GCP Log Sinks](https://cloud.google.com/logging/docs/export/configure_export_v2) to capture specific security-related log entries. These sinks are configured to filter for events like IAM policy changes or service account key creation.
+2. **Log Storage**: The filtered logs are routed to a dedicated Google Cloud Storage (GCS) bucket for persistence and analysis.
+3. **Report Generation**: A scheduled job runs weekly, executing the `log_analyzer.py` script.
+4. **Email Alerts**: The script analyzes the logs from the past week, compiles a summary of security events, and sends a report to a configured email address.
+
+## Configuration
+
+The behavior of the log analyzer is controlled by a `config.yml` file. Here’s an overview of the configuration options:
+
+- `project_id`: The GCP project ID where the resources are located.
+- `bucket_name`: The name of the GCS bucket where logs will be stored.
+- `logging`: Configures the logging level and format for the script.
+- `sinks`: A list of log sinks to be created. Each sink has the following properties:
+  - `name`: A unique name for the sink.
+  - `description`: A brief description of what the sink monitors.
+  - `filter_methods`: A list of GCP API methods to include in the filter (e.g., `SetIamPolicy`).
+  - `excluded_principals`: A list of service accounts or user emails to exclude from monitoring, such as CI/CD service accounts.
+
+### Example Configuration (`config.yml`)
+
+```yaml
+project_id: your-gcp-project-id
+bucket_name: your-log-storage-bucket
+
+sinks:
+  - name: iam-policy-changes
+    description: Monitors changes to IAM policies.
+    filter_methods:
+      - "SetIamPolicy"
+    excluded_principals:
+      - "ci-cd-account@your-project.iam.gserviceaccount.com"
+```
+
+## Usage
+
+The `log_analyzer.py` script provides two main commands for managing the security analyzer.
+
+### Initializing Sinks
+
+To create or update the log sinks in GCP based on your `config.yml` file, run the following command:
+
+```bash
+python log_analyzer.py --config config.yml initialize
+```
+
+This command ensures that the log sinks are correctly configured to capture the desired security events.
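+
+For reference, `log_analyzer.py` builds each sink's Cloud Logging filter from its `filter_methods` and `excluded_principals` (see `_construct_filter`). With the example configuration shown above, the generated filter should look roughly like this:
+
+```
+(protoPayload.methodName="SetIamPolicy") AND (protoPayload.authenticationInfo.principalEmail != "ci-cd-account@your-project.iam.gserviceaccount.com")
+```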
+
+### Generating Weekly Reports
+
+To generate and send the weekly security report, run this command:
+
+```bash
+python log_analyzer.py --config config.yml generate-report
+```
+
+This is typically run as a scheduled job (GitHub Action) to automate the delivery of weekly security reports.
+
+
+
diff --git a/infra/security/config.yml b/infra/security/config.yml
new file mode 100644
index 000000000000..9565623be16d
--- /dev/null
+++ b/infra/security/config.yml
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+project_id: testing-me-460223
+
+# Logging
+logging:
+  level: DEBUG
+  format: "[%(asctime)s] %(levelname)s: %(message)s"
+
+# gcloud storage bucket
+bucket_name: "testing-me-460223-tfstate"
+
+# GCP Log sinks
+sinks:
+  - name: iam-policy-changes
+    description: Monitors changes to IAM policies, excluding approved CI/CD service accounts.
+    filter_methods:
+      - "SetIamPolicy"
+    excluded_principals:
+      - beam-github-actions@apache-beam-testing.iam.gserviceaccount.com
+      - github-self-hosted-runners@apache-beam-testing.iam.gserviceaccount.com
+
+  - name: sa-key-management
+    description: Monitors creation and deletion of service account keys.
+    filter_methods:
+      - "google.iam.admin.v1.IAM.CreateServiceAccountKey"
+      - "google.iam.admin.v1.IAM.DeleteServiceAccountKey"
+    excluded_principals:
+      - beam-github-actions@apache-beam-testing.iam.gserviceaccount.com
+      - github-self-hosted-runners@apache-beam-testing.iam.gserviceaccount.com
diff --git a/infra/security/log_analyzer.py b/infra/security/log_analyzer.py
new file mode 100644
index 000000000000..ea3efeb52bc7
--- /dev/null
+++ b/infra/security/log_analyzer.py
@@ -0,0 +1,277 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import ssl
+import yaml
+import logging
+import smtplib
+import os
+from dataclasses import dataclass
+from datetime import datetime, timedelta
+from google.cloud import logging_v2
+from google.cloud import storage
+from typing import List, Dict, Any
+import argparse
+
+REPORT_SUBJECT = "Weekly IAM Security Events Report"
+REPORT_BODY_TEMPLATE = """
+Hello Team,
+
+Please find below the summary of IAM security events for the past week:
+
+{event_summary}
+
+Best Regards,
+Automated GitHub Action
+"""
+
+@dataclass
+class Sink:
+    name: str
+    description: str
+    filter_methods: List[str]
+    excluded_principals: List[str]
+
+class LogAnalyzer():
+    def __init__(self, project_id: str, gcp_bucket: str, logger: logging.Logger, sinks: List[Sink]):
+        self.project_id = project_id
+        self.bucket = gcp_bucket
+        self.logger = logger
+        self.sinks = sinks
+
+    def _construct_filter(self, sink: Sink) -> str:
+        """
+        Constructs a filter string for a given sink.
+
+        Args:
+            sink (Sink): The sink object containing filter information.
+
+        Returns:
+            str: The constructed filter string.
+        """
+
+        method_filters = []
+        for method in sink.filter_methods:
+            method_filters.append(f'protoPayload.methodName="{method}"')
+
+        exclusion_filters = []
+        for principal in sink.excluded_principals:
+            exclusion_filters.append(f'protoPayload.authenticationInfo.principalEmail != "{principal}"')
+
+        if method_filters and exclusion_filters:
+            filter_ = f"({' OR '.join(method_filters)}) AND ({' AND '.join(exclusion_filters)})"
+        elif method_filters:
+            filter_ = f"({' OR '.join(method_filters)})"
+        elif exclusion_filters:
+            filter_ = f"({' AND '.join(exclusion_filters)})"
+        else:
+            filter_ = ""
+
+        return filter_
+
+    def _create_log_sink(self, sink: Sink) -> None:
+        """
+        Creates a log sink in GCP if it doesn't already exist.
+        If it already exists, it updates the sink with the new filter in case the filter has changed.
+
+        Args:
+            sink (Sink): The sink object to create.
+        """
+        logging_client = logging_v2.Client(project=self.project_id)
+        filter_ = self._construct_filter(sink)
+        destination = "storage.googleapis.com/{bucket}".format(bucket=self.bucket)
+
+        new_sink = logging_client.sink(sink.name, filter_=filter_, destination=destination)
+
+        if new_sink.exists():
+            self.logger.debug(f"Sink {sink.name} already exists.")
+            old_sink = logging_client.sink(sink.name)
+            old_sink.reload()
+            if old_sink.filter_ != filter_:
+                old_sink.filter_ = filter_
+                old_sink.update()
+                self.logger.info(f"Updated sink {sink.name}'s filter.")
+        else:
+            new_sink.create()
+            self.logger.info(f"Created sink {sink.name}.")
+
+        logging_client.close()
+
+    def initialize_sinks(self) -> None:
+        for sink in self.sinks:
+            self._create_log_sink(sink)
+            self.logger.info(f"Initialized sink: {sink.name}")
+
+    def get_event_logs(self, days: int = 7) -> List[Dict[str, Any]]:
+        """
+        Reads and retrieves log events from the specified time range from the GCP Cloud Storage bucket.
+
+        Args:
+            days (int): The number of days to look back for log analysis.
+
+        Returns:
+            List[Dict[str, Any]]: A list of log entries that match the specified time range.
+ """ + found_events = [] + storage_client = storage.Client(project=self.project_id) + + blobs = storage_client.list_blobs(self.bucket) + days_ago = datetime.now() - timedelta(days=days) + for blob in blobs: + if blob.time_created < days_ago: + continue + + self.logger.debug(f"Processing blob: {blob.name}") + content = blob.download_as_string().decode("utf-8") + + for line in content.splitlines(): + try: + log_entry = json.loads(line) + payload = log_entry.get("protoPayload", {}) + + event_details = { + "timestamp": log_entry.get("timestamp"), + "principal": payload.get("authenticationInfo", {}).get("principalEmail"), + "method": payload.get("methodName"), + "resource": payload.get("resourceName"), + "project_id": log_entry.get("resource", {}).get("labels", {}).get("project_id") + } + found_events.append(event_details) + except json.JSONDecodeError: + continue + + storage_client.close() + return found_events + + def create_weekly_email_report(self) -> None: + """ + Creates an email report based on the events found this week. + """ + events = self.get_event_logs(days=7) + if not events: + self.logger.info("No events found for the weekly report.") + return + + events.sort(key=lambda x: x['timestamp'], reverse=True) + event_summary = "\n".join( + f"Timestamp: {event['timestamp']}, Principal: {event['principal']}, Method: {event['method']}, Resource: {event['resource']}, Project ID: {event['project_id']}" + for event in events + ) + + report_subject = REPORT_SUBJECT + report_body = REPORT_BODY_TEMPLATE.format(event_summary=event_summary) + + self.send_email(report_subject, report_body) + + def send_email(self, subject: str, body: str) -> None: + """ + Sends an email with the specified subject and body. + If email configuration is not fully set, it prints the email instead. + + Args: + subject (str): The subject of the email. + body (str): The body of the email. + """ + smtp_server = os.getenv("SMTP_SERVER") + smtp_port_str = os.getenv("SMTP_PORT") + recipient = os.getenv("EMAIL_RECIPIENT") + email = os.getenv("EMAIL_ADDRESS") + password = os.getenv("EMAIL_PASSWORD") + + if not all([smtp_server, smtp_port_str, recipient, email, password]): + self.logger.warning("Email configuration is not fully set. 
Printing email instead.") + print(f"Subject: {subject}\n") + print(f"Body:\n{body}") + return + + assert smtp_server is not None + assert smtp_port_str is not None + assert recipient is not None + assert email is not None + assert password is not None + + message = f"Subject: {subject}\n\n{body}" + context = ssl.create_default_context() + + try: + smtp_port = int(smtp_port_str) + with smtplib.SMTP_SSL(smtp_server, smtp_port, context=context) as server: + server.login(email, password) + server.sendmail(email, recipient, message) + self.logger.info(f"Successfully sent email report to {recipient}") + except Exception as e: + self.logger.error(f"Failed to send email report: {e}") + +def load_config_from_yaml(config_path: str) -> Dict[str, Any]: + with open(config_path, 'r') as file: + config = yaml.safe_load(file) + + c = { + "project_id": config.get("project_id"), + "gcp_bucket": config.get("bucket_name"), + "sinks": [], + "logger": logging.getLogger(__name__) + } + + for sink_config in config.get("sinks", []): + sink = Sink( + name=sink_config["name"], + description=sink_config["description"], + filter_methods=sink_config.get("filter_methods", []), + excluded_principals=sink_config.get("excluded_principals", []) + ) + c["sinks"].append(sink) + + logging_config = config.get("logging", {}) + log_level = logging_config.get("level", "INFO") + log_format = logging_config.get("format", "[%(asctime)s] %(levelname)s: %(message)s") + + c["logger"].setLevel(log_level) + logging.basicConfig(level=log_level, format=log_format) + + return c + +def main(): + """ + Main entry point for the script. + """ + parser = argparse.ArgumentParser(description="GCP IAM Log Analyzer") + parser.add_argument("--config", required=True, help="Path to the configuration YAML file.") + + subparsers = parser.add_subparsers(dest="command", required=True) + + subparsers.add_parser("initialize", help="Initialize/update log sinks in GCP.") + subparsers.add_parser("generate-report", help="Generate and send the weekly IAM security report.") + + args = parser.parse_args() + + config = load_config_from_yaml(args.config) + log_analyzer = LogAnalyzer( + project_id=config["project_id"], + gcp_bucket=config["gcp_bucket"], + logger=config["logger"], + sinks=config["sinks"] + ) + + if args.command == "initialize": + log_analyzer.initialize_sinks() + log_analyzer.logger.info("Sinks initialized successfully.") + elif args.command == "generate-report": + log_analyzer.create_weekly_email_report() + log_analyzer.logger.info("Weekly report generation process completed.") + +if __name__ == "__main__": + main() diff --git a/infra/security/requirements.txt b/infra/security/requirements.txt new file mode 100644 index 000000000000..a4abb8bc5acf --- /dev/null +++ b/infra/security/requirements.txt @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+PyYAML==6.0.2
+google-cloud-storage==3.3.0
+google-cloud-logging==3.12.1

From 4486544924a85dc60b7b473259ee19fdbe839427 Mon Sep 17 00:00:00 2001
From: ksobrenat32
Date: Wed, 20 Aug 2025 08:25:19 -0600
Subject: [PATCH 2/6] Add github action for security logging

---
 .github/workflows/beam_Infrastructure_SecurityLogging.yml | 77 +++++++++++++++++++
 1 file changed, 77 insertions(+)
 create mode 100644 .github/workflows/beam_Infrastructure_SecurityLogging.yml

diff --git a/.github/workflows/beam_Infrastructure_SecurityLogging.yml b/.github/workflows/beam_Infrastructure_SecurityLogging.yml
new file mode 100644
index 000000000000..3d9c2f75cbf3
--- /dev/null
+++ b/.github/workflows/beam_Infrastructure_SecurityLogging.yml
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This workflow works with the GCP security log analyzer to
+# generate weekly security reports and initialize log sinks
+
+name: GCP Security Log Analyzer
+
+on:
+  workflow_dispatch:
+  schedule:
+    # Once a week at 9:00 AM on Monday
+    - cron: '0 9 * * 1'
+  push:
+    paths:
+      - 'infra/security/config.yml'
+
+# This allows a subsequently queued workflow run to interrupt previous runs
+concurrency:
+  group: '${{ github.workflow }} @ ${{ github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.sender.login }}'
+  cancel-in-progress: true
+
+#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
+permissions:
+  contents: read
+
+jobs:
+  beam_GCP_Security_LogAnalyzer:
+    name: GCP Security Log Analysis
+    runs-on: [self-hosted, ubuntu-20.04, main]
+    timeout-minutes: 30
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Setup Python
+        uses: actions/setup-python@v4
+        with:
+          python-version: '3.13'
+
+      - name: Install Python dependencies
+        working-directory: ./infra/security
+        run: |
+          python -m pip install --upgrade pip
+          pip install -r requirements.txt
+
+      - name: Setup gcloud
+        uses: google-github-actions/setup-gcloud@v2
+
+      - name: Initialize Log Sinks
+        if: github.event_name == 'push' || github.event_name == 'workflow_dispatch'
+        working-directory: ./infra/security
+        run: python log_analyzer.py --config config.yml initialize
+
+      - name: Generate Weekly Security Report
+        if: github.event_name == 'schedule' || github.event_name == 'workflow_dispatch'
+        working-directory: ./infra/security
+        env:
+          SMTP_SERVER: smtp.gmail.com
+          SMTP_PORT: 465
+          EMAIL_ADDRESS: ${{ secrets.ISSUE_REPORT_SENDER_EMAIL_ADDRESS }}
+          EMAIL_PASSWORD: ${{ secrets.ISSUE_REPORT_SENDER_EMAIL_PASSWORD }}
+          EMAIL_RECIPIENT: "dev@beam.apache.org"
+        run: python log_analyzer.py --config config.yml generate-report

From 4f8a48e95b9be32e2a610cb94958a7214ee25204 Mon Sep 17 00:00:00 2001
From: ksobrenat32
Date: Wed, 27 Aug 2025 10:19:21 -0600
Subject: [PATCH 3/6] Enhance LogAnalyzer to filter logs by time range and include file names in event summary

---
 infra/security/log_analyzer.py | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/infra/security/log_analyzer.py b/infra/security/log_analyzer.py
index ea3efeb52bc7..9215b098fb97 100644
--- a/infra/security/log_analyzer.py
+++ b/infra/security/log_analyzer.py
@@ -128,10 +128,13 @@ def get_event_logs(self, days: int = 7) -> List[Dict[str, Any]]:
         found_events = []
         storage_client = storage.Client(project=self.project_id)
 
+        now = datetime.now()
+        end_time = datetime(now.year, now.month, now.day, now.hour, 0) - timedelta(minutes=30)
+        start_time = end_time - timedelta(days=days)
+
         blobs = storage_client.list_blobs(self.bucket)
-        days_ago = datetime.now() - timedelta(days=days)
         for blob in blobs:
-            if blob.time_created < days_ago:
+            if not (start_time <= blob.time_created.replace(tzinfo=None) < end_time):
                 continue
 
             self.logger.debug(f"Processing blob: {blob.name}")
@@ -147,7 +150,8 @@ def get_event_logs(self, days: int = 7) -> List[Dict[str, Any]]:
                         "principal": payload.get("authenticationInfo", {}).get("principalEmail"),
                         "method": payload.get("methodName"),
                         "resource": payload.get("resourceName"),
-                        "project_id": log_entry.get("resource", {}).get("labels", {}).get("project_id")
+                        "project_id": log_entry.get("resource", {}).get("labels", {}).get("project_id"),
+                        "file_name": blob.name
                     }
                     found_events.append(event_details)
                 except json.JSONDecodeError:
@@ -167,7 +171,7 @@ def create_weekly_email_report(self) -> None:
 
         events.sort(key=lambda x: x['timestamp'], reverse=True)
         event_summary = "\n".join(
-            f"Timestamp: {event['timestamp']}, Principal: {event['principal']}, Method: {event['method']}, Resource: {event['resource']}, Project ID: {event['project_id']}"
+            f"Timestamp: {event['timestamp']}, Principal: {event['principal']}, Method: {event['method']}, Resource: {event['resource']}, Project ID: {event['project_id']}, File: {event['file_name']}"
             for event in events
         )
 

From cc257b42729476f98869555528de0d3f7ee6aa33 Mon Sep 17 00:00:00 2001
From: ksobrenat32
Date: Wed, 27 Aug 2025 10:31:20 -0600
Subject: [PATCH 4/6] Add dry-run option for weekly email report generation in LogAnalyzer

---
 .github/workflows/beam_Infrastructure_SecurityLogging.yml |  2 +-
 infra/security/log_analyzer.py                            | 14 +++++++++++---
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/.github/workflows/beam_Infrastructure_SecurityLogging.yml b/.github/workflows/beam_Infrastructure_SecurityLogging.yml
index 3d9c2f75cbf3..c364056f5683 100644
--- a/.github/workflows/beam_Infrastructure_SecurityLogging.yml
+++ b/.github/workflows/beam_Infrastructure_SecurityLogging.yml
@@ -74,4 +74,4 @@ jobs:
           EMAIL_ADDRESS: ${{ secrets.ISSUE_REPORT_SENDER_EMAIL_ADDRESS }}
           EMAIL_PASSWORD: ${{ secrets.ISSUE_REPORT_SENDER_EMAIL_PASSWORD }}
           EMAIL_RECIPIENT: "dev@beam.apache.org"
-        run: python log_analyzer.py --config config.yml generate-report
+        run: python log_analyzer.py --config config.yml generate-report --dry-run
diff --git a/infra/security/log_analyzer.py b/infra/security/log_analyzer.py
index 9215b098fb97..9884f3dc526e 100644
--- a/infra/security/log_analyzer.py
+++ b/infra/security/log_analyzer.py
@@ -160,9 +160,10 @@ def get_event_logs(self, days: int = 7) -> List[Dict[str, Any]]:
         storage_client.close()
         return found_events
 
-    def create_weekly_email_report(self) -> None:
+    def create_weekly_email_report(self, dry_run: bool = False) -> None:
         """
         Creates an email report based on the events found this week.
+        If `dry_run` is True, it will print the report to the console instead of sending it.
         """
         events = self.get_event_logs(days=7)
         if not events:
@@ -178,6 +179,12 @@ def create_weekly_email_report(self) -> None:
         report_subject = REPORT_SUBJECT
         report_body = REPORT_BODY_TEMPLATE.format(event_summary=event_summary)
 
+        if dry_run:
+            self.logger.info("Dry run: printing email report to console.")
+            print(f"Subject: {report_subject}\n")
+            print(f"Body:\n{report_body}")
+            return
+
         self.send_email(report_subject, report_body)
 
     def send_email(self, subject: str, body: str) -> None:
@@ -258,7 +265,8 @@ def main():
     subparsers = parser.add_subparsers(dest="command", required=True)
 
     subparsers.add_parser("initialize", help="Initialize/update log sinks in GCP.")
-    subparsers.add_parser("generate-report", help="Generate and send the weekly IAM security report.")
+    report_parser = subparsers.add_parser("generate-report", help="Generate and send the weekly IAM security report.")
+    report_parser.add_argument("--dry-run", action="store_true", help="Do not send email, print report to console.")
 
     args = parser.parse_args()
 
@@ -274,7 +282,7 @@ def main():
         log_analyzer.initialize_sinks()
         log_analyzer.logger.info("Sinks initialized successfully.")
     elif args.command == "generate-report":
-        log_analyzer.create_weekly_email_report()
+        log_analyzer.create_weekly_email_report(dry_run=args.dry_run)
         log_analyzer.logger.info("Weekly report generation process completed.")
 
 if __name__ == "__main__":

From 97bc128a92a8a47a9185e23b0e6600506ed88b2d Mon Sep 17 00:00:00 2001
From: ksobrenat32
Date: Wed, 27 Aug 2025 10:46:30 -0600
Subject: [PATCH 5/6] Better error handling for timezones and missing details

---
 infra/security/log_analyzer.py | 26 +++++++++++++++-----------
 1 file changed, 15 insertions(+), 11 deletions(-)

diff --git a/infra/security/log_analyzer.py b/infra/security/log_analyzer.py
index 9884f3dc526e..0bdf09385097 100644
--- a/infra/security/log_analyzer.py
+++ b/infra/security/log_analyzer.py
@@ -20,7 +20,7 @@
 import smtplib
 import os
 from dataclasses import dataclass
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from google.cloud import logging_v2
 from google.cloud import storage
 from typing import List, Dict, Any
@@ -128,33 +128,37 @@ def get_event_logs(self, days: int = 7) -> List[Dict[str, Any]]:
         found_events = []
         storage_client = storage.Client(project=self.project_id)
 
-        now = datetime.now()
-        end_time = datetime(now.year, now.month, now.day, now.hour, 0) - timedelta(minutes=30)
+        now = datetime.now(timezone.utc)
+        end_time = now.replace(minute=0, second=0, microsecond=0) - timedelta(minutes=30)
         start_time = end_time - timedelta(days=days)
 
         blobs = storage_client.list_blobs(self.bucket)
         for blob in blobs:
-            if not (start_time <= blob.time_created.replace(tzinfo=None) < end_time):
+            if not (start_time <= blob.time_created < end_time):
                 continue
 
             self.logger.debug(f"Processing blob: {blob.name}")
             content = blob.download_as_string().decode("utf-8")
 
-            for line in content.splitlines():
+            for num, line in enumerate(content.splitlines(), 1):
                 try:
                     log_entry = json.loads(line)
-                    payload = log_entry.get("protoPayload", {})
+                    payload = log_entry.get("protoPayload")
+                    if not payload:
+                        self.logger.warning(f"Skipping log in blob {blob.name}, line {num}: no protoPayload found.")
+                        continue
 
                     event_details = {
-                        "timestamp": log_entry.get("timestamp"),
"principal": payload.get("authenticationInfo", {}).get("principalEmail"), - "method": payload.get("methodName"), - "resource": payload.get("resourceName"), - "project_id": log_entry.get("resource", {}).get("labels", {}).get("project_id"), + "timestamp": log_entry.get("timestamp", "N/A"), + "principal": payload.get("authenticationInfo", {}).get("principalEmail", "N/A"), + "method": payload.get("methodName", "N/A"), + "resource": payload.get("resourceName", "N/A"), + "project_id": log_entry.get("resource", {}).get("labels", {}).get("project_id", "N/A"), "file_name": blob.name } found_events.append(event_details) except json.JSONDecodeError: + self.logger.warning(f"Skipping invalid JSON log in blob {blob.name}, line {num}.") continue storage_client.close() From c4f57920718c47dfa1d2ffc79847a7a123014fc0 Mon Sep 17 00:00:00 2001 From: ksobrenat32 Date: Wed, 27 Aug 2025 12:25:20 -0600 Subject: [PATCH 6/6] Refactor LogAnalyzer to use SinkCls for type consistency and enhance bucket permission management for log sinks --- infra/security/log_analyzer.py | 66 +++++++++++++++++++++++++++------- 1 file changed, 53 insertions(+), 13 deletions(-) diff --git a/infra/security/log_analyzer.py b/infra/security/log_analyzer.py index 0bdf09385097..55ab4495e24f 100644 --- a/infra/security/log_analyzer.py +++ b/infra/security/log_analyzer.py @@ -39,20 +39,20 @@ """ @dataclass -class Sink: +class SinkCls: name: str description: str filter_methods: List[str] excluded_principals: List[str] class LogAnalyzer(): - def __init__(self, project_id: str, gcp_bucket: str, logger: logging.Logger, sinks: List[Sink]): + def __init__(self, project_id: str, gcp_bucket: str, logger: logging.Logger, sinks: List[SinkCls]): self.project_id = project_id self.bucket = gcp_bucket self.logger = logger self.sinks = sinks - def _construct_filter(self, sink: Sink) -> str: + def _construct_filter(self, sink: SinkCls) -> str: """ Constructs a filter string for a given sink. @@ -82,7 +82,7 @@ def _construct_filter(self, sink: Sink) -> str: return filter_ - def _create_log_sink(self, sink: Sink) -> None: + def _create_log_sink(self, sink: SinkCls) -> None: """ Creates a log sink in GCP if it doesn't already exist. If it already exists, it updates the sink with the new filter in case the filter has changed. @@ -94,22 +94,62 @@ def _create_log_sink(self, sink: Sink) -> None: filter_ = self._construct_filter(sink) destination = "storage.googleapis.com/{bucket}".format(bucket=self.bucket) - new_sink = logging_client.sink(sink.name, filter_=filter_, destination=destination) + sink_client = logging_client.sink(sink.name, filter_=filter_, destination=destination) - if new_sink.exists(): + if sink_client.exists(): self.logger.debug(f"Sink {sink.name} already exists.") - old_sink = logging_client.sink(sink.name) - old_sink.reload() - if old_sink.filter_ != filter_: - old_sink.filter_ = filter_ - old_sink.update() + sink_client.reload() + if sink_client.filter_ != filter_: + sink_client.filter_ = filter_ + sink_client.update() self.logger.info(f"Updated sink {sink.name}'s filter.") else: - new_sink.create() + sink_client.create() self.logger.info(f"Created sink {sink.name}.") + # Reload the sink to get the writer_identity, this may take a few moments + sink_client.reload() + + self._grant_bucket_permissions(sink_client) logging_client.close() + def _grant_bucket_permissions(self, sink: logging_v2.Sink) -> None: + """ + Grants a log sink's writer identity permissions to write to the bucket. 
+        """
+        logging_client = logging_v2.Client(project=self.project_id)
+        storage_client = storage.Client(project=self.project_id)
+
+        sink.reload()
+        writer_identity = sink.writer_identity
+        if not writer_identity:
+            self.logger.warning(f"Could not retrieve writer identity for sink {sink.name}. "
+                                f"Manual permission granting might be required.")
+            return
+
+        bucket = storage_client.get_bucket(self.bucket)
+        policy = bucket.get_iam_policy(requested_policy_version=3)
+        iam_role = "roles/storage.objectCreator"
+
+        # Workaround for projects where the writer_identity is not a valid service account.
+        if writer_identity == "serviceAccount:cloud-logs@system.gserviceaccount.com":
+            member = "group:cloud-logs@google.com"
+        else:
+            member = f"serviceAccount:{writer_identity}"
+
+        # Check if the policy is already configured
+        if any(member in b.get("members", []) and b.get("role") == iam_role for b in policy.bindings):
+            self.logger.debug(f"Sink {sink.name} already has the necessary permissions.")
+            return
+
+        policy.bindings.append({
+            "role": iam_role,
+            "members": {member}
+        })
+
+        bucket.set_iam_policy(policy)
+        self.logger.info(f"Granted {iam_role} to {member} on bucket {self.bucket} for sink {sink.name}.")
+
     def initialize_sinks(self) -> None:
         for sink in self.sinks:
             self._create_log_sink(sink)
@@ -242,7 +282,7 @@ def load_config_from_yaml(config_path: str) -> Dict[str, Any]:
     }
 
     for sink_config in config.get("sinks", []):
-        sink = Sink(
+        sink = SinkCls(
             name=sink_config["name"],
             description=sink_config["description"],
             filter_methods=sink_config.get("filter_methods", []),
             excluded_principals=sink_config.get("excluded_principals", [])