diff --git a/examples/notebooks/tour-of-beam/getting-started.ipynb b/examples/notebooks/tour-of-beam/getting-started.ipynb index 7155e7632601..b93fa0704c5d 100644 --- a/examples/notebooks/tour-of-beam/getting-started.ipynb +++ b/examples/notebooks/tour-of-beam/getting-started.ipynb @@ -246,7 +246,7 @@ "id": "h0UUmpwRADqA" }, "source": [ - "> ℹ️  In Beam, you can __NOT__ access the elements from a `PCollection` directly like a Python list.\n", + "> ℹ️ In Beam, you can __NOT__ access the elements from a `PCollection` directly like a Python list.\n", "> This means, we can't simply `print` the output `PCollection` to see the elements.\n", ">\n", "> This is because, depending on the runner,\n", @@ -322,7 +322,7 @@ "\n", "`map` takes a _function_ that transforms a single input `a` into a single output `b`.\n", "\n", - "> ℹ️ -- For example, we want to multiply each element by 2." + "> ℹ️ For example, we want to multiply each element by 2." ] }, { @@ -441,7 +441,7 @@ "id": "Q06jvwmqOzer" }, "source": [ - "> ℹ️  Now that we know how `Map` works, we can see what's happening when we print the elements.\n", + "> ℹ️ Now that we know how `Map` works, we can see what's happening when we print the elements.\n", ">\n", "> We have our outputs stored in the `outputs` `PCollection`, so we _pipe_ it to a `Map` transform to apply the\n", "> [`print`](https://docs.python.org/3/library/functions.html#print)\n", @@ -473,7 +473,7 @@ "`flatMap` takes a function that transforms a single input `a` into an `iterable` of outputs `b`.\n", "But we get a _single collection_ containing the outputs of _all_ the elements.\n", "\n", - "> ℹ️ -- For example, we want to have as many elements as the element's value.\n", + "> ℹ️ For example, we want to have as many elements as the element's value.\n", "> For a value `1` we want one element, and three elements for a value `3`." ] }, @@ -608,7 +608,7 @@ "`filter` takes a function that checks a single element `a`,\n", "and returns `True` to keep the element, or `False` to discard it.\n", "\n", - "> ℹ️ -- For example, we only want to keep number that are *even*, or divisible by two.\n", + "> ℹ️ For example, we only want to keep numbers that are *even*, or divisible by two.\n", "> We can use the\n", "> [modulo operator `%`](https://en.wikipedia.org/wiki/Modulo_operation)\n", "> for a simple check." @@ -729,7 +729,7 @@ "\n", "Other common names for this function are `fold` and `reduce`.\n", "\n", - "> ℹ️ -- For example, we want to add all numbers together." + "> ℹ️ For example, we want to add all numbers together." ] }, { @@ -851,7 +851,7 @@ "id": "pFb98ioSp9YU" }, "source": [ - "> ℹ️  There are many ways to combine values in Beam.\n", + "> ℹ️ There are many ways to combine values in Beam.\n", "> You could even combine them into a different data type by defining a custom `CombineFn`.\n", ">\n", "> You can learn more about them by checking the available\n", @@ -874,7 +874,7 @@ "but instead of replacing the value on a \"duplicate\" key,\n", "you would get a list of all the values associated with that key.\n", "\n", - "> ℹ️ -- For example, we want to group each animal with the list of foods they like, and we start with `(animal, food)` pairs."
+ "> ℹ️ For example, we want to group each animal with the list of foods they like, and we start with `(animal, food)` pairs."
] }, { @@ -1006,10 +1006,15 @@ "source": [ "# What's next?\n", "\n", - "* [Transform catalog](https://beam.apache.org/documentation/transforms/python/overview/) -- check out all the available transforms\n", - "* [Using I/O transforms](https://beam.apache.org/documentation/programming-guide/#pipeline-io) -- learn how to read/write data to/from external sources and sinks like text files or databases\n", - "* [Mobile gaming example](https://beam.apache.org/get-started/mobile-gaming-example/) -- learn more about windowing, triggers, and streaming through a complete example pipeline\n", - "* [Runners](https://beam.apache.org/documentation/runners/capability-matrix/) -- check the available runners, their capabilities, and how to run your pipeline in them" + "* ![Open in Colab](https://github.com/googlecolab/open_in_colab/raw/main/images/icon16.png)\n", + " [Reading and writing data](https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb) --\n", + " how to read and write data to and from different data formats. \n", + "* [Transform catalog](https://beam.apache.org/documentation/transforms/python/overview) --\n", + " check out all the available transforms.\n", + "* [Mobile gaming example](https://beam.apache.org/get-started/mobile-gaming-example) --\n", + " learn more about windowing, triggers, and streaming through a complete example pipeline.\n", + "* [Runners](https://beam.apache.org/documentation/runners/capability-matrix) --\n", + " check the available runners, their capabilities, and how to run your pipeline in them." ] } ] diff --git a/examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb b/examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb new file mode 100644 index 000000000000..a3892ea16626 --- /dev/null +++ b/examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb @@ -0,0 +1,941 @@ +{ + "nbformat": 4, + "nbformat_minor": 0, + "metadata": { + "colab": { + "name": "Reading and writing data -- Tour of Beam", + "provenance": [], + "collapsed_sections": [], + "toc_visible": true + }, + "kernelspec": { + "name": "python3", + "display_name": "Python 3" + } + }, + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "id": "view-in-github", + "colab_type": "text" + }, + "source": [ + "\"Open" + ] + }, + { + "cell_type": "code", + "metadata": { + "cellView": "form", + "id": "upmJn_DjcThx" + }, + "source": [ + "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n", + "\n", + "# Licensed to the Apache Software Foundation (ASF) under one\n", + "# or more contributor license agreements. See the NOTICE file\n", + "# distributed with this work for additional information\n", + "# regarding copyright ownership. The ASF licenses this file\n", + "# to you under the Apache License, Version 2.0 (the\n", + "# \"License\"); you may not use this file except in compliance\n", + "# with the License. You may obtain a copy of the License at\n", + "#\n", + "# http://www.apache.org/licenses/LICENSE-2.0\n", + "#\n", + "# Unless required by applicable law or agreed to in writing,\n", + "# software distributed under the License is distributed on an\n", + "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", + "# KIND, either express or implied. See the License for the\n", + "# specific language governing permissions and limitations\n", + "# under the License." 
+ ], "execution_count": 95, "outputs": [] }, { "cell_type": "markdown", "metadata": { "id": "5UC_aGanx6oE" }, "source": [ "# Reading and writing data -- _Tour of Beam_\n", "\n", "So far we've learned some of the basic transforms like\n", "[`Map`](https://beam.apache.org/documentation/transforms/python/elementwise/map),\n", "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap),\n", "[`Filter`](https://beam.apache.org/documentation/transforms/python/elementwise/filter),\n", "[`Combine`](https://beam.apache.org/documentation/transforms/python/aggregation/combineglobally), and\n", "[`GroupByKey`](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey).\n", "These allow us to transform data in any way, but until now we've used\n", "[`Create`](https://beam.apache.org/documentation/transforms/python/other/create)\n", "to get data from an in-memory\n", "[`iterable`](https://docs.python.org/3/glossary.html#term-iterable), like a `list`.\n", "\n", "This works well for experimenting with small datasets. For larger datasets we can use `Source` transforms to read data and `Sink` transforms to write data.\n", "If there are no built-in `Source` or `Sink` transforms, we can also easily create our own custom I/O transforms.\n", "\n", "Let's create some data files and see how we can read them in Beam." ] }, { "cell_type": "code", "metadata": { "id": "R_Yhoc6N_Flg" }, "source": [ "# Install apache-beam with pip.\n", "!pip install --quiet apache-beam\n", "\n", "# Create a directory for our data files.\n", "!mkdir -p data" ], "execution_count": null, "outputs": [] }, { "cell_type": "code", "metadata": { "id": "sQUUi4H9s-g2" }, "source": [ "%%writefile data/my-text-file-1.txt\n", "This is just a plain text file, UTF-8 strings are allowed 🎉.\n", "Each line in the file is one element in the PCollection."
+ ], "execution_count": null, "outputs": [] }, { "cell_type": "code", "metadata": { "id": "BWVVeTSOlKug" }, "source": [ "%%writefile data/my-text-file-2.txt\n", "There are no guarantees on the order of the elements.\n", "ฅ^•ﻌ•^ฅ" ], "execution_count": null, "outputs": [] }, { "cell_type": "code", "metadata": { "id": "NhCws6ncbDJG" }, "source": [ "%%writefile data/penguins.csv\n", "species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g\n", "0,0.2545454545454545,0.6666666666666666,0.15254237288135594,0.2916666666666667\n", "0,0.26909090909090905,0.5119047619047618,0.23728813559322035,0.3055555555555556\n", "1,0.5236363636363636,0.5714285714285713,0.3389830508474576,0.2222222222222222\n", "1,0.6509090909090909,0.7619047619047619,0.4067796610169492,0.3333333333333333\n", "2,0.509090909090909,0.011904761904761862,0.6610169491525424,0.5\n", "2,0.6509090909090909,0.38095238095238104,0.9830508474576272,0.8333333333333334" ], "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "metadata": { "id": "_OkWHiAvpWDZ" }, "source": [ "# Reading from text files\n", "\n", "We can use the\n", "[`ReadFromText`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.ReadFromText)\n", "transform to read text files into `str` elements.\n", "\n", "It takes a\n", "[_glob pattern_](https://en.wikipedia.org/wiki/Glob_%28programming%29)\n", "as an input, and reads all the files that match that pattern.\n", "It returns one element for each line in the file.\n", "\n", "For example, in the pattern `data/*.txt`, the `*` is a wildcard that matches anything. This pattern matches all the files in the `data/` directory with a `.txt` extension." ] }, { "cell_type": "code", "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "xDXdE9uysriw", "outputId": "f5d58b5d-892a-4a42-89c5-b78f1d329cf3" }, "source": [ "import apache_beam as beam\n", "\n", "input_files = 'data/*.txt'\n", "with beam.Pipeline() as pipeline:\n", " (\n", " pipeline\n", " | 'Read files' >> beam.io.ReadFromText(input_files)\n", " | 'Print contents' >> beam.Map(print)\n", " )" ], "execution_count": 96, "outputs": [ { "output_type": "stream", "text": [ "There are no guarantees on the order of the elements.\n", "ฅ^•ﻌ•^ฅ\n", "This is just a plain text file, UTF-8 strings are allowed 🎉.\n", "Each line in the file is one element in the PCollection.\n" ], "name": "stdout" } ] }, { "cell_type": "markdown", "metadata": { "id": "9-2wmzEWsdrb" }, "source": [ "# Writing to text files\n", "\n", "We can use the\n", "[`WriteToText`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText) transform to write `str` elements into text files.\n", "\n", "It takes a _file path prefix_ as an input, and it writes all the `str` elements into one or more files with filenames starting with that prefix. You can optionally pass a `file_name_suffix` as well, usually used for the file extension. Each element goes into its own line in the output files.\n",
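"\n", "> ℹ️ By default, the runner decides how many output files, or _shards_, to write. As a small sketch (an optional parameter, not needed for this example), you can force a single output file with `num_shards`, at the cost of limiting how much the write can parallelize.\n", "\n", "```python\n", "# A sketch: force a single output shard; this can limit parallelism.\n", "beam.io.WriteToText('outputs/single', file_name_suffix='.txt', num_shards=1)\n", "```"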
+ ] + }, + { + "cell_type": "code", + "metadata": { + "id": "nkPlfoTfz61I" + }, + "source": [ + "import apache_beam as beam\n", + "\n", + "output_file_name_prefix = 'outputs/file'\n", + "with beam.Pipeline() as pipeline:\n", + " (\n", + " pipeline\n", + " | 'Create file lines' >> beam.Create([\n", + " 'Each element must be a string.',\n", + " 'It writes one element per line.',\n", + " 'There are no guarantees on the line order.',\n", + " 'The data might be written into multiple files.',\n", + " ])\n", + " | 'Write to files' >> beam.io.WriteToText(\n", + " output_file_name_prefix,\n", + " file_name_suffix='.txt')\n", + " )" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "8au0yJSd1itt", + "outputId": "d7e72785-9fa8-4a2b-c6d0-4735aac8e206" + }, + "source": [ + "# Lets look at the output files and contents.\n", + "!head outputs/file*.txt" + ], + "execution_count": 98, + "outputs": [ + { + "output_type": "stream", + "text": [ + "Each element must be a string.\n", + "It writes one element per line.\n", + "There are no guarantees on the line order.\n", + "The data might be written into multiple files.\n" + ], + "name": "stdout" + } + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "21CCdZispqYK" + }, + "source": [ + "# Reading data\n", + "\n", + "Your data might reside in various input formats. Take a look at the\n", + "[Built-in I/O Transforms](https://beam.apache.org/documentation/io/built-in)\n", + "page for a list of all the available I/O transforms in Beam.\n", + "\n", + "If none of those work for you, you might need to create your own input transform.\n", + "\n", + "> ℹ️ For a more in-depth guide, take a look at the\n", + "[Developing a new I/O connector](https://beam.apache.org/documentation/io/developing-io-overview) page." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "7dQEym1QRG4y" + }, + "source": [ + "## Reading from an `iterable`\n", + "\n", + "The easiest way to create elements is using\n", + "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap).\n", + "\n", + "A common way is having a [`generator`](https://docs.python.org/3/glossary.html#term-generator) function. This could take an input and _expand_ it into a large amount of elements. The nice thing about `generator`s is that they don't have to fit everything into memory like a `list`, they simply\n", + "[`yield`](https://docs.python.org/3/reference/simple_stmts.html#yield)\n", + "elements as they process them.\n", + "\n", + "For example, let's define a `generator` called `count`, that `yield`s the numbers from `0` to `n`. We use `Create` for the initial `n` value(s) and then exapand them with `FlatMap`." 
+ ] + }, + { + "cell_type": "code", + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "wR6WY6wOMVhb", + "outputId": "232e9fb3-4054-4eaf-9bd0-1adc4435b220" + }, + "source": [ + "import apache_beam as beam\n", + "\n", + "def count(n):\n", + " for i in range(n):\n", + " yield i\n", + "\n", + "n = 5\n", + "with beam.Pipeline() as pipeline:\n", + " (\n", + " pipeline\n", + " | 'Create inputs' >> beam.Create([n])\n", + " | 'Generate elements' >> beam.FlatMap(count)\n", + " | 'Print elements' >> beam.Map(print)\n", + " )" + ], + "execution_count": 8, + "outputs": [ + { + "output_type": "stream", + "text": [ + "0\n", + "1\n", + "2\n", + "3\n", + "4\n" + ], + "name": "stdout" + } + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "G4fw7NE1RQNf" + }, + "source": [ + "## Creating an input transform\n", + "\n", + "For a nicer interface, we could abstract the `Create` and the `FlatMap` into a custom `PTransform`. This would give a more intuitive way to use it, while hiding the inner workings.\n", + "\n", + "We create a new class that inherits from `beam.PTransform`. Any input from the generator function, like `n`, becomes a class field. The generator function itself would now become a\n", + "[`staticmethod`](https://docs.python.org/3/library/functions.html#staticmethod).\n", + "And we can hide the `Create` and `FlatMap` in the `expand` method.\n", + "\n", + "Now we can use our transform in a more intuitive way, just like `ReadFromText`." + ] + }, + { + "cell_type": "code", + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "m8iXqE1CRnn5", + "outputId": "019f3b32-74c5-4860-edee-1c8553f200bb" + }, + "source": [ + "import apache_beam as beam\n", + "\n", + "class Count(beam.PTransform):\n", + " def __init__(self, n):\n", + " self.n = n\n", + "\n", + " @staticmethod\n", + " def count(n):\n", + " for i in range(n):\n", + " yield i\n", + "\n", + " def expand(self, pcollection):\n", + " return (\n", + " pcollection\n", + " | 'Create inputs' >> beam.Create([self.n])\n", + " | 'Generate elements' >> beam.FlatMap(Count.count)\n", + " )\n", + "\n", + "n = 5\n", + "with beam.Pipeline() as pipeline:\n", + " (\n", + " pipeline\n", + " | f'Count to {n}' >> Count(n)\n", + " | 'Print elements' >> beam.Map(print)\n", + " )" + ], + "execution_count": 9, + "outputs": [ + { + "output_type": "stream", + "text": [ + "0\n", + "1\n", + "2\n", + "3\n", + "4\n" + ], + "name": "stdout" + } + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "e02_vFmUg-mK" + }, + "source": [ + "## Example: Reading CSV files\n", + "\n", + "Lets say we want to read CSV files to get elements as Python dictionaries. We like how `ReadFromText` expands a file pattern, but we might want to allow for multiple patterns as well.\n", + "\n", + "We create a `ReadCsvFiles` transform, which takes a list of `file_patterns` as input. It expands all the `glob` patterns, and then, for each file name it reads each row as a `dict` using the\n", + "[`csv.DictReader`](https://docs.python.org/3/library/csv.html#csv.DictReader) module." 
+ ] + }, + { + "cell_type": "code", + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "ywVbJxegaZbo", + "outputId": "5e0adfa3-e685-4fe0-b6b7-bfa3d8469da1" + }, + "source": [ + "import apache_beam as beam\n", + "import csv\n", + "import glob\n", + "\n", + "class ReadCsvFiles(beam.PTransform):\n", + " def __init__(self, file_patterns):\n", + " self.file_patterns = file_patterns\n", + "\n", + " @staticmethod\n", + " def read_csv_lines(file_name):\n", + " with open(file_name, 'r') as f:\n", + " for row in csv.DictReader(f):\n", + " yield dict(row)\n", + "\n", + " def expand(self, pcollection):\n", + " return (\n", + " pcollection\n", + " | 'Create file patterns' >> beam.Create(self.file_patterns)\n", + " | 'Expand file patterns' >> beam.FlatMap(glob.glob)\n", + " | 'Read CSV lines' >> beam.FlatMap(self.read_csv_lines)\n", + " )\n", + "\n", + "input_patterns = ['data/*.csv']\n", + "with beam.Pipeline() as pipeline:\n", + " (\n", + " pipeline\n", + " | 'Read CSV files' >> ReadCsvFiles(input_patterns)\n", + " | 'Print elements' >> beam.Map(print)\n", + " )" + ], + "execution_count": 86, + "outputs": [ + { + "output_type": "stream", + "text": [ + "{'species': '0', 'culmen_length_mm': '0.2545454545454545', 'culmen_depth_mm': '0.6666666666666666', 'flipper_length_mm': '0.15254237288135594', 'body_mass_g': '0.2916666666666667'}\n", + "{'species': '0', 'culmen_length_mm': '0.26909090909090905', 'culmen_depth_mm': '0.5119047619047618', 'flipper_length_mm': '0.23728813559322035', 'body_mass_g': '0.3055555555555556'}\n", + "{'species': '1', 'culmen_length_mm': '0.5236363636363636', 'culmen_depth_mm': '0.5714285714285713', 'flipper_length_mm': '0.3389830508474576', 'body_mass_g': '0.2222222222222222'}\n", + "{'species': '1', 'culmen_length_mm': '0.6509090909090909', 'culmen_depth_mm': '0.7619047619047619', 'flipper_length_mm': '0.4067796610169492', 'body_mass_g': '0.3333333333333333'}\n", + "{'species': '2', 'culmen_length_mm': '0.509090909090909', 'culmen_depth_mm': '0.011904761904761862', 'flipper_length_mm': '0.6610169491525424', 'body_mass_g': '0.5'}\n", + "{'species': '2', 'culmen_length_mm': '0.6509090909090909', 'culmen_depth_mm': '0.38095238095238104', 'flipper_length_mm': '0.9830508474576272', 'body_mass_g': '0.8333333333333334'}\n" + ], + "name": "stdout" + } + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "ZyzB_RO9Vs1D" + }, + "source": [ + "## Example: Reading from a SQLite database\n", + "\n", + "Lets begin by creating a small SQLite local database file.\n", + "\n", + "Run the _\"Creating the SQLite database\"_ cell to create a new SQLite3 database with the filename you choose. You can double-click it to see the source code if you want." 
+ ] + }, + { + "cell_type": "code", + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "EJ58A0AoV02o", + "cellView": "form", + "outputId": "7025eb26-409d-4212-bd10-a3bccbb2679f" + }, + "source": [ + "#@title Creating the SQLite database\n", + "import sqlite3\n", + "\n", + "databse_file = \"moon-phases.db\" #@param {type:\"string\"}\n", + "\n", + "with sqlite3.connect(databse_file) as db:\n", + " cursor = db.cursor()\n", + "\n", + " # Create the moon_phases table.\n", + " cursor.execute('''\n", + " CREATE TABLE IF NOT EXISTS moon_phases (\n", + " id INTEGER PRIMARY KEY,\n", + " phase_emoji TEXT NOT NULL,\n", + " peak_datetime DATETIME NOT NULL,\n", + " phase TEXT NOT NULL)''')\n", + "\n", + " # Truncate the table if it's already populated.\n", + " cursor.execute('DELETE FROM moon_phases')\n", + "\n", + " # Insert some sample data.\n", + " insert_moon_phase = 'INSERT INTO moon_phases(phase_emoji, peak_datetime, phase) VALUES(?, ?, ?)'\n", + " cursor.execute(insert_moon_phase, ('🌕', '2017-12-03 15:47:00', 'Full Moon'))\n", + " cursor.execute(insert_moon_phase, ('🌗', '2017-12-10 07:51:00', 'Last Quarter'))\n", + " cursor.execute(insert_moon_phase, ('🌑', '2017-12-18 06:30:00', 'New Moon'))\n", + " cursor.execute(insert_moon_phase, ('🌓', '2017-12-26 09:20:00', 'First Quarter'))\n", + " cursor.execute(insert_moon_phase, ('🌕', '2018-01-02 02:24:00', 'Full Moon'))\n", + " cursor.execute(insert_moon_phase, ('🌗', '2018-01-08 22:25:00', 'Last Quarter'))\n", + " cursor.execute(insert_moon_phase, ('🌑', '2018-01-17 02:17:00', 'New Moon'))\n", + " cursor.execute(insert_moon_phase, ('🌓', '2018-01-24 22:20:00', 'First Quarter'))\n", + " cursor.execute(insert_moon_phase, ('🌕', '2018-01-31 13:27:00', 'Full Moon'))\n", + "\n", + " # Query for the data in the table to make sure it's populated.\n", + " cursor.execute('SELECT * FROM moon_phases')\n", + " for row in cursor.fetchall():\n", + " print(row)" + ], + "execution_count": 11, + "outputs": [ + { + "output_type": "stream", + "text": [ + "(1, '🌕', '2017-12-03 15:47:00', 'Full Moon')\n", + "(2, '🌗', '2017-12-10 07:51:00', 'Last Quarter')\n", + "(3, '🌑', '2017-12-18 06:30:00', 'New Moon')\n", + "(4, '🌓', '2017-12-26 09:20:00', 'First Quarter')\n", + "(5, '🌕', '2018-01-02 02:24:00', 'Full Moon')\n", + "(6, '🌗', '2018-01-08 22:25:00', 'Last Quarter')\n", + "(7, '🌑', '2018-01-17 02:17:00', 'New Moon')\n", + "(8, '🌓', '2018-01-24 22:20:00', 'First Quarter')\n", + "(9, '🌕', '2018-01-31 13:27:00', 'Full Moon')\n" + ], + "name": "stdout" + } + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "8y-bRhPVWai6" + }, + "source": [ + "We could use a `FlatMap` transform to receive a SQL query and `yield` each result row, but that would mean creating a new database connection for each query. If we generated a large number of queries, creating that many connections could be a bottleneck.\n", + "\n", + "It would be nice to create the database connection only once for each worker, and every query could use the same connection if needed.\n", + "\n", + "We can use a\n", + "[custom `DoFn` transform](https://beam.apache.org/documentation/transforms/python/elementwise/pardo/#example-3-pardo-with-dofn-methods)\n", + "for this. 
It allows us to open and close resources, like the database connection, only _once_ per `DoFn` _instance_ by using the `setup` and `teardown` methods.\n", "\n", "> ℹ️ It should be safe to _read_ from a database with multiple concurrent processes, each using its own connection, but only one process should be _writing_ at once." ] }, { "cell_type": "code", "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "Bnpwqr-NV5DF", "outputId": "b3cb7e46-222b-4e82-8f41-81098f54b7ab" }, "source": [ "import apache_beam as beam\n", "import sqlite3\n", "\n", "class SQLiteSelect(beam.DoFn):\n", " def __init__(self, database_file):\n", " self.database_file = database_file\n", " self.connection = None\n", "\n", " def setup(self):\n", " self.connection = sqlite3.connect(self.database_file)\n", "\n", " def process(self, query):\n", " table, columns = query\n", " cursor = self.connection.cursor()\n", " cursor.execute(f\"SELECT {','.join(columns)} FROM {table}\")\n", " for row in cursor.fetchall():\n", " yield dict(zip(columns, row))\n", "\n", " def teardown(self):\n", " self.connection.close()\n", "\n", "class SelectFromSQLite(beam.PTransform):\n", " def __init__(self, database_file, queries):\n", " self.database_file = database_file\n", " self.queries = queries\n", "\n", " def expand(self, pcollection):\n", " return (\n", " pcollection\n", " | 'Create queries' >> beam.Create(self.queries)\n", " | 'SQLite SELECT' >> beam.ParDo(SQLiteSelect(self.database_file))\n", " )\n", "\n", "database_file = 'moon-phases.db'\n", "queries = [\n", " # (table_name, [column1, column2, ...])\n", " ('moon_phases', ['phase_emoji', 'peak_datetime', 'phase']),\n", " ('moon_phases', ['phase_emoji', 'phase']),\n", "]\n", "with beam.Pipeline() as pipeline:\n", " (\n", " pipeline\n", " | 'Read from SQLite' >> SelectFromSQLite(database_file, queries)\n", " | 'Print rows' >> beam.Map(print)\n", " )" ], "execution_count": 12, "outputs": [ { "output_type": "stream", "text": [ "{'phase_emoji': '🌕', 'peak_datetime': '2017-12-03 15:47:00', 'phase': 'Full Moon'}\n", "{'phase_emoji': '🌗', 'peak_datetime': '2017-12-10 07:51:00', 'phase': 'Last Quarter'}\n", "{'phase_emoji': '🌑', 'peak_datetime': '2017-12-18 06:30:00', 'phase': 'New Moon'}\n", "{'phase_emoji': '🌓', 'peak_datetime': '2017-12-26 09:20:00', 'phase': 'First Quarter'}\n", "{'phase_emoji': '🌕', 'peak_datetime': '2018-01-02 02:24:00', 'phase': 'Full Moon'}\n", "{'phase_emoji': '🌗', 'peak_datetime': '2018-01-08 22:25:00', 'phase': 'Last Quarter'}\n", "{'phase_emoji': '🌑', 'peak_datetime': '2018-01-17 02:17:00', 'phase': 'New Moon'}\n", "{'phase_emoji': '🌓', 'peak_datetime': '2018-01-24 22:20:00', 'phase': 'First Quarter'}\n", "{'phase_emoji': '🌕', 'peak_datetime': '2018-01-31 13:27:00', 'phase': 'Full Moon'}\n", "{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n", "{'phase_emoji': '🌗', 'phase': 'Last Quarter'}\n", "{'phase_emoji': '🌑', 'phase': 'New Moon'}\n", "{'phase_emoji': '🌓', 'phase': 'First Quarter'}\n", "{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n", "{'phase_emoji': '🌗', 'phase': 'Last Quarter'}\n", "{'phase_emoji': '🌑', 'phase': 'New Moon'}\n", "{'phase_emoji': '🌓', 'phase': 'First Quarter'}\n", "{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n" ], "name": "stdout" } ] }, { "cell_type": "markdown", "metadata": { "id": "C5Mx_pfNpu_q" }, "source": [ "# Writing data\n", "\n", "You might want 
to write your data in various output formats. Take a look at the\n", "[Built-in I/O Transforms](https://beam.apache.org/documentation/io/built-in)\n", "page for a list of all the available I/O transforms in Beam.\n", "\n", "If none of those work for you, you might need to create your own output transform.\n", "\n", "> ℹ️ For a more in-depth guide, take a look at the\n", "[Developing a new I/O connector](https://beam.apache.org/documentation/io/developing-io-overview) page." ] }, { "cell_type": "markdown", "metadata": { "id": "FpM368NEhc-q" }, "source": [ "## Creating an output transform\n", "\n", "The most straightforward way to write data would be to use a `Map` transform to write each element into our desired output format. In most cases, however, this would result in a lot of overhead creating, connecting to, and/or deleting resources.\n", "\n", "Instead, most data services are optimized to write _batches_ of elements at a time. Batch writes connect to the service only once, and can load many elements at a time.\n", "\n", "Here, we discuss two common ways of batching elements for optimized writes: _fixed-sized batches_, and\n", "_[windows](https://beam.apache.org/documentation/programming-guide/#windowing)\n", "of elements_." ] }, { "cell_type": "markdown", "metadata": { "id": "5gypFFh4hM48" }, "source": [ "## Writing fixed-sized batches\n", "\n", "If the order of the elements _is not_ important, we can simply create fixed-sized batches and write those independently.\n", "\n", "We can use\n", "[`GroupIntoBatches`](https://beam.apache.org/documentation/transforms/python/aggregation/groupintobatches)\n", "to get fixed-sized batches. Note that it expects `(key, value)` pairs. Since `GroupIntoBatches` is an _aggregation_, all the elements in a batch _must_ fit into memory for each worker.\n", "\n", "> ℹ️ `GroupIntoBatches` requires a `(key, value)` pair. For simplicity, this example uses a placeholder `None` key and discards it later. Depending on your data, there might be a key that makes more sense. Using a _balanced_ key, where each key contains around the same number of elements, may help parallelize the batching process, as sketched below.\n", "\n", "Let's create something similar to `WriteToText` but keep it simple with a unique identifier in the file name instead of the file count.\n",
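"\n", "> ℹ️ As a sketch of what a more _balanced_ key could look like, assuming string elements: hashing each element into a handful of buckets spreads the batching across several keys instead of the single `None` key used below.\n", "\n", "```python\n", "import zlib\n", "\n", "# Hypothetical sketch: spread elements over four keys so batches can form in parallel.\n", "with beam.Pipeline() as pipeline:\n", "  (\n", "      pipeline\n", "      | 'Create lines' >> beam.Create(['one', 'two', 'three', 'four'])\n", "      | 'Key by bucket' >> beam.WithKeys(lambda s: zlib.crc32(s.encode()) % 4)\n", "      | 'Group into batches' >> beam.GroupIntoBatches(3)\n", "      | 'Print batches' >> beam.Map(print)\n", "  )\n", "```"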
+ ] + }, + { + "cell_type": "code", + "metadata": { + "id": "LcRHXwyT8Rrj" + }, + "source": [ + "import apache_beam as beam\n", + "import glob\n", + "import os\n", + "import uuid\n", + "\n", + "class WriteBatchesToFiles(beam.PTransform):\n", + " def __init__(self, file_name_prefix, file_name_suffix, batch_size):\n", + " self.file_name_prefix = file_name_prefix\n", + " self.file_name_suffix = file_name_suffix\n", + " self.batch_size = batch_size\n", + "\n", + " @staticmethod\n", + " def write_file(lines, file_name_prefix, file_name_suffix):\n", + " file_name = f\"{file_name_prefix}-{uuid.uuid4().hex}{file_name_suffix}\"\n", + " with open(file_name, 'w') as f:\n", + " for line in lines:\n", + " f.write(f\"{line}\\n\")\n", + "\n", + " def expand(self, pcollection):\n", + " # Remove existing files matching the output file_name pattern.\n", + " for f in glob.glob(f\"{self.file_name_prefix}*{self.file_name_suffix}\"):\n", + " os.remove(f)\n", + " return (\n", + " pcollection\n", + " # For simplicity we key with `None` and discard it.\n", + " | 'Key with None' >> beam.WithKeys(lambda _: None)\n", + " | 'Group into batches' >> beam.GroupIntoBatches(self.batch_size)\n", + " | 'Discard key' >> beam.Values()\n", + " | 'Write file' >> beam.Map(\n", + " self.write_file,\n", + " file_name_prefix=self.file_name_prefix,\n", + " file_name_suffix=self.file_name_suffix,\n", + " )\n", + " )\n", + "\n", + "output_file_name_prefix = 'outputs/batch'\n", + "with beam.Pipeline() as pipeline:\n", + " (\n", + " pipeline\n", + " | 'Create file lines' >> beam.Create([\n", + " 'Each element must be a string.',\n", + " 'It writes one element per line.',\n", + " 'There are no guarantees on the line order.',\n", + " 'The data might be written into multiple files.',\n", + " ])\n", + " | 'Write batches to files' >> WriteBatchesToFiles(\n", + " output_file_name_prefix,\n", + " file_name_suffix='.txt',\n", + " batch_size=3,\n", + " )\n", + " )" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "CUklk4JtEbft", + "outputId": "a2f25ae0-ed93-4d6a-ebee-376520ec8ae1" + }, + "source": [ + "# Lets look at the output files and contents.\n", + "!head outputs/batch*.txt" + ], + "execution_count": 91, + "outputs": [ + { + "output_type": "stream", + "text": [ + "==> outputs/batch-92a6a79e31e34fb68db049b78e76b987.txt <==\n", + "The data might be written into multiple files.\n", + "\n", + "==> outputs/batch-c6a8494e2a5146949f2a05918b36ee38.txt <==\n", + "Each element must be a string.\n", + "It writes one element per line.\n", + "There are no guarantees on the line order.\n" + ], + "name": "stdout" + } + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "hbmPT317hP5K" + }, + "source": [ + "## Writing windows of elements\n", + "\n", + "If the order of the elements _is_ important, we could batch the elements by windows. 
This could be useful in _streaming_ pipelines, where we have an indefinite number of incoming elements and we would like to write windows as they are being processed.\n", + "\n", + "> ℹ️ For more information about windows and triggers, check the [Windowing](https://beam.apache.org/documentation/programming-guide/#windowing) page.\n", + "\n", + "We use a\n", + "[custom `DoFn` transform](https://beam.apache.org/documentation/transforms/python/elementwise/pardo/#example-2-pardo-with-timestamp-and-window-information)\n", + "to extract the window start time and end time.\n", + "We use this for the file names of the output files." + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "v_qK300FG9js" + }, + "source": [ + "import apache_beam as beam\n", + "import datetime\n", + "import time\n", + "\n", + "def unix_time(time_str):\n", + " return time.mktime(time.strptime(time_str, '%Y-%m-%d %H:%M:%S'))\n", + "\n", + "class WithWindowInfo(beam.DoFn):\n", + " def process(self, events, window=beam.DoFn.WindowParam):\n", + " yield {\n", + " 'events': events,\n", + " 'window_start': window.start.to_utc_datetime(),\n", + " 'window_end': window.end.to_utc_datetime(),\n", + " }\n", + "\n", + "class WriteWindowsToFiles(beam.PTransform):\n", + " def __init__(self, file_name_prefix, file_name_suffix):\n", + " self.file_name_prefix = file_name_prefix\n", + " self.file_name_suffix = file_name_suffix\n", + "\n", + " @staticmethod\n", + " def write_file(element, file_name_prefix, file_name_suffix):\n", + " start_date = element['window_start'].date()\n", + " start_time = element['window_start'].time()\n", + " end_time = element['window_end'].time()\n", + " file_name = f\"{file_name_prefix}-{start_date}-{start_time}-{end_time}{file_name_suffix}\"\n", + " with open(file_name, 'w') as f:\n", + " for event in element['events']:\n", + " event_time = datetime.datetime.fromtimestamp(event['timestamp'])\n", + " f.write(f\"{event_time}: {event['event']}\\n\")\n", + "\n", + " def expand(self, pcollection):\n", + " return (\n", + " pcollection\n", + " | 'Group all per window' >> beam.GroupBy(lambda _: None)\n", + " | 'Discard key' >> beam.Values()\n", + " | 'Get window info' >> beam.ParDo(WithWindowInfo())\n", + " | 'Write files' >> beam.Map(\n", + " self.write_file,\n", + " self.file_name_prefix,\n", + " self.file_name_suffix,\n", + " )\n", + " )\n", + "\n", + "output_file_name_prefix = 'outputs/window'\n", + "window_size_sec = 5 * 60 # 5 minutes\n", + "with beam.Pipeline() as pipeline:\n", + " (\n", + " pipeline\n", + " | 'Create elements' >> beam.Create([\n", + " {'timestamp': unix_time('2020-03-19 08:49:00'), 'event': 'login'},\n", + " {'timestamp': unix_time('2020-03-19 08:49:20'), 'event': 'view_account'},\n", + " {'timestamp': unix_time('2020-03-19 08:50:00'), 'event': 'view_orders'},\n", + " {'timestamp': unix_time('2020-03-19 08:51:00'), 'event': 'track_order'},\n", + " {'timestamp': unix_time('2020-03-19 09:00:00'), 'event': 'logout'},\n", + " ])\n", + " | 'With timestamps' >> beam.Map(\n", + " lambda x: beam.window.TimestampedValue(x, x['timestamp']))\n", + " | 'Fixed-sized windows' >> beam.WindowInto(\n", + " beam.window.FixedWindows(window_size_sec))\n", + " | 'Write windows to files' >> WriteWindowsToFiles(\n", + " output_file_name_prefix,\n", + " file_name_suffix='.txt',\n", + " )\n", + " )" + ], + "execution_count": 84, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "4QXKKVawTJ2_", + "outputId": 
"adbe9ad9-5521-4991-8483-7261f9918520" + }, + "source": [ + "# Lets look at the output files and contents.\n", + "!head outputs/window*.txt" + ], + "execution_count": 92, + "outputs": [ + { + "output_type": "stream", + "text": [ + "==> outputs/window-2020-03-19-08:45:00-08:50:00.txt <==\n", + "2020-03-19 08:49:00: login\n", + "2020-03-19 08:49:20: view_account\n", + "\n", + "==> outputs/window-2020-03-19-08:50:00-08:55:00.txt <==\n", + "2020-03-19 08:50:00: view_orders\n", + "2020-03-19 08:51:00: track_order\n", + "\n", + "==> outputs/window-2020-03-19-09:00:00-09:05:00.txt <==\n", + "2020-03-19 09:00:00: logout\n" + ], + "name": "stdout" + } + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "gnoz_mWtxSjW" + }, + "source": [ + "# What's next?\n", + "\n", + "* [Programming guide](https://beam.apache.org/documentation/programming-guide) -- learn about all the Apache Beam concepts in more depth.\n", + "* [Transform catalog](https://beam.apache.org/documentation/transforms/python/overview) -- check out all the available transforms.\n", + "* [Mobile gaming example](https://beam.apache.org/get-started/mobile-gaming-example) -- learn more about windowing, triggers, and streaming through a complete example pipeline.\n", + "* [Runners](https://beam.apache.org/documentation/runners/capability-matrix) -- check the available runners, their capabilities, and how to run your pipeline in them." + ] + } + ] +} \ No newline at end of file diff --git a/website/www/site/content/en/get-started/tour-of-beam.md b/website/www/site/content/en/get-started/tour-of-beam.md index 2312e911f426..abf48e528dae 100644 --- a/website/www/site/content/en/get-started/tour-of-beam.md +++ b/website/www/site/content/en/get-started/tour-of-beam.md @@ -30,9 +30,18 @@ You can also [try an Apache Beam pipeline](/get-started/try-apache-beam) using t ### Learn the basics In this notebook we go through the basics of what is Apache Beam and how to get started. +We learn what is a data pipeline, a PCollection, a PTransform, as well as some basic transforms like `Map`, `FlatMap`, `Filter`, `Combine`, and `GroupByKey`. {{< button-colab url="https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/getting-started.ipynb" >}} +### Reading and writing data + +In this notebook we go through some examples on how to read and write data to and from different data formats. +We introduce the built-in `ReadFromText` and `WriteToText` transforms. +We also see how we can read from CSV files, read from a SQLite database, write fixed-sized batches of elements, and write windows of elements. + +{{< button-colab url="https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb" >}} + ## Transforms Check the [Python transform catalog](/documentation/transforms/python/overview/)