feat: support batches api (#746)

* feat: support batches api

* update batch_test.go

* fix golangci-lint check

* fix golangci-lint check

* fix tests coverage

* fix tests coverage

* fix tests coverage

* fix tests coverage

* fix tests coverage

* fix tests coverage

* fix tests coverage

* fix: create batch api

* update batch_test.go

* feat: add `CreateBatchWithUploadFile`

* feat: add `UploadBatchFile`

* optimize variable and type naming

* expose `BatchLineItem` interface

* update batches const
This commit is contained in:
eiixy
2024-06-13 23:24:37 +08:00
committed by GitHub
parent c69c3bb1d2
commit 99cc170b54
4 changed files with 655 additions and 0 deletions

275
batch.go Normal file
View File

@@ -0,0 +1,275 @@
package openai
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
)
const batchesSuffix = "/batches"
type BatchEndpoint string
const (
BatchEndpointChatCompletions BatchEndpoint = "/v1/chat/completions"
BatchEndpointCompletions BatchEndpoint = "/v1/completions"
BatchEndpointEmbeddings BatchEndpoint = "/v1/embeddings"
)
type BatchLineItem interface {
MarshalBatchLineItem() []byte
}
type BatchChatCompletionRequest struct {
CustomID string `json:"custom_id"`
Body ChatCompletionRequest `json:"body"`
Method string `json:"method"`
URL BatchEndpoint `json:"url"`
}
func (r BatchChatCompletionRequest) MarshalBatchLineItem() []byte {
marshal, _ := json.Marshal(r)
return marshal
}
type BatchCompletionRequest struct {
CustomID string `json:"custom_id"`
Body CompletionRequest `json:"body"`
Method string `json:"method"`
URL BatchEndpoint `json:"url"`
}
func (r BatchCompletionRequest) MarshalBatchLineItem() []byte {
marshal, _ := json.Marshal(r)
return marshal
}
type BatchEmbeddingRequest struct {
CustomID string `json:"custom_id"`
Body EmbeddingRequest `json:"body"`
Method string `json:"method"`
URL BatchEndpoint `json:"url"`
}
func (r BatchEmbeddingRequest) MarshalBatchLineItem() []byte {
marshal, _ := json.Marshal(r)
return marshal
}
type Batch struct {
ID string `json:"id"`
Object string `json:"object"`
Endpoint BatchEndpoint `json:"endpoint"`
Errors *struct {
Object string `json:"object,omitempty"`
Data struct {
Code string `json:"code,omitempty"`
Message string `json:"message,omitempty"`
Param *string `json:"param,omitempty"`
Line *int `json:"line,omitempty"`
} `json:"data"`
} `json:"errors"`
InputFileID string `json:"input_file_id"`
CompletionWindow string `json:"completion_window"`
Status string `json:"status"`
OutputFileID *string `json:"output_file_id"`
ErrorFileID *string `json:"error_file_id"`
CreatedAt int `json:"created_at"`
InProgressAt *int `json:"in_progress_at"`
ExpiresAt *int `json:"expires_at"`
FinalizingAt *int `json:"finalizing_at"`
CompletedAt *int `json:"completed_at"`
FailedAt *int `json:"failed_at"`
ExpiredAt *int `json:"expired_at"`
CancellingAt *int `json:"cancelling_at"`
CancelledAt *int `json:"cancelled_at"`
RequestCounts BatchRequestCounts `json:"request_counts"`
Metadata map[string]any `json:"metadata"`
}
type BatchRequestCounts struct {
Total int `json:"total"`
Completed int `json:"completed"`
Failed int `json:"failed"`
}
type CreateBatchRequest struct {
InputFileID string `json:"input_file_id"`
Endpoint BatchEndpoint `json:"endpoint"`
CompletionWindow string `json:"completion_window"`
Metadata map[string]any `json:"metadata"`
}
type BatchResponse struct {
httpHeader
Batch
}
var ErrUploadBatchFileFailed = errors.New("upload batch file failed")
// CreateBatch — API call to Create batch.
func (c *Client) CreateBatch(
ctx context.Context,
request CreateBatchRequest,
) (response BatchResponse, err error) {
if request.CompletionWindow == "" {
request.CompletionWindow = "24h"
}
req, err := c.newRequest(ctx, http.MethodPost, c.fullURL(batchesSuffix), withBody(request))
if err != nil {
return
}
err = c.sendRequest(req, &response)
return
}
type UploadBatchFileRequest struct {
FileName string
Lines []BatchLineItem
}
func (r *UploadBatchFileRequest) MarshalJSONL() []byte {
buff := bytes.Buffer{}
for i, line := range r.Lines {
if i != 0 {
buff.Write([]byte("\n"))
}
buff.Write(line.MarshalBatchLineItem())
}
return buff.Bytes()
}
func (r *UploadBatchFileRequest) AddChatCompletion(customerID string, body ChatCompletionRequest) {
r.Lines = append(r.Lines, BatchChatCompletionRequest{
CustomID: customerID,
Body: body,
Method: "POST",
URL: BatchEndpointChatCompletions,
})
}
func (r *UploadBatchFileRequest) AddCompletion(customerID string, body CompletionRequest) {
r.Lines = append(r.Lines, BatchCompletionRequest{
CustomID: customerID,
Body: body,
Method: "POST",
URL: BatchEndpointCompletions,
})
}
func (r *UploadBatchFileRequest) AddEmbedding(customerID string, body EmbeddingRequest) {
r.Lines = append(r.Lines, BatchEmbeddingRequest{
CustomID: customerID,
Body: body,
Method: "POST",
URL: BatchEndpointEmbeddings,
})
}
// UploadBatchFile — upload batch file.
func (c *Client) UploadBatchFile(ctx context.Context, request UploadBatchFileRequest) (File, error) {
if request.FileName == "" {
request.FileName = "@batchinput.jsonl"
}
return c.CreateFileBytes(ctx, FileBytesRequest{
Name: request.FileName,
Bytes: request.MarshalJSONL(),
Purpose: PurposeBatch,
})
}
type CreateBatchWithUploadFileRequest struct {
Endpoint BatchEndpoint `json:"endpoint"`
CompletionWindow string `json:"completion_window"`
Metadata map[string]any `json:"metadata"`
UploadBatchFileRequest
}
// CreateBatchWithUploadFile — API call to Create batch with upload file.
func (c *Client) CreateBatchWithUploadFile(
ctx context.Context,
request CreateBatchWithUploadFileRequest,
) (response BatchResponse, err error) {
var file File
file, err = c.UploadBatchFile(ctx, UploadBatchFileRequest{
FileName: request.FileName,
Lines: request.Lines,
})
if err != nil {
err = errors.Join(ErrUploadBatchFileFailed, err)
return
}
return c.CreateBatch(ctx, CreateBatchRequest{
InputFileID: file.ID,
Endpoint: request.Endpoint,
CompletionWindow: request.CompletionWindow,
Metadata: request.Metadata,
})
}
// RetrieveBatch — API call to Retrieve batch.
func (c *Client) RetrieveBatch(
ctx context.Context,
batchID string,
) (response BatchResponse, err error) {
urlSuffix := fmt.Sprintf("%s/%s", batchesSuffix, batchID)
req, err := c.newRequest(ctx, http.MethodGet, c.fullURL(urlSuffix))
if err != nil {
return
}
err = c.sendRequest(req, &response)
return
}
// CancelBatch — API call to Cancel batch.
func (c *Client) CancelBatch(
ctx context.Context,
batchID string,
) (response BatchResponse, err error) {
urlSuffix := fmt.Sprintf("%s/%s/cancel", batchesSuffix, batchID)
req, err := c.newRequest(ctx, http.MethodPost, c.fullURL(urlSuffix))
if err != nil {
return
}
err = c.sendRequest(req, &response)
return
}
type ListBatchResponse struct {
httpHeader
Object string `json:"object"`
Data []Batch `json:"data"`
FirstID string `json:"first_id"`
LastID string `json:"last_id"`
HasMore bool `json:"has_more"`
}
// ListBatch API call to List batch.
func (c *Client) ListBatch(ctx context.Context, after *string, limit *int) (response ListBatchResponse, err error) {
urlValues := url.Values{}
if limit != nil {
urlValues.Add("limit", fmt.Sprintf("%d", *limit))
}
if after != nil {
urlValues.Add("after", *after)
}
encodedValues := ""
if len(urlValues) > 0 {
encodedValues = "?" + urlValues.Encode()
}
urlSuffix := fmt.Sprintf("%s%s", batchesSuffix, encodedValues)
req, err := c.newRequest(ctx, http.MethodGet, c.fullURL(urlSuffix))
if err != nil {
return
}
err = c.sendRequest(req, &response)
return
}

