How to use io.Pipe effectively to upload a file as multipart over HTTP?

Background:

I need to upload a file to S3 storage through a pre-signed URL. The file size is around 500 MB,
so the upload has to be streamed to keep memory usage low.

    type TraceFile struct {
    	systemPath string
    	fileName   string
    }

    func uploadToS3(URL string, traceFile TraceFile) error {
    	fileSize, err := util.GetFileSize(traceFile.systemPath)
    	if err != nil {
    		return err
    	}
    	// S3 does not accept a chunked multipart body, so Content-Length must be set.
    	contentLength := emptyMultipartSize("file", traceFile.fileName) + fileSize

    	readBody, writeBody := io.Pipe()
    	defer readBody.Close()

    	form := multipart.NewWriter(writeBody)
    	errChan := make(chan error, 1)
    	go flushToWriter(errChan, writeBody, traceFile)
    	req, err := http.NewRequest(http.MethodPut, URL, readBody)
    	if err != nil {
    		return err
    	}
    	req.Header.Set("Content-Type", form.FormDataContentType())
    	if contentLength > 0 {
    		req.ContentLength = contentLength
    	}
    	res, err := http.DefaultClient.Do(req)
    	if err != nil {
    		log.Error("Uploading failed due to ", err)
    		// How to notify the flushToWriter goroutine to stop?
    		<-errChan
    		return err
    	}
    	defer res.Body.Close()
    	log.Info("Response from S3 ", res)
    	// How to stop sending the request in case of an error in flushToWriter?
    	return <-errChan
    }

    func emptyMultipartSize(fieldname, filename string) int64 {
    	body := &bytes.Buffer{}
    	form := multipart.NewWriter(body)
    	form.CreateFormFile(fieldname, filename)
    	form.Close()
    	return int64(body.Len())
    }

    func flushToWriter(errChan chan error, writeBody *io.PipeWriter, traceFile TraceFile) {
    	form := multipart.NewWriter(writeBody)
    	defer writeBody.Close()

    	filePart, err := form.CreateFormFile(cnst.MultiPartFieldName, traceFile.fileName)
    	if err != nil {
    		errChan <- err
    		return
    	}

    	err = copyToWriter(traceFile.systemPath, filePart)
    	if err != nil {
    		errChan <- err
    		return
    	}

    	errChan <- form.Close()
    }

    func copyToWriter(path string, filePart io.Writer) error {
    	file, err := os.Open(path)
    	if err != nil {
    		return err
    	}
    	defer file.Close()

    	fi, err := file.Stat()
    	if err != nil {
    		return err
    	}
    	_, err = io.CopyN(filePart, file, fi.Size())
    	return err
    }
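
One detail in the code above that may be worth checking: every call to multipart.NewWriter picks its own random boundary, so the Content-Type header built from the writer in uploadToS3 names a different boundary than the body produced by the writer in flushToWriter. The following is only a minimal sketch of that effect; the helper names envelope and sharedBoundary and the file name trace.bin are made up for illustration.

```go
package main

import (
	"bytes"
	"fmt"
	"mime/multipart"
)

// envelope builds an empty multipart form with a single file part and
// returns the boundary chosen by the writer plus the encoded size in bytes.
func envelope(fieldname, filename string) (boundary string, size int) {
	body := &bytes.Buffer{}
	form := multipart.NewWriter(body)
	form.CreateFormFile(fieldname, filename)
	form.Close()
	return form.Boundary(), body.Len()
}

// sharedBoundary copies the boundary of one writer onto another and reports
// whether both now advertise the same Content-Type value.
func sharedBoundary() bool {
	header := multipart.NewWriter(&bytes.Buffer{})
	body := multipart.NewWriter(&bytes.Buffer{})
	// SetBoundary must be called before any part is created.
	if err := body.SetBoundary(header.Boundary()); err != nil {
		return false
	}
	return header.FormDataContentType() == body.FormDataContentType()
}

func main() {
	b1, n1 := envelope("file", "trace.bin")
	b2, n2 := envelope("file", "trace.bin")
	fmt.Println("boundaries differ:", b1 != b2)
	fmt.Println("envelope sizes match:", n1 == n2)
	fmt.Println("after SetBoundary, content types match:", sharedBoundary())
}
```

Passing one multipart.Writer into the goroutine, or copying the boundary with SetBoundary as sketched here, would keep the header and the body consistent. The matching sizes also suggest why the emptyMultipartSize calculation can still produce a usable Content-Length even though it uses a throwaway writer.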

My questions:

  1. How can uploadToS3 notify flushToWriter of an error so that the goroutine stops? (See the comment in the code.)
  2. How can the request to the endpoint be stopped when an error occurs in flushToWriter? (See the comment in the code.)
  3. Have I used the pipe correctly?

Please suggest any further improvements.
Note: the code works without problems in the success case. I am only taking precautions for failure cases and want better error handling.
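
For questions 1 and 2, one commonly used option is CloseWithError, which exists on both halves of an io.Pipe: an error set on the writer is returned by later reads, and an error set on the reader is returned by later writes. The sketch below shows only the pipe behaviour, with no HTTP involved; errProducer, errConsumer and the two function names are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// Hypothetical errors standing in for a file-read failure (producer side)
// and an upload failure (consumer side).
var (
	errProducer = errors.New("producer failed")
	errConsumer = errors.New("consumer failed")
)

// producerFails writes some data, then closes the write half with an error.
// It returns the error that the reading side observes.
func producerFails() error {
	pr, pw := io.Pipe()
	go func() {
		pw.Write([]byte("partial data"))
		pw.CloseWithError(errProducer) // the reader gets this instead of io.EOF
	}()
	_, err := io.ReadAll(pr)
	return err
}

// consumerFails closes the read half with an error before the writer runs.
// It returns the error that the writing side observes.
func consumerFails() error {
	pr, pw := io.Pipe()
	pr.CloseWithError(errConsumer) // later writes fail with this error
	_, err := pw.Write([]byte("more data"))
	return err
}

func main() {
	fmt.Println("reader sees:", producerFails())
	fmt.Println("writer sees:", consumerFails())
}
```

Applied to the code in the question, that would mean calling writeBody.CloseWithError(err) in flushToWriter on failure instead of relying on the deferred Close, and closing readBody (optionally with an error) in uploadToS3 so that the goroutine's pending write returns.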

Observations:

  • With a wrong endpoint (an error in the uploadToS3 function), the code exits immediately with a proper HTTP error.
  • With an error in the flushToWriter function, the code also exits immediately, but only with an HTTP error such as:

    http: ContentLength=189849860 with Body length 192

The actual error occurred in flushToWriter, but it was not propagated to the uploadToS3 function. Execution does enter the error block below:

    if err != nil {
    	log.Error("Uploading failed due to", err)
    	<-errChan
    	return err
    }
