[ASK] Multithreaded input from each line

Hi,

i want to make a multithreaded job by reading each lines (more than 100k line) of a file.
for example i have a file contains this :

job-a
job-b
job-c
job-d

and now i want to give each my worker to do each job but not repeating the same job.

here is my current code : https://play.golang.org/p/SDShrJiNUN
from my code above, worker only doing the last job and its repeated. :frowning:
it was really far from my expectation.

any help would be appreciated.

thanks and regards

1 Like

I think you are trying to do something like https://play.golang.org/p/Xd0K78QPSG

Without knowing the details of what a job entails, it is hard to help in a specific way. Depending on the workload, the approach I provided may be performant enough, but I don’t really expect it be very good.

I would start by writing a single-threaded version (using a smaller dataset if needed) to gain an understanding of the work that needs to happen. It also needs to be correct. You will need enough tests to show that any optimized (by multithreading or other techniques) version is also correct.

The next step is to figure out the minimal scalable unit (explanation with example) and write out the parallel version.

Without knowing more about the actual work that needs to be done, my instincts say:

  • mmap the file
  • assign each worker a chunk of the file, by bytes
  • each worker is responsible for the first full job spec in its chunk through the last job spec that starts before its chunk ends. This is a little tricky, but I have done it before for csv files; make sure you test this throughly.
  • try to handle results without communicating between workers or coordinators. Communication is overhead and reduces performance.

Hope this helps.

Hi @gonoob,

The output of your code shows the repeated output of a single worker. This happens because the workers run an infinite for loop with no blocking calls. Because of this, the worker that runs first takes all CPU time, and all other goroutines have no chance to run.

The reason for this seemingly unexpected behavior is that the runtime scheduler is optimized for efficiency, so it only switches the execution context to another goroutine if the current goroutine blocks. (Calls that can block are, for example, I/O calls, system calls, or calls to time.Sleep().)

In this modification of your playground code, the workers also print their ID and the job # they receive. When you run this code, you can see that worker #5 executes job #1 endlessly.

Now remove the two comments from the "time" import and from the "time.Sleep" statement in the worker. Now other workers get a chance to run when the current worker goes asleep, and the output looks more like what you might have expected.

The output reveals something else.

You might have noticed that the job ID cycles from 1 to 5 once and then is 0 for all subsequent results.

The nested loop in main() creates 26*5 workers; yet the jobs queue contains only five values. After these five values have been consumed by the workers, the jobs channel produces null values because it is already closed.

This is a quite counterintuitive behavior of channels. Reading from a closed channel does not error out but rather returns the null value of the channel type.

Luckily, the <- operator can return two values on request. The second one is a boolean that switches to false once a channel is closed and contains no more buffered values. With the “comma,ok” idiom you can make use of this.

job, ok := <-jobs
if !ok {
    // no more jobs
    return
}
// do the job

But if the only purpose of the jobs channel is to start all workers at once, you can simply close the channel after all coworkers have been created. Then all workers can read a zero value from the channel and start executing.

Another option: Use a range loop to read from a channel.

for item := range channel { ... } 

This loop stops automatically when the channel is closed. Here is a simplified code where the work items are sent to the workers through a channel, and the workers range over the channel until it is closed.

(For extra fun, comment out the two time lines again and see what happens.)

This code might not be exactly what you are trying to achieve, but I think the concepts shown here can be easily taken over into your scenario.

Rookie mistake: not shadowing the cylcle variable (line). Add line:=line in the for cycle.

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.