ClickHouse bulk import

Hello,

I'm a beginner at programming, so my code is pretty terrible,

and I think I have no idea what I'm doing.

Goal: a multithreaded bulk importer into a ClickHouse database.
I want to read JSON objects from STDIN and, depending on the insert rate, build batched queries,
add multithreading,
and scale up and down automatically depending on the STDIN lines per second.
I want to end up doing one query per second.

Current issues:

  • it is not multithreaded;
  • for each line I do a separate INSERT INTO. I think it's better to make one huge INSERT query.

So far I have two pieces of code:

Measuring stdin lines per second:

// Count lines of input per second on stdin
package main

import (
	"bufio"
	"flag"
	"fmt"
	"io"
	"os"
	"time"
)

func readLines(c chan int) {
	count := 0
	bio := bufio.NewReader(os.Stdin)
	for {
		_, isPrefix, err := bio.ReadLine()
		if err == io.EOF {
			os.Exit(0) // end of input is not an error
		}
		if err != nil {
			fmt.Println(err)
			os.Exit(1)
		}
		// ReadLine returns long lines in pieces; only count the final piece.
		if isPrefix {
			continue
		}
		count++
		c <- count
	}
}

func main() {
	var d time.Duration
	flag.DurationVar(&d, "i", time.Second, "Update interval")
	flag.Parse()
	line := 0
	count := 0
	c := make(chan int)
	tick := time.Tick(d)
	go readLines(c)

	for {
		select {
		// print counts
		case <-tick:
			fmt.Println(float64(line-count)/d.Seconds(), "/sec")
			count = line
		// update counts
		case line = <-c:
		}
	}
}

My importer so far:

package main

import (
	"bufio"
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"strings"
	//	"time"

	"github.com/kshvakov/clickhouse"
)

func main() {
	connect, err := sql.Open("clickhouse", "tcp://1.1.1.1:9000?database=access_log&debug=false")
	if err != nil {
		log.Fatal(err)
	}
	if err := connect.Ping(); err != nil {
		if exception, ok := err.(*clickhouse.Exception); ok {
			fmt.Printf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
		} else {
			fmt.Println(err)
		}
		return
	}

	const bufferLength = 40000
	const numColumns = 12
	const columnQM = "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
	stmtQuery := "insert into access_log (date, server_addr, hostname, remote_addr, request_time, scheme, server_protocol, status, request, request_method, http_referer, http_user_agent) VALUES"
	for i := 0; i < 1+0*bufferLength; i++ {
		if i > 0 {
			stmtQuery += ","
		}
		stmtQuery += columnQM
	}

	fmt.Println(stmtQuery)

	var rowData [bufferLength][]interface{}

	/*
		for i := 0; i < bufferLength*numColumns; i++ {
			rowData = append(rowData, "0")
		}
	*/

	rowDataIndex := 0

	reader := bufio.NewReader(os.Stdin)

	for {
		text, _ := reader.ReadString('\n')
		strings.Replace(text, "\n", "\\n", -1)
		strings.Replace(text, "\r", "\\r", -1)

		var v map[string]interface{}
		if err := json.Unmarshal([]byte(text), &v); err != nil {
			log.Println(err)
			continue
		}

		/*
			fmt.Println(v["time"])
			fmt.Println(v["server_addr"])
			fmt.Println(v["hostname"])
			fmt.Println(v["remote_addr"])
			fmt.Println(v["request_time"])
			fmt.Println(v["scheme"])
			fmt.Println(v["server_protocol"])
			fmt.Println(v["status"])
			fmt.Println(v["request"])
			fmt.Println(v["request_method"])
			fmt.Println(v["http_referrer"])
			fmt.Println(v["http_user_agent"])

			continue;
		*/
		rowData[rowDataIndex] = []interface{}{
			v["time"].(string),
			v["server_addr"].(string),
			v["hostname"].(string),
			v["remote_addr"].(string),
			v["request_time"].(string),
			v["scheme"].(string),
			v["server_protocol"].(string),
			v["status"].(string),
			v["request"].(string),
			v["request_method"].(string),
			v["http_referer"].(string),
			v["http_user_agent"].(string)}

		rowDataIndex++

		if rowDataIndex == bufferLength {
			tx, _ := connect.Begin()
			stmt, _ := tx.Prepare(stmtQuery)
			for i := 0; i < bufferLength; i++ {
				if _, err := stmt.Exec(rowData[i]...); err != nil {
					log.Println(err)
				}
			}
			if err := tx.Commit(); err != nil {
				log.Println(err)
			}
			rowDataIndex = 0
		}
	}
}

> I'm a beginner at programming, so my code is pretty terrible.

Your importer compiles. The code looks like it should work; I can’t test it because I don’t have your JSON input or the database setup.

Working code is, by default, not terrible.

If I were given your code to work on, I would use the following process:

1. Make it serial

By serial, I mean that a single unmarshal is followed directly by a single insert.

The most important aspect of any code is that it works correctly. In this case, a serial version is the easiest to get correct. Multithreading, batch queries, and rate limiting are features that can be added to a correct serial version.
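For instance, a minimal serial version could look something like this. It is untested and just a sketch: it assumes the same access_log table and kshvakov/clickhouse driver as your code (which only accepts INSERTs inside a transaction, hence the Begin/Prepare/Exec/Commit even for one row) and uses a shortened column list:

// Serial importer sketch: one Unmarshal, then one insert, per line.
package main

import (
	"bufio"
	"database/sql"
	"encoding/json"
	"log"
	"os"

	_ "github.com/kshvakov/clickhouse"
)

// insertRow writes a single row inside its own transaction.
func insertRow(db *sql.DB, args ...interface{}) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	stmt, err := tx.Prepare("insert into access_log (date, hostname) VALUES (?, ?)")
	if err != nil {
		tx.Rollback()
		return err
	}
	if _, err := stmt.Exec(args...); err != nil {
		tx.Rollback()
		return err
	}
	return tx.Commit()
}

func main() {
	db, err := sql.Open("clickhouse", "tcp://1.1.1.1:9000?database=access_log")
	if err != nil {
		log.Fatal(err)
	}
	scanner := bufio.NewScanner(os.Stdin)
	for scanner.Scan() {
		var v map[string]string
		if err := json.Unmarshal(scanner.Bytes(), &v); err != nil {
			log.Println(err)
			continue
		}
		if err := insertRow(db, v["time"], v["hostname"]); err != nil {
			log.Println(err)
		}
	}
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
}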

2. Make it correct

You need some way to know that everything is imported correctly. Figure out how to test this.

Testing probably includes using a known input and confirming that everything from that input ended in the database. If at all possible, make this testing automated. You will need to confirm that your later versions also work correctly.
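For example, a rough check could be a small program like this (a sketch; it assumes a hypothetical test file with a known number of lines was just imported into an empty table):

// Correctness check sketch: after importing a test file with a known
// number of lines, compare that count with the row count in ClickHouse.
// The table name and connection string are taken from the code above.
package main

import (
	"database/sql"
	"log"

	_ "github.com/kshvakov/clickhouse"
)

func main() {
	const expected = 1000 // lines in the hypothetical test input

	db, err := sql.Open("clickhouse", "tcp://1.1.1.1:9000?database=access_log")
	if err != nil {
		log.Fatal(err)
	}
	var got uint64
	if err := db.QueryRow("SELECT count() FROM access_log").Scan(&got); err != nil {
		log.Fatal(err)
	}
	if got != uint64(expected) {
		log.Fatalf("imported %d rows, want %d", got, expected)
	}
	log.Println("ok")
}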

3. Improve the serial version

Your program will be at its simplest form now. Adding rate limiting, etc. will only make things more complicated, making it harder to change. Some improvements you could make are:

  • Use json.Decoder instead of json.Unmarshal. This will let you get rid of your line splitting code, as Decoder handles that correctly (see the sketch after this list).
  • Decode (unmarshal) into map[string]string instead of map[string]interface{} if your JSON allows you to do so. This will remove the need for type assertions.
  • Make sure invalid data is handled correctly (e.g., a JSON object is missing the hostname field).
  • Handle every error; don’t ignore them!
  • Your transaction, tx, is always committed. If there is no possibility for it to roll back, you don't need it.
  • Don’t do any work that doesn’t need to be done. Use your tests from the previous step

Make the code as simple and as easy to understand as possible, using your tests to make sure it is still correct.

The code will only get more complicated after this, so do your best. Feel free to post your code at this point and I will be happy to look it over.

4. Make it concurrent

Split the code into two parts:

  1. Decode JSON from Stdin (decoder)
  2. Insert into DB (inserter)

These can be run concurrently, with the decoder sending rows to the inserter over a channel. The inserter will insert the row into the DB as soon as it is received.

At this point the program will be effectively serial, but include the structure needed to run in parallel (that is, goroutines and channels). This will be slower than the serial version, but should speed up in the next step.

The hard part here is making sure that everything is inserted before the program exits. Use your tests to make sure this happens.
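One possible shape for this (a sketch; the actual insert is elided, and closing the channel plus waiting on done is one way to guarantee everything is inserted before main exits):

// Concurrency sketch: the decoder sends rows to the inserter over a
// channel; closing the channel and waiting on done makes sure every
// row has been handled before the program exits.
package main

import (
	"encoding/json"
	"io"
	"log"
	"os"
)

type row map[string]string

func decode(rows chan<- row) {
	dec := json.NewDecoder(os.Stdin)
	for {
		var v row
		if err := dec.Decode(&v); err == io.EOF {
			break
		} else if err != nil {
			log.Fatal(err)
		}
		rows <- v
	}
	close(rows) // tells the inserter there is nothing left
}

func insert(rows <-chan row, done chan<- struct{}) {
	for r := range rows {
		_ = r // insert into the DB here, e.g. insertRow(db, r["time"], ...)
	}
	done <- struct{}{} // all rows received and inserted
}

func main() {
	rows := make(chan row)
	done := make(chan struct{})
	go decode(rows)
	go insert(rows, done)
	<-done
}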

5. Batching & Rate Limiting

I see these two features as related. The only part of the implementation that will change is the inserter.

  • Instead of directly inserting when a row is received, append the row to a slice of rows.
  • Use time.Tick to get notified (over a channel) every time a time.Second has elapsed.
  • Every time a tick is received, insert the queued rows and clear the queue.

The general pattern for receiving from multiple channels indefinitely is:

for {
	select {
	// …
	}
} 
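Filled in for the inserter, that pattern might end up looking like this (again a sketch; flushBatch is a hypothetical stand-in for your batched INSERT):

// Batching + rate limiting sketch: rows are queued as they arrive and
// flushed once per time.Second tick, i.e. one query per second.
package main

import (
	"log"
	"time"
)

type row map[string]string

func flushBatch(batch []row) {
	if len(batch) == 0 {
		return // nothing queued this second
	}
	log.Printf("flushing %d rows", len(batch)) // the batched INSERT goes here
}

func insert(rows <-chan row, done chan<- struct{}) {
	var batch []row
	tick := time.Tick(time.Second)
	for {
		select {
		case r, ok := <-rows:
			if !ok { // channel closed: decoder is done, flush the rest
				flushBatch(batch)
				done <- struct{}{}
				return
			}
			batch = append(batch, r)
		case <-tick:
			flushBatch(batch)
			batch = batch[:0] // clear the queue, reuse the backing array
		}
	}
}

func main() {
	rows := make(chan row)
	done := make(chan struct{})
	go func() { // stand-in decoder feeding a few rows
		for i := 0; i < 5; i++ {
			rows <- row{"hostname": "example"}
			time.Sleep(300 * time.Millisecond)
		}
		close(rows)
	}()
	go insert(rows, done)
	<-done
}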

Make sure your importer still works.


Hope this helps. Please ask for help when you run into problems you can’t solve. This is more a general idea of how I would approach things than a tutorial.
