Hello,
I'm a beginner in programming, so my code is pretty rough,
and I don't think I really know what I'm doing.
Goal: a multi-threaded bulk importer into a ClickHouse database.
I want to read JSON objects from STDIN and, depending on the insert rate, build batch queries,
add multithreading,
and automatically scale up and down depending on the number of STDIN lines per second.
I would like to issue about 1 query per second.
Current issues:
It is not multi-threaded.
For each line I do an INSERT INTO; I think it is better to make one huge insert query.
so far i have 2 pieces of code:
Measure STDIN lines per second:
// Count lines of input per second on stdin
package main
import (
"bufio"
"fmt"
"os"
"time"
"flag"
)
// readLines counts the lines arriving on standard input and sends the running
// total on c after each completed line.
//
// A line that is too long for the reader's buffer is delivered by ReadLine in
// several pieces; only the final piece (isPrefix == false) is counted, so such
// a line still counts once. Any read error, including end of input, is printed
// to standard output and terminates the process with exit status 1.
func readLines(c chan int) {
	stdin := bufio.NewReader(os.Stdin)
	total := 0
	for {
		_, isPrefix, readErr := stdin.ReadLine()
		switch {
		case readErr != nil:
			fmt.Println(readErr)
			os.Exit(1)
		case !isPrefix:
			// End of a complete line: publish the new running total.
			total++
			c <- total
		}
	}
}
// main prints, once per update interval, the rate at which lines arrived on
// standard input during that interval.
//
// The interval is taken from the -i flag and defaults to one second. Line
// counting runs in the readLines goroutine, which reports a running total on
// a channel; main remembers the total seen at the previous tick and prints the
// difference divided by the interval length in seconds.
func main() {
	var d time.Duration
	flag.DurationVar(&d, "i", time.Second, "Update interval")
	flag.Parse()

	// time.Tick returns a nil channel for a non-positive duration, which would
	// make the loop below consume input forever without printing a rate.
	if d <= 0 {
		fmt.Fprintln(os.Stderr, "update interval must be positive")
		os.Exit(2)
	}

	line := 0  // latest running total received from readLines
	count := 0 // running total at the previous tick
	c := make(chan int)
	tick := time.Tick(d)
	go readLines(c)
	for {
		select {
		// print counts
		case <-tick:
			fmt.Println(float64(line-count)/d.Seconds(), "/sec")
			count = line
		// update counts
		case line = <-c:
		}
	}
}
My importer so far:
package main
import (
	"bufio"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"
	"os"
	"strings"
	// "time"

	"github.com/kshvakov/clickhouse"
)
// accessLogKeys lists the JSON keys read from every input object, in the same
// order as the columns named in the INSERT statement built in main. Note that
// the "time" key feeds the "date" column.
var accessLogKeys = []string{
	"time",
	"server_addr",
	"hostname",
	"remote_addr",
	"request_time",
	"scheme",
	"server_protocol",
	"status",
	"request",
	"request_method",
	"http_referer",
	"http_user_agent",
}

// parseRow decodes one JSON object and returns its access-log fields as
// statement arguments, ordered like accessLogKeys. It returns an error when
// the line is not valid JSON or when a field is missing or is not a string.
func parseRow(line string) ([]interface{}, error) {
	var v map[string]interface{}
	if err := json.Unmarshal([]byte(line), &v); err != nil {
		return nil, err
	}
	row := make([]interface{}, 0, len(accessLogKeys))
	for _, key := range accessLogKeys {
		s, ok := v[key].(string)
		if !ok {
			return nil, fmt.Errorf("field %q is missing or not a string", key)
		}
		row = append(row, s)
	}
	return row, nil
}

// insertBatch executes query once per row inside a single transaction and
// commits it. A row whose Exec fails is logged and skipped so that one bad row
// does not discard the rest of the batch; Begin, Prepare and Commit failures
// are returned to the caller.
//
// NOTE(review): the ClickHouse driver is expected to buffer the Exec calls and
// send them as one block on Commit, which would already make this a single
// bulk insert per batch - confirm against the driver documentation.
func insertBatch(db *sql.DB, query string, rows [][]interface{}) error {
	tx, err := db.Begin()
	if err != nil {
		return fmt.Errorf("begin batch: %w", err)
	}
	stmt, err := tx.Prepare(query)
	if err != nil {
		// The Prepare error is the one worth reporting; a rollback failure
		// here adds no useful information.
		_ = tx.Rollback()
		return fmt.Errorf("prepare batch: %w", err)
	}
	defer stmt.Close()

	for _, row := range rows {
		if _, err := stmt.Exec(row...); err != nil {
			log.Println(err)
		}
	}
	if err := tx.Commit(); err != nil {
		return fmt.Errorf("commit batch: %w", err)
	}
	return nil
}

// main reads newline-delimited JSON objects from standard input and inserts
// them into the access_log table in batches of up to bufferLength rows.
//
// Lines that are blank, are not valid JSON, or lack one of the expected string
// fields are logged (blank lines are skipped silently) and do not stop the
// import. When input ends, the rows still buffered are inserted as a final,
// smaller batch. The process exits with status 1 if the database cannot be
// reached.
func main() {
	connect, err := sql.Open("clickhouse", "tcp://1.1.1.1:9000?database=access_log&debug=false")
	if err != nil {
		log.Fatal(err)
	}
	if err := connect.Ping(); err != nil {
		var exception *clickhouse.Exception
		if errors.As(err, &exception) {
			fmt.Printf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
		} else {
			fmt.Println(err)
		}
		os.Exit(1)
	}
	defer connect.Close()

	const bufferLength = 40000
	// One placeholder group: the statement is executed once per row.
	const columnQM = "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
	const stmtQuery = "insert into access_log (date, server_addr, hostname, remote_addr, request_time, scheme, server_protocol, status, request, request_method, http_referer, http_user_agent) VALUES" + columnQM
	fmt.Println(stmtQuery)

	rows := make([][]interface{}, 0, bufferLength)
	// flush inserts the buffered rows and empties the buffer. A failed batch
	// is logged and dropped so that the import keeps going.
	flush := func() {
		if err := insertBatch(connect, stmtQuery, rows); err != nil {
			log.Println(err)
		}
		rows = rows[:0]
	}

	reader := bufio.NewReader(os.Stdin)
	for {
		// ReadString may return data together with an error (for example a
		// last line without a trailing newline), so handle the text first.
		text, readErr := reader.ReadString('\n')
		if line := strings.TrimSpace(text); line != "" {
			row, err := parseRow(line)
			if err != nil {
				log.Println(err)
			} else {
				rows = append(rows, row)
			}
		}
		if len(rows) == bufferLength {
			flush()
		}
		if readErr != nil {
			if !errors.Is(readErr, io.EOF) {
				log.Println(readErr)
			}
			break
		}
	}

	// Insert whatever is left from the last, partially filled batch.
	if len(rows) > 0 {
		flush()
	}
}