Hi! I’m new to Go.
I have a small parser that writes the data it finds to Postgres; for database access I use https://github.com/jackc/pgx.
I write parsed data to an unbuffered channel from various goroutines.
I have a special goroutine that reads from this channel and writes the data to the database.
While debugging, the application hangs forever at some point (perhaps waiting for a free database connection from the pool).
How can I determine which goroutine is blocking execution?
I’ve heard of pprof, but I’ve never used it.
Thanks.
Minimal example:
I have a struct like this:
type ParsingResults struct {
    parser DataParser
    data   []*common.Data
    err    error
}
In a separate goroutine I initialize an unbuffered channel like this:
results = make(chan *ParsingResults)
Then I start various goroutines in which the parsers run:
go fetcher.Parse(results)
Each parser gathers data and sends it to the channel like this:
var (
    results     chan<- *ParsingResults
    pageResults *ParsingResults
)

results <- pageResults
if pageResults.err != nil {
    return
}
time.Sleep(p.provider.DelayBetweenPages)
And in a separate goroutine the following function is launched:
func (fetcher *Fetcher) waitForResults(ctx context.Context) {
    for {
        select {
        case results := <-fetcher.resultsChannel:
            provider := results.parser.GetProvider()
            if results.err != nil {
                common.Logger.Errorw("failed to fetch data from provider",
                    "provider", provider.Url,
                    "error", results.err)
                continue
            }
            data := fetcher.removeDuplicates(results.data)
            common.Logger.Infow("fetched some data",
                "provider", provider.Url,
                "rows_count", len(results.data),
                "unique_rows_count", len(data))
            _, err := fetcher.Repo.SaveFetchedData(ctx, data)
            if err != nil {
                common.Logger.Errorw("failed to save fetched data",
                    "provider", provider.Url,
                    "error", err)
                continue
            }
            common.Logger.Infow("fetched data were saved successfully",
                "provider", provider.Url,
                "rows_count", len(results.data),
                "unique_rows_count", len(data))
        case <-ctx.Done():
            return
        default:
            common.Logger.Infow("for debugging's sake! waiting for some data to arrive!")
        }
    }
}
The data is stored in the database by this function:
func (repo *Repository) SaveFetchedData(ctx context.Context, rows []*common.Data) (int64, error) {
    if len(rows) == 0 {
        return 0, nil
    }
    baseQB := sq.Insert(db.DataTableName).
        Columns(saveFetchedDataCols...).
        PlaceholderFormat(sq.Dollar)
    batch := &pgx.Batch{}
    for _, p := range rows {
        curQB := baseQB.Values(p.Row1, p.Row2, sq.Expr("NOW()"))
        curQuery, curArgs, err := curQB.ToSql()
        if err != nil {
            return 0, fmt.Errorf("failed to generate SQL query: %w", err)
        }
        batch.Queue(curQuery, curArgs...)
    }
    br := repo.pool.SendBatch(ctx, batch)
    ct, err := br.Exec()
    if err != nil {
        return 0, fmt.Errorf("failed to run SQL query batch: %w", err)
    }
    return ct.RowsAffected(), nil
}