Bytes.MakeSlice in goroutines creates abnormal memory usage

Related to this, I created https://github.com/golang/go/issues/14089
I can report on this issue too, since I have been working on a solution for 3-4 days now.

I deployed some code to production using a pattern inspired by http://nesv.github.io/golang/2014/02/25/worker-queues-in-go.html and noticed memory usage growing steadily over time despite a FIXED number of workers (35), each with a job queue of size 35, so no more than 35 jobs should run in parallel.
The code is very simple: a global httpClient GETs a file asynchronously and writes it into a channel; that channel is consumed by the HTTP handler that kicked off the download job, and the handler then kicks off another job to upload the file to a server. See my analysis below.

// simplified download function
func download(url string) (*bytes.Buffer, error) {
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, err
	}
	response, err := defaultHttpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer response.Body.Close()
	// length 0, capacity 32 KiB; io.Copy grows the buffer anyway if I pass
	// in an empty one, I also tried it with size 0
	buf := bytes.NewBuffer(make([]byte, 0, 32*1024))
	_, err = io.Copy(buf, response.Body)
	return buf, err // buf is a *bytes.Buffer
}

func (w *DownloadWorker) Run() {
	go func() {
		for {
			job := <-w.Job
			buf, _ := download(job.URL.String())
			job.dataChannel <- buf.Bytes()
		}
	}()
}

func httpHandler(....){
// fill jobQueue with jobs to be consumed by Worker goroutines
dataChan, errChan = AddDownloadJob(dlJobCollector, u.String())
// wait for error
err = <-errChan
// no error? receive on the dataChannel
data = <-dataChan
// close after receiving
close(dataChan)
// pass data []byte to another goroutine for Upload
errChan = AddUploadJob(uploadJobCollector, data)
}

Using go tool pprof -inuse_space mybinary

(flat,cum)
  .    61.25MB     98:   _, err = io.Copy(buf, response.Body) 
  .    61.75MB    168:  buf, err := download(job.URL.String())
(pprof) top
81.35MB of 82.35MB total (98.79%)
Dropped 491 nodes (cum <= 0.41MB)
Showing top 10 nodes out of 58 (cum >= 0.50MB)
      flat  flat%   sum%        cum   cum%
   61.25MB 74.38% 74.38%    61.25MB 74.38%  bytes.makeSlice
    8.84MB 10.73% 85.11%     8.84MB 10.73%  main.fileReadAppend
    3.50MB  4.25% 89.36%     3.50MB  4.25%  net/textproto.(*Reader).ReadLine
       3MB  3.64% 93.01%    12.55MB 15.24%  main.uploadHttpHandler
       2MB  2.43% 95.44%        2MB  2.43%  main.(*UploadWorker).Run.func1
    0.54MB  0.66% 96.96%     0.54MB  0.66%  html.init
    0.50MB  0.61% 97.57%     0.50MB  0.61%  crypto/tls.(*block).reserve
    0.50MB  0.61% 98.18%     0.50MB  0.61%  encoding/pem.Decode
    0.50MB  0.61% 98.79%     0.50MB  0.61%  reflect.unsafe_NewArray

I tried different approaches with ioutil etc., and on a high-traffic server this quickly runs into 500-1000 MB of memory usage just for bytes.makeSlice.
I even tried lowering GOGC to 30.

IMO I’ve never found memory profiling to be enlightening. Can you try running your program with

 GODEBUG=gctrace=1 

set, and look at what the garbage collector thinks is allocated.

How are you measuring memory usage ? You might find https://github.com/davecheney/gcvis useful.

This is a few minutes with GOGC=30 and gctrace=1, using gcvis:

I am measuring memory usage using go tool pprof and the system’s resource monitor.

/uploads/default/original/2X/e/e2c5f665c464d99a78ed97b8e65687da36d72138.png

My guess is a goroutine leak. Can you post your complete source code, or at least a complete runnable example.

You can confirm this is a goroutine leak by running your program with

GOTRACEBACK=all

And killing your program with

kill -QUIT $PID

Which will print a stack trace of every running goroutine. If there are hundreds of thousands of goroutines, all blocked on something, there’s your problem.

/debug/pprof/

profiles:

0	block
105	goroutine
1129	heap
11	threadcreate

Using “boom” (like ab) to throw 50-100 concurrent requests at the server creates 600-1000 goroutines according to pprof.

As I am still on 1.5.3, I had to use GOTRACEBACK=crash instead of all.
I don’t know if Go “recycles” goroutine IDs, but the dump gave me goroutines 1-100+ and a lot of

goroutine 30366 [chan receive]:
goroutine 30367 [io wait]:
goroutine 30368 [chan select]:

I suppose it is not normal to have 30k goroutines if I limit my worker pool, which is what I think I am doing.

I composed most of the code from multiple files into one file and stripped away any unrelated code. I use azure-sdk-for-go with my own fork, which currently only adds one header to the request, specifying the Content-Type.
Because it was hacked together from different files, some imports were duplicated.

package main

import (
	"bytes"
	"encoding/base64"
	"errors"
	"fmt"
	"io"
	"log"
	"math"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"time"

	_ "expvar"
	_ "net/http/pprof"

	"github.com/daemonfire300/azure-sdk-for-go/storage"
	"github.com/gorilla/mux"
)

var defaultHttpClient *http.Client
var uploadJobCollector *UploadJobCollector
var dlJobCollector *DownloadJobCollector

func main(){
defaultHttpClient = &http.Client{
       Timeout: 180 * time.Second,
}

uploadJobCollector = NewUploadJobCollector()
SetupUploadWorkers(uploadJobCollector, WorkerLimit)
uploadJobCollector.Loop()

dlJobCollector = NewDownloadJobCollector()
SetupDownloadWorkers(dlJobCollector, DownloadWorkerLimit)
dlJobCollector.Loop()

r := mux.NewRouter()
r.HandleFunc("/admin/download", contentTypeWrapper(uploadByURLHandler, "application/json")).Methods("GET")
port := "1234"
http.ListenAndServe(":"+port, nil)
}

func contentTypeWrapper(handler http.HandlerFunc, contentType string) http.HandlerFunc {
       return func(w http.ResponseWriter, r *http.Request) {
              w.Header().Set("Content-Type", contentType)
              handler(w, r)
       }
}

func UploadServiceUpload(containerName string, blobName string, blobBytesPtr []byte, byteSize int, chunkCount int) error {
       var blockList []storage.Block
       var idx []byte
       var err error
       cl, _ := storage.NewBasicClient("yourAzureAccountName", "yourAzureAccountKey")
       bs := cl.GetBlobService()
       blobBytes := blobBytesPtr
       ec := &errorCollector{}
       ok, err := bs.CreateContainerIfNotExists(containerName, storage.ContainerAccessTypeBlob)
       if ok {
              // created new container
       }
       ec.collect(err)
       contentType := http.DetectContentType(blobBytes)
       log.Println("Creating blockblob in", "Container", containerName, "Blobname", blobName, len(blobName), "Content-Type:", contentType)
       err = bs.CreateBlockBlobWithContentType(containerName, blobName, contentType) // I forked https://github.com/daemonfire300/azure-sdk-for-go/blob/master/storage/blob.go
       //err = bs.CreateBlockBlob(containerName, blobName) // official Go Azure SDK
       ec.collect(err)
       for i := 0; i < chunkCount; i++ {
              idx = []byte("idx" + strconv.Itoa(i))
              err = bs.PutBlock(containerName, blobName, base64.StdEncoding.EncodeToString(idx), blobBytes[(byteSize/chunkCount)*i:(byteSize/chunkCount)*(i+1)])
              ec.collect(err)
              if err != nil {
                     log.Println("Err: BlockID", string(idx), base64.StdEncoding.EncodeToString(idx))
              }
              block := &storage.Block{}
              block.ID = base64.StdEncoding.EncodeToString(idx)
              block.Status = storage.BlockStatusUncommitted
              blockList = append(blockList, *block)
       }
       //err = bs.PutBlockList(containerName, blobName, blockList)
       err = bs.PutBlockListWithContentType(containerName, blobName, blockList, contentType)
       ec.collect(err)
       return ec.first()
}


func handleUploadRequest(ic ImageContainer) error {
       fileSize := len(ic.Contents)
       if fileSize == 0 {
              return errors.New("[UPLOAD ERROR] Empty file, cannot upload")
       }
       chunkCount := int(math.Ceil(float64(fileSize) / (float64(4) * math.Pow(10, 6))))
       err := UploadServiceUpload(ic.ContainerName, ic.FileName, ic.Contents, fileSize, chunkCount)
       return err
}

const (
       WorkerLimit    = 35
       WorkerJobLimit = 35
)

type UploadJob struct {
       ImageContainer ImageContainer
       Done           chan error
}

type UploadWorker struct {
       ID        int
       Job       chan UploadJob
       JobQueue  chan chan UploadJob
       Quit      chan bool
       PanicChan chan int
}

type UploadJobCollector struct {
       JobQueue    chan UploadJob
       WorkerQueue chan chan UploadJob
       Workers     map[int]*UploadWorker
       WorkerPanic chan int
}

func AddUploadJob(c *UploadJobCollector, ic ImageContainer) chan error {
       uplDone := make(chan error)
       c.Collect(NewUploadJob(ic, uplDone))
       return uplDone
}

func (c *UploadJobCollector) Loop() {
       go func() {
              for {
                     select {
                     case job := <-c.JobQueue:
                            log.Println("Job IN")
                            go func() {
                                   worker := <-c.WorkerQueue
                                   log.Println("Job SEND")
                                   worker <- job
                            }()
                     case panickedWorker := <-c.WorkerPanic:
                             log.Printf("Worker %d panicked, killing now and setting up a new one", panickedWorker)
                            c.Workers[panickedWorker].Stop()
                            delete(c.Workers, panickedWorker)
                            w := NewUploadWorker(panickedWorker, c.WorkerQueue, c.WorkerPanic)
                            c.Workers[panickedWorker] = w
                            w.Run()
                     }
              }
       }()
}

func (c *UploadJobCollector) Collect(job UploadJob) {
       c.JobQueue <- job
}

func NewUploadJobCollector() (c *UploadJobCollector) {
       c = &UploadJobCollector{
              JobQueue:    make(chan UploadJob, WorkerJobLimit),
              WorkerQueue: make(chan chan UploadJob, WorkerLimit),
              Workers:     make(map[int]*UploadWorker),
              WorkerPanic: make(chan int, 1),
       }
       return
}

func NewUploadWorker(id int, jobQueue chan chan UploadJob, panicChan chan int) (w *UploadWorker) {
       w = &UploadWorker{
              ID:        id,
              Job:       make(chan UploadJob, 1),
              JobQueue:  jobQueue,
              Quit:      make(chan bool),
              PanicChan: panicChan,
       }
       return
}

func NewUploadJob(ic ImageContainer, done chan error) (j UploadJob) {
       j = UploadJob{
              ImageContainer: ic,
              Done:           done,
       }
       return
}

func SetupUploadWorkers(c *UploadJobCollector, nWorkers int) {
       for i := 0; i < nWorkers; i++ {
              w := NewUploadWorker(i, c.WorkerQueue, c.WorkerPanic)
              c.Workers[i] = w
              w.Run()
       }
}

func (w *UploadWorker) Name() string {
       return fmt.Sprintf("UploadWorker#%d", w.ID)
}

func (w *UploadWorker) Run() {
       go func() {

              for {
                     w.JobQueue <- w.Job
                     select {
                     case job := <-w.Job:
                            defer func() {
                                   if panicErr := recover(); panicErr != nil {
                                          workerFail(w)
                                           log.Println("UploadWorker has panicked, sending kill command")
                                          close(w.Job)
                                          job.Done <- panicErr.(error)
                                          w.PanicChan <- w.ID

                                   }
                            }()
                            ic := job.ImageContainer
                            log.Println(fmt.Sprintf("Worker %d starting work on %s for user %d with token %s", w.ID, ic.FileName, ic.UserID, ic.ContainerName))
                            uploadedBytes := len(job.ImageContainer.Contents)
                            err := handleUploadRequest(ic)
                            if err != nil {
                                   workerFail(w)
                                   log.Println(fmt.Sprintf("Worker %d", w.ID), err)
                            }
                            job.Done <- err
                            log.Println(fmt.Sprintf("Worker %d finished upload of %s, uploaded %d bytes", w.ID, job.ImageContainer.URL, uploadedBytes))
                     case <-w.Quit:
                            log.Println(fmt.Sprintf("Stopping Worker %d at %s", w.ID, time.Now().Format(time.RFC1123)))
                            return
                     }
              }
       }()
}

func (w *UploadWorker) Stop() {
       go func() {
              w.Quit <- true
       }()
}


const (
       DownloadWorkerLimit    = 35
       DownloadWorkerJobLimit = 35
)

type Worker interface {
       Name() string
}

type DownloadJob struct {
       URL  *url.URL
       Done chan error
       Data chan []byte
}

type DownloadWorker struct {
       ID        int
       Job       chan DownloadJob
       JobQueue  chan chan DownloadJob
       Quit      chan bool
       PanicChan chan int
}

type DownloadJobCollector struct {
       JobQueue    chan DownloadJob
       WorkerQueue chan chan DownloadJob
       Workers     map[int]*DownloadWorker
       WorkerPanic chan int
}

func (j *DownloadJob) SendDone(err error) {
       j.Done <- err
}

func (j *DownloadJob) SendData(data []byte) {
       j.Data <- data
}

func AddDownloadJob(c *DownloadJobCollector, urlString string) (chan []byte, chan error) {
       uplDone := make(chan error, 1) // buffered so the parse-error send below cannot block
       data := make(chan []byte)
       u, err := url.Parse(urlString)
       if err != nil {
              uplDone <- err
              return data, uplDone
       }
       c.Collect(NewDownloadJob(u, uplDone, data))
       return data, uplDone
}

func (c *DownloadJobCollector) Loop() {
       go func() {
              for {
                     select {
                     case job := <-c.JobQueue:
                            log.Println("Job IN")
                            go func() {
                                   worker := <-c.WorkerQueue
                                   log.Println("Job SEND")
                                   worker <- job
                            }()
                     case panickedWorker := <-c.WorkerPanic:
                             log.Printf("Worker %d panicked, killing now and setting up a new one", panickedWorker)
                            c.Workers[panickedWorker].Stop()
                            delete(c.Workers, panickedWorker)
                            w := NewDownloadWorker(panickedWorker, c.WorkerQueue, c.WorkerPanic)
                            c.Workers[panickedWorker] = w
                            w.Run()
                     }
              }
       }()
}

func (c *DownloadJobCollector) Collect(job DownloadJob) {
       c.JobQueue <- job
}

func NewDownloadJobCollector() (c *DownloadJobCollector) {
       c = &DownloadJobCollector{
              JobQueue:    make(chan DownloadJob, WorkerJobLimit),
              WorkerQueue: make(chan chan DownloadJob, WorkerLimit),
              Workers:     make(map[int]*DownloadWorker),
              WorkerPanic: make(chan int, 1),
       }
       return
}

func NewDownloadWorker(id int, jobQueue chan chan DownloadJob, panicChan chan int) (w *DownloadWorker) {
       w = &DownloadWorker{
              ID:        id,
              Job:       make(chan DownloadJob, 1),
              JobQueue:  jobQueue,
              Quit:      make(chan bool),
              PanicChan: panicChan,
       }
       return
}

func NewDownloadJob(u *url.URL, done chan error, data chan []byte) (j DownloadJob) {
       j = DownloadJob{
              URL:  u,
              Done: done,
              Data: data,
       }
       return
}

func SetupDownloadWorkers(c *DownloadJobCollector, nWorkers int) {
       for i := 0; i < nWorkers; i++ {
              w := NewDownloadWorker(i, c.WorkerQueue, c.WorkerPanic)
              c.Workers[i] = w
              w.Run()
       }
}

func (w *DownloadWorker) Name() string {
       return fmt.Sprintf("DownloadWorker#%d", w.ID)
}

type NoError struct {
}

func (e *NoError) Error() string {
       return "No Error"
}

func (w *DownloadWorker) Run() {
       go func() {
              defer func() {
                     if panicErr := recover(); panicErr != nil {
                            workerFail(w)
                             log.Println("Worker has panicked, sending kill command")
                            waited := 0
                      cleanup:
                             for {
                                    select {
                                    case job := <-w.Job:
                                           if waited > 4 {
                                                  job.SendDone(panicErr.(error))
                                                  job.SendData(nil)
                                                  close(w.Job)
                                                  w.PanicChan <- w.ID
                                                  break cleanup // a plain break here would only exit the select
                                           }

                                    default:
                                           time.Sleep(time.Millisecond * 100)
                                           waited++
                                    }
                             }

                     }
              }()

              for {
                     w.JobQueue <- w.Job
                     select {
                     case job := <-w.Job:
                            log.Println(fmt.Sprintf("Worker %d starting download of %s", w.ID, job.URL.String()))
                            buf, err := downloadFromURL(job.URL.String())
                             if err != nil {
                                    workerFail(w)
                                    job.SendDone(err)
                                    job.SendData(buf.Bytes())
                                    continue // without this, Done/Data would be sent a second time below
                             }
                             if buf.Len() == 0 {
                                    workerFail(w)
                                    job.SendDone(errors.New("Empty file"))
                                    job.SendData(buf.Bytes())
                                    continue
                             }
                            job.SendDone(nil)
                            job.SendData(buf.Bytes())
                            log.Println(fmt.Sprintf("Worker %d finished download of %s", w.ID, job.URL.String()))
                     case <-w.Quit:
                            log.Println(fmt.Sprintf("Stopping Worker %d at %s", w.ID, time.Now().Format(time.RFC1123)))
                            return
                     }
              }
       }()
}

func workerFail(w Worker) {
       log.Println(fmt.Sprintf("Worker %s failed to run job", w.Name()))
}

func (w *DownloadWorker) Stop() {
       w.Quit <- true
}

func fileReadAppend(file io.Reader, partialBuf []byte) ([]byte, error) {
       var fileBuf []byte
       fileBuf = append(fileBuf, partialBuf...)
       for {
              n, err := file.Read(partialBuf)
              if err != nil && err != io.EOF {
                     return nil, err
              }
              if n == 0 {
                     break
              }
              fileBuf = append(fileBuf, partialBuf[:n]...) // only append the bytes actually read
       }
       return fileBuf, nil
}


func readFile(r *http.Request) (string, []byte, error) {
       file, _, err := r.FormFile("picture")
       if err != nil {
              file, _, err = r.FormFile("file")
       }
       if err != nil {
              return "", nil, err
       }
       defer file.Close()
       var fileBuf []byte
       partialBuf, err := fileReadPartial(file, 512)
       if err != nil {
              return "", nil, err
       }
       contentType := http.DetectContentType(partialBuf)
       fileExtension := _MIMETypeToFileExtention(contentType)
       fileBuf, err = fileReadAppend(file, partialBuf)
       if err != nil {
              return "", nil, err
       }
       return fileExtension, fileBuf, err
}

func downloadFromURL(urlString string) (*bytes.Buffer, error) {
       if urlString == "" {
              return bytes.NewBuffer(make([]byte, 0)), errors.New("Empty URL String for download")
       }
       tokens := strings.Split(urlString, "/")
       fileName := tokens[len(tokens)-1]
       log.Println("Downloading", urlString, "with fileName", fileName)

       req, err := http.NewRequest("GET", urlString, nil)
       if err != nil {
              return bytes.NewBuffer(make([]byte, 0)), errors.New("Error while creating download request")
       }
       req.Header.Set("Connection", "close")
       req.Close = true
       /*client := &http.Client{
              Timeout: 45 * time.Second,
       }*/
       s := unixTimestamp()
       //response, err := client.Do(req)
       response, err := defaultHttpClient.Do(req)

       //response, err := http.Get(urlString)

       if err != nil {
              log.Println("Error while downloading", urlString, "-", err)
              return bytes.NewBuffer(make([]byte, 0)), errors.New("Error while downloading")
       }
       defer func(){
              log.Println("Closing http body")
              response.Body.Close()
       }()
       buf := bytes.NewBuffer(make([]byte, 0, 32*1024)) // length 0, capacity 32 KiB; a non-zero length would leave that many zero bytes at the front
       //rawBuf, err := ioutil.ReadAll(response.Body)
       _, err = io.Copy(buf, response.Body)
       //buf := bytes.NewBuffer(rawBuf)
       if err != nil {
              log.Println("Error while downloading copy from response.Body")
              return buf, err
       }

       e := float64((unixTimestamp() - s)) * math.Pow10(-9)
       log.Println(buf.Len(), fmt.Sprintf("bytes downloaded. Taking %f seconds", e))
       return buf, err
}

func _MIMETypeToFileExtention(mimetype string) string {
       switch mimetype {
       case "image/jpg", "image/jpeg":
              return "jpeg"
       case "image/png":
              return "png"
       case "image/gif":
              return "gif"
       default:
              return "jpg"
       }
}

type ImageContainer struct {
       IsOnline      bool
       BackupPath    string
       ContainerName string
       FileName      string
       URL           string
       ImageType     string
       Extension     string
       UserID        int64
       ID            int64
       Contents      []byte
}

func NewImageContainerSimple(fileBuf []byte, userID int64, fileName string, containerName string, imageType string, extension string, url string) ImageContainer {
       ic := ImageContainer{
              IsOnline:      false,
              BackupPath:    "",
              ContainerName: containerName,
              FileName:      fileName,
              URL:           url,
              ImageType:     imageType,
              Extension:     extension,
              UserID:        userID,
              Contents:      fileBuf,
       }
       return ic
}

func unixTimestamp() int64 {
       return time.Now().UnixNano()
}

func uploadByURLHandler(w http.ResponseWriter, r *http.Request) {
       var dataChan chan []byte
       var data []byte
       var errChan chan error
       var err error
       urlStr := r.URL.Query().Get("url")
       fileName := r.URL.Query().Get("name")
       targetContainer := r.URL.Query().Get("target_container")
       if targetContainer == "" {
              targetContainer = "defaultcontainer"
       }
       if (urlStr == "" && r.Method == "GET") || fileName == "" {
              http.Error(w, "Empty URL passed OR no fileName given", 404)
              return
       }
       token := "abc"
       uid := 1

       if r.Method == "GET" {
              u, err := url.Parse(urlStr)
              if err != nil {
                     http.Error(w, "Invalid URL", 400)
                     return
              }
              dataChan, errChan = AddDownloadJob(dlJobCollector, u.String())
              log.Println("Waiting for Download (doneChan)")
              err = <-errChan
              if err != nil {
                     log.Println("Error while downloading URL", err)
                     http.Error(w, "Error while downloading URL", 500)
                     return
              }
              log.Println("Waiting for Download (dataChan)")
              data = <-dataChan
              close(dataChan)
       } else {
              _, data, err = readFile(r)
       }

       ext := _MIMETypeToFileExtention(http.DetectContentType(data))
       fileName = fmt.Sprintf("%s.%s", fileName, ext)
       ic := NewImageContainerSimple(data, 0, fileName, targetContainer, defaultImageType, ext, "")
       errChan = AddUploadJob(uploadJobCollector, ic)
       err = <-errChan
       close(errChan)
       if err != nil {
              http.Error(w, "Error while uploading downloaded file from URL", 500)
              return
       }

       resp, err := getDownloadURLResponse(fileName) // get JSON
       w.Write(resp)
       return
}

Go does not recycle goroutine IDs. You should use the /debug/pprof/goroutine page to dump the goroutines in use in your program.

I updated to Go 1.6 and ran 20k requests against the server. The goroutine count went up and is now back down to
goroutine profile: total 85 again.
I waited 7 minutes; the GC does not seem to give resources back to the OS (currently using the resource monitor, gctrace disabled).

goroutine profile: total 85
1 @ 0x7c9388 0x7c9163 0x7c4a94 0x66156e 0x661780 0x51739a 0x518d4d 0x51981e 0x5160fe 0x495ba1
#	0x7c9388	runtime/pprof.writeRuntimeProfile+0xb8	/home/goapp/go/src/runtime/pprof/pprof.go:545
#	0x7c9163	runtime/pprof.writeGoroutine+0x93	/home/goapp/go/src/runtime/pprof/pprof.go:507
#	0x7c4a94	runtime/pprof.(*Profile).WriteTo+0xd4	/home/goapp/go/src/runtime/pprof/pprof.go:236
#	0x66156e	net/http/pprof.handler.ServeHTTP+0x37e	/home/goapp/go/src/net/http/pprof/pprof.go:210
#	0x661780	net/http/pprof.Index+0x200		/home/goapp/go/src/net/http/pprof/pprof.go:222
#	0x51739a	net/http.HandlerFunc.ServeHTTP+0x3a	/home/goapp/go/src/net/http/server.go:1618
#	0x518d4d	net/http.(*ServeMux).ServeHTTP+0x17d	/home/goapp/go/src/net/http/server.go:1910
#	0x51981e	net/http.serverHandler.ServeHTTP+0x19e	/home/goapp/go/src/net/http/server.go:2081
#	0x5160fe	net/http.(*conn).serve+0xf2e		/home/goapp/go/src/net/http/server.go:1472

1 @ 0x464e93 0x45f94e 0x45ee10 0x6d999a 0x6d9a06 0x6dd69c 0x6fafad 0x51a9d1 0x519b69 0x5199b6 0x51a0b8 0x427125 0x41c6c6 0x464ab0 0x495ba1
#	0x45ee10	net.runtime_pollWait+0x60			/home/goapp/go/src/runtime/netpoll.go:160
#	0x6d999a	net.(*pollDesc).Wait+0x3a			/home/goapp/go/src/net/fd_poll_runtime.go:73
#	0x6d9a06	net.(*pollDesc).WaitRead+0x36			/home/goapp/go/src/net/fd_poll_runtime.go:78
#	0x6dd69c	net.(*netFD).accept+0x27c			/home/goapp/go/src/net/fd_unix.go:426
#	0x6fafad	net.(*TCPListener).AcceptTCP+0x4d		/home/goapp/go/src/net/tcpsock_posix.go:254
#	0x51a9d1	net/http.tcpKeepAliveListener.Accept+0x41	/home/goapp/go/src/net/http/server.go:2427
#	0x519b69	net/http.(*Server).Serve+0x129			/home/goapp/go/src/net/http/server.go:2117
#	0x5199b6	net/http.(*Server).ListenAndServe+0x136		/home/goapp/go/src/net/http/server.go:2098
#	0x51a0b8	net/http.ListenAndServe+0x98			/home/goapp/go/src/net/http/server.go:2195
#	0x427125	main.startServer+0xdb5				/home/goapp/goapps/src/microservice/routes.go:35
#	0x41c6c6	main.main+0x19e6				/home/goapp/goapps/src/microservice/main.go:207
#	0x464ab0	runtime.main+0x2b0				/home/goapp/go/src/runtime/proc.go:188

1 @ 0x495ba1

1 @ 0x4451ee 0x478322 0x7c39b8 0x495ba1
#	0x478322	os/signal.signal_recv+0x132	/home/goapp/go/src/runtime/sigqueue.go:116
#	0x7c39b8	os/signal.loop+0x18		/home/goapp/go/src/os/signal/signal_unix.go:22

1 @ 0x464e93 0x464f54 0x43b31f 0x43ae6b 0x568415 0x495ba1
#	0x568415	database/sql.(*DB).connectionOpener+0x45	/home/goapp/go/src/database/sql/sql.go:727

35 @ 0x464e93 0x473c37 0x473192 0x432809 0x495ba1
#	0x432809	main.(*UploadWorker).Run.func1+0xfd9	/home/goapp/goapps/src/microservice/uploadworker.go:113

1 @ 0x464e93 0x473c37 0x473192 0x43159c 0x495ba1
#	0x43159c	main.(*UploadJobCollector).Loop.func1+0x79c	/home/goapp/goapps/src/microservice/uploadworker.go:43

35 @ 0x464e93 0x473c37 0x473192 0x43093c 0x495ba1
#	0x43093c	main.(*DownloadWorker).Run.func1+0x103c	/home/goapp/goapps/src/microservice/downloadworker.go:166

1 @ 0x464e93 0x473c37 0x473192 0x42f4f4 0x495ba1
#	0x42f4f4	main.(*DownloadJobCollector).Loop.func1+0x7e4	/home/goapp/goapps/src/microservice/downloadworker.go:64

4 @ 0x464e93 0x45f94e 0x45ee10 0x6d999a 0x6d9a06 0x6db85a 0x6ef9f4 0x698c9c 0x699561 0x69e2b7 0x52c737 0x542db0 0x7265f9 0x72681c 0x528c57 0x495ba1
#	0x45ee10	net.runtime_pollWait+0x60		/home/goapp/go/src/runtime/netpoll.go:160
#	0x6d999a	net.(*pollDesc).Wait+0x3a		/home/goapp/go/src/net/fd_poll_runtime.go:73
#	0x6d9a06	net.(*pollDesc).WaitRead+0x36		/home/goapp/go/src/net/fd_poll_runtime.go:78
#	0x6db85a	net.(*netFD).Read+0x23a			/home/goapp/go/src/net/fd_unix.go:250
#	0x6ef9f4	net.(*conn).Read+0xe4			/home/goapp/go/src/net/net.go:172
#	0x698c9c	crypto/tls.(*block).readFromUntil+0xcc	/home/goapp/go/src/crypto/tls/conn.go:460
#	0x699561	crypto/tls.(*Conn).readRecord+0x2d1	/home/goapp/go/src/crypto/tls/conn.go:562
#	0x69e2b7	crypto/tls.(*Conn).Read+0x167		/home/goapp/go/src/crypto/tls/conn.go:939
#	0x52c737	net/http.noteEOFReader.Read+0x67	/home/goapp/go/src/net/http/transport.go:1683
#	0x542db0	net/http.(*noteEOFReader).Read+0xd0	<autogenerated>:284
#	0x7265f9	bufio.(*Reader).fill+0x1e9		/home/goapp/go/src/bufio/bufio.go:97
#	0x72681c	bufio.(*Reader).Peek+0xcc		/home/goapp/go/src/bufio/bufio.go:132
#	0x528c57	net/http.(*persistConn).readLoop+0x177	/home/goapp/go/src/net/http/transport.go:1069

4 @ 0x464e93 0x473c37 0x473192 0x52a6a2 0x495ba1
#	0x52a6a2	net/http.(*persistConn).writeLoop+0x472	/home/goapp/go/src/net/http/transport.go:1273

Full GoRoutine Stack Dump: https://gist.github.com/daemonfire300/46c72c3080d837f6511c

EDIT
goroutine 73 [select]:
main.(*DownloadWorker).Run.func1(0xc82015eb10)

These are intended, the long-running “workers”. I just wonder why the memory in these does not get GCed. The select in the worker runs in a for loop, and the select case that receives a job locally allocates a []byte, but shouldn’t that buffer be released when no one is using it anymore? I pass that []byte back to the HTTP handler and then into another goroutine, but after that the []byte shouldn’t be referenced by anyone anymore. Or do I have to explicitly free this []byte because I pass it around so “weirdly” between goroutines?

85 Goroutines does not look unreasonable.

I waited 7 minutes; the GC does not seem to give resources back to the OS (currently using the resource monitor, gctrace disabled).

The Go runtime cannot force memory back to the operating system. It can only advise the operating system that parts of the heap are unused, and the operating system may reclaim them if it wishes. Operating systems, especially Linux, can and do regularly ignore this advice.

If you want to know the real memory usage of your Go process, use the information from /debug/pprof



(pprof) top 15
550.45MB of 556.95MB total (98.83%)
Dropped 522 nodes (cum <= 2.78MB)
      flat  flat%   sum%        cum   cum%
  550.45MB 98.83% 98.83%   550.45MB 98.83%  bytes.makeSlice
         0     0% 98.83%   550.45MB 98.83%  bytes.(*Buffer).ReadFrom
         0     0% 98.83%   550.45MB 98.83%  io.Copy
         0     0% 98.83%   550.45MB 98.83%  io.copyBuffer
         0     0% 98.83%   550.45MB 98.83%  main.(*DownloadWorker).Run.func1
         0     0% 98.83%   550.45MB 98.83%  main.downloadFromURL
         0     0% 98.83%   554.95MB 99.64%  runtime.goexit

I wonder how I can keep Go from accumulating that memory once the HTTP request finishes. Maybe I should create a pool of WorkerJobLimit []byte slices that each job requests, so that I have a fixed number of []byte s that live on. The worst case would then be a worker downloading a large file and growing one []byte large, but never allocating more than WorkerJobLimit []byte slices.

Something like this:

-> Fetch Job
-> Remove []byte from Pool
-> Download into []byte
-> Pass []byte to HttpHandler
-> Pass []byte to UploadJob
-> Add []byte back to the Pool

.   550.45MB    169:                           buf, err := downloadFromURL(job.URL.String())

I recommend against pooling []byte slices; that is what the garbage collector is for. Compared to network transit time, GC times are not your bottleneck.


If the above code was too rubbish (it was already midnight in my timezone), I can compose a new, runnable example.

So I rewrote it a bit; now it is runnable. The only thing you would need to provide is an Azure Blob Storage account and key, but you can replace the upload function with an upload to something else.

package main

import (
    "bytes"
    "encoding/base64"
    "encoding/json"
    "errors"
    "fmt"
    "io"
    "log"
    "math"
    "net/http"
    "net/url"
    "strconv"
    "strings"
    "time"

    _ "expvar"

    _ "net/http/pprof"

    "github.com/daemonfire300/azure-sdk-for-go/storage"
    _ "github.com/go-sql-driver/mysql"
    "github.com/gorilla/mux"
)

var defaultHttpClient *http.Client

var uploadJobCollector *UploadJobCollector
var dlJobCollector *DownloadJobCollector

func main() {
    defaultHttpClient = &http.Client{
        Timeout: 180 * time.Second,
    }

    uploadJobCollector = NewUploadJobCollector()
    SetupUploadWorkers(uploadJobCollector, WorkerLimit)
    uploadJobCollector.Loop()

    dlJobCollector = NewDownloadJobCollector()
    SetupDownloadWorkers(dlJobCollector, DownloadWorkerLimit)
    dlJobCollector.Loop()

    r := mux.NewRouter()
    r.HandleFunc("/admin/download", contentTypeWrapper(uploadByURLHandler, "application/json")).Methods("GET")
    port := "1234"
    http.ListenAndServe(":"+port, nil)
}

func contentTypeWrapper(handler http.HandlerFunc, contentType string) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", contentType)
        handler(w, r)
    }
}

func uploadServiceUpload(containerName string, blobName string, blobBytesPtr []byte, byteSize int, chunkCount int) error {
    var blockList []storage.Block
    var idx []byte
    var err error
    cl, _ := storage.NewBasicClient("yourAzureAccountName", "yourAzureAccountKey")
    bs := cl.GetBlobService()
    blobBytes := blobBytesPtr
    ok, err := bs.CreateContainerIfNotExists(containerName, storage.ContainerAccessTypeBlob)
    if ok == true {
        // created new container
    }
    contentType := http.DetectContentType(blobBytes)
    log.Println("Creating blockblob in", "Container", containerName, "Blobname", blobName, len(blobName), "Content-Type:", contentType)
    err = bs.CreateBlockBlobWithContentType(containerName, blobName, contentType) // I forked https://github.com/daemonfire300/azure-sdk-for-go/blob/master/storage/blob.go
    //err = bs.CreateBlockBlob(containerName, blobName) // official Go Azure SDK

    for i := 0; i < chunkCount; i++ {
        idx = []byte("idx" + strconv.Itoa(i))
        err = bs.PutBlock(containerName, blobName, base64.StdEncoding.EncodeToString(idx), blobBytes[(byteSize/chunkCount)*i:(byteSize/chunkCount)*(i+1)])
        if err != nil {
            log.Println("Err: BlockID", string(idx), base64.StdEncoding.EncodeToString(idx))
        }
        block := &storage.Block{}
        block.ID = base64.StdEncoding.EncodeToString(idx)
        block.Status = storage.BlockStatusUncommitted
        blockList = append(blockList, *block)
    }
    //err = bs.PutBlockList(containerName, blobName, blockList)
    err = bs.PutBlockListWithContentType(containerName, blobName, blockList, contentType)
    return err
}

func handleUploadRequest(ic ImageContainer) error {
    fileSize := len(ic.Contents)
    if fileSize == 0 {
        return errors.New("[UPLOAD ERROR] Empty file, cannot upload")
    }
    chunkCount := int(math.Ceil(float64(fileSize) / (float64(4) * math.Pow(10, 6))))
    return uploadServiceUpload(ic.ContainerName, ic.FileName, ic.Contents, fileSize, chunkCount)
}

const (
    WorkerLimit    = 35
    WorkerJobLimit = 35
)

type UploadJob struct {
    ImageContainer ImageContainer
    Done           chan error
}

type UploadWorker struct {
    ID        int
    Job       chan UploadJob
    JobQueue  chan chan UploadJob
    Quit      chan bool
    PanicChan chan int
}

type UploadJobCollector struct {
    JobQueue    chan UploadJob
    WorkerQueue chan chan UploadJob
    Workers     map[int]*UploadWorker
    WorkerPanic chan int
}

func AddUploadJob(c *UploadJobCollector, ic ImageContainer) chan error {
    uplDone := make(chan error)
    c.Collect(NewUploadJob(ic, uplDone))
    return uplDone
}

func (c *UploadJobCollector) Loop() {
    go func() {
        for {
            select {
            case job := <-c.JobQueue:
                log.Println("Job IN")
                go func() {
                    worker := <-c.WorkerQueue
                    log.Println("Job SEND")
                    worker <- job
                }()
            case panickedWorker := <-c.WorkerPanic:
                log.Println(fmt.Sprintf("Worker %d panicked, killing now and setting up new", panickedWorker))
                c.Workers[panickedWorker].Stop()
                delete(c.Workers, panickedWorker)
                w := NewUploadWorker(panickedWorker, c.WorkerQueue, c.WorkerPanic)
                c.Workers[panickedWorker] = w
                w.Run()
            }
        }
    }()
}

func (c *UploadJobCollector) Collect(job UploadJob) {
    c.JobQueue <- job
}

func NewUploadJobCollector() (c *UploadJobCollector) {
    c = &UploadJobCollector{
        JobQueue:    make(chan UploadJob, WorkerJobLimit),
        WorkerQueue: make(chan chan UploadJob, WorkerLimit),
        Workers:     make(map[int]*UploadWorker),
        WorkerPanic: make(chan int, 1),
    }
    return
}

func NewUploadWorker(id int, jobQueue chan chan UploadJob, panicChan chan int) (w *UploadWorker) {
    w = &UploadWorker{
        ID:        id,
        Job:       make(chan UploadJob, 1),
        JobQueue:  jobQueue,
        Quit:      make(chan bool),
        PanicChan: panicChan,
    }
    return
}

func NewUploadJob(ic ImageContainer, done chan error) (j UploadJob) {
    j = UploadJob{
        ImageContainer: ic,
        Done:           done,
    }
    return
}

func SetupUploadWorkers(c *UploadJobCollector, nWorkers int) {
    for i := 0; i < nWorkers; i++ {
        w := NewUploadWorker(i, c.WorkerQueue, c.WorkerPanic)
        c.Workers[i] = w
        w.Run()
    }
}

func (w *UploadWorker) Name() string {
    return fmt.Sprintf("UploadWorker#%d", w.ID)
}

func (w *UploadWorker) Run() {
    go func() {

        for {
            w.JobQueue <- w.Job
            select {
            case job := <-w.Job:
                defer func() {
                    if panicErr := recover(); panicErr != nil {
                        workerFail(w)
                        log.Println("UploadWorker has panicked, sending kill command")
                        close(w.Job)
                        job.Done <- panicErr.(error)
                        w.PanicChan <- w.ID

                    }
                }()
                ic := job.ImageContainer
                log.Println(fmt.Sprintf("Worker %d starting work on %s for user %d with token %s", w.ID, ic.FileName, ic.UserID, ic.ContainerName))
                uploadedBytes := len(job.ImageContainer.Contents)
                err := handleUploadRequest(ic)
                if err != nil {
                    workerFail(w)
                    log.Println(fmt.Sprintf("Worker %d", w.ID), err)
                }
                job.Done <- err
                log.Println(fmt.Sprintf("Worker %d finished upload of %s, uploaded %d bytes", w.ID, job.ImageContainer.URL, uploadedBytes))
            case <-w.Quit:
                log.Println(fmt.Sprintf("Stopping Worker %d at %s", w.ID, time.Now().Format(time.RFC1123)))
                return
            }
        }
    }()
}

func (w *UploadWorker) Stop() {
    go func() {
        w.Quit <- true
    }()
}

const (
    DownloadWorkerLimit    = 35
    DownloadWorkerJobLimit = 35
)

type Worker interface {
    Name() string
}

type DownloadJob struct {
    URL  *url.URL
    Done chan error
    Data chan []byte
}

type DownloadWorker struct {
    ID        int
    Job       chan DownloadJob
    JobQueue  chan chan DownloadJob
    Quit      chan bool
    PanicChan chan int
}

type DownloadJobCollector struct {
    JobQueue    chan DownloadJob
    WorkerQueue chan chan DownloadJob
    Workers     map[int]*DownloadWorker
    WorkerPanic chan int
}

func (j *DownloadJob) SendDone(err error) {
    j.Done <- err
}

func (j *DownloadJob) SendData(data []byte) {
    j.Data <- data
}

func AddDownloadJob(c *DownloadJobCollector, urlString string) (chan []byte, chan error) {
    uplDone := make(chan error)
    data := make(chan []byte)
    u, err := url.Parse(urlString)
    if err != nil {
        uplDone <- err
        return data, uplDone
    }
    c.Collect(NewDownloadJob(u, uplDone, data))
    return data, uplDone
}

func (c *DownloadJobCollector) Loop() {
    go func() {
        for {
            select {
            case job := <-c.JobQueue:
                log.Println("Job IN")
                go func() {
                    worker := <-c.WorkerQueue
                    log.Println("Job SEND")
                    worker <- job
                }()
            case panickedWorker := <-c.WorkerPanic:
                log.Println(fmt.Sprintf("Worker %d panicked, killing now and setting up new", panickedWorker))
                c.Workers[panickedWorker].Stop()
                delete(c.Workers, panickedWorker)
                w := NewDownloadWorker(panickedWorker, c.WorkerQueue, c.WorkerPanic)
                c.Workers[panickedWorker] = w
                w.Run()
            }
        }
    }()
}

func (c *DownloadJobCollector) Collect(job DownloadJob) {
    c.JobQueue <- job
}

func NewDownloadJobCollector() (c *DownloadJobCollector) {
    c = &DownloadJobCollector{
        JobQueue:    make(chan DownloadJob, WorkerJobLimit),
        WorkerQueue: make(chan chan DownloadJob, WorkerLimit),
        Workers:     make(map[int]*DownloadWorker),
        WorkerPanic: make(chan int, 1),
    }
    return
}

func NewDownloadWorker(id int, jobQueue chan chan DownloadJob, panicChan chan int) (w *DownloadWorker) {
    w = &DownloadWorker{
        ID:        id,
        Job:       make(chan DownloadJob, 1),
        JobQueue:  jobQueue,
        Quit:      make(chan bool),
        PanicChan: panicChan,
    }
    return
}

func NewDownloadJob(u *url.URL, done chan error, data chan []byte) (j DownloadJob) {
    j = DownloadJob{
        URL:  u,
        Done: done,
        Data: data,
    }
    return
}

func SetupDownloadWorkers(c *DownloadJobCollector, nWorkers int) {
    for i := 0; i < nWorkers; i++ {
        w := NewDownloadWorker(i, c.WorkerQueue, c.WorkerPanic)
        c.Workers[i] = w
        w.Run()
    }
}

func (w *DownloadWorker) Name() string {
    return fmt.Sprintf("DownloadWorker#%d", w.ID)
}

type NoError struct {
}

func (e *NoError) Error() string {
    return "No Error"
}

func (w *DownloadWorker) Run() {
    go func() {
        defer func() {
            if panicErr := recover(); panicErr != nil {
                workerFail(w)
                log.Println("Worker has panicked, sending kill command")
                waited := 0
                for {
                    select {
                    case job := <-w.Job:
                        if waited > 4 {
                            job.SendDone(panicErr.(error))
                            job.SendData(nil)
                            close(w.Job)
                            w.PanicChan <- w.ID
                            break
                        }

                    default:
                        time.Sleep(time.Millisecond * 100)
                        waited++
                    }
                }

            }
        }()

        for {
            w.JobQueue <- w.Job
            select {
            case job := <-w.Job:
                log.Println(fmt.Sprintf("Worker %d starting download of %s", w.ID, job.URL.String()))
                buf, err := downloadFromURL(job.URL.String())
                if err != nil {
                    workerFail(w)
                    job.SendDone(err)
                    job.SendData(buf.Bytes())
                }
                if buf.Len() == 0 {
                    workerFail(w)
                    job.SendDone(errors.New("Empty file"))
                    job.SendData(buf.Bytes())
                }
                job.SendDone(nil)
                job.SendData(buf.Bytes())
                log.Println(fmt.Sprintf("Worker %d finished download of %s", w.ID, job.URL.String()))
            case <-w.Quit:
                log.Println(fmt.Sprintf("Stopping Worker %d at %s", w.ID, time.Now().Format(time.RFC1123)))
                return
            }
        }
    }()
}

func workerFail(w Worker) {
    log.Println(fmt.Sprintf("Worker %s failed to run job", w.Name()))
}

func (w *DownloadWorker) Stop() {
    w.Quit <- true
}

func fileReadAppend(file io.Reader, partialBuf []byte) ([]byte, error) {
    var fileBuf []byte
    fileBuf = append(fileBuf, partialBuf...)
    for {
        n, err := file.Read(partialBuf)
        if err != nil && err != io.EOF {
            return nil, err
        }
        if n == 0 {
            break
        }
        fileBuf = append(fileBuf, partialBuf...)
    }
    return fileBuf, nil
}

func fileReadPartial(file io.Reader, bufSize int) ([]byte, error) {
    partialBuf := make([]byte, bufSize)
    n, err := file.Read(partialBuf)
    if err != nil {
        return nil, errors.New("Error while reading file")
    }
    if n == 0 {
        return nil, errors.New("Could not read file of byte size 0")
    }
    return partialBuf, nil
}

func readFile(r *http.Request) (string, []byte, error) {
    file, _, err := r.FormFile("picture")
    if err != nil {
        file, _, err = r.FormFile("file")
    }
    if err != nil {
        return "", nil, err
    }
    defer file.Close()
    var fileBuf []byte
    partialBuf, err := fileReadPartial(file, 512)
    if err != nil {
        return "", nil, err
    }
    contentType := http.DetectContentType(partialBuf)
    fileExtension := _MIMETypeToFileExtention(contentType)
    fileBuf, err = fileReadAppend(file, partialBuf)
    if err != nil {
        return "", nil, err
    }
    return fileExtension, fileBuf, err
}

func downloadFromURL(urlString string) (*bytes.Buffer, error) {
    if urlString == "" {
        return bytes.NewBuffer(make([]byte, 0)), errors.New("Empty URL String for download")
    }
    tokens := strings.Split(urlString, "/")
    fileName := tokens[len(tokens)-1]
    log.Println("Downloading", urlString, "with fileName", fileName)

    req, err := http.NewRequest("GET", urlString, nil)
    if err != nil {
        return bytes.NewBuffer(make([]byte, 0)), errors.New("Error while creating download request")
    }
    req.Header.Set("Connection", "close")
    req.Close = true
    /*client := &http.Client{
           Timeout: 45 * time.Second,
    }*/
    s := unixTimestamp()
    //response, err := client.Do(req)
    response, err := defaultHttpClient.Do(req)

    //response, err := http.Get(urlString)

    if err != nil {
        log.Println("Error while downloading", urlString, "-", err)
        return bytes.NewBuffer(make([]byte, 0)), errors.New("Error while downloading")
    }
    defer func() {
        log.Println("Closing http body")
        response.Body.Close()
    }()
    buf := bytes.NewBuffer(make([]byte, 0, 32*1024)) // capacity only; a non-zero length would leave 32KB of zero bytes in front of the download
    //rawBuf, err := ioutil.ReadAll(response.Body)
    _, err = io.Copy(buf, response.Body)
    //buf := bytes.NewBuffer(rawBuf)
    if err != nil {
        log.Println("Error while downloading copy from response.Body")
        return buf, err
    }

    e := float64((unixTimestamp() - s)) * math.Pow10(-9)
    log.Println(buf.Len(), fmt.Sprintf("bytes downloaded. Taking %f seconds", e))
    return buf, err
}

func _MIMETypeToFileExtention(mimetype string) string {
    var extension string
    switch mimetype {
    case "image/jpg", "image/jpeg":
        extension = "jpeg"
    case "image/png":
        extension = "png"
    case "image/gif":
        extension = "gif"
    default:
        extension = "jpg"
    }
    return extension
}

type ImageContainer struct {
    IsOnline      bool
    BackupPath    string
    ContainerName string
    FileName      string
    URL           string
    ImageType     string
    Extension     string
    UserID        int64
    ID            int64
    Contents      []byte
}

func NewImageContainerSimple(fileBuf []byte, userID int64, fileName string, containerName string, imageType string, extension string, url string) ImageContainer {
    ic := ImageContainer{
        IsOnline:      false,
        BackupPath:    "",
        ContainerName: containerName,
        FileName:      fileName,
        URL:           url,
        ImageType:     imageType,
        Extension:     extension,
        UserID:        userID,
        Contents:      fileBuf,
    }
    return ic
}

func unixTimestamp() int64 {
    return time.Now().UnixNano()
}

func uploadByURLHandler(w http.ResponseWriter, r *http.Request) {
    var dataChan chan []byte
    var data []byte
    var errChan chan error
    var err error
    urlStr := r.URL.Query().Get("url")
    fileName := r.URL.Query().Get("name")
    targetContainer := r.URL.Query().Get("target_container")
    if targetContainer == "" {
        targetContainer = "defaultcontainer"
    }
    if (urlStr == "" && r.Method == "GET") || fileName == "" {
        http.Error(w, "Empty URL passed OR no fileName given", 404)
        return
    }

    if r.Method == "GET" {
        u, err := url.Parse(urlStr)
        if err != nil {
            http.Error(w, "Could not parse URL", 400)
            return
        }
        dataChan, errChan = AddDownloadJob(dlJobCollector, u.String())
        log.Println("Waiting for Download (doneChan)")
        err = <-errChan
        if err != nil {
            log.Println("Error while downloading URL", err)
            http.Error(w, "Error while downloading URL", 500)
            return
        }
        log.Println("Waiting for Download (dataChan)")
        data = <-dataChan
        close(dataChan)
    } else {
        _, data, err = readFile(r)
    }

    ext := _MIMETypeToFileExtention(http.DetectContentType(data))
    fileName = fmt.Sprintf("%s.%s", fileName, ext)
    ic := NewImageContainerSimple(data, 0, fileName, targetContainer, "defaultImageType", ext, "")
    errChan = AddUploadJob(uploadJobCollector, ic)
    err = <-errChan
    close(errChan)
    if err != nil {
        http.Error(w, "Error while uploading downloaded file from URL", 500)
        return
    }

    resp, err := json.Marshal(map[string]interface{}{
        "abc": "dec",
    }) // get JSON
    w.Write(resp)
    return
}

Thank you for posting your updated code sample. I tried to comment, but it appears that I cannot quote reply to code blocks.

Here are two suggestions

  1. defer statements are function scoped. Every time you write a defer in a for loop you are consuming memory to store that frame.
    Looking further, the pattern of using defer/recover to paper over panics is suspect, and I think you should move away from it immediately. If your code panics, fix the panic; do not work around it. See the next point for the probable cause
  2. The error handling in this example is significantly worse than the previous.

My recommendation, before going further is to

a. Remove the defer panic helper code entirely; it is not needed. Then address any code that was previously being masked by this panic handler.
b. Fix all your error handling, which will probably resolve a.
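To illustrate point 1, defers queued inside a loop all stay pending (along with anything their closures capture) until the enclosing function returns. This small sketch contrasts that with the usual fix of hoisting the loop body into its own function:

```go
package main

import "fmt"

// leaky queues one deferred call per iteration; none of them run
// until leaky itself returns, so all n closures stay pending for
// the whole loop.
func leaky(n int) int {
	pending, max := 0, 0
	for i := 0; i < n; i++ {
		defer func() { pending-- }() // fires only when leaky returns
		pending++
		if pending > max {
			max = pending
		}
	}
	return max // max == n: every defer was still queued here
}

// perIteration hoists the loop body into its own function, so each
// defer runs at the end of its iteration instead of piling up.
func perIteration(n int) int {
	pending, max := 0, 0
	body := func() {
		defer func() { pending-- }() // fires when body returns
		pending++
		if pending > max {
			max = pending
		}
	}
	for i := 0; i < n; i++ {
		body()
	}
	return max // max == 1: at most one defer pending at a time
}

func main() {
	fmt.Println(leaky(5))        // 5
	fmt.Println(perIteration(5)) // 1
}
```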

Hi there,

I found a solution: I removed the defers and moved the logic inside the select job case into its own function, and now the program stays at a constant 3-6 MB of RAM.


This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.