Hi,
I am writing a go code with two functions readAndSendToCollector and collectAndPushToStream.
func readAndSendToCollector(w http.ResponseWriter, req *http.Request) {
b, _ := ioutil.ReadAll(req.Body)
bytesReader := bytes.NewReader(b)
bufReader := bufio.NewReader(bytesReader)
value, _, _ := bufReader.ReadLine()
if len(value) != 0 {
if debug || fine {
fmt.Printf("%s\n", time.Now().Format("2006-01-02 15:04:05")+" Reading the request payload")
}
mEntry := new(streaming.Entry)
mEntry.Key = []byte("dkey")
mEntry.Value = value
chanMessage <- *mEntry
}
func collectAndPushToStream(chanMessage chan streaming.Entry) {
wg.Add(1)
defer wg.Done()
var pmdEntry []streaming.PutMessagesDetailsEntry = make([]streaming.Entry, 0, 100)
var MAX_PAYLOAD_SIZE = 1000000 //1mb
var totalPayloadSize = 0
var completeBytesTransferred = 0
for mEntry := range chanMessage {
//fmt.Printf("key = %s Value = %s\n", mEntry.Key, mEntry.Value)
entryPayloadSize := len(mEntry.Key) + len(mEntry.Value)
if (totalPayloadSize + entryPayloadSize) > MAX_PAYLOAD_SIZE {
pushMessagesToStream(pmdEntry)
completeBytesTransferred = completeBytesTransferred + totalPayloadSize
fmt.Println("totalPayloadSize = " + strconv.Itoa(totalPayloadSize))
fmt.Println("totalMessages = " + strconv.Itoa(len(pmdEntry)))
fmt.Println("completeBytesTransferred = " + strconv.Itoa(completeBytesTransferred))
totalPayloadSize = 0 //reset the payload size
pmdEntry = pmdEntry[:0]
}
pmdEntry = append(pmdEntry, mEntry)
totalPayloadSize = totalPayloadSize + entryPayloadSize
}
}
Here func readAndSendToCollector reads the payload from the http requests and pushes it to the channel chanMessage
And collectAndPushToStream reads the channel chanMessage and collects 1MB of data and pushes it to the Stream.
Sometimes there may less than 1MB of data available and I want to wait for 10 seconds and if there are no more data in the channel, I want to push the data available to the stream.
How do i wait for 10 seconds and check if there are any data in the channel?