368
batch_test.go Normal file
View File

@@ -0,0 +1,368 @@
package openai_test
import (
"context"
"fmt"
"net/http"
"reflect"
"testing"
"github.com/sashabaranov/go-openai"
"github.com/sashabaranov/go-openai/internal/test/checks"
)
func TestUploadBatchFile(t *testing.T) {
client, server, teardown := setupOpenAITestServer()
defer teardown()
server.RegisterHandler("/v1/files", handleCreateFile)
req := openai.UploadBatchFileRequest{}
req.AddChatCompletion("req-1", openai.ChatCompletionRequest{
MaxTokens: 5,
Model: openai.GPT3Dot5Turbo,
Messages: []openai.ChatCompletionMessage{
{
Role: openai.ChatMessageRoleUser,
Content: "Hello!",
},
},
})
_, err := client.UploadBatchFile(context.Background(), req)
checks.NoError(t, err, "UploadBatchFile error")
}
func TestCreateBatch(t *testing.T) {
client, server, teardown := setupOpenAITestServer()
defer teardown()
server.RegisterHandler("/v1/batches", handleBatchEndpoint)
_, err := client.CreateBatch(context.Background(), openai.CreateBatchRequest{
InputFileID: "file-abc",
Endpoint: openai.BatchEndpointChatCompletions,
CompletionWindow: "24h",
})
checks.NoError(t, err, "CreateBatch error")
}
func TestCreateBatchWithUploadFile(t *testing.T) {
client, server, teardown := setupOpenAITestServer()
defer teardown()
server.RegisterHandler("/v1/files", handleCreateFile)
server.RegisterHandler("/v1/batches", handleBatchEndpoint)
req := openai.CreateBatchWithUploadFileRequest{
Endpoint: openai.BatchEndpointChatCompletions,
}
req.AddChatCompletion("req-1", openai.ChatCompletionRequest{
MaxTokens: 5,
Model: openai.GPT3Dot5Turbo,
Messages: []openai.ChatCompletionMessage{
{
Role: openai.ChatMessageRoleUser,
Content: "Hello!",
},
},
})
_, err := client.CreateBatchWithUploadFile(context.Background(), req)
checks.NoError(t, err, "CreateBatchWithUploadFile error")
}
func TestRetrieveBatch(t *testing.T) {
client, server, teardown := setupOpenAITestServer()
defer teardown()
server.RegisterHandler("/v1/batches/file-id-1", handleRetrieveBatchEndpoint)
_, err := client.RetrieveBatch(context.Background(), "file-id-1")
checks.NoError(t, err, "RetrieveBatch error")
}
func TestCancelBatch(t *testing.T) {
client, server, teardown := setupOpenAITestServer()
defer teardown()
server.RegisterHandler("/v1/batches/file-id-1/cancel", handleCancelBatchEndpoint)
_, err := client.CancelBatch(context.Background(), "file-id-1")
checks.NoError(t, err, "RetrieveBatch error")
}
func TestListBatch(t *testing.T) {
client, server, teardown := setupOpenAITestServer()
defer teardown()
server.RegisterHandler("/v1/batches", handleBatchEndpoint)
after := "batch_abc123"
limit := 10
_, err := client.ListBatch(context.Background(), &after, &limit)
checks.NoError(t, err, "RetrieveBatch error")
}
func TestUploadBatchFileRequest_AddChatCompletion(t *testing.T) {
type args struct {
customerID string
body openai.ChatCompletionRequest
}
tests := []struct {
name string
args []args
want []byte
}{
{"", []args{
{
customerID: "req-1",
body: openai.ChatCompletionRequest{
MaxTokens: 5,
Model: openai.GPT3Dot5Turbo,
Messages: []openai.ChatCompletionMessage{
{
Role: openai.ChatMessageRoleUser,
Content: "Hello!",
},
},
},
},
{
customerID: "req-2",
body: openai.ChatCompletionRequest{
MaxTokens: 5,
Model: openai.GPT3Dot5Turbo,
Messages: []openai.ChatCompletionMessage{
{
Role: openai.ChatMessageRoleUser,
Content: "Hello!",
},
},
},
},
}, []byte("{\"custom_id\":\"req-1\",\"body\":{\"model\":\"gpt-3.5-turbo\",\"messages\":[{\"role\":\"user\",\"content\":\"Hello!\"}],\"max_tokens\":5},\"method\":\"POST\",\"url\":\"/v1/chat/completions\"}\n{\"custom_id\":\"req-2\",\"body\":{\"model\":\"gpt-3.5-turbo\",\"messages\":[{\"role\":\"user\",\"content\":\"Hello!\"}],\"max_tokens\":5},\"method\":\"POST\",\"url\":\"/v1/chat/completions\"}")}, //nolint:lll
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := &openai.UploadBatchFileRequest{}
for _, arg := range tt.args {
r.AddChatCompletion(arg.customerID, arg.body)
}
got := r.MarshalJSONL()
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("Marshal() got = %v, want %v", got, tt.want)
}
})
}
}
func TestUploadBatchFileRequest_AddCompletion(t *testing.T) {
type args struct {
customerID string
body openai.CompletionRequest
}
tests := []struct {
name string
args []args
want []byte
}{
{"", []args{
{
customerID: "req-1",
body: openai.CompletionRequest{
Model: openai.GPT3Dot5Turbo,
User: "Hello",
},
},
{
customerID: "req-2",
body: openai.CompletionRequest{
Model: openai.GPT3Dot5Turbo,
User: "Hello",
},
},
}, []byte("{\"custom_id\":\"req-1\",\"body\":{\"model\":\"gpt-3.5-turbo\",\"user\":\"Hello\"},\"method\":\"POST\",\"url\":\"/v1/completions\"}\n{\"custom_id\":\"req-2\",\"body\":{\"model\":\"gpt-3.5-turbo\",\"user\":\"Hello\"},\"method\":\"POST\",\"url\":\"/v1/completions\"}")}, //nolint:lll
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := &openai.UploadBatchFileRequest{}
for _, arg := range tt.args {
r.AddCompletion(arg.customerID, arg.body)
}
got := r.MarshalJSONL()
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("Marshal() got = %v, want %v", got, tt.want)
}
})
}
}
func TestUploadBatchFileRequest_AddEmbedding(t *testing.T) {
type args struct {
customerID string
body openai.EmbeddingRequest
}
tests := []struct {
name string
args []args
want []byte
}{
{"", []args{
{
customerID: "req-1",
body: openai.EmbeddingRequest{
Model: openai.GPT3Dot5Turbo,
Input: []string{"Hello", "World"},
},
},
{
customerID: "req-2",
body: openai.EmbeddingRequest{
Model: openai.AdaEmbeddingV2,
Input: []string{"Hello", "World"},
},
},
}, []byte("{\"custom_id\":\"req-1\",\"body\":{\"input\":[\"Hello\",\"World\"],\"model\":\"gpt-3.5-turbo\",\"user\":\"\"},\"method\":\"POST\",\"url\":\"/v1/embeddings\"}\n{\"custom_id\":\"req-2\",\"body\":{\"input\":[\"Hello\",\"World\"],\"model\":\"text-embedding-ada-002\",\"user\":\"\"},\"method\":\"POST\",\"url\":\"/v1/embeddings\"}")}, //nolint:lll
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := &openai.UploadBatchFileRequest{}
for _, arg := range tt.args {
r.AddEmbedding(arg.customerID, arg.body)
}
got := r.MarshalJSONL()
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("Marshal() got = %v, want %v", got, tt.want)
}
})
}
}
func handleBatchEndpoint(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPost {
_, _ = fmt.Fprintln(w, `{
"id": "batch_abc123",
"object": "batch",
"endpoint": "/v1/completions",
"errors": null,
"input_file_id": "file-abc123",
"completion_window": "24h",
"status": "completed",
"output_file_id": "file-cvaTdG",
"error_file_id": "file-HOWS94",
"created_at": 1711471533,
"in_progress_at": 1711471538,
"expires_at": 1711557933,
"finalizing_at": 1711493133,
"completed_at": 1711493163,
"failed_at": null,
"expired_at": null,
"cancelling_at": null,
"cancelled_at": null,
"request_counts": {
"total": 100,
"completed": 95,
"failed": 5
},
"metadata": {
"customer_id": "user_123456789",
"batch_description": "Nightly eval job"
}
}`)
} else if r.Method == http.MethodGet {
_, _ = fmt.Fprintln(w, `{
"object": "list",
"data": [
{
"id": "batch_abc123",
"object": "batch",
"endpoint": "/v1/chat/completions",
"errors": null,
"input_file_id": "file-abc123",
"completion_window": "24h",
"status": "completed",
"output_file_id": "file-cvaTdG",
"error_file_id": "file-HOWS94",
"created_at": 1711471533,
"in_progress_at": 1711471538,
"expires_at": 1711557933,
"finalizing_at": 1711493133,
"completed_at": 1711493163,
"failed_at": null,
"expired_at": null,
"cancelling_at": null,
"cancelled_at": null,
"request_counts": {
"total": 100,
"completed": 95,
"failed": 5
},
"metadata": {
"customer_id": "user_123456789",
"batch_description": "Nightly job"
}
}
],
"first_id": "batch_abc123",
"last_id": "batch_abc456",
"has_more": true
}`)
}
}
func handleRetrieveBatchEndpoint(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet {
_, _ = fmt.Fprintln(w, `{
"id": "batch_abc123",
"object": "batch",
"endpoint": "/v1/completions",
"errors": null,
"input_file_id": "file-abc123",
"completion_window": "24h",
"status": "completed",
"output_file_id": "file-cvaTdG",
"error_file_id": "file-HOWS94",
"created_at": 1711471533,
"in_progress_at": 1711471538,
"expires_at": 1711557933,
"finalizing_at": 1711493133,
"completed_at": 1711493163,
"failed_at": null,
"expired_at": null,
"cancelling_at": null,
"cancelled_at": null,
"request_counts": {
"total": 100,
"completed": 95,
"failed": 5
},
"metadata": {
"customer_id": "user_123456789",
"batch_description": "Nightly eval job"
}
}`)
}
}
func handleCancelBatchEndpoint(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPost {
_, _ = fmt.Fprintln(w, `{
"id": "batch_abc123",
"object": "batch",
"endpoint": "/v1/chat/completions",
"errors": null,
"input_file_id": "file-abc123",
"completion_window": "24h",
"status": "cancelling",
"output_file_id": null,
"error_file_id": null,
"created_at": 1711471533,
"in_progress_at": 1711471538,
"expires_at": 1711557933,
"finalizing_at": null,
"completed_at": null,
"failed_at": null,
"expired_at": null,
"cancelling_at": 1711475133,
"cancelled_at": null,
"request_counts": {
"total": 100,
"completed": 23,
"failed": 1
},
"metadata": {
"customer_id": "user_123456789",
"batch_description": "Nightly eval job"
}
}`)
}
}

