Need to run schedulers concurrently with net/http

Hello everyone,
I am working on a project which is built using REST API and can have multiple merchant accounts. We need to run crons for every merchant per 10 mins. The cron urls are our server’s own apis. So I used Golang’s net/http package to fulfil this requirement.
I have thousands of merchants and there was hundreds of bookings per merchant which are going to be completed with the cron, to complete a booking I need to call two more schedulers with http.
I need to call all the schedulers concurrently using go routines.

when i tried to run the code for 1000 merchants, it starts giving EOF error,
Then, I have increased the open file limits to 999000 int my system.

then it starts giving error [dial tcp 127.0.0.1:3333: can’t assign requested address], after too many research, I came to a point that, this error is due to the available tcp ports into the system,
it has used all the available tcp ports in the system,

then I have added some delay for the merchants, if all the ports in the system are used, so that further code will be run, whenever there was ports available in the system.

here is the code I am using,

func TestHttp2(c *gin.Context) {
	fileName := "portlog" + time.Now().String() + ".txt"
	_, e := os.Create(fileName)
	if e != nil {
		fmt.Println(1, e)
	}
	nStr := c.Query("num")
	n, _ := strconv.Atoi(nStr)
	fmt.Println("running")
	var ports int
	for i := 1; i <= n; i++ {
		url := "http://localhost:8000/public/print-test?num=" + strconv.Itoa(i)
	check:
		if i%1000 == 0 {
			ports = PortCheck()
			if ports >= 20000 {
				time.Sleep(10 * time.Second)
				goto check
			}
		}

		go MakeRequest2(url, fileName, ports)
	}
	return
}

/*
 * Function to hit url with get method and close the response body
 */
func MakeRequest2(url, fileName string, port int) {
	data := test{
		FileName: fileName,
		Url:      url,
		Port:     port,
	}
	b, err := json.Marshal(data)
	if err != nil {
		fmt.Println(234234324, err)
	}
	jsonString := string(b)
	client := &http.Client{}
	url2 := url + "?num2="
	req, err := http.NewRequest("GET", url2, bytes.NewBuffer([]byte(jsonString)))
	if err != nil {
		fmt.Println(2, "===========================================", err, "==================================")
		panic(err)
		return
	}
	resp, err := client.Do(req)
	if err != nil {
		fmt.Println(1, "===========================================", err, "==================================")
		panic(err)
		return
	}
	io.Copy(ioutil.Discard, resp.Body)
	client.CloseIdleConnections()
	resp.Body.Close()
}

func PortCheck() int {
	cmd := exec.Command("netstat", "-an")
	grep := exec.Command("grep", "8000")
	wc := exec.Command("wc", "-l")
	pipe, _ := cmd.StdoutPipe()
	defer pipe.Close()

	grep.Stdin = pipe
	pipe2, _ := grep.StdoutPipe()
	defer pipe2.Close()
	wc.Stdin = pipe2
	// Run ps first.
	cmd.Start()
	grep.Start()
	out, err := wc.Output()
	if err != nil {
		// if there was any error, print it here
		fmt.Println("could not run command: ", err)
	}
	// otherwise, print the output from running the command
	fmt.Println("Output: ", string(out))
	fmt.Println(cmd)
	wc.Wait()
	ports, err := strconv.Atoi(strings.Split(string(out), "\n")[0])
	if err != nil {
		fmt.Println(err)
	}
	return ports
}

func PrintTest(c *gin.Context) {
	data := test{}
	_ = json.NewDecoder(c.Request.Body).Decode(&data)
	file, err := os.OpenFile(data.FileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		fmt.Println(1, err)
		return
	}
	defer file.Close()
	_, err = file.WriteString(time.Now().String() + "==>" + data.Url + " ==> " + strconv.Itoa(data.Port) + "\n")
	if err != nil {
		fmt.Println(err)
		return
	}
	for i := 1; i <= 100; i++ {
		time.Sleep(2 * time.Second)
		url1 := "http://localhost:8000/public/test-func1?num=" + strconv.Itoa(i)
		url2 := "http://localhost:8000/public/test-func2?num=" + strconv.Itoa(i)
		go MakeRequest1(url1)
		go MakeRequest1(url2)
	}
	return
}

type test struct {
	Url      string `json:"url"`
	FileName string `json:"file_name"`
	Port     int    `json:"port"`
}

func TestFunc1(c *gin.Context) {
	time.Sleep(5 * time.Second)
	fmt.Println("test function 1")
}
func TestFunc2(c *gin.Context) {
	time.Sleep(5 * time.Second)
	fmt.Println("test function 2")
}

