Consuming a chunked HTTP response

Hello,

I’m trying to consume an HTTP response sent with Transfer-Encoding: chunked. I thought I would be able to read each chunk as it appears in the body.
Instead, it looks like reading the body only succeeds once the server has sent the last chunk.

I’m pretty sure I’m missing something when setting up my client, because when I request my server with curl it works fine: curl prints each chunk as soon as the server sends it.
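For reference, this is the kind of read loop I expected to be able to write against the response body (a minimal sketch, separate from my real code below):

package main

import (
    "fmt"
    "log"
    "net/http"
)

func main() {
    // minimal sketch: print each chunk of a chunked response as soon as it arrives
    resp, err := http.Get("http://localhost:8080/api/v1/stream")
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    buf := make([]byte, 256)
    for {
        n, err := resp.Body.Read(buf)
        if n > 0 {
            fmt.Print(string(buf[:n])) // should print as soon as the server flushes a chunk
        }
        if err != nil {
            break // io.EOF once the last chunk has been read
        }
    }
}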

Here is my setup:

Go : 1.10.1 windows/amd64

Send the request

import (
    "fmt"
    "net"
    "net/http"
    "time"
)

func Stream(objectSignature interface{}) (StreamInterface, error) {
    roundTripper := &http.Transport{
        Proxy: http.ProxyFromEnvironment,
        DialContext: (&net.Dialer{
            Timeout:   60 * time.Second,
            KeepAlive: 60 * time.Second,
        }).DialContext,
    }
    httpClient := &http.Client{
        Transport: roundTripper,
        Timeout:   60 * time.Second,
    }

    httpRequest, err := http.NewRequest("GET", "http://localhost:8080/api/v1/stream", nil)
    if err != nil {
        return nil, err
    }

    // set the accepted content type
    httpRequest.Header.Set("Accept", "application/json")

    // send the request
    resp, err := httpClient.Do(httpRequest)
    if err != nil {
        // report a context cancellation in preference to the transport error
        ctx := httpRequest.Context()
        if ctx != nil {
            select {
            case <-ctx.Done():
                return nil, ctx.Err()
            default:
            }
        }
        return nil, err
    }

    if resp.StatusCode != http.StatusOK {
        defer resp.Body.Close()
        return nil, fmt.Errorf("wrong status code %d", resp.StatusCode)
    }

    // initialize the stream reader over the response body
    dec := newDecoder(resp.Body)
    return NewStream(dec), nil
}
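One thing I’m not sure about is the 60-second Client.Timeout: as far as I understand it covers the whole exchange, including reading the body, so even a working stream would be cut off after a minute. A variant that only bounds the dial and header phases would look like this (sketch):

roundTripper := &http.Transport{
    Proxy: http.ProxyFromEnvironment,
    DialContext: (&net.Dialer{
        Timeout:   60 * time.Second,
        KeepAlive: 60 * time.Second,
    }).DialContext,
    ResponseHeaderTimeout: 60 * time.Second, // give up if the headers take too long
}
// no Client.Timeout here, so the body is allowed to stream indefinitely
httpClient := &http.Client{Transport: roundTripper}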

Stream consumer

import (
    "fmt"
    "io"
    "sync"
)

func newDecoder(body io.ReadCloser) *decoder {
    return &decoder{
        r: body,
    }
}

type decoder struct {
    r io.ReadCloser
}

func (d *decoder) close() error {
    return d.r.Close()
}

func (d *decoder) decode() error {
    buffer := make([]byte, 256)
    n, err := d.r.Read(buffer)
    if err != nil {
        fmt.Printf("error: %s\n", err)
        return err
    }

    // print only the bytes actually read
    fmt.Println(buffer[:n])
    return nil
}
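Since the server sends one JSON object per chunk, decoding objects directly off the body should also surface them as they arrive, instead of printing raw bytes. A sketch (imports: encoding/json, fmt, io; model.Event is assumed to match the Altitude/Latitude/Longitude payload):

// sketch: decode newline-delimited JSON objects as they arrive
func consumeJSON(body io.ReadCloser) error {
    defer body.Close()
    dec := json.NewDecoder(body)
    for {
        var ev model.Event
        if err := dec.Decode(&ev); err != nil {
            if err == io.EOF {
                return nil // the server sent its last chunk and closed the stream
            }
            return err
        }
        fmt.Printf("%+v\n", ev) // should print as soon as a full object has been received
    }
}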

type StreamInterface interface {
	Stop()
}

func NewStream(source *decoder) StreamInterface {
	stream := &streamImpl{
		source: source,
		result: make(chan model.Event),
		mutex:  sync.Mutex{},
	}

	go stream.receive()
	return stream
}

type streamImpl struct {
	StreamInterface
	mutex   sync.Mutex
	source  *decoder
	result  chan model.Event
	stopped bool
}

func (s *streamImpl) Stop() {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	if !s.stopped {
		s.stopped = true
	}
}

func (s *streamImpl) isStopped() bool {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return s.stopped
}

// receive reads from the response body, decodes the data and sends it to the result channel
func (s *streamImpl) receive() {
	defer s.source.close()
	for {
		if s.isStopped() {
			return
		}
		
		err := s.source.decode()
		if err != nil {
			return
		}
	}
}

On the server side

I use the code from the Echo streaming-response cookbook: https://echo.labstack.com/cookbook/streaming-response.
For the demo I increased the time.Sleep to 10 * time.Second.
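The important part of that recipe is the Flush() after every write. In plain net/http the equivalent handler looks roughly like this (a sketch; locations stands for the sample geolocation slice from the cookbook):

func streamHandler(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "application/json")

    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming unsupported", http.StatusInternalServerError)
        return
    }

    enc := json.NewEncoder(w)
    for _, loc := range locations {
        if err := enc.Encode(loc); err != nil {
            return
        }
        flusher.Flush() // force the chunk out to the client now
        time.Sleep(10 * time.Second)
    }
}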

Result

Using curl:

curl -v http://localhost:8080/api/v1/stream

Curl log
  • STATE: INIT => CONNECT handle 0x8af160; line 1392 (connection #-5000)
  • Added connection 0. The cache now contains 1 members
  • STATE: CONNECT => WAITRESOLVE handle 0x8af160; line 1428 (connection #0)
  • Trying 127.0.0.1…
  • TCP_NODELAY set
  • STATE: WAITRESOLVE => WAITCONNECT handle 0x8af160; line 1509 (connection #0)
  • Connected to localhost (127.0.0.1) port 8080 (#0)
  • STATE: WAITCONNECT => SENDPROTOCONNECT handle 0x8af160; line 1561 (connection #0)
  • Marked for [keep alive]: HTTP default
  • STATE: SENDPROTOCONNECT => DO handle 0x8af160; line 1579 (connection #0)

GET /api/v1/prometheis/stream HTTP/1.1
Host: localhost:8080
User-Agent: curl/7.58.0
Accept: */*

  • STATE: DO => DO_DONE handle 0x8af160; line 1658 (connection #0)
  • STATE: DO_DONE => WAITPERFORM handle 0x8af160; line 1783 (connection #0)
  • STATE: WAITPERFORM => PERFORM handle 0x8af160; line 1799 (connection #0)
  • HTTP 1.1 or later with persistent connection, pipelining supported
    < HTTP/1.1 200 OK
    < Content-Type: application/json
    < Vary: Accept-Encoding
    < Date: Wed, 31 Oct 2018 09:41:05 GMT
    < Transfer-Encoding: chunked

{"Altitude":-97,"Latitude":37.819929,"Longitude":-122.478255}
{"Altitude":1899,"Latitude":39.096849,"Longitude":-120.032351}
{"Altitude":2619,"Latitude":37.865101,"Longitude":-119.538329}
{"Altitude":42,"Latitude":33.812092,"Longitude":-117.918974}
{"Altitude":15,"Latitude":37.77493,"Longitude":-122.419416}

Using the Go program:

Get http://localhost:8080/api/v1/stream: net/http: request canceled (Client.Timeout exceeded while awaiting headers)

If I reduce the sleep in the server, the program still waits the full 5 seconds before printing the bytes.

If anyone has an idea of what I did wrong, that would be great!

Thanks in advance for your help.

Best regards,

It turned out to be because I had activated gzip compression at the API level, and it seems I have to gzip each chunk’s content manually.
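In case it helps someone else: the gzip writer buffers output on its own, so each chunk also has to be flushed through it before flushing the HTTP response. Roughly like this if you gzip by hand instead of using the middleware (a sketch, reusing the same sample data as above):

func gzipStreamHandler(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "application/json")
    w.Header().Set("Content-Encoding", "gzip")

    gz := gzip.NewWriter(w)
    defer gz.Close()

    enc := json.NewEncoder(gz)
    for _, loc := range locations {
        if err := enc.Encode(loc); err != nil {
            return
        }
        gz.Flush()               // flush the compressed bytes out of the gzip buffer
        w.(http.Flusher).Flush() // then flush the HTTP chunk to the client
        time.Sleep(10 * time.Second)
    }
}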

