Multiple web socket clients collide

I have created a web server app in Go. One of its functions is to provide an on-screen ticker in the browser. I use the gorilla websocket package to retrieve the prices, which in the stock market industry are referred to as ticks. Each has the stock symbol, the price, the volume and some other stuff. These ticks are accumulated. The browser makes an SSE call every 5 seconds or so and I send the latest ticks. I have this working pretty well and have been testing it for the past month.

However, in the log, I see that I am occasionally getting errors. The call to WebSocketConnection.ReadJSON returns an error: websocket: close 1006 (abnormal closure): unexpected EOF. When this occurs, I close the connection and attempt to create a new one. Sometimes it works, sometimes it doesn’t. If it doesn’t, the error is ‘bad handshake’. So, I implemented a retry for creating the web socket connection. Eventually it usually succeeds and continues reading from the socket. If I activate 2 clients (goroutines) accessing web sockets, it gets worse: there are more errors, and it seems that at any given time only one of them is reading successfully while the other fails. Then the first one fails and the 2nd one succeeds.

So, my first question is: am I using the web socket correctly? Should I just ignore the errors and continue?

I extracted some code to exemplify this issue:

package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"time"

	"github.com/gorilla/websocket"
)

func main() {
	stockSymbol1 := "AAPL"
	ch := make(chan struct{})
	ctx, cancelFunc := context.WithCancel(context.Background())
	go webSocket1(ctx, stockSymbol1)

	stockSymbol2 := "TXN"
	go webSocket2(ctx, stockSymbol2)

	<-ch
	cancelFunc()
}

func webSocket1(ctx context.Context, symbol string) {

	readTicksForSymbol(ctx, symbol)
}

func webSocket2(ctx context.Context, symbol string) {

	readTicksForSymbol(ctx, symbol)
}

func readTicksForSymbol(ctx context.Context, symbol string) {
	w, err := getFinnHubWebSocketConnection(symbol)
	if err != nil {
		log.Printf("error from getFinnHubWebSocketConnection: %s", err)
		return
	}

	// Loop until the context is cancelled; ctx.Err() is safe to call from any goroutine.
	for ctx.Err() == nil {
		var msg interface{}
		err := w.ReadJSON(&msg)
		if err != nil {
			log.Println(fmt.Sprintf("error from finnHubRealTime.WebSocketConnection.ReadJSON: %s", err))
			w.Close()
			w, err = getFinnHubWebSocketConnection(symbol)
			if err != nil {
				log.Printf("%s error from getFinnHubWebSocketConnection: %s", symbol, err)
				return // w is not usable after a failed reconnect
			}
		} else {
			msgMap, ok := msg.(map[string]interface{})
			if ok {
				msgType := msgMap["type"].(string)
				log.Printf("msgType: %s", msgType)
				// we need to process pings as well, since realtime.html needs to check when a new day occurs
				if msgType == "trade" {
					msgDataSlice := msgMap["data"].([]interface{})
					log.Printf("received %d dataElementInterface's", len(msgDataSlice))
					for _, dataElementInterface := range msgDataSlice {
						interfaceMap := dataElementInterface.(map[string]interface{})
						log.Printf("interfaceMap: %v", interfaceMap)
					}
				}
			}

		}

	}
}

func getFinnHubWebSocketConnection(symbol string) (*websocket.Conn, error) {

	var w *websocket.Conn
	var err error

	for retries := 0; retries < 100; retries++ {

		log.Println("getFinnHubWebSocketConnection")
		dialer := websocket.Dialer{
			NetDial:           nil,
			NetDialContext:    nil,
			Proxy:             nil,
			TLSClientConfig:   nil,
			HandshakeTimeout:  0,
			ReadBufferSize:    0,
			WriteBufferSize:   0,
			WriteBufferPool:   nil,
			Subprotocols:      nil,
			EnableCompression: false,
			Jar:               nil,
		}

		w, _, err = dialer.Dial("wss://ws.finnhub.io?token=xxxxxxxxxxxxxxxxx", nil)
		if err == nil {
			log.Println(fmt.Sprintf("successful websocket connection after try #: %v", retries))
			break
		} else {
			log.Println(fmt.Sprintf("%s wait 2 seconds after error in getFinnHubWebSocketConnection from dialer.Dial try #: %v error: %v", symbol, retries, err))
			time.Sleep(2 * time.Second)
		}
	}

	if err != nil {
		// Every retry failed, so w is nil and must not be written to.
		errorMessage := fmt.Sprintf("error in getFinnHubWebSocketConnection from dialer.Dial: %v", err)
		return nil, errors.New(errorMessage)
	}

	// Subscribe to the symbol on the freshly opened connection.
	msg, _ := json.Marshal(map[string]interface{}{"type": "subscribe", "symbol": symbol})
	msgErr := w.WriteMessage(websocket.TextMessage, msg)
	if msgErr != nil {
		log.Printf("error submitting msg to websocket: %v", msgErr)
	}

	return w, nil
}

Any help would be appreciated.

Hi! I don’t know the Finnhub APIs, but I think the problem is related to server limitations. What you describe is common to many exchanges and markets I’ve used over time: your connection is closed because a time limit has been exceeded. There’s a duration limit for open sockets on 99% of platforms, so that no client monopolizes server resources; that is the EOF. The ‘bad handshake’ comes from the other limitation: opening a new connection from the same IP right after your previous connection has been closed. At that instant you are on the “black list”, which holds your token/IP for seconds or minutes. Usually you can’t have more than one connection at a time on the same token/account.
I advise you to keep an eye on the platform limitations (sometimes not adequately explained) and to delay your next connection attempt. Note: if you need the connection to stay alive at all times, close it yourself at a set time and open a new one, so that you don’t fall into the black list. Don’t use multiple connections.

Thank you for your reply. I’ll do some research on the server api.
OK, I have checked the available docs for the server. This did not provide an answer.
The app usually performs fine with one connection. However, if I create a 2nd connection in a goroutine, the connections start to fail most of the time, with some successes on both goroutines. I would think this would be a common use case: a web server needs to support multiple clients, and multiple clients might need data from web sockets.

Sure, that is what you would expect, but with trading/market servers, multiple clients usually have to be multiple devices/IPs. Many finance API services do offer custom user plans for what you want to do.

Try replacing ReadJSON with ReadMessage
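In gorilla/websocket, ReadMessage returns the message type, the raw payload bytes and an error, so the payload can be logged as-is and then decoded separately. A rough sketch of the decoding half, using a typed struct instead of interface{} assertions. The field names s, p, v and t are my assumption about the trade payload, and decodeMessage is a made-up helper, so check both against what the server actually sends.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// trade mirrors one element of the "data" array in a trade message.
// The JSON keys below are an assumption about the feed, not taken from its docs.
type trade struct {
	Symbol string  `json:"s"`
	Price  float64 `json:"p"`
	Volume float64 `json:"v"`
	Time   int64   `json:"t"`
}

// message is the envelope: "type" is e.g. "trade" or "ping".
type message struct {
	Type string  `json:"type"`
	Data []trade `json:"data"`
}

// decodeMessage parses the raw bytes that ReadMessage would hand back.
// On failure the error includes the payload, which helps when reading logs.
func decodeMessage(raw []byte) (message, error) {
	var m message
	if err := json.Unmarshal(raw, &m); err != nil {
		return message{}, fmt.Errorf("decode %q: %w", raw, err)
	}
	return m, nil
}

func main() {
	raw := []byte(`{"type":"trade","data":[{"s":"AAPL","p":189.5,"v":100,"t":1700000000000}]}`)
	m, err := decodeMessage(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(m.Type, len(m.Data), m.Data[0].Symbol, m.Data[0].Price)
}
```

A message without a "data" field, such as a ping, decodes with an empty Data slice, so the read loop can switch on m.Type without any type assertions that might panic.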

From the docs (emphasis mine):

Stream real-time trades for US stocks, forex and crypto. Trades might not be available for some forex and crypto exchanges. In that case, a price update will be sent with volume = 0. A message can contain multiple trades. 1 API key can only open 1 connection at a time.

Maybe re-use the connection?