Get data from DB to channel


(YoungGopher) #1

Hi all!
I’m trying to read data from a PostgreSQL table into a channel, and I’m getting behaviour I don’t understand.

In main.go:

type TableStruct struct {
ID int
Name string
}

var DB *sql.DB

func init() {
	var err error
	dbinfo := fmt.Sprintf("user=%s password=%s dbname=%s sslmode=disable", user, password, dbname)
	DB, err = sql.Open("postgres", dbinfo)
	if err != nil {
		panic(err)
	}
	// No defer DB.Close() here, because I need the DB connection for the whole lifetime of the program.

	if err = DB.Ping(); err != nil {
		panic(err)
	}
}

func getAllFromTable() chan TableStruct{

	dev := make(chan TableStruct)
	rows, err := DB.Query("SELECT * FROM table;")
	if err != nil {
		panic(err)
	}
	defer rows.Close()

	for rows.Next() {
		newV:= TableStruct{}
		err := rows.Scan(&newV.ID, &newV.Name)
		if err != nil {
			panic(err)
		}
		dev <- newV
	}

	close(dev)
	if err = rows.Err(); err != nil {
		fmt.Println(err.Error())
	}

	return dev
}

func main() {
	t := getAllFromTable()
	for v := range t {
		log.Println(v)
	}
}

When I start the program, the terminal just hangs, waiting for something.
When I wrap the loop in a goroutine,
go func() {
	for rows.Next() {
	}
}()
nothing changes.
What am I doing wrong?
When I give the channel a buffer, as in
make(chan TableStruct, 100), everything works fine.
Thanks!


(Christoph Berger) #2

Without spawning a goroutine, your code gets stuck at the point it starts writing to the channel.

The channel is not buffered, and there is no other goroutine running that reads from it, so the code cannot move on. This also explains why the code runs when you use a buffered channel instead.
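
To make the difference concrete, here is a minimal sketch. The helper trySend is hypothetical (it is not part of the code in question); it attempts a send without blocking, so you can see when a send would be able to proceed.

```go
package main

import "fmt"

// trySend is a hypothetical helper: it attempts a send without blocking
// and reports whether the channel accepted the value.
func trySend(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan int)
	buffered := make(chan int, 1)

	fmt.Println(trySend(unbuffered, 1)) // false: nobody is receiving
	fmt.Println(trySend(buffered, 1))   // true: the buffer has room
	fmt.Println(trySend(buffered, 2))   // false: the buffer is full
}
```

A plain `dev <- newV` has no default case, so in the unbuffered situation it simply waits until some other goroutine receives.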

I cannot comment on the goroutine version, as the goroutine code you posted contains just an empty for loop. I believe this is not the goroutine you were using—can you please check this part and post the complete goroutine?

Another point: getAllFromTable() closes rows at the moment it returns. You might want to keep rows open until the goroutine has read all rows. (The problem does not exist in the non-goroutine version. There, all rows are written into the channel’s buffer before the defer statement closes rows.)

And when using a goroutine and a non-buffered channel you would need to avoid closing dev before returning it.
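
A rough sketch of that idea, under some assumptions: fakeRows is a hypothetical stand-in for *sql.Rows so the example runs without a database, and stream is an invented name. The point is only that the goroutine owns both rows and the channel, and closes them after the last value has been sent.

```go
package main

import "fmt"

// fakeRows is a hypothetical stand-in for *sql.Rows, so that the sketch
// runs without a database connection.
type fakeRows struct {
	data   []string
	pos    int
	closed bool
}

// Next advances to the next row; it reports false when the rows are
// exhausted or have been closed.
func (r *fakeRows) Next() bool {
	if r.closed || r.pos >= len(r.data) {
		return false
	}
	r.pos++
	return true
}

func (r *fakeRows) Value() string { return r.data[r.pos-1] }
func (r *fakeRows) Close()        { r.closed = true }

// stream returns immediately. The goroutine closes rows and the channel
// only after the last row has been sent.
func stream(rows *fakeRows) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		defer rows.Close()
		for rows.Next() {
			out <- rows.Value()
		}
	}()
	return out
}

func main() {
	for v := range stream(&fakeRows{data: []string{"a", "b", "c"}}) {
		fmt.Println(v)
	}
}
```

With a real *sql.Rows the shape should be the same: move defer rows.Close() and the close of the channel into the goroutine.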

Sorry if only half of what I wrote makes sense, it’s 11pm over here :)


(YoungGopher) #3

I understand my problem now.

My func getAllFromTable() creates the channel
and tries to write all the values from the table into it.
But because the channel is unbuffered,
every value that is sent
must be received by someone.
How do I do this correctly, concurrently with another func?

I think I need a select statement in main,
but I don’t understand how to use it correctly.

For example:

func getAllFromTable() (c chan TableStruct) {
	go func() {
		for rows.Next() {
			....
			c <- newelementFromTable
		}
	}()

	return c
}


func toDoSomethingWithTableValue(x TableStruct) {
	// get a value from the channel and work with it
}


func main() {
	select {
	case x := <-getAllFromTable():
		toDoSomethingWithTableValue(x)
	default:
		// do all of this with every value from the table, every 5 seconds
		time.Sleep(time.Second * 5)
	}
	// Most important: all of this should run in parallel.
	// If the table has 100 rows, all of them should be processed
	// every 5 seconds, not just one.
}

I see two ways:
1. Create a buffered channel.
2. Do everything with an unbuffered channel (this is what I’m trying to do).
Thanks!


(Christoph Berger) #4

You’re almost there! The range loop you used in your first post works just fine. In getAllFromTable(), you already create a goroutine, so you already have two separate lines of execution. Now all you need to do is to read the channel returned from getAllFromTable().

Here is an extremely simplified version, with rows.Next() replaced by a simple loop counter. Note that the goroutine closes the channel after writing all values. This way, the range loop knows when to stop.

package main

import "fmt"

func getAllFromTable() (c chan int) {
	c = make(chan int)
	go func() {
		for i := 1; i < 10; i++ {
			c <- i
		}
		close(c)

	}()

	return c
}

func main() {
	c := getAllFromTable()
	for n := range c {
		fmt.Println(n)
	}

}

(Playground link)


(YoungGopher) #5

Yes, that works, thanks.
But I have a problem in the main routine:

select {
// The next line fails to compile with the error:
//   select assignment must have receive on right hand side
case tO := getAllFromTable():
	// If I write this instead, it returns only one value
	// from the channel and then the program waits:
	tO := <-getAllFromTable()
	for r := range tO {
		fmt.Println(r)
	}
default:
	fmt.Println("ok")
	time.Sleep(5 * time.Second)
}

And in this case:

select {
case tO := <-getAllFromTable():
	// The program waits and prints "ok" only once.
	go func() {
		// Here I need to start a goroutine for each value.
		fmt.Println(tO)
	}()
default:
	fmt.Println("ok")
	time.Sleep(5 * time.Second)
}

(YoungGopher) #6

Hello again!
I think I’m missing something.
When I work with a slice:
Point one: when I append the values to the slice without go func(), everything works correctly, but not in parallel (I guess).
Point two: when the values are appended inside go func(), the program exits
without printing anything from the slice.

type TableStructOne struct {
	ID   int
	Name string
}

func getAllFromTable() []TableStructOne {
	var tSlice []TableStructOne
	var newElem TableStructOne

	rows, err := DB.Query("SELECT * FROM tableone")
	if err != nil {
		//panic(err)
	}
	defer rows.Close()
	//go func() {
	for rows.Next() {
		err := rows.Scan(&newElem.ID, &newElem.Name)
		if err != nil {
			//panic(err)
		}
		tSlice = append(tSlice, newElem)
	}
	//}()
	return tSlice
}

func main() {
	k := getAllFromTable()
	for _, r := range k {
		fmt.Println(r)
	}
}

Point three: when I use a channel, the program does not work, unlike the simple example with int values.

func getAllFromTable() chan TableStructOne {
	chTO := make(chan TableStructOne)
	var newElem TableStructOne

	rows, err := DB.Query("SELECT * FROM tableone")
	if err != nil {
		//panic(err)
	}
	defer rows.Close()
	go func() {
		for rows.Next() {
			err := rows.Scan(&newElem.ID, &newElem.Name)
			if err != nil {
				//panic(err)
			}
			chTO <- newElem
		}
	}()
	return chTO
}

func main() {
	k := getAllFromTable()
	for r := range k {
		fmt.Println(r)
	}
}

I’m stuck(


(Christoph Berger) #7

Consider the sequence of events. When you turn the for loop into a goroutine, then getAllFromTable() may return before the goroutine has finished (or even before it has started). And when getAllFromTable() returns, rows gets closed, and the goroutine cannot read from rows anymore.
(At least this is what I think happens here, without having tested the code.)
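
The ordering can be reproduced without a database. In the sketch below, source is a hypothetical stand-in for *sql.Rows, and the gate channel is an assumption added only to force the goroutine to start reading after the function has returned. In the real program that ordering is up to the scheduler.

```go
package main

import "fmt"

// source is a hypothetical stand-in for *sql.Rows: Next reports false
// once the source has been closed or exhausted.
type source struct {
	left   int
	closed bool
}

func (s *source) Next() bool {
	if s.closed || s.left == 0 {
		return false
	}
	s.left--
	return true
}

func (s *source) Close() { s.closed = true }

// produce mirrors the shape of the code in question: the deferred Close
// runs when produce returns, not when the goroutine is done.
func produce(s *source, gate <-chan struct{}) <-chan int {
	out := make(chan int)
	defer s.Close()
	go func() {
		defer close(out)
		<-gate // wait until the caller says produce has returned
		for s.Next() {
			out <- 1
		}
	}()
	return out
}

// countRows runs the scenario and reports how many rows were received.
func countRows(rows int) int {
	gate := make(chan struct{})
	out := produce(&source{left: rows}, gate)
	close(gate) // produce has returned, so the source is already closed
	n := 0
	for range out {
		n++
	}
	return n
}

func main() {
	fmt.Println(countRows(3)) // prints 0: the source was closed first
}
```

Note that this sketch closes out so that the range loop ends. The posted code never closes chTO, so as far as I can tell its range loop in main would keep waiting even after the goroutine has stopped.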


(YoungGopher) #8

Good day!
Could you please tell me how to get all the values from a DB table concurrently, at the same time?
My latest attempt looks like this:

type Device struct {
	Id     int
	Name   string
	UserId int
}

type Metric struct {
	Deviceid   int
	Metric     [5]int64
	LocalTime  time.Time
	ServerTime time.Time
}

var DB *pg.DB
var Dev = make(chan Device)  // global channel of devices
var Metr = make(chan Metric) // a metric is like a signal from a device
var LastIDm int

func init() {
	DB = pg.Connect(&pg.Options{
		User:     "test",
		Password: "test",
		Database: "test",
		Addr:     "localhost:5432",
	})
}

// Here I tried to get all devices from the table at the same time,
// but the for loop itself is not concurrent.
// 10000 because the table has 10,000 rows.
func getAllDevices() {
	for i := 1; i < 10000; i++ {
		go func(i int) {
			var newD Device
			_, err := DB.QueryOne(pg.Scan(&newD.Id, &newD.Name, &newD.UserId), "SELECT * from devices where id = ?", i)
			if err != nil {
			}
			Dev <- newD
		}(i)
	}
}

// Given a device, I tried to create new signals concurrently,
// for each of the 10,000 devices at the same time.
func createMetrics(in Device) {
	var newM Metric
	for i := 0; i < len(newM.Metric); i++ {
		newM.Metric[i] = rand.Int63n(100)
	}
	newM.Deviceid = in.Id
	newM.LocalTime = time.Now()
	newM.ServerTime = time.Now()
	Metr <- newM
}

func main() {
	for {
		go getAllDevices()
		go createMetrics(<-Dev)
		time.Sleep(time.Second * 1)
	}
}

But I think this does not run as concurrently as I want:
10,000 devices should each create one or several signals at the same time, and every signal should also be written to a new table.
What am I doing wrong that makes it not concurrent?
Thanks!

