diff --git a/README.md b/README.md index 5d01005d..b109d416 100644 --- a/README.md +++ b/README.md @@ -178,6 +178,7 @@ Currently, the following operations are supported: | Subcommand | Description | | --------- | ----------- | | `delete acls [flags]` | Deletes ACL(s) in the cluster matching the provided flags | +| `delete topic [topic]` | Deletes a single topic in the cluster | #### get diff --git a/cmd/topicctl/subcmd/delete.go b/cmd/topicctl/subcmd/delete.go index 27fdbd09..6f0ab557 100644 --- a/cmd/topicctl/subcmd/delete.go +++ b/cmd/topicctl/subcmd/delete.go @@ -44,6 +44,7 @@ func init() { addSharedFlags(deleteCmd, &deleteConfig.shared) deleteCmd.AddCommand( deleteACLCmd(), + deleteTopicCmd(), ) RootCmd.AddCommand(deleteCmd) } @@ -150,3 +151,24 @@ $ topicctl delete acls --resource-type topic --resource-pattern-type literal --r cmd.MarkFlagRequired("resource-type") return cmd } + +func deleteTopicCmd() *cobra.Command { + return &cobra.Command{ + Use: "topic [topic name]", + Short: "Delete a topic", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + ctx := context.Background() + sess := session.Must(session.NewSession()) + + adminClient, err := deleteConfig.shared.getAdminClient(ctx, sess, false) + if err != nil { + return err + } + defer adminClient.Close() + + cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner) + return cliRunner.DeleteTopic(ctx, args[0]) + }, + } +} diff --git a/pkg/admin/brokerclient.go b/pkg/admin/brokerclient.go index 6cf1305a..5b69699f 100644 --- a/pkg/admin/brokerclient.go +++ b/pkg/admin/brokerclient.go @@ -616,6 +616,31 @@ func (c *BrokerAdminClient) AssignPartitions( return err } +// DeleteTopic deletes a topic in the cluster. +func (c *BrokerAdminClient) DeleteTopic(ctx context.Context, topic string) error { + if c.config.ReadOnly { + return errors.New("Cannot delete topics in read-only mode") + } + + req := &kafka.DeleteTopicsRequest{ + Topics: []string{topic}, + } + log.Debugf("DeleteTopics request: %+v", req) + + resp, err := c.client.DeleteTopics(ctx, req) + log.Debugf("DeleteTopics response: %+v (%+v)", resp, err) + + if err != nil { + return err + } + + if err, ok := resp.Errors[topic]; ok { + return err + } + + return nil +} + // AddPartitions extends a topic by adding one or more new partitions to it. func (c *BrokerAdminClient) AddPartitions( ctx context.Context, diff --git a/pkg/admin/brokerclient_test.go b/pkg/admin/brokerclient_test.go index 8691195a..856fbe18 100644 --- a/pkg/admin/brokerclient_test.go +++ b/pkg/admin/brokerclient_test.go @@ -419,6 +419,70 @@ func TestBrokerClientAddPartitions(t *testing.T) { assert.Equal(t, []int{6, 1}, topicInfo.Partitions[4].Replicas) } +func TestBrokerDeleteTopic(t *testing.T) { + if !util.CanTestBrokerAdmin() { + t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set") + } + + ctx := context.Background() + client, err := NewBrokerAdminClient( + ctx, + BrokerAdminClientConfig{ + ConnectorConfig: ConnectorConfig{ + BrokerAddr: util.TestKafkaAddr(), + }, + }, + ) + require.NoError(t, err) + + topicName := util.RandomString("topic-delete-", 6) + err = client.CreateTopic( + ctx, + kafka.TopicConfig{ + Topic: topicName, + NumPartitions: -1, + ReplicationFactor: -1, + ReplicaAssignments: []kafka.ReplicaAssignment{ + { + Partition: 0, + Replicas: []int{1, 2}, + }, + { + Partition: 1, + Replicas: []int{2, 3}, + }, + { + Partition: 2, + Replicas: []int{3, 4}, + }, + }, + ConfigEntries: []kafka.ConfigEntry{ + { + ConfigName: "flush.ms", + ConfigValue: "2000", + }, + { + ConfigName: "retention.ms", + ConfigValue: "10000000", + }, + }, + }, + ) + require.NoError(t, err) + util.RetryUntil(t, 5*time.Second, func() error { + _, err := client.GetTopic(ctx, topicName, true) + return err + }) + + err = client.DeleteTopic(ctx, topicName) + require.NoError(t, err) + + time.Sleep(time.Second * 10) + + _, err = client.GetTopic(ctx, topicName, false) + require.Error(t, err) +} + func TestBrokerClientAlterAssignments(t *testing.T) { if !util.CanTestBrokerAdmin() { t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set") diff --git a/pkg/admin/client.go b/pkg/admin/client.go index 919a282c..e05d5dda 100644 --- a/pkg/admin/client.go +++ b/pkg/admin/client.go @@ -41,6 +41,9 @@ type Client interface { detailed bool, ) (TopicInfo, error) + // DeleteTopic deletes a single topic in the cluster. + DeleteTopic(ctx context.Context, topic string) error + // GetACLs gets full information about each ACL in the cluster. GetACLs( ctx context.Context, diff --git a/pkg/admin/zkclient.go b/pkg/admin/zkclient.go index 417fd507..b8f23a56 100644 --- a/pkg/admin/zkclient.go +++ b/pkg/admin/zkclient.go @@ -645,6 +645,29 @@ func (c *ZKAdminClient) CreateTopic( return err } +func (c *ZKAdminClient) DeleteTopic(ctx context.Context, topic string) error { + if c.readOnly { + return errors.New("Cannot delete topics in read-only mode") + } + + req := kafka.DeleteTopicsRequest{ + Topics: []string{topic}, + } + log.Debugf("DeleteTopics request: %+v", req) + + resp, err := c.Connector.KafkaClient.DeleteTopics(ctx, &req) + log.Debugf("DeleteTopics response: %+v (%+v)", resp, err) + if err != nil { + return err + } + + if err, ok := resp.Errors[topic]; ok { + return err + } + + return nil +} + // AssignPartitions notifies the cluster to begin a partition reassignment. // This should only be used for existing partitions; to create new partitions, // use the AddPartitions method. diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index ee20f497..a60a1208 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -21,6 +21,7 @@ import ( "github.com/segmentio/topicctl/pkg/config" "github.com/segmentio/topicctl/pkg/groups" "github.com/segmentio/topicctl/pkg/messages" + "github.com/segmentio/topicctl/pkg/util" log "github.com/sirupsen/logrus" ) @@ -608,6 +609,40 @@ func (c *CLIRunner) GetTopics(ctx context.Context, full bool) error { return nil } +// DeleteTopic deletes a single topic. +func (c *CLIRunner) DeleteTopic(ctx context.Context, topic string) error { + c.printer("Checking if topic %s exists...", topic) + c.startSpinner() + // First check that topic exists + _, err := c.adminClient.GetTopic(ctx, topic, false) + if err != nil { + c.stopSpinner() + return fmt.Errorf("error fetching topic info: %+v", err) + } + c.stopSpinner() + c.printer("Topic %s exists in the cluster!", topic) + + confirm, err := util.Confirm(fmt.Sprintf("Delete topic \"%s\"", topic), false) + if err != nil { + return err + } + + if !confirm { + return nil + } + + c.startSpinner() + err = c.adminClient.DeleteTopic(ctx, topic) + c.stopSpinner() + if err != nil { + return err + } + + c.printer("Topic %s successfully deleted", topic) + + return nil +} + // GerUsers fetches the details of each user in the cluster and prints out a table of them. func (c *CLIRunner) GetUsers(ctx context.Context, names []string) error { c.startSpinner() diff --git a/pkg/version/version.go b/pkg/version/version.go index e2b6ecac..502a5946 100644 --- a/pkg/version/version.go +++ b/pkg/version/version.go @@ -1,4 +1,4 @@ package version // Version is the current topicctl version. -const Version = "1.20.2" +const Version = "1.21.0"