From 1072544abce9cab8c54a7cbc3b7bbcf81046b725 Mon Sep 17 00:00:00 2001 From: Roman Sysoev Date: Thu, 5 Mar 2026 10:40:29 +0300 Subject: [PATCH 1/4] fix(dvcr): vmdk footer err Signed-off-by: Roman Sysoev --- images/dvcr-artifact/pkg/registry/registry.go | 209 +++++++++++++++++- 1 file changed, 202 insertions(+), 7 deletions(-) diff --git a/images/dvcr-artifact/pkg/registry/registry.go b/images/dvcr-artifact/pkg/registry/registry.go index f037780db0..046b908cb6 100644 --- a/images/dvcr-artifact/pkg/registry/registry.go +++ b/images/dvcr-artifact/pkg/registry/registry.go @@ -82,6 +82,11 @@ const ( isoImageType = "iso" ) +const ( + syntheticHeadSize = 10 * 1024 * 1024 + syntheticTailSize = 50 * 1024 * 1024 +) + type DataProcessor struct { ds datasource.DataSourceInterface destUsername string @@ -415,14 +420,62 @@ func populateCommonConfigFields(cnf *v1.ConfigFile) { }) } +type TailBuffer struct { + buffer []byte + size int + writePos int + full bool +} + +func NewTailBuffer(size int) *TailBuffer { + return &TailBuffer{ + buffer: make([]byte, size), + size: size, + } +} + +func (tb *TailBuffer) Write(p []byte) (n int, err error) { + n = len(p) + + for len(p) > 0 { + available := tb.size - tb.writePos + toCopy := len(p) + if toCopy > available { + toCopy = available + } + + copy(tb.buffer[tb.writePos:], p[:toCopy]) + tb.writePos += toCopy + p = p[toCopy:] + + if tb.writePos >= tb.size { + tb.writePos = 0 + tb.full = true + } + } + + return n, nil +} + +func (tb *TailBuffer) Bytes() []byte { + if !tb.full { + return tb.buffer[:tb.writePos] + } + + result := make([]byte, tb.size) + copy(result, tb.buffer[tb.writePos:]) + copy(result[tb.size-tb.writePos:], tb.buffer[:tb.writePos]) + return result +} + func getImageInfo(ctx context.Context, sourceReader io.ReadCloser) (ImageInfo, error) { formatSourceReaders, err := importer.NewFormatReaders(sourceReader, 0) if err != nil { return ImageInfo{}, fmt.Errorf("error creating format readers: %w", err) } - var uncompressedN int64 var tempImageInfoFile *os.File + var initialBuffer []byte klog.Infoln("Write image info to temp file") { @@ -430,9 +483,17 @@ func getImageInfo(ctx context.Context, sourceReader io.ReadCloser) (ImageInfo, e if err != nil { return ImageInfo{}, fmt.Errorf("error creating temp file: %w", err) } + defer os.Remove(tempImageInfoFile.Name()) - uncompressedN, err = io.CopyN(tempImageInfoFile, formatSourceReaders.TopReader(), imageInfoSize) - if err != nil && !errors.Is(err, io.EOF) { + initialBuffer = make([]byte, imageInfoSize) + n, err := io.ReadFull(formatSourceReaders.TopReader(), initialBuffer) + if err != nil && !errors.Is(err, io.EOF) && err != io.ErrUnexpectedEOF { + return ImageInfo{}, fmt.Errorf("error reading from source: %w", err) + } + initialBuffer = initialBuffer[:n] + + _, err = tempImageInfoFile.Write(initialBuffer) + if err != nil { return ImageInfo{}, fmt.Errorf("error writing to temp file: %w", err) } @@ -445,9 +506,30 @@ func getImageInfo(ctx context.Context, sourceReader io.ReadCloser) (ImageInfo, e var imageInfo ImageInfo { cmd := exec.CommandContext(ctx, "qemu-img", "info", "--output=json", tempImageInfoFile.Name()) - rawOut, err := cmd.Output() + rawOut, err := cmd.CombinedOutput() + if err != nil { - return ImageInfo{}, fmt.Errorf("error running qemu-img info: %w", err) + klog.Warningf("qemu-img info failed: %v", err) + klog.Warningf("qemu-img output: %s", string(rawOut)) + + if shouldTrySyntheticApproach(string(rawOut)) { + klog.Infoln("Detected VMDK footer issue, trying synthetic approach") + + syntheticInfo, syntheticErr := tryGetImageInfoSynthetic( + ctx, + formatSourceReaders.TopReader(), + initialBuffer, + ) + + if syntheticErr == nil { + klog.Infoln("Synthetic approach succeeded") + return syntheticInfo, nil + } + + klog.Warningf("Synthetic approach failed: %v", syntheticErr) + } + + return ImageInfo{}, fmt.Errorf("error running qemu-img info: %s: %w", string(rawOut), err) } klog.Infoln("Qemu-img command output:", string(rawOut)) @@ -458,7 +540,7 @@ func getImageInfo(ctx context.Context, sourceReader io.ReadCloser) (ImageInfo, e if imageInfo.Format != "raw" { // It's necessary to read everything from the original image to avoid blocking. - _, err = io.Copy(&EmptyWriter{}, sourceReader) + _, err = io.Copy(&EmptyWriter{}, formatSourceReaders.TopReader()) if err != nil { return ImageInfo{}, fmt.Errorf("error copying to nowhere: %w", err) } @@ -491,7 +573,7 @@ func getImageInfo(ctx context.Context, sourceReader io.ReadCloser) (ImageInfo, e return ImageInfo{}, fmt.Errorf("error copying to nowhere: %w", err) } - imageInfo.VirtualSize = uint64(uncompressedN + n) + imageInfo.VirtualSize = uint64(int64(len(initialBuffer)) + n) return imageInfo, nil } @@ -529,3 +611,116 @@ type EmptyWriter struct{} func (w EmptyWriter) Write(p []byte) (int, error) { return len(p), nil } + +func shouldTrySyntheticApproach(qemuImgOutput string) bool { + vmdkErrorIndicators := []string{ + "Invalid footer", + "footer", + "Footer", + "VMDK", + "vmdk", + "Could not open", + "invalid header", + "grain directory", + "Grain Directory", + "grain table", + "incomplete", + } + + lowerOutput := strings.ToLower(qemuImgOutput) + + for _, indicator := range vmdkErrorIndicators { + if strings.Contains(lowerOutput, strings.ToLower(indicator)) { + klog.Infof("Found VMDK error indicator: %s", indicator) + return true + } + } + + return false +} + +func tryGetImageInfoSynthetic(ctx context.Context, sourceReader io.Reader, initialBuffer []byte) (ImageInfo, error) { + klog.Infoln("Creating synthetic VMDK from head and tail buffers") + + headSize := syntheticHeadSize + if len(initialBuffer) < headSize { + headSize = len(initialBuffer) + } + headBuf := initialBuffer[:headSize] + + klog.Infof("Using head from initial buffer: %d bytes", len(headBuf)) + + tailBuf := NewTailBuffer(syntheticTailSize) + + if len(initialBuffer) > headSize { + remainingInitial := initialBuffer[headSize:] + klog.Infof("Adding remaining initial buffer to tail: %d bytes", len(remainingInitial)) + tailBuf.Write(remainingInitial) + } + + klog.Infoln("Streaming remaining data to tail buffer...") + written, err := io.Copy(tailBuf, sourceReader) + if err != nil { + return ImageInfo{}, fmt.Errorf("error streaming to tail: %w", err) + } + + totalSize := int64(len(initialBuffer)) + written + klog.Infof("Total file size: %d bytes (%.2f GB)", totalSize, float64(totalSize)/(1024*1024*1024)) + + syntheticPath, err := createSyntheticVMDK(headBuf, tailBuf, totalSize) + if err != nil { + return ImageInfo{}, fmt.Errorf("error creating synthetic VMDK: %w", err) + } + defer os.Remove(syntheticPath) + + klog.Infof("Created synthetic VMDK: %s", syntheticPath) + + cmd := exec.CommandContext(ctx, "qemu-img", "info", "--output=json", syntheticPath) + rawOut, err := cmd.CombinedOutput() + if err != nil { + klog.Errorf("qemu-img failed on synthetic VMDK: %s", string(rawOut)) + return ImageInfo{}, fmt.Errorf("qemu-img info failed on synthetic: %w, output: %s", err, string(rawOut)) + } + + klog.Infof("qemu-img output on synthetic VMDK: %s", string(rawOut)) + + var imageInfo ImageInfo + if err = json.Unmarshal(rawOut, &imageInfo); err != nil { + return ImageInfo{}, fmt.Errorf("error parsing qemu-img output: %w", err) + } + + return imageInfo, nil +} + +func createSyntheticVMDK(headBuf []byte, tailBuf *TailBuffer, totalSize int64) (string, error) { + tmpFile, err := os.CreateTemp("", "synthetic-*.vmdk") + if err != nil { + return "", fmt.Errorf("error creating temp file: %w", err) + } + defer tmpFile.Close() + + _, err = tmpFile.Write(headBuf) + if err != nil { + os.Remove(tmpFile.Name()) + return "", fmt.Errorf("error writing head: %w", err) + } + + tailData := tailBuf.Bytes() + tailOffset := totalSize - int64(len(tailData)) + + if tailOffset > int64(len(headBuf)) { + _, err = tmpFile.Seek(tailOffset, io.SeekStart) + if err != nil { + os.Remove(tmpFile.Name()) + return "", fmt.Errorf("error seeking: %w", err) + } + } + + _, err = tmpFile.Write(tailData) + if err != nil { + os.Remove(tmpFile.Name()) + return "", fmt.Errorf("error writing tail: %w", err) + } + + return tmpFile.Name(), nil +} From 4508e617f7575e92ccce289e5dc8286636a0bd25 Mon Sep 17 00:00:00 2001 From: Roman Sysoev Date: Tue, 10 Mar 2026 13:44:06 +0300 Subject: [PATCH 2/4] fix(dvcr): get image format from header Signed-off-by: Roman Sysoev --- images/dvcr-artifact/pkg/registry/registry.go | 227 +++++++++--------- 1 file changed, 116 insertions(+), 111 deletions(-) diff --git a/images/dvcr-artifact/pkg/registry/registry.go b/images/dvcr-artifact/pkg/registry/registry.go index 046b908cb6..eea3cd6c3c 100644 --- a/images/dvcr-artifact/pkg/registry/registry.go +++ b/images/dvcr-artifact/pkg/registry/registry.go @@ -18,6 +18,7 @@ package registry import ( "archive/tar" + "bytes" "context" "crypto/md5" "crypto/sha256" @@ -43,6 +44,7 @@ import ( "github.com/google/go-containerregistry/pkg/v1/stream" "golang.org/x/sync/errgroup" "k8s.io/klog/v2" + "kubevirt.io/containerized-data-importer/pkg/image" "kubevirt.io/containerized-data-importer/pkg/importer" "github.com/deckhouse/virtualization-controller/dvcr-importers/pkg/datasource" @@ -469,15 +471,116 @@ func (tb *TailBuffer) Bytes() []byte { } func getImageInfo(ctx context.Context, sourceReader io.ReadCloser) (ImageInfo, error) { - formatSourceReaders, err := importer.NewFormatReaders(sourceReader, 0) + initialReadSize := syntheticHeadSize + headerBuf := make([]byte, initialReadSize) + n, err := io.ReadFull(sourceReader, headerBuf) + if err != nil && err != io.ErrUnexpectedEOF { + return ImageInfo{}, fmt.Errorf("error reading initial data: %w", err) + } + headerBuf = headerBuf[:n] + + combinedReader := io.MultiReader( + bytes.NewReader(headerBuf), + sourceReader, + ) + combinedReadCloser := io.NopCloser(combinedReader) + + formatSourceReaders, err := importer.NewFormatReaders(combinedReadCloser, 0) if err != nil { return ImageInfo{}, fmt.Errorf("error creating format readers: %w", err) } + knownHdrs := image.CopyKnownHdrs() + vmdkHeader, exists := knownHdrs["vmdk"] + + checkSize := min(len(headerBuf), 512) + if exists && vmdkHeader.Match(headerBuf[:checkSize]) { + klog.Infoln("VMDK format detected, using VMDK-specific approach") + return getImageInfoVMDK(ctx, formatSourceReaders.TopReader(), headerBuf) + } + + klog.Infoln("Non-VMDK format detected, using standard approach") + return getImageInfoStandard(ctx, formatSourceReaders, headerBuf) +} + +// getImageInfoVMDK obtains information about the VMDK image using a synthetic file. +// This approach is necessary because VMDK stores metadata (footer, Grain Directory) +// at the end of the file, and qemu-img cannot work with partial VMDK. +func getImageInfoVMDK(ctx context.Context, sourceReader io.Reader, headerBuf []byte) (ImageInfo, error) { + klog.Infoln("Processing VMDK using synthetic file approach (head + tail)") + + var headBuf []byte + var totalBytesRead int64 + + if headerBuf != nil && len(headerBuf) > 0 { + headSize := syntheticHeadSize + if len(headerBuf) < headSize { + headSize = len(headerBuf) + } + headBuf = headerBuf[:headSize] + totalBytesRead = int64(len(headerBuf)) + + klog.Infof("Using %d bytes from header as head buffer", len(headBuf)) + } else { + headBuf = make([]byte, syntheticHeadSize) + n, err := io.ReadFull(sourceReader, headBuf) + if err != nil && err != io.ErrUnexpectedEOF { + return ImageInfo{}, fmt.Errorf("error reading head: %w", err) + } + headBuf = headBuf[:n] + totalBytesRead = int64(n) + + klog.Infof("Read %d bytes as head buffer", n) + } + + tailBuf := NewTailBuffer(syntheticTailSize) + + if headerBuf != nil && len(headerBuf) > len(headBuf) { + remainingHeader := headerBuf[len(headBuf):] + klog.Infof("Adding %d bytes from remaining header to tail buffer", len(remainingHeader)) + tailBuf.Write(remainingHeader) + } + + klog.Infoln("Streaming remaining VMDK data through tail buffer...") + written, err := io.Copy(tailBuf, sourceReader) + if err != nil { + return ImageInfo{}, fmt.Errorf("error streaming to tail buffer: %w", err) + } + + totalSize := totalBytesRead + written + klog.Infof("VMDK total size: %d bytes (%.2f GB)", totalSize, float64(totalSize)/(1024*1024*1024)) + + syntheticPath, err := createSyntheticVMDK(headBuf, tailBuf, totalSize) + if err != nil { + return ImageInfo{}, fmt.Errorf("error creating synthetic VMDK: %w", err) + } + defer os.Remove(syntheticPath) + + klog.Infof("Created synthetic VMDK file: %s", syntheticPath) + + cmd := exec.CommandContext(ctx, "qemu-img", "info", "--output=json", syntheticPath) + rawOut, err := cmd.CombinedOutput() + if err != nil { + klog.Errorf("qemu-img failed on synthetic VMDK: %s", string(rawOut)) + return ImageInfo{}, fmt.Errorf("qemu-img info failed on synthetic VMDK: %w, output: %s", err, string(rawOut)) + } + + klog.Infof("qemu-img output: %s", string(rawOut)) + + var imageInfo ImageInfo + if err = json.Unmarshal(rawOut, &imageInfo); err != nil { + return ImageInfo{}, fmt.Errorf("error parsing qemu-img output: %w", err) + } + + return imageInfo, nil +} + +// getImageInfoStandard handles non-VMDK formats using the first 64MB of the file. +func getImageInfoStandard(ctx context.Context, formatSourceReaders *importer.FormatReaders, headerBuf []byte) (ImageInfo, error) { var tempImageInfoFile *os.File - var initialBuffer []byte + var err error - klog.Infoln("Write image info to temp file") + klog.Infoln("Write image info to temp file (standard approach)") { tempImageInfoFile, err = os.CreateTemp("", tempImageInfoPattern) if err != nil { @@ -485,16 +588,17 @@ func getImageInfo(ctx context.Context, sourceReader io.ReadCloser) (ImageInfo, e } defer os.Remove(tempImageInfoFile.Name()) - initialBuffer = make([]byte, imageInfoSize) - n, err := io.ReadFull(formatSourceReaders.TopReader(), initialBuffer) - if err != nil && !errors.Is(err, io.EOF) && err != io.ErrUnexpectedEOF { - return ImageInfo{}, fmt.Errorf("error reading from source: %w", err) + _, err = tempImageInfoFile.Write(headerBuf) + if err != nil { + return ImageInfo{}, fmt.Errorf("error writing header to temp file: %w", err) } - initialBuffer = initialBuffer[:n] - _, err = tempImageInfoFile.Write(initialBuffer) - if err != nil { - return ImageInfo{}, fmt.Errorf("error writing to temp file: %w", err) + remaining := imageInfoSize - int64(len(headerBuf)) + if remaining > 0 { + _, err = io.CopyN(tempImageInfoFile, formatSourceReaders.TopReader(), remaining) + if err != nil && !errors.Is(err, io.EOF) { + return ImageInfo{}, fmt.Errorf("error writing remaining data to temp file: %w", err) + } } if err = tempImageInfoFile.Close(); err != nil { @@ -507,28 +611,9 @@ func getImageInfo(ctx context.Context, sourceReader io.ReadCloser) (ImageInfo, e { cmd := exec.CommandContext(ctx, "qemu-img", "info", "--output=json", tempImageInfoFile.Name()) rawOut, err := cmd.CombinedOutput() - if err != nil { klog.Warningf("qemu-img info failed: %v", err) klog.Warningf("qemu-img output: %s", string(rawOut)) - - if shouldTrySyntheticApproach(string(rawOut)) { - klog.Infoln("Detected VMDK footer issue, trying synthetic approach") - - syntheticInfo, syntheticErr := tryGetImageInfoSynthetic( - ctx, - formatSourceReaders.TopReader(), - initialBuffer, - ) - - if syntheticErr == nil { - klog.Infoln("Synthetic approach succeeded") - return syntheticInfo, nil - } - - klog.Warningf("Synthetic approach failed: %v", syntheticErr) - } - return ImageInfo{}, fmt.Errorf("error running qemu-img info: %s: %w", string(rawOut), err) } @@ -573,7 +658,7 @@ func getImageInfo(ctx context.Context, sourceReader io.ReadCloser) (ImageInfo, e return ImageInfo{}, fmt.Errorf("error copying to nowhere: %w", err) } - imageInfo.VirtualSize = uint64(int64(len(initialBuffer)) + n) + imageInfo.VirtualSize = uint64(int64(len(headerBuf)) + n) return imageInfo, nil } @@ -612,86 +697,6 @@ func (w EmptyWriter) Write(p []byte) (int, error) { return len(p), nil } -func shouldTrySyntheticApproach(qemuImgOutput string) bool { - vmdkErrorIndicators := []string{ - "Invalid footer", - "footer", - "Footer", - "VMDK", - "vmdk", - "Could not open", - "invalid header", - "grain directory", - "Grain Directory", - "grain table", - "incomplete", - } - - lowerOutput := strings.ToLower(qemuImgOutput) - - for _, indicator := range vmdkErrorIndicators { - if strings.Contains(lowerOutput, strings.ToLower(indicator)) { - klog.Infof("Found VMDK error indicator: %s", indicator) - return true - } - } - - return false -} - -func tryGetImageInfoSynthetic(ctx context.Context, sourceReader io.Reader, initialBuffer []byte) (ImageInfo, error) { - klog.Infoln("Creating synthetic VMDK from head and tail buffers") - - headSize := syntheticHeadSize - if len(initialBuffer) < headSize { - headSize = len(initialBuffer) - } - headBuf := initialBuffer[:headSize] - - klog.Infof("Using head from initial buffer: %d bytes", len(headBuf)) - - tailBuf := NewTailBuffer(syntheticTailSize) - - if len(initialBuffer) > headSize { - remainingInitial := initialBuffer[headSize:] - klog.Infof("Adding remaining initial buffer to tail: %d bytes", len(remainingInitial)) - tailBuf.Write(remainingInitial) - } - - klog.Infoln("Streaming remaining data to tail buffer...") - written, err := io.Copy(tailBuf, sourceReader) - if err != nil { - return ImageInfo{}, fmt.Errorf("error streaming to tail: %w", err) - } - - totalSize := int64(len(initialBuffer)) + written - klog.Infof("Total file size: %d bytes (%.2f GB)", totalSize, float64(totalSize)/(1024*1024*1024)) - - syntheticPath, err := createSyntheticVMDK(headBuf, tailBuf, totalSize) - if err != nil { - return ImageInfo{}, fmt.Errorf("error creating synthetic VMDK: %w", err) - } - defer os.Remove(syntheticPath) - - klog.Infof("Created synthetic VMDK: %s", syntheticPath) - - cmd := exec.CommandContext(ctx, "qemu-img", "info", "--output=json", syntheticPath) - rawOut, err := cmd.CombinedOutput() - if err != nil { - klog.Errorf("qemu-img failed on synthetic VMDK: %s", string(rawOut)) - return ImageInfo{}, fmt.Errorf("qemu-img info failed on synthetic: %w, output: %s", err, string(rawOut)) - } - - klog.Infof("qemu-img output on synthetic VMDK: %s", string(rawOut)) - - var imageInfo ImageInfo - if err = json.Unmarshal(rawOut, &imageInfo); err != nil { - return ImageInfo{}, fmt.Errorf("error parsing qemu-img output: %w", err) - } - - return imageInfo, nil -} - func createSyntheticVMDK(headBuf []byte, tailBuf *TailBuffer, totalSize int64) (string, error) { tmpFile, err := os.CreateTemp("", "synthetic-*.vmdk") if err != nil { From a15612e8be6e2f6d3a18a3910c4918d720615ef9 Mon Sep 17 00:00:00 2001 From: Roman Sysoev Date: Tue, 10 Mar 2026 18:14:02 +0300 Subject: [PATCH 3/4] fix(dvcr): image virtual size Signed-off-by: Roman Sysoev --- images/dvcr-artifact/pkg/registry/registry.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/images/dvcr-artifact/pkg/registry/registry.go b/images/dvcr-artifact/pkg/registry/registry.go index eea3cd6c3c..3e5589746c 100644 --- a/images/dvcr-artifact/pkg/registry/registry.go +++ b/images/dvcr-artifact/pkg/registry/registry.go @@ -579,6 +579,7 @@ func getImageInfoVMDK(ctx context.Context, sourceReader io.Reader, headerBuf []b func getImageInfoStandard(ctx context.Context, formatSourceReaders *importer.FormatReaders, headerBuf []byte) (ImageInfo, error) { var tempImageInfoFile *os.File var err error + var bytesWrittenToTemp int64 klog.Infoln("Write image info to temp file (standard approach)") { @@ -588,17 +589,19 @@ func getImageInfoStandard(ctx context.Context, formatSourceReaders *importer.For } defer os.Remove(tempImageInfoFile.Name()) - _, err = tempImageInfoFile.Write(headerBuf) + n, err := tempImageInfoFile.Write(headerBuf) if err != nil { return ImageInfo{}, fmt.Errorf("error writing header to temp file: %w", err) } + bytesWrittenToTemp = int64(n) remaining := imageInfoSize - int64(len(headerBuf)) if remaining > 0 { - _, err = io.CopyN(tempImageInfoFile, formatSourceReaders.TopReader(), remaining) + n, err := io.CopyN(tempImageInfoFile, formatSourceReaders.TopReader(), remaining) if err != nil && !errors.Is(err, io.EOF) { return ImageInfo{}, fmt.Errorf("error writing remaining data to temp file: %w", err) } + bytesWrittenToTemp += n } if err = tempImageInfoFile.Close(); err != nil { @@ -658,7 +661,7 @@ func getImageInfoStandard(ctx context.Context, formatSourceReaders *importer.For return ImageInfo{}, fmt.Errorf("error copying to nowhere: %w", err) } - imageInfo.VirtualSize = uint64(int64(len(headerBuf)) + n) + imageInfo.VirtualSize = uint64(bytesWrittenToTemp + n) return imageInfo, nil } From cd1fe8a8382202550f80fa17c53b6fa76f0fb46a Mon Sep 17 00:00:00 2001 From: Roman Sysoev Date: Mon, 16 Mar 2026 02:38:22 +0300 Subject: [PATCH 4/4] fix(dvcr): resolve review comments Signed-off-by: Roman Sysoev --- .../dvcr-artifact/pkg/registry/imageinfo.go | 272 ++++++++++++++++ images/dvcr-artifact/pkg/registry/registry.go | 297 ------------------ .../dvcr-artifact/pkg/registry/tailbuffer.go | 65 ++++ 3 files changed, 337 insertions(+), 297 deletions(-) create mode 100644 images/dvcr-artifact/pkg/registry/imageinfo.go create mode 100644 images/dvcr-artifact/pkg/registry/tailbuffer.go diff --git a/images/dvcr-artifact/pkg/registry/imageinfo.go b/images/dvcr-artifact/pkg/registry/imageinfo.go new file mode 100644 index 0000000000..c55095f937 --- /dev/null +++ b/images/dvcr-artifact/pkg/registry/imageinfo.go @@ -0,0 +1,272 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registry + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "os" + "os/exec" + "strings" + + "k8s.io/klog/v2" + "kubevirt.io/containerized-data-importer/pkg/image" + "kubevirt.io/containerized-data-importer/pkg/importer" +) + +const ( + syntheticHeadSize = 10 * 1024 * 1024 + syntheticTailSize = 50 * 1024 * 1024 +) + +const ( + imageInfoSize = 64 * 1024 * 1024 + tempImageInfoPattern = "tempfile" + isoImageType = "iso" +) + +func getImageInfo(ctx context.Context, sourceReader io.ReadCloser) (ImageInfo, error) { + initialReadSize := syntheticHeadSize + headerBuf := make([]byte, initialReadSize) + n, err := io.ReadFull(sourceReader, headerBuf) + if err != nil && err != io.ErrUnexpectedEOF { + return ImageInfo{}, fmt.Errorf("error reading initial data: %w", err) + } + headerBuf = headerBuf[:n] + + combinedReader := io.MultiReader( + bytes.NewReader(headerBuf), + sourceReader, + ) + combinedReadCloser := io.NopCloser(combinedReader) + + formatSourceReaders, err := importer.NewFormatReaders(combinedReadCloser, 0) + if err != nil { + return ImageInfo{}, fmt.Errorf("error creating format readers: %w", err) + } + + knownHdrs := image.CopyKnownHdrs() + vmdkHeader, exists := knownHdrs["vmdk"] + + checkSize := min(len(headerBuf), 512) + if exists && vmdkHeader.Match(headerBuf[:checkSize]) { + return getImageInfoVMDK(ctx, formatSourceReaders.TopReader(), headerBuf) + } + + return getImageInfoStandard(ctx, formatSourceReaders, headerBuf) +} + +// getImageInfoVMDK obtains information about the VMDK image using a synthetic file. +// This approach is necessary because VMDK stores metadata (footer, Grain Directory) +// at the end of the file, and qemu-img cannot work with partial VMDK. +func getImageInfoVMDK(ctx context.Context, sourceReader io.Reader, headerBuf []byte) (ImageInfo, error) { + klog.Infoln("Get VMDK image info: prepare temp file with the first and last parts of the image data.") + + var headBuf []byte + var totalBytesRead int64 + + if headerBuf != nil && len(headerBuf) > 0 { + headSize := syntheticHeadSize + if len(headerBuf) < headSize { + headSize = len(headerBuf) + } + headBuf = headerBuf[:headSize] + totalBytesRead = int64(len(headerBuf)) + + klog.Infof("Using %d bytes from header as head buffer", len(headBuf)) + } else { + headBuf = make([]byte, syntheticHeadSize) + n, err := io.ReadFull(sourceReader, headBuf) + if err != nil && err != io.ErrUnexpectedEOF { + return ImageInfo{}, fmt.Errorf("error reading head: %w", err) + } + headBuf = headBuf[:n] + totalBytesRead = int64(n) + + klog.Infof("Read %d bytes as head buffer", n) + } + + tailBuf := NewTailBuffer(syntheticTailSize) + + if headerBuf != nil && len(headerBuf) > len(headBuf) { + remainingHeader := headerBuf[len(headBuf):] + klog.Infof("Adding %d bytes from remaining header to tail buffer", len(remainingHeader)) + tailBuf.Write(remainingHeader) + } + + klog.Infoln("Streaming remaining VMDK data through tail buffer...") + written, err := io.Copy(tailBuf, sourceReader) + if err != nil { + return ImageInfo{}, fmt.Errorf("error streaming to tail buffer: %w", err) + } + + totalSize := totalBytesRead + written + klog.Infof("VMDK total size: %d bytes (%.2f GB)", totalSize, float64(totalSize)/(1024*1024*1024)) + + syntheticPath, err := createSyntheticVMDK(headBuf, tailBuf, totalSize) + if err != nil { + return ImageInfo{}, fmt.Errorf("error creating synthetic VMDK: %w", err) + } + defer os.Remove(syntheticPath) + + klog.Infof("Created synthetic VMDK file: %s", syntheticPath) + + cmd := exec.CommandContext(ctx, "qemu-img", "info", "--output=json", syntheticPath) + rawOut, err := cmd.CombinedOutput() + if err != nil { + klog.Errorf("qemu-img failed on synthetic VMDK: %s", string(rawOut)) + return ImageInfo{}, fmt.Errorf("qemu-img info failed on synthetic VMDK: %w, output: %s", err, string(rawOut)) + } + + klog.Infof("qemu-img output: %s", string(rawOut)) + + var imageInfo ImageInfo + if err = json.Unmarshal(rawOut, &imageInfo); err != nil { + return ImageInfo{}, fmt.Errorf("error parsing qemu-img output: %w", err) + } + + return imageInfo, nil +} + +// getImageInfoStandard handles non-VMDK formats using the first 64MB of the file. +func getImageInfoStandard(ctx context.Context, formatSourceReaders *importer.FormatReaders, headerBuf []byte) (ImageInfo, error) { + var tempImageInfoFile *os.File + var err error + var bytesWrittenToTemp int64 + + klog.Infoln("Get image info: prepare temp file with the first 64Mi of the image data.") + { + tempImageInfoFile, err = os.CreateTemp("", tempImageInfoPattern) + if err != nil { + return ImageInfo{}, fmt.Errorf("error creating temp file: %w", err) + } + defer os.Remove(tempImageInfoFile.Name()) + + n, err := tempImageInfoFile.Write(headerBuf) + if err != nil { + return ImageInfo{}, fmt.Errorf("error writing header to temp file: %w", err) + } + bytesWrittenToTemp = int64(n) + + remaining := imageInfoSize - int64(len(headerBuf)) + if remaining > 0 { + n, err := io.CopyN(tempImageInfoFile, formatSourceReaders.TopReader(), remaining) + if err != nil && !errors.Is(err, io.EOF) { + return ImageInfo{}, fmt.Errorf("error writing remaining data to temp file: %w", err) + } + bytesWrittenToTemp += n + } + + if err = tempImageInfoFile.Close(); err != nil { + return ImageInfo{}, fmt.Errorf("error closing temp file: %w", err) + } + } + + klog.Infoln("Get image info from temp file") + var imageInfo ImageInfo + { + cmd := exec.CommandContext(ctx, "qemu-img", "info", "--output=json", tempImageInfoFile.Name()) + rawOut, err := cmd.CombinedOutput() + if err != nil { + klog.Warningf("qemu-img info failed: %v", err) + klog.Warningf("qemu-img output: %s", string(rawOut)) + return ImageInfo{}, fmt.Errorf("error running qemu-img info: %s: %w", string(rawOut), err) + } + + klog.Infoln("Qemu-img command output:", string(rawOut)) + + if err = json.Unmarshal(rawOut, &imageInfo); err != nil { + return ImageInfo{}, fmt.Errorf("error parsing qemu-img info output: %w", err) + } + + if imageInfo.Format != "raw" { + // It's necessary to read everything from the original image to avoid blocking. + _, err = io.Copy(&EmptyWriter{}, formatSourceReaders.TopReader()) + if err != nil { + return ImageInfo{}, fmt.Errorf("error copying to nowhere: %w", err) + } + + return imageInfo, nil + } + } + + // `qemu-img` command does not support getting information about iso files. + // It is necessary to obtain this information in another way (using the `file` command). + klog.Infoln("Check the image as it may be an iso") + { + cmd := exec.CommandContext(ctx, "file", "-b", tempImageInfoFile.Name()) + rawOut, err := cmd.Output() + if err != nil { + return ImageInfo{}, fmt.Errorf("error running file info: %w", err) + } + + out := string(rawOut) + + klog.Infoln("File command output:", out) + + if strings.HasPrefix(strings.ToLower(out), isoImageType) { + imageInfo.Format = isoImageType + } + + // Count uncompressed size of source image. + n, err := io.Copy(&EmptyWriter{}, formatSourceReaders.TopReader()) + if err != nil { + return ImageInfo{}, fmt.Errorf("error copying to nowhere: %w", err) + } + + imageInfo.VirtualSize = uint64(bytesWrittenToTemp + n) + + return imageInfo, nil + } +} + +func createSyntheticVMDK(headBuf []byte, tailBuf *TailBuffer, totalSize int64) (string, error) { + tmpFile, err := os.CreateTemp("", "synthetic-*.vmdk") + if err != nil { + return "", fmt.Errorf("error creating temp file: %w", err) + } + defer tmpFile.Close() + + _, err = tmpFile.Write(headBuf) + if err != nil { + os.Remove(tmpFile.Name()) + return "", fmt.Errorf("error writing head: %w", err) + } + + tailData := tailBuf.Bytes() + tailOffset := totalSize - int64(len(tailData)) + + if tailOffset > int64(len(headBuf)) { + _, err = tmpFile.Seek(tailOffset, io.SeekStart) + if err != nil { + os.Remove(tmpFile.Name()) + return "", fmt.Errorf("error seeking: %w", err) + } + } + + _, err = tmpFile.Write(tailData) + if err != nil { + os.Remove(tmpFile.Name()) + return "", fmt.Errorf("error writing tail: %w", err) + } + + return tmpFile.Name(), nil +} diff --git a/images/dvcr-artifact/pkg/registry/registry.go b/images/dvcr-artifact/pkg/registry/registry.go index 3e5589746c..9c6da2f747 100644 --- a/images/dvcr-artifact/pkg/registry/registry.go +++ b/images/dvcr-artifact/pkg/registry/registry.go @@ -18,21 +18,16 @@ package registry import ( "archive/tar" - "bytes" "context" "crypto/md5" "crypto/sha256" "crypto/tls" "encoding/hex" - "encoding/json" "errors" "fmt" "io" "net/http" - "os" - "os/exec" "path" - "strings" "time" "github.com/google/go-containerregistry/pkg/authn" @@ -44,8 +39,6 @@ import ( "github.com/google/go-containerregistry/pkg/v1/stream" "golang.org/x/sync/errgroup" "k8s.io/klog/v2" - "kubevirt.io/containerized-data-importer/pkg/image" - "kubevirt.io/containerized-data-importer/pkg/importer" "github.com/deckhouse/virtualization-controller/dvcr-importers/pkg/datasource" importerrs "github.com/deckhouse/virtualization-controller/dvcr-importers/pkg/errors" @@ -77,18 +70,6 @@ type ImageInfo struct { Format string `json:"format"` } -const ( - imageInfoSize = 64 * 1024 * 1024 - pipeBufSize = 64 * 1024 * 1024 - tempImageInfoPattern = "tempfile" - isoImageType = "iso" -) - -const ( - syntheticHeadSize = 10 * 1024 * 1024 - syntheticTailSize = 50 * 1024 * 1024 -) - type DataProcessor struct { ds datasource.DataSourceInterface destUsername string @@ -422,251 +403,6 @@ func populateCommonConfigFields(cnf *v1.ConfigFile) { }) } -type TailBuffer struct { - buffer []byte - size int - writePos int - full bool -} - -func NewTailBuffer(size int) *TailBuffer { - return &TailBuffer{ - buffer: make([]byte, size), - size: size, - } -} - -func (tb *TailBuffer) Write(p []byte) (n int, err error) { - n = len(p) - - for len(p) > 0 { - available := tb.size - tb.writePos - toCopy := len(p) - if toCopy > available { - toCopy = available - } - - copy(tb.buffer[tb.writePos:], p[:toCopy]) - tb.writePos += toCopy - p = p[toCopy:] - - if tb.writePos >= tb.size { - tb.writePos = 0 - tb.full = true - } - } - - return n, nil -} - -func (tb *TailBuffer) Bytes() []byte { - if !tb.full { - return tb.buffer[:tb.writePos] - } - - result := make([]byte, tb.size) - copy(result, tb.buffer[tb.writePos:]) - copy(result[tb.size-tb.writePos:], tb.buffer[:tb.writePos]) - return result -} - -func getImageInfo(ctx context.Context, sourceReader io.ReadCloser) (ImageInfo, error) { - initialReadSize := syntheticHeadSize - headerBuf := make([]byte, initialReadSize) - n, err := io.ReadFull(sourceReader, headerBuf) - if err != nil && err != io.ErrUnexpectedEOF { - return ImageInfo{}, fmt.Errorf("error reading initial data: %w", err) - } - headerBuf = headerBuf[:n] - - combinedReader := io.MultiReader( - bytes.NewReader(headerBuf), - sourceReader, - ) - combinedReadCloser := io.NopCloser(combinedReader) - - formatSourceReaders, err := importer.NewFormatReaders(combinedReadCloser, 0) - if err != nil { - return ImageInfo{}, fmt.Errorf("error creating format readers: %w", err) - } - - knownHdrs := image.CopyKnownHdrs() - vmdkHeader, exists := knownHdrs["vmdk"] - - checkSize := min(len(headerBuf), 512) - if exists && vmdkHeader.Match(headerBuf[:checkSize]) { - klog.Infoln("VMDK format detected, using VMDK-specific approach") - return getImageInfoVMDK(ctx, formatSourceReaders.TopReader(), headerBuf) - } - - klog.Infoln("Non-VMDK format detected, using standard approach") - return getImageInfoStandard(ctx, formatSourceReaders, headerBuf) -} - -// getImageInfoVMDK obtains information about the VMDK image using a synthetic file. -// This approach is necessary because VMDK stores metadata (footer, Grain Directory) -// at the end of the file, and qemu-img cannot work with partial VMDK. -func getImageInfoVMDK(ctx context.Context, sourceReader io.Reader, headerBuf []byte) (ImageInfo, error) { - klog.Infoln("Processing VMDK using synthetic file approach (head + tail)") - - var headBuf []byte - var totalBytesRead int64 - - if headerBuf != nil && len(headerBuf) > 0 { - headSize := syntheticHeadSize - if len(headerBuf) < headSize { - headSize = len(headerBuf) - } - headBuf = headerBuf[:headSize] - totalBytesRead = int64(len(headerBuf)) - - klog.Infof("Using %d bytes from header as head buffer", len(headBuf)) - } else { - headBuf = make([]byte, syntheticHeadSize) - n, err := io.ReadFull(sourceReader, headBuf) - if err != nil && err != io.ErrUnexpectedEOF { - return ImageInfo{}, fmt.Errorf("error reading head: %w", err) - } - headBuf = headBuf[:n] - totalBytesRead = int64(n) - - klog.Infof("Read %d bytes as head buffer", n) - } - - tailBuf := NewTailBuffer(syntheticTailSize) - - if headerBuf != nil && len(headerBuf) > len(headBuf) { - remainingHeader := headerBuf[len(headBuf):] - klog.Infof("Adding %d bytes from remaining header to tail buffer", len(remainingHeader)) - tailBuf.Write(remainingHeader) - } - - klog.Infoln("Streaming remaining VMDK data through tail buffer...") - written, err := io.Copy(tailBuf, sourceReader) - if err != nil { - return ImageInfo{}, fmt.Errorf("error streaming to tail buffer: %w", err) - } - - totalSize := totalBytesRead + written - klog.Infof("VMDK total size: %d bytes (%.2f GB)", totalSize, float64(totalSize)/(1024*1024*1024)) - - syntheticPath, err := createSyntheticVMDK(headBuf, tailBuf, totalSize) - if err != nil { - return ImageInfo{}, fmt.Errorf("error creating synthetic VMDK: %w", err) - } - defer os.Remove(syntheticPath) - - klog.Infof("Created synthetic VMDK file: %s", syntheticPath) - - cmd := exec.CommandContext(ctx, "qemu-img", "info", "--output=json", syntheticPath) - rawOut, err := cmd.CombinedOutput() - if err != nil { - klog.Errorf("qemu-img failed on synthetic VMDK: %s", string(rawOut)) - return ImageInfo{}, fmt.Errorf("qemu-img info failed on synthetic VMDK: %w, output: %s", err, string(rawOut)) - } - - klog.Infof("qemu-img output: %s", string(rawOut)) - - var imageInfo ImageInfo - if err = json.Unmarshal(rawOut, &imageInfo); err != nil { - return ImageInfo{}, fmt.Errorf("error parsing qemu-img output: %w", err) - } - - return imageInfo, nil -} - -// getImageInfoStandard handles non-VMDK formats using the first 64MB of the file. -func getImageInfoStandard(ctx context.Context, formatSourceReaders *importer.FormatReaders, headerBuf []byte) (ImageInfo, error) { - var tempImageInfoFile *os.File - var err error - var bytesWrittenToTemp int64 - - klog.Infoln("Write image info to temp file (standard approach)") - { - tempImageInfoFile, err = os.CreateTemp("", tempImageInfoPattern) - if err != nil { - return ImageInfo{}, fmt.Errorf("error creating temp file: %w", err) - } - defer os.Remove(tempImageInfoFile.Name()) - - n, err := tempImageInfoFile.Write(headerBuf) - if err != nil { - return ImageInfo{}, fmt.Errorf("error writing header to temp file: %w", err) - } - bytesWrittenToTemp = int64(n) - - remaining := imageInfoSize - int64(len(headerBuf)) - if remaining > 0 { - n, err := io.CopyN(tempImageInfoFile, formatSourceReaders.TopReader(), remaining) - if err != nil && !errors.Is(err, io.EOF) { - return ImageInfo{}, fmt.Errorf("error writing remaining data to temp file: %w", err) - } - bytesWrittenToTemp += n - } - - if err = tempImageInfoFile.Close(); err != nil { - return ImageInfo{}, fmt.Errorf("error closing temp file: %w", err) - } - } - - klog.Infoln("Get image info from temp file") - var imageInfo ImageInfo - { - cmd := exec.CommandContext(ctx, "qemu-img", "info", "--output=json", tempImageInfoFile.Name()) - rawOut, err := cmd.CombinedOutput() - if err != nil { - klog.Warningf("qemu-img info failed: %v", err) - klog.Warningf("qemu-img output: %s", string(rawOut)) - return ImageInfo{}, fmt.Errorf("error running qemu-img info: %s: %w", string(rawOut), err) - } - - klog.Infoln("Qemu-img command output:", string(rawOut)) - - if err = json.Unmarshal(rawOut, &imageInfo); err != nil { - return ImageInfo{}, fmt.Errorf("error parsing qemu-img info output: %w", err) - } - - if imageInfo.Format != "raw" { - // It's necessary to read everything from the original image to avoid blocking. - _, err = io.Copy(&EmptyWriter{}, formatSourceReaders.TopReader()) - if err != nil { - return ImageInfo{}, fmt.Errorf("error copying to nowhere: %w", err) - } - - return imageInfo, nil - } - } - - // `qemu-img` command does not support getting information about iso files. - // It is necessary to obtain this information in another way (using the `file` command). - klog.Infoln("Check the image as it may be an iso") - { - cmd := exec.CommandContext(ctx, "file", "-b", tempImageInfoFile.Name()) - rawOut, err := cmd.Output() - if err != nil { - return ImageInfo{}, fmt.Errorf("error running file info: %w", err) - } - - out := string(rawOut) - - klog.Infoln("File command output:", out) - - if strings.HasPrefix(strings.ToLower(out), isoImageType) { - imageInfo.Format = isoImageType - } - - // Count uncompressed size of source image. - n, err := io.Copy(&EmptyWriter{}, formatSourceReaders.TopReader()) - if err != nil { - return ImageInfo{}, fmt.Errorf("error copying to nowhere: %w", err) - } - - imageInfo.VirtualSize = uint64(bytesWrittenToTemp + n) - - return imageInfo, nil - } -} - func destNameOptions(destInsecure bool) []name.Option { nameOpts := []name.Option{} @@ -699,36 +435,3 @@ type EmptyWriter struct{} func (w EmptyWriter) Write(p []byte) (int, error) { return len(p), nil } - -func createSyntheticVMDK(headBuf []byte, tailBuf *TailBuffer, totalSize int64) (string, error) { - tmpFile, err := os.CreateTemp("", "synthetic-*.vmdk") - if err != nil { - return "", fmt.Errorf("error creating temp file: %w", err) - } - defer tmpFile.Close() - - _, err = tmpFile.Write(headBuf) - if err != nil { - os.Remove(tmpFile.Name()) - return "", fmt.Errorf("error writing head: %w", err) - } - - tailData := tailBuf.Bytes() - tailOffset := totalSize - int64(len(tailData)) - - if tailOffset > int64(len(headBuf)) { - _, err = tmpFile.Seek(tailOffset, io.SeekStart) - if err != nil { - os.Remove(tmpFile.Name()) - return "", fmt.Errorf("error seeking: %w", err) - } - } - - _, err = tmpFile.Write(tailData) - if err != nil { - os.Remove(tmpFile.Name()) - return "", fmt.Errorf("error writing tail: %w", err) - } - - return tmpFile.Name(), nil -} diff --git a/images/dvcr-artifact/pkg/registry/tailbuffer.go b/images/dvcr-artifact/pkg/registry/tailbuffer.go new file mode 100644 index 0000000000..a8e6b26e1a --- /dev/null +++ b/images/dvcr-artifact/pkg/registry/tailbuffer.go @@ -0,0 +1,65 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registry + +type TailBuffer struct { + buffer []byte + size int + writePos int + full bool +} + +func NewTailBuffer(size int) *TailBuffer { + return &TailBuffer{ + buffer: make([]byte, size), + size: size, + } +} + +func (tb *TailBuffer) Write(p []byte) (n int, err error) { + n = len(p) + + for len(p) > 0 { + available := tb.size - tb.writePos + toCopy := len(p) + if toCopy > available { + toCopy = available + } + + copy(tb.buffer[tb.writePos:], p[:toCopy]) + tb.writePos += toCopy + p = p[toCopy:] + + if tb.writePos >= tb.size { + tb.writePos = 0 + tb.full = true + } + } + + return n, nil +} + +func (tb *TailBuffer) Bytes() []byte { + if !tb.full { + return tb.buffer[:tb.writePos] + } + + result := make([]byte, tb.size) + copy(result, tb.buffer[tb.writePos:]) + copy(result[tb.size-tb.writePos:], tb.buffer[:tb.writePos]) + return result +}