I have a concurrency issue where my program cannot terminate properly. I have searched many resources but have not found an answer, so I am asking for help here.
I have posted the program below. Its function is to scan all files on the C drive and record their paths in a txt file. I have described my problem at the location marked TODO. Commenting out the line of code at the TODO position causes the entire program to fail to terminate, which is a strange phenomenon. I suspect it is a CPU cache issue, but that explanation does not match what I actually observe. Any help is appreciated. Please do not mind the Chinese in the code.
My idea is to use an infinite for loop to constantly check whether all four channels have no content, and then break out of the for loop and end the program. In practice, however, if I do not perform some time-consuming operation inside the for loop, the program never ends. My code is running on Windows 10.
After my research, I simplified the original code. The first section below is the simplified code for easier understanding, and the second section is the original code.
This is additional information. The simplified code was created after identifying the root cause of the issue and removing unnecessary logic for easier reading. After removing runtime.Gosched(), the program may not exit properly. If possible, try increasing the input parameter of the rec function, for example, change it to rec(80). This may be related to the number of CPU cores you have: the higher the number of CPU cores, the larger the parameter value needed for rec. If you adjust the buffer size of the channel, you also need to modify the parameter value of rec accordingly. If the buffer is increased, the parameter value of rec must also be increased, otherwise the program may still not exit properly.
Simplified code:
package main
import (
"fmt"
"runtime"
)
// c is a channel of empty structs with a buffer of one element. rec sends one
// value into it before starting its child goroutines and receives one value
// afterwards; main polls len(c) to decide when to stop.
var c = make(chan struct{}, 1)
// main starts the recursive fan-out with rec(8) and then spins, yielding the
// processor on every pass, until it observes that channel c holds no element.
// It then prints "end" and returns.
func main() {
	rec(8)
	for {
		// TODO Not performing some time-consuming operation here will cause
		// the entire program to fail to terminate.
		runtime.Gosched()
		if len(c) != 0 {
			// Something is still buffered in c; poll again.
			continue
		}
		fmt.Println("end")
		return
	}
}
// rec is the recursive fan-out used by the reproduction. For a non-zero count
// it puts one value into c, starts five goroutines that each run rec with
// count-1, and then takes one value back out of c. A count of zero does
// nothing.
func rec(count int) {
	if count != 0 {
		c <- struct{}{}
		next := count - 1
		for started := 0; started != 5; started++ {
			go rec(next)
		}
		<-c
	}
}
Original code:
package main
import (
"bufio"
"fmt"
"os"
"runtime"
"time"
)
// Package-level state shared by main, recTraverse, write and print.
var (
// Counters incremented by write for every value it receives from chanDir,
// chanFile and chanErr respectively; main prints them when it finishes.
countDir = 0
countFile = 0
countErr = 0
// Name stems of the three output files. main inserts each one between the
// working directory and the date when it builds the "%s\\%s-%s.txt" paths.
// Note that each stem already begins with a backslash.
dirsPath = `\dirsPath`
filesPath = `\filesPath`
errsPath = `\errsPath`
// Unbuffered channels carrying file paths, directory paths and error
// messages. recTraverse sends on them and write receives from them.
chanFile = make(chan string)
chanDir = make(chan string)
chanErr = make(chan string)
// Unbuffered stop signal. main sends a single value on it; both write and
// print have a receive case for it.
writeClose = make(chan struct{})
// Buffered channel allocated in main. recTraverse sends a value on entry and
// receives one on exit, so its capacity bounds how many recTraverse calls are
// between those two points at once.
goMax chan struct{}
)
// main scans C:\ recursively and records every directory, file and read error
// in three dated text files created in the current working directory.
//
// It sizes the goMax semaphore, starts the print and write goroutines, kicks
// off the traversal with recTraverse, and then polls until goMax is empty
// before closing the data channels, signalling writeClose and printing the
// totals.
//
// NOTE(review): the polling condition is not a reliable completion signal.
// A goroutine started by recTraverse may not have acquired its goMax slot yet
// when the parent releases its own, so len(goMax) can be 0 while work is still
// pending; chanDir, chanFile and chanErr are unbuffered, so their len is
// always 0. A sync.WaitGroup shared with recTraverse would express "all
// traversal goroutines have finished" directly, but that change spans several
// functions and is not made here.
func main() {
	// Keep one CPU free for the writer, but never create a zero-capacity
	// (unbuffered) semaphore: on a single-CPU machine the first
	// goMax <- struct{}{} in recTraverse would block forever.
	slots := runtime.NumCPU() - 1
	if slots < 1 {
		slots = 1
	}
	goMax = make(chan struct{}, slots)

	date := time.Now().Format("20060102")
	go print()
	root := `C:\`

	pwd, err := os.Getwd()
	if err != nil {
		// Without a working directory the output paths would be meaningless.
		fmt.Fprintf(os.Stderr, "cannot determine working directory: %v\n", err)
		os.Exit(1)
	}
	go write(fmt.Sprintf("%s\\%s-%s.txt", pwd, dirsPath, date), fmt.Sprintf("%s\\%s-%s.txt", pwd, filesPath, date), fmt.Sprintf("%s\\%s-%s.txt", pwd, errsPath, date))

	recTraverse(root)
	for {
		// TODO Not performing some time-consuming operation here will cause
		// the entire program to fail to terminate.
		runtime.Gosched()
		if len(goMax) == 0 && len(chanDir) == 0 && len(chanFile) == 0 && len(chanErr) == 0 {
			close(chanDir)
			close(chanFile)
			close(chanErr)
			// Both write and print receive from writeClose; this single send
			// is delivered to only one of them.
			writeClose <- struct{}{}
			break
		}
	}
	fmt.Printf("执行完毕, 共记录文件 %d 个, 文件夹 %d 个, 错误 %d 条 \n", countFile, countDir, countErr)
}
// recTraverse reads the directory at path and reports its contents: each
// sub-directory is sent on chanDir and then traversed in a new goroutine, and
// every other entry is sent on chanFile. A read failure is reported on chanErr;
// whatever entries os.ReadDir still returned are processed regardless.
//
// path is concatenated directly with each entry name, so it must already end
// with a path separator. The function holds one goMax slot from entry until
// it has finished iterating.
func recTraverse(path string) {
	goMax <- struct{}{}

	items, readErr := os.ReadDir(path)
	if readErr != nil {
		chanErr <- fmt.Sprintf("目录读取失败 %s ; 日志: %s ", path, readErr)
	}

	for _, item := range items {
		full := path + item.Name()
		// Anything that is not a directory is recorded as a file.
		if !item.IsDir() {
			chanFile <- full
			continue
		}
		chanDir <- full
		go recTraverse(full + `\`)
	}

	<-goMax
}
// write drains chanDir, chanFile and chanErr into three text files, one entry
// per line, and counts each entry in countDir, countFile and countErr.
//
// dirPath, filePath and errPath name the files to create for directories,
// files and error messages respectively. The function panics if any of them
// cannot be created.
//
// It stops when a value is received on writeClose or once all three data
// channels have been closed, and flushes the buffered output before
// returning. Flush failures are reported on standard error.
//
// NOTE(review): the counters are incremented here without synchronization
// while main reads them from another goroutine, and nothing in this function
// tells the caller when flushing has finished; verify the caller's shutdown
// sequence.
func write(dirPath string, filePath string, errPath string) {
	// Open the files; check every error individually so that a failure on the
	// first or second file is not masked by a later successful call.
	dirFile, err := os.Create(dirPath)
	if err != nil {
		panic(err)
	}
	defer dirFile.Close()

	fFile, err := os.Create(filePath)
	if err != nil {
		panic(err)
	}
	defer fFile.Close()

	errFile, err := os.Create(errPath)
	if err != nil {
		panic(err)
	}
	defer errFile.Close()

	dirWriter := bufio.NewWriter(dirFile)
	fileWriter := bufio.NewWriter(fFile)
	errWriter := bufio.NewWriter(errFile)

	// Local copies so that a closed channel can be set to nil, which disables
	// its select case instead of yielding zero values forever.
	dirs, files, errs := chanDir, chanFile, chanErr

loop:
	for dirs != nil || files != nil || errs != nil {
		select {
		case data, ok := <-dirs:
			if !ok {
				dirs = nil
				continue
			}
			countDir++
			fmt.Fprintf(dirWriter, "%s\n", data)
		case data, ok := <-files:
			if !ok {
				files = nil
				continue
			}
			countFile++
			fmt.Fprintf(fileWriter, "%s\n", data)
		case data, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}
			countErr++
			fmt.Fprintf(errWriter, "%s\n", data)
		case <-writeClose:
			// A bare break would only leave the select, not the for loop.
			break loop
		}
	}

	// Flush the buffered contents to the files. bufio.Writer remembers the
	// first write error, so checking Flush also surfaces earlier failures.
	outputs := []struct {
		path string
		w    *bufio.Writer
	}{
		{dirPath, dirWriter},
		{filePath, fileWriter},
		{errPath, errWriter},
	}
	for _, out := range outputs {
		if err := out.w.Flush(); err != nil {
			fmt.Fprintf(os.Stderr, "flush %s: %v\n", out.path, err)
		}
	}
}
// print shows a running elapsed-time readout on standard output, refreshed
// every millisecond on the same console line, until a value is received on
// writeClose. It then prints the final elapsed time on its own line and
// returns.
func print() {
	fmt.Printf("\n")
	start := time.Now()

	// NewTicker instead of time.Tick so the ticker can be stopped when this
	// function returns.
	ticker := time.NewTicker(time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// The trailing \r returns the cursor so the next update
			// overwrites this one.
			fmt.Printf("运行时间: %.3f 秒 \r", time.Since(start).Seconds())
		case <-writeClose:
			fmt.Printf("运行时间: %.3f 秒 \n", time.Since(start).Seconds())
			return
		}
	}
}