View File

@@ -396,6 +396,17 @@ func TestClientReturnsRequestBuilderErrors(t *testing.T) {
{"CreateSpeech", func() (any, error) { {"CreateSpeech", func() (any, error) {
return client.CreateSpeech(ctx, CreateSpeechRequest{Model: TTSModel1, Voice: VoiceAlloy}) return client.CreateSpeech(ctx, CreateSpeechRequest{Model: TTSModel1, Voice: VoiceAlloy})
}}, }},
{"CreateBatch", func() (any, error) {
return client.CreateBatch(ctx, CreateBatchRequest{})
}},
{"CreateBatchWithUploadFile", func() (any, error) {
return client.CreateBatchWithUploadFile(ctx, CreateBatchWithUploadFileRequest{})
}},
{"RetrieveBatch", func() (any, error) {
return client.RetrieveBatch(ctx, "")
}},
{"CancelBatch", func() (any, error) { return client.CancelBatch(ctx, "") }},
{"ListBatch", func() (any, error) { return client.ListBatch(ctx, nil, nil) }},
} }
for _, testCase := range testCases { for _, testCase := range testCases {

View File

@@ -22,6 +22,7 @@ const (
PurposeFineTuneResults PurposeType = "fine-tune-results" PurposeFineTuneResults PurposeType = "fine-tune-results"
PurposeAssistants PurposeType = "assistants" PurposeAssistants PurposeType = "assistants"
PurposeAssistantsOutput PurposeType = "assistants_output" PurposeAssistantsOutput PurposeType = "assistants_output"
PurposeBatch PurposeType = "batch"
) )
// FileBytesRequest represents a file upload request. // FileBytesRequest represents a file upload request.