Use metadata to set column comments and encoding #178
Conversation
Force-pushed from 8be8ae5 to ddafe4c
Current coverage is **75.93%** (diff against `master`):

|          | master | #178 | diff |
|----------|-------:|-----:|-----:|
| Files    | 13     | 13   |      |
| Lines    | 684    | 694  | +10  |
| Methods  | 589    | 609  | +20  |
| Messages | 0      | 0    |      |
| Branches | 95     | 85   | -10  |
| Hits     | 617    | 527  | -90  |
| Misses   | 67     | 167  | +100 |
| Partials | 0      | 0    |      |
Force-pushed from ddafe4c to 0066e7e
@JoshRosen:

Very cool; glad to see that this was pretty simple to implement. I agree that we should get #157 in first, so I'll try to take a look at that patch and bring it up to date (sorry for the lag in addressing feedback on that). Do we need to add any roundtrip integration tests for this? The unit tests to ensure that we issue the expected …
README.md (outdated):

> ### Configuring column encoding
>
> When creating a table, `spark-redshift` can be configured to use a specific compression encoding on individual columns. You can use the `encoding` column metadata field to specify a compression encoding for each column (see [Amazon docs](http://docs.aws.amazon.com/redshift/latest/dg/c_Compression_encodings.html) for available encodings).
This library is undergoing a slight naming / branding change in the README and documentation, so spark-redshift needs to be replaced with "this library" (see #179).
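(For context, using the metadata fields this PR introduces might look roughly like the sketch below. The `description` key for column comments is my assumption from the PR title rather than something quoted in this thread, and `jdbcUrl`, `tempDir`, and the table name are placeholders.)

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Sketch: attach a comment ("description" is assumed to be the metadata
// key for column comments) and a compression encoding to a column.
val metadata = new MetadataBuilder()
  .putString("description", "Customer surname")
  .putString("encoding", "LZO")
  .build()

val schema = StructType(
  StructField("surname", StringType, metadata = metadata) :: Nil)

// jdbcUrl and tempDir are placeholders for a real cluster and S3 bucket.
sqlContext.createDataFrame(sc.parallelize(Seq(Row("Smith"))), schema)
  .write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcUrl)
  .option("tempdir", tempDir)
  .option("dbtable", "customers")
  .save()
```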
It's probably a good idea to have a roundtrip test, just to be sure the syntax is correct and works with Redshift, so I've added a couple of integration tests. The query to get the column comments is a bit convoluted, but it was the best I could find.
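(The exact query isn't quoted in this thread; one plausible shape for reading column comments back out of the catalog, joining `pg_attribute` to `pg_description`, would be something like this hypothetical reconstruction, where `$tableName` is the table under test.)

```scala
// Hypothetical reconstruction of a "column comments" query: pg_description
// stores comments keyed by the table's oid and the column's attnum.
val commentsQuery =
  s"""SELECT a.attname AS column_name, d.description
     |FROM pg_class c
     |JOIN pg_attribute a ON a.attrelid = c.oid AND a.attnum > 0
     |JOIN pg_description d ON d.objoid = c.oid AND d.objsubid = a.attnum
     |WHERE c.relname = '$tableName'""".stripMargin
```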
```scala
val metadata = new MetadataBuilder().putString("encoding", "LZO").build()
val schema = StructType(
  StructField("x", StringType, metadata = metadata) :: Nil)
sqlContext.createDataFrame(sc.parallelize(Seq(Row("a" * 512))), schema).write
```
This test seems to fail with a `java.sql.SQLException: Error (code 1204) while loading data into Redshift: "String length exceeds DDL length"` exception. My hunch is that compression doesn't affect data type constraints, such as maximum field length, so I think you'll have to either reduce the string size or increase the column's string length.
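(If widening the column is the route taken, a sketch assuming the library's documented `maxlength` string-column metadata field would combine it with the encoding in the same metadata:)

```scala
// Sketch: widen the column so the 512-character test value fits.
// Redshift treats TEXT as VARCHAR(256), hence the code-1204 error;
// "maxlength" is the library's metadata field for the VARCHAR length.
val metadata = new MetadataBuilder()
  .putString("encoding", "LZO")
  .putLong("maxlength", 1024)
  .build()
```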
Force-pushed from 68a8ccf to 5e42923
```scala
.format("com.databricks.spark.redshift")
.option("url", jdbcUrl)
.option("dbtable",
  s"""(SELECT "column", encoding FROM pg_table_def WHERE tablename='$tableName')""")
```
Interestingly, this query is now failing for me with the following error:
```
[info] Caused by: com.amazon.support.exceptions.ErrorException: [Amazon](500310) Invalid operation: Specified types or functions (one per INFO message) not supported on Redshift tables.;
[info] ... 28 more (QueryTest.scala:60)
[info] org.scalatest.exceptions.TestFailedException:
[info] at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:495)
[info] at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
[info] at org.scalatest.Assertions$class.fail(Assertions.scala:1328)
[info] at org.scalatest.FunSuite.fail(FunSuite.scala:1555)
[info] at com.databricks.spark.redshift.QueryTest$class.checkAnswer(QueryTest.scala:60)
[info] at com.databricks.spark.redshift.RedshiftIntegrationSuite.checkAnswer(RedshiftIntegrationSuite.scala:27)
[info] at com.databricks.spark.redshift.RedshiftIntegrationSuite$$anonfun$17.apply$mcV$sp(RedshiftIntegrationSuite.scala:417)
[info] at com.databricks.spark.redshift.RedshiftIntegrationSuite$$anonfun$17.apply(RedshiftIntegrationSuite.scala:389)
[info] at com.databricks.spark.redshift.RedshiftIntegrationSuite$$anonfun$17.apply(RedshiftIntegrationSuite.scala:389)
[info] at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
[info] at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info] at org.scalatest.Transformer.apply(Transformer.scala:22)
[info] at org.scalatest.Transformer.apply(Transformer.scala:20)
[info] at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
[info] at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
[info] at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
[info] at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
[info] at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
[info] at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
[info] at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info] at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
[info] at com.databricks.spark.redshift.RedshiftIntegrationSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(RedshiftIntegrationSuite.scala:27)
[...]
```
I received a bug report from a user who hit this problem, but I was never able to get to the bottom of it since I didn't have access to any logs or queries to know which query was triggering that error. Now that I've stumbled across a reproduction, I'm going to see if I can add some additional logging to help figure out what's going on.
Err, here's a better copy of the stacktrace:
```
[info] java.sql.SQLException: [Amazon](500310) Invalid operation: Specified types or functions (one per INFO message) not supported on Redshift tables.;
[info] at com.amazon.redshift.client.messages.inbound.ErrorResponse.toErrorException(Unknown Source)
[info] at com.amazon.redshift.client.PGMessagingContext.handleErrorResponse(Unknown Source)
[info] at com.amazon.redshift.client.PGMessagingContext.getOperationMetadata(Unknown Source)
[info] at com.amazon.redshift.client.PGMessagingContext.getOperationMetadata(Unknown Source)
[info] at com.amazon.redshift.client.PGMessagingContext.handleMessage(Unknown Source)
[info] at com.amazon.jdbc.communications.InboundMessagesPipeline.getNextMessageOfClass(Unknown Source)
[info] at com.amazon.redshift.client.PGMessagingContext.doMoveToNextClass(Unknown Source)
[info] at com.amazon.redshift.client.PGMessagingContext.getReadyForQuery(Unknown Source)
[info] at com.amazon.redshift.client.PGMessagingContext.getOperationMetadata(Unknown Source)
[info] at com.amazon.redshift.client.PGMessagingContext.getOperationMetadata(Unknown Source)
[info] at com.amazon.redshift.client.PGMessagingContext.handleMessage(Unknown Source)
[info] at com.amazon.jdbc.communications.InboundMessagesPipeline.getNextMessageOfClass(Unknown Source)
[info] at com.amazon.redshift.client.PGMessagingContext.doMoveToNextClass(Unknown Source)
[info] at com.amazon.redshift.client.PGMessagingContext.getReadyForQuery(Unknown Source)
[info] at com.amazon.redshift.client.PGMessagingContext.getOperationMetadata(Unknown Source)
[info] at com.amazon.redshift.client.PGMessagingContext.getOperationMetadata(Unknown Source)
[info] at com.amazon.redshift.dataengine.PGExecutionResults.<init>(Unknown Source)
[info] at com.amazon.redshift.dataengine.PGAbstractQueryExecutor.createExecutionResults(Unknown Source)
[info] at com.amazon.redshift.dataengine.PGAbstractQueryExecutor.getResults(Unknown Source)
[info] at com.amazon.jdbc.common.SPreparedStatement.executeWithParams(Unknown Source)
[info] at com.amazon.jdbc.common.SPreparedStatement.execute(Unknown Source)
[info] at com.databricks.spark.redshift.JDBCWrapper$$anonfun$executeInterruptibly$1.apply(RedshiftJDBCWrapper.scala:122)
[info] at com.databricks.spark.redshift.JDBCWrapper$$anonfun$executeInterruptibly$1.apply(RedshiftJDBCWrapper.scala:122)
[info] at com.databricks.spark.redshift.JDBCWrapper$$anonfun$2.apply(RedshiftJDBCWrapper.scala:140)
[info] at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
[info] at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
[info] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[info] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
```
This is a client-side error; I added `?loglevel=1` to the end of my JDBC URL in order to get the Redshift JDBC driver to perform more logging, and used `DriverManager.setLogWriter(new PrintWriter(System.out))` to direct those logs to stdout.

This produced the following output:

```
SQLWarning: reason(Function "format_type(oid,integer)" not supported.) SQLState(01000) vendor code(0)
SQLWarning: reason(Function "pg_table_is_visible(oid)" not supported.) SQLState(01000) vendor code(0)
```
It looks like what's happening here is that we're trying to perform an UNLOAD on a leader-only table, which is unsupported: https://stackoverflow.com/questions/28719808/how-to-unload-pg-table-def-table-to-s3
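(For anyone hitting similar driver-side errors, the logging setup described above amounts to something like this; the URL and credentials are placeholders:)

```scala
import java.io.PrintWriter
import java.sql.DriverManager

// Route JDBC DriverManager logging to stdout.
DriverManager.setLogWriter(new PrintWriter(System.out))

// loglevel=1 makes the Redshift JDBC driver emit the SQLWarnings
// shown above. Placeholder cluster URL and credentials.
val jdbcUrl =
  "jdbc:redshift://examplecluster.example.com:5439/dev" +
    "?user=<user>&password=<password>&loglevel=1"
```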
I think what you'll have to do here is to use the regular JDBC data source to query these Redshift system tables rather than using `spark-redshift`.
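(A minimal sketch of that suggestion, using Spark's built-in `jdbc` data source so the query runs over the JDBC connection directly instead of going through an UNLOAD:)

```scala
// Query the leader-node-only pg_table_def through Spark's plain JDBC
// source; derived tables in the dbtable option need an alias.
val encodings = sqlContext.read
  .format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable",
    s"""(SELECT "column", encoding FROM pg_table_def
       |WHERE tablename = '$tableName') AS t""".stripMargin)
  .load()
```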
I think that I've managed to fix the integration tests with the changes in 119d950.
Thanks, I've merged in those changes.
LGTM. Integration tests pass in my branch, so I'm going to merge this into `master`.
Thanks for merging. No worries about the delay; we're actually running this from a fork at the moment so that we can stage the data in CSV format to speed up the Redshift import.
Fixes #164 and #172.