perf: migrate to ManifestLocation, add e_tag #3592
wjones127 merged 8 commits into lance-format:main
#[tokio::test]
async fn test_replace_dataset() {
    let test_dir = tempdir().unwrap();
    let test_uri = test_dir.path().to_str().unwrap();

    let data = gen()
        .col("int", array::step::<Int32Type>())
        .into_batch_rows(RowCount::from(20))
        .unwrap();
    let data1 = data.slice(0, 10);
    let data2 = data.slice(10, 10);
    let mut ds = InsertBuilder::new(test_uri)
        .execute(vec![data1])
        .await
        .unwrap();

    ds.object_store().remove_dir_all(test_uri).await.unwrap();

    let ds2 = InsertBuilder::new(test_uri)
        .execute(vec![data2.clone()])
        .await
        .unwrap();

    ds.checkout_latest().await.unwrap();
    let roundtripped = ds.scan().try_into_batch().await.unwrap();
    assert_eq!(roundtripped, data2);

    ds.validate().await.unwrap();
    ds2.validate().await.unwrap();
    assert_eq!(ds.manifest.version, 1);
    assert_eq!(ds2.manifest.version, 1);
}
This is the main test of interest: can we delete a dataset, recreate it, and then use checkout_latest() on an old handle to detect the recreated version?
-        object_writer.shutdown().await.unwrap();
+        let res = object_writer.shutdown().await.unwrap();
+        assert_eq!(res.size, 256 * 3);
     }
Is this test big enough to trigger multiple write parts?
// Use an ETag scheme based on that used by many popular HTTP servers
// <https://httpd.apache.org/docs/2.2/mod/core.html#fileetag>
// <https://stackoverflow.com/questions/47512043/how-etags-are-generated-and-configured>
format!("{inode:x}-{mtime:x}-{size:x}")
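A minimal sketch of how an ETag in this format could be derived for a local file. The helper names `format_e_tag` and `local_file_e_tag` are hypothetical, and the use of whole seconds for `mtime` is an assumption; the hunk above does not show how `inode`, `mtime`, and `size` are obtained. The sketch is Unix-only because it reads the inode number through `MetadataExt`.

```rust
use std::fs;
use std::io;
use std::os::unix::fs::MetadataExt;
use std::path::Path;

/// Formats an Apache-style ETag: inode, mtime and size, each in lowercase hex.
fn format_e_tag(inode: u64, mtime: i64, size: u64) -> String {
    format!("{inode:x}-{mtime:x}-{size:x}")
}

/// Hypothetical helper: builds the ETag for a local file from its metadata.
/// Assumption: mtime is taken in whole seconds since the Unix epoch.
fn local_file_e_tag(path: &Path) -> io::Result<String> {
    let meta = fs::metadata(path)?;
    Ok(format_e_tag(meta.ino(), meta.mtime(), meta.len()))
}
```

For example, `format_e_tag(255, 16, 4096)` yields `ff-10-1000`. Because the inode and modification time are part of the tag, a file that is deleted and recreated at the same path will normally get a different tag even if its size is unchanged.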
            _ => None,
        });

        let e_tag = item.get("e_tag").and_then(|attr| attr.as_s().ok().cloned());
Does DynamoDB need any kind of migration for adding a new column?
Only if you are creating new key columns / indices. For general columns, it's a schemaless document database.
// On S3, the etag can change if originally was MultipartUpload and later was Copy
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_Object.html#AmazonS3-Type-Object-ETag
// We generally only do MultipartUpload for > 5MB files, so we can skip this check
- // We generally only do MultipartUpload for > 5MB files, so we can skip this check
+ // We only do MultipartUpload for > 5MB files, so we can skip this check
I hope not generally if we're going to skip the step 😄
self.manifest.version == location.version
    && location.e_tag.as_ref().is_some_and(|e_tag| {
        self.manifest_e_tag
            .as_ref()
            .is_some_and(|current_e_tag| e_tag == current_e_tag)
    })
So if this version or the previous version does not have the e_tag, then we have to fall back to the previous behavior and assume it isn't already checked out?
Yes. I wanted to avoid a situation where we get into a reload loop because the e_tag keeps coming back as None.
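The check discussed in this thread can be sketched in isolation as follows. Both structs are hypothetical stand-ins that carry only the two fields the comparison needs, and `already_checked_out` is an illustrative name, not a function from the PR. The `match` is meant to behave the same as the nested `is_some_and` calls in the hunk above: a handle counts as up to date only when the versions match and both e_tags are present and equal.

```rust
/// Hypothetical stand-in for the PR's `ManifestLocation` (only the fields used here).
struct ManifestLocation {
    version: u64,
    e_tag: Option<String>,
}

/// Hypothetical stand-in for what a dataset handle remembers about its loaded manifest.
struct LoadedManifest {
    version: u64,
    e_tag: Option<String>,
}

/// Returns true only if the version matches and both e_tags are known and equal.
/// A missing e_tag on either side is treated as "not already checked out".
fn already_checked_out(current: &LoadedManifest, location: &ManifestLocation) -> bool {
    current.version == location.version
        && match (&current.e_tag, &location.e_tag) {
            (Some(a), Some(b)) => a == b,
            _ => false,
        }
}
```

With this shape, a dataset that was deleted and recreated at version 1 is detected as different, since the version matches but the e_tag does not.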
let manifest =
    read_manifest(&self.dataset.object_store, &location.path, location.size).await?;
- Migrate `CommitHandler` to just use `ManifestLocation`.
- Remove `O(num_manifests)` IOPS from `cleanup_old_versions`, since we no longer have to make a separate `HEAD` request to get the size of the file.
- Remove `O(num_manifests)` IOPS from `list_versions()`, for similar reasons as above.
- Add `e_tag` to `ManifestLocation`, so we can check we are loading the expected manifest. This eliminates the possibility that we are caching an old version of the manifest, in cases where the dataset has been deleted and recreated to the same version number.
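To illustrate why carrying the size and e_tag on the location saves requests, here is a rough sketch that builds locations directly from listing results. Everything in it is an assumption made for illustration: `ListedObject` is a hypothetical stand-in for one listing entry, the field types on `ManifestLocation` are guesses, and the `_versions/<version>.manifest` naming is assumed rather than taken from the PR.

```rust
/// Hypothetical stand-in for one entry returned by an object-store listing.
struct ListedObject {
    path: String,
    size: u64,
    e_tag: Option<String>,
}

/// Illustrative shape only; the field types are assumptions.
#[derive(Debug, PartialEq)]
struct ManifestLocation {
    version: u64,
    path: String,
    size: Option<u64>,
    e_tag: Option<String>,
}

/// Builds a location from a listing entry, assuming names like `_versions/<version>.manifest`.
/// Size and e_tag come from the listing itself, so no per-file HEAD request is needed.
fn location_from_listing(obj: &ListedObject) -> Option<ManifestLocation> {
    let file_name = obj.path.rsplit('/').next()?;
    let version = file_name.strip_suffix(".manifest")?.parse::<u64>().ok()?;
    Some(ManifestLocation {
        version,
        path: obj.path.clone(),
        size: Some(obj.size),
        e_tag: obj.e_tag.clone(),
    })
}

/// Picks the highest-versioned manifest from a single listing, skipping unrelated files.
fn latest_location(listing: &[ListedObject]) -> Option<ManifestLocation> {
    listing
        .iter()
        .filter_map(location_from_listing)
        .max_by_key(|loc| loc.version)
}
```

In this sketch one listing call produces everything a later manifest read needs, which is the kind of saving the description attributes to dropping the separate `HEAD` request per manifest.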