Add support for buffer retention and include an example.
This commit introduces the `DelimitedBuffer` type, which functions as
a reusable buffer for message encoding and decoding. A strong
criticism of the old implementation is that it carelessly allocated
ephemeral buffers on every call; retaining and reusing a buffer
alleviates that problem.
diff --git a/README.md b/README.md
index dc0c92f..d975f9d 100644
--- a/README.md
+++ b/README.md
@@ -3,10 +3,48 @@
language (golang), namely support for record length-delimited message
streaming.
-Until [Code Review 9102043](https://codereview.appspot.com/9102043/) is merged, this repository will exist here in the wild.
+# Installing
+
+ $ go get github.com/matttproud/golang_protobuf_extensions/ext
+
+# Example
+
+```go
+package main
+
+import (
+ "io"
+
+ "code.google.com/p/goprotobuf/proto"
+ "github.com/matttproud/golang_protobuf_extensions/ext"
+)
+
+func main() {
+ // You have your pre-populated Protocol Buffer messages. Yay!
+ msgs := []proto.Message{firstMsg, secondMsg, thirdMsg}
+
+ // Destination for writing.
+ buf := new(ext.DelimitedBuffer)
+
+ for _, m := range msgs {
+ buf.Marshal(m) // Write each out.
+ m.Reset() // Clear the message, since we'll read it back in.
+ }
+
+ for _, m := range msgs {
+ _, err := buf.Unmarshal(m) // Read each in ...
+ if err == io.EOF { // until we hit EOF or
+ break
+ } else if err != nil { // encounter an error.
+ panic(err)
+ }
+ }
+}
+```
# Documentation
-We have [generated Go Doc documentation](http://godoc.org/github.com/matttproud/golang_protobuf_extensions/ext) here.
+We have [generated Go Doc documentation](
+http://godoc.org/github.com/matttproud/golang_protobuf_extensions/ext) here.
# Testing
[Travis CI build status](https://travis-ci.org/matttproud/golang_protobuf_extensions)
diff --git a/ext/all_test.go b/ext/all_test.go
index f43c901..1e98e2f 100644
--- a/ext/all_test.go
+++ b/ext/all_test.go
@@ -16,6 +16,7 @@
import (
"bytes"
+ "io"
"testing"
"testing/quick"
@@ -39,18 +40,23 @@
reference, err := Marshal(input)
if err != nil {
+ t.Fatal(err)
return false
}
- var buffer bytes.Buffer
+ var buf DelimitedBuffer
var written int
- for i := 0; i < x%100; i++ {
- n, err := WriteDelimited(&buffer, input)
+ ents := x % 100
+
+ for i := 0; i < ents; i++ {
+ n, err := buf.Marshal(input)
if err != nil {
+ t.Fatal(err)
return false
}
if n < len(reference) {
+ t.Fatal(err)
return false
}
@@ -59,19 +65,22 @@
var read int
- for i := 0; i < x%100; i++ {
- output := &GoTest{}
- n, err := ReadDelimited(&buffer, output)
+ for i := 0; i < ents; i++ {
+ output := new(GoTest)
+ n, err := buf.Unmarshal(output)
if err != nil {
+ t.Fatal(err)
return false
}
raw, err := Marshal(output)
if err != nil {
+ t.Fatal(err)
return false
}
if !bytes.Equal(reference, raw) {
+ t.Fatal("not equal")
return false
}
@@ -79,6 +88,7 @@
}
if written != read {
+ t.Fatalf("read != written %d %d %d ", written, read, ents)
return false
}
@@ -89,3 +99,137 @@
t.Error(err)
}
}
+
+func BenchmarkRawMarshal(b *testing.B) {
+ m := initGoTest(true)
+ buf := new(bytes.Buffer)
+ for i := 0; i < b.N; i++ {
+ WriteDelimited(buf, m)
+ buf.Reset()
+ }
+
+ b.ResetTimer()
+
+ for i := 0; i < b.N; i++ {
+ WriteDelimited(buf, m)
+ if len(buf.Bytes()) != 184 {
+ b.Fatalf("unexpected length: %d", len(buf.Bytes()))
+ }
+ buf.Reset()
+ }
+}
+
+func BenchmarkReusedMarshal(b *testing.B) {
+ m := initGoTest(true)
+ bb := new(DelimitedBuffer)
+ for i := 0; i < b.N; i++ {
+ bb.Marshal(m)
+ bb.Clear()
+ }
+
+ b.ResetTimer()
+
+ for i := 0; i < b.N; i++ {
+ bb.Marshal(m)
+ if len(bb.Bytes()) != 184 {
+ b.Fatalf("unexpected length: %d", len(bb.Bytes()))
+ }
+ bb.Clear()
+ }
+}
+
+func BenchmarkRawUnmarshal(b *testing.B) {
+ m := initGoTest(true)
+ buf := new(bytes.Buffer)
+ WriteDelimited(buf, m)
+ out := buf.Bytes()
+ d := new(GoTest)
+ bufs := make([]*bytes.Buffer, b.N)
+ for i := 0; i < b.N; i++ {
+ bufs[i] = bytes.NewBuffer(out)
+ }
+ for i := 0; i < b.N; i++ {
+ ReadDelimited(bufs[i], d)
+ }
+ for i := 0; i < b.N; i++ {
+ bufs[i] = bytes.NewBuffer(out)
+ }
+
+ b.ResetTimer()
+
+ for i := 0; i < b.N; i++ {
+ ReadDelimited(bufs[i], d)
+ }
+}
+
+func BenchmarkReusedUnmarshal(b *testing.B) {
+ m := initGoTest(true)
+ buf := new(bytes.Buffer)
+ WriteDelimited(buf, m)
+ out := buf.Bytes()
+ d := new(GoTest)
+ dec := new(DelimitedBuffer)
+ bufs := make([][]byte, b.N)
+ for i := 0; i < b.N; i++ {
+ bufs[i] = out
+ }
+ for i := 0; i < b.N; i++ {
+ dec.SetBuf(bufs[i])
+ if _, err := dec.Unmarshal(d); err != nil {
+ b.Fatal(err)
+ }
+ }
+
+ b.ResetTimer()
+
+ for i := 0; i < b.N; i++ {
+ dec.SetBuf(bufs[i])
+ if _, err := dec.Unmarshal(d); err != nil {
+ b.Fatal(err)
+ }
+ }
+}
+
+func BenchmarkReusedUnmarshalRepeating(b *testing.B) {
+ m := initGoTest(true)
+ buf := new(bytes.Buffer)
+ for i := 0; i < b.N; i++ {
+ WriteDelimited(buf, m)
+ }
+ out := buf.Bytes()
+ d := new(GoTest)
+ dec := new(DelimitedBuffer)
+ dec.SetBuf(out)
+ deced := 0
+outer:
+ for {
+ _, err := dec.Unmarshal(d)
+ deced++
+ switch err {
+ case io.EOF:
+ break outer
+ case nil:
+ continue
+ default:
+ b.Fatal(err)
+ }
+ }
+
+ b.ResetTimer()
+
+ dec.SetBuf(out)
+ deced = 0
+outer2:
+ for {
+ _, err := dec.Unmarshal(d)
+ deced++
+ switch err {
+ case io.EOF:
+ break outer2
+ case nil:
+ continue
+ default:
+ b.Fatal(err)
+ }
+ }
+}
diff --git a/ext/coding.go b/ext/coding.go
new file mode 100644
index 0000000..ffd1415
--- /dev/null
+++ b/ext/coding.go
@@ -0,0 +1,118 @@
+// Copyright 2014 Matt T. Proud
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ext
+
+import (
+ "encoding/binary"
+ "errors"
+ "io"
+
+ "code.google.com/p/goprotobuf/proto"
+)
+
+// DelimitedBuffer provides varint record length-delimited Protocol Buffer
+// message encoding, with the feature that all internal buffers are reused to
+// reduce memory usage overhead. The type is not goroutine safe.
+type DelimitedBuffer struct {
+ msgBuf proto.Buffer
+ buf []byte
+ headBuf []byte
+}
+
+// Marshal encodes a Protocol Buffer message to this DelimitedBuffer's internal
+// byte buffer, with the binary message prefixed by the varint encoded size of
+// the written message. It returns the number of bytes written to the buffer
+// along with any error it may have encountered.
+func (b *DelimitedBuffer) Marshal(m proto.Message) (n int, err error) {
+ if err = b.msgBuf.Marshal(m); err != nil {
+ return 0, err
+ }
+ if len(b.headBuf) < binary.MaxVarintLen64 {
+ b.headBuf = make([]byte, binary.MaxVarintLen64)
+ }
+ msg := b.msgBuf.Bytes()
+ msgLen := len(msg)
+ headLen := binary.PutUvarint(b.headBuf, uint64(msgLen))
+ b.buf = append(b.buf, b.headBuf[0:headLen]...)
+ b.buf = append(b.buf, msg...)
+ return headLen + msgLen, nil
+}
+
// ErrTruncMsg is returned by DelimitedBuffer.Unmarshal when the buffer ends
// before a complete length-prefixed record could be read.
var ErrTruncMsg = errors.New("truncated message")
+
+// Unmarshal decodes a Protocol Buffer message from this DelimitedBuffer's
+// internal and writes it to the provided message. A read advances the internal
+// buffer position accordingly. It returns the number of bytes written to the
+// buffer along with any error it may have encountered.
+func (b *DelimitedBuffer) Unmarshal(m proto.Message) (n int, err error) {
+ // Per AbstractParser#parsePartialDelimitedFrom with
+ // CodedInputStream#readRawVarint32.
+ bufLen := len(b.buf)
+ if bufLen == 0 || int(b.buf[0]) == -1 {
+ return 0, io.EOF
+ }
+ scanLen := binary.MaxVarintLen64
+ if bufLen < scanLen {
+ scanLen = bufLen
+ }
+ msgLen, syncLen := proto.DecodeVarint(b.buf)
+ b.buf = b.buf[syncLen:]
+ if len(b.buf)-int(msgLen) < 0 {
+ return syncLen, ErrTruncMsg
+ }
+ wind := b.buf[0:msgLen]
+ b.buf = b.buf[msgLen:]
+ err = proto.Unmarshal(wind, m)
+ if err != nil {
+ return syncLen, err
+ }
+ return syncLen + int(msgLen), nil
+}
+
+// Bytes yields the internal byte array buffer for this instance.
+func (b *DelimitedBuffer) Bytes() []byte {
+ return b.buf
+}
+
+// SetBuf replaces the internal buffer with the provided slice, thereby enabling
+// the DelimitedWriter to either begin decoding from its initial position or
+// begin writing thereto.
+func (b *DelimitedBuffer) SetBuf(buf []byte) {
+ b.buf = buf
+}
+
+// Clear clears the internal buffer, thereby deleting any values written to it.
+func (b *DelimitedBuffer) Clear() {
+ b.msgBuf.Reset()
+ b.buf = b.buf[0:0]
+}
+
+// Reset returns the buffer to a state similar to that of a new instance. The
+// use case for this, which differs from Clear, is that you may have encoded an
+// unusually large Protocol Buffer message and want to signal to the memory
+// manager that the large buffer should be reaped eventually.
+func (b *DelimitedBuffer) Reset() {
+ b.msgBuf = proto.Buffer{}
+ b.buf = nil
+}
+
+// NewDelimitedBuffer allocates a new DelimitedBuffer and initializes its
+// internal buffer with the contents of the provided slice. Any writes to
+// this DelimitedBuffer will be written to the provided buffer as well.
+func NewDelimitedBuffer(buf []byte) *DelimitedBuffer {
+ return &DelimitedBuffer{
+ buf: buf,
+ }
+}
diff --git a/ext/decode.go b/ext/decode.go
index 941db73..1e508fb 100644
--- a/ext/decode.go
+++ b/ext/decode.go
@@ -25,7 +25,11 @@
// ReadDelimited decodes a message from the provided length-delimited stream,
// where the length is encoded as 32-bit varint prefix to the message body.
// It returns the total number of bytes read and any applicable error.
+//
+// Deprecated: This API is slated for removal; use DelimitedBuffer.Unmarshal instead.
func ReadDelimited(r io.Reader, m proto.Message) (n int, err error) {
+ deprReadDelimited()
+
// Per AbstractParser#parsePartialDelimitedFrom with
// CodedInputStream#readRawVarint32.
buffer := make([]byte, binary.MaxVarintLen32)
diff --git a/ext/deprec.go b/ext/deprec.go
new file mode 100644
index 0000000..812811c
--- /dev/null
+++ b/ext/deprec.go
@@ -0,0 +1,39 @@
+// Copyright 2014 Matt T. Proud
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ext
+
+import (
+ "log"
+ "sync"
+)
+
// deprWarning returns a function that logs a notice that the function named n
// is scheduled for removal.
func deprWarning(n string) func() {
	return func() {
		log.Printf("function %s is slated for deletion and will go away June 20, 2014", n)
	}
}

// Deprecation notices for the legacy stream API. Each notice is logged at most
// once per process, guarded by its own sync.Once.
var (
	writeWarn = deprWarning("WriteDelimited")
	readWarn  = deprWarning("ReadDelimited")

	readOnce, writeOnce sync.Once
)

// deprWriteDelimited logs the WriteDelimited deprecation notice on its first
// call only.
func deprWriteDelimited() {
	writeOnce.Do(writeWarn)
}

// deprReadDelimited logs the ReadDelimited deprecation notice on its first
// call only.
func deprReadDelimited() {
	readOnce.Do(readWarn)
}
diff --git a/ext/encode.go b/ext/encode.go
index d92336d..887ce97 100644
--- a/ext/encode.go
+++ b/ext/encode.go
@@ -26,7 +26,11 @@
// a length-delimited record stream, which can be used to chain together
// encoded messages of the same type together in a file. It returns the total
// number of bytes written and any applicable error.
+//
+// Deprecated: This API is slated for removal; use DelimitedBuffer.Marshal instead.
func WriteDelimited(w io.Writer, m proto.Message) (n int, err error) {
+ deprWriteDelimited()
+
buffer, err := proto.Marshal(m)
if err != nil {
return 0, err