How to understand this behavior of wg.Add()?

Hello. I have run into a problem that has me stumped. Imagine I have about 350K XML files (for testing, I am using the same file copied that many times; the file names are sample_1.xml, sample_2.xml, …). I want to parse them, that’s all. Here’s a mock-up of the function that does the parsing:

func singleRun(xmlFile fs.DirEntry) {
	xmlFileOpened, err := os.Open(filepath.Join("data", xmlFile.Name()))
	if err != nil {
		log.Println(err)
		return
	}
	defer xmlFileOpened.Close()

	byteValue, _ := ioutil.ReadAll(xmlFileOpened)
	var users Users
	xml.Unmarshal(byteValue, &users)
	for i := 0; i < len(users.Users); i++ {
		// We would do something with the data here.
		// Here, I do nothing
	}
}

So, this function only opens the file, reads and unmarshals it, loops over the users (elements of the XML document), and then closes the file.

Now, this is the first version of the function:

func PareseWithGoroutines() {
	files, err := os.ReadDir("data")
	if err != nil {
		log.Fatal(err)
	}
	var xmlFiles []fs.DirEntry
	for _, file := range files {
		if strings.HasSuffix(file.Name(), ".xml") {
			xmlFiles = append(xmlFiles, file)
		}
	}
	if len(files) == len(xmlFiles) {
		log.Println("Found", len(files), "XML files in the data folder.")
	} else {
		log.Println("Among", len(files), "files in the data folder,", len(xmlFiles), "XML files were found.")
	}

	var wg sync.WaitGroup
	wg.Add(len(xmlFiles))
	nFiles := 0

	for _, xmlFile := range xmlFiles {
		go func(file fs.DirEntry) {
			defer wg.Done()
			singleRun(file)
			nFiles += 1
		}(xmlFile)
	}
	wg.Wait()
	fmt.Println("Parsed", nFiles, "XML files.")
}

The heart of the function starts with var wg sync.WaitGroup. This function works totally fine (checked). I wanted to compare its running time with a slightly different version, in which I replace the single wg.Add(len(xmlFiles)) call with wg.Add(1) inside the loop. Here’s this version:

func PareseWithGoroutinesV2() {
	files, err := os.ReadDir("data")
	if err != nil {
		log.Fatal(err)
	}

	var xmlFiles []fs.DirEntry
	for _, file := range files {
		if strings.HasSuffix(file.Name(), ".xml") {
			xmlFiles = append(xmlFiles, file)
		}
	}
	if len(files) == len(xmlFiles) {
		log.Println("Found", len(files), "XML files in the data folder.")
	} else {
		log.Println("Among", len(files), "files in the data folder,", len(xmlFiles), "XML files were found.")
	}

	var wg sync.WaitGroup
	nFiles := 0

	for _, xmlFile := range xmlFiles {
		wg.Add(1)
		go func(file fs.DirEntry) {
			defer wg.Done()
			singleRun(file)
			nFiles += 1
		}(xmlFile)
	}
	wg.Wait()
	fmt.Println("Parsed", nFiles, "XML files.")
}

Do note that the only difference is removing wg.Add(len(xmlFiles)) and adding wg.Add(1) inside the loop, before running the next goroutine.

And what amazed me is that this second version does not work! It does not close the files, and after some time of running I get a lot of lines claiming that too many files are open:

...
2021/07/26 08:12:36 open data/sample_345429.xml: too many open files
2021/07/26 08:12:36 open data/sample_345556.xml: too many open files
2021/07/26 08:12:36 open data/sample_345652.xml: too many open files
...

Frankly, I have no idea what’s going on, though it might be a very simple mistake that I made here. The singleRun() function has the deferred close, the loop has wg.Done() inside the goroutine function, so I really have no idea what this can be. Does anyone have an idea what’s going on?

(I can post the full code here, but it requires generating hundreds of thousands of XML files, so, since this seems to be a generic problem, maybe someone knows what’s going on without running the actual code?)

Hi nyggus,

This seems to be more of a Unix problem related to the number of file descriptors open at the same time. As to why this affects the second parse function and not the first, I am not sure, and I know that is the information that would have helped you the most.

