How can I bypass the maximum allowed number of concurrent connections?

I have two CSV files to read data from:

  1. AllTransactions, with 2,618,583 records
  2. SalesTransactions, with 1,070,305 records

SalesTransactions (the small file) has a field that AllTransactions (the big file) does not have. For every record in the big file that is a sales transaction (its ST_ID begins with “5”), I need to add this field.

My idea was to loop over the small file every time a matching record is found in the big file. This means reading the 1,070,305 SalesTransactions records up to 1,070,305 times, which is more than the allowed 1,048,576.

So I decided to use goroutines and wrote the following code:

func NetTrx(item string, allTrx *[]models.MainModel, salesTrx *[]models.SalesTransactions) {

	var wg sync.WaitGroup

	for _, v := range *allTrx {
		trx := models.NetTrasActions{
			ST_TYPE:      v.ST_TP,
			ITEM_ID:      v.ITEM_ID,
			ITEM_NAME:    v.ITEM_NAME,
			QTY:          v.QTY,
			CURNT_COST_L: 0,
			PRICE:        v.PRICE,
			TTL_VAL:      v.QTY * v.PRICE,
			TTL_CST:      0,
			SLS_CNTR_ID:  v.SLS_CNTR_ID,
			BARCODE:      v.BARCODE,
		}

		if strings.HasPrefix(v.ST_ID, "5") { // sales orders transactions starts with 5
			wg.Add(1)
			go func() {
				defer wg.Done()
				for _, s := range *salesTrx {
					if v.ST_ID == s.ST_ID && v.ITEM_ID == s.ITEM_ID {
						trx.CURNT_COST_L = s.CURNT_COST_L
						fmt.Println("matched found for item", v.ITEM_ID, "cost is:", s.CURNT_COST_L)
						break
					}
				}
				fmt.Println("Exit goroutine")
			}()
		}

    wg.Wait()

    // balance of the code
}

When I run it, I get this error:

PS D:\Desktop\Bravo for Power BI\2022\YTDTransactions> go run trx
Number of transactions: 2618583
Number of sales transactions: 1070305
Exit goroutine
Exit goroutine
Exit goroutine
Exit goroutine
Exit goroutine
Exit goroutine
Exit goroutine
Exit goroutine
matched found for item 100000105 cost is: 74
Exit goroutine
matched found for item 100000105 cost is: 74
Exit goroutine
matched found for item 100000105 cost is: 74
Exit goroutine
matched found for item 100000105 cost is: 74
Exit goroutine
.
.
.
Panic: too many concurrent operations on a single file or socket (max 1048575)

How can I get around this limit? Shouldn’t wg.Done() end each goroutine once it has finished, so that the number of running goroutines drops by one every time “Exit goroutine” is printed (it is printed just before wg.Done() runs)?

My assumption is that the million goroutines should not all be running at once: some are started while others finish. The work in each goroutine is quick, so goroutines should finish regularly and never reach a million running concurrently!

How are you calling NetTrx? NetTrx itself doesn’t perform any IO that I can see other than fmt.Println to standard output. I suspect the problem is somewhere else.

My model is defined as follows:

package models

import "time"

// NetTrasActions is one row of the net-transactions output assembled by
// NetTrx from an AllTransactions record (and, for sales rows, the cost of the
// matching SalesTransactions record).
type NetTrasActions struct {
	// Integer attributes.
	ST_TYPE int
	QUARTER int
	YearDay int

	// Calendar parts of the transaction date.
	Weekday time.Weekday
	MONTH   time.Month

	// Date and item identification.
	DATE      string
	ITEM_ID   string
	ITEM_NAME string

	// Quantities, prices, costs and totals.
	QTY          float64
	CURNT_COST_L float64
	PRICE        float64
	TTL_VAL      float64
	TTL_CST      float64

	// Remaining descriptive string columns.
	SLS_CNTR_ID string
	BARCODE     string
	DESCR       string
	SPL_ID      string
	CL_1        string
	CL_2        string
	CL_3        string
}

The global holders for the data are declared as:

package global

import "trx/models"

// Package-level holders for the parsed CSV data; main assigns both.
var (
	// TotalTransactions holds every parsed row of the AllTransactions file.
	TotalTransactions []models.MainModel
	// SalesTransactions holds every parsed row of the SalesTransactions file.
	SalesTransactions []models.SalesTransactions
)

Then I read the data and call NetTrx like this:

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"trx/global"
)

// main loads both CSV exports into the global holders and runs NetTrx on
// them. It returns when NetTrx finishes or when the process receives an
// interrupt or SIGTERM, whichever happens first.
func main() {
	allTrx := "ITEMS_ACT_DETAIL.csv"
	global.TotalTransactions = readAllTrxFile(allTrx)
	fmt.Println("Number of transactions:", len(global.TotalTransactions))

	salesTrx := "SLS_DT_ALL_V.csv"
	global.SalesTransactions = readSalesTrxFile(salesTrx)
	fmt.Println("Number of sales transactions:", len(global.SalesTransactions))

	// Register before starting the work so an early Ctrl+C is not missed.
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)

	done := make(chan struct{})
	go func() {
		defer close(done)
		// NetTrx takes the data explicitly; calling it with only the item ID
		// does not match its three-parameter signature.
		NetTrx("133098", &global.TotalTransactions, &global.SalesTransactions)
	}()

	// Previously main blocked on the signal only, so it never exited after
	// the work completed.
	select {
	case <-done:
	case <-c:
	}
}

The AllTransactions CSV file is read like this:

package main

import (
	"encoding/csv"
	"fmt"
	"io"
	"os"
	"strconv"
	"time"
	"trx/models"
)

// readAllTrxFile reads the tab-separated AllTransactions export at filePath
// and returns one MainModel per data row.
//
// The first row read is the header and maps column names to indexes; lines
// starting with '#' are skipped by the csv reader. Numeric columns that fail
// to parse are left at 0. USR_INS_DATE is rewritten as dd/mm/y: an empty
// value becomes "00/00/0", and a value that does not parse is reported on
// stdout and falls back to the zero time ("01/01/1").
//
// Errors from opening or reading the file are passed to checkError.
// NOTE(review): checkError is defined elsewhere; this assumes it stops the
// program on a non-nil error — confirm.
func readAllTrxFile(filePath string) (transactions []models.MainModel) {
	// Layout of USR_INS_DATE; "15" is the 24-hour clock.
	const layout = "2006-01-02 15:04:05.000"

	f, openErr := os.Open(filePath)
	checkError("Could not open file", openErr)
	defer f.Close() // previously the file was never closed

	r := csv.NewReader(f)
	r.Comma = '\t'
	r.Comment = '#'

	isFirstRow := true
	headerMap := make(map[string]int)

	for {
		record, err := r.Read()
		if err == io.EOF {
			break
		}
		checkError("Some other error occurred", err)

		if isFirstRow {
			isFirstRow = false
			// Column/property name --> record index.
			for i, v := range record {
				headerMap[v] = i
			}
			continue
		}

		// col returns the raw text of the named column in this row.
		col := func(name string) string { return record[headerMap[name]] }
		// num parses the named column as float64, or returns 0 if it does not parse.
		num := func(name string) float64 {
			v, err := strconv.ParseFloat(col(name), 64)
			if err != nil {
				return 0
			}
			return v
		}

		insDate := "00/00/0"
		if s := col("USR_INS_DATE"); len(s) > 0 {
			theTime, err := time.Parse(layout, s)
			if err != nil {
				fmt.Println("Could not parse time:", err)
				fmt.Println(s)
			}
			y, m, d := theTime.Date()
			insDate = fmt.Sprintf("%02d/%02d/%v", d, int(m), y)
		}

		trx := models.MainModel{
			IDSRL:         col("IDSRL"),
			ITEM_ID:       col("ITEM_ID"),
			ITEM_NAME:     col("ITEM_NAME"),
			WH_ID:         col("WH_ID"),
			ST_ID:         col("ST_ID"),
			ST_DATE:       col("ST_DATE"),
			QTY:           num("QTY"),
			FREE_QTY:      num("FREE_QTY"),
			OPN_COST_L:    num("OPN_COST_L"),
			OPN_COST_F:    num("OPN_COST_F"),
			DESCR:         col("DESCR"),
			POSTED:        col("POSTED"),
			HWMNY:         col("HWMNY"),
			PRICE:         num("PRICE"),
			DISCNT:        num("DISCNT"),
			BR_ID:         col("BR_ID"),
			STS:           col("STS"),
			BATCH_NO:      col("BATCH_NO"),
			EXPR_DATE:     col("EXPR_DATE"),
			SEASON_ID:     col("SEASON_ID"),
			CUS_ID:        col("CUS_ID"),
			CUS_NM1:       col("CUS_NM1"),
			SLS_CNTR_ID:   col("SLS_CNTR_ID"),
			CST_ID:        col("CST_ID"),
			SLS_MAN_ID:    col("SLS_MAN_ID"),
			USR_INS_DATE:  insDate,
			HD_SRL:        col("HD_SRL"),
			CL_1:          col("CL_1"),
			CL_2:          col("CL_2"),
			CL_3:          col("CL_3"),
			CL_4:          col("CL_4"),
			CL_5:          col("CL_5"),
			CL_ALL:        col("CL_ALL"),
			ST_TYPE_TP:    col("ST_TYPE_TP"),
			SPL_ID:        col("SPL_ID"),
			WH_ID_2:       col("WH_ID_2"),
			SPL_INV_NO:    col("SPL_INV_NO"),
			ITEM_COLOR_ID: col("ITEM_COLOR_ID"),
			ITEM_SIZE_ID:  col("ITEM_SIZE_ID"),
			TXT1:          col("TXT1"),
			TXT2:          col("TXT2"),
			TXT3:          col("TXT3"),
			TXT4:          col("TXT4"),
			TXT5:          col("TXT5"),
			TXT6:          col("TXT6"),
			TXT7:          col("TXT7"),
			TXT8:          col("TXT8"),
			TXT9:          col("TXT9"),
			TXT10:         col("TXT10"),
			I_ID:          col("I_ID"),
			BARCODE:       col("BARCODE"),
			BINNO:         col("BINNO"),
			SUB_BIN:       col("SUB_BIN"),
			Value:         num("Value"),
		}

		// ST_TP stays 0 when the column is not an integer.
		if stTP, err := strconv.Atoi(col("ST_TP")); err == nil {
			trx.ST_TP = stTP
		}

		transactions = append(transactions, trx)
	}
	return transactions
}

The sales-only data are read in almost the same way:

package main

import (
	"encoding/csv"
	"fmt"
	"io"
	"os"
	"strconv"
	"time"
	"trx/models"
)

// readSalesTrxFile reads the tab-separated SalesTransactions export at
// filePath and returns one SalesTransactions value per data row.
//
// The first row read is the header and maps column names to indexes; lines
// starting with '#' are skipped by the csv reader. Numeric columns that fail
// to parse are left at 0. DATE1 is parsed with a 24-hour layout and provides
// QUARTER, YearDay, Weekday, MONTH and a dd/mm/y DATE1 string; a value that
// does not parse is reported on stdout and falls back to the zero time.
//
// Errors from opening or reading the file are passed to checkError.
// NOTE(review): checkError is defined elsewhere; this assumes it stops the
// program on a non-nil error — confirm.
func readSalesTrxFile(filePath string) (salesTrx []models.SalesTransactions) {
	// "15" is the 24-hour clock. The former "03" (12-hour) layout rejected
	// every DATE1 with an hour of 13 or later, silently turning it into the
	// zero date; hours 0-12 parse identically with either layout.
	const layout = "2006-01-02 15:04:05"

	f, openErr := os.Open(filePath)
	checkError("Could not open file", openErr)
	defer f.Close() // previously the file was never closed

	r := csv.NewReader(f)
	r.Comma = '\t'
	r.Comment = '#'

	isFirstRow := true
	headerMap := make(map[string]int)

	for {
		record, err := r.Read()
		if err == io.EOF {
			break
		}
		checkError("Some other error occurred", err)

		if isFirstRow {
			isFirstRow = false
			// Column/property name --> record index.
			for i, v := range record {
				headerMap[v] = i
			}
			continue
		}

		// col returns the raw text of the named column in this row.
		col := func(name string) string { return record[headerMap[name]] }
		// num parses the named column as float64, or returns 0 if it does not parse.
		num := func(name string) float64 {
			v, err := strconv.ParseFloat(col(name), 64)
			if err != nil {
				return 0
			}
			return v
		}

		theTime, err := time.Parse(layout, col("DATE1"))
		if err != nil {
			fmt.Println("Could not parse time:", err)
		}
		y, m, d := theTime.Date()

		trx := models.SalesTransactions{
			QUARTER:        (int(m) + 2) / 3,
			YearDay:        theTime.YearDay(),
			Weekday:        theTime.Weekday(),
			MONTH:          m,
			ST_TYPE:        col("ST_TYPE"),
			ST_ID:          col("ST_ID"),
			DATE1:          fmt.Sprintf("%02d/%02d/%v", d, int(m), y),
			CUR_ID:         col("CUR_ID"),
			SRL:            col("SRL"),
			ITEM_ID:        col("ITEM_ID"),
			ITEM_NAME:      col("ITEM_NAME"),
			QTY:            num("QTY"),
			FREE_QTY:       num("FREE_QTY"),
			QTY_ALL:        num("QTY_ALL"),
			HW_MNY:         col("HW_MNY"),
			CST:            col("CST"),
			PRICE:          num("PRICE"),
			TTL_VAL_B4_DIS: num("TTL_VAL_B4_DIS"),
			DISCNT:         num("DISCNT"),
			TTL_VAL:        num("TTL_VAL"),
			TTL_CST:        num("TTL_CST"),
			GN:             col("GN"),
			GN_PR:          col("GN_PR"),
			EX_PR:          col("EX_PR"),
			CUS_ID:         col("CUS_ID"),
			CUS_NAME:       col("CUS_NAME"),
			ADRS:           col("ADRS"),
			CUS_MOBILE:     col("CUS_MOBILE"),
			CUS_CASH_ID:    col("CUS_CASH_ID"),
			ACC_ID:         col("ACC_ID"),
			SLS_CNTR_ID:    col("SLS_CNTR_ID"),
			BR_ID:          col("BR_ID"),
			ST_TP:          col("ST_TP"),
			BARCODE:        col("BARCODE"),
			BATCH_NO:       col("BATCH_NO"),
			SEASON_ID:      col("SEASON_ID"),
			DESCR:          col("DESCR"),
			POSTED:         col("POSTED"),
			FRM_WHID:       col("FRM_WHID"),
			SPL_ID:         col("SPL_ID"),
			SPL_INV_NO:     col("SPL_INV_NO"),
			CL_1:           col("CL_1"),
			CL_2:           col("CL_2"),
			CL_3:           col("CL_3"),
			CL_4:           col("CL_4"),
			CL_5:           col("CL_5"),
			CL_ALL:         col("CL_ALL"),
			FRM_WHID1:      col("FRM_WHID1"),
			MKH_ID:         col("MKH_ID"),
			SLS_MAN_ID:     col("SLS_MAN_ID"),
			SLS_MAN_ID_DT:  col("SLS_MAN_ID_DT"),
			UNIT:           col("UNIT"),
			OPN_BAL:        num("OPN_BAL"),
			OPN_COST_L:     num("OPN_COST_L"),
			CURNT_COST_L:   num("CURNT_COST_L"),
			MAIN_UNIT:      num("MAIN_UNIT"),
			PRICE1:         num("PRICE1"),
			SRVS_IT:        col("SRVS_IT"),
			FIXD_BOUNS:     num("FIXD_BOUNS"),
			CST_ID:         col("CST_ID"),
			USR_INS:        col("USR_INS"),
			USR_INS_DATE:   col("USR_INS_DATE"),
			USR_UPD:        col("USR_UPD"),
			USR_UPD_DATE:   col("USR_UPD_DATE"),
			DISCNT1:        num("DISCNT1"),
			DISCNT2:        num("DISCNT2"),
			DISCNT3:        num("DISCNT3"),
			AUTO_PUR_ID:    col("AUTO_PUR_ID"),
			QTY_UNIT:       num("QTY_UNIT"),
			PRICE_UNIT:     num("PRICE_UNIT"),
			CST_UNIT:       num("CST_UNIT"),
		}

		salesTrx = append(salesTrx, trx)
	}
	return salesTrx
}

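The only IO is to standard output in the fmt.Println calls. Maybe you’re getting more than a million goroutines and they’re all blocking on the call to fmt.Println? Every write to os.Stdout takes a reference on the file’s internal lock, including while it waits for another writer to finish, and Go panics with exactly this message once more than 1048575 such operations are pending. Your loop starts goroutines much faster than each one can finish scanning a million sales records, so wg.Done() does not bring the count down fast enough. Here’s the same code, but I’m sending the messages to a channel that prints the messages one by one from a single goroutine: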

func NetTrx(item string, allTrx *[]models.MainModel, salesTrx *[]models.SalesTransactions) {
	var wg sync.WaitGroup
	msgs := make(chan string, 128)
	defer close(msgs)
	println := func(args ...interface{}) {
		msgs <- fmt.Sprintln(args...)
	}
	go func() {
		for msg := range msgs {
			fmt.Println(msg)
		}
	}()
	for _, v := range *allTrx {
		trx := models.NetTrasActions{
			ST_TYPE:      v.ST_TP,
			ITEM_ID:      v.ITEM_ID,
			ITEM_NAME:    v.ITEM_NAME,
			QTY:          v.QTY,
			CURNT_COST_L: 0,
			PRICE:        v.PRICE,
			TTL_VAL:      v.QTY * v.PRICE,
			TTL_CST:      0,
			SLS_CNTR_ID:  v.SLS_CNTR_ID,
			BARCODE:      v.BARCODE,
		}
		if strings.HasPrefix(v.ST_ID, "5") { // sales orders transactions starts with 5
			wg.Add(1)
			go func() {
				defer wg.Done()
				for _, s := range *salesTrx {
					if v.ST_ID == s.ST_ID && v.ITEM_ID == s.ITEM_ID {
						trx.CURNT_COST_L = s.CURNT_COST_L
						println("matched found for item", v.ITEM_ID, "cost is:", s.CURNT_COST_L)
						break
					}
				}
				println("Exit goroutine")
			}()
		}
	wg.Wait()
	// balance of the code
}

This is just a guess; I’m really not sure yet!