/debug/pprof/
profiles:
0block
105goroutine
1129heap
11threadcreate
^--------/debug/pprof/
Using “boom” (like ab) to throw 50-100 concurrent requests at the server creates 600-1000 goroutines according to pprof.
As I am still on 1.5.3 I had to use GOTRACEBACK=crash instead of all
I dont know if go “recycles” goroutine “ids”, but the dump gave me routines 1-100+ and a lot of
goroutine 30366 [chan receive]:
goroutine 30367 [io wait]:
goroutine 30368 [chan select]:
I suppose it is not normal to have 30k Goroutines if I limit my worker pool, what I think I do.
I composed most of the code from multiple files into one file and stripped away any non related code. I use the azure-go-sdk with my own fork which currently only adds 1 header to the request specifying the Content-Type.
I hacked it a bit together from different files, so the imports are somewhat duplicated etc.
package main
import(
"encoding/base64"
"fmt"
"log"
"net/http"
"strconv"
"time"
"path/filepath"
"bytes"
"encoding/json"
"errors"
"fmt"
"log"
"math"
"net/http"
"net/url"
"sort"
"strconv"
"strings"
"text/template"
"time"
"bytes"
"crypto/sha1"
"errors"
"fmt"
"io"
"log"
"math"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
_ "expvar"
"os"
"runtime/debug"
"strconv"
_ "net/http/pprof"
"github.com/gorilla/mux"
"github.com/daemonfire300/azure-sdk-for-go/storage"
"github.com/fatih/color"
_ "github.com/go-sql-driver/mysql"
"github.com/pkg/profile"
)
var defaultHttpClient *http.Client
func main(){
defaultHttpClient = &http.Client{
Timeout: 180 * time.Second,
}
uploadJobCollector = NewUploadJobCollector()
SetupUploadWorkers(uploadJobCollector, WorkerLimit)
uploadJobCollector.Loop()
dlJobCollector = NewDownloadJobCollector()
SetupDownloadWorkers(dlJobCollector, DownloadWorkerLimit)
dlJobCollector.Loop()
r := mux.NewRouter()
r.HandleFunc("/admin/download", contentTypeWrapper(uploadByURLHandler, "application/json")).Methods("GET")
port := "1234"
http.ListenAndServe(":"+port, nil)
}
func contentTypeWrapper(handler http.HandlerFunc, contentType string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", contentType)
handler(w, r)
}
}
func UploadServiceUpload(containerName string, blobName string, blobBytesPtr []byte, byteSize int, chunkCount int) error {
var blockList []storage.Block
var idx []byte
var err error
cl, _ := storage.NewBasicClient("yourAzureAccountName", "yourAzureAccountKey")
bs := cl.GetBlobService()
blobBytes := blobBytesPtr
ec := &errorCollector{}
ok, err := bs.CreateContainerIfNotExists(containerName, storage.ContainerAccessTypeBlob)
if ok == true {
// created new container
}
ec.collect(err)
contentType := http.DetectContentType(blobBytes)
log.Println("Creating blockblob in", "Container", containerName, "Blobname", blobName, len(blobName), "Content-Type:", contentType)
err = bs.CreateBlockBlobWithContentType(containerName, blobName, contentType) // I forked https://github.com/daemonfire300/azure-sdk-for-go/blob/master/storage/blob.go
//err = bs.CreateBlockBlob(containerName, blobName) // official Go Azure SDK
ec.collect(err)
for i := 0; i < chunkCount; i++ {
idx = []byte("idx" + strconv.Itoa(i))
err = bs.PutBlock(containerName, blobName, base64.StdEncoding.EncodeToString(idx), blobBytes[(byteSize/chunkCount)*i:(byteSize/chunkCount)*(i+1)])
ec.collect(err)
if err != nil {
log.Println("Err: BlockID", string(idx), base64.StdEncoding.EncodeToString(idx))
}
block := &storage.Block{}
block.ID = base64.StdEncoding.EncodeToString(idx)
block.Status = storage.BlockStatusUncommitted
blockList = append(blockList, *block)
}
//err = bs.PutBlockList(containerName, blobName, blockList)
err = bs.PutBlockListWithContentType(containerName, blobName, blockList, contentType)
ec.collect(err)
return ec.first()
}
func handleUploadRequest(ic ImageContainer) error {
fileSize := len(ic.Contents)
if fileSize == 0 {
return errors.New("[UPLOAD ERROR] Empty file, cannot upload")
}
chunkCount := int(math.Ceil(float64(fileSize) / (float64(4) * math.Pow(10, 6))))
err := uploadServiceUpload(ic.ContainerName, ic.FileName, ic.Contents, fileSize, chunkCount)
if err == nil {
return err
}
return err
}
const (
WorkerLimit = 35
WorkerJobLimit = 35
)
type UploadJob struct {
ImageContainer ImageContainer
Done chan error
}
type UploadWorker struct {
ID int
Job chan UploadJob
JobQueue chan chan UploadJob
Quit chan bool
PanicChan chan int
}
type UploadJobCollector struct {
JobQueue chan UploadJob
WorkerQueue chan chan UploadJob
Workers map[int]*UploadWorker
WorkerPanic chan int
}
func AddUploadJob(c *UploadJobCollector, ic ImageContainer) chan error {
uplDone := make(chan error)
c.Collect(NewUploadJob(ic, uplDone))
return uplDone
}
func (c *UploadJobCollector) Loop() {
go func() {
for {
select {
case job := <-c.JobQueue:
log.Println("Job IN")
go func() {
worker := <-c.WorkerQueue
log.Println("Job SEND")
worker <- job
}()
case panickedWorker := <-c.WorkerPanic:
log.Println(fmt.Sprintln("Worker %d panicked, killing now and setting up new", panickedWorker))
c.Workers[panickedWorker].Stop()
delete(c.Workers, panickedWorker)
w := NewUploadWorker(panickedWorker, c.WorkerQueue, c.WorkerPanic)
c.Workers[panickedWorker] = w
w.Run()
}
}
}()
}
func (c *UploadJobCollector) Collect(job UploadJob) {
c.JobQueue <- job
}
func NewUploadJobCollector() (c *UploadJobCollector) {
c = &UploadJobCollector{
JobQueue: make(chan UploadJob, WorkerJobLimit),
WorkerQueue: make(chan chan UploadJob, WorkerLimit),
Workers: make(map[int]*UploadWorker),
WorkerPanic: make(chan int, 1),
}
return
}
func NewUploadWorker(id int, jobQueue chan chan UploadJob, panicChan chan int) (w *UploadWorker) {
w = &UploadWorker{
ID: id,
Job: make(chan UploadJob, 1),
JobQueue: jobQueue,
Quit: make(chan bool),
PanicChan: panicChan,
}
return
}
func NewUploadJob(ic ImageContainer, done chan error) (j UploadJob) {
j = UploadJob{
ImageContainer: ic,
Done: done,
}
return
}
func SetupUploadWorkers(c *UploadJobCollector, nWorkers int) {
for i := 0; i < nWorkers; i++ {
w := NewUploadWorker(i, c.WorkerQueue, c.WorkerPanic)
c.Workers[i] = w
w.Run()
}
}
func (w *UploadWorker) Name() string {
return fmt.Sprintf("UploadWorker#%d", w.ID)
}
func (w *UploadWorker) Run() {
go func() {
for {
w.JobQueue <- w.Job
select {
case job := <-w.Job:
defer func() {
if panicErr := recover(); panicErr != nil {
workerFail(w)
log.Println("UploadWorker has panicked, sending kill comand")
close(w.Job)
job.Done <- panicErr.(error)
w.PanicChan <- w.ID
}
}()
ic := job.ImageContainer
log.Println(fmt.Sprintf("Worker %d starting work on %s for user %d with token %s", w.ID, ic.FileName, ic.UserID, ic.ContainerName))
uploadedBytes := len(job.ImageContainer.Contents)
err := handleUploadRequest(ic)
if err != nil {
workerFail(w)
log.Println(fmt.Sprintf("Worker %d", w.ID), err)
}
job.Done <- err
log.Println(fmt.Sprintf("Worker %d finished upload of %s, uploaded %d bytes", w.ID, job.ImageContainer.URL, uploadedBytes))
case <-w.Quit:
log.Println(fmt.Sprintf("Stopping Worker %d at %s", w.ID, time.Now().Format(time.RFC1123)))
return
}
}
}()
}
func (w *UploadWorker) Stop() {
go func() {
w.Quit <- true
}()
}
const (
DownloadWorkerLimit = 35
DownloadWorkerJobLimit = 35
)
type Worker interface {
Name() string
}
type DownloadJob struct {
URL *url.URL
Done chan error
Data chan []byte
}
type DownloadWorker struct {
ID int
Job chan DownloadJob
JobQueue chan chan DownloadJob
Quit chan bool
PanicChan chan int
}
type DownloadJobCollector struct {
JobQueue chan DownloadJob
WorkerQueue chan chan DownloadJob
Workers map[int]*DownloadWorker
WorkerPanic chan int
}
func (j *DownloadJob) SendDone(err error) {
j.Done <- err
}
func (j *DownloadJob) SendData(data []byte) {
j.Data <- data
}
func AddDownloadJob(c *DownloadJobCollector, urlString string) (chan []byte, chan error) {
uplDone := make(chan error)
data := make(chan []byte)
u, err := url.Parse(urlString)
if err != nil {
uplDone <- err
return data, uplDone
}
c.Collect(NewDownloadJob(u, uplDone, data))
return data, uplDone
}
func (c *DownloadJobCollector) Loop() {
go func() {
for {
select {
case job := <-c.JobQueue:
log.Println("Job IN")
go func() {
worker := <-c.WorkerQueue
log.Println("Job SEND")
worker <- job
}()
case panickedWorker := <-c.WorkerPanic:
log.Println(fmt.Sprintln("Worker %d panicked, killing now and setting up new", panickedWorker))
c.Workers[panickedWorker].Stop()
delete(c.Workers, panickedWorker)
w := NewDownloadWorker(panickedWorker, c.WorkerQueue, c.WorkerPanic)
c.Workers[panickedWorker] = w
w.Run()
}
}
}()
}
func (c *DownloadJobCollector) Collect(job DownloadJob) {
c.JobQueue <- job
}
func NewDownloadJobCollector() (c *DownloadJobCollector) {
c = &DownloadJobCollector{
JobQueue: make(chan DownloadJob, WorkerJobLimit),
WorkerQueue: make(chan chan DownloadJob, WorkerLimit),
Workers: make(map[int]*DownloadWorker),
WorkerPanic: make(chan int, 1),
}
return
}
func NewDownloadWorker(id int, jobQueue chan chan DownloadJob, panicChan chan int) (w *DownloadWorker) {
w = &DownloadWorker{
ID: id,
Job: make(chan DownloadJob, 1),
JobQueue: jobQueue,
Quit: make(chan bool),
PanicChan: panicChan,
}
return
}
func NewDownloadJob(u *url.URL, done chan error, data chan []byte) (j DownloadJob) {
j = DownloadJob{
URL: u,
Done: done,
Data: data,
}
return
}
func SetupDownloadWorkers(c *DownloadJobCollector, nWorkers int) {
for i := 0; i < nWorkers; i++ {
w := NewDownloadWorker(i, c.WorkerQueue, c.WorkerPanic)
c.Workers[i] = w
w.Run()
}
}
func (w *DownloadWorker) Name() string {
return fmt.Sprintf("DownloadWorker#%d", w.ID)
}
type NoError struct {
}
func (e *NoError) Error() string {
return "No Error"
}
func (w *DownloadWorker) Run() {
go func() {
defer func() {
if panicErr := recover(); panicErr != nil {
workerFail(w)
log.Println("Worker has panicked, sending kill comand")
waited := 0
for {
select {
case job := <-w.Job:
if waited > 4 {
job.SendDone(panicErr.(error))
job.SendData(nil)
close(w.Job)
w.PanicChan <- w.ID
break
}
default:
time.Sleep(time.Millisecond * 100)
waited++
}
}
}
}()
for {
w.JobQueue <- w.Job
select {
case job := <-w.Job:
log.Println(fmt.Sprintf("Worker %d starting download of %s", w.ID, job.URL.String()))
buf, err := downloadFromURL(job.URL.String())
if err != nil {
workerFail(w)
job.SendDone(err)
job.SendData(buf.Bytes())
}
if buf.Len() == 0 {
workerFail(w)
job.SendDone(errors.New("Empty file"))
job.SendData(buf.Bytes())
}
job.SendDone(nil)
job.SendData(buf.Bytes())
log.Println(fmt.Sprintf("Worker %d finished download of %s", w.ID, job.URL.String()))
case <-w.Quit:
log.Println(fmt.Sprintf("Stopping Worker %d at %s", w.ID, time.Now().Format(time.RFC1123)))
return
}
}
}()
}
func workerFail(w Worker) {
log.Println(fmt.Sprintf("Worker %s failed to run job", w.Name()))
}
func (w *DownloadWorker) Stop() {
w.Quit <- true
}
func fileReadAppend(file io.Reader, partialBuf []byte) ([]byte, error) {
var fileBuf []byte
fileBuf = append(fileBuf, partialBuf...)
for {
n, err := file.Read(partialBuf)
if err != nil && err != io.EOF {
return nil, err
}
if n == 0 {
break
}
fileBuf = append(fileBuf, partialBuf...)
}
return fileBuf, nil
}
func readFile(r *http.Request) (string, []byte, error) {
file, _, err := r.FormFile("picture")
if err != nil {
file, _, err = r.FormFile("file")
}
if err != nil {
return "", nil, err
}
defer file.Close()
var fileBuf []byte
partialBuf, err := fileReadPartial(file, 512)
if err != nil {
return "", nil, err
}
contentType := http.DetectContentType(partialBuf)
fileExtension := _MIMETypeToFileExtention(contentType)
fileBuf, err = fileReadAppend(file, partialBuf)
if err != nil {
return "", nil, err
}
return fileExtension, fileBuf, err
}
func downloadFromURL(urlString string) (*bytes.Buffer, error) {
if urlString == "" {
return bytes.NewBuffer(make([]byte, 0)), errors.New("Empty URL String for download")
}
tokens := strings.Split(urlString, "/")
fileName := tokens[len(tokens)-1]
log.Println("Downloading", urlString, "with fileName", fileName)
req, err := http.NewRequest("GET", urlString, nil)
if err != nil {
return bytes.NewBuffer(make([]byte, 0)), errors.New("Error while creating download request")
}
req.Header.Set("Connection", "close")
req.Close = true
/*client := &http.Client{
Timeout: 45 * time.Second,
}*/
s := unixTimestamp()
//response, err := client.Do(req)
response, err := defaultHttpClient.Do(req)
//response, err := http.Get(urlString)
if err != nil {
log.Println("Error while downloading", urlString, "-", err)
return bytes.NewBuffer(make([]byte, 0)), errors.New("Error while downloading")
}
defer func(){
log.Println("Closing http body")
response.Body.Close()
}()
buf := bytes.NewBuffer(make([]byte, 32 * 1024))
//rawBuf, err := ioutil.ReadAll(response.Body)
_, err = io.Copy(buf, response.Body)
//buf := bytes.NewBuffer(rawBuf)
if err != nil {
log.Println("Error while downloading copy from response.Body")
return buf, err
}
e := float64((unixTimestamp() - s)) * math.Pow10(-9)
log.Println(buf.Len(), fmt.Sprintf("bytes downloaded. Taking %f seconds", e))
return buf, err
}
func _MIMETypeToFileExtention(mimetype string) string {
var extension string
switch mimetype {
case "image/jpg":
extension = "jpeg"
break
case "image/jpeg":
extension = "jpeg"
break
case "image/png":
extension = "png"
break
case "image/gif":
extension = "gif"
break
default:
extension = "jpg"
}
return extension
}
type ImageContainer struct {
IsOnline bool
BackupPath string
ContainerName string
FileName string
URL string
ImageType string
Extension string
UserID int64
ID int64
Contents []byte
}
func NewImageContainerSimple(fileBuf []byte, userID int64, fileName string, containerName string, imageType string, extension string, url string) ImageContainer {
ic := ImageContainer{
IsOnline: false,
BackupPath: "",
ContainerName: containerName,
FileName: fileName,
URL: url,
ImageType: imageType,
Extension: extension,
UserID: userID,
Contents: fileBuf,
}
return ic
}
func unixTimestamp() int64 {
return time.Now().UnixNano()
}
func uploadByURLHandler(w http.ResponseWriter, r *http.Request) {
var dataChan chan []byte
var data []byte
var errChan chan error
var err error
urlStr := r.URL.Query().Get("url")
fileName := r.URL.Query().Get("name")
targetContainer := r.URL.Query().Get("target_container")
if targetContainer == "" {
targetContainer = "defaultcontainer"
}
if (urlStr == "" && r.Method == "GET") || fileName == "" {
http.Error(w, "Empty URL passed OR no fileName given", 404)
return
}
token := "abc"
uid := 1
if r.Method == "GET" {
s = unixTimestamp()
u, err := url.Parse(urlStr)
dataChan, errChan = AddDownloadJob(dlJobCollector, u.String())
log.Println("Waiting for Download (doneChan)")
err = <-errChan
if err != nil {
log.Println("Error while downloading URL", err)
http.Error(w, "Error while downloading URL", 500)
return
}
log.Println("Waiting for Download (dataChan)")
data = <-dataChan
close(dataChan)
} else {
_, data, err = readFile(r)
}
ext := _MIMETypeToFileExtention(http.DetectContentType(data))
fileName = fmt.Sprintf("%s.%s", fileName, ext)
ic := NewImageContainerSimple(data, 0, fileName, targetContainer, defaultImageType, ext, "")
errChan = AddUploadJob(uploadJobCollector, ic)
err = <-errChan
close(errChan)
if err != nil {
http.Error(w, "Error while uploading downloaded file from URL", 500)
return
}
resp, err := getDownloadURLResponse(fileName) // get JSON
w.Write(resp)
return
}