I found two approaches on Stack Overflow: closing the open file immediately at the end of the singleRun function rather than using defer, or using a counting semaphore to limit the number of open files.

Here are the links:
Method 1: https://stackoverflow.com/questions/37804804/too-many-open-file-error-in-golang

Method 2: https://stackoverflow.com/questions/38824899/golang-too-many-open-files-in-go-function-goroutine

I hope this helps,
Ed.

Thanks, Edward. Unfortunately, moving the file close to the end of the function does not change anything, and none of the other solutions proposed in those threads helps. Basically, the idea is to keep the number of open files limited, but do note that I am closing those files in singleRun(); and this is what makes things strange, because apparently this does not work: the files are not closed. Parsing one file takes an extremely short time. Unless the code is so quick that it opens that many files at the same time? But in that case, would the same problem not occur in the first version of the code, the one with wg.Add(len(xmlFiles))?

This does seem a strange thing, also because the first code version works and the second does not, the difference between them being rather minor (is it minor indeed?). I do not see anything that could cause this strange behavior, but since I am no expert in Go, I hope I have overlooked something simple.

OK, it seems I found a solution, which can be found here.

The idea is to both defer closing the file and return the result of closing it (as an error). The version below gives no problems:

func singleRun(xmlFile fs.DirEntry) error {
	xmlFileOpened, err := os.Open(filepath.Join("data", xmlFile.Name()))
	if err != nil {
		log.Println(err)
		return err
	}
	defer xmlFileOpened.Close()

	byteValue, _ := ioutil.ReadAll(xmlFileOpened)
	xmlFileOpened.Close()

	var users Users
	xml.Unmarshal(byteValue, &users)
	for i := 0; i < len(users.Users); i++ {
		// We would do something with the data here.
		// Here, I do nothing
	}
	return xmlFileOpened.Close()
}

Of course, the call to this function needs to be revised slightly, to read and check the error. But this does work, however ugly and unintuitive this trick is.

But if anyone can show something more idiomatic, I will be obliged!

I’m glad you’ve found a solution. I agree, very unintuitive. If anyone can shed more light on it, I would also be obliged!

Oh my, so it seems this solution does not work every time…

You have 350,000 XML files? What is the result of ulimit -n? That is the maximum number of file descriptors a process can have open. Mine is 256, which means I would quickly run into problems with your code. You could increase this, but it’s really not a good solution (you have already seen that your code works intermittently, and that’s because sometimes your function returns fast enough that you don’t hit the descriptor limit, and sometimes it does not). See this comment on SO:

If you are increasing your ulimit, you will still run out of file descriptors eventually. Try using a buffered channel and limit the number of go-routines that open a file to below the ulimit. 1000 open files at a time should be more then enough…

Basically, you just need to limit the number of open files to a reasonable number, supported by whatever computer you’re running this on. To do that, use:

Thanks a lot, Dean!

Actually, ulimit -n gives me a little more than 256: 65,536 :wink:. But thanks to you, I know what the problem is. I will indeed use a buffered channel to handle the situation. I will post the solution here when (or if) I manage to find it.

OK, so indeed buffered channels do work!

func goroutinesBufferedChannel(chanSize int) {
	files, err := os.ReadDir("data")
	if err != nil {
		log.Fatal(err)
	}
	var xmlFiles []fs.DirEntry
	for _, file := range files {
		if strings.HasSuffix(file.Name(), ".xml") {
			xmlFiles = append(xmlFiles, file)
		}
	}
	if len(files) == len(xmlFiles) {
		log.Println("Found", len(files), "XML files in the data folder.")
	} else {
		log.Println("Among", len(files), "files in the data folder,", len(xmlFiles), "XML files were found.")
	}

	nFiles := 0
	chanFiles := make(chan fs.DirEntry, chanSize)
	go runReader(xmlFiles, chanFiles)
	for range chanFiles {
		nFiles += 1
	}
	fmt.Println("Parsed", nFiles, "XML files.")
}

func runReader(xmlFiles []fs.DirEntry, chanFiles chan fs.DirEntry) {
	for _, xmlFile := range xmlFiles {
		_ = singleRun(xmlFile)
		chanFiles <- xmlFile
	}
	close(chanFiles)
}

