Skip to content

Commit 4aa4d61

Browse files
mgilbirscritchley
authored and committed
Add support for zlib compression (#14)
* Add support for zlib compression
* Address comments in PR. Use constant for Zlib default chunk size and use proper fmt printing verbs
* Fix maximum chunkLength check to ensure that the value to be used in the header remains within the available field bounds
1 parent 7bcca8a commit 4aa4d61

6 files changed

+735
-80
lines changed

buffererwriter.go

Lines changed: 135 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -1,66 +1,173 @@
11
package orc
22

33
import (
4-
"bufio"
54
"bytes"
5+
"fmt"
6+
"io"
7+
"sync"
68
)
79

8-
// BufferedWriter wraps a *bufio.Writer and records the current
9-
// position of the writer prior to flushing to the underlying
10-
// writer.
1110
type BufferedWriter struct {
12-
*bufio.Writer
13-
*bytes.Buffer
14-
codec CompressionCodec
15-
checkpoint uint64
16-
written uint64
11+
uncompressedBuffer *bytes.Buffer
12+
encodedBuffer *bytes.Buffer
13+
codec CompressionCodec
14+
chunkSize int
15+
checkpoint uint64
16+
written uint64
17+
sync.Mutex
1718
}
1819

1920
// NewBufferedWriter returns a new BufferedWriter using the provided
2021
// CompressionCodec.
2122
func NewBufferedWriter(codec CompressionCodec) *BufferedWriter {
22-
buf := &bytes.Buffer{}
23+
chunkSize := 1024
24+
switch codec.(type) {
25+
case CompressionNone:
26+
chunkSize = 1
27+
case CompressionZlib:
28+
chunkSize = int(DefaultCompressionChunkSize)
29+
}
2330
return &BufferedWriter{
24-
codec: codec,
25-
Writer: bufio.NewWriterSize(
26-
codec.Encoder(buf),
27-
int(DefaultCompressionChunkSize),
28-
),
29-
Buffer: buf,
31+
codec: codec,
32+
uncompressedBuffer: &bytes.Buffer{},
33+
encodedBuffer: &bytes.Buffer{},
34+
chunkSize: chunkSize,
3035
}
3136
}
3237

33-
// WriteByte writes a byte to the underlying buffer an increments the total
34-
// number of bytes written.
38+
// WriteByte writes a byte to the underlying buffer.
39+
// If the desired chunk size is reached, the buffer is compressed
3540
func (b *BufferedWriter) WriteByte(c byte) error {
36-
b.written++
37-
return b.Writer.WriteByte(c)
41+
b.Lock()
42+
defer b.Unlock()
43+
44+
if b.uncompressedBuffer.Len() == b.chunkSize {
45+
err := b.spill()
46+
if err != nil {
47+
return err
48+
}
49+
}
50+
_, err := b.uncompressedBuffer.Write([]byte{c})
51+
return err
3852
}
3953

40-
// Write writes the provided byte slice to the underlying buffer an increments
41-
// the total number of bytes written.
54+
// Write writes the provided byte slice to the underlying buffer.
55+
// If the desired chunk size is reached, the buffer is compressed
4256
func (b *BufferedWriter) Write(p []byte) (int, error) {
43-
b.written += uint64(len(p))
44-
return b.Writer.Write(p)
57+
b.Lock()
58+
defer b.Unlock()
59+
pos := 0
60+
61+
var remaining int
62+
l := len(p)
63+
c := b.chunkSize - b.uncompressedBuffer.Len()
64+
if c > l {
65+
remaining = l
66+
} else {
67+
remaining = c
68+
}
69+
70+
n, err := b.uncompressedBuffer.Write(p[pos : pos+remaining])
71+
if err != nil {
72+
return 0, err
73+
}
74+
pos += n
75+
l -= n
76+
77+
for l != 0 {
78+
if err := b.spill(); err != nil {
79+
return 0, err
80+
}
81+
82+
c = b.chunkSize - b.uncompressedBuffer.Len()
83+
if c > l {
84+
remaining = l
85+
} else {
86+
remaining = c
87+
}
88+
89+
n, err = b.uncompressedBuffer.Write(p[pos : pos+remaining])
90+
if err != nil {
91+
return 0, err
92+
}
93+
pos += n
94+
l -= n
95+
}
96+
97+
return pos, nil
98+
}
99+
100+
// spill to the encoder to handle the compression and update the number of
101+
// written bytes to the encoded buffer
102+
func (b *BufferedWriter) spill() error {
103+
encoder := b.codec.Encoder(b.encodedBuffer)
104+
l := b.uncompressedBuffer.Len()
105+
n, err := io.Copy(encoder, b.uncompressedBuffer)
106+
if err != nil {
107+
return err
108+
}
109+
if int(n) != l {
110+
return fmt.Errorf("Expected to write %d bytes, wrote %d", l, n)
111+
}
112+
113+
err = encoder.Close()
114+
if err != nil {
115+
return err
116+
}
117+
b.written += uint64(n)
118+
return nil
45119
}
46120

47121
func (b *BufferedWriter) Positions() []uint64 {
122+
b.Lock()
123+
defer b.Unlock()
124+
125+
//TODO: Do we still need the checkpoint?
48126
switch b.codec.(type) {
49127
case CompressionNone:
50128
checkpoint := b.checkpoint
51129
b.checkpoint = b.written
52130
return []uint64{checkpoint}
53131
default:
54-
return nil
132+
//TODO: check if this is correct
133+
checkpoint := b.checkpoint
134+
b.checkpoint = b.written
135+
return []uint64{checkpoint}
136+
137+
// return nil
55138
}
56139
}
57140

141+
func (b *BufferedWriter) Flush() error {
142+
b.Lock()
143+
defer b.Unlock()
144+
145+
return b.spill()
146+
}
147+
148+
func (b *BufferedWriter) Read(p []byte) (int, error) {
149+
b.Lock()
150+
defer b.Unlock()
151+
152+
return b.encodedBuffer.Read(p)
153+
}
154+
155+
func (b *BufferedWriter) Len() int {
156+
b.Lock()
157+
defer b.Unlock()
158+
159+
return b.encodedBuffer.Len()
160+
}
161+
58162
// Close flushes any buffered bytes to the underlying writer.
59163
func (b *BufferedWriter) Close() error {
60-
return b.Writer.Flush()
164+
return b.spill()
61165
}
62166

63-
// Reset resets the underlying bytes.Buffer.
167+
// Reset resets the underlying encoded buffer
64168
func (b *BufferedWriter) Reset() {
65-
b.Buffer.Reset()
169+
b.Lock()
170+
defer b.Unlock()
171+
172+
b.encodedBuffer.Reset()
66173
}

0 commit comments

Comments
 (0)