Async buffered Reader package?

I am looking for a package that will enable buffered asynchronous reading. While it is fairly trivial to implement, I wanted to check if there was something out there before writing my own.

bufio doesn’t fulfill this for me, since reads are only done when the buffer is empty, and therefore reading from the underlying reader blocks reads from the bufio.Reader.

I could imagine an interface like this:

// NewReader returns a reader that will asynchronously read from
// the supplied reader into a number of buffers each with a given size.
// The input can be read from the returned reader.
// When done use Close to release the buffers.
func NewReader(rd io.Reader, buffers, size int) io.ReadCloser 

Bonus points for a similar Writer, though I don’t need that right now.

Hi @klauspost, interesting question; I had fun thinking about this. I do not know of a package that does this and it definitely seems useful. If you have already implemented this, I’d love to see how.

I did have a question about your method signature. Why specify the number of buffers? Why not just specify a size?

I just got directed elsewhere to bufioprop, which can do a copy with async buffers, but cannot buffer a read input.

The number of buffers is there to specify the read granularity. If the buffer is very big, input reads will take longer. For instance, compare 1 big buffer with 4 smaller ones: when Read is called, 2 of them may already be ready while the next two are still being filled, so we do not have to wait for one “big” read to complete.
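For example, with the proposed signature (numbers made up):

var rd io.Reader // some slow underlying reader

// Same total buffer space (1 MiB), different granularity:
big := NewReader(rd, 1, 1<<20)     // one 1 MiB buffer: a Read may have to
                                   // wait for the whole 1 MiB fill
small := NewReader(rd, 4, 256<<10) // four 256 KiB buffers: the first filled
                                   // buffer can be consumed while the rest
                                   // are still in flight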


Gotcha.

The method signature should really be driven by your use case, but just to float an alternative that might emphasize read granularity and total buffer size explicitly:

// NewReader returns a reader that will asynchronously read from rd
// using reads of size readSize into a buffer of size bufferSize.
// The caller should use Close to release the buffer.
func NewReader(rd io.Reader, readSize, bufferSize int) io.ReadCloser

On the implementation side, you should be able to Read into different ranges of a single buffer and reuse the same buffer indefinitely as data is read from it, without any additional allocation.
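A toy, single-goroutine sketch of what I mean (hypothetical; it leaves out the async part and wrap-around, and only shows reusing one buffer through read/write offsets):

package main

import (
	"fmt"
	"io"
	"strings"
)

// copyBuffered reads into different ranges of one fixed buffer and
// reuses that buffer indefinitely, never allocating after the start.
func copyBuffered(dst io.Writer, src io.Reader, readSize, bufferSize int) error {
	buf := make([]byte, bufferSize)
	r, w := 0, 0 // read and write offsets into buf
	for {
		// Fill: read at most readSize bytes into the free range.
		end := w + readSize
		if end > len(buf) {
			end = len(buf)
		}
		n, err := src.Read(buf[w:end])
		w += n
		// Drain: write out the filled range.
		m, werr := dst.Write(buf[r:w])
		r += m
		if werr != nil {
			return werr
		}
		if r == w {
			// Fully drained: reset the offsets and reuse the buffer.
			r, w = 0, 0
		}
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
	}
}

func main() {
	var sb strings.Builder
	if err := copyBuffered(&sb, strings.NewReader("hello buffered world"), 4, 16); err != nil {
		fmt.Println("error:", err)
	}
	fmt.Println(sb.String())
}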

What do you think?


My idea for the initial implementation was to have n buffers cycle between two channels of that capacity, to avoid unneeded allocations as well as avoid dealing with wrap-around.
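Roughly something like this (untested sketch, just to show the shape; error handling is simplified, there is no Reset, and calling Read after Close will panic):

package asyncreader // hypothetical package name

import "io"

type reader struct {
	rd    io.Reader
	ready chan []byte // buffers filled by the reader goroutine
	reuse chan []byte // drained buffers, recycled to avoid allocations
	cur   []byte      // buffer currently being drained
	off   int         // read offset into cur
	err   error       // sticky error from the underlying reader
}

// NewReader asynchronously reads from rd into 'buffers' buffers
// of 'size' bytes each.
func NewReader(rd io.Reader, buffers, size int) io.ReadCloser {
	r := &reader{
		rd:    rd,
		ready: make(chan []byte, buffers),
		reuse: make(chan []byte, buffers),
	}
	for i := 0; i < buffers; i++ {
		r.reuse <- make([]byte, size)
	}
	go func() {
		// There are only 'buffers' buffers in total and ready has
		// that capacity, so the send below can never block.
		defer close(r.ready)
		for buf := range r.reuse {
			n, err := rd.Read(buf[:cap(buf)])
			if n > 0 {
				r.ready <- buf[:n]
			}
			if err != nil {
				r.err = err // published to Read by closing ready
				return
			}
		}
	}()
	return r
}

func (r *reader) Read(p []byte) (int, error) {
	if r.off == len(r.cur) {
		buf, ok := <-r.ready
		if !ok {
			if r.err == nil || r.err == io.EOF {
				return 0, io.EOF
			}
			return 0, r.err
		}
		r.cur, r.off = buf, 0
	}
	n := copy(p, r.cur[r.off:])
	r.off += n
	if r.off == len(r.cur) {
		r.reuse <- r.cur[:cap(r.cur)] // recycle the drained buffer
		r.cur, r.off = nil, 0
	}
	return n, nil
}

func (r *reader) Close() error {
	close(r.reuse) // stops the reader goroutine after its current read
	return nil
}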

I am not sure if your alternative signature makes more sense per se. Looking at them, both could potentially fail, so we should probably return an error as well, but otherwise they are equally good.

Cool, makes sense. Well if your implementation makes it online, I’d be interested in checking it out!

Just did a quick and dirty implementation.

It could have a lot of small improvements, like filling the buffers better on short reads and checking if a new buffer was already ready before returning, but for my needs it is already a significant improvement.

It could also have a Reset, for use cases where a lot of them are created.

I know it’s not a big deal, but on L31 you should replace if size <= 0 { with if buffers <= 0 {.


Not at all, thanks a lot. Obviously tests should also be added.

I fixed a few other small issues. Next up should be tests.

Another option would be to use a sync.Pool for buffer reuse, which would let you drop the return channel. I don’t know if that would be better or worse.
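Roughly like this, perhaps (untested sketch; it assumes all buffers have the same fixed size):

package main

import (
	"fmt"
	"sync"
)

const bufSize = 64 << 10 // assumed fixed buffer size

// Pool pointers to slices: putting a slice header directly into the
// pool would allocate on the interface conversion at every Put.
var bufPool = sync.Pool{
	New: func() interface{} { b := make([]byte, bufSize); return &b },
}

func main() {
	// Fill side: take a buffer from the pool instead of a reuse channel.
	bp := bufPool.Get().(*[]byte)
	buf := *bp
	n := copy(buf, "data read from the underlying reader")
	fmt.Printf("filled %d bytes\n", n)

	// Drain side: once the buffer is fully consumed, return it.
	bufPool.Put(bp)
}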

This is something I’m working on as part of another package. It’s meant to be a component of a larger struct, wrapping the request in a goroutine. I’m planning to have a pool of these buffers and a separate pool of network connections.

Had to post a shortened link. There was an error about being a new user and having too many links in my post.

It could be, but that would only be optimal if the buffer sizes are the same.

I guess it would be an improvement if you cycle through many hundreds of these buffers per second, but at least for my use case that isn’t the case.
