package openai

import (
	"bufio"
	"context"
	"errors"
)

var (
	// ErrTooManyEmptyStreamMessages is returned when the stream sends more
	// consecutive empty messages than the configured EmptyMessagesLimit allows.
	ErrTooManyEmptyStreamMessages = errors.New("stream has sent too many empty messages")
)

// CompletionStream streams CompletionResponse events for a single
// CreateCompletionStream call; the reading logic lives in the embedded
// streamReader.
type CompletionStream struct {
	*streamReader[CompletionResponse]
}

// CreateCompletionStream makes an API call to create a completion with
// streaming support. The request is always sent with Stream set to true, so
// tokens are returned as data-only server-sent events as they become
// available, with the stream terminated by a data: [DONE] message.
func (c *Client) CreateCompletionStream(
	ctx context.Context,
	request CompletionRequest,
) (stream *CompletionStream, err error) {
	request.Stream = true
	req, err := c.newStreamRequest(ctx, "POST", "/completions", request)
	if err != nil {
		return
	}

	resp, err := c.config.HTTPClient.Do(req) //nolint:bodyclose // body is closed in stream.Close()
	if err != nil {
		return
	}

	// The response body is handed to the streamReader and is closed later by
	// stream.Close(), which is why bodyclose is suppressed above.
	stream = &CompletionStream{
		streamReader: &streamReader[CompletionResponse]{
			emptyMessagesLimit: c.config.EmptyMessagesLimit,
			reader:             bufio.NewReader(resp.Body),
			response:           resp,
			errAccumulator:     newErrorAccumulator(),
			unmarshaler:        &jsonUnmarshaler{},
		},
	}
	return
}
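
// Example usage (a minimal sketch, not part of the original file): it shows
// how a caller might consume the stream returned above. It assumes an existing
// *Client (client) and context.Context (ctx), that the embedded streamReader
// exposes Recv and Close as it does elsewhere in this package, and that Recv
// reports io.EOF once the data: [DONE] event arrives. The Model, Prompt, and
// MaxTokens values are placeholders.
//
//	stream, err := client.CreateCompletionStream(ctx, CompletionRequest{
//		Model:     "text-davinci-003",
//		Prompt:    "Say this is a test",
//		MaxTokens: 16,
//	})
//	if err != nil {
//		return err
//	}
//	defer stream.Close()
//
//	for {
//		response, err := stream.Recv()
//		if errors.Is(err, io.EOF) {
//			break // the stream was terminated by data: [DONE]
//		}
//		if err != nil {
//			return err // e.g. ErrTooManyEmptyStreamMessages
//		}
//		fmt.Print(response.Choices[0].Text)
//	}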