From 6bd90f5fb7a31b24d7d50d3eefcb030d24fde3eb Mon Sep 17 00:00:00 2001 From: liushengsong <602726449@qq.com> Date: Fri, 8 Mar 2024 09:58:32 +0800 Subject: [PATCH] Enhancement: Add gpshrink to support elastic scaling In order to support gpshrink, similar to gpexpand, we first support "alter table shrink table to " to redistribute data on a specific number of segments. For gpshrink implementation, it is mainly divided into two stages similar to gpexpand: 1. Collect the tables that need to be shrink and write them into gpshrink.status_detail. 2. Perform data redistribution on the tables that need to be shrink, and delete specific segments in gp_segment_configuration. --- .github/workflows/build.yml | 43 + gpMgmt/bin/Makefile | 4 +- gpMgmt/bin/gpexpand | 30 + gpMgmt/bin/gppylib/gparray.py | 123 +- .../gppylib/system/ComputeCatalogUpdate.py | 2 +- gpMgmt/bin/gpshrink | 1511 +++++++++++++++++ src/backend/commands/tablecmds.c | 70 +- src/backend/parser/gram.y | 12 +- src/include/nodes/parsenodes.h | 1 + src/include/parser/kwlist.h | 1 + src/test/isolation2/Makefile | 3 + .../isolation2/expected/gpexpand_gpshrink.out | 397 +++++ .../isolation2_expandshrink_schedule | 3 + src/test/isolation2/sql/gpexpand_gpshrink.sql | 141 ++ src/test/regress/expected/shrink_table.out | 856 ++++++++++ src/test/regress/greenplum_schedule | 2 +- src/test/regress/sql/shrink_table.sql | 440 +++++ 17 files changed, 3613 insertions(+), 26 deletions(-) create mode 100644 gpMgmt/bin/gpshrink create mode 100644 src/test/isolation2/expected/gpexpand_gpshrink.out create mode 100644 src/test/isolation2/isolation2_expandshrink_schedule create mode 100644 src/test/isolation2/sql/gpexpand_gpshrink.sql create mode 100644 src/test/regress/expected/shrink_table.out create mode 100644 src/test/regress/sql/shrink_table.sql diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 363a4a5a570..d62263f57e6 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -125,6 
+125,49 @@ jobs: /code/gpdb_src/src/test/isolation/expected/ /code/gpdb_src/gpAux/gpdemo/datadirs/standby/log/ /code/gpdb_src/gpAux/gpdemo/datadirs/singlenodedir/demoDataDir-1/log/ + icw-expandshrink-test: + needs: build + runs-on: [ self-hosted, example ] + env: + MAKE_TEST_COMMAND: "-C src/test/isolation2 installcheck-expandshrink" + TEST_OS: "centos" + DUMP_DB: "true" + steps: + - uses: actions/checkout@v3 + with: + path: "gpdb_src" + - uses: actions/download-artifact@v3 + with: + name: cbdb-variables + path: /opt/ + - uses: actions/download-artifact@v3 + with: + name: cbdb-package + path: /opt/ + - name: Run icw-test script + run: | + mkdir /code + cp -a gpdb_src/ /code + cd /code + echo $GITHUB_RUN_ID > gpdb_src/BUILD_NUMBER + gpdb_src/hd-ci/icw_cbdb.bash $FTS_MODE + - uses: actions/upload-artifact@v3 + if: failure() + with: + name: cbdb-icw-expandshrink-test-log + path: | + /code/gpdb_src/src/test/isolation2/regression.out + /code/gpdb_src/src/test/isolation2/regression.diffs + /code/gpdb_src/src/test/isolation2/results/ + /code/gpdb_src/src/test/isolation2/expected/ + /code/gpdb_src/gpAux/gpdemo/datadirs/standby/log/ + /code/gpdb_src/gpAux/gpdemo/datadirs/qddir/demoDataDir-1/log/ + /code/gpdb_src/gpAux/gpdemo/datadirs/dbfast1/demoDataDir0/log/ + /code/gpdb_src/gpAux/gpdemo/datadirs/dbfast2/demoDataDir1/log/ + /code/gpdb_src/gpAux/gpdemo/datadirs/dbfast3/demoDataDir2/log/ + /code/gpdb_src/gpAux/gpdemo/datadirs/dbfast_mirror1/demoDataDir0/log/ + /code/gpdb_src/gpAux/gpdemo/datadirs/dbfast_mirror2/demoDataDir1/log/ + /code/gpdb_src/gpAux/gpdemo/datadirs/dbfast_mirror3/demoDataDir2/log/ icw-orca-test: needs: build runs-on: [self-hosted, example] diff --git a/gpMgmt/bin/Makefile b/gpMgmt/bin/Makefile index ea978e3f281..2b4c7483e2a 100644 --- a/gpMgmt/bin/Makefile +++ b/gpMgmt/bin/Makefile @@ -13,7 +13,7 @@ SUBDIRS += ifaddrs $(recurse) PROGRAMS= analyzedb gpactivatestandby gpaddmirrors gpcheckcat gpcheckperf \ - gpcheckresgroupimpl gpconfig gpdeletesystem 
gpexpand gpinitstandby \ + gpcheckresgroupimpl gpconfig gpdeletesystem gpexpand gpshrink gpinitstandby \ gpinitsystem gpload gpload.py gplogfilter gpmovemirrors \ gppkg gprecoverseg gpreload gpscp gpsd gpssh gpssh-exkeys gpstart \ gpstate gpstop minirepro gpmemwatcher gpmemreport gpdemo @@ -194,7 +194,7 @@ clean distclean: rm -rf *.pyc rm -f analyzedbc gpactivatestandbyc gpaddmirrorsc gpcheckcatc \ gpcheckperfc gpcheckresgroupimplc gpchecksubnetcfgc gpconfigc \ - gpdeletesystemc gpexpandc gpinitstandbyc gplogfilterc gpmovemirrorsc \ + gpdeletesystemc gpexpandc gpshrinkc gpinitstandbyc gplogfilterc gpmovemirrorsc \ gppkgc gprecoversegc gpreloadc gpscpc gpsdc gpssh-exkeysc gpsshc \ gpstartc gpstatec gpstopc minireproc rm -f gpconfig_modules/gucs_disallowed_in_file.txt diff --git a/gpMgmt/bin/gpexpand b/gpMgmt/bin/gpexpand index 6b6bfc05bbe..f9d0fe36b2b 100755 --- a/gpMgmt/bin/gpexpand +++ b/gpMgmt/bin/gpexpand @@ -1109,6 +1109,8 @@ class gpexpand: tblspc_info = {} for oid in tblspc_oids: + if oid not in tblspc_oid_names: + continue location = os.path.dirname(os.readlink(os.path.join(coordinator_tblspc_dir, oid))) tblspc_info[oid] = {"location": location, @@ -1222,6 +1224,15 @@ class gpexpand: coordinator_tblspc_dir = self.gparray.coordinator.getSegmentTableSpaceDirectory() if not os.listdir(coordinator_tblspc_dir): return None + + tblspc_oids = os.listdir(coordinator_tblspc_dir) + tblspc_oid_names = self.get_tablespace_oid_names() + flag = False + for oid in tblspc_oids: + if oid in tblspc_oid_names: + flag = True + if not flag: + return None if not self.options.filename: raise ExpansionError('Missing tablespace input file') @@ -1385,6 +1396,25 @@ class gpexpand: self.pool.join() self.pool.check_results() + + for i in range(1,12): + flag = True + for segment in newSegments: + if segment.isSegmentMirror() == True: + continue + + cmd = Command('pg_isready for segment', + "pg_isready -q -h %s -p %d -d %s" % (segment.getSegmentHostName(), segment.getSegmentPort(), 
segment.getSegmentDataDirectory())) + cmd.run() + rc = cmd.get_return_code() + if rc != 0: + flag &= False + if flag: + break + time.sleep(10) + self.logger.info("Waiting for segment ready last for %s second" % (i*10)) + + """ Build the list of delete statements based on the COORDINATOR_ONLY_TABLES defined in gpcatalog.py diff --git a/gpMgmt/bin/gppylib/gparray.py b/gpMgmt/bin/gppylib/gparray.py index 3cde686451a..83545fb82e8 100755 --- a/gpMgmt/bin/gppylib/gparray.py +++ b/gpMgmt/bin/gppylib/gparray.py @@ -148,7 +148,7 @@ def __equal(self, other, ignoreAttr=[]): return True def __eq__(self, other): - return self.__equal(other) + return self.__equal(other, ['mode']) def __hash__(self): @@ -429,6 +429,9 @@ def __str__(self): return "(Primary: %s, Mirror: %s)" % (str(self.primaryDB), str(self.mirrorDB)) + def __eq__(self, other): + return self.primaryDB == other.primaryDB and self.mirrorDB == other.mirrorDB + # -------------------------------------------------------------------- def addPrimary(self,segDB): self.primaryDB=segDB @@ -799,6 +802,7 @@ def __init__(self, segments, segmentsAsLoadedFromDb=None): self.standbyCoordinator = None self.segmentPairs = [] self.expansionSegmentPairs=[] + self.shrinkSegmentPairs=[] self.numPrimarySegments = 0 self.recoveredSegmentDbids = [] @@ -1045,7 +1049,7 @@ def dumpToFile(self, filename): fp.close() # -------------------------------------------------------------------- - def getDbList(self, includeExpansionSegs=False): + def getDbList(self, includeExpansionSegs=False, removeShrinkSegs=False): """ Return a list of all Segment objects that make up the array """ @@ -1054,8 +1058,8 @@ def getDbList(self, includeExpansionSegs=False): dbs.append(self.coordinator) if self.standbyCoordinator: dbs.append(self.standbyCoordinator) - if includeExpansionSegs: - dbs.extend(self.getSegDbList(True)) + if includeExpansionSegs or removeShrinkSegs: + dbs.extend(self.getSegDbList(includeExpansionSegs, removeShrinkSegs)) else: 
dbs.extend(self.getSegDbList()) return dbs @@ -1105,7 +1109,7 @@ def getDbIdToPeerMap(self): # -------------------------------------------------------------------- - def getSegDbList(self, includeExpansionSegs=False): + def getSegDbList(self, includeExpansionSegs=False, removeShrinkSegs=False): """Return a list of all Segment objects for all segments in the array""" dbs=[] for segPair in self.segmentPairs: @@ -1113,15 +1117,21 @@ def getSegDbList(self, includeExpansionSegs=False): if includeExpansionSegs: for segPair in self.expansionSegmentPairs: dbs.extend(segPair.get_dbs()) + if removeShrinkSegs: + for segPair in self.shrinkSegmentPairs: + dbs = list(filter(lambda x: segPair.primaryDB != x and segPair.mirrorDB != x, dbs)) return dbs # -------------------------------------------------------------------- - def getSegmentList(self, includeExpansionSegs=False): + def getSegmentList(self, includeExpansionSegs=False, removeShrinkSegs=False): """Return a list of SegmentPair objects for all segments in the array""" dbs=[] dbs.extend(self.segmentPairs) if includeExpansionSegs: dbs.extend(self.expansionSegmentPairs) + if removeShrinkSegs: + for segPair in self.shrinkSegmentPairs: + dbs.remove(segPair) return dbs # -------------------------------------------------------------------- @@ -1148,6 +1158,21 @@ def getExpansionSegPairList(self): """Returns a list of all SegmentPair objects that make up the new segments of an expansion""" return self.expansionSegmentPairs + + # -------------------------------------------------------------------- + def getShrinkSegDbList(self): + """Returns a list of all Segment objects that belong to the segment pairs + registered in shrinkSegmentPairs (the segments being removed by a shrink)""" + dbs=[] + for segPair in self.shrinkSegmentPairs: + dbs.extend(segPair.get_dbs()) + return dbs + + # -------------------------------------------------------------------- + def getShrinkSegPairList(self): + """Returns the list of all SegmentPair objects registered in shrinkSegmentPairs + (the segment pairs being removed by a shrink)""" + return 
self.shrinkSegmentPairs # -------------------------------------------------------------------- def getSegmentContainingDb(self, db): @@ -1164,6 +1189,15 @@ def getExpansionSegmentContainingDb(self, db): if db.getSegmentDbId() == segDb.getSegmentDbId(): return segPair return None + + # -------------------------------------------------------------------- + def getShrinkSegmentContainingDb(self, db): + for segPair in self.shrinkSegmentPairs: + for segDb in segPair.get_dbs(): + if db.getSegmentDbId() == segDb.getSegmentDbId(): + return segPair + return None + # -------------------------------------------------------------------- def get_invalid_segdbs(self): dbs=[] @@ -1488,6 +1522,37 @@ def addExpansionSeg(self, content, preferred_role, dbid, role, else: seg.addMirror(segdb) + # -------------------------------------------------------------------- + def addShrinkSeg(self, content, preferred_role, dbid, role, + hostname, address, port, datadir): + """ + Add a segment to the gparray as an shrink segment. + + Note: may work better to construct the new Segment in gpshrink and + simply pass it in. 
+ """ + + segdb = Segment(content = content, + preferred_role = preferred_role, + dbid = dbid, + role = role, + mode = MODE_SYNCHRONIZED, + status = STATUS_UP, + hostname = hostname, + address = address, + port = port, + datadir = datadir) + + if preferred_role == ROLE_PRIMARY: + self.shrinkSegmentPairs.append(SegmentPair()) + seg = self.shrinkSegmentPairs[-1] + if seg.primaryDB: + raise Exception('Duplicate content id for primary segment') + seg.addPrimary(segdb) + else: + seg = self.shrinkSegmentPairs[-1] + seg.addMirror(segdb) + # -------------------------------------------------------------------- def reOrderExpansionSegs(self): """ @@ -1595,6 +1660,52 @@ def validateExpansionSegs(self): else: used_ports[hostname] = [] used_ports[hostname].append(db.port) + + # -------------------------------------------------------------------- + def validateShrinkSegs(self): + """ Checks the segments added for various inconsistencies and errors. + """ + + # make sure we have added at least one segment + if len(self.shrinkSegmentPairs) == 0: + raise Exception('No shrink segments defined') + + totalsize = len(self.segmentPairs) + removesize = len(self.shrinkSegmentPairs) + + if removesize >= totalsize: + self.logger.error('removed segment num %d more than or equal to total segment num %d', removesize, totalsize) + exit(1) + elif removesize < 1: + self.logger.error('removed segment num %d less than 1', removesize) + exit(1) + + for segPair in self.shrinkSegmentPairs: + if self.hasMirrors: + if segPair.mirrorDB is None: + raise Exception('primaryDB and mirrorDB should be removed simultaneously') + + if segPair.primaryDB.content != segPair.mirrorDB.content: + raise Exception('primaryDB content is not equal mirrorDB content') + + # If shrinkSegmentPairs not in the segmentPairs raise exception + flag = False + for segPair_ in self.segmentPairs : + if segPair_ == segPair : + flag = True + + if flag == False: + raise Exception('Shrink segments not in the gp_segment_configuration 
table') + + # If shrinkSegmentPairs is not the last n segment. + self.shrinkSegmentPairs.sort(key=lambda segPair: segPair.primaryDB.content) + + if self.shrinkSegmentPairs[-1].primaryDB.content != self.get_max_contentid(): + raise Exception('please remove segment from max contentid') + + if self.shrinkSegmentPairs[0].primaryDB.content != self.get_max_contentid()-len(self.shrinkSegmentPairs)+1: + raise Exception('please remove segment in continuous contentid') + # -------------------------------------------------------------------- def addExpansionHosts(self, hosts, mirror_type): diff --git a/gpMgmt/bin/gppylib/system/ComputeCatalogUpdate.py b/gpMgmt/bin/gppylib/system/ComputeCatalogUpdate.py index 3bbb1f8ef43..74d080a1a26 100755 --- a/gpMgmt/bin/gppylib/system/ComputeCatalogUpdate.py +++ b/gpMgmt/bin/gppylib/system/ComputeCatalogUpdate.py @@ -54,7 +54,7 @@ def __init__(self, gpArray, forceMap, useUtilityMode, allowPrimary): self.dbsegmap = dict([(seg.getSegmentDbId(), seg) for seg in gpArray.getSegmentsAsLoadedFromDb()]) # 'goalsegmap' reflects the desired state of the catalog - self.goalsegmap = dict([(seg.getSegmentDbId(), seg) for seg in gpArray.getDbList(includeExpansionSegs=True)]) + self.goalsegmap = dict([(seg.getSegmentDbId(), seg) for seg in gpArray.getDbList(includeExpansionSegs=True, removeShrinkSegs=True)]) # find mirrors and primaries to remove self.mirror_to_remove = [ diff --git a/gpMgmt/bin/gpshrink b/gpMgmt/bin/gpshrink new file mode 100644 index 00000000000..386c64a7e24 --- /dev/null +++ b/gpMgmt/bin/gpshrink @@ -0,0 +1,1511 @@ +#!/usr/bin/env python3 +# Line too long - pylint: disable=C0301 +# Invalid name - pylint: disable=C0103 +# +# Copyright (c) Greenplum Inc 2008. All Rights Reserved. 
+# +from gppylib.mainUtils import getProgramName + +import copy +import datetime +import os +import random +import sys +import json +import shutil +import signal +import traceback +from collections import defaultdict +from time import strftime, sleep + +try: + import pg, pgdb + + from gppylib.commands.unix import * + from gppylib.commands.gp import * + from gppylib.gparray import GpArray, MODE_NOT_SYNC, STATUS_DOWN + from gppylib.gpparseopts import OptParser, OptChecker + from gppylib.gplog import * + from gppylib.db import catalog + from gppylib.db import dbconn + from gppylib.userinput import * + from gppylib.operations.startSegments import MIRROR_MODE_MIRRORLESS + from gppylib.system import configurationInterface, configurationImplGpdb + from gppylib.system.environment import GpCoordinatorEnvironment + from pgdb import DatabaseError + from gppylib.gpcatalog import COORDINATOR_ONLY_TABLES + from gppylib.operations.package import SyncPackages + from gppylib.operations.utils import ParallelOperation + from gppylib.parseutils import line_reader, check_values, canonicalize_address + from gppylib.heapchecksum import HeapChecksum + from gppylib.commands.pg import PgBaseBackup + from gppylib.mainUtils import ExceptionNoStackTraceNeeded + from gppylib.operations.update_pg_hba_on_segments import update_pg_hba_on_segments + +except ImportError as e: + sys.exit('ERROR: Cannot import modules. Please check that you have sourced greenplum_path.sh. Detail: ' + str(e)) + +# constants +MAX_PARALLEL_SHRINKS = 96 +MAX_BATCH_SIZE = 128 + +SEGMENT_CONFIGURATION_BACKUP_FILE = "gpshrink.gp_segment_configuration" + +DBNAME = 'postgres' + +#global var +_gp_shrink = None + +description = (""" +Removes segments from a pre-existing CBDB Array. +""") + +_help = [""" +The input file should be a plain text file with a line for each segment +to remove with the format: + + |
||||| + +And add primary before mirror. + +""", + ] + +_TODO = [""" + +Remaining TODO items: +==================== +""", + + """* smarter heuristics on deciding which tables to reorder first. """, + + """* make sure system isn't in "readonly mode" during setup. """, + + """* need a startup validation where we check the status detail + with gp_distribution_policy and make sure that our book + keeping matches reality. we don't have a perfect transactional + model since the tables can be in a different database from + where the gpshrink schema is kept. """, + + """* currently requires that GPHOME and PYTHONPATH be set on all of the remote hosts of + the system. should get rid of this requirement. """ + ] + +_usage = """[-f hosts_file] + +gpshrink -i input_file [-B batch_size] [-t segment_tar_dir] [-S] + +gpshrink [-d duration[hh][:mm[:ss]] | [-e 'YYYY-MM-DD hh:mm:ss']] + [-a] [-n parallel_processes] + +gpshrink -r + +gpshrink -c + +gpshrink -? | -h | --help | --verbose | -v""" + +EXECNAME = os.path.split(__file__)[-1] + + +# ----------------------- Command line option parser ---------------------- + +def parseargs(): + parser = OptParser(option_class=OptChecker, + description=' '.join(description.split()), + version='%prog version $Revision$') + parser.setHelp(_help) + parser.set_usage('%prog ' + _usage) + parser.remove_option('-h') + + parser.add_option('-c', '--clean', action='store_true', + help='remove the shrink schema.') + parser.add_option('-r', '--rollback', action='store_true', + help='rollback failed shrink setup.') + parser.add_option('-a', '--analyze', action='store_true', + help='Analyze the shrinked table after redistribution.') + parser.add_option('-d', '--duration', type='duration', metavar='[h][:m[:s]]', + help='duration from beginning to end.') + parser.add_option('-e', '--end', type='datetime', metavar='datetime', + help="ending date and time in the format 'YYYY-MM-DD hh:mm:ss'.") + parser.add_option('-i', '--input', dest="filename", + help="input 
shrink configuration file.", metavar="FILE") + parser.add_option('-f', '--hosts-file', metavar='', + help='file containing new host names used to generate input file') + parser.add_option('-B', '--batch-size', type='int', default=16, metavar="", + help='shrink configuration batch size. Valid values are 1-%d' % MAX_BATCH_SIZE) + parser.add_option('-n', '--parallel', type="int", default=1, metavar="", + help='number of tables to shrink at a time. Valid values are 1-%d.' % MAX_PARALLEL_SHRINKS) + parser.add_option('-v', '--verbose', action='store_true', + help='debug output.') + parser.add_option('-S', '--simple-progress', action='store_true', + help='show simple progress.') + parser.add_option('-t', '--tardir', default='.', metavar="FILE", + help='Tar file directory.') + parser.add_option('-h', '-?', '--help', action='help', + help='show this help message and exit.') + parser.add_option('-s', '--silent', action='store_true', + help='Do not prompt for confirmation to proceed on warnings') + parser.add_option('', '--hba-hostnames', action='store_true', default=False, + help='use hostnames instead of CIDR in pg_hba.conf') + parser.add_option('--usage', action="briefhelp") + + parser.set_defaults(verbose=False, filters=[], slice=(None, None)) + + # Parse the command line arguments + (options, args) = parser.parse_args() + return options, args, parser + +def validate_options(options, args, parser): + if len(args) > 0: + logger.error('Unknown argument %s' % args[0]) + parser.exit() + + # -n sanity check + if options.parallel > MAX_PARALLEL_SHRINKS or options.parallel < 1: + logger.error('Invalid argument. parallel value must be >= 1 and <= %d' % MAX_PARALLEL_SHRINKS) + parser.print_help() + parser.exit() + + proccount = os.environ.get('GP_MGMT_PROCESS_COUNT') + if options.batch_size == 16 and proccount is not None: + options.batch_size = int(proccount) + + if options.batch_size < 1 or options.batch_size > 128: + logger.error('Invalid argument. 
-B value must be >= 1 and <= %s' % MAX_BATCH_SIZE) + parser.print_help() + parser.exit() + + # OptParse can return date instead of datetime so we might need to convert + if options.end and not isinstance(options.end, datetime.datetime): + options.end = datetime.datetime.combine(options.end, datetime.time(0)) + + if options.end and options.end < datetime.datetime.now(): + logger.error('End time occurs in the past') + parser.print_help() + parser.exit() + + if options.end and options.duration: + logger.warn('Both end and duration options were given.') + # Both a duration and an end time were given. + if options.end > datetime.datetime.now() + options.duration: + logger.warn('The duration argument will be used for the shrink end time.') + options.end = datetime.datetime.now() + options.duration + else: + logger.warn('The end argument will be used for the shrink end time.') + elif options.duration: + options.end = datetime.datetime.now() + options.duration + + # -c and -r options are mutually exclusive + if options.rollback and options.clean: + rollbackOpt = "--rollback" if "--rollback" in sys.argv else "-r" + cleanOpt = "--clean" if "--clean" in sys.argv else "-c" + logger.error("%s and %s options cannot be specified together." % (rollbackOpt, cleanOpt)) + parser.exit() + + try: + options.coordinator_data_directory = get_coordinatordatadir() + options.gphome = get_gphome() + except GpError as msg: + logger.error(msg) + parser.exit() + + if not os.path.exists(options.coordinator_data_directory): + logger.error('Coordinator data directory %s does not exist.' 
% options.coordinator_data_directory) + parser.exit() + + return options, args + +# ------------------------------------------------------------------------- +# process information functions +def create_pid_file(coordinator_data_directory): + """Creates gpshrink pid file""" + # fp must be bound before the try: if open() fails, the finally clause below still runs + fp = None + try: + fp = open(coordinator_data_directory + '/gpshrink.pid', 'w') + fp.write(str(os.getpid())) + finally: + if fp: fp.close() + + +def remove_pid_file(coordinator_data_directory): + """Removes gpshrink pid file""" + try: + os.unlink(coordinator_data_directory + '/gpshrink.pid') + except: + pass + + +def is_gpshrink_running(coordinator_data_directory): + """Checks if there is another instance of gpshrink running""" + is_running = False + try: + fp = open(coordinator_data_directory + '/gpshrink.pid', 'r') + pid = int(fp.readline().strip()) + fp.close() + is_running = check_pid(pid) + except IOError: + pass + except Exception: + raise + + return is_running + + +def gpshrink_status_file_exists(coordinator_data_directory): + """Checks if the gpshrink.status file exists in the coordinator data directory""" + return os.path.exists(coordinator_data_directory + '/gpshrink.status') + + +# ------------------------------------------------------------------------- +# shrink schema + +undone_status = "NOT STARTED" +start_status = "IN PROGRESS" +done_status = "COMPLETED" +does_not_exist_status = 'NO LONGER EXISTS' + +create_schema_sql = "CREATE SCHEMA gpshrink" +drop_schema_sql = "DROP SCHEMA IF EXISTS gpshrink CASCADE" + +status_table_sql = """CREATE TABLE gpshrink.status + ( status text, + updated timestamp ) """ + +status_detail_table_sql = """CREATE TABLE gpshrink.status_detail + ( dbname text, + fq_name text, + table_oid oid, + root_partition_oid oid, + rank int, + external_writable bool, + status text, + shrink_started timestamp, + shrink_finished timestamp, + source_bytes numeric ) """ +# gpshrink views +progress_view_simple_sql = """CREATE VIEW gpshrink.shrink_progress AS +SELECT + CASE status + WHEN '%s' THEN 'Tables 
Shrinked' + WHEN '%s' THEN 'Tables Left' + END AS Name, + count(*)::text AS Value +FROM gpshrink.status_detail GROUP BY status""" % (done_status, undone_status) + +progress_view_sql = """CREATE VIEW gpshrink.shrink_progress AS +SELECT + CASE status + WHEN '%s' THEN 'Tables Shrinked' + WHEN '%s' THEN 'Tables Left' + WHEN '%s' THEN 'Tables In Progress' + END AS Name, + count(*)::text AS Value +FROM gpshrink.status_detail GROUP BY status + +UNION + +SELECT + CASE status + WHEN '%s' THEN 'Bytes Done' + WHEN '%s' THEN 'Bytes Left' + WHEN '%s' THEN 'Bytes In Progress' + END AS Name, + SUM(source_bytes)::text AS Value +FROM gpshrink.status_detail GROUP BY status + +UNION + +SELECT + 'Estimated shrink Rate' AS Name, + (SUM(source_bytes) / (1 + extract(epoch FROM (max(shrink_finished) - min(shrink_started)))) / 1024 / 1024)::text || ' MB/s' AS Value +FROM gpshrink.status_detail +WHERE status = '%s' +AND +shrink_started > (SELECT updated FROM gpshrink.status WHERE status = '%s' ORDER BY updated DESC LIMIT 1) + +UNION + +SELECT +'Estimated Time to Completion' AS Name, +CAST((SUM(source_bytes) / ( +SELECT 1 + SUM(source_bytes) / (1 + (extract(epoch FROM (max(shrink_finished) - min(shrink_started))))) +FROM gpshrink.status_detail +WHERE status = '%s' +AND +shrink_started > (SELECT updated FROM gpshrink.status WHERE status = '%s' ORDER BY +updated DESC LIMIT 1)))::text || ' seconds' as interval)::text AS Value +FROM gpshrink.status_detail +WHERE status = '%s' + OR status = '%s'""" % (done_status, undone_status, start_status, + done_status, undone_status, start_status, + done_status, + 'SHRINK STARTED', + done_status, + 'SHRINK STARTED', + start_status, undone_status) + +# ------------------------------------------------------------------------- +class InvalidStatusError(Exception): pass + + +class ValidationError(Exception): pass + + +# ------------------------------------------------------------------------- +class gpshrinkStatus(): + """Class that manages gpshrink status file. 
+ + The status file is placed in the coordinator data directory on both the coordinator and + the standby coordinator. it's used to keep track of where we are in the progression. + """ + + def __init__(self, logger, coordinator_data_directory, coordinator_mirror=None): + self.logger = logger + + self._status_values = {'UNINITIALIZED': 1, + 'SHRINK_PREPARE_STARTED': 2, + 'UPDATE_CATALOG_STARTED': 3, + 'UPDATE_CATALOG_DONE': 4, + 'SETUP_SHRINK_SCHEMA_STARTED': 5, + 'SETUP_SHRINK_SCHEMA_DONE': 6, + 'PREPARE_SHRINK_SCHEMA_STARTED': 7, + 'PREPARE_SHRINK_SCHEMA_DONE': 8, + 'SHRINK_PREPARE_DONE': 9, + 'SHRINK_PERFOEM_STARTED':10, + 'SHRINK_TABLE_STARTED': 11, + 'SHRINK_TABLE_DONE': 12, + 'SHRINK_CATALOG_STARTED': 13, + 'SHRINK_CATALOG_DONE': 14, + 'SHRINK_PERFOEM_DONE': 15, + } + self._status = [] + self._status_info = [] + self._coordinator_data_directory = coordinator_data_directory + self._coordinator_mirror = coordinator_mirror + self._status_filename = coordinator_data_directory + '/gpshrink.status' + if coordinator_mirror: + self._status_standby_filename = coordinator_mirror.getSegmentDataDirectory() \ + + '/gpshrink.status' + self._segment_configuration_standby_filename = coordinator_mirror.getSegmentDataDirectory() \ + + '/' + SEGMENT_CONFIGURATION_BACKUP_FILE + self._fp = None + self._temp_dir = None + self._input_filename = None + self._gp_segment_configuration_backup = None + + if os.path.exists(self._status_filename): + self._read_status_file() + + def _read_status_file(self): + """Reads in an existing gpshrink status file""" + self.logger.debug("Trying to read in a pre-existing gpshrink status file") + try: + self._fp = open(self._status_filename, 'a+') + self._fp.seek(0) + + for line in self._fp: + (status, status_info) = line.rstrip().split(':') + if status == 'SHRINK_PREPARE_STARTED': + self._input_filename = status_info + elif status == 'UPDATE_CATALOG_STARTED': + self._gp_segment_configuration_backup = status_info + + self._status.append(status) + 
self._status_info.append(status_info) + except IOError: + raise + + if not self._status or self._status[-1] not in self._status_values: + raise InvalidStatusError('Invalid status file. Unknown status %s' % (self._status[-1] if self._status else None)) + + def create_status_file(self): + """Creates a new gpshrink status file""" + try: + self._fp = open(self._status_filename, 'w') + self._fp.write('UNINITIALIZED:None\n') + self._fp.flush() + os.fsync(self._fp) + self._status.append('UNINITIALIZED') + self._status_info.append('None') + except IOError: + raise + + if self._coordinator_mirror: + self._sync_status_file() + + def _sync_status_file(self): + """Syncs the gpshrink status file with the coordinator mirror""" + cpCmd = Scp('gpshrink copying status file to coordinator mirror', + srcFile=self._status_filename, + dstFile=self._status_standby_filename, + dstHost=self._coordinator_mirror.getSegmentHostName()) + cpCmd.run(validateAfter=True) + + def set_status(self, status, status_info=None, force=False): + """Sets the current status. gpshrink status must be set in + proper order. Any out of order status result in an + InvalidStatusError exception. 
But if force is True, setting + status out of order is allowed (NOTE: this body never reads force or checks order)""" + if len(self._status) == 0 or not os.path.exists(self._status_filename): + raise InvalidStatusError('not in shrink status or no status file') + + self.logger.debug("Transitioning from %s to %s" % (self._status[-1], status)) + + if not self._fp: + raise InvalidStatusError('The status file is invalid and cannot be written to') + if status not in self._status_values: + raise InvalidStatusError('%s is an invalid gpshrink status' % status) + self._fp.write('%s:%s\n' % (status, status_info)) + self._fp.flush() + os.fsync(self._fp) + self._status.append(status) + self._status_info.append(status_info) + if self._coordinator_mirror: + self._sync_status_file() + + def get_current_status(self): + """Gets the current status that has been written to the gpshrink + status file""" + if (len(self._status) > 0 and len(self._status_info) > 0): + return (self._status[-1], self._status_info[-1]) + else: + return (None, None) + + def get_status_history(self): + """Gets the full status history""" + return list(zip(self._status, self._status_info)) + + def remove_status_file(self): + """Closes and removes the gpshrink status file""" + if self._fp: + self._fp.close() + self._fp = None + if os.path.exists(self._status_filename): + os.unlink(self._status_filename) + if self._coordinator_mirror: + RemoveFile.remote('gpshrink coordinator mirror status file cleanup', + self._coordinator_mirror.getSegmentHostName(), + self._status_standby_filename) + + def remove_segment_configuration_backup_file(self): + """ Remove the segment configuration backup file """ + self.logger.debug("Removing segment configuration backup file") + if self._gp_segment_configuration_backup != None and os.path.exists( + self._gp_segment_configuration_backup) == True: + os.unlink(self._gp_segment_configuration_backup) + if self._coordinator_mirror: + RemoveFile.remote('gpshrink coordinator mirror segment configuration backup file cleanup', + 
self._coordinator_mirror.getSegmentHostName(), + self._segment_configuration_standby_filename) + + def sync_segment_configuration_backup_file(self): + """ Sync the segment configuration backup file to standby """ + if self._coordinator_mirror: + self.logger.debug("Sync segment configuration backup file") + cpCmd = Scp('gpshrink copying segment configuration backup file to coordinator mirror', + srcFile=self._gp_segment_configuration_backup, + dstFile=self._segment_configuration_standby_filename, + dstHost=self._coordinator_mirror.getSegmentHostName()) + cpCmd.run(validateAfter=True) + + def get_input_filename(self): + """Gets input file that was used by shrink setup""" + return self._input_filename + + def get_gp_segment_configuration_backup(self): + """Gets the filename of the gp_segment_configuration backup file + created during shrink setup""" + return self._gp_segment_configuration_backup + + def set_gp_segment_configuration_backup(self, filename): + """Sets the filename of the gp_segment_configuration backup file""" + self._gp_segment_configuration_backup = filename + + + +# ------------------------------------------------------------------------- + +class ShrinkError(Exception): pass + + + +# ------------------------------------------------------------------------------------------------------ +# ------------------------------------------------------------------------------------------------------ +class NewSegmentInput: + def __init__(self, hostname, address, port, datadir, dbid, contentId, role): + self.hostname = hostname + self.address = address + self.port = port + self.datadir = datadir + self.dbid = dbid + self.contentId = contentId + self.role = role + + +# ------------------------------------------------------------------------------------------------------ +# ------------------------------------------------------------------------------------------------------ +class gpshrink: + def __init__(self, logger, gparray, dburl, options, parallel=1, 
@staticmethod
def prepare_gpdb_state(logger, dburl, options):
    """Return the shrink status recorded in the database, or None.

    The result decides whether this run is a new shrink setup or a
    continuation of a previous one. ``logger`` is accepted for interface
    compatibility and is not used.
    """
    return gpshrink.get_status_from_db(dburl, options)


@staticmethod
def get_status_from_db(dburl, options):
    """Return the newest entry of gpshrink.status, or None when there is none.

    Raises ShrinkError when no status can be read but a ``gpshrink`` schema
    nevertheless exists.
    """
    status_conn = None
    gpshrink_db_status = None
    if get_local_db_mode(options.coordinator_data_directory) == 'NORMAL':
        try:
            status_conn = dbconn.connect(dburl, encoding='UTF8')
            # Get the last status entry
            cursor = dbconn.query(status_conn, 'SELECT status FROM gpshrink.status ORDER BY updated DESC LIMIT 1')
            if cursor.rowcount == 1:
                gpshrink_db_status = cursor.fetchone()[0]
        except Exception:
            # Deliberate best effort: the shrink schema doesn't exist or there
            # was a connection failure. Both are treated as "no status".
            pass
        finally:
            if status_conn:
                status_conn.close()

    # make sure gpshrink schema doesn't exist since it wasn't in DB provided
    if not gpshrink_db_status:
        # MPP-14145 - If there's no discernible status, the schema must not exist.
        #
        # The query above really checks for non-emptiness of gpshrink.status,
        # not for existence of the 'gpshrink' schema. If the table were empty
        # but the schema did exist, gpshrink would presume a new shrink was
        # taking place and would later fail on CREATE SCHEMA. So we error out
        # here instead.
        #
        # Note: -c/--clean will not necessarily work either, as it too has
        # assumptions about the non-emptiness of the gpshrink schema.
        conn = dbconn.connect(dburl, encoding='UTF8', utility=True)
        try:
            count = dbconn.querySingleton(conn,
                                          "SELECT count(n.nspname) FROM pg_catalog.pg_namespace n WHERE n.nspname = 'gpshrink'")
            if count > 0:
                raise ShrinkError(
                    "Existing shrink state could not be determined, but a gpshrink schema already exists. Cannot proceed.")
        finally:
            conn.close()

    return gpshrink_db_status
def validate_max_connections(self):
    """Check that max_connections can support the requested parallelism.

    Returns True when ``max_connections >= parallel * 2 + 1``, otherwise logs
    an explanation and returns False. A DatabaseError raised while reading
    the GUC is logged and re-raised.
    """
    conn = None
    try:
        conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
        max_connections = int(catalog.getSessionGUC(conn, 'max_connections'))
    except DatabaseError as ex:
        if self.options.verbose:
            self.logger.exception(ex)
        self.logger.error('Failed to check max_connections GUC')
        raise
    finally:
        # conn stays None when connect() itself failed; closing it blindly
        # would raise and hide the original error.
        if conn is not None:
            conn.close()

    if max_connections < self.options.parallel * 2 + 1:
        self.logger.error('max_connections is too small to shrink %d tables at' % self.options.parallel)
        self.logger.error('a time. This will lead to connection errors. Either')
        self.logger.error('reduce the value for -n passed to gpshrink or raise')
        self.logger.error('max_connections in postgresql.conf')
        return False

    return True
def cleanup_file(self):
    """Remove the status file and the segment configuration backup file."""
    self.statusLogger.remove_status_file()
    self.statusLogger.remove_segment_configuration_backup_file()


def get_state(self):
    """Return the current shrink state from the status logger."""
    return self.statusLogger.get_current_status()[0]


def generate_inputfile(self):
    """Write a gpshrink input file describing the shrink segments in gparray.

    The file is created in the current directory with a timestamped name;
    each row is ``host|address|port|datadir|dbid|content|role``.
    Returns the file name.
    """
    outputfile = 'gpshrink_inputfile_' + strftime("%Y%m%d_%H%M%S")

    self.logger.info("Generating input file...")

    # The context manager closes the file even if a segment accessor raises.
    with open(outputfile, 'w') as outfile:
        for db in self.gparray.getShrinkSegDbList():
            tempStr = "%s|%s|%d|%s|%d|%d|%s" % (canonicalize_address(db.getSegmentHostName()),
                                                canonicalize_address(db.getSegmentAddress()),
                                                db.getSegmentPort(),
                                                db.getSegmentDataDirectory(),
                                                db.getSegmentDbId(),
                                                db.getSegmentContentId(),
                                                db.getSegmentPreferredRole())
            outfile.write(tempStr + "\n")

    return outputfile


def add_remove_segments(self, inputFileEntryList):
    """Register every parsed input row as a shrink segment and validate them.

    Raises ShrinkError when gparray rejects the resulting configuration.
    """
    for seg in inputFileEntryList:
        self.gparray.addShrinkSeg(content=int(seg.contentId),
                                  preferred_role=seg.role,
                                  dbid=int(seg.dbid),
                                  role=seg.role,
                                  hostname=seg.hostname.strip(),
                                  address=seg.address.strip(),
                                  port=int(seg.port),
                                  datadir=os.path.abspath(seg.datadir.strip()))
    try:
        self.gparray.validateShrinkSegs()
    except Exception as e:
        raise ShrinkError('Invalid input file: %s' % e) from e


def _getParsedRow(self, lineno, line):
    """Split one '|'-separated input line into a NewSegmentInput.

    Raises ExceptionNoStackTraceNeeded unless the line has exactly 7 fields;
    the field values are passed to check_values() before the row is built.
    """
    parts = line.split('|')
    if len(parts) != 7:
        raise ExceptionNoStackTraceNeeded("expected 7 parts, obtained %d" % len(parts))
    hostname, address, port, datadir, dbid, contentId, role = parts
    check_values(lineno, address=address, port=port, datadir=datadir, content=contentId,
                 hostname=hostname, dbid=dbid, role=role)
    return NewSegmentInput(hostname=hostname,
                           port=port,
                           address=address,
                           datadir=datadir,
                           dbid=dbid,
                           contentId=contentId,
                           role=role)
def read_input_files(self, inputFilename=None):
    """Read and validate the input file passed on the command line via -i.

    ``self.options.filename`` takes precedence over ``inputFilename``.
    Returns a list of parsed rows. Raises ShrinkError when no file name is
    available, the file cannot be read, or a line is malformed.
    """
    if not self.options.filename and not inputFilename:
        raise ShrinkError('Missing input file')

    if self.options.filename:
        inputFilename = self.options.filename

    retValue = []
    try:
        with open(inputFilename, 'r') as f:
            for lineno, line in line_reader(f):
                try:
                    retValue.append(self._getParsedRow(lineno, line))
                except ValueError:
                    raise ShrinkError('Missing or invalid value on line %d of file %s.' % (lineno, inputFilename))
                except Exception as e:
                    raise ShrinkError(
                        'Invalid input file on line %d of file %s: %s' % (lineno, inputFilename, str(e))) from e
    except IOError:
        raise ShrinkError('Input file %s not found' % inputFilename)

    return retValue


def lock_catalog(self):
    """Open a dedicated connection and take the catalog lock inside a transaction.

    The connection is kept in ``self.conn_catalog_lock`` until
    unlock_catalog() commits and closes it.
    """
    self.conn_catalog_lock = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
    self.logger.info('Locking catalog')
    try:
        dbconn.execSQL(self.conn_catalog_lock, "BEGIN", autocommit=False)
        dbconn.execSQL(self.conn_catalog_lock, "select gp_expand_lock_catalog()", autocommit=False)
        # FIXME: is CHECKPOINT inside BEGIN the one wanted by us?
        dbconn.execSQL(self.conn_catalog_lock, "CHECKPOINT", autocommit=False)
    except Exception:
        # Do not keep a half-initialised lock connection around.
        self.conn_catalog_lock.close()
        self.conn_catalog_lock = None
        raise
    self.logger.info('Locked catalog')
def unlock_catalog(self):
    """Commit the catalog-lock transaction and close its connection."""
    self.logger.info('Unlocking catalog')
    dbconn.execSQL(self.conn_catalog_lock, "COMMIT")
    self.conn_catalog_lock.close()
    self.conn_catalog_lock = None
    self.logger.info('Unlocked catalog')


def update_original_segments(self):
    """Compute ``self.newPrimaryCount`` from the shrink segments and gparray.

    NOTE(review): the value is minus (primaries being removed + current
    primary count). This looks like gpexpand's ``+=`` logic with the sign
    flipped; confirm the intended value before relying on it. No catalog
    table is modified here.
    """
    self.newPrimaryCount = 0
    for seg in self.gparray.getShrinkSegDbList():
        if seg.isSegmentPrimary(False):
            self.newPrimaryCount -= 1

    self.newPrimaryCount -= self.gparray.get_primary_count()

    # FIXME: update postmaster.opts


def update_catalog_swap_segment(self):
    """Back up gp_segment_configuration and record the catalog-update status.

    FIXME: we should swap the removed segment to the end, and save the newly
    removed segment in the file so it can be removed in the next phase.
    """
    self.statusLogger.set_gp_segment_configuration_backup(
        self.options.coordinator_data_directory + '/' + SEGMENT_CONFIGURATION_BACKUP_FILE)
    self.gparray.dumpToFile(self.statusLogger.get_gp_segment_configuration_backup())
    self.statusLogger.set_status('UPDATE_CATALOG_STARTED', self.statusLogger.get_gp_segment_configuration_backup())
    self.statusLogger.sync_segment_configuration_backup_file()

    self.statusLogger.set_status('UPDATE_CATALOG_DONE')


def update_catalog_remove_segments(self):
    """Apply the shrunk array to the catalog via updateSystemConfig().

    Afterwards issues a CHECKPOINT and bumps the expand version, each on its
    own utility-mode connection, and records the completion statuses.
    """
    self.statusLogger.set_status('SHRINK_CATALOG_STARTED')

    # Update the catalog
    configurationInterface.getConfigurationProvider().updateSystemConfig(
        self.gparray,
        "%s: segment config for resync" % getProgramName(),
        dbIdToForceMirrorRemoveAdd={},
        useUtilityMode=True,
        allowPrimary=True
    )

    # First statement: checkpoint due to forced shutdown below.
    # Second statement: increase expand version.
    for sql in ("CHECKPOINT", "select gp_expand_bump_version()"):
        self.conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
        try:
            dbconn.execSQL(self.conn, sql)
        finally:
            # Close even when the statement fails so the connection is not leaked.
            self.conn.close()

    self.statusLogger.set_status('SHRINK_CATALOG_DONE')
    self.statusLogger.set_status('SHRINK_PERFOEM_DONE')
def stop_remove_segments(self):
    """Stop every segment being removed, then shut the worker pool down.

    One fast-mode SegmentStop command per shrink segment is queued on
    ``self.pool``; results are checked after all of them have finished.
    """
    for seg in self.gparray.getShrinkSegDbList():
        segStopCmd = SegmentStop(
            # getSegmentDbId must be *called*; the bare attribute would put
            # the bound-method repr into the command name.
            name="Stopping new segment dbid %s on host %s." % (str(seg.getSegmentDbId()), seg.getSegmentHostName()),
            dataDir=seg.getSegmentDataDirectory(),
            mode='fast',
            nowait=False,
            ctxt=REMOTE,
            remoteHost=seg.getSegmentHostName()
        )
        self.pool.addCommand(segStopCmd)
    self.pool.join()
    self.pool.check_results()
    self.pool.haltWork()
    self.pool.joinWorkers()
def start_prepare(self):
    """Create the status file and record that shrink preparation has started.

    Nothing happens when no input file was supplied.
    """
    if not self.options.filename:
        return
    self.statusLogger.create_status_file()
    self.statusLogger.set_status('SHRINK_PREPARE_STARTED', os.path.abspath(self.options.filename))


def setup_schema(self):
    """Create the gpshrink schema, its status tables and the progress view."""
    self.statusLogger.set_status('SETUP_SHRINK_SCHEMA_STARTED')
    self.logger.info('Creating shrink schema')
    self.conn = dbconn.connect(self.dburl, encoding='UTF8')
    dbconn.execSQL(self.conn, create_schema_sql)
    dbconn.execSQL(self.conn, status_table_sql)
    dbconn.execSQL(self.conn, status_detail_table_sql)

    # views
    view_sql = progress_view_simple_sql if self.options.simple_progress else progress_view_sql
    dbconn.execSQL(self.conn, view_sql)

    self.statusLogger.set_status('SETUP_SHRINK_SCHEMA_DONE')


def prepare_schema(self):
    """Fill gpshrink.status_detail with the tables of every database.

    'SETUP' and 'SETUP DONE' rows are written to gpshrink.status around the
    population step; template0 is skipped.
    """
    self.statusLogger.set_status('PREPARE_SHRINK_SCHEMA_STARTED')

    # NOTE(review): the gparray refresh is reconstructed as part of this
    # branch (source indentation was lost) - confirm against the real file.
    if not self.conn:
        self.conn = dbconn.connect(self.dburl, encoding='UTF8', allowSystemTableMods=True)
        self.gparray = GpArray.initFromCatalog(self.dburl)

    setup_started = datetime.datetime.now()
    dbconn.execSQL(self.conn, "INSERT INTO gpshrink.status VALUES ( 'SETUP', '%s' ) " % (setup_started))

    for entry in catalog.getDatabaseList(self.conn):
        database = entry[0]
        if database != 'template0':
            self.logger.info('Populating gpshrink.status_detail with data from database %s' % (
                database))
            self._populate_regular_tables(database)

    setup_finished = datetime.datetime.now()
    dbconn.execSQL(self.conn, "INSERT INTO gpshrink.status VALUES ( 'SETUP DONE', '%s' ) " % (setup_finished))

    self.conn.close()

    self.statusLogger.set_status('PREPARE_SHRINK_SCHEMA_DONE')
    self.statusLogger.set_status('SHRINK_PREPARE_DONE')
def _populate_regular_tables(self, dbname):
    """Copy one row per distributed, non-partition-child table of ``dbname``
    into gpshrink.status_detail.

    The rows are exported with COPY ... TO a scratch file
    (./status_detail.dat) and loaded with COPY ... FROM on ``self.conn``.
    Any failure is re-raised as ShrinkError.
    """
    # FIXME: we process partition table as regular_table, because processing each leaf table
    # like expand in shrink may result unsafe intermediate state and cannot roll back.
    src_bytes_str = "0" if self.options.simple_progress else "pg_relation_size(quote_ident(n.nspname) || '.' || quote_ident(c.relname))"
    sql = """SELECT
    current_database(),
    quote_ident(n.nspname) || '.' || quote_ident(c.relname) as fq_name,
    c.oid as tableoid,
    NULL as root_partition_oid,
    2 as rank,
    pe.writable is not null as external_writable,
    '%s' as undone_status,
    NULL as shrink_started,
    NULL as shrink_finished,
    %s as source_bytes
FROM
    pg_class c
    JOIN pg_namespace n ON (c.relnamespace=n.oid)
    JOIN pg_catalog.gp_distribution_policy p on (c.oid = p.localoid)
    LEFT JOIN pg_partitioned_table pp on (c.oid=pp.partrelid)
    LEFT JOIN pg_exttable pe on (c.oid=pe.reloid and pe.writable)
WHERE
    NOT c.relispartition
    AND n.nspname != 'pg_bitmapindex'
    AND c.relpersistence != 't'
    """ % (undone_status, src_bytes_str)
    self.logger.debug(sql)
    table_conn = self.connect_database(dbname)

    try:
        data_file = os.path.abspath('./status_detail.dat')
        self.logger.debug('status_detail data file: %s' % data_file)
        copySQL = """COPY (%s) TO '%s'""" % (sql, data_file)

        self.logger.debug(copySQL)
        dbconn.execSQL(table_conn, copySQL)
    except Exception as e:
        raise ShrinkError(e) from e
    finally:
        # Close the per-database connection on failure as well as success.
        table_conn.close()

    try:
        copySQL = """COPY gpshrink.status_detail FROM '%s'""" % (data_file)

        self.logger.debug(copySQL)
        dbconn.execSQL(self.conn, copySQL)
    except Exception as e:
        raise ShrinkError(e) from e
    finally:
        os.unlink(data_file)


def compute_shrink_size(self):
    """Set ``self.size`` to the segment count the cluster is shrunk to.

    The target is (current segment pairs) - (segment pairs to remove). The
    process exits with status 1 when fewer than one pair, or not fewer than
    all pairs, would be removed.
    """
    totalsize = len(self.gparray.segmentPairs)
    removesize = len(self.gparray.shrinkSegmentPairs)

    # sys.exit is used because the bare exit() helper is only installed by
    # the site module.
    if removesize >= totalsize:
        self.logger.error('remove segment num %d more than segment num %d', removesize, totalsize)
        sys.exit(1)
    elif removesize < 1:
        self.logger.error('remove segment num %d less than 1', removesize)
        sys.exit(1)
    self.size = totalsize - removesize
def perform_shrink(self):
    """Redistribute every pending table in gpshrink.status_detail.

    Tables left in the started state by a previous run are reset first. One
    ShrinkCommand per pending table is queued on a worker pool of
    ``self.numworkers`` workers; the loop ends when the queue drains or the
    optional end time (``options.end``) is reached. The outcome is written
    to gpshrink.status as 'SHRINK STOPPED' or 'SHRINK COMPLETE'.
    """
    self.statusLogger.set_status('SHRINK_PERFOEM_STARTED')
    self.statusLogger.set_status('SHRINK_TABLE_STARTED')

    shrinkStart = datetime.datetime.now()

    # setup a threadpool
    self.queue = WorkerPool(numWorkers=self.numworkers)

    self.conn = dbconn.connect(self.dburl, encoding='UTF8')
    sql = "INSERT INTO gpshrink.status VALUES ( 'SHRINK STARTED', '%s' ) " % (
        shrinkStart)
    dbconn.execSQL(self.conn, sql)

    # go through and reset any "IN PROGRESS" tables
    sql = """UPDATE gpshrink.status_detail set status = '%s' WHERE status = '%s' """ % (undone_status, start_status)
    dbconn.execSQL(self.conn, sql)

    # read schema and queue up commands
    # NOTE(review): 'NOT STARTED' is hard-coded here while the UPDATE above
    # uses undone_status - presumably the same value; confirm.
    sql = "SELECT * FROM gpshrink.status_detail WHERE status = 'NOT STARTED' ORDER BY rank"
    cursor = dbconn.query(self.conn, sql)

    for row in cursor:
        self.logger.debug(row)
        name = "name"
        tbl = ShrinkTable(options=self.options, row=row, size=self.size)
        cmd = ShrinkCommand(name=name, status_url=self.dburl, table=tbl, options=self.options)
        self.queue.addCommand(cmd)

    table_shrink_error = False

    stopTime = None
    stoppedEarly = False
    if self.options.end:
        stopTime = self.options.end

    # wait till done.
    while not self.queue.isDone():
        self.logger.debug(
            "woke up. queue: %d finished %d " % (self.queue.assigned, self.queue.completed_queue.qsize()))
        if stopTime and datetime.datetime.now() >= stopTime:
            stoppedEarly = True
            break
        time.sleep(5)

    shrinkStopped = datetime.datetime.now()

    self.queue.haltWork()
    self.queue.joinWorkers()

    # Doing this after the halt and join workers guarantees that no new completed items can be added
    # while we're doing a check
    for shrinkCommand in self.queue.getCompletedItems():
        if shrinkCommand.table_shrink_error:
            table_shrink_error = True
            break

    if stoppedEarly:
        self.logger.info('End time reached. Stopping shrink.')
        sql = "INSERT INTO gpshrink.status VALUES ( 'SHRINK STOPPED', '%s' ) " % (
            shrinkStopped)
        dbconn.execSQL(self.conn, sql)
        self.logger.info('You can resume shrink by running gpshrink again')
    elif table_shrink_error:
        self.logger.warning('**************************************************')
        self.logger.warning('One or more tables failed to shrink successfully.')
        self.logger.warning('Please check the log file, correct the problem and')
        self.logger.warning('run gpshrink again to finish the shrink process')
        self.logger.warning('**************************************************')
        # We'll try to update the status, but if the errors were caused by
        # going into read only mode, this will fail. That's ok though as
        # gpshrink will resume next run
        try:
            sql = "INSERT INTO gpshrink.status VALUES ( 'SHRINK STOPPED', '%s' ) " % (
                shrinkStopped)
            dbconn.execSQL(self.conn, sql)
        except Exception:
            # Best effort only; KeyboardInterrupt/SystemExit still propagate.
            pass
    else:
        sql = "INSERT INTO gpshrink.status VALUES ( 'SHRINK COMPLETE', '%s' ) " % (
            shrinkStopped)
        dbconn.execSQL(self.conn, sql)
        self.logger.info("SHRINK COMPLETED SUCCESSFULLY")

    self.conn.commit()
    self.conn.close()
    self.statusLogger.set_status('SHRINK_TABLE_DONE')
def shutdown(self):
    """Stop all workers and try to record 'SHRINK STOPPED'.

    Used when the script is closed abruptly. Writing the status row is best
    effort: the schema may not exist yet (cancel or error during setup) or
    the connection may already be unusable, so any failure is ignored.
    """
    self.logger.info('Shutting down gpshrink...')
    self.halt_work()

    try:
        shrinkStopped = datetime.datetime.now()
        sql = "INSERT INTO gpshrink.status VALUES ( 'SHRINK STOPPED', '%s' ) " % (
            shrinkStopped)
        dbconn.execSQL(self.conn, sql)
        self.conn.close()
    except Exception:
        # Covers operational errors as well as "schema doesn't exist".
        pass
def halt_work(self):
    """Halt and join the segment pool and the table queue, when present."""
    if self.pool:
        self.pool.haltWork()
        self.pool.joinWorkers()

    if self.queue:
        self.queue.haltWork()
        self.queue.joinWorkers()


def cleanup_schema(self):
    """Drop the gpshrink schema."""
    c = dbconn.connect(self.dburl, encoding='UTF8')
    try:
        self.logger.info("Removing gpshrink schema")
        dbconn.execSQL(c, drop_schema_sql)
        c.commit()
    finally:
        # Close the connection even when the drop fails.
        c.close()


def connect_database(self, dbname):
    """Return a new connection to ``dbname`` on the same cluster as ``self.dburl``."""
    test_url = copy.deepcopy(self.dburl)
    test_url.pgdb = dbname
    c = dbconn.connect(test_url, encoding='UTF8', allowSystemTableMods=True)
    return c


def validate_heap_checksums(self):
    """Verify that every segment has the coordinator's heap checksum setting.

    Returns 1 when no segment answered the query, and None when the settings
    are consistent. Raises Exception when settings differ.

    NOTE(review): main() does not check the return value, so the "no
    segments responded" case does not actually stop the shrink - confirm the
    intended handling.
    """
    num_workers = min(len(self.gparray.get_hostlist()), MAX_PARALLEL_SHRINKS)
    heap_checksum_util = HeapChecksum(gparray=self.gparray, num_workers=num_workers, logger=self.logger)
    successes, failures = heap_checksum_util.get_segments_checksum_settings()
    if len(successes) == 0:
        self.logger.fatal("No segments responded to ssh query for heap checksum. Not shrinking the cluster.")
        return 1

    consistent, inconsistent, coordinator_heap_checksum = heap_checksum_util.check_segment_consistency(successes)

    inconsistent_segment_msgs = [
        "dbid: %s checksum set to %s differs from coordinator checksum set to %s"
        % (segment.getSegmentDbId(), segment.heap_checksum, coordinator_heap_checksum)
        for segment in inconsistent
    ]

    if not heap_checksum_util.are_segments_consistent(consistent, inconsistent):
        self.logger.fatal("Cluster heap checksum setting differences reported")
        self.logger.fatal("Heap checksum settings on %d of %d segment instances do not match coordinator <<<<<<<<"
                          % (len(inconsistent_segment_msgs), len(self.gparray.segmentPairs)))
        self.logger.fatal("Review %s for details" % get_logfile())
        log_to_file_only("Failed checksum consistency validation:", logging.WARN)
        self.logger.fatal("gpshrink error: Cluster will not be modified as checksum settings are not consistent "
                          "across the cluster.")

        for msg in inconsistent_segment_msgs:
            log_to_file_only(msg, logging.WARN)
        raise Exception("Segments have heap_checksum set inconsistently to coordinator")
    else:
        self.logger.info("Heap checksum setting consistent across cluster")
# -----------------------------------------------
class ShrinkTable:
    """One row of gpshrink.status_detail: a table that has to be shrunk."""

    def __init__(self, options, row=None, size=0):
        """Build the table from a status_detail ``row``.

        ``row`` must unpack into the ten status_detail columns in table
        order; when it is None the column attributes are left unset.
        ``size`` is the target segment count used by shrink().
        """
        self.options = options
        if row is not None:
            (self.dbname, self.fq_name, self.table_oid,
             self.root_partition_oid,
             self.rank, self.external_writable, self.status,
             self.shrink_started, self.shrink_finished,
             self.source_bytes) = row
        self.size = size

    @staticmethod
    def _sql_literal(value):
        """Render ``value`` as a quoted SQL literal, or NULL for None."""
        if value is None:
            return 'NULL'
        return "'%s'" % value

    def add_table(self, conn):
        """Insert this table into gpshrink.status_detail using ``conn``.

        None values for root_partition_oid, shrink_started and
        shrink_finished are written as SQL NULL. The previous '%d' / '%s'
        formatting raised TypeError or produced the text 'None' for them.
        """
        insertSQL = """INSERT INTO gpshrink.status_detail
                        VALUES ('%s','%s',%s,
                                %s,%d,'%s','%s',%s,%s,%d)
                    """ % (self.dbname.replace("'", "''"), self.fq_name.replace("'", "''"), self.table_oid,
                           self._sql_literal(self.root_partition_oid),
                           self.rank, self.external_writable, self.status,
                           self._sql_literal(self.shrink_started),
                           self._sql_literal(self.shrink_finished),
                           self.source_bytes)
        logger.info('Added table %s.%s' % (self.dbname, self.fq_name))
        logger.debug(insertSQL)
        dbconn.execSQL(conn, insertSQL)
def mark_started(self, status_conn, table_conn, start_time, cancel_flag):
    """Record in status_detail that the shrink of this table has begun.

    The current relation size is read over ``table_conn`` and stored as
    source_bytes. Nothing is done when ``cancel_flag`` is set.
    """
    if cancel_flag:
        return

    size_row = dbconn.queryRow(table_conn, "SELECT pg_relation_size(%s)" % (self.table_oid))
    src_bytes = int(size_row[0])
    logger.debug(" Table: %s has %d bytes" % (self.fq_name, src_bytes))

    escaped_dbname = self.dbname.replace("'", "''")
    sql = """UPDATE gpshrink.status_detail
                  SET status = '%s', shrink_started='%s',
                      source_bytes = %d
                  WHERE dbname = '%s'
                        AND table_oid = %s """ % (start_status, start_time,
                                                   src_bytes, escaped_dbname,
                                                   self.table_oid)

    logger.debug("Mark Started: " + sql)
    dbconn.execSQL(status_conn, sql)


def reset_started(self, status_conn):
    """Put this table back into the not-yet-shrunk state in status_detail."""
    escaped_dbname = self.dbname.replace("'", "''")
    sql = """UPDATE gpshrink.status_detail
                  SET status = '%s', shrink_started=NULL, shrink_finished=NULL
                  WHERE dbname = '%s'
                        AND table_oid = %s """ % (undone_status,
                                                   escaped_dbname, self.table_oid)

    logger.debug('Resetting detailed_status: %s' % sql)
    dbconn.execSQL(status_conn, sql)


def shrink(self, table_conn, cancel_flag):
    """Run ALTER TABLE ... SHRINK TABLE for this table.

    Returns True when the table was shrunk, or when it has a root partition
    oid and is therefore skipped; returns False when ``cancel_flag`` was set
    before the statement ran. ANALYZE follows when options.analyze is set.
    """
    # Rows carrying a root partition oid are not shrunk individually.
    if self.root_partition_oid is not None:
        return True

    sql = 'ALTER TABLE %s SHRINK TABLE to %d' % (self.fq_name, self.size)

    logger.info('Shrinking %s.%s' % (self.dbname, self.fq_name))
    logger.debug("Shrink SQL: %s" % sql)

    # check is atomic in python
    if cancel_flag:
        return False

    dbconn.execSQL(table_conn, sql)
    # the ALTER TABLE command requires a commit to execute
    table_conn.commit()
    if self.options.analyze:
        sql = 'ANALYZE %s' % (self.fq_name)
        logger.info('Analyzing %s' % (self.fq_name))
        dbconn.execSQL(table_conn, sql)

    return True
def mark_finished(self, status_conn, start_time, finish_time):
    """Record in status_detail that this table was shrunk successfully."""
    sql = """UPDATE gpshrink.status_detail
                  SET status = '%s', shrink_started='%s', shrink_finished='%s'
                  WHERE dbname = '%s'
                  AND table_oid = %s """ % (done_status, start_time, finish_time,
                                            self.dbname.replace("'", "''"), self.table_oid)
    logger.debug(sql)
    dbconn.execSQL(status_conn, sql)


def mark_does_not_exist(self, status_conn, finish_time):
    """Record in status_detail that this table no longer exists."""
    sql = """UPDATE gpshrink.status_detail
                  SET status = '%s', shrink_finished='%s'
                  WHERE dbname = '%s'
                  AND table_oid = %s """ % (does_not_exist_status, finish_time,
                                            self.dbname.replace("'", "''"), self.table_oid)
    logger.debug(sql)
    dbconn.execSQL(status_conn, sql)


def sig_handler(sig, arg):
    """SIGTERM/SIGHUP handler: shut gpshrink down, then re-deliver the signal
    with the default disposition restored."""
    if _gp_shrink is not None:
        _gp_shrink.shutdown()

    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    signal.signal(signal.SIGHUP, signal.SIG_DFL)

    # raise sig
    os.kill(os.getpid(), sig)


# -----------------------------------------------
class ShrinkCommand(SQLCommand):
    """Worker-pool command that shrinks a single table."""

    def __init__(self, name, status_url, table, options):
        """Store the target table and derive the URL of the database holding it.

        ``status_url`` is the database with the gpshrink schema; a copy of
        it pointing at ``table.dbname`` is used to run the ALTER TABLE.
        """
        self.status_url = status_url
        self.table = table
        self.options = options
        self.cmdStr = "Shrink %s.%s" % (table.dbname, table.fq_name)
        self.table_url = copy.deepcopy(status_url)
        self.table_url.pgdb = table.dbname
        self.table_shrink_error = False

        SQLCommand.__init__(self, name)

    def run(self, validateAfter=False):
        """Shrink the table and keep gpshrink.status_detail up to date.

        ``self.table_shrink_error`` is set when connecting fails or the
        shrink raises for a reason other than cancellation. Both
        connections are always closed before returning.
        """
        # connect.
        status_conn = None
        table_conn = None
        table_exp_success = False

        try:
            status_conn = dbconn.connect(self.status_url, encoding='UTF8')
            table_conn = dbconn.connect(self.table_url, encoding='UTF8')
        except DatabaseError as ex:
            if self.options.verbose:
                logger.exception(ex)
            logger.error(str(ex).strip())
            if status_conn:
                status_conn.close()
            if table_conn:
                table_conn.close()
            self.table_shrink_error = True
            return

        # From here on the finally clause releases both connections, even if
        # one of the status updates below raises.
        try:
            # validate table hasn't been dropped
            start_time = None
            try:
                sql = """select * from pg_class c where c.oid = %d """ % (self.table.table_oid)

                cursor = dbconn.query(table_conn, sql)

                if cursor.rowcount == 0:
                    logger.info('%s no longer exists in database %s' % (self.table.fq_name,
                                                                        self.table.dbname))

                    self.table.mark_does_not_exist(status_conn, datetime.datetime.now())
                    return
                else:
                    # Set conn for cancel
                    self.cancel_conn = table_conn
                    start_time = datetime.datetime.now()
                    if not self.options.simple_progress:
                        self.table.mark_started(status_conn, table_conn, start_time, self.cancel_flag)

                    table_exp_success = self.table.shrink(table_conn, self.cancel_flag)

            except Exception as ex:
                if str(ex).find('canceling statement due to user request') == -1 and not self.cancel_flag:
                    self.table_shrink_error = True
                    if self.options.verbose:
                        logger.exception(ex)
                    logger.error('Table %s.%s failed to shrink: %s' % (self.table.dbname,
                                                                       self.table.fq_name,
                                                                       str(ex).strip()))
                else:
                    logger.info('ALTER TABLE of %s.%s canceled' % (
                        self.table.dbname, self.table.fq_name))

            if table_exp_success:
                end_time = datetime.datetime.now()
                # update metadata
                logger.info(
                    "Finished shrinking %s.%s" % (self.table.dbname, self.table.fq_name))
                self.table.mark_finished(status_conn, start_time, end_time)
            elif not self.options.simple_progress:
                logger.info("Resetting status_detail for %s.%s" % (
                    self.table.dbname, self.table.fq_name))
                self.table.reset_started(status_conn)
        finally:
            # disconnect
            status_conn.close()
            table_conn.close()
def set_results(self, results):
    """Not implemented for shrink commands."""
    raise ExecutionError("TODO: must implement", None)


def get_results(self):
    """Not implemented for shrink commands."""
    raise ExecutionError("TODO: must implement", None)


def was_successful(self):
    """Not implemented for shrink commands."""
    raise ExecutionError("TODO: must implement", None)


def validate(self, expected_rc=0):
    """Not implemented for shrink commands."""
    raise ExecutionError("TODO: must implement", None)


# ------------------------------- UI Help --------------------------------
def read_hosts_file(hosts_file):
    """Return the host names listed in ``hosts_file``.

    Blank lines and lines starting with '#' are skipped; surrounding
    whitespace is stripped. Raises ShrinkError when the file cannot be read.
    """
    new_hosts = []
    try:
        with open(hosts_file, 'r') as f:
            for l in f:
                host = l.strip()
                if host.startswith('#') or host == '':
                    continue
                new_hosts.append(host)
    except IOError:
        raise ShrinkError('Hosts file %s not found' % hosts_file)

    return new_hosts


# --------------------------------------------------------------------------
# Main
# --------------------------------------------------------------------------
def main(options, args, parser):
    """Entry point: run the shrink phase selected by the state stored in the DB.

    * no status and an input file given -> preparation phase (status file,
      catalog backup, gpshrink schema)
    * 'SETUP DONE' / 'SHRINK STARTED' / 'SHRINK STOPPED' -> redistribute the
      tables, remove the segments from the catalog and stop them
    * 'SHRINK COMPLETE' -> nothing to do
    ``-c`` removes the schema and state files. The process always leaves
    through sys.exit().
    """
    global _gp_shrink

    remove_pid = True
    # Bound before the try block so the exception handlers below can always
    # refer to them.
    gpshrink_db_status = None
    logger = get_default_logger()
    try:
        # setup signal handlers so we can clean up correctly
        signal.signal(signal.SIGTERM, sig_handler)
        signal.signal(signal.SIGHUP, sig_handler)

        setup_tool_logging(EXECNAME, getLocalHostname(), getUserName())

        options, args = validate_options(options, args, parser)

        if options.verbose:
            enable_verbose_logging()

        if is_gpshrink_running(options.coordinator_data_directory):
            logger.error('gpshrink is already running. Only one instance')
            logger.error('of gpshrink is allowed at a time.')
            remove_pid = False
            sys.exit(1)
        else:
            create_pid_file(options.coordinator_data_directory)

        # prepare provider for updateSystemConfig
        gpEnv = GpCoordinatorEnvironment(options.coordinator_data_directory, True)
        configurationInterface.registerConfigurationProvider(
            configurationImplGpdb.GpConfigurationProviderUsingGpdbCatalog())
        configurationInterface.getConfigurationProvider().initializeProvider(gpEnv.getCoordinatorPort())

        dburl = dbconn.DbURL(dbname=DBNAME, port=gpEnv.getCoordinatorPort())

        gpshrink_db_status = gpshrink.prepare_gpdb_state(logger, dburl, options)

        # Get array configuration
        try:
            gparray = GpArray.initFromCatalog(dburl, utility=True)
        except DatabaseError:
            logger.error('Failed to connect to database. Make sure the')
            logger.error('CloudberryDB instance you wish to shrink is running')
            logger.error('and that your environment is correct, then rerun')
            logger.error('gpshrink ' + ' '.join(sys.argv[1:]))
            sys.exit(1)

        _gp_shrink = gpshrink(logger, gparray, dburl, options, parallel=options.parallel)

        if options.clean:
            _gp_shrink.cleanup_schema()
            _gp_shrink.cleanup_file()
            logger.info('Cleanup Finished. exiting...')
            sys.exit(0)

        if options.rollback:
            logger.info('Rollback is not support in shrink.')
            sys.exit(0)

        if options.filename is None:
            logger.error('gpshrink must with input file')

        if gpshrink_db_status is None and options.filename:
            _gp_shrink.validate_heap_checksums()
            removeSegList = _gp_shrink.read_input_files()
            _gp_shrink.add_remove_segments(removeSegList)
            _gp_shrink.start_prepare()
            _gp_shrink.lock_catalog()
            _gp_shrink.update_original_segments()
            _gp_shrink.update_catalog_swap_segment()
            _gp_shrink.unlock_catalog()
            _gp_shrink.setup_schema()
            _gp_shrink.prepare_schema()
            logger.info('************************************************')
            logger.info('Initialization of the system shrink complete.')
            logger.info('To begin table shrink onto the new segments')
            logger.info('rerun gpshrink')
            logger.info('************************************************')
        elif gpshrink_db_status in ('SETUP DONE', 'SHRINK STARTED', 'SHRINK STOPPED'):
            if not _gp_shrink.validate_max_connections():
                raise ValidationError()
            removeSegList = _gp_shrink.read_input_files()
            _gp_shrink.add_remove_segments(removeSegList)
            _gp_shrink.compute_shrink_size()
            _gp_shrink.perform_shrink()
            _gp_shrink.lock_catalog()
            _gp_shrink.update_original_segments()
            _gp_shrink.update_catalog_remove_segments()
            _gp_shrink.unlock_catalog()
            _gp_shrink.stop_remove_segments()
        elif gpshrink_db_status == 'SHRINK COMPLETE':
            logger.info('shrink has already completed.')
            logger.info('If you want to shrink again, run gpshrink -c to remove')
            logger.info('the gpshrink schema and begin a new shrink')
        else:
            logger.error('gpshrink_db_status is %s', gpshrink_db_status)
            logger.error('The last gpshrink setup did not complete successfully.')
            logger.error('Please run gpshrink -c to clean to the original state.')

        logger.info("Exiting...")
        sys.exit(0)

    except ValidationError as e:
        logger.info('Input validation failed: %s', e)
        if _gp_shrink is not None:
            _gp_shrink.shutdown()
        # NOTE(review): exits with status 0 although validation failed - confirm intent.
        sys.exit()
    except Exception as e:
        logger.error('Exeception happens: %s', e)
        if _gp_shrink is not None:
            _gp_shrink.shutdown()
        if gpshrink_db_status is not None:
            logger.error('May left the database in uncompleted state')
            logger.error('Any remaining issues must be addressed outside of gpshrink.')
            logger.error('You can shrink all the table in the database by yourself in gpshrink.status_detail.')
            logger.error('And \'gpshrink -c\' to clean the file and schema.')
        sys.exit(3)
    except KeyboardInterrupt:
        # Disable SIGINT while we shutdown.
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        if _gp_shrink is not None:
            _gp_shrink.shutdown()

        # Re-enabled SIGINT
        signal.signal(signal.SIGINT, signal.default_int_handler)

        sys.exit('\nUser Interrupted')

    finally:
        try:
            if remove_pid and options:
                remove_pid_file(options.coordinator_data_directory)
        except NameError:
            pass

        if _gp_shrink is not None:
            _gp_shrink.halt_work()
sys.exit(0) + + except ValidationError as e: + logger.info('Input validation failed: %s', e) + if _gp_shrink is not None: + _gp_shrink.shutdown() + sys.exit() + except Exception as e: + logger.error('Exeception happens: %s', e) + if _gp_shrink is not None: + _gp_shrink.shutdown() + if not (gpshrink_db_status is None): + logger.error('May left the database in uncompleted state') + logger.error('Any remaining issues must be addressed outside of gpexpand.') + logger.error('You can shrink all the table in the database by yourself in gpshrink.status_detail.') + logger.error('And \'gpshrink -c\' to clean the file and schema.') + sys.exit(3) + except KeyboardInterrupt: + # Disable SIGINT while we shutdown. + signal.signal(signal.SIGINT, signal.SIG_IGN) + + if _gp_shrink is not None: + _gp_shrink.shutdown() + + # Re-enabled SIGINT + signal.signal(signal.SIGINT, signal.default_int_handler) + + sys.exit('\nUser Interrupted') + + + finally: + try: + if remove_pid and options: + remove_pid_file(options.coordinator_data_directory) + except NameError: + pass + + if _gp_shrink is not None: + _gp_shrink.halt_work() + + +if __name__ == '__main__': + options, args, parser = parseargs() + main(options, args, parser) \ No newline at end of file diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index bfbfd36c904..70a47275b76 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -526,9 +526,9 @@ static void RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid, static void RangeVarCallbackForAlterRelation(const RangeVar *rv, Oid relid, Oid oldrelid, void *arg); -static void ATExecExpandTable(List **wqueue, Relation rel, AlterTableCmd *cmd); -static void ATExecExpandPartitionTablePrepare(Relation rel); -static void ATExecExpandTableCTAS(AlterTableCmd *rootCmd, Relation rel, AlterTableCmd *cmd); +static void ATExecExpandTable(List **wqueue, Relation rel, AlterTableCmd *cmd, int numsegments); +static void 
ATExecExpandPartitionTablePrepare(Relation rel, int numsegments); +static void ATExecExpandTableCTAS(AlterTableCmd *rootCmd, Relation rel, AlterTableCmd *cmd, int numsegments); static void ATExecSetDistributedBy(Relation rel, Node *node, AlterTableCmd *cmd); @@ -4908,6 +4908,7 @@ AlterTableGetLockLevel(List *cmds) /* GPDB additions */ case AT_ExpandTable: case AT_ExpandPartitionTablePrepare: + case AT_ShrinkTable: case AT_SetDistributedBy: cmd_lockmode = AccessExclusiveLock; break; @@ -5478,6 +5479,37 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, pass = AT_PASS_MISC; break; + case AT_ShrinkTable: + ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE | ATT_MATVIEW); + + /* ATTACH and DETACH will process in ATExecAttachPartition function */ + if (!recursing) + { + Assert(IsA(cmd->def, Integer)); + if (Gp_role == GP_ROLE_DISPATCH && + rel->rd_cdbpolicy->numsegments <= intVal(cmd->def)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot shrink table \"%s\"", + RelationGetRelationName(rel)), + errdetail("table numsegments \"%d\", shrink size \"%d\" " ,rel->rd_cdbpolicy->numsegments, intVal(cmd->def)))); + + if (rel->rd_rel->relispartition) + { + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot shrink leaf or interior partition \"%s\"", + RelationGetRelationName(rel)), + errdetail("Root/leaf/interior partitions need to have same numsegments"), + errhint("Call ALTER TABLE SHRINK TABLE on the root table instead"))); + } + + } + + ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context); + pass = AT_PASS_MISC; + break; + case AT_AddInherit: /* INHERIT */ ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE); /* This command never recurses */ @@ -5981,10 +6013,13 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, ATExecSetDistributedBy(rel, (Node *) cmd->def, cmd); break; case AT_ExpandTable: /* EXPAND TABLE */ - ATExecExpandTable(wqueue, rel, cmd); + ATExecExpandTable(wqueue, rel, cmd, 
getgpsegmentCount()); break; case AT_ExpandPartitionTablePrepare: /* EXPAND PARTITION PREPARE */ - ATExecExpandPartitionTablePrepare(rel); + ATExecExpandPartitionTablePrepare(rel, getgpsegmentCount()); + break; + case AT_ShrinkTable: /* SHRINK TABLE */ + ATExecExpandTable(wqueue, rel, cmd, intVal(cmd->def)); break; case AT_AttachPartition: cmd = ATParseTransformCmd(wqueue, tab, rel, cmd, false, lockmode, @@ -17839,9 +17874,13 @@ checkPolicyCompatibleWithIndexes(Relation rel, GpPolicy *pol) * the data to the new reltion file, and swap it in place of the old one. * This is called the "CTAS method", because it uses a CREATE TABLE AS * command internally to create the new physical relation. + * + * To support shrink, the numsegments parameter sets the table policy to an + * arbitrary size: expand passes getgpsegmentCount(), while shrink passes the + * segment count supplied by the user. */ static void -ATExecExpandTable(List **wqueue, Relation rel, AlterTableCmd *cmd) +ATExecExpandTable(List **wqueue, Relation rel, AlterTableCmd *cmd, int numsegments) { AlteredTableInfo *tab; AlterTableCmd *rootCmd; @@ -17903,11 +17942,11 @@ ATExecExpandTable(List **wqueue, Relation rel, AlterTableCmd *cmd) } else { - ATExecExpandTableCTAS(rootCmd, rel, cmd); + ATExecExpandTableCTAS(rootCmd, rel, cmd, numsegments); } /* Update numsegments to cluster size */ - newPolicy->numsegments = getgpsegmentCount(); + newPolicy->numsegments = numsegments; GpPolicyReplace(relid, newPolicy); } @@ -17932,11 +17971,12 @@ ATExecExpandTable(List **wqueue, Relation rel, AlterTableCmd *cmd) * and new policy type of leaf partitions are randomly on 3 segments * * @param rel the parent or leaf of partition table + * + * @param numsegments target segment count; see ATExecExpandTable for details. 
*/ static void -ATExecExpandPartitionTablePrepare(Relation rel) +ATExecExpandPartitionTablePrepare(Relation rel, int numsegments) { - int new_numsegments = getgpsegmentCount(); Oid relid = RelationGetRelid(rel); if (GpPolicyIsRandomPartitioned(rel->rd_cdbpolicy) || rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) @@ -17951,7 +17991,7 @@ ATExecExpandPartitionTablePrepare(Relation rel) */ oldcontext = MemoryContextSwitchTo(GetMemoryChunkContext(rel)); new_policy = GpPolicyCopy(rel->rd_cdbpolicy); - new_policy->numsegments = new_numsegments; + new_policy->numsegments = numsegments; MemoryContextSwitchTo(oldcontext); GpPolicyReplace(relid, new_policy); @@ -17977,7 +18017,7 @@ ATExecExpandPartitionTablePrepare(Relation rel) /* Just modify the numsegments for external writable leaves */ oldcontext = MemoryContextSwitchTo(GetMemoryChunkContext(rel)); new_policy = GpPolicyCopy(rel->rd_cdbpolicy); - new_policy->numsegments = new_numsegments; + new_policy->numsegments = numsegments; MemoryContextSwitchTo(oldcontext); GpPolicyReplace(relid, new_policy); @@ -17993,7 +18033,7 @@ ATExecExpandPartitionTablePrepare(Relation rel) /* we change policy type to randomly for regular leaf partitions distributed by hash */ oldcontext = MemoryContextSwitchTo(GetMemoryChunkContext(rel)); - new_policy = createRandomPartitionedPolicy(new_numsegments); + new_policy = createRandomPartitionedPolicy(numsegments); MemoryContextSwitchTo(oldcontext); GpPolicyReplace(relid, new_policy); @@ -18004,7 +18044,7 @@ ATExecExpandPartitionTablePrepare(Relation rel) } static void -ATExecExpandTableCTAS(AlterTableCmd *rootCmd, Relation rel, AlterTableCmd *cmd) +ATExecExpandTableCTAS(AlterTableCmd *rootCmd, Relation rel, AlterTableCmd *cmd, int numsegments) { RangeVar *tmprv; Datum newOptions; @@ -18045,7 +18085,7 @@ ATExecExpandTableCTAS(AlterTableCmd *rootCmd, Relation rel, AlterTableCmd *cmd) /* Step (b) - build CTAS */ distby = make_distributedby_for_rel(rel); - distby->numsegments = getgpsegmentCount(); 
+ distby->numsegments = numsegments; newOptions = new_rel_opts(rel); diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index cdd1670c0f1..90b96b185e7 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -869,7 +869,7 @@ static void check_expressions_in_partition_key(PartitionSpec *spec, core_yyscan_ RANDOMLY READABLE READS REJECT_P REPLICATED RESOURCE ROOTPARTITION - SCATTER SEGMENT SEGMENTS SPLIT SUBPARTITION + SCATTER SEGMENT SEGMENTS SHRINK SPLIT SUBPARTITION TASK SCHEDULE @@ -3547,6 +3547,14 @@ alter_table_cmd: n->subtype = AT_ExpandPartitionTablePrepare; $$ = (Node *)n; } + /* ALTER TABLE SHRINK TABLE TO */ + | SHRINK TABLE TO SignedIconst + { + AlterTableCmd *n = makeNode(AlterTableCmd); + n->subtype = AT_ShrinkTable; + n->def = (Node *) makeInteger($4); + $$ = (Node *)n; + } /* ALTER TABLE OF */ | OF any_name { @@ -18961,6 +18969,7 @@ unreserved_keyword: | SETS | SHARE | SHOW + | SHRINK | SIMPLE | SKIP | SNAPSHOT @@ -19937,6 +19946,7 @@ bare_label_keyword: | SETS | SHARE | SHOW + | SHRINK | SIMILAR | SIMPLE | SKIP diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index d9a278ede94..19083ce9ec2 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -2106,6 +2106,7 @@ typedef enum AlterTableType AT_SetDistributedBy, /* SET DISTRIBUTED BY */ AT_ExpandTable, /* EXPAND DISTRIBUTED */ AT_ExpandPartitionTablePrepare, /* EXPAND PARTITION PREPARE */ + AT_ShrinkTable, /* SHRINK DISTRIBUTED */ /* GPDB: Legacy commands to manipulate partitions */ AT_PartAdd, /* Add */ diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index d8aca8f4a43..fa539d38949 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -446,6 +446,7 @@ PG_KEYWORD("setof", SETOF, COL_NAME_KEYWORD, BARE_LABEL) PG_KEYWORD("sets", SETS, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("share", SHARE, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("show", SHOW, UNRESERVED_KEYWORD, 
BARE_LABEL) +PG_KEYWORD("shrink", SHRINK, UNRESERVED_KEYWORD, BARE_LABEL) /* GPDB */ PG_KEYWORD("similar", SIMILAR, TYPE_FUNC_NAME_KEYWORD, BARE_LABEL) PG_KEYWORD("simple", SIMPLE, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("skip", SKIP, UNRESERVED_KEYWORD, BARE_LABEL) diff --git a/src/test/isolation2/Makefile b/src/test/isolation2/Makefile index 2404e6fa9b7..0d0ad9bca28 100644 --- a/src/test/isolation2/Makefile +++ b/src/test/isolation2/Makefile @@ -69,6 +69,9 @@ installcheck: install installcheck-parallel-retrieve-cursor installcheck-resgroup: install $(pg_isolation2_regress_installcheck) --init-file=$(top_builddir)/src/test/regress/init_file --init-file=./init_file_resgroup --dbname=isolation2resgrouptest --schedule=$(srcdir)/isolation2_resgroup_schedule +installcheck-expandshrink: install + $(pg_isolation2_regress_installcheck) --init-file=$(top_builddir)/src/test/regress/init_file --init-file=./init_file_isolation2 --dbname=isolation2expandshrinktest --schedule=$(srcdir)/isolation2_expandshrink_schedule + installcheck-parallel-retrieve-cursor: install $(pg_isolation2_regress_installcheck) $(EXTRA_REGRESS_OPTS) --init-file=$(top_builddir)/src/test/regress/init_file --init-file=./init_file_parallel_retrieve_cursor --bindir='$(bindir)' --inputdir=$(srcdir) --dbname=isolation2parallelretrcursor --load-extension=gp_inject_fault --schedule=$(srcdir)/parallel_retrieve_cursor_schedule diff --git a/src/test/isolation2/expected/gpexpand_gpshrink.out b/src/test/isolation2/expected/gpexpand_gpshrink.out new file mode 100644 index 00000000000..c571c299bbd --- /dev/null +++ b/src/test/isolation2/expected/gpexpand_gpshrink.out @@ -0,0 +1,397 @@ +!\retcode yes | gpexpand -c; +-- start_ignore + +-- end_ignore +(exited with code 3) + +!\retcode gpshrink -c; +-- start_ignore + +-- end_ignore +(exited with code 0) + +!\retcode rm -r /tmp/datadirs/; +-- start_ignore + +-- end_ignore +(exited with code 1) + +-- expand one segment and shrink +!\retcode echo 
"localhost|localhost|7008|/tmp/datadirs/dbfast4/demoDataDir3|9|3|p localhost|localhost|7009|/tmp/datadirs/dbfast_mirror4/demoDataDir3|10|3|m" > /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +create table test(a int); +CREATE + +insert into test select i from generate_series(1,100) i; +INSERT 100 + +select gp_segment_id, count(*) from test group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 38 + 1 | 37 + 2 | 25 +(3 rows) + +select count(*) from gp_segment_configuration; + count +------- + 8 +(1 row) + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test' and c.oid=d.localoid; + policytype | numsegments | distkey +------------+-------------+--------- + p | 3 | 1 +(1 row) + +!\retcode gpexpand -i /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +!\retcode gpexpand -i /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +select gp_segment_id, count(*) from test group by gp_segment_id; + gp_segment_id | count +---------------+------- + 1 | 30 + 3 | 26 + 0 | 28 + 2 | 16 +(4 rows) + +select count(*) from gp_segment_configuration; + count +------- + 10 +(1 row) + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test' and c.oid=d.localoid; + policytype | numsegments | distkey +------------+-------------+--------- + p | 4 | 1 +(1 row) + +!\retcode gpshrink -i /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +!\retcode gpshrink -i /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +select gp_segment_id, count(*) from test group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 38 + 1 | 37 + 2 | 25 +(3 rows) + +select count(*) from gp_segment_configuration; + count +------- + 8 +(1 row) + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test' and 
c.oid=d.localoid; + policytype | numsegments | distkey +------------+-------------+--------- + p | 3 | 1 +(1 row) + +!\retcode yes | gpexpand -c; +-- start_ignore + +-- end_ignore +(exited with code 0) + +!\retcode gpshrink -c; +-- start_ignore + + +-- end_ignore +(exited with code 0) + +!\retcode rm -r /tmp/datadirs/; +-- start_ignore + +-- end_ignore +(exited with code 0) + +drop table test; +DROP + + +-- expand one segment and shrink +!\retcode echo "localhost|localhost|7008|/tmp/datadirs/dbfast4/demoDataDir3|9|3|p localhost|localhost|7009|/tmp/datadirs/dbfast_mirror4/demoDataDir3|10|3|m" > /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +create table test_partitioned (a int, b int) partition by range (b) (start(1) end(101) every(20),default partition def); +CREATE + +insert into test_partitioned select i,i from generate_series(1,100) i; +INSERT 100 + +select gp_segment_id, count(*) from test_partitioned group by gp_segment_id; + gp_segment_id | count +---------------+------- + 1 | 37 + 2 | 25 + 0 | 38 +(3 rows) + +select count(*) from gp_segment_configuration; + count +------- + 8 +(1 row) + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test_partitioned' and c.oid=d.localoid; + policytype | numsegments | distkey +------------+-------------+--------- + p | 3 | 1 +(1 row) + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test_partitioned_1_prt_2' and c.oid=d.localoid; + policytype | numsegments | distkey +------------+-------------+--------- + p | 3 | 1 +(1 row) + +!\retcode gpexpand -i /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +!\retcode gpexpand -i /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +select gp_segment_id, count(*) from test_partitioned group by gp_segment_id; + gp_segment_id | count +---------------+------- + 1 | 30 + 2 | 16 + 0 | 28 + 3 | 26 +(4 rows) + 
+select count(*) from gp_segment_configuration; + count +------- + 10 +(1 row) + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test_partitioned' and c.oid=d.localoid; + policytype | numsegments | distkey +------------+-------------+--------- + p | 4 | 1 +(1 row) + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test_partitioned_1_prt_2' and c.oid=d.localoid; + policytype | numsegments | distkey +------------+-------------+--------- + p | 4 | 1 +(1 row) + +!\retcode gpshrink -i /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +!\retcode gpshrink -i /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +select gp_segment_id, count(*) from test_partitioned group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 38 + 2 | 25 + 1 | 37 +(3 rows) + +select count(*) from gp_segment_configuration; + count +------- + 8 +(1 row) + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test_partitioned' and c.oid=d.localoid; + policytype | numsegments | distkey +------------+-------------+--------- + p | 3 | 1 +(1 row) + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test_partitioned_1_prt_2' and c.oid=d.localoid; + policytype | numsegments | distkey +------------+-------------+--------- + p | 3 | 1 +(1 row) + +drop table test_partitioned; +DROP + +!\retcode yes | gpexpand -c; +-- start_ignore + +-- end_ignore +(exited with code 0) + +!\retcode gpshrink -c; +-- start_ignore + + +-- end_ignore +(exited with code 0) + +!\retcode rm -r /tmp/datadirs/; +-- start_ignore + +-- end_ignore +(exited with code 0) + + +-- expand two segment and shrink +!\retcode echo "localhost|localhost|7008|/tmp/datadirs/dbfast4/demoDataDir3|9|3|p localhost|localhost|7009|/tmp/datadirs/dbfast_mirror4/demoDataDir3|10|3|m 
localhost|localhost|7010|/tmp/datadirs/dbfast5/demoDataDir4|11|4|p localhost|localhost|7011|/tmp/datadirs/dbfast_mirror5/demoDataDir4|12|4|m" > /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +create table test(a int); +CREATE + +insert into test select i from generate_series(1,100) i; +INSERT 100 + +select gp_segment_id, count(*) from test group by gp_segment_id; + gp_segment_id | count +---------------+------- + 1 | 37 + 2 | 25 + 0 | 38 +(3 rows) + +select count(*) from gp_segment_configuration; + count +------- + 8 +(1 row) + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test' and c.oid=d.localoid; + policytype | numsegments | distkey +------------+-------------+--------- + p | 3 | 1 +(1 row) + +!\retcode gpexpand -i /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +!\retcode gpexpand -i /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +select gp_segment_id, count(*) from test group by gp_segment_id; + gp_segment_id | count +---------------+------- + 3 | 22 + 2 | 13 + 1 | 20 + 0 | 24 + 4 | 21 +(5 rows) + +select count(*) from gp_segment_configuration; + count +------- + 12 +(1 row) + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test' and c.oid=d.localoid; + policytype | numsegments | distkey +------------+-------------+--------- + p | 5 | 1 +(1 row) + +!\retcode gpshrink -i /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +!\retcode gpshrink -i /tmp/testexpand; +-- start_ignore + +-- end_ignore +(exited with code 0) + +select gp_segment_id, count(*) from test group by gp_segment_id; + gp_segment_id | count +---------------+------- + 1 | 37 + 2 | 25 + 0 | 38 +(3 rows) + +select count(*) from gp_segment_configuration; + count +------- + 8 +(1 row) + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test' 
and c.oid=d.localoid; + policytype | numsegments | distkey +------------+-------------+--------- + p | 3 | 1 +(1 row) + +drop table test; +DROP + +!\retcode yes | gpexpand -c; +-- start_ignore + +-- end_ignore +(exited with code 0) + +!\retcode gpshrink -c; +-- start_ignore + +-- end_ignore +(exited with code 0) + +!\retcode rm -r /tmp/datadirs/; +-- start_ignore + +-- end_ignore +(exited with code 0) diff --git a/src/test/isolation2/isolation2_expandshrink_schedule b/src/test/isolation2/isolation2_expandshrink_schedule new file mode 100644 index 00000000000..3fd75a5eff0 --- /dev/null +++ b/src/test/isolation2/isolation2_expandshrink_schedule @@ -0,0 +1,3 @@ +# Tests for gpexpand and gpshrink +# Kept in its own schedule because it redistributes all tables in all databases +test: gpexpand_gpshrink \ No newline at end of file diff --git a/src/test/isolation2/sql/gpexpand_gpshrink.sql b/src/test/isolation2/sql/gpexpand_gpshrink.sql new file mode 100644 index 00000000000..6a73fd5a254 --- /dev/null +++ b/src/test/isolation2/sql/gpexpand_gpshrink.sql @@ -0,0 +1,141 @@ +!\retcode yes | gpexpand -c; + +!\retcode gpshrink -c; + +!\retcode rm -r /tmp/datadirs/; + +-- expand one segment and shrink +!\retcode echo "localhost|localhost|7008|/tmp/datadirs/dbfast4/demoDataDir3|9|3|p +localhost|localhost|7009|/tmp/datadirs/dbfast_mirror4/demoDataDir3|10|3|m" > /tmp/testexpand; + +create table test(a int); + +insert into test select i from generate_series(1,100) i; + +select gp_segment_id, count(*) from test group by gp_segment_id; + +select count(*) from gp_segment_configuration; + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test' and c.oid=d.localoid; + +!\retcode gpexpand -i /tmp/testexpand; + +!\retcode gpexpand -i /tmp/testexpand; + +select gp_segment_id, count(*) from test group by gp_segment_id; + +select count(*) from gp_segment_configuration; + +select policytype, numsegments, distkey from gp_distribution_policy d, 
pg_class c where c.relname='test' and c.oid=d.localoid; + +!\retcode gpshrink -i /tmp/testexpand; + +!\retcode gpshrink -i /tmp/testexpand; + +select gp_segment_id, count(*) from test group by gp_segment_id; + +select count(*) from gp_segment_configuration; + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test' and c.oid=d.localoid; + +!\retcode yes | gpexpand -c; + +!\retcode gpshrink -c; + +!\retcode rm -r /tmp/datadirs/; + +drop table test; + + +-- expand one segment and shrink +!\retcode echo "localhost|localhost|7008|/tmp/datadirs/dbfast4/demoDataDir3|9|3|p +localhost|localhost|7009|/tmp/datadirs/dbfast_mirror4/demoDataDir3|10|3|m" > /tmp/testexpand; + +create table test_partitioned (a int, b int) partition by range (b) (start(1) end(101) every(20),default partition def); + +insert into test_partitioned select i,i from generate_series(1,100) i; + +select gp_segment_id, count(*) from test_partitioned group by gp_segment_id; + +select count(*) from gp_segment_configuration; + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test_partitioned' and c.oid=d.localoid; + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test_partitioned_1_prt_2' and c.oid=d.localoid; + +!\retcode gpexpand -i /tmp/testexpand; + +!\retcode gpexpand -i /tmp/testexpand; + +select gp_segment_id, count(*) from test_partitioned group by gp_segment_id; + +select count(*) from gp_segment_configuration; + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test_partitioned' and c.oid=d.localoid; + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test_partitioned_1_prt_2' and c.oid=d.localoid; + +!\retcode gpshrink -i /tmp/testexpand; + +!\retcode gpshrink -i /tmp/testexpand; + +select gp_segment_id, count(*) from test_partitioned group by 
gp_segment_id; + +select count(*) from gp_segment_configuration; + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test_partitioned' and c.oid=d.localoid; + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test_partitioned_1_prt_2' and c.oid=d.localoid; + +drop table test_partitioned; + +!\retcode yes | gpexpand -c; + +!\retcode gpshrink -c; + +!\retcode rm -r /tmp/datadirs/; + + +-- expand two segment and shrink +!\retcode echo "localhost|localhost|7008|/tmp/datadirs/dbfast4/demoDataDir3|9|3|p +localhost|localhost|7009|/tmp/datadirs/dbfast_mirror4/demoDataDir3|10|3|m +localhost|localhost|7010|/tmp/datadirs/dbfast5/demoDataDir4|11|4|p +localhost|localhost|7011|/tmp/datadirs/dbfast_mirror5/demoDataDir4|12|4|m" > /tmp/testexpand; + +create table test(a int); + +insert into test select i from generate_series(1,100) i; + +select gp_segment_id, count(*) from test group by gp_segment_id; + +select count(*) from gp_segment_configuration; + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test' and c.oid=d.localoid; + +!\retcode gpexpand -i /tmp/testexpand; + +!\retcode gpexpand -i /tmp/testexpand; + +select gp_segment_id, count(*) from test group by gp_segment_id; + +select count(*) from gp_segment_configuration; + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test' and c.oid=d.localoid; + +!\retcode gpshrink -i /tmp/testexpand; + +!\retcode gpshrink -i /tmp/testexpand; + +select gp_segment_id, count(*) from test group by gp_segment_id; + +select count(*) from gp_segment_configuration; + +select policytype, numsegments, distkey from gp_distribution_policy d, pg_class c where c.relname='test' and c.oid=d.localoid; + +drop table test; + +!\retcode yes | gpexpand -c; + +!\retcode gpshrink -c; + +!\retcode rm -r /tmp/datadirs/; \ No newline at end of file diff --git 
a/src/test/regress/expected/shrink_table.out b/src/test/regress/expected/shrink_table.out new file mode 100644 index 00000000000..74ed3b6b80b --- /dev/null +++ b/src/test/regress/expected/shrink_table.out @@ -0,0 +1,856 @@ +drop schema if exists test_shrink_table cascade; +NOTICE: schema "test_shrink_table" does not exist, skipping +create schema test_shrink_table; +set search_path=test_shrink_table,public; +set default_table_access_method='heap'; +set allow_system_table_mods=true; +-- Hash distributed tables +Create table t1(a int, b int, c int) distributed by (a); +insert into t1 select i,i,0 from generate_series(1,100) I; +Update t1 set c = gp_segment_id; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 2 | 25 + 0 | 38 + 1 | 37 +(3 rows) + +begin; +Alter table t1 shrink table to 2; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 53 + 1 | 47 +(2 rows) + +abort; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 38 + 2 | 25 + 1 | 37 +(3 rows) + +Alter table t1 shrink table to 2; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 53 + 1 | 47 +(2 rows) + +select numsegments from gp_distribution_policy where localoid='t1'::regclass; + numsegments +------------- + 2 +(1 row) + +drop table t1; +Create table t1(a int, b int, c int) distributed by (a,b); +insert into t1 select i,i,0 from generate_series(1,100) I; +Update t1 set c = gp_segment_id; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 2 | 37 + 0 | 33 + 1 | 30 +(3 rows) + +begin; +Alter table t1 shrink table to 1; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 100 +(1 row) + +abort; +Select gp_segment_id, 
count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 33 + 1 | 30 + 2 | 37 +(3 rows) + +Alter table t1 shrink table to 1; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 100 +(1 row) + +select numsegments from gp_distribution_policy where localoid='t1'::regclass; + numsegments +------------- + 1 +(1 row) + +drop table t1; +-- Test NULLs. +Create table t1(a int, b int, c int) distributed by (a,b,c); +insert into t1 values + (1, 1, 1 ), + (null, 2, 2 ), + (3, null, 3 ), + (4, 4, null), + (null, null, 5 ), + (null, 6, null), + (7, null, null), + (null, null, null); +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 3 + 1 | 1 + 2 | 4 +(3 rows) + +begin; +Alter table t1 shrink table to 2; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 4 + 1 | 4 +(2 rows) + +abort; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 3 + 1 | 1 + 2 | 4 +(3 rows) + +Alter table t1 shrink table to 2; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 4 + 1 | 4 +(2 rows) + +select numsegments from gp_distribution_policy where localoid='t1'::regclass; + numsegments +------------- + 2 +(1 row) + +drop table t1; +Create table t1(a int, b int, c int) distributed by (a) partition by list(b) (partition t1_1 values(1), partition t1_2 values(2), default partition other); +insert into t1 select i,i,0 from generate_series(1,100) I; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; +NOTICE: One or more columns in the following table(s) do not have statistics: t1 +HINT: For non-partitioned tables, run analyze (). For partitioned tables, run analyze rootpartition (). See log for columns missing statistics. 
+ gp_segment_id | count +---------------+------- + 0 | 38 + 1 | 37 + 2 | 25 +(3 rows) + +begin; +Alter table t1 shrink table to 2; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 53 + 1 | 47 +(2 rows) + +abort; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; +NOTICE: One or more columns in the following table(s) do not have statistics: t1 +HINT: For non-partitioned tables, run analyze (). For partitioned tables, run analyze rootpartition (). See log for columns missing statistics. + gp_segment_id | count +---------------+------- + 0 | 38 + 1 | 37 + 2 | 25 +(3 rows) + +Alter table t1 shrink table to 2; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 53 + 1 | 47 +(2 rows) + +select numsegments from gp_distribution_policy where localoid='t1'::regclass; + numsegments +------------- + 2 +(1 row) + +drop table t1; +-- Random distributed tables +Create table r1(a int, b int, c int) distributed randomly; +insert into r1 select i,i,0 from generate_series(1,100) I; +Update r1 set c = gp_segment_id; +Select count(*) from r1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from r1 where gp_segment_id=2; + ?column? +---------- + t +(1 row) + +begin; +Alter table r1 shrink table to 2; +Select count(*) from r1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from r1 where gp_segment_id=2; + ?column? +---------- + f +(1 row) + +abort; +Select count(*) from r1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from r1 where gp_segment_id=2; + ?column? +---------- + t +(1 row) + +Alter table r1 shrink table to 2; +Select count(*) from r1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from r1 where gp_segment_id=2; + ?column? 
+---------- + f +(1 row) + +select numsegments from gp_distribution_policy where localoid='r1'::regclass; + numsegments +------------- + 2 +(1 row) + +drop table r1; +Create table r1(a int, b int, c int) distributed randomly; +insert into r1 select i,i,0 from generate_series(1,100) I; +Update r1 set c = gp_segment_id; +Select count(*) from r1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from r1 where gp_segment_id=2; + ?column? +---------- + t +(1 row) + +begin; +Alter table r1 shrink table to 2; +Select count(*) from r1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from r1 where gp_segment_id=2; + ?column? +---------- + f +(1 row) + +abort; +Select count(*) from r1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from r1 where gp_segment_id=2; + ?column? +---------- + t +(1 row) + +Alter table r1 shrink table to 2; +Select count(*) from r1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from r1 where gp_segment_id=2; + ?column? +---------- + f +(1 row) + +select numsegments from gp_distribution_policy where localoid='r1'::regclass; + numsegments +------------- + 2 +(1 row) + +drop table r1; +Create table r1(a int, b int, c int) distributed randomly partition by list(b) (partition r1_1 values(1), partition r1_2 values(2), default partition other); +insert into r1 select i,i,0 from generate_series(1,100) I; +Select count(*) from r1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from r1 where gp_segment_id=2; + ?column? +---------- + t +(1 row) + +begin; +Alter table r1 shrink table to 2; +Select count(*) from r1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from r1 where gp_segment_id=2; + ?column? +---------- + f +(1 row) + +abort; +Select count(*) from r1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from r1 where gp_segment_id=2; + ?column? 
+---------- + t +(1 row) + +Alter table r1 shrink table to 2; +Select count(*) from r1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from r1 where gp_segment_id=2; + ?column? +---------- + f +(1 row) + +select numsegments from gp_distribution_policy where localoid='r1'::regclass; + numsegments +------------- + 2 +(1 row) + +drop table r1; +-- Replicated tables +Create table r1(a int, b int, c int) distributed replicated; +insert into r1 select i,i,0 from generate_series(1,100) I; +Select count(*) from gp_dist_random('r1'); + count +------- + 300 +(1 row) + +begin; +Alter table r1 shrink table to 2; +Select count(*) from gp_dist_random('r1'); + count +------- + 200 +(1 row) + +abort; +Select count(*) from gp_dist_random('r1'); + count +------- + 300 +(1 row) + +Alter table r1 shrink table to 2; +Select count(*) from gp_dist_random('r1'); + count +------- + 200 +(1 row) + +select numsegments from gp_distribution_policy where localoid='r1'::regclass; + numsegments +------------- + 2 +(1 row) + +drop table r1; +-- +Create table r1(a int, b int, c int) distributed replicated; +insert into r1 select i,i,0 from generate_series(1,100) I; +Select count(*) from gp_dist_random('r1'); + count +------- + 300 +(1 row) + +begin; +Alter table r1 shrink table to 2; +Select count(*) from gp_dist_random('r1'); + count +------- + 200 +(1 row) + +abort; +Select count(*) from gp_dist_random('r1'); + count +------- + 300 +(1 row) + +Alter table r1 shrink table to 2; +Select count(*) from gp_dist_random('r1'); + count +------- + 200 +(1 row) + +select numsegments from gp_distribution_policy where localoid='r1'::regclass; + numsegments +------------- + 2 +(1 row) + +drop table r1; +-- table with update triggers on distributed key column +CREATE OR REPLACE FUNCTION trigger_func() RETURNS trigger AS $$ +BEGIN + RAISE NOTICE 'trigger_func(%) called: action = %, when = %, level = %', + TG_ARGV[0], TG_OP, TG_WHEN, TG_LEVEL; + RETURN NULL; +END; +$$ LANGUAGE plpgsql; +CREATE TABLE 
table_with_update_trigger(a int, b int, c int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Cloudberry Database data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. +insert into table_with_update_trigger select i,i,0 from generate_series(1,100) I; +select gp_segment_id, count(*) from table_with_update_trigger group by 1 order by 1; + gp_segment_id | count +---------------+------- + 0 | 38 + 1 | 37 + 2 | 25 +(3 rows) + +CREATE TRIGGER foo_br_trigger BEFORE INSERT OR UPDATE OR DELETE ON table_with_update_trigger +FOR EACH ROW EXECUTE PROCEDURE trigger_func('before_stmt'); +CREATE TRIGGER foo_ar_trigger AFTER INSERT OR UPDATE OR DELETE ON table_with_update_trigger +FOR EACH ROW EXECUTE PROCEDURE trigger_func('before_stmt'); +CREATE TRIGGER foo_bs_trigger BEFORE INSERT OR UPDATE OR DELETE ON table_with_update_trigger +FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func('before_stmt'); +ERROR: Triggers for statements are not yet supported +CREATE TRIGGER foo_as_trigger AFTER INSERT OR UPDATE OR DELETE ON table_with_update_trigger +FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func('before_stmt'); +ERROR: Triggers for statements are not yet supported +-- update should fail +update table_with_update_trigger set a = a + 1; +ERROR: UPDATE on distributed key column not allowed on relation with update triggers +-- data expansion should success and not hiting any triggers. +Alter table table_with_update_trigger shrink table to 2; +select gp_segment_id, count(*) from table_with_update_trigger group by 1 order by 1; + gp_segment_id | count +---------------+------- + 0 | 53 + 1 | 47 +(2 rows) + +drop table table_with_update_trigger; +-- +-- Test shrinking inheritance parent table, parent table has different +-- numsegments with child tables. 
+-- +create table mix_base_tbl (a int4, b int4) DISTRIBUTED RANDOMLY; +insert into mix_base_tbl select g, g from generate_series(1, 3) g; +create table mix_child_a (a int4, b int4) inherits (mix_base_tbl) distributed by (a); +NOTICE: merging column "a" with inherited definition +NOTICE: merging column "b" with inherited definition +insert into mix_child_a select g, g from generate_series(11, 13) g; +create table mix_child_b (a int4, b int4) inherits (mix_base_tbl) distributed by (b); +NOTICE: merging column "a" with inherited definition +NOTICE: merging column "b" with inherited definition +insert into mix_child_b select g, g from generate_series(21, 23) g; +-- shrink the child table, not effect parent table +Alter table mix_child_a shrink table to 2; +select numsegments from gp_distribution_policy where localoid='mix_base_tbl'::regclass; + numsegments +------------- + 3 +(1 row) + +-- shrink the parent table, both parent and child table will be rebalanced to all +-- segments +select count(*) from mix_child_a where gp_segment_id = 2; + count +------- + 0 +(1 row) + +select count(*) from mix_child_b where gp_segment_id = 2; + count +------- + 1 +(1 row) + +Alter table mix_base_tbl shrink table to 2; +select count(*) from mix_child_a where gp_segment_id = 2; + count +------- + 0 +(1 row) + +select count(*) from mix_child_b where gp_segment_id = 2; + count +------- + 0 +(1 row) + +drop table mix_base_tbl cascade; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table mix_child_a +drop cascades to table mix_child_b +-- multi-level partition tables +CREATE TABLE part_t1(a int, b int, c int, d int, e int) +DISTRIBUTED BY(a) +PARTITION BY RANGE (b) + SUBPARTITION BY RANGE (c) + SUBPARTITION TEMPLATE ( + START(1) END (3) EVERY(1), + DEFAULT SUBPARTITION others_c) + SUBPARTITION BY LIST (d) + SUBPARTITION TEMPLATE ( + SUBPARTITION one VALUES (1), + SUBPARTITION two VALUES (2), + SUBPARTITION three VALUES (3), + DEFAULT SUBPARTITION others_d) +( START (1) 
END (2) EVERY (1), + DEFAULT PARTITION other_b); +insert into part_t1 select i,i%3,i%4,i%5,i from generate_series(1,100) I; +Update part_t1 set e = gp_segment_id; +Select gp_segment_id, count(*) from part_t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 38 + 1 | 37 + 2 | 25 +(3 rows) + +begin; +Alter table part_t1 shrink table to 2; +Select gp_segment_id, count(*) from part_t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 53 + 1 | 47 +(2 rows) + +abort; +Select gp_segment_id, count(*) from part_t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 2 | 25 + 0 | 38 + 1 | 37 +(3 rows) + +Alter table part_t1 shrink table to 2; +Select gp_segment_id, count(*) from part_t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 53 + 1 | 47 +(2 rows) + +select numsegments from gp_distribution_policy where localoid='part_t1'::regclass; + numsegments +------------- + 2 +(1 row) + +drop table part_t1; +-- +CREATE TABLE part_t1(a int, b int, c int, d int, e int) +DISTRIBUTED RANDOMLY +PARTITION BY RANGE (b) + SUBPARTITION BY RANGE (c) + SUBPARTITION TEMPLATE ( + START(1) END (3) EVERY(1), + DEFAULT SUBPARTITION others_c) + SUBPARTITION BY LIST (d) + SUBPARTITION TEMPLATE ( + SUBPARTITION one VALUES (1), + SUBPARTITION two VALUES (2), + SUBPARTITION three VALUES (3), + DEFAULT SUBPARTITION others_d) +( START (1) END (2) EVERY (1), + DEFAULT PARTITION other_b); +insert into part_t1 select i,i%3,i%4,i%5,i from generate_series(1,100) I; +Update part_t1 set e = gp_segment_id; +Select count(*) from part_t1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from part_t1 where gp_segment_id=2; + ?column? +---------- + t +(1 row) + +begin; +Alter table part_t1 shrink table to 2; +Select count(*) from part_t1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from part_t1 where gp_segment_id=2; + ?column? 
+---------- + f +(1 row) + +abort; +Select count(*) from part_t1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from part_t1 where gp_segment_id=2; + ?column? +---------- + t +(1 row) + +Alter table part_t1 shrink table to 2; +Select count(*) from part_t1; + count +------- + 100 +(1 row) + +Select count(*) > 0 from part_t1 where gp_segment_id=2; + ?column? +---------- + f +(1 row) + +select numsegments from gp_distribution_policy where localoid='part_t1'::regclass; + numsegments +------------- + 2 +(1 row) + +drop table part_t1; +-- only shrink leaf partitions, not allowed now +CREATE TABLE part_t1(a int, b int, c int, d int, e int) +DISTRIBUTED BY(a) +PARTITION BY RANGE (b) + SUBPARTITION BY RANGE (c) + SUBPARTITION TEMPLATE ( + START(1) END (3) EVERY(1), + DEFAULT SUBPARTITION others_c) + SUBPARTITION BY LIST (d) + SUBPARTITION TEMPLATE ( + SUBPARTITION one VALUES (1), + SUBPARTITION two VALUES (2), + SUBPARTITION three VALUES (3), + DEFAULT SUBPARTITION others_d) +( START (1) END (2) EVERY (1), + DEFAULT PARTITION other_b); +insert into part_t1 select i,i%3,i%4,i%5,i from generate_series(1,100) I; +Update part_t1 set e = gp_segment_id; +select gp_segment_id, * from part_t1_1_prt_other_b_2_prt_2_3_prt_others_d; + gp_segment_id | a | b | c | d | e +---------------+----+---+---+---+--- + 0 | 29 | 2 | 1 | 4 | 0 + 0 | 45 | 0 | 1 | 0 | 0 + 0 | 65 | 2 | 1 | 0 | 0 + 2 | 5 | 2 | 1 | 0 | 2 + 2 | 9 | 0 | 1 | 4 | 2 + 1 | 69 | 0 | 1 | 4 | 1 + 1 | 89 | 2 | 1 | 4 | 1 +(7 rows) + +alter table part_t1_1_prt_other_b_2_prt_2_3_prt_others_d shrink table to 2; +ERROR: cannot shrink leaf or interior partition "part_t1_1_prt_other_b_2_prt_2_3_prt_others_d" +DETAIL: Root/leaf/interior partitions need to have same numsegments +HINT: Call ALTER TABLE SHRINK TABLE on the root table instead +select gp_segment_id, * from part_t1_1_prt_other_b_2_prt_2_3_prt_others_d; + gp_segment_id | a | b | c | d | e +---------------+----+---+---+---+--- + 0 | 29 | 2 | 1 | 4 | 0 + 0 | 45 | 0 | 1 | 
0 | 0 + 0 | 65 | 2 | 1 | 0 | 0 + 2 | 5 | 2 | 1 | 0 | 2 + 2 | 9 | 0 | 1 | 4 | 2 + 1 | 69 | 0 | 1 | 4 | 1 + 1 | 89 | 2 | 1 | 4 | 1 +(7 rows) + +-- try to shrink root partition, should success +Alter table part_t1 shrink table to 2; +Select gp_segment_id, count(*) from part_t1 group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 53 + 1 | 47 +(2 rows) + +drop table part_t1; +-- inherits tables +CREATE TABLE inherit_t1_p1(a int, b int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Cloudberry Database data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. +CREATE TABLE inherit_t1_p2(a int, b int) INHERITS (inherit_t1_p1); +NOTICE: table has parent, setting distribution columns to match parent table +NOTICE: merging column "a" with inherited definition +NOTICE: merging column "b" with inherited definition +CREATE TABLE inherit_t1_p3(a int, b int) INHERITS (inherit_t1_p1); +NOTICE: table has parent, setting distribution columns to match parent table +NOTICE: merging column "a" with inherited definition +NOTICE: merging column "b" with inherited definition +CREATE TABLE inherit_t1_p4(a int, b int) INHERITS (inherit_t1_p2); +NOTICE: table has parent, setting distribution columns to match parent table +NOTICE: merging column "a" with inherited definition +NOTICE: merging column "b" with inherited definition +CREATE TABLE inherit_t1_p5(a int, b int) INHERITS (inherit_t1_p3); +NOTICE: table has parent, setting distribution columns to match parent table +NOTICE: merging column "a" with inherited definition +NOTICE: merging column "b" with inherited definition +insert into inherit_t1_p1 select i,i from generate_series(1,10) i; +insert into inherit_t1_p2 select i,i from generate_series(1,10) i; +insert into inherit_t1_p3 select i,i from generate_series(1,10) i; +insert into 
inherit_t1_p4 select i,i from generate_series(1,10) i; +insert into inherit_t1_p5 select i,i from generate_series(1,10) i; +select count(*) > 0 from inherit_t1_p1 where gp_segment_id = 2; + ?column? +---------- + t +(1 row) + +begin; +alter table inherit_t1_p1 shrink table to 2; +select count(*) > 0 from inherit_t1_p1 where gp_segment_id = 2; + ?column? +---------- + f +(1 row) + +abort; +select count(*) > 0 from inherit_t1_p1 where gp_segment_id = 2; + ?column? +---------- + t +(1 row) + +alter table inherit_t1_p1 shrink table to 2; +select count(*) > 0 from inherit_t1_p1 where gp_segment_id = 2; + ?column? +---------- + f +(1 row) + +DROP TABLE inherit_t1_p1 CASCADE; +NOTICE: drop cascades to 4 other objects +DETAIL: drop cascades to table inherit_t1_p2 +drop cascades to table inherit_t1_p4 +drop cascades to table inherit_t1_p3 +drop cascades to table inherit_t1_p5 +-- +-- Cannot shrink a native view and transformed view +-- +CREATE TABLE shrink_table1(a int) distributed by (a); +CREATE TABLE shrink_table2(a int) distributed by (a); +CREATE VIEW shrink_view AS select * from shrink_table1; +CREATE rule "_RETURN" AS ON SELECT TO shrink_table2 + DO INSTEAD SELECT * FROM shrink_table1; +ALTER TABLE shrink_table2 shrink TABLE to 2; +ERROR: "shrink_table2" is not a table, materialized view, or foreign table +ALTER TABLE shrink_view shrink TABLE to 2; +ERROR: "shrink_view" is not a table, materialized view, or foreign table +ALTER TABLE shrink_table1 shrink TABLE to 2; +drop table shrink_table1 cascade; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to view shrink_view +drop cascades to view shrink_table2 +-- +-- Test shrinking a table with a domain type as distribution key. 
+-- +create domain myintdomain as int4; +create table shrink_domain_tab(d myintdomain, oldseg int4) distributed by(d); +insert into shrink_domain_tab select generate_series(1,10); +update shrink_domain_tab set oldseg = gp_segment_id; +select gp_segment_id, count(*) from shrink_domain_tab group by gp_segment_id; + gp_segment_id | count +---------------+------- + 1 | 1 + 2 | 4 + 0 | 5 +(3 rows) + +alter table shrink_domain_tab shrink table to 2; +select gp_segment_id, count(*) from shrink_domain_tab group by gp_segment_id; + gp_segment_id | count +---------------+------- + 0 | 8 + 1 | 2 +(2 rows) + +select numsegments from gp_distribution_policy where localoid='shrink_domain_tab'::regclass; + numsegments +------------- + 2 +(1 row) + +drop table shrink_domain_tab; +-- start_ignore +-- We need to do a cluster expansion which will check if there are partial +-- tables, we need to drop the partial tables to keep the cluster expansion +-- run correctly. +reset search_path; +drop schema test_reshuffle cascade; +ERROR: schema "test_reshuffle" does not exist +-- end_ignore diff --git a/src/test/regress/greenplum_schedule b/src/test/regress/greenplum_schedule index 58fda7b5803..b73dd6e7b17 100755 --- a/src/test/regress/greenplum_schedule +++ b/src/test/regress/greenplum_schedule @@ -163,7 +163,7 @@ test: resource_group_gucs test: wrkloadadmin # expand_table tests may affect the result of 'gp_explain', keep them below that -test: gp_toolkit_ao_funcs trig auth_constraint role portals_updatable plpgsql_cache timeseries pg_stat_last_operation pg_stat_last_shoperation gp_numeric_agg partindex_test partition_pruning runtime_stats expand_table expand_table_ao expand_table_aoco expand_table_regression +test: gp_toolkit_ao_funcs trig auth_constraint role portals_updatable plpgsql_cache timeseries pg_stat_last_operation pg_stat_last_shoperation gp_numeric_agg partindex_test partition_pruning runtime_stats expand_table expand_table_ao expand_table_aoco expand_table_regression 
shrink_table # direct dispatch tests test: direct_dispatch bfv_dd bfv_dd_multicolumn bfv_dd_types diff --git a/src/test/regress/sql/shrink_table.sql b/src/test/regress/sql/shrink_table.sql new file mode 100644 index 00000000000..63fc5169d8c --- /dev/null +++ b/src/test/regress/sql/shrink_table.sql @@ -0,0 +1,440 @@ +drop schema if exists test_shrink_table cascade; +create schema test_shrink_table; +set search_path=test_shrink_table,public; +set default_table_access_method='heap'; +set allow_system_table_mods=true; + +-- Hash distributed tables +Create table t1(a int, b int, c int) distributed by (a); +insert into t1 select i,i,0 from generate_series(1,100) I; +Update t1 set c = gp_segment_id; + +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + +begin; +Alter table t1 shrink table to 2; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; +abort; + +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + +Alter table t1 shrink table to 2; + +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + +select numsegments from gp_distribution_policy where localoid='t1'::regclass; +drop table t1; + + +Create table t1(a int, b int, c int) distributed by (a,b); +insert into t1 select i,i,0 from generate_series(1,100) I; +Update t1 set c = gp_segment_id; + +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + +begin; +Alter table t1 shrink table to 1; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; +abort; + +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + +Alter table t1 shrink table to 1; + +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + +select numsegments from gp_distribution_policy where localoid='t1'::regclass; +drop table t1; + +-- Test NULLs. 
+Create table t1(a int, b int, c int) distributed by (a,b,c); +insert into t1 values + (1, 1, 1 ), + (null, 2, 2 ), + (3, null, 3 ), + (4, 4, null), + (null, null, 5 ), + (null, 6, null), + (7, null, null), + (null, null, null); +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + +begin; +Alter table t1 shrink table to 2; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; +abort; + +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + +Alter table t1 shrink table to 2; + +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + +select numsegments from gp_distribution_policy where localoid='t1'::regclass; +drop table t1; + +Create table t1(a int, b int, c int) distributed by (a) partition by list(b) (partition t1_1 values(1), partition t1_2 values(2), default partition other); +insert into t1 select i,i,0 from generate_series(1,100) I; + +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + +begin; +Alter table t1 shrink table to 2; +Select gp_segment_id, count(*) from t1 group by gp_segment_id; +abort; + +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + +Alter table t1 shrink table to 2; + +Select gp_segment_id, count(*) from t1 group by gp_segment_id; + +select numsegments from gp_distribution_policy where localoid='t1'::regclass; +drop table t1; + +-- Random distributed tables +Create table r1(a int, b int, c int) distributed randomly; +insert into r1 select i,i,0 from generate_series(1,100) I; +Update r1 set c = gp_segment_id; + +Select count(*) from r1; +Select count(*) > 0 from r1 where gp_segment_id=2; + +begin; +Alter table r1 shrink table to 2; +Select count(*) from r1; +Select count(*) > 0 from r1 where gp_segment_id=2; +abort; + +Select count(*) from r1; +Select count(*) > 0 from r1 where gp_segment_id=2; + +Alter table r1 shrink table to 2; + +Select count(*) from r1; +Select count(*) > 0 from r1 where gp_segment_id=2; + +select numsegments from gp_distribution_policy where 
localoid='r1'::regclass; +drop table r1; + +Create table r1(a int, b int, c int) distributed randomly; +insert into r1 select i,i,0 from generate_series(1,100) I; +Update r1 set c = gp_segment_id; + +Select count(*) from r1; +Select count(*) > 0 from r1 where gp_segment_id=2; + +begin; +Alter table r1 shrink table to 2; +Select count(*) from r1; +Select count(*) > 0 from r1 where gp_segment_id=2; +abort; + +Select count(*) from r1; +Select count(*) > 0 from r1 where gp_segment_id=2; + +Alter table r1 shrink table to 2; + +Select count(*) from r1; +Select count(*) > 0 from r1 where gp_segment_id=2; + +select numsegments from gp_distribution_policy where localoid='r1'::regclass; +drop table r1; + +Create table r1(a int, b int, c int) distributed randomly partition by list(b) (partition r1_1 values(1), partition r1_2 values(2), default partition other); +insert into r1 select i,i,0 from generate_series(1,100) I; + +Select count(*) from r1; +Select count(*) > 0 from r1 where gp_segment_id=2; + +begin; +Alter table r1 shrink table to 2; +Select count(*) from r1; +Select count(*) > 0 from r1 where gp_segment_id=2; +abort; + +Select count(*) from r1; +Select count(*) > 0 from r1 where gp_segment_id=2; + +Alter table r1 shrink table to 2; + +Select count(*) from r1; +Select count(*) > 0 from r1 where gp_segment_id=2; + +select numsegments from gp_distribution_policy where localoid='r1'::regclass; +drop table r1; + +-- Replicated tables +Create table r1(a int, b int, c int) distributed replicated; + +insert into r1 select i,i,0 from generate_series(1,100) I; + +Select count(*) from gp_dist_random('r1'); + +begin; +Alter table r1 shrink table to 2; +Select count(*) from gp_dist_random('r1'); +abort; + +Select count(*) from gp_dist_random('r1'); + +Alter table r1 shrink table to 2; + +Select count(*) from gp_dist_random('r1'); + +select numsegments from gp_distribution_policy where localoid='r1'::regclass; +drop table r1; + +-- +Create table r1(a int, b int, c int) 
distributed replicated; + +insert into r1 select i,i,0 from generate_series(1,100) I; + +Select count(*) from gp_dist_random('r1'); + +begin; +Alter table r1 shrink table to 2; +Select count(*) from gp_dist_random('r1'); +abort; + +Select count(*) from gp_dist_random('r1'); + +Alter table r1 shrink table to 2; + +Select count(*) from gp_dist_random('r1'); + +select numsegments from gp_distribution_policy where localoid='r1'::regclass; +drop table r1; + +-- table with update triggers on distributed key column +CREATE OR REPLACE FUNCTION trigger_func() RETURNS trigger AS $$ +BEGIN + RAISE NOTICE 'trigger_func(%) called: action = %, when = %, level = %', + TG_ARGV[0], TG_OP, TG_WHEN, TG_LEVEL; + RETURN NULL; +END; +$$ LANGUAGE plpgsql; + +CREATE TABLE table_with_update_trigger(a int, b int, c int); +insert into table_with_update_trigger select i,i,0 from generate_series(1,100) I; +select gp_segment_id, count(*) from table_with_update_trigger group by 1 order by 1; + +CREATE TRIGGER foo_br_trigger BEFORE INSERT OR UPDATE OR DELETE ON table_with_update_trigger +FOR EACH ROW EXECUTE PROCEDURE trigger_func('before_stmt'); +CREATE TRIGGER foo_ar_trigger AFTER INSERT OR UPDATE OR DELETE ON table_with_update_trigger +FOR EACH ROW EXECUTE PROCEDURE trigger_func('before_stmt'); + +CREATE TRIGGER foo_bs_trigger BEFORE INSERT OR UPDATE OR DELETE ON table_with_update_trigger +FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func('before_stmt'); +CREATE TRIGGER foo_as_trigger AFTER INSERT OR UPDATE OR DELETE ON table_with_update_trigger +FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func('before_stmt'); + +-- update should fail +update table_with_update_trigger set a = a + 1; +-- data expansion should success and not hiting any triggers. 
+Alter table table_with_update_trigger shrink table to 2; +select gp_segment_id, count(*) from table_with_update_trigger group by 1 order by 1; +drop table table_with_update_trigger; + +-- +-- Test shrinking inheritance parent table, parent table has different +-- numsegments with child tables. +-- +create table mix_base_tbl (a int4, b int4) DISTRIBUTED RANDOMLY; +insert into mix_base_tbl select g, g from generate_series(1, 3) g; +create table mix_child_a (a int4, b int4) inherits (mix_base_tbl) distributed by (a); +insert into mix_child_a select g, g from generate_series(11, 13) g; +create table mix_child_b (a int4, b int4) inherits (mix_base_tbl) distributed by (b); +insert into mix_child_b select g, g from generate_series(21, 23) g; +-- shrink the child table, not effect parent table +Alter table mix_child_a shrink table to 2; +select numsegments from gp_distribution_policy where localoid='mix_base_tbl'::regclass; +-- shrink the parent table, both parent and child table will be rebalanced to all +-- segments +select count(*) from mix_child_a where gp_segment_id = 2; +select count(*) from mix_child_b where gp_segment_id = 2; +Alter table mix_base_tbl shrink table to 2; +select count(*) from mix_child_a where gp_segment_id = 2; +select count(*) from mix_child_b where gp_segment_id = 2; +drop table mix_base_tbl cascade; + +-- multi-level partition tables +CREATE TABLE part_t1(a int, b int, c int, d int, e int) +DISTRIBUTED BY(a) +PARTITION BY RANGE (b) + SUBPARTITION BY RANGE (c) + SUBPARTITION TEMPLATE ( + START(1) END (3) EVERY(1), + DEFAULT SUBPARTITION others_c) + + SUBPARTITION BY LIST (d) + SUBPARTITION TEMPLATE ( + SUBPARTITION one VALUES (1), + SUBPARTITION two VALUES (2), + SUBPARTITION three VALUES (3), + DEFAULT SUBPARTITION others_d) + +( START (1) END (2) EVERY (1), + DEFAULT PARTITION other_b); + +insert into part_t1 select i,i%3,i%4,i%5,i from generate_series(1,100) I; +Update part_t1 set e = gp_segment_id; + +Select gp_segment_id, count(*) from 
part_t1 group by gp_segment_id; + +begin; +Alter table part_t1 shrink table to 2; +Select gp_segment_id, count(*) from part_t1 group by gp_segment_id; +abort; + +Select gp_segment_id, count(*) from part_t1 group by gp_segment_id; + +Alter table part_t1 shrink table to 2; + +Select gp_segment_id, count(*) from part_t1 group by gp_segment_id; + +select numsegments from gp_distribution_policy where localoid='part_t1'::regclass; +drop table part_t1; + +-- +CREATE TABLE part_t1(a int, b int, c int, d int, e int) +DISTRIBUTED RANDOMLY +PARTITION BY RANGE (b) + SUBPARTITION BY RANGE (c) + SUBPARTITION TEMPLATE ( + START(1) END (3) EVERY(1), + DEFAULT SUBPARTITION others_c) + + SUBPARTITION BY LIST (d) + SUBPARTITION TEMPLATE ( + SUBPARTITION one VALUES (1), + SUBPARTITION two VALUES (2), + SUBPARTITION three VALUES (3), + DEFAULT SUBPARTITION others_d) + +( START (1) END (2) EVERY (1), + DEFAULT PARTITION other_b); + +insert into part_t1 select i,i%3,i%4,i%5,i from generate_series(1,100) I; +Update part_t1 set e = gp_segment_id; + +Select count(*) from part_t1; +Select count(*) > 0 from part_t1 where gp_segment_id=2; + +begin; +Alter table part_t1 shrink table to 2; +Select count(*) from part_t1; +Select count(*) > 0 from part_t1 where gp_segment_id=2; +abort; + +Select count(*) from part_t1; +Select count(*) > 0 from part_t1 where gp_segment_id=2; + +Alter table part_t1 shrink table to 2; + +Select count(*) from part_t1; +Select count(*) > 0 from part_t1 where gp_segment_id=2; + +select numsegments from gp_distribution_policy where localoid='part_t1'::regclass; +drop table part_t1; + +-- only shrink leaf partitions, not allowed now +CREATE TABLE part_t1(a int, b int, c int, d int, e int) +DISTRIBUTED BY(a) +PARTITION BY RANGE (b) + SUBPARTITION BY RANGE (c) + SUBPARTITION TEMPLATE ( + START(1) END (3) EVERY(1), + DEFAULT SUBPARTITION others_c) + + SUBPARTITION BY LIST (d) + SUBPARTITION TEMPLATE ( + SUBPARTITION one VALUES (1), + SUBPARTITION two VALUES (2), + 
SUBPARTITION three VALUES (3), + DEFAULT SUBPARTITION others_d) + +( START (1) END (2) EVERY (1), + DEFAULT PARTITION other_b); + +insert into part_t1 select i,i%3,i%4,i%5,i from generate_series(1,100) I; +Update part_t1 set e = gp_segment_id; + +select gp_segment_id, * from part_t1_1_prt_other_b_2_prt_2_3_prt_others_d; + +alter table part_t1_1_prt_other_b_2_prt_2_3_prt_others_d shrink table to 2; +select gp_segment_id, * from part_t1_1_prt_other_b_2_prt_2_3_prt_others_d; + +-- try to shrink root partition, should success +Alter table part_t1 shrink table to 2; +Select gp_segment_id, count(*) from part_t1 group by gp_segment_id; + +drop table part_t1; + + +-- inherits tables +CREATE TABLE inherit_t1_p1(a int, b int); +CREATE TABLE inherit_t1_p2(a int, b int) INHERITS (inherit_t1_p1); +CREATE TABLE inherit_t1_p3(a int, b int) INHERITS (inherit_t1_p1); +CREATE TABLE inherit_t1_p4(a int, b int) INHERITS (inherit_t1_p2); +CREATE TABLE inherit_t1_p5(a int, b int) INHERITS (inherit_t1_p3); + +insert into inherit_t1_p1 select i,i from generate_series(1,10) i; +insert into inherit_t1_p2 select i,i from generate_series(1,10) i; +insert into inherit_t1_p3 select i,i from generate_series(1,10) i; +insert into inherit_t1_p4 select i,i from generate_series(1,10) i; +insert into inherit_t1_p5 select i,i from generate_series(1,10) i; + +select count(*) > 0 from inherit_t1_p1 where gp_segment_id = 2; + +begin; +alter table inherit_t1_p1 shrink table to 2; +select count(*) > 0 from inherit_t1_p1 where gp_segment_id = 2; +abort; + +select count(*) > 0 from inherit_t1_p1 where gp_segment_id = 2; + +alter table inherit_t1_p1 shrink table to 2; + +select count(*) > 0 from inherit_t1_p1 where gp_segment_id = 2; + +DROP TABLE inherit_t1_p1 CASCADE; + + +-- +-- Cannot shrink a native view and transformed view +-- +CREATE TABLE shrink_table1(a int) distributed by (a); +CREATE TABLE shrink_table2(a int) distributed by (a); +CREATE VIEW shrink_view AS select * from shrink_table1; +CREATE 
rule "_RETURN" AS ON SELECT TO shrink_table2 + DO INSTEAD SELECT * FROM shrink_table1; +ALTER TABLE shrink_table2 shrink TABLE to 2; +ALTER TABLE shrink_view shrink TABLE to 2; +ALTER TABLE shrink_table1 shrink TABLE to 2; +drop table shrink_table1 cascade; + +-- +-- Test shrinking a table with a domain type as distribution key. +-- +create domain myintdomain as int4; + +create table shrink_domain_tab(d myintdomain, oldseg int4) distributed by(d); +insert into shrink_domain_tab select generate_series(1,10); + +update shrink_domain_tab set oldseg = gp_segment_id; + +select gp_segment_id, count(*) from shrink_domain_tab group by gp_segment_id; + +alter table shrink_domain_tab shrink table to 2; +select gp_segment_id, count(*) from shrink_domain_tab group by gp_segment_id; +select numsegments from gp_distribution_policy where localoid='shrink_domain_tab'::regclass; +drop table shrink_domain_tab; + +-- start_ignore +-- We need to do a cluster expansion which will check if there are partial +-- tables, we need to drop the partial tables to keep the cluster expansion +-- run correctly. +reset search_path; +drop schema test_shrink_table cascade; +-- end_ignore