/*
 * Function to hit url with get method and close the response body
 */
func MakeRequest1(url string) {
	client := &http.Client{}
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		fmt.Println(3, "===========================================", err, "==================================")
		panic(err)
		return
	}
	resp, err := client.Do(req)
	if err != nil {
		fmt.Println(4, "===========================================", err, "==================================")
		panic(err)
		return
	}
	io.Copy(ioutil.Discard, resp.Body)
	client.CloseIdleConnections()
	resp.Body.Close()
	return
}

In this I am calling the TestHttp2 functions first, it will run for 5000 merchants, it will check ports after ever 1000 merchants, if ports used are >20000 we will wait for ports to release,
then we are calling PrintTest function using http, in that func we are calling TestFunc1 and TestFunc2 using http command,
Now it starts giving same error

can’t assign requested address

because, we TeseFunc1 and TestFunc2 are consuming the ports, if i am not calling these 2 functions then the code was running perfectly with 2 millions of merchants, but i need to call both functions concurrently,

Then i have added port check condition in PrintTest function too:-

func PrintTest(c *gin.Context) {
	data := test{}
	_ = json.NewDecoder(c.Request.Body).Decode(&data)
	file, err := os.OpenFile(data.FileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		fmt.Println(1, err)
		return
	}
	defer file.Close()
	_, err = file.WriteString(time.Now().String() + "==>" + data.Url + " ==> " + strconv.Itoa(data.Port) + "\n")
	if err != nil {
		fmt.Println(err)
		return
	}
 check:
	 	ports := PortCheck()
         	if ports >= 20000 {
	 		time.Sleep(10 * time.Second)
	 		goto check
	 	}
	for i := 1; i <= 100; i++ {
		time.Sleep(2 * time.Second)
		url1 := "http://localhost:8000/public/test-func1?num=" + strconv.Itoa(i)
		url2 := "http://localhost:8000/public/test-func2?num=" + strconv.Itoa(i)
		go MakeRequest1(url1)
		go MakeRequest1(url2)
	}
	return
}

now all the ports of the system are not consumed, but it starts giving error in the last

fork/exec /usr/bin/wc: resource temporarily unavailable

After that I tried to call TestFunc1 and TestFunc2 using curl command:-


/*
 * Function to hit url with get method and close the response body
 */
func CurlTest(url string) {
	cmd := exec.Command("curl", "-H", "Connection: close", "-H", "--no-keepalive", "-X", "GET", url)
	var out bytes.Buffer
	var stderr bytes.Buffer
	cmd.Stdout = &out
	cmd.Stderr = &stderr
	err := cmd.Start()
	if err == nil {
		_ = cmd.Wait()
	}
	// fmt.Println(i, time.Now())
	return
}
func PrintTest(c *gin.Context) {
	data := test{}
	_ = json.NewDecoder(c.Request.Body).Decode(&data)
	file, err := os.OpenFile(data.FileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		fmt.Println(1, err)
		return
	}
	defer file.Close()
	_, err = file.WriteString(time.Now().String() + "==>" + data.Url + " ==> " + strconv.Itoa(data.Port) + "\n")
	if err != nil {
		fmt.Println(err)
		return
	}
	for i := 1; i <= 100; i++ {
		time.Sleep(2 * time.Second)
		url1 := "http://localhost:8000/public/test-func1?num=" + strconv.Itoa(i)
		url2 := "http://localhost:8000/public/test-func2?num=" + strconv.Itoa(i)
		go CurlTest(url1)
		go CurlTest(url2)
	}
	return
}

If I call the CurlTest func with go routines it was opening too many files in the system, and then it was starting giving error

http: Accept error: accept tcp [::]:8080: accept4: too many open files; retrying in 1s

even i have set the open file limit to around 1 million, it starts giving this error, whenever the open file limit has reached > 1 million.

Please help me to find out best solution of the problem,
I want to call the schedulers with net/http rather than curl, because curl is consuming too much resources.
Please suggest the best architecture for this problem.
can we handle the tcp ports situation in other way rather than adding delays?

  1. Implement a rate limit/worker pool and don’t try to connect all remotes at the same time. Spread the load.
  2. Check if you have perhaps just raised the softlimit and the hard limit remained at a million.
1 Like

Thanks for reply,
If I increase the soft limit, then I have to increase it every time whenever I restart my system.
And open file error comes only in the case, when I run the schedulers using curl,
I want to run schedulers using net/http.