Skip to content
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@

## Bugfixes

* Fix data loss issues when reading gzipped files with TextIO (Python) ([#18390](https://github.com/apache/beam/issues/18390), [#31040](https://github.com/apache/beam/issues/31040)).
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Security Fixes
Expand Down
25 changes: 23 additions & 2 deletions sdks/python/apache_beam/io/gcp/gcsio.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,8 +589,29 @@ def __init__(
blob,
chunk_size=DEFAULT_READ_BUFFER_SIZE,
enable_read_bucket_metric=False,
retry=DEFAULT_RETRY):
super().__init__(blob, chunk_size=chunk_size, retry=retry)
retry=DEFAULT_RETRY,
raw_download=True):
# By default, we always request to retrieve raw data from GCS even if the
# object meets the criteria of decompressive transcoding
# (https://cloud.google.com/storage/docs/transcoding).
super().__init__(
blob, chunk_size=chunk_size, retry=retry, raw_download=raw_download)
# TODO: Remove this after
# https://github.com/googleapis/python-storage/issues/1406 is fixed.
# As a workaround, we manually trigger a reload here. Otherwise, an internal
# call of reader.seek() will cause an exception if raw_download is set
# when initializing BlobReader().
blob.reload()

# TODO: Currently there is a bug in GCS server side when a client requests
# a file with "content-encoding=gzip" and "content-type=application/gzip" or
# "content-type=application/x-gzip", which will lead to infinite loop.
# We skip the support of this type of files until the GCS bug is fixed.
# Internal bug id: 203845981.
if (blob.content_encoding == "gzip" and
blob.content_type in ["application/gzip", "application/x-gzip"]):
raise NotImplementedError("Doubly compressed files not supported.")

self.enable_read_bucket_metric = enable_read_bucket_metric
self.mode = "r"

Expand Down
71 changes: 71 additions & 0 deletions sdks/python/apache_beam/io/gcp/gcsio_integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,22 @@
import logging
import unittest
import uuid
import zlib

import mock
import pytest
from parameterized import parameterized
from parameterized import parameterized_class

from apache_beam import Create
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.textio import ReadAllFromText
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms.combiners import Count

try:
from apache_beam.io.gcp import gcsio
Expand Down Expand Up @@ -230,6 +238,69 @@ def test_create_default_bucket(self, mock_default_gcs_bucket_name):
self.assertIsNone(self.gcsio.get_bucket(overridden_bucket_name))


class GcsIOReadGzipTest(unittest.TestCase):
  """Integration tests for reading (possibly gzip-transcoded) GCS objects.

  Each pre-staged object under ``gcs_path_prefix`` holds 1000 lines of text,
  uploaded with a different combination of ``content-type`` and
  ``content-encoding`` metadata. A case passes when ReadAllFromText yields
  exactly 1000 elements, or when the expected exception type is raised.
  """
  gcs_path_prefix = "gs://apache-beam-samples/textio/"
  gzip_test_files = [
      "textio-test-data.content-type-gzip-content-encoding-gzip.1k.txt.gz",
      "textio-test-data.content-type-gzip-content-encoding-none.1k.txt.gz",
      "textio-test-data.content-type-none-content-encoding-gzip.1k.txt.gz",
      "textio-test-data.content-type-none-content-encoding-none.1k.txt.gz",
      "textio-test-data.content-type-text-content-encoding-gzip.1k.txt.gz",
      "textio-test-data.content-type-text-content-encoding-none.1k.txt.gz",
      "textio-test-data.default.1k.txt",
      "textio-test-data.default.1k.txt.gz",
      "textio-test-data.gzip-local.1k.txt.gz",
  ]

  @parameterized.expand([
      (gzip_test_files[0], CompressionTypes.UNCOMPRESSED, NotImplementedError),
      (gzip_test_files[0], CompressionTypes.GZIP, NotImplementedError),
      (gzip_test_files[0], CompressionTypes.AUTO, NotImplementedError),
      (gzip_test_files[1], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
      (gzip_test_files[1], CompressionTypes.GZIP, None),
      (gzip_test_files[1], CompressionTypes.AUTO, None),
      (gzip_test_files[2], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
      (gzip_test_files[2], CompressionTypes.GZIP, None),
      (gzip_test_files[2], CompressionTypes.AUTO, None),
      (gzip_test_files[3], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
      (gzip_test_files[3], CompressionTypes.GZIP, None),
      (gzip_test_files[3], CompressionTypes.AUTO, None),
      (gzip_test_files[4], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
      (gzip_test_files[4], CompressionTypes.GZIP, None),
      (gzip_test_files[4], CompressionTypes.AUTO, None),
      (gzip_test_files[5], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
      (gzip_test_files[5], CompressionTypes.GZIP, None),
      (gzip_test_files[5], CompressionTypes.AUTO, None),
      (gzip_test_files[6], CompressionTypes.UNCOMPRESSED, None),
      (gzip_test_files[6], CompressionTypes.GZIP, zlib.error),
      (gzip_test_files[6], CompressionTypes.AUTO, None),
      (gzip_test_files[7], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
      (gzip_test_files[7], CompressionTypes.GZIP, None),
      (gzip_test_files[7], CompressionTypes.AUTO, None),
      (gzip_test_files[8], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError),
      (gzip_test_files[8], CompressionTypes.GZIP, None),
      (gzip_test_files[8], CompressionTypes.AUTO, None),
  ])
  @unittest.skipIf(NotFound is None, 'GCP dependencies are not installed')
  def test_read_gzip_file(self, file_name, compression_type, exception):
    """Read one staged file and check the line count or the raised error.

    Args:
      file_name: object name under ``gcs_path_prefix``.
      compression_type: the CompressionTypes value passed to ReadAllFromText.
      exception: expected exception class, or None when the read must succeed.
    """
    pipeline = TestPipeline(runner="Direct", is_integration_test=True)
    line_count = (
        pipeline
        | Create([f"{GcsIOReadGzipTest.gcs_path_prefix}{file_name}"])
        | "Read File from GCS" >>
        ReadAllFromText(compression_type=compression_type)
        | Count.Globally())
    assert_that(line_count, equal_to([1000]))

    def _run_to_completion():
      # Block until the pipeline finishes so any failure surfaces here.
      pipeline.run().wait_until_finish()

    if exception is None:
      _run_to_completion()
    else:
      with self.assertRaises(exception):
        _run_to_completion()


if __name__ == '__main__':
  # Surface INFO-level logs (pipeline progress, GCS activity) when the
  # integration tests are run directly as a script.
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()
5 changes: 5 additions & 0 deletions sdks/python/apache_beam/io/gcp/gcsio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ def __init__(
self._fail_when_getting_metadata = fail_when_getting_metadata
self._fail_when_reading = fail_when_reading
self.generation = random.randint(0, (1 << 63) - 1)
self.content_encoding = None
self.content_type = None

def reload(self):
  """No-op stand-in for google.cloud.storage.Blob.reload().

  The fake blob keeps all of its metadata locally, so there is nothing to
  refresh from a server; this exists only so production code that calls
  blob.reload() works against the fake.
  """

def delete(self):
  # Mirror Blob.delete(): remove this blob from its parent fake bucket by name.
  self.bucket.delete_blob(self.name)
Expand Down
Loading