BTW, the script runs slower with smaller buffers, like 10 or 100, than with 10,000, but increasing the buffer to 20,000 or 50,000 slowed the script down again. I suppose it depends on various factors, including the sizes of the XML files, and surely on the system itself.

Hm, actually, I feel this is not really a concurrent solution… When I use go runReader(xmlFiles, chanFiles), this is just one goroutine doing all the work plus the main goroutine. I will work on this and report here if I find a really concurrent solution. (But this is a nice example for me to learn all this stuff, I must admit :smiley: .)

OK, then! Thanks to @Dean_Davidson, I was able to solve the issue using a worker pool. Here’s a shortened version of the code (which does work fast!):

// worker is based on https://gobyexample.com/worker-pools
func worker(id int, jobs <-chan fs.DirEntry, results chan<- int) {
	for j := range jobs {
		_ = singleRun(j)
		// send whatever to the results channel; later results can get the return of singleRun.
		results <- 1
	}
}

// readAll parses all XML files from dirPath.
// It uses a worker pool with noOfWorkers workers; the code structure comes from https://gobyexample.com/worker-pools
func readAll(dirPath string, noOfWorkers int) {
	// Read all files from the data dir
	files, err := os.ReadDir(dirPath)
	if err != nil {
		log.Fatal(err)
	}
	var xmlFiles []fs.DirEntry
	for _, file := range files {
		if strings.HasSuffix(file.Name(), ".xml") {
			xmlFiles = append(xmlFiles, file)
		}
	}
	if len(files) == len(xmlFiles) {
		log.Println("Found", len(files), "XML files in the data folder.")
	} else {
		log.Println("Among", len(files), "files in the data folder,", len(xmlFiles), "XML files were found.")
	}

	// Set up worker pool
	numJobs := len(xmlFiles)
	jobs := make(chan fs.DirEntry, numJobs)
	results := make(chan int, numJobs)

	nFiles := 0
	for w := 1; w <= noOfWorkers; w++ {
		go worker(w, jobs, results)
	}

	for _, xmlFile := range xmlFiles {
		jobs <- xmlFile
	}
	close(jobs)

	for a := 1; a <= numJobs; a++ {
		nFiles += <-results // each worker sends 1 per processed file
	}
	fmt.Println("Parsed", nFiles, "XML files.")
}

// singleRun parses a single XML file
func singleRun(xmlFile fs.DirEntry) string {
	xmlFileOpened, err := os.Open(filepath.Join("..", "data", xmlFile.Name()))
	if err != nil {
		log.Println(err)
		return "" // nothing to read if the file could not be opened
	}
	defer xmlFileOpened.Close()

	byteValue, _ := ioutil.ReadAll(xmlFileOpened)
	xmlFileOpened.Close()

	var users Users
	xml.Unmarshal(byteValue, &users)
	for i := 0; i < len(users.Users); i++ {
		// We can do something with the data here
	}
	if len(users.Users) == 0 {
		return "" // avoid indexing an empty slice
	}
	// just return whatever to show that it's working fine :-)
	return users.Users[0].Type
}

func main() {
	readAll("../data", 10000)
}

Working on this program was a good lesson to me. Thanks all!

I’m glad you built a workable solution! Yeah, it makes sense that there’s a point of diminishing returns. At some point your total throughput might drop because too many files are open and being read at once.

BTW, I was quite disappointed to see that parsing those files in Python, using multiprocessing and eight logical (four physical) cores, was about 6 times faster than the above Go code. Perhaps I could optimize the Go code, but for the moment that’s not something I am able to do. Besides, though Python itself is slow, the lxml library I used is known for its speed. As far as I remember, it is written in Cython, a superset of Python that compiles to C, so the resulting script spends most of its time in C, not in Python. But still, I did not expect Go to be that much slower than Python. For the moment I have no idea what to do, but since I am learning Go, maybe I will be able to find a way to make the code faster.

Both versions of the code look basically OK, so I don’t really see why the second version is not working.
The only thing that strikes me is that you are not synchronizing the writes to the shared variable nFiles.
You should perhaps change this to a sync/atomic add operation (for example, atomic.AddInt64).
This does not explain the behaviour you are seeing, though.
Why not try running with the race detector enabled (the -race flag)?
