Concurrent programming problem

I have a concurrency issue where the program cannot terminate properly. I have searched many resources without finding an answer, so I am asking for help here.

I have posted the program below. It scans all files on the C drive and records their paths in a txt file. I have described my problem at the location marked TODO. Commenting out the line of code at the TODO position causes the entire program to never terminate, which is strange. I suspected a CPU cache issue, but that does not match what I observe. Please excuse any Chinese that remains in the code.

My idea is to use an infinite for loop that constantly checks whether all four channels are empty, and then breaks out of the loop and ends the program. In practice, however, if I do not perform some time-consuming operation inside the for loop, the program never ends. My code is running on Windows 10.

After my research, I simplified the original code. The first section below is the simplified code for easier understanding, and the second section is the original code.

Additional information: I wrote the simplified code after identifying the root cause of the issue and removing unrelated logic, to make it easier to read. After removing runtime.Gosched(), the program may not exit properly. If it still exits on your machine, try increasing the argument passed to rec, for example rec(80). This may depend on how many CPU cores you have: the more cores, the larger the argument rec seems to need. If you change the buffer size of the channel, you also need to adjust the argument to rec. If the buffer is increased, the argument to rec must also be increased, otherwise the program may still not exit properly.

Simplified code:

package main

import (
    "fmt"
    "runtime"
)

var c = make(chan struct{}, 1)

func main() {
    rec(8)
    for {
        // TODO: without some time-consuming operation here, the whole program never terminates.
        runtime.Gosched()
        if len(c) == 0 {
            fmt.Println("end")
            break
        }
    }
}
func rec(count int) {
    if count == 0 {
        return
    }
    c <- struct{}{}
    for i := 0; i < 5; i++ {
        go rec(count - 1)
    }
    <-c
}

Original code:

package main

import (
    "bufio"
    "fmt"
    "os"
    "runtime"
    "time"
)

var (
    countDir   = 0
    countFile  = 0
    countErr   = 0
    dirsPath   = `\dirsPath`
    filesPath  = `\filesPath`
    errsPath   = `\errsPath`
    chanFile   = make(chan string)
    chanDir    = make(chan string)
    chanErr    = make(chan string)
    writeClose = make(chan struct{})
    goMax      chan struct{}
)

func main() {
    goMax = make(chan struct{}, runtime.NumCPU()-1)
    date := time.Now().Format("20060102")
    go print()

    root := `C:\`
    pwd, _ := os.Getwd()
    go write(fmt.Sprintf("%s\\%s-%s.txt", pwd, dirsPath, date), fmt.Sprintf("%s\\%s-%s.txt", pwd, filesPath, date), fmt.Sprintf("%s\\%s-%s.txt", pwd, errsPath, date))

    recTraverse(root)

    for {
        // TODO: without some time-consuming operation here, the whole program never terminates.
        runtime.Gosched()
        if len(goMax) == 0 && len(chanDir) == 0 && len(chanFile) == 0 && len(chanErr) == 0 {
            close(chanDir)
            close(chanFile)
            close(chanErr)
            writeClose <- struct{}{}
            break
        }
    }

    fmt.Printf("Done: recorded %d files, %d directories, %d errors\n", countFile, countDir, countErr)
    // for {
    // }
}

func recTraverse(path string) {
    goMax <- struct{}{}
    entries, err := os.ReadDir(path)
    if err != nil {
        chanErr <- fmt.Sprintf("failed to read directory %s; error: %s", path, err)
    }
    for _, entry := range entries {
        // Check whether the entry is a directory
        if entry.IsDir() {
            newPath := path + entry.Name()
            chanDir <- newPath
            go recTraverse(newPath + `\`)
        } else {
            chanFile <- path + entry.Name()
        }
    }
    <-goMax
}

func write(dirPath string, filePath string, errPath string) {

    // Open the files and wrap each handle in a bufio.Writer
    dirFile, err := os.Create(dirPath)
    fFile, err := os.Create(filePath)
    errFile, err := os.Create(errPath)
    if err != nil {
        panic(err)
    }
    defer dirFile.Close()
    defer fFile.Close()
    defer errFile.Close()
    dirWriter := bufio.NewWriter(dirFile)
    fileWriter := bufio.NewWriter(fFile)
    errWriter := bufio.NewWriter(errFile)

    for {
        select {
        case data, _ := <-chanDir:
            countDir++
            fmt.Fprintf(dirWriter, "%s\n", data)
        case data, _ := <-chanFile:
            countFile++
            fmt.Fprintf(fileWriter, "%s\n", data)
        case data, _ := <-chanErr:
            countErr++
            fmt.Fprintf(errWriter, "%s\n", data)
        case <-writeClose:
            break
        }
    }

    // Flush the buffered content to the files
    dirWriter.Flush()
    fileWriter.Flush()
    errWriter.Flush()
}

func print() {
    fmt.Printf("\n")
    start := time.Now()
    ticker := time.Tick(time.Millisecond)
    for {
        select {
        case <-ticker:
            elapsed := time.Since(start)
            fmt.Printf("Elapsed: %.3f s \r", elapsed.Seconds())
        case <-writeClose:
            elapsed := time.Since(start)
            fmt.Printf("Elapsed: %.3f s \n", elapsed.Seconds())
            return
        }
    }
}

Hi @FIshInInnkGIT,
There is a bug in the write function: the break in the writeClose case exits only the inner select, not the outer infinite for loop. The write function will never exit, so the Flush calls after the loop are never reached.
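
To illustrate the point about break, here is a minimal sketch that is separate from the original program. The names drain, data, and done are hypothetical, and a labeled break is only one possible fix (returning from the function would work as well).

```go
package main

import "fmt"

// drain reads from data until done fires and returns how many values it saw.
// drain, data, and done are hypothetical names used only for this sketch.
func drain(data <-chan string, done <-chan struct{}) int {
	n := 0
loop:
	for {
		select {
		case <-data:
			n++
		case <-done:
			// A bare "break" here would leave only the select, and the
			// for loop would run again; the label exits the loop itself.
			break loop
		}
	}
	return n
}

func main() {
	data := make(chan string)
	done := make(chan struct{})
	go func() {
		for _, s := range []string{"a", "b", "c"} {
			data <- s // unbuffered: returns only once drain has received
		}
		close(done)
	}()
	fmt.Println("received:", drain(data, done))
}
```

Because data is unbuffered, done is closed only after all three values have been received, so the loop exits through the done case with nothing left pending.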

That is not the cause. After my research I simplified the code so that it only includes the recursive function and main, and the issue still occurs, so it is not in the write function. Below is the simplified code:

package main

import (
    "fmt"
    "runtime"
)

var c = make(chan struct{}, 1)

func main() {
    rec(8)
    for {
        runtime.Gosched() // TODO
        if len(c) == 0 {
            fmt.Println("end")
            break
        }
    }
}
func rec(count int) {
    if count == 0 {
        return
    }
    c <- struct{}{}
    for i := 0; i < 5; i++ {
        go rec(count - 1)
    }
    <-c
}


@FIshInInnkGIT Are you saying that your simplified code fixes the issue or that it demonstrates the issue? I tried running it both with runtime.Gosched() commented out and uncommented and the program completes both ways.

The simplified code only makes it easier to see where the problem is. If the program still exits after you comment out runtime.Gosched(), try increasing the argument of the recursive function, for example rec(80). That applies to the simplified code.

Hi @FIshInInnkGIT,

You did great by simplifying the program. This is a great idea, and it helps in understanding any issue.

However, I am able to run this simplified code just fine. I increased rec(8) to rec(8000) and it still works.

What is the behaviour on your machine?

Did you try it after removing runtime.Gosched()?

After commenting out runtime.Gosched(), the program never ended when I ran it. Debugging showed that the channel was full. If I increase the buffer size of the channel, I also need to increase the argument of the recursive function rec. Otherwise, the program may still not exit normally.

Hi @skillian and @FIshInInnkGIT,

Yes, I did remove the Gosched() call and it works just fine. Take a look for yourself on the Go Playground.

I even ran it with both go1.20 and go1.19, and it works just fine. Tell me what else I should change.

I’m pretty sure that in the simplified example, the len(c) == 0 check happens before the rec goroutines start, so the count is still 0 and the program terminates.

@skillian,

The check for len(c) probably happens before the first goroutines get to run, and after rec has written to and then consumed from the channel. This would cause the program to end normally, except that the goroutines did not run. The program ends, and fast.

If we remove the check, and replace it with a long enough sleep, the program would finish just fine for small values. For rec(8000) I get “panic: too many concurrent operations on a single file or socket (max 1048575)”

I do not see a scenario where this program runs forever (deadlock); it will end, and likely fast. What scenario could make it deadlock?

Unrelated: the complexity of this program is exponential, O(5^n), where n is the initial count with which rec is called. It is a very bad idea to implement an exponential algorithm: you quickly run out of CPU, hit integer overflows, and so on, even for small inputs.
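
To put numbers on that growth, here is a small sketch that counts how many times rec runs for a given argument, including the root call and the count == 0 calls that return immediately. The helper name totalCalls is hypothetical; it only evaluates the recurrence T(0) = 1, T(n) = 1 + 5*T(n-1) that follows from the loop in rec.

```go
package main

import (
	"fmt"
	"math/big"
)

// totalCalls returns how many times rec runs for rec(n), counting the root
// call and the count == 0 leaves: T(0) = 1, T(n) = 1 + 5*T(n-1).
// totalCalls is a hypothetical helper, not part of the original program.
func totalCalls(n int) *big.Int {
	total := big.NewInt(1)
	five := big.NewInt(5)
	one := big.NewInt(1)
	for i := 0; i < n; i++ {
		total.Mul(total, five)
		total.Add(total, one)
	}
	return total
}

func main() {
	// big.Int is used because the count for rec(80) does not fit in 64 bits.
	for _, n := range []int{8, 20, 80} {
		fmt.Printf("rec(%d): %s calls\n", n, totalCalls(n))
	}
}
```

For rec(8) this gives 488281 calls, all but the first of them separate goroutines. The value for rec(80) is far beyond anything a machine could schedule, so that case cannot finish whether or not Gosched is present.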

Concurrency-wise, calling rec in the main function is synchronous, so the check for len(c) == 0 happens only after rec has returned, that is, after the first five goroutines have been started (though not necessarily run).
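
Since len(c) only samples the channel at one instant, it cannot tell whether the spawned goroutines have finished. As a sketch of one commonly used alternative (not something proposed by the original poster), the simplified program can track completion with sync.WaitGroup. The names sem, wg, calls, and run are hypothetical, and the call counter is there only so the result can be checked.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

var (
	sem   = make(chan struct{}, 1) // same role as c in the simplified code
	wg    sync.WaitGroup           // hypothetical addition: rec calls still pending
	calls int64                    // hypothetical addition: total rec invocations
)

func rec(count int) {
	defer wg.Done() // this call is finished, whichever path returns
	atomic.AddInt64(&calls, 1)
	if count == 0 {
		return
	}
	sem <- struct{}{}
	for i := 0; i < 5; i++ {
		wg.Add(1) // register the child before starting it
		go rec(count - 1)
	}
	<-sem
}

// run starts the recursion, blocks until every rec call has returned,
// and reports how many calls were made.
func run(depth int) int64 {
	atomic.StoreInt64(&calls, 0)
	wg.Add(1) // account for the root call
	rec(depth)
	wg.Wait()
	return atomic.LoadInt64(&calls)
}

func main() {
	fmt.Println("rec calls:", run(4))
}
```

With this shape main blocks in wg.Wait() instead of spinning, so it neither exits early nor competes with the workers for CPU time. The depth is kept small here on purpose, because the number of goroutines still grows as a power of 5.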

I am not sure either. I can only confirm that the behaviour is related to the number of CPU threads. If it runs normally on your computer, I do not understand why. Maybe you can ask a few friends to run it on their computers and see what happens. The program fails to exit normally on my computer and on my friends' computers.

That could be it. Your PC may have fewer resources than mine or the Go Playground, and simply having too many spawned goroutines slows it down a lot. It looks like it never ends, but it will end by panicking (too many concurrent operations on a single file or socket) because you have too many goroutines. Before panicking, though, it takes a while, as the computer becomes very slow.

Can you test this: try running it with 8, 20, and 40, and figure out the minimum number for which it does not exit. Then run it with that number and wait for half an hour. It should either finish or panic (too many concurrent operations on a single file or socket).

Not quite. This program does not panic. The reason it is related to the CPU is that the program gets stuck in a deadlock (or an infinite loop) when the channel buffer is full. If CPU resources are sufficient, the recursion finishes before the channel ever fills up. My guess is that the program does not exit because the channel is full, which blocks the goroutines and leaves only the main goroutine running the for loop. The main goroutine then leaves no time for the other goroutines to be woken up and consume from the channel. This would explain why adding runtime.Gosched() lets the program end. However, it still does not make sense to me why the channel is full, considering that the channel is consumed every time a rec call ends. This may be related to the GPM scheduling mechanism and the locking mechanism, but I have not found the answer yet.

Look at the exponential nature of the algorithm. If you picture each call of rec as a node in a tree, the lower levels of the tree have a huge number of nodes.

Thus, if you can only run a few goroutines at a time, scheduling all the lower-level nodes takes a lot of time. Let's say one goroutine has written to the channel and is preempted before it reads from the channel. Between these two consecutive runs of this goroutine, a great many other goroutines need to be scheduled and run, all of them doing nothing because the channel is full.
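
One possible contributing factor, offered as an assumption about the standard Go runtime rather than something established in this thread: when a receive happens on a full buffered channel that already has senders waiting, the runtime appears to move a waiting sender's value into the freed slot as part of the same operation. If that holds, len(c) would not drop to 0 while any sender is queued. The sketch below tries to observe this; lenAfterRelease is a hypothetical helper, and the sleep is only a crude way to let the extra sender block first.

```go
package main

import (
	"fmt"
	"time"
)

// lenAfterRelease fills a one-slot channel, parks one extra sender on it,
// receives once, and reports len(c) right after that receive.
// lenAfterRelease is a hypothetical helper used only for this sketch.
func lenAfterRelease() int {
	c := make(chan struct{}, 1)
	c <- struct{}{} // slot taken, as if one rec call held it

	go func() {
		c <- struct{}{} // blocks: the buffer is already full
	}()
	// Assumption: 100ms is long enough for the goroutine above to block.
	time.Sleep(100 * time.Millisecond)

	<-c // release the slot
	return len(c)
}

func main() {
	fmt.Println("len(c) right after a receive:", lenAfterRelease())
}
```

If the reported length is 1 rather than 0, a polling loop on len(c) == 0 would only succeed in the gaps where no goroutine is waiting to send, and in a tree of this size such gaps may be rare until almost all of the work is done.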
