Hello,
I'm trying to consume an HTTP response sent with Transfer-Encoding: chunked. I expected to be able to read each chunk as soon as it appears in the body.
Instead, the body only seems to become readable once the server has finished sending the last chunk.
I'm fairly sure I'm missing something in my client setup, because when I request the same server with curl it works as expected: curl prints each chunk as soon as the server sends it.
Here is my setup:
Go: 1.10.1 windows/amd64
Send the request
import (
	"fmt"
	"net"
	"net/http"
	"time"
)

// Stream sends the GET request and returns a stream that reads the response body.
func Stream(objectSignature interface{}) (StreamInterface, error) {
	roundTripper := &http.Transport{
		Proxy: http.ProxyFromEnvironment,
		DialContext: (&net.Dialer{
			Timeout:   60 * time.Second,
			KeepAlive: 60 * time.Second,
		}).DialContext,
	}
	httpClient := &http.Client{
		Transport: roundTripper,
		Timeout:   60 * time.Second,
	}
	httpRequest, err := http.NewRequest("GET", "http://localhost:8080/api/v1/stream", nil)
	if err != nil {
		return nil, err
	}
	// set the accept content type
	httpRequest.Header.Set("Accept", "application/json")
	// send the request
	resp, err := httpClient.Do(httpRequest)
	if err != nil {
		// prefer the context error if the request context was cancelled
		ctx := httpRequest.Context()
		if ctx != nil {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			default:
			}
		}
		return nil, err
	}
	if resp.StatusCode != http.StatusOK {
		resp.Body.Close()
		return nil, fmt.Errorf("wrong status code %d", resp.StatusCode)
	}
	// initialize stream reader
	dec := newDecoder(resp.Body)
	return NewStream(dec), nil
}
Stream consumer
import (
	"fmt"
	"io"
	"sync"
	// the model package (which defines Event) is my own and is not shown here
)

func newDecoder(body io.ReadCloser) *decoder {
	return &decoder{
		r: body,
	}
}

type decoder struct {
	r io.ReadCloser
}

func (d *decoder) close() error {
	return d.r.Close()
}

// decode reads up to 256 bytes from the body and prints them.
func (d *decoder) decode() error {
	buffer := make([]byte, 256)
	n, err := d.r.Read(buffer)
	if n > 0 {
		// print only the bytes that were actually read
		fmt.Println(buffer[:n])
	}
	if err != nil {
		fmt.Printf("error : %s\n", err)
		return err
	}
	return nil
}
type StreamInterface interface {
	Stop()
}

func NewStream(source *decoder) StreamInterface {
	stream := &streamImpl{
		source: source,
		result: make(chan model.Event),
		mutex:  sync.Mutex{},
	}
	go stream.receive()
	return stream
}

type streamImpl struct {
	StreamInterface
	mutex   sync.Mutex
	source  *decoder
	result  chan model.Event
	stopped bool
}

func (s *streamImpl) Stop() {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	if !s.stopped {
		s.stopped = true
	}
}

func (s *streamImpl) isStopped() bool {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return s.stopped
}

// receive reads result from io, decodes the data and sends it to the result channel
func (s *streamImpl) receive() {
	defer s.source.close()
	for {
		if s.isStopped() {
			return
		}
		err := s.source.decode()
		if err != nil {
			return
		}
	}
}
On the server side
I use the code from the Echo cookbook: https://echo.labstack.com/cookbook/streaming-response.
For the demo, I increased the time.Sleep to 10 * time.Second.
Result
Using curl :
curl -v http://localhost:8080/api/v1/stream
Curl log
* STATE: INIT => CONNECT handle 0x8af160; line 1392 (connection #-5000)
* Added connection 0. The cache now contains 1 members
* STATE: CONNECT => WAITRESOLVE handle 0x8af160; line 1428 (connection #0)
* Trying 127.0.0.1...
* TCP_NODELAY set
* STATE: WAITRESOLVE => WAITCONNECT handle 0x8af160; line 1509 (connection #0)
* Connected to localhost (127.0.0.1) port 8080 (#0)
* STATE: WAITCONNECT => SENDPROTOCONNECT handle 0x8af160; line 1561 (connection #0)
* Marked for [keep alive]: HTTP default
* STATE: SENDPROTOCONNECT => DO handle 0x8af160; line 1579 (connection #0)
> GET /api/v1/prometheis/stream HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.58.0
> Accept: */*
* STATE: DO => DO_DONE handle 0x8af160; line 1658 (connection #0)
* STATE: DO_DONE => WAITPERFORM handle 0x8af160; line 1783 (connection #0)
* STATE: WAITPERFORM => PERFORM handle 0x8af160; line 1799 (connection #0)
* HTTP 1.1 or later with persistent connection, pipelining supported
< HTTP/1.1 200 OK
< Content-Type: application/json
< Vary: Accept-Encoding
< Date: Wed, 31 Oct 2018 09:41:05 GMT
< Transfer-Encoding: chunked
{"Altitude":-97,"Latitude":37.819929,"Longitude":-122.478255}
{"Altitude":1899,"Latitude":39.096849,"Longitude":-120.032351}
{"Altitude":2619,"Latitude":37.865101,"Longitude":-119.538329}
{"Altitude":42,"Latitude":33.812092,"Longitude":-117.918974}
{"Altitude":15,"Latitude":37.77493,"Longitude":-122.419416}
Using the Go program:
Get http://localhost:8080/api/v1/stream: net/http: request canceled (Client.Timeout exceeded while awaiting headers)
If I reduce the sleep time on the server, the client waits 5 seconds before printing the bytes.
If anyone has an idea of what I did wrong, that would be great!
Thanks in advance for your help.
